## Task-3 

### Data Enrichment with Object Detection 

In [1]:
import sys
import os
# Add parent directory to path so we can import from src
sys.path.append(os.path.abspath('..'))

In [2]:
from src.config import DATA_PATHS

IMAGE_ROOT = DATA_PATHS["raw_images"]
OUTPUT_CSV = "../data/processed/yolo_detections.csv"

print("Image root:", IMAGE_ROOT)
print("Output CSV:", OUTPUT_CSV)


Image root: c:\Users\hp\Desktop\python-projects\Shipping-a-Data-Product\data\raw\images
Output CSV: ../data/processed/yolo_detections.csv


In [3]:
from pathlib import Path

assert Path(IMAGE_ROOT).exists(), "‚ùå Raw image directory does not exist"

Path("../data/processed").mkdir(parents=True, exist_ok=True)

print("‚úÖ Paths validated and output directory ready")


‚úÖ Paths validated and output directory ready


In [4]:
try:
    from ultralytics import YOLO
    print("‚úÖ ultralytics available")
except ImportError:
    print("Installing ultralytics...")
    !pip install ultralytics


‚úÖ ultralytics available


In [9]:
# Diagnostic: Check what schemas and tables exist in the database
import psycopg2
from src.config import DATABASE_CONFIG

def check_database_structure():
    """Check what schemas and tables exist in the database"""
    try:
        conn = psycopg2.connect(**DATABASE_CONFIG)
        cur = conn.cursor()
        
        # Get all schemas
        cur.execute("""
            SELECT schema_name 
            FROM information_schema.schemata 
            WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
            ORDER BY schema_name
        """)
        schemas = cur.fetchall()
        print("üìã Available schemas:")
        for schema in schemas:
            print(f"   - {schema[0]}")
        
        # Check for message-related tables in each schema
        print("\nüîç Looking for message-related tables...")
        cur.execute("""
            SELECT table_schema, table_name 
            FROM information_schema.tables 
            WHERE table_name LIKE '%message%' OR table_name LIKE '%fct%' OR table_name LIKE '%stg%'
            ORDER BY table_schema, table_name
        """)
        tables = cur.fetchall()
        
        if tables:
            print("\nüìä Found related tables:")
            for schema, table in tables:
                # Get row count
                try:
                    cur.execute(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
                    count = cur.fetchone()[0]
                    print(f"   - {schema}.{table} ({count:,} rows)")
                except:
                    print(f"   - {schema}.{table} (could not count rows)")
        else:
            print("   ‚ö†Ô∏è No message-related tables found")
        
        cur.close()
        conn.close()
        return schemas, tables
    except Exception as e:
        print(f"‚ùå Error checking database: {e}")
        import traceback
        traceback.print_exc()
        return None, None

schemas, tables = check_database_structure()

üìã Available schemas:
   - public
   - raw

üîç Looking for message-related tables...

üìä Found related tables:
   - public.fct_messages (30,714 rows)
   - public.stg_telegram_messages (30,714 rows)
   - raw.telegram_messages (34,474 rows)


In [10]:
import psycopg2
from src.config import DATABASE_CONFIG

def get_message_keys_from_db():
    """Fetch message_id -> (channel_key, date_key) mapping from database"""
    try:
        conn = psycopg2.connect(**DATABASE_CONFIG)
        cur = conn.cursor()
        
        # First, discover what tables actually exist
        cur.execute("""
            SELECT table_schema, table_name 
            FROM information_schema.tables 
            WHERE (table_name LIKE '%message%' OR table_name LIKE '%fct%' OR table_name LIKE '%stg%')
            AND table_schema NOT IN ('information_schema', 'pg_catalog')
            ORDER BY table_schema, table_name
        """)
        available_tables = cur.fetchall()
        
        if not available_tables:
            print("‚ö†Ô∏è No message-related tables found in database")
            cur.close()
            conn.close()
            return {}
        
        print(f"üìã Found {len(available_tables)} potential tables:")
        for schema, table in available_tables:
            print(f"   - {schema}.{table}")
        
        results = None
        schema_used = None
        
        # Try to find fct_messages first (if dbt has run)
        for schema_name, table_name in available_tables:
            if table_name in ['fct_messages', 'fct_message']:
                try:
                    # Check if it has the required columns
                    cur.execute(f"""
                        SELECT column_name 
                        FROM information_schema.columns 
                        WHERE table_schema = %s AND table_name = %s
                        AND column_name IN ('message_id', 'channel_key', 'date_key')
                    """, (schema_name, table_name))
                    columns = [row[0] for row in cur.fetchall()]
                    if len(columns) == 3:
                        cur.execute(f"""
                            SELECT message_id, channel_key, date_key
                            FROM "{schema_name}"."{table_name}"
                        """)
                        results = cur.fetchall()
                        schema_used = f"{schema_name}.{table_name}"
                        print(f"‚úÖ Found and loaded from {schema_used}")
                        break
                except Exception as e:
                    continue
        
        # If fct_messages doesn't exist, try staging tables
        if results is None:
            print("‚ö†Ô∏è fct_messages not found, trying staging tables...")
            for schema_name, table_name in available_tables:
                if 'stg' in table_name.lower() or 'staging' in table_name.lower() or 'raw' in table_name.lower():
                    try:
                        # Check if it has required columns
                        cur.execute(f"""
                            SELECT column_name 
                            FROM information_schema.columns 
                            WHERE table_schema = %s AND table_name = %s
                            AND column_name IN ('message_id', 'channel_name', 'message_date')
                        """, (schema_name, table_name))
                        columns = [row[0] for row in cur.fetchall()]
                        if 'message_id' in columns and 'channel_name' in columns and 'message_date' in columns:
                            print(f"   Trying to compute keys from {schema_name}.{table_name}...")
                            cur.execute(f"""
                                WITH channel_keys AS (
                                    SELECT 
                                        ROW_NUMBER() OVER (ORDER BY channel_name) AS channel_key,
                                        channel_name
                                    FROM (
                                        SELECT DISTINCT channel_name 
                                        FROM "{schema_name}"."{table_name}"
                                    ) t
                                ),
                                date_keys AS (
                                    SELECT DISTINCT
                                        TO_CHAR(message_date, 'YYYYMMDD')::INT AS date_key,
                                        DATE(message_date) AS full_date
                                    FROM "{schema_name}"."{table_name}"
                                )
                                SELECT 
                                    s.message_id,
                                    ck.channel_key,
                                    dk.date_key
                                FROM "{schema_name}"."{table_name}" s
                                JOIN channel_keys ck ON s.channel_name = ck.channel_name
                                JOIN date_keys dk ON DATE(s.message_date) = dk.full_date
                            """)
                            results = cur.fetchall()
                            schema_used = f"{schema_name}.{table_name}"
                            print(f"‚úÖ Computed keys from {schema_used}")
                            break
                    except Exception as e:
                        print(f"   ‚ö†Ô∏è Could not use {schema_name}.{table_name}: {str(e)[:100]}")
                        continue
        
        if results is None:
            print("‚ö†Ô∏è Could not find suitable tables for channel_key and date_key")
            print("   Will proceed with None values for these fields")
            cur.close()
            conn.close()
            return {}
        
        cur.close()
        conn.close()
        
        # Create mapping: message_id -> (channel_key, date_key)
        message_map = {}
        for message_id, channel_key, date_key in results:
            message_map[str(message_id)] = (channel_key, date_key)
        
        print(f"‚úÖ Loaded {len(message_map)} message keys from database")
        return message_map
    except Exception as e:
        print(f"‚ö†Ô∏è Could not load from database: {e}")
        print("   Will proceed without channel_key and date_key")
        import traceback
        traceback.print_exc()
        return {}

message_keys_map = get_message_keys_from_db()

üìã Found 3 potential tables:
   - public.fct_messages
   - public.stg_telegram_messages
   - raw.telegram_messages
‚úÖ Found and loaded from public.fct_messages
‚úÖ Loaded 29021 message keys from database


In [11]:
from ultralytics import YOLO
from pathlib import Path
from src.yolo.classifier import classify_image
from src.yolo.utils import extract_channel_and_message_id
import csv

model = YOLO("yolov8n.pt")

def run_yolo_with_keys(image_root: Path, output_csv: str, limit=None, message_keys_map=None):
    """Run YOLO detection pipeline with channel_key and date_key enrichment"""
    if message_keys_map is None:
        message_keys_map = {}
    
    results_rows = []
    images = list(image_root.rglob("*.jpg"))
    
    if limit:
        images = images[:limit]
        print(f"üì∏ Processing first {limit} images for sample...")
    else:
        print(f"üì∏ Processing {len(images)} images...")
    
    for idx, image_path in enumerate(images, 1):
        if idx % 100 == 0:
            print(f"   Progress: {idx}/{len(images)}")
        
        channel_code, message_id = extract_channel_and_message_id(image_path)
        
        # Run YOLO detection
        detections = model(image_path, verbose=False)[0]
        
        detected_objects = [
            (model.names[int(box.cls)], float(box.conf))
            for box in detections.boxes
        ]
        
        # Get channel_key and date_key from database mapping
        channel_key = None
        date_key = None
        if message_id in message_keys_map:
            channel_key, date_key = message_keys_map[message_id]
        
        if not detected_objects:
            # Still write a row even if no detections, but with "none" class
            image_category = "none"
            results_rows.append({
                "message_id": message_id,
                "channel_key": channel_key,
                "date_key": date_key,
                "detected_class": "none",
                "confidence_score": 0.0,
                "image_category": image_category
            })
        else:
            image_category = classify_image(detected_objects)
            
            for label, confidence in detected_objects:
                results_rows.append({
                    "message_id": message_id,
                    "channel_key": channel_key,
                    "date_key": date_key,
                    "detected_class": label,
                    "confidence_score": round(confidence, 4),
                    "image_category": image_category
                })
    
    # Write results
    fieldnames = ["message_id", "channel_key", "date_key", "detected_class", "confidence_score", "image_category"]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results_rows)
    
    print(f"‚úÖ Saved {len(results_rows)} detection records to {output_csv}")
    return results_rows

