In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as F
import pandas as pd
from PIL import Image
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix
import numpy as np
import time
from tqdm import tqdm
import os
import cv2

## Data augmentation

In [None]:
# Build a flat, class-balanced copy of the training images in `output_folder`
# and a manifest (`ocr_data.csv`) with one row per image.
#
# Every `image_path` in the manifest is a file name RELATIVE to
# `output_folder`, because CustomImageDataset prepends the image directory
# itself when it loads a sample.
input_folder = "train_ocr"
output_folder = "augmented_ocr"
target_per_class = 2500  # originals + augmented copies each class is topped up to
os.makedirs(output_folder, exist_ok=True)

# Light photometric augmentation only (brightness/contrast jitter and a small
# blur); geometry is left untouched.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.GaussianBlur(kernel_size=(3, 3)),
    transforms.ToTensor(),
    # ToTensor scales to [0, 1]; round before casting back to uint8 so that
    # float error cannot truncate a pixel value down by one.
    transforms.Lambda(lambda x: (x * 255).round().byte())
])

image_data = []
for class_name in sorted(os.listdir(input_folder)):
    class_path = os.path.join(input_folder, class_name)
    if not os.path.isdir(class_path):
        continue

    # Keep only the files OpenCV can actually decode.
    images = []
    for img_name in sorted(os.listdir(class_path)):
        img = cv2.imread(os.path.join(class_path, img_name))
        if img is not None:
            images.append((img, img_name))

    num_existing = len(images)
    if num_existing == 0:
        # Nothing to copy or to sample from; np.random.randint(0) would raise.
        print(f"Skipping {class_name}: no readable images")
        continue
    num_to_generate = max(target_per_class - num_existing, 0)

    # Copy the originals. The class name is prefixed so that equally named
    # files from different classes do not overwrite each other in the flat
    # output folder.
    for img, img_name in images:
        out_name = f"{class_name}_orig_{img_name}"
        cv2.imwrite(os.path.join(output_folder, out_name), img)
        image_data.append([out_name, class_name])

    # Top the class up with augmented copies of randomly chosen originals.
    for i in tqdm(range(num_to_generate), desc=f"Generating for {class_name}"):
        img, _ = images[np.random.randint(num_existing)]
        # transform returns a CHW uint8 tensor; OpenCV expects an HWC array.
        augmented_img = transform(img).permute(1, 2, 0).numpy()
        aug_img_name = f"{class_name}_aug_{i}.jpg"
        cv2.imwrite(os.path.join(output_folder, aug_img_name), augmented_img)
        image_data.append([aug_img_name, class_name])

image_df = pd.DataFrame(image_data, columns=["image_path", "label"])
image_df.to_csv("ocr_data.csv", index=False)

In [None]:
# Normalise the manifest and write it back to disk.
#
# * Duplicate paths are collapsed to their first occurrence.
# * Backslashes are replaced literally (regex=False): the default value of
#   `regex` differs between pandas versions, and a lone backslash is not a
#   valid regular expression.
# * Rows whose path contains the 'augmented_ocr' folder prefix are dropped:
#   the dataset class prepends the image directory itself, so such paths
#   would not resolve.
#
# The frame is rebuilt rather than mutated in place, so re-running this cell
# is idempotent.
image_df = (
    image_df
    .drop_duplicates(subset=['image_path'])
    .assign(image_path=lambda frame: frame['image_path'].str.replace('\\', '/', regex=False))
    .loc[lambda frame: ~frame['image_path'].str.contains('augmented_ocr', regex=False)]
)
image_df.to_csv("ocr_data.csv", index=False)

In [None]:
# A notebook cell only renders its LAST expression, so the class distribution
# has to be printed explicitly; otherwise it is computed and thrown away.
print(image_df['label'].value_counts())
image_df.head(10)

## Dataset and Data Loader

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

