<h1 style="color:#FF00A6; font-weight:bold; font-family:sans-serif;">
  Snappfood Data Analyst Task
</h1>

<h3 style="color:#F9FAFB; font-family:sans-serif;">
  Task 3 – <em>OCR Verification</em>
</h3>

<p style="color:#EBEDF0; font-size:14px; font-family:sans-serif;">
  Extract restaurant names and contact numbers from storefront images using OCR, and verify consistency with structured records.
</p>
<br>
<hr style="color:#FF00A6;">

# Imports & Setup

In [None]:
# Import the importlib module to check if the required libraries are installed
import importlib.util

# List of required libraries
required_libraries = ['requests', 'pandas', 'tqdm', 'easyocr', 'opencv-python']

# Install the required libraries if they are not already installed
for lib in required_libraries:
    if importlib.util.find_spec(lib) is None:
        %pip install {lib}

import requests
import os
import re
import pandas as pd
import cv2
import easyocr
from tqdm import tqdm
from difflib import SequenceMatcher


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# Prepare Dataset

## Download from Google Sheets

In [6]:
def download_sheet_as_csv(spreadsheet_id, gid, output_path, timeout=30):
    """Download one tab of a Google Sheets spreadsheet as a CSV file.

    Args:
        spreadsheet_id: ID of the spreadsheet (the part of its URL after ``/d/``).
        gid: ID of the tab to export.
        output_path: Destination path; the response body is written as bytes.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        True if the sheet was saved, False if the server did not answer 200.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}/export"
    params = {
        "format": "csv",
        "gid": gid
    }
    # Without a timeout, requests can wait indefinitely on a stalled connection.
    response = requests.get(url, params=params, timeout=timeout)

    if response.status_code != 200:
        print(f"Failed to download sheet. Status code: {response.status_code}")
        return False

    with open(output_path, "wb") as f:
        f.write(response.content)
    print(f"Sheet downloaded and saved to '{output_path}'")
    return True

# Replace with your actual spreadsheet ID and sheet GID
SPREADSHEET_ID = "1ic4RLD_r4ASfl7nRk2ctagH_98j2sPDYN7IB4n6n9e8"
GID = "912424133"  # GID of Task 3 sheet
OUTPUT_FILE = "./task3_dataset.csv"

download_sheet_as_csv(SPREADSHEET_ID, GID, OUTPUT_FILE)

# Re-save the dataset as UTF-8 with a BOM ("utf-8-sig"); presumably this is
# what makes the Persian names display correctly in spreadsheet tools.
# Every column is read as text (dtype=str) so the round-trip cannot reinterpret
# values - in particular, phone numbers must not be parsed as integers, which
# would drop their leading zeros when the file is written back.
df = pd.read_csv(OUTPUT_FILE, encoding="utf-8-sig", dtype=str)
df.to_csv(OUTPUT_FILE, index=False, encoding="utf-8-sig")


Sheet downloaded and saved to './task3_dataset.csv'


## Image Download

In [7]:
# Config
DATA_PATH = "task3_dataset.csv"
IMAGE_DIR = "images"
os.makedirs(IMAGE_DIR, exist_ok=True)

# Load Dataset
# "Number" is read as text: if pandas inferred a numeric dtype, a phone number
# such as 0912... would lose its leading zero and could never match OCR output.
df = pd.read_csv(DATA_PATH, encoding="utf-8-sig", dtype={"Number": str})

# Utility: Extract File ID from Google Drive Link
def extract_drive_file_id(link):
    """Return the Google Drive file ID embedded in a share link.

    Two link shapes are recognised, tried in this order:
      * ``.../d/<id>/...``        (e.g. ``/file/d/<id>/view``)
      * ``...?id=<id>`` / ``...&id=<id>``  (e.g. ``/open?id=<id>``)

    Args:
        link: The link text. Non-string values (None, NaN from an empty
            cell) are treated as "no link".

    Returns:
        The file ID string, or None when no ID can be found.
    """
    # Empty spreadsheet cells arrive as NaN (a float); re.search would raise
    # TypeError on them, so bail out early.
    if not isinstance(link, str):
        return None

    for pattern in (r"/d/([a-zA-Z0-9_-]+)", r"[?&]id=([a-zA-Z0-9_-]+)"):
        match = re.search(pattern, link)
        if match:
            return match.group(1)
    return None

# Download Function
def download_drive_image(file_id, output_path, timeout=30):
    """Download a file from Google Drive by file ID and write it to disk.

    Args:
        file_id: Google Drive file ID.
        output_path: Destination path; the body is streamed to it in 1 KiB chunks.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        True if the file was written; False if the request failed, the server
        did not answer 200, or the response was an HTML page instead of a file.
    """
    URL = "https://drive.google.com/uc?export=download"

    try:
        # Context managers release the connection even when we return early;
        # with stream=True the response is otherwise left open.
        with requests.Session() as session:
            with session.get(URL, params={"id": file_id}, stream=True,
                             timeout=timeout) as response:
                if response.status_code != 200:
                    return False

                # Drive can answer 200 with an HTML page (e.g. a sign-in or
                # confirmation page) instead of the file bytes; saving that
                # as an image would only fail later when it is decoded.
                content_type = response.headers.get("Content-Type", "")
                if content_type.lower().startswith("text/html"):
                    return False

                with open(output_path, "wb") as f:
                    for chunk in response.iter_content(1024):
                        if chunk:
                            f.write(chunk)
        return True
    except requests.RequestException:
        # A network error on one file is reported as a failed download so
        # the caller can carry on with the remaining files.
        return False

# Process and Download All Images
# image_paths collects one entry per DataFrame row, in row order: the saved
# file path on success, or None when the link has no file ID or the download
# reports failure.
image_paths = []

print("Downloading images...")

for idx, row in tqdm(df.iterrows(), total=len(df)):
    link = row['Photo']
    file_id = extract_drive_file_id(link)

    if file_id:
        # Files are named after the DataFrame index and always given a .jpg
        # extension, regardless of the actual format of the downloaded file.
        filename = f"{idx}.jpg"
        save_path = os.path.join(IMAGE_DIR, filename)

        success = download_drive_image(file_id, save_path)
        image_paths.append(save_path if success else None)
    else:
        image_paths.append(None)

# Add Image Path Column to DataFrame
df["Image_Path"] = image_paths
# NOTE: this message is printed unconditionally, even if some downloads failed.
print("All images processed and stored in 'images/'")

# Optional: Save updated dataset with image paths
# None entries in Image_Path are written to the CSV as empty cells.
df.to_csv("task3_dataset_with_image_paths.csv", index=False, encoding="utf-8-sig")


Downloading images...


100%|██████████| 10/10 [01:18<00:00,  7.87s/it]

All images processed and stored in 'images/'





## Text Normalization

In [8]:
# Text normalization
def normalize_text(text):
    """Canonicalise a name so that spelling variants compare equal.

    Missing values (None/NaN) become an empty string. Otherwise the value is
    converted to str, Arabic letter variants are mapped to the forms listed in
    the table below, diacritics in U+064B-U+0652 are removed, every character
    that is neither a word character, whitespace, nor in the U+0600-U+06FF
    block is replaced by a space, and whitespace runs are collapsed.
    """
    if pd.isna(text):
        return ""

    letter_variants = {
        "ي": "ی", "ك": "ک", "ئ": "ی", "أ": "ا", "إ": "ا", "ۀ": "ه", "ؤ": "و",
    }
    cleaned = str(text)
    for variant, canonical in letter_variants.items():
        cleaned = cleaned.replace(variant, canonical)

    # Drop short-vowel marks and similar diacritics.
    cleaned = re.sub(r"[\u064B-\u0652]", "", cleaned)
    # Punctuation and symbols turn into spaces so adjacent words stay apart.
    cleaned = re.sub(r"[^\w\s\u0600-\u06FF]", " ", cleaned)
    # Splitting on whitespace and re-joining collapses runs and trims the ends.
    return " ".join(cleaned.split())


## Number Normalization

In [9]:
# Number normalization
def normalize_numbers(text):
    """Extract candidate phone numbers from a free-text value.

    Persian (U+06F0-U+06F9) and Arabic-Indic (U+0660-U+0669) digits are
    converted to ASCII digits, then every run of six or more digits is
    collected.

    Args:
        text: Cell value; any type is accepted and converted with str().
            Missing values (None/NaN) yield an empty list.

    Returns:
        A list of unique digit strings, in order of first appearance.
    """
    if pd.isna(text):
        return []

    # Both digit families must be mapped: the regex \d matches any Unicode
    # digit, so untranslated Arabic-Indic digits would be captured as-is and
    # never compare equal to their ASCII form.
    digit_table = {}
    for value in range(10):
        digit_table[0x06F0 + value] = str(value)  # Persian digits
        digit_table[0x0660 + value] = str(value)  # Arabic-Indic digits

    ascii_text = str(text).translate(digit_table)
    digits = re.findall(r"\d{6,}", ascii_text)
    # dict.fromkeys de-duplicates while keeping a stable, reproducible order
    # (a set's iteration order for strings varies between interpreter runs).
    return list(dict.fromkeys(digits))


## Apply Cleaning

In [10]:
# Normalize name column
df["Normalized_Name"] = df["Name"].apply(normalize_text)

# Normalize and split numbers
df["Normalized_Numbers"] = df["Number"].apply(normalize_numbers)

# Update dataset csv file
df.to_csv("task3_dataset_with_image_paths.csv", index=False, encoding="utf-8-sig")

# Preview
# Jupyter only renders the value of a cell's *last* expression, so the preview
# has to come after the save for the table to be displayed.
df[["Name", "Normalized_Name", "Number", "Normalized_Numbers"]]


# OCR

In [None]:
# Load data
# "Number" is read as text so phone numbers keep their leading zeros; a
# numeric dtype would turn 0912... into 912... and break the comparison with
# the numbers read from the images.
df = pd.read_csv("task3_dataset_with_image_paths.csv", encoding="utf-8-sig",
                 dtype={"Number": str})

# OCR preprocessing (CLAHE + Threshold)
def preprocess_image(path, max_width=1024):
    """Load an image and turn it into a binarised, contrast-enhanced array.

    Steps: read the file, shrink it to ``max_width`` (keeping the aspect
    ratio) if it is wider, convert to grayscale, apply CLAHE with a clip
    limit of 2.0, then apply Gaussian adaptive thresholding (block size 15,
    constant 10).

    Returns:
        The thresholded image, or None if the file could not be read.
    """
    image = cv2.imread(path)
    if image is None:
        return None

    height, width = image.shape[:2]
    if width > max_width:
        scaled_height = int(height * max_width / width)
        image = cv2.resize(image, (max_width, scaled_height))

    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    equalizer = cv2.createCLAHE(clipLimit=2.0)
    equalized = equalizer.apply(grayscale)

    return cv2.adaptiveThreshold(
        equalized,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        15,
        10,
    )

# Run OCR
# A single EasyOCR reader for Persian ('fa') and English ('en'), created once
# at module level and reused by extract_easyocr_lines for every image.
# gpu=True requests GPU acceleration; the recorded run of this cell logged
# that neither CUDA nor MPS was available and that it defaulted to CPU.
reader = easyocr.Reader(['fa', 'en'], gpu=True)

def extract_easyocr_lines(image_path):
    """OCR one image and return its recognised text lines.

    The image is preprocessed first; if it cannot be loaded an empty list is
    returned. Each recognised string is stripped of surrounding whitespace and
    strings that end up empty are dropped.
    """
    processed = preprocess_image(image_path)
    if processed is None:
        return []

    text_lines = []
    # detail=0 makes readtext return the recognised strings only.
    for raw_line in reader.readtext(processed, detail=0):
        stripped = raw_line.strip()
        if stripped:
            text_lines.append(stripped)
    return text_lines

# Post-OCR extraction

def extract_best_name(lines):
    """Pick the OCR line most likely to be the restaurant name.

    A line qualifies when, after normalize_text, it is at least 5 characters
    long, contains no digit, and contains at least one character in the
    U+0600-U+06FF block. Among qualifying lines the one with the most
    whitespace-separated words wins; ties go to the earliest line.

    Returns:
        The chosen normalised line, or "" when no line qualifies.
    """
    candidates = []
    for raw_line in lines:
        text = normalize_text(raw_line)
        # The length check also rejects empty strings.
        if len(text) < 5:
            continue
        contains_digit = re.search(r"\d", text) is not None
        contains_arabic_script = re.search(r"[\u0600-\u06FF]", text) is not None
        if contains_arabic_script and not contains_digit:
            candidates.append(text)

    if not candidates:
        return ""
    # max() returns the first item with the highest key, which is the same
    # element a stable descending sort would place first.
    return max(candidates, key=lambda candidate: len(candidate.split()))

def extract_clean_numbers(lines):
    """Extract plausible phone numbers from OCR text lines.

    Persian (U+06F0-U+06F9) and Arabic-Indic (U+0660-U+0669) digits are
    converted to ASCII digits. A run of digits is accepted when it has at
    least 10 digits and starts with "09" or "021".

    Args:
        lines: Iterable of OCR text strings.

    Returns:
        A list of unique accepted numbers, in order of first appearance.
    """
    # Both digit families must be mapped: the regex \d matches any Unicode
    # digit, so untranslated Arabic-Indic digits would be captured but could
    # never pass the ASCII prefix check below.
    digit_table = {}
    for value in range(10):
        digit_table[0x06F0 + value] = str(value)  # Persian digits
        digit_table[0x0660 + value] = str(value)  # Arabic-Indic digits

    all_text = "\n".join(lines).translate(digit_table)
    raw_numbers = re.findall(r"\d{6,}", all_text)

    # Accept numbers starting with known patterns
    clean = [
        num for num in raw_numbers
        if len(num) >= 10 and num.startswith(("09", "021"))
    ]
    # dict.fromkeys de-duplicates while keeping a stable, reproducible order
    # (a set's iteration order for strings varies between interpreter runs).
    return list(dict.fromkeys(clean))

# Matching & Scoring
# One entry per DataFrame row is appended to each list, in row order.
ocr_names = []
ocr_numbers = []
name_scores = []
name_matches = []
number_matches = []

for _, row in tqdm(df.iterrows(), total=len(df)):
    img_path = row["Image_Path"]
    # A failed download is stored as an empty cell, which comes back from the
    # CSV as NaN (a float). os.path.exists raises TypeError for a float, so the
    # type is checked first and such rows get placeholder results.
    if not isinstance(img_path, str) or not os.path.exists(img_path):
        ocr_names.append("")
        ocr_numbers.append([])
        name_scores.append(0)
        name_matches.append(False)
        number_matches.append(0)
        continue

    lines = extract_easyocr_lines(img_path)
    name = extract_best_name(lines)
    numbers = extract_clean_numbers(lines)

    # Fuzzy match
    expected_name = normalize_text(row["Name"])
    if name and expected_name:
        score = int(SequenceMatcher(None, name, expected_name).ratio() * 100)
    else:
        # SequenceMatcher reports a ratio of 1.0 for two empty strings, which
        # would count "no OCR name vs. no expected name" as a perfect match.
        score = 0
    match = score >= 85

    # Phone number match
    expected_numbers = normalize_numbers(row["Number"])
    matched_numbers = [n for n in numbers if n in expected_numbers]

    # Store
    ocr_names.append(name)
    ocr_numbers.append(numbers)
    name_scores.append(score)
    name_matches.append(match)
    number_matches.append(len(matched_numbers))

# Finalize dataframe
df["OCR_Name"] = ocr_names
df["OCR_Numbers"] = ocr_numbers
df["Name_Score"] = name_scores
df["Name_Match"] = name_matches
df["Number_Match_Count"] = number_matches

# Save it
df.to_csv("task3_output_easyocr.csv", index=False, encoding="utf-8-sig")
print("OCR pipeline complete using EasyOCR.")


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
100%|██████████| 10/10 [01:36<00:00,  9.67s/it]

OCR pipeline complete using EasyOCR.