print("‚úÖ YOLO detection function ready")

‚úÖ YOLO detection function ready


In [12]:
# Process first 5 images for sample
SAMPLE_OUTPUT = "../data/processed/yolo_detections_sample.csv"

sample_results = run_yolo_with_keys(
    image_root=IMAGE_ROOT,
    output_csv=SAMPLE_OUTPUT,
    limit=5,
    message_keys_map=message_keys_map
)

print(f"\n‚úÖ Sample processing complete!")

üì∏ Processing first 5 images for sample...
‚úÖ Saved 7 detection records to ../data/processed/yolo_detections_sample.csv

‚úÖ Sample processing complete!


In [13]:
# Display sample results
import pandas as pd

sample_df = pd.read_csv(SAMPLE_OUTPUT)
print("üìä Sample Results (First 5 images):")
print("\n" + "="*80)
print(sample_df.to_string(index=False))
print("="*80)
print(f"\n‚úÖ Total rows in sample: {len(sample_df)}")
print(f"‚úÖ Columns: {list(sample_df.columns)}")

üìä Sample Results (First 5 images):

 message_id  channel_key  date_key detected_class  confidence_score  image_category
      53469            1  20251116       scissors            0.3423           other
      53490            1  20251123         person            0.7094       lifestyle
      53490            1  20251123         person            0.5319       lifestyle
      53490            1  20251123         person            0.3156       lifestyle
      53491            1  20251123           none            0.0000            none
      53492            1  20251123           book            0.4193           other
      53493            1  20251127         bottle            0.7497 product_display