class CustomImageDataset(Dataset):
    """Dataset of grayscale images described by a manifest DataFrame.

    Args:
        dataframe: Frame with an ``image_path`` column (path relative to
            ``image_dir``) and a ``label`` column.
        image_dir: Directory that every ``image_path`` is resolved against.
        transform: Optional callable applied to the loaded PIL image.
        label_map: Optional ``{label: index}`` mapping. When omitted, the
            mapping is built from the sorted unique labels of ``dataframe``.
            Pass one shared mapping to the train and validation datasets so
            both use the same encoding even if a split is missing a class.

    Attributes:
        label_map: ``{label: index}`` used to encode targets.
        idx_to_label: Inverse mapping, ``{index: label}``.
    """

    def __init__(self, dataframe, image_dir, transform=None, label_map=None):
        self.dataframe = dataframe
        self.image_dir = image_dir
        self.transform = transform
        if label_map is None:
            label_map = {label: idx for idx, label in enumerate(sorted(dataframe['label'].unique()))}
        # Copy so that later changes to the caller's dict cannot alter the encoding.
        self.label_map = dict(label_map)
        self.idx_to_label = {idx: label for label, idx in self.label_map.items()}

    def __len__(self):
        """Return the number of rows in the manifest."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label_index)`` for the row at position ``idx``.

        Raises:
            FileNotFoundError: If the image file does not exist.
            Exception: Any other loading/transform error is logged with the
                offending path and re-raised unchanged.
        """
        row = self.dataframe.iloc[idx]  # positional lookup, done once per sample
        img_path = f"{self.image_dir}/{row['image_path']}"
        try:
            if not os.path.exists(img_path):
                raise FileNotFoundError(f"Image not found: {img_path}")
            # The context manager closes the file handle right after decoding.
            with Image.open(img_path) as handle:
                image = handle.convert('L')
            label = self.label_map[row['label']]
            if self.transform:
                image = self.transform(image)
            return image, label
        except Exception as e:
            # Log which file failed, then let the original error propagate.
            print(f"Error loading image {img_path}: {e}")
            raise

In [None]:
class OCR_CNN(nn.Module):
    """Three-stage convolutional classifier for single-channel 64x64 images.

    Each stage is ``Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2)``, which
    halves the spatial size, so a 64x64 input reaches the classifier as a
    128x8x8 feature map. The head is ``Linear(8192, 256) -> ReLU ->
    Dropout(0.5) -> Linear(256, num_classes)`` and returns raw logits.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=16):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        # 128 channels x 8 x 8 spatial positions after three 2x poolings of a 64x64 input.
        self.fc1 = nn.Linear(128 * 8 * 8, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, 1, 64, 64)`` tensor to ``(batch, num_classes)`` logits."""
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        # Flatten everything except the batch dimension. Unlike
        # ``view(-1, 128 * 8 * 8)``, this cannot silently fold extra spatial
        # positions into the batch dimension when the input is not 64x64;
        # fc1 raises a shape error instead.
        x = torch.flatten(x, start_dim=1)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x
    

def resize_with_padding(image, target_size=(64, 64), fill=255):
    """Scale ``image`` to fit inside ``target_size`` and pad it to that size.

    The aspect ratio is preserved; the resized image is centred on a new
    single-channel ("L") canvas filled with ``fill``.

    Args:
        image: PIL image to resize.
        target_size: ``(width, height)`` of the returned canvas.
        fill: Gray level used for the padding.

    Returns:
        A new "L"-mode PIL image of exactly ``target_size``.
    """
    w, h = image.size
    scale = min(target_size[0] / w, target_size[1] / h)
    # Truncation can reach 0 for very thin or very flat inputs (e.g. a 1x256
    # stroke), and a zero-sized resize is an error, so keep at least 1 pixel.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))

    resized_image = image.resize((new_w, new_h), Image.LANCZOS)
    new_image = Image.new("L", target_size, fill)
    # Centre the resized image on the canvas.
    paste_x = (target_size[0] - new_w) // 2
    paste_y = (target_size[1] - new_h) // 2
    new_image.paste(resized_image, (paste_x, paste_y))

    return new_image

def invert_if_black_bg(image):
    """Return ``image`` inverted when it is mostly dark, otherwise unchanged.

    The mean pixel value, scaled to the 0..1 range, decides: below 0.5 the
    image is treated as light-on-dark and inverted with ``F.invert``.
    """
    brightness = np.asarray(image).mean() / 255.0
    return F.invert(image) if brightness < 0.5 else image

