In [1]:
# Unit vocabularies shared by several entities; each entity below gets its own
# independent set built from these tuples.
_LENGTH_UNITS = ('centimetre', 'foot', 'inch', 'metre', 'millimetre', 'yard')
_WEIGHT_UNITS = (
    'gram', 'kilogram', 'microgram', 'milligram', 'ounce', 'pound', 'ton',
)
_VOLUME_UNITS = (
    'centilitre', 'cubic foot', 'cubic inch', 'cup', 'decilitre',
    'fluid ounce', 'gallon', 'imperial gallon', 'litre', 'microlitre',
    'millilitre', 'pint', 'quart',
)

# Maps each entity name to the set of unit names accepted for it.
entity_unit_map = {
    'width': set(_LENGTH_UNITS),
    'depth': set(_LENGTH_UNITS),
    'height': set(_LENGTH_UNITS),
    'item_weight': set(_WEIGHT_UNITS),
    'maximum_weight_recommendation': set(_WEIGHT_UNITS),
    'voltage': {'kilovolt', 'millivolt', 'volt'},
    'wattage': {'kilowatt', 'watt'},
    'item_volume': set(_VOLUME_UNITS),
}

# Union of every unit accepted by any entity.
allowed_units = set().union(*entity_unit_map.values())

In [14]:
import re
import os
import requests
import pandas as pd
import multiprocessing
import time
from time import time as timer
from tqdm import tqdm
import numpy as np
from pathlib import Path
from functools import partial
import requests
import urllib
from PIL import Image

def common_mistake(unit):
    """Map a commonly misspelled unit name onto its allowed spelling.

    The unit is returned untouched when it is already in ``allowed_units``.
    Otherwise two substitutions are tried in order ('ter' -> 'tre', then
    'feet' -> 'foot') and the first result found in ``allowed_units`` is
    returned. If neither helps, the original value is returned unchanged.

    Args:
        unit: Unit name to normalise.

    Returns:
        The corrected unit name, or ``unit`` itself when no correction applies.
    """
    if unit in allowed_units:
        return unit

    # Order matters: the American "-ter" spelling is tried before the plural "feet".
    for wrong, right in (('ter', 'tre'), ('feet', 'foot')):
        corrected = unit.replace(wrong, right)
        if corrected in allowed_units:
            return corrected

    return unit

# "<number> <unit words>": optional minus sign, integer or decimal number,
# whitespace, then letters/spaces only. Compiled once instead of on every call.
_VALUE_UNIT_PATTERN = re.compile(r'^-?\d+(\.\d+)?\s+[a-zA-Z\s]+$')


def parse_string(s):
    """Parse a "<number> <unit>" string into a ``(float, unit)`` pair.

    Args:
        s: The text to parse. ``None``, any value whose ``str()`` is 'nan',
            and blank strings are treated as "no value".

    Returns:
        ``(None, None)`` for a missing/blank value, otherwise ``(number, unit)``
        where ``unit`` has been passed through ``common_mistake`` and is a
        member of ``allowed_units``.

    Raises:
        ValueError: If ``s`` is not a string, does not match the
            "<number> <unit>" format, or names a unit outside ``allowed_units``.
    """
    # Identity test for None: `== None` defers to the operand's __eq__.
    if s is None or str(s) == 'nan':
        return None, None

    # A non-string (e.g. a bare number) cannot match the format; report it with
    # the same ValueError as any other malformed value rather than failing on
    # the missing .strip() attribute.
    if not isinstance(s, str):
        raise ValueError("Invalid format in {}".format(s))

    s_stripped = s.strip()
    if s_stripped == "":
        return None, None

    if not _VALUE_UNIT_PATTERN.match(s_stripped):
        raise ValueError("Invalid format in {}".format(s))

    # Split only once so multi-word units such as "cubic foot" stay intact.
    parts = s_stripped.split(maxsplit=1)
    number = float(parts[0])
    unit = common_mistake(parts[1])
    if unit not in allowed_units:
        raise ValueError("Invalid unit [{}] found in {}. Allowed units: {}".format(
            unit, s, allowed_units))
    return number, unit


