# Preprocessing the Text

In [11]:
!pip install pyobjc-core pyobjc-framework-Quartz pyobjc-framework-Vision wurlitzer
import csv
import requests
from io import BytesIO
from PIL import Image
import pathlib
import Quartz
import Vision
from Cocoa import NSURL
from Foundation import NSDictionary
from wurlitzer import pipes

def image_to_text(img_path, lang="eng"):
    input_url = NSURL.fileURLWithPath_(img_path)

    with pipes() as (out, err):
        input_image = Quartz.CIImage.imageWithContentsOfURL_(input_url)

    vision_options = NSDictionary.dictionaryWithDictionary_({})
    vision_handler = Vision.VNImageRequestHandler.alloc().initWithCIImage_options_(
        input_image, vision_options
    )
    results = []
    handler = make_request_handler(results)
    vision_request = Vision.VNRecognizeTextRequest.alloc().initWithCompletionHandler_(handler)
    error = vision_handler.performRequests_error_([vision_request], None)

    return results

def make_request_handler(results):
    """ results: list to store results """
    if not isinstance(results, list):
        raise ValueError("results must be a list")

    def handler(request, error):
        if error:
            print(f"Error! {error}")
        else:
            observations = request.results()
            for text_observation in observations:
                recognized_text = text_observation.topCandidates_(1)[0]
                results.append([recognized_text.string(), recognized_text.confidence()])
    return handler

def download_image(image_url):
    try:
        response = requests.get(image_url)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))
        img_path = 'temp_image.jpg'
        img.save(img_path)
        return img_path
    except Exception as e:
        print(f"Error downloading {image_url}: {e}")
        return None

def process_images_and_save_to_csv(csv_file):
    with open(csv_file, mode='r') as file:
        reader = csv.reader(file)
        next(reader)  # Skip header
        data = list(reader)[:100]  # Get first 100 rows

    output_data = []
    for row in data:
        image_url, group_id, entity_name, entity_value = row
        img_path = download_image(image_url)

        if img_path:
            text_results = image_to_text(img_path)
            if text_results:
                extracted_text = text_results[0][0]  # Take first recognized text
                output_data.append([image_url, group_id, entity_name, entity_value, extracted_text])
            else:
                output_data.append([image_url, group_id, entity_name, entity_value, "No text found"])

    # Save results to a new CSV
    output_csv = "output_text.csv"
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['image_link', 'group_id', 'entity_name', 'entity_value', 'extracted_text'])
        writer.writerows(output_data)
    
    print(f"Results saved to {output_csv}")

if __name__ == "__main__":
    # Specify your train dataset CSV file here
    csv_file = '/Users/prathambhonge/AmazonML/student_resource 3/dataset/train.csv'
    process_images_and_save_to_csv(csv_file)


Results saved to output_text.csv


## tokenize the text and label tokens that belong to an entity and create databin

In [4]:
def label_entities(text, entity_name, entity_value):
    """This function will label the tokens in the text as entity or not"""
    words = text.split() 
    entities = []
    
    for word in words:
        if entity_value in word:
            entities.append((word, entity_name))
        else:
            entities.append((word, "O"))  # "O" means outside any entity
    
    return entities

def create_training_data(train_df):
    train_data = []
    
    for _, row in train_df.iterrows():
        text = preprocess_text(row['extracted_text'])  # Assume 'extracted_text' contains the OCR output
        entities = label_entities(text, row['entity_name'], row['entity_value'])
        annotations = {"entities": []}
        
        for word, label in entities:
            start = text.find(word)
            end = start + len(word)
            annotations['entities'].append((start, end, label))
        
        if annotations['entities']:
            train_data.append((text, annotations))
    
    return train_data

train_df = pd.read_csv('dataset/train.csv')
train_df['extracted_text'] = train_df['image_link'].apply(lambda x: extract_text_from_image(x))  # Extract text using Tesseract
train_data = create_training_data(train_df)

def create_docbin(train_data):
    db = DocBin()
    for text, annotations in train_data:
        doc = nlp.make_doc(text)
        ents = []
        for start, end, label in annotations["entities"]:
            span = doc.char_span(start, end, label=label)
            if span is not None:
                ents.append(span)
        doc.ents = ents
        db.add(doc)
    return db

train_db = create_docbin(train_data)
train_db.to_disk("./train.spacy")

NameError: name 'pd' is not defined

# Train the Model

In [None]:
from spacy.training import Example
import random

def train_ner_model(nlp, train_db_path, n_iter=20):
    # Load the training data
    train_db = DocBin().from_disk(train_db_path)
    train_examples = []
    
    for doc in train_db.get_docs(nlp.vocab):
        example = Example.from_dict(doc, {"entities": [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]})
        train_examples.append(example)

    # Start the training process
    optimizer = nlp.begin_training()
    
    for i in range(n_iter):
        random.shuffle(train_examples)
        losses = {}
        batches = spacy.util.minibatch(train_examples, size=spacy.util.compounding(4.0, 32.0, 1.001))
        for batch in batches:
            nlp.update(batch, drop=0.5, losses=losses)
        print(f"Iteration {i}: Losses {losses}")

    # Save the trained model
    nlp.to_disk("ner_model")

# Train the model with 20 iterations
train_ner_model(nlp, "./train.spacy", n_iter=20)


# Test the model

In [None]:
def predict_entities(nlp, text):
    doc = nlp(text)
    entities = {}
    for ent in doc.ents:
        entities[ent.label_] = ent.text
    return entities

nlp = spacy.load("ner_model")

test_text = preprocess_text(extract_text_from_image('test_image_url'))
predicted_entities = predict_entities(nlp, test_text)

print(predicted_entities)


# Run on test set

In [None]:
import pandas as pd

def format_predictions(test_df, predictions):
    output = pd.DataFrame(columns=["index", "prediction"])
    for idx, row in test_df.iterrows():
        prediction = predictions.get(row["entity_name"], "")
        output = output.append({"index": row["index"], "prediction": prediction}, ignore_index=True)
    return output

test_df = pd.read_csv('dataset/test.csv')
test_df['extracted_text'] = test_df['image_link'].apply(lambda x: extract_text_from_image(x))  # Extract text using Tesseract
test_predictions = [predict_entities(nlp, preprocess_text(text)) for text in test_df['extracted_text']]

formatted_output = format_predictions(test_df, test_predictions)
formatted_output.to_csv('test_out.csv', index=False)