# Preprocessing pipeline applied to every PIL image before it reaches the
# network: letterbox to 64x64, invert if the image is mostly dark, convert to
# a tensor, then normalise with mean 0.5 and std 0.5.
data_transform = transforms.Compose(
    [
        transforms.Lambda(lambda picture: resize_with_padding(picture, target_size=(64, 64))),
        transforms.Lambda(lambda picture: invert_if_black_bg(picture)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]
)

In [None]:
# Load the manifest and carve out a stratified 80/20 train/validation split.
# The fixed random_state makes the split identical on every run.
balanced_train = pd.read_csv('ocr_data.csv')
train_df, val_df = train_test_split(
    balanced_train,
    test_size=0.2,
    stratify=balanced_train['label'],
    random_state=42,
)

train_dataset = CustomImageDataset(train_df, image_dir='augmented_ocr', transform=data_transform)
val_dataset = CustomImageDataset(val_df, image_dir='augmented_ocr', transform=data_transform)

# Each dataset derives its own label encoding from the labels it was given;
# training and validation are only comparable if the two encodings agree.
if train_dataset.label_map != val_dataset.label_map:
    raise ValueError("Train and validation splits produced different label encodings")
# The network below has a fixed 16-way output layer.
if len(train_dataset.label_map) > 16:
    raise ValueError(f"Found {len(train_dataset.label_map)} labels but the model has only 16 outputs")

# Pinned host memory only helps host-to-GPU copies, so enable it on CUDA only.
use_cuda = device.type == 'cuda'
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=4, pin_memory=use_cuda)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, num_workers=4, pin_memory=use_cuda)

print(device)
model = OCR_CNN(num_classes=16).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Gradient scaling is only meaningful for CUDA mixed precision; on CPU the
# scaler is created disabled and acts as a pass-through.
scaler = torch.amp.GradScaler('cuda', enabled=use_cuda)

## Training

In [None]:
def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, num_epochs=20):
    """Train ``model`` and print validation metrics after every epoch.

    Each epoch runs one pass over ``train_loader`` with gradient updates and
    one pass over ``val_loader`` without gradients, then prints loss,
    accuracy, weighted F1 and the confusion matrix.

    Relies on the module-level ``device`` (where batches are moved) and
    ``scaler`` (the gradient scaler used for the backward pass).

    Args:
        model: Network to optimise; updated in place.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        num_epochs: Number of passes over the training data.

    Returns:
        None. All metrics are printed.
    """
    # Autocast is requested for CUDA, so switch it off explicitly when running
    # on CPU instead of relying on PyTorch to warn and disable it.
    use_amp = device.type == 'cuda'

    for epoch in range(num_epochs):
        start_time = time.time()

        # ---- training pass ----
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        train_loop = tqdm(train_loader, desc=f'Epoch [{epoch+1}/{num_epochs}] Training', leave=False)
        for i, (images, labels) in enumerate(train_loop):
            images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            optimizer.zero_grad()
            with torch.amp.autocast('cuda', enabled=use_amp):
                outputs = model(images)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # .item() synchronises with the device, so read the loss once.
            batch_loss = loss.item()
            running_loss += batch_loss
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            train_loop.set_postfix(loss=batch_loss)
            if i == 0:
                print(f"Batch 1 processed - Loss: {batch_loss}")

        # Loss is averaged per batch, accuracy per sample.
        train_loss = running_loss / len(train_loader)
        train_acc = 100 * correct / total

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []

        val_loop = tqdm(val_loader, desc=f'Epoch [{epoch+1}/{num_epochs}] Validation', leave=False)
        with torch.no_grad():
            for images, labels in val_loop:
                images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)
                # Same autocast API as the training pass.
                with torch.amp.autocast('cuda', enabled=use_amp):
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                batch_loss = loss.item()
                val_loss += batch_loss
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                val_loop.set_postfix(val_loss=batch_loss)

        val_loss = val_loss / len(val_loader)
        val_acc = 100 * correct / total
        val_f1 = f1_score(all_labels, all_preds, average='weighted')
        cm = confusion_matrix(all_labels, all_preds)

        epoch_time = time.time() - start_time
        print(f'\nEpoch [{epoch+1}/{num_epochs}] - Time: {epoch_time:.2f}s')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val F1-Score: {val_f1:.4f}')
        print('Confusion Matrix:')
        print(cm)
        print('-' * 50)