def create_placeholder_image(image_save_path):
    """Write a 100x100 black RGB image to ``image_save_path``.

    This is best-effort: any exception raised while creating or saving the
    image is swallowed and the function simply returns ``None``.

    Args:
        image_save_path: Destination path for the placeholder file.
    """
    try:
        Image.new('RGB', (100, 100), color='black').save(image_save_path)
    except Exception:
        # Deliberately silent: a missing placeholder is not treated as an error.
        return None

def download_image(image_link, save_folder, retries=3, delay=3):
    """Download one image into ``save_folder``, retrying on failure.

    The file is named after the last path component of ``image_link``. Nothing
    is downloaded when ``image_link`` is not a string or when the target file
    already exists. If every attempt fails, ``create_placeholder_image`` is
    called for the target path instead.

    Args:
        image_link: URL of the image; non-string values are ignored.
        save_folder: Directory the file is written to.
        retries: Maximum number of download attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        None.
    """
    # Import the submodule explicitly: a bare `import urllib` does not guarantee
    # that `urllib.request` has been loaded.
    import urllib.request

    if not isinstance(image_link, str):
        return

    filename = Path(image_link).name
    image_save_path = os.path.join(save_folder, filename)

    if os.path.exists(image_save_path):
        return

    for attempt in range(retries):
        try:
            urllib.request.urlretrieve(image_link, image_save_path)
            return
        except Exception:
            # `Exception` rather than a bare `except:` so that Ctrl-C
            # (KeyboardInterrupt) and SystemExit still stop the run.
            if attempt < retries - 1:
                # No point sleeping after the final failed attempt.
                time.sleep(delay)

    create_placeholder_image(image_save_path)

def download_images(image_links, download_folder, allow_multiprocessing=True,
                    num_workers=64):
    """Download every link in ``image_links`` into ``download_folder``.

    Each link is handled by ``download_image`` with 3 retries and a 3 second
    delay. A tqdm progress bar tracks completion.

    Args:
        image_links: Iterable of image URLs. It is materialised into a list so
            that generators and other length-less iterables are accepted.
        download_folder: Target directory; created if it does not exist.
        allow_multiprocessing: When true, downloads run in a process pool;
            otherwise they run sequentially in the current process.
        num_workers: Size of the process pool (only used when
            ``allow_multiprocessing`` is true). Defaults to 64.

    Returns:
        None.
    """
    # exist_ok avoids the check-then-create race of `if not exists: makedirs`.
    os.makedirs(download_folder, exist_ok=True)

    image_links = list(image_links)
    if not image_links:
        # Nothing to do; avoid spawning a worker pool for an empty input.
        return

    if allow_multiprocessing:
        download_image_partial = partial(
            download_image, save_folder=download_folder, retries=3, delay=3)

        with multiprocessing.Pool(num_workers) as pool:
            # list(...) drains the lazy imap iterator so every task completes
            # before the pool is shut down.
            list(tqdm(pool.imap(download_image_partial, image_links), total=len(image_links)))
            pool.close()
            pool.join()
    else:
        for image_link in tqdm(image_links, total=len(image_links)):
            download_image(image_link, save_folder=download_folder, retries=3, delay=3)
        

In [15]:
import os 
import json 
import pandas as pd
DATASET_FOLDER = '../dataset/'
# The CSVs are read from the Kaggle input mount. The previous code passed an
# absolute path as the second argument of os.path.join, which silently discards
# DATASET_FOLDER, so the effective directory was already this one.
KAGGLE_DATASET_FOLDER = '/kaggle/input/train-dataset/'
# Each frame now loads the file matching its name (they were swapped before).
train = pd.read_csv(os.path.join(KAGGLE_DATASET_FOLDER, 'train.csv'))
test = pd.read_csv(os.path.join(KAGGLE_DATASET_FOLDER, 'test.csv'))

In [23]:
# Download the first entry of `link` into the Kaggle working directory.
# NOTE(review): `link` is not defined in any cell visible in this notebook, so
# this cell depends on hidden kernel state and will raise NameError on
# Restart & Run All — confirm where `link` is supposed to come from.
download_image(link[0] , "/kaggle/working/")