‚úÖ Total rows in sample: 7
‚úÖ Columns: ['message_id', 'channel_key', 'date_key', 'detected_class', 'confidence_score', 'image_category']


In [14]:
# Now process ALL images
FINAL_OUTPUT = "../data/processed/yolo_detections.csv"

all_results = run_yolo_with_keys(
    image_root=IMAGE_ROOT,
    output_csv=FINAL_OUTPUT,
    limit=None,  # Process all images
    message_keys_map=message_keys_map
)

print(f"\n‚úÖ Full processing complete!")

üì∏ Processing 15494 images...
   Progress: 100/15494
   Progress: 200/15494
   Progress: 300/15494
   Progress: 400/15494
   Progress: 500/15494
   Progress: 600/15494
   Progress: 700/15494
   Progress: 800/15494
   Progress: 900/15494
   Progress: 1000/15494
   Progress: 1100/15494
   Progress: 1200/15494
   Progress: 1300/15494
   Progress: 1400/15494
   Progress: 1500/15494
   Progress: 1600/15494
   Progress: 1700/15494
   Progress: 1800/15494
   Progress: 1900/15494
   Progress: 2000/15494
   Progress: 2100/15494
   Progress: 2200/15494
   Progress: 2300/15494
   Progress: 2400/15494
   Progress: 2500/15494
   Progress: 2600/15494
   Progress: 2700/15494
   Progress: 2800/15494
   Progress: 2900/15494
   Progress: 3000/15494
   Progress: 3100/15494
   Progress: 3200/15494
   Progress: 3300/15494
   Progress: 3400/15494
   Progress: 3500/15494
   Progress: 3600/15494
   Progress: 3700/15494
   Progress: 3800/15494
   Progress: 3900/15494
   Progress: 4000/15494
   Progress: 4100

In [15]:
# Display summary of final results
final_df = pd.read_csv(FINAL_OUTPUT)

print("Final Results Summary:")
print("="*80)
final_df.head()

Final Results Summary:


Unnamed: 0,message_id,channel_key,date_key,detected_class,confidence_score,image_category
0,53469,1.0,20251116.0,scissors,0.3423,other
1,53490,1.0,20251123.0,person,0.7094,lifestyle
2,53490,1.0,20251123.0,person,0.5319,lifestyle
3,53490,1.0,20251123.0,person,0.3156,lifestyle
4,53491,1.0,20251123.0,none,0.0,none


In [9]:
import importlib
import sys

# Force reload the modules
for module in ['src.yolo.detector', 'src.yolo.utils', 'src.yolo.classifier']:
    if module in sys.modules:
        importlib.reload(sys.modules[module])

from src.yolo.detector import run_yolo_pipeline

print("‚úÖ YOLO detection module loaded")

‚úÖ YOLO detection module loaded


In [19]:
expected_cols = {
    "message_id",
    "channel_key",
    "detected_class",
    "confidence_score",
    "image_category"
}

assert expected_cols.issubset(final_df.columns), "‚ùå CSV schema invalid"

print("‚úÖ CSV schema validated")


‚úÖ CSV schema validated