# Fit for five epochs, then persist only the learned weights (the state
# dict), not the whole module object.
train_and_evaluate(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=5,
)
torch.save(model.state_dict(), 'trained_model.pth')

## Inference: use the trained model

In [None]:
# Submission template. Later cells read its 'type' column and fill in its
# 'answer' column.
sub = pd.read_csv(filepath_or_buffer='submission.csv')
sub.head(n=5)

In [None]:
def _pad_box(x, y, w, h):
    """Grow a component's bounding box by a margin chosen from its shape."""
    if w > h:
        if w > h*3:
            # Very flat component: pad generously, mostly vertically.
            return x - 15, y - 30, w + 30, h + 60
        return x - 7, y - 15, w + 15, h + 30
    if h > w:
        return x - 10, y - 5, w + 20, h + 10
    return x - 10, y - 10, w + 20, h + 20


def find_chars(img, img_path:str):
    """Locate character candidates in ``img`` and return their padded boxes.

    The image is Otsu-thresholded (inverted, so dark ink becomes foreground),
    split into 4-connected components, and the components are ordered left to
    right. Small components that start inside the horizontal extent of one of
    the two components to their left are discarded; every remaining component
    gets a margin and is clipped to the image.

    The numeric file name in ``img_path`` is used as a row label of the
    module-level ``sub`` frame: when that row's ``type`` is 1, the size
    thresholds are halved.

    Args:
        img: BGR image array.
        img_path: Path the image was read from; its base name must start with
            an integer id followed by a dot.

    Returns:
        List of ``[x, y, w, h]`` boxes, left to right, lying inside the image.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    num_labels, _, stats, _ = cv2.connectedComponentsWithStats(thresh, connectivity=4)

    # Accept both '\\' and '/' separators so the id is parsed on any platform.
    file_name = os.path.basename(img_path.replace('\\', '/'))
    image_id = int(file_name.split('.')[0])
    flag = sub.loc[image_id, 'type']
    num = 0.5 if flag == 1 else 1

    H, W = gray.shape
    min_area = int(W*H*0.001*num)

    # Label 0 is the background; keep only real components, left to right.
    # Values are converted to plain ints so the boxes can be used directly
    # for slicing and drawing.
    foreground = sorted(
        (tuple(int(v) for v in row) for row in stats[1:num_labels]),
        key=lambda comp: comp[0],
    )

    components = []
    for position, (x, y, w, h, area) in enumerate(foreground):
        # Note: W*num*0.05*(1/(2*num)) is W*0.025 regardless of num.
        is_small = area < min_area and w < W*num*0.05*(1/(2*num)) and h < H*num*0.2
        # The left-most component is never filtered.
        if position > 0 and is_small:
            # Compare only with real components to the left. The background
            # spans the whole image, so comparing against it would discard
            # every small second component.
            neighbours = foreground[max(position - 2, 0):position]
            if any(nx < x < nx + nw for nx, _, nw, _, _ in neighbours):
                continue
        components.append((x, y, w, h, area))

    boxes = []
    for x, y, w, h, _ in components:
        x, y, w, h = _pad_box(x, y, w, h)
        # Clip to the image: a negative origin would wrap around when the box
        # is used as a NumPy slice and produce an empty or wrong crop.
        left, top = max(x, 0), max(y, 0)
        right, bottom = min(x + w, W), min(y + h, H)
        boxes.append([left, top, right - left, bottom - top])

    return boxes


def predict_image(image, model, transform, device):
    """Classify one image and return the index of the highest-scoring class.

    A NumPy array input is first turned into a PIL image; a 3-channel array
    is collapsed to one channel by averaging the channels. The image is then
    passed through ``transform``, given a batch dimension, moved to
    ``device`` and scored by ``model`` in eval mode without gradients.
    """
    if isinstance(image, np.ndarray):
        has_three_channels = image.ndim == 3 and image.shape[2] == 3
        if has_three_channels:
            image = image.mean(axis=2).astype(np.uint8)
        image = Image.fromarray(image)

    batch = transform(image).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        logits = model(batch)
        best = torch.max(logits, 1).indices
    return best.item()

# Rebuild the network and load previously saved weights for inference.
# Note that this rebinds the module-level `device` and `model`, replacing any
# model trained earlier in the session.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = OCR_CNN(num_classes=16).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
# weights_only restricts unpickling to tensors and plain containers.
model.load_state_dict(
    torch.load('models/2-ocr_model.pth', map_location=device, weights_only=True)
)
model.eval()

def calculate(image_path, model, show=True):
    """Read an arithmetic expression from an image and evaluate it.

    Characters are located with ``find_chars``, classified one by one with
    ``predict_image``, concatenated left to right into an expression (the
    labels 'divide' and 'x' become '/' and '*'), and the expression is
    evaluated.

    Args:
        image_path: Path of the image to read.
        model: Classifier passed to ``predict_image``.
        show: When True (the default), display the annotated image in an
            OpenCV window and wait for a key press after every character and
            once more at the end. Pass False for unattended or headless runs.

    Returns:
        The result formatted with two decimals, or "0.00" when the recognised
        text cannot be evaluated.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    boxes = find_chars(image, image_path)

    # Class index -> label, rebuilt the same way the dataset builds its
    # encoding. Read once per image rather than once per character.
    label_map = {idx: label for idx, label in enumerate(sorted(pd.read_csv('ocr_data.csv')['label'].unique()))}
    operator_symbols = {'divide': '/', 'x': '*'}

    # Draw on a copy so earlier rectangles never end up inside later crops.
    annotated = image.copy()
    text = ''
    for x, y, w, h in boxes:
        # A negative start index would wrap around in NumPy slicing.
        x0, y0 = max(x, 0), max(y, 0)
        img = image[y0:y + h, x0:x + w]
        if img.size == 0:
            continue

        predicted_class = predict_image(img, model, data_transform, device)
        predicted_label = str(label_map[predicted_class])

        cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 0), 2)
        if show:
            cv2.imshow('1', annotated)
            cv2.waitKey(0)
        text += operator_symbols.get(predicted_label, predicted_label)

    try:
        # NOTE(review): eval on recognised text. The text is built only from
        # the classifier's label set, and builtins are withheld so it cannot
        # call functions; an explicit arithmetic parser would be safer.
        output = round(eval(text, {"__builtins__": {}}, {}), 2)
    except Exception:
        # Unparseable or invalid expression (empty text, division by zero, ...).
        output = 0

    if show:
        cv2.imshow('1', annotated)
    print(f"final output: {output}, {text}")
    if show:
        cv2.waitKey(0)
    formatted_num = "{:.2f}".format(output)
    return formatted_num