In [None]:
import easyocr
from PIL import Image
import pandas as pd
import os
# English-only EasyOCR reader, reused for every image.
reader = easyocr.Reader(['en'])
image_folder = 'Grayscaled/'
data = []

# Keep only files with a recognised (lower-case) image extension.
image_files = []
for candidate in os.listdir(image_folder):
    if candidate.endswith(('.png', '.jpg', '.jpeg')):
        image_files.append(candidate)

for image_file in image_files:
    image_path = os.path.join(image_folder, image_file)
    try:
        result = reader.readtext(image_path)
        # The second element of each detection is used as its text; join them
        # with spaces in the order the reader returned them.
        extracted_text = ' '.join(detection[1] for detection in result)
        cleaned_text = extracted_text.strip()
        cleaned_text = cleaned_text.replace('\n', ' ')
        cleaned_text = cleaned_text.replace('\r', '')
        row = {'Image_File': image_file, 'Extracted_Text': cleaned_text}
        data.append(row)
    except Exception as e:
        # A failure on one image is reported and the loop moves on.
        print(f"An error occurred while processing {image_file}: {e}")

# One row per successfully processed image.
df = pd.DataFrame(data)
csv_path = 'extracted_texts_easyocr.csv'
df.to_csv(csv_path, index=False)
print(f"Data saved to {csv_path}")