In [None]:
import os

# Run the recogniser over every image in the test folder, in numeric file
# name order, and collect the formatted results.
folder_path = "test"


def _numeric_order(filename):
    """Sort key: the integer file stem, or +inf for non-numeric names."""
    try:
        return int(os.path.splitext(filename)[0])
    except ValueError:
        return float('inf')


files = sorted(os.listdir(folder_path), key=_numeric_order)

data = []
for filename in files:
    # Compare extensions case-insensitively so '.JPG' and '.PNG' are not skipped.
    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        image_path = os.path.join(folder_path, filename)
        print(image_path)
        result = calculate(image_path, model)
        data.append({'filename': filename, 'result': result})

df = pd.DataFrame(data)
print(df)


In [None]:
# Copy the formatted results into the submission's 'answer' column.
# The assignment is positional: row k of `sub` receives the k-th entry of
# df['result'], so it presumably relies on `df` holding one row per
# submission row, in the same order — TODO confirm.
sub.loc[:, 'answer'] = df['result'].values

In [None]:
# Write the completed submission back to disk without the index column.
# This overwrites the 'submission.csv' template that was loaded earlier.
sub.to_csv(path_or_buf='submission.csv', index=False)

In [None]:
# Spot-check one row of the submission: the row labelled 6, all columns.
sub.loc[6, :]