In [131]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [132]:
class Seq2SeqDataset(Dataset):
    """Character-level dataset over parallel lists of input and target texts.

    Each item is a pair of 1-D ``torch.long`` tensors holding the per-character
    indices of the input text and of the target text, looked up in separate
    input/target index tables.

    Note: a second class with this same name is defined further down in this
    file; when the cells run in order that later definition replaces this one.
    """

    def __init__(self, input_texts, target_texts, input_token_index, target_token_index):
        self.input_texts = input_texts
        self.target_texts = target_texts
        self.input_token_index = input_token_index
        self.target_token_index = target_token_index

    def __len__(self):
        """Number of text pairs (taken from the input texts)."""
        return len(self.input_texts)

    @staticmethod
    def _encode(text, token_index):
        """Translate each character of ``text`` through ``token_index``."""
        ids = []
        for ch in text:
            ids.append(token_index[ch])
        return ids

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor)`` for the pair at ``idx``."""
        source_ids = self._encode(self.input_texts[idx], self.input_token_index)
        target_ids = self._encode(self.target_texts[idx], self.target_token_index)
        source_tensor = torch.tensor(source_ids, dtype=torch.long)
        target_tensor = torch.tensor(target_ids, dtype=torch.long)
        return source_tensor, target_tensor

In [None]:
class Encoder(nn.Module):
    """Embedding + multi-layer LSTM encoder that returns its final state.

    Args:
        input_dim: Size of the input vocabulary (rows of the embedding table).
        emb_dim: Embedding dimension.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used on the embeddings and passed to the
            LSTM as its own ``dropout`` argument.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` state.

        The per-step LSTM outputs are discarded; only the state is returned.
        """
        token_vectors = self.embedding(src)
        _, final_state = self.rnn(self.dropout(token_vectors))
        hidden, cell = final_state
        return hidden, cell

class Decoder(nn.Module):
    """Single-step LSTM decoder with a linear projection to the vocabulary.

    Args:
        output_dim: Size of the output vocabulary; also kept as
            ``self.output_dim``.
        emb_dim: Embedding dimension.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used on the embeddings and passed to the
            LSTM as its own ``dropout`` argument.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        Args:
            input: Token indices for the current step; a leading length-1
                dimension is added before embedding.
            hidden: LSTM hidden state from the previous step (or the encoder).
            cell: LSTM cell state from the previous step (or the encoder).

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is the output
            of ``fc_out`` and ``hidden``/``cell`` are the updated LSTM state.
        """
        step_tokens = input.unsqueeze(0)
        step_embedded = self.dropout(self.embedding(step_tokens))
        rnn_out, next_state = self.rnn(step_embedded, (hidden, cell))
        hidden, cell = next_state
        # Drop the length-1 leading dimension again before projecting.
        prediction = self.fc_out(rnn_out.squeeze(0))
        return prediction, hidden, cell

In [None]:
# `forward` draws from `random` to decide on teacher forcing; the module was
# never imported anywhere in the notebook, so import it here.
import random


class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes one step at a time.

    Args:
        encoder: Module called as ``encoder(src)`` returning ``(hidden, cell)``.
        decoder: Module called as ``decoder(input, hidden, cell)`` returning
            ``(prediction, hidden, cell)`` and exposing ``output_dim``.
        device: Device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        """Decode ``trg`` step by step, conditioned on the encoding of ``src``.

        Args:
            src: Source batch handed to the encoder.
            trg: Target batch; dimension 0 is read as the sequence length and
                dimension 1 as the batch size.
            teacher_forcing_ratio: Probability, per step, of feeding the
                ground-truth token ``trg[t]`` as the next decoder input instead
                of the decoder's own argmax prediction.

        Returns:
            Tensor of shape ``(trg_len, batch_size, decoder.output_dim)``.
            Row 0 is left as zeros because decoding starts at step 1.
        """
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=self.device)
        hidden, cell = self.encoder(src)

        # The first row of trg seeds the decoder (the <sos> position).
        decoder_input = trg[0, :]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = output
            top1 = output.argmax(1)
            use_teacher = random.random() < teacher_forcing_ratio
            decoder_input = trg[t] if use_teacher else top1

        return outputs


In [133]:
from dataclasses import dataclass
from typing import List

@dataclass
class TextPair:
    """An input string paired with the target string it should map to."""
    input_text: str   # source text
    target_text: str  # text paired with `input_text` as its target

@dataclass
class Vocabulary:
    """Bundle of token/index lookup tables plus the vocabulary size."""
    token_to_index: dict  # token -> integer index
    index_to_token: dict  # integer index -> token
    vocab_size: int       # size of the vocabulary, as supplied by the caller

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class Seq2SeqDataset(Dataset):
    """Character-level dataset over ``TextPair`` items sharing one vocabulary.

    Each item is a pair of 1-D ``torch.long`` tensors: the character indices of
    the pair's input text and of its target text, both looked up in
    ``vocab.token_to_index``.
    """

    def __init__(self, pairs: List[TextPair], vocab: Vocabulary):
        self.pairs = pairs
        self.vocab = vocab

    def __len__(self):
        """Number of text pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor)`` for the pair at ``idx``."""
        lookup = self.vocab.token_to_index
        source_ids = []
        for ch in self.pairs[idx].input_text:
            source_ids.append(lookup[ch])
        target_ids = []
        for ch in self.pairs[idx].target_text:
            target_ids.append(lookup[ch])
        source_tensor = torch.tensor(source_ids, dtype=torch.long)
        target_tensor = torch.tensor(target_ids, dtype=torch.long)
        return source_tensor, target_tensor

def create_vocab(texts: List[str]):
    """Build a character-level ``Vocabulary`` from a list of strings.

    Characters are numbered from 1 in sorted order, so the same texts always
    produce the same indices (iterating the raw set would give an order that
    can change between interpreter runs). Index 0 is left unassigned and
    ``vocab_size`` counts it, i.e. it equals the number of distinct
    characters plus one.

    Args:
        texts: Strings whose characters make up the vocabulary.

    Returns:
        A ``Vocabulary`` holding the forward and inverse mappings and the size.
    """
    unique_chars = sorted(set(''.join(texts)))
    token_to_index = {char: i for i, char in enumerate(unique_chars, start=1)}
    index_to_token = {i: char for char, i in token_to_index.items()}
    return Vocabulary(token_to_index, index_to_token, len(token_to_index) + 1)

from torch.nn.utils.rnn import pad_sequence

# create_vocab numbers characters from 1, so index 0 is free to act as padding.
PAD_INDEX = 0


def pad_collate(batch):
    """Collate variable-length ``(input, target)`` tensor pairs into a batch.

    The default collate function stacks tensors and therefore fails when the
    sequences in a batch differ in length. Here each side is padded with
    ``PAD_INDEX`` up to the longest sequence in the batch and returned
    time-major, i.e. shaped ``(seq_len, batch)``, which is the layout
    ``Seq2Seq.forward`` reads (``trg.shape[0]`` as length, ``trg.shape[1]`` as
    batch size).

    Args:
        batch: List of ``(input_tensor, target_tensor)`` pairs from the dataset.

    Returns:
        ``(padded_inputs, padded_targets)``, both ``torch.long`` tensors.
    """
    input_seqs, target_seqs = zip(*batch)
    padded_inputs = pad_sequence(list(input_seqs), padding_value=PAD_INDEX)
    padded_targets = pad_sequence(list(target_seqs), padding_value=PAD_INDEX)
    return padded_inputs, padded_targets


# Toy data: the two samples have different lengths (11 vs 12 characters).
text_pairs = [TextPair("12 cats ran", "cats 12 ran"), TextPair("7 dogs slept", "dogs 7 slept")]
vocab = create_vocab([pair.input_text + " " + pair.target_text for pair in text_pairs])
dataset = Seq2SeqDataset(text_pairs, vocab)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=pad_collate)

In [None]:
def train_model(model, dataloader, optimizer, criterion, device):
    """Run one training pass over ``dataloader`` and return the mean batch loss.

    For each batch the model is called as ``model(input_seq, target_seq)``.
    The first row along dimension 0 of both the model output and the target is
    dropped before the loss is computed; the rest is flattened to
    ``(N, output_dim)`` logits against ``(N,)`` targets.

    Args:
        model: Module to train; switched to training mode.
        dataloader: Iterable of ``(input_seq, target_seq)`` batches with a length.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss taking ``(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.train()
    batch_losses = []

    for batch in dataloader:
        src, trg = batch
        src = src.to(device)
        trg = trg.to(device)

        optimizer.zero_grad()
        logits = model(src, trg)

        # Skip position 0 on both sides, then flatten for the criterion.
        flat_logits = logits[1:].view(-1, logits.shape[-1])
        flat_targets = trg[1:].view(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(dataloader)

# Model/training hyperparameters, named so they can be tuned in one place.
EMB_DIM = 10
HID_DIM = 512
N_LAYERS = 2
DROPOUT = 0.5
# create_vocab numbers characters from 1, so index 0 never denotes a real
# token; it is the value used for padding and must not contribute to the loss.
PAD_INDEX = 0
num_epochs = 10

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(vocab.vocab_size, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT)
decoder = Decoder(vocab.vocab_size, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT)
model = Seq2Seq(encoder, decoder, device).to(device)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=PAD_INDEX)

for epoch in range(num_epochs):
    loss = train_model(model, dataloader, optimizer, criterion, device)
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}")


In [None]:
def evaluate_model(model, dataloader, criterion, device):
    """Compute the mean batch loss over ``dataloader`` without updating weights.

    The model is put in eval mode and run under ``torch.no_grad()``. Teacher
    forcing is switched off (``teacher_forcing_ratio=0``) so the decoder is
    always fed its own predictions; with the model's default ratio the result
    would be random and partly driven by the ground-truth tokens.

    As in training, the first row along dimension 0 of both the output and the
    target is dropped before the loss is computed.

    Args:
        model: Module called as
            ``model(input_seq, target_seq, teacher_forcing_ratio=0)``.
        dataloader: Iterable of ``(input_seq, target_seq)`` batches with a length.
        criterion: Loss taking ``(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for input_seq, target_seq in dataloader:
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            output = model(input_seq, target_seq, teacher_forcing_ratio=0)
            output_dim = output.shape[-1]
            # Skip position 0 on both sides, then flatten for the criterion.
            output = output[1:].view(-1, output_dim)
            target_seq = target_seq[1:].view(-1)
            loss = criterion(output, target_seq)
            total_loss += loss.item()
    return total_loss / len(dataloader)

# Evaluate the trained model and report the mean loss.
# NOTE(review): this passes the same `dataloader` that the training loop
# consumed, so the figure printed as "Test Loss" is measured on the training
# pairs — confirm whether a separate held-out loader was intended.
test_loss = evaluate_model(model, dataloader, criterion, device)
print(f"Test Loss: {test_loss:.4f}")
