In [None]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
import os
from dotenv import load_dotenv
import re
from datetime import datetime
import uuid

# Load environment variables (python-dotenv)
load_dotenv()

# Source file and destination table
CSV_FILE = 'bilbasen_scrape/car_details.csv'
TABLE_NAME = 'cars'

# Database connection settings; each one falls back to the literal shown
# when the corresponding environment variable is not set.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "bpr_cars")
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "postgres")

print(f"üìä Database: {DB_NAME}@{DB_HOST}:{DB_PORT}")
print(f"üìÅ CSV File: {CSV_FILE}")

In [None]:
# Read the scraped listings into a DataFrame and show a quick overview
df = pd.read_csv(CSV_FILE, low_memory=False)

row_total = len(df)
column_names = list(df.columns)

print(f"‚úÖ Loaded {row_total} rows from {CSV_FILE}")
print(f"\nColumns in CSV ({len(column_names)}):")
print(column_names)
print("\nFirst few rows:")
df.head(3)

In [None]:
# Report row count, per-column null counts, and external_id uniqueness
print("\nüìä Data Quality Report:")
print(f"Total rows: {len(df)}")

print("\nMissing values per column:")
missing = df.isna().sum()
missing_pct = missing.div(len(df)).mul(100).round(2)
missing_report = pd.DataFrame({'Missing': missing, 'Percent': missing_pct})
# Only columns that actually have gaps, worst first
incomplete_columns = missing_report.loc[missing_report['Missing'] > 0]
print(incomplete_columns.sort_values('Missing', ascending=False))

external_ids = df['external_id']
print("\nüîë External ID analysis:")
print(f"Unique external_ids: {external_ids.nunique()}")
print(f"Duplicate external_ids: {external_ids.duplicated().sum()}")

## Helper Functions for Data Cleaning

In [None]:
def clean_price(price_str):
    """Convert a Danish-formatted price such as '38.900 kr.' to a float.

    In text input '.' is treated as a thousands separator and ',' as the
    decimal separator, so '1.234,50 kr.' becomes 1234.5.

    Args:
        price_str: Price as text, or a value that is already numeric.

    Returns:
        The price as a float, or None when the value is missing or cannot
        be parsed as a number.
    """
    if pd.isna(price_str):
        return None
    # Already-numeric values must bypass the string path: str(38900.0) is
    # '38900.0', and stripping the '.' would turn it into 389000.
    if isinstance(price_str, (int, float)) and not isinstance(price_str, bool):
        return float(price_str)
    # Drop the currency suffix, remove thousands dots, then use '.' as decimal
    cleaned = str(price_str).replace('kr.', '').replace('.', '').replace(',', '.').strip()
    try:
        return float(cleaned)
    except ValueError:
        # Non-numeric text (e.g. a "call for price" label) has no numeric price
        return None

def clean_numeric(value, allow_comma=False):
    """Convert a Danish-formatted number to a float.

    In text input '.' is treated as a thousands separator and ',' as the
    decimal separator ('12,5' -> 12.5, '1.500' -> 1500.0).

    Args:
        value: Number as text, or a value that is already numeric.
        allow_comma: Accepted for backward compatibility; it is not used,
            commas are always interpreted as the decimal separator.

    Returns:
        The value as a float, or None when it is missing or not numeric.
    """
    if pd.isna(value):
        return None
    # Values pandas already parsed as numbers must bypass the string path:
    # str(12.5) is '12.5', and stripping the '.' would turn it into 125.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    cleaned = str(value).replace('.', '').replace(',', '.').strip()
    try:
        return float(cleaned)
    except ValueError:
        return None

def extract_horsepower(power_str):
    """Return the integer that precedes 'HK' in a power string.

    Example inputs: '60 HK / 44 kW', '150 HK'. The match is case-sensitive.
    Returns None for missing values or when no '<digits> HK' is present.
    """
    if pd.isna(power_str):
        return None
    found = re.search(r'(\d+)\s*HK', str(power_str))
    return int(found.group(1)) if found else None

def extract_torque(power_str):
    """Return the integer that precedes 'Nm' in a power string.

    Example input: '60 HK / 44 kW / 120 Nm'. The match is case-sensitive.
    Returns None for missing values or when no '<digits> Nm' is present.
    """
    if pd.isna(power_str):
        return None
    found = re.search(r'(\d+)\s*Nm', str(power_str))
    if found is None:
        return None
    return int(found.group(1))

def extract_mileage(mileage_str):
    """Convert a mileage string such as '150.000 km' to an integer.

    In text input both '.' and ',' are treated as thousands separators
    and removed.

    Args:
        mileage_str: Mileage as text, or a value that is already numeric.

    Returns:
        The mileage as an int, or None when the value is missing or cannot
        be parsed.
    """
    if pd.isna(mileage_str):
        return None
    # Already-numeric values must bypass the string path: str(150000.0) is
    # '150000.0', and stripping the '.' would turn it into 1500000.
    if isinstance(mileage_str, (int, float)) and not isinstance(mileage_str, bool):
        try:
            return int(mileage_str)
        except (OverflowError, ValueError):
            # int() rejects infinities
            return None
    # Remove 'km', dots and commas, then surrounding whitespace
    cleaned = str(mileage_str).replace('km', '').replace('.', '').replace(',', '').strip()
    try:
        return int(cleaned)
    except ValueError:
        return None

def parse_date(date_str):
    """Normalise a date value to a 'YYYY-MM-DD HH:MM:SS' string.

    Parsing is delegated to pandas.to_datetime with its default settings.

    Args:
        date_str: Date as text (or any value pandas.to_datetime accepts).

    Returns:
        The formatted timestamp string, or None when the value is missing,
        empty, or cannot be parsed.
    """
    if pd.isna(date_str) or date_str == '':
        return None
    try:
        # NOTE(review): no explicit format or dayfirst is passed, so ambiguous
        # day/month orders are resolved by pandas' defaults - confirm this
        # matches the format the scraper writes.
        return pd.to_datetime(date_str).strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        # Best effort: an unparseable value is stored as NULL. Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate.
        return None

def parse_boolean(value):
    """Interpret common textual and numeric truth values.

    Recognised text (case-insensitive, surrounding whitespace ignored):
    true/yes/ja/1/t/y -> True and false/no/nej/0/f/n -> False.
    Numeric 1 and 0 (including 1.0 and 0.0) map to True and False.

    Args:
        value: Text, bool, or numeric flag.

    Returns:
        True, False, or None when the value is missing or not recognised.
    """
    if pd.isna(value):
        return None
    # A 0/1 flag column that contains gaps is read by pandas as float, so the
    # values arrive as 1.0 / 0.0; their text form never matches '1' / '0'.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        if value == 1:
            return True
        if value == 0:
            return False
        return None
    text = str(value).strip().lower()
    if text in ('true', 'yes', 'ja', '1', 't', 'y'):
        return True
    if text in ('false', 'no', 'nej', '0', 'f', 'n'):
        return False
    return None

def extract_co2(co2_str):
    """Normalise a CO2 value to the form '<digits> g/km'.

    The first run of digits in the input is used (e.g. '120 g/km' -> '120 g/km').
    Returns None for missing values; input without any digit is returned
    unchanged as text.
    """
    if pd.isna(co2_str):
        return None
    text = str(co2_str)
    digits = re.search(r'(\d+)', text)
    if digits is None:
        return text
    return f"{digits.group(1)} g/km"

# Smoke-test the helpers on representative inputs
print("Testing helper functions:")
for label, helper, sample in (
    ("Price", clean_price, '38.900 kr.'),
    ("Price", clean_price, '250.000 kr.'),
    ("Horsepower", extract_horsepower, '150 HK / 110 kW'),
    ("Mileage", extract_mileage, '150.000 km'),
):
    print(f"{label} '{sample}' -> {helper(sample)}")

## Data Transformation

In [None]:
def _to_text(value, missing=None):
    """Render a scalar as text; missing values become `missing`, whole floats lose '.0'."""
    if pd.isna(value):
        return missing
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


# Create a clean DataFrame for upload. Sharing df's index keeps every column
# assignment below row-aligned with the source frame.
clean_df = pd.DataFrame(index=df.index)

# Generate UUIDs and external_id
clean_df['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
# A plain astype(str) would turn a missing id into the string 'nan' (which the
# required-field dropna can never catch) and a float id into '123.0'.
clean_df['external_id'] = df['external_id'].map(_to_text)

# Basic info
clean_df['url'] = df['url']
clean_df['brand'] = df['brand']
clean_df['model'] = df['model']
clean_df['variant'] = df['variant']
clean_df['title'] = df['title']
clean_df['description'] = df['description']

# Price
clean_df['price'] = df['price'].apply(clean_price)
clean_df['new_price'] = df['model_new_price'].apply(clean_price)

# Years and dates
clean_df['model_year'] = pd.to_numeric(df['details_model_year'], errors='coerce').astype('Int64')
clean_df['year'] = clean_df['model_year']  # Use model_year as primary year
clean_df['first_registration'] = df['details_first_registration']
clean_df['production_date'] = df['details_production_year']

# Mileage - prefer mileage_km_numeric, fall back to parsing details_mileage_km
clean_df['mileage'] = df['mileage_km_numeric'].fillna(df['details_mileage_km'].apply(extract_mileage))
clean_df['mileage'] = pd.to_numeric(clean_df['mileage'], errors='coerce').astype('Int64')

# Fuel and transmission
clean_df['fuel_type'] = df['details_fuel_type']
clean_df['transmission'] = df['details_geartype'].fillna(df['attr_gear_type'])
clean_df['gear_count'] = pd.to_numeric(df['details_number_of_gears'], errors='coerce').astype('Int64')

# Engine specs
clean_df['cylinders'] = pd.to_numeric(df['model_cylinders'], errors='coerce').astype('Int64')
clean_df['horsepower'] = df['details_power_hp_nm'].apply(extract_horsepower).fillna(
    pd.to_numeric(df['attr_power_hp'], errors='coerce')
).astype('Int64')
clean_df['torque_nm'] = df['details_power_hp_nm'].apply(extract_torque).astype('Int64')

# Performance
clean_df['acceleration'] = df['details_acceleration_0_100'].apply(clean_numeric).fillna(
    df['attr_acceleration_0_100'].apply(clean_numeric)
)
clean_df['top_speed'] = pd.to_numeric(df['details_top_speed'], errors='coerce').fillna(
    pd.to_numeric(df['attr_top_speed_kmh'], errors='coerce')
).astype('Int64')

# Electric vehicle specs
clean_df['range_km'] = pd.to_numeric(df['details_range_km'], errors='coerce').astype('Int64')
clean_df['battery_capacity'] = df['details_battery_capacity_kwh'].apply(clean_numeric)
clean_df['energy_consumption'] = pd.to_numeric(df['details_energy_consumption'], errors='coerce').astype('Int64')
clean_df['home_charging_ac'] = df['details_home_charging_ac']
clean_df['fast_charging_dc'] = df['details_fast_charging_dc']
clean_df['charging_time_dc'] = df['details_charging_time_dc_10_80_pct']

# Fuel consumption and emissions
clean_df['fuel_consumption'] = df['details_fuel_consumption']
clean_df['co2_emission'] = df['details_co2_udledning'].apply(extract_co2)
clean_df['euro_norm'] = df['details_euro_norm']
clean_df['tank_capacity'] = pd.to_numeric(df['model_tankkapacitet'], errors='coerce').astype('Int64')

# Physical specs
clean_df['body_type'] = df['model_body_type']
clean_df['weight'] = pd.to_numeric(df['model_weight_kg'], errors='coerce').fillna(
    pd.to_numeric(df['attr_weight_kg'], errors='coerce')
).astype('Int64')
clean_df['width'] = pd.to_numeric(df['model_width_cm'], errors='coerce').astype('Int64')
clean_df['length'] = pd.to_numeric(df['model_length_cm'], errors='coerce').astype('Int64')
clean_df['height'] = pd.to_numeric(df['model_height_cm'], errors='coerce').astype('Int64')
clean_df['trunk_size'] = pd.to_numeric(df['model_trunk_size'], errors='coerce').astype('Int64')
clean_df['load_capacity'] = pd.to_numeric(df['model_load_capacity_kg'], errors='coerce').astype('Int64')

# Towing
clean_df['towing_capacity'] = df['details_towing_capacity'].apply(clean_numeric).astype('Int64')
clean_df['max_towing_weight'] = pd.to_numeric(df['model_max_towing_with_brake'], errors='coerce').astype('Int64')

# Drive and safety
clean_df['drive_type'] = df['model_drive_type']
clean_df['abs_brakes'] = df['model_abs_brakes'].apply(parse_boolean)
clean_df['esp'] = df['model_esp'].apply(parse_boolean)
clean_df['airbags'] = pd.to_numeric(df['model_airbags'], errors='coerce').astype('Int64')
clean_df['doors'] = pd.to_numeric(df['model_doors'], errors='coerce').astype('Int64')
clean_df['seats'] = None  # Not in scraper output

# Color and category
clean_df['color'] = df['details_color'].fillna(df['attr_color'])
clean_df['category'] = df['model_category']
clean_df['equipment'] = df['equipment']

# Tax: keep the raw text and a numeric version
clean_df['periodic_tax'] = df['details_periodic_tax']
clean_df['tax'] = df['details_periodic_tax'].apply(clean_price)

# Engine size (not in scraper, set to None)
clean_df['engine_size'] = None

# Location and seller
clean_df['source_url'] = df['url']
# Convert both parts to text first: a zipcode column that pandas parsed as a
# number cannot be concatenated with strings directly (TypeError).
seller_city = df['seller_city'].map(lambda v: _to_text(v, missing=''))
seller_zipcode = df['seller_zipcode'].map(lambda v: _to_text(v, missing=''))
clean_df['location'] = (seller_city + ' ' + seller_zipcode).str.strip()
clean_df['dealer_name'] = df['seller_name']

# Image handling
clean_df['image_path'] = df['image_filename'].apply(
    lambda x: f"bilbasen_scrape/images/{x}" if pd.notna(x) and x != '' else None
)
clean_df['image_downloaded'] = False  # Will be updated by auto_scraper

# Timestamps - computed once so created_at and updated_at are identical
clean_df['listing_date'] = df['listing_date'].apply(parse_date)
run_timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
clean_df['created_at'] = run_timestamp
clean_df['updated_at'] = run_timestamp

print(f"\n‚úÖ Cleaned data ready: {len(clean_df)} rows, {len(clean_df.columns)} columns")
print(f"\nColumns in clean DataFrame:")
print(list(clean_df.columns))
clean_df.head(3)

In [None]:
# Check for required fields
print("\nüîç Required fields validation:")
required = ['external_id', 'brand', 'model', 'price']
for col in required:
    missing = clean_df[col].isna().sum()
    print(f"{col}: {missing} missing ({missing/len(clean_df)*100:.2f}%)")

# Drop rows with missing required fields
before_count = len(clean_df)
clean_df = clean_df.dropna(subset=required)
after_count = len(clean_df)
print(f"\n‚úÇÔ∏è Dropped {before_count - after_count} rows with missing required fields")

# Drop duplicate external_ids: the upload upserts on external_id, and
# PostgreSQL rejects an INSERT ... ON CONFLICT DO UPDATE that would touch the
# same row twice within one statement.
# NOTE(review): keeps the last occurrence, assuming later CSV rows are the
# more recent scrape - confirm.
clean_df = clean_df.drop_duplicates(subset='external_id', keep='last')
print(f"Dropped {after_count - len(clean_df)} rows with duplicate external_id")
after_count = len(clean_df)
print(f"Final dataset: {after_count} rows")

## Upload to Database with UPSERT

In [None]:
# Open the connection and cursor reused by the upload cells, and record how
# many rows the table holds before the upload.
try:
    connection_settings = {
        'dbname': DB_NAME,
        'user': DB_USER,
        'password': DB_PASS,
        'host': DB_HOST,
        'port': DB_PORT,
    }
    conn = psycopg2.connect(**connection_settings)
    cur = conn.cursor()
    print(f"‚úÖ Connected to database: {DB_NAME}")

    # Row count before the upload (used in the upload summary)
    cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
    (before_count,) = cur.fetchone()
    print(f"üìä Current rows in {TABLE_NAME}: {before_count}")

except Exception as exc:
    # Report the failure in the notebook output, then let it propagate
    print(f"‚ùå Database connection failed: {exc}")
    raise

In [None]:
# Prepare data for UPSERT: every missing value must become a Python None so
# that psycopg2 sends SQL NULL.
# Casting to object first is required: on nullable (Int64) and float columns
# `.where(..., None)` alone keeps pd.NA / NaN, and psycopg2 cannot adapt pd.NA
# (and would write NaN instead of NULL for floats).
upload_data = clean_df.astype(object).where(clean_df.notna(), None)

# Convert to list of tuples (one tuple per row, in clean_df column order)
data_tuples = [tuple(row) for row in upload_data.values]

print(f"\nüì¶ Prepared {len(data_tuples)} records for upload")
if data_tuples:
    # Guarded so that an empty dataset does not raise IndexError here
    print(f"Sample record (first 10 fields): {data_tuples[0][:10]}")

In [None]:
# Build UPSERT query
columns = list(clean_df.columns)
columns_str = ', '.join(columns)
# Not used by the query below (execute_values expands the single VALUES %s
# itself); kept so that the name remains defined.
placeholders = ', '.join(['%s'] * len(columns))

# Columns that an update of an existing row must leave untouched:
# - id / external_id: row identity
# - created_at: overwriting it would lose the original insertion time
# - image_downloaded: this notebook always writes False; overwriting would
#   reset a flag set elsewhere (NOTE(review): the transform cell says
#   auto_scraper maintains it - confirm)
preserve_on_update = {'id', 'external_id', 'created_at', 'image_downloaded'}
update_columns = [col for col in columns if col not in preserve_on_update]
update_str = ', '.join([f"{col} = EXCLUDED.{col}" for col in update_columns])

upsert_query = f"""
    INSERT INTO {TABLE_NAME} ({columns_str})
    VALUES %s
    ON CONFLICT (external_id)
    DO UPDATE SET {update_str}
"""

print("\nüìù UPSERT Query prepared (will insert new or update existing based on external_id)")
print(f"Columns: {len(columns)}")
print(f"Update fields: {len(update_columns)}")

In [None]:
# Execute UPSERT in batches; all batches share one transaction, so either
# every batch is committed or none is.
batch_size = 1000
total_batches = (len(data_tuples) + batch_size - 1) // batch_size

print(f"\nüöÄ Starting upload in {total_batches} batches of {batch_size}...")

try:
    try:
        for i in range(0, len(data_tuples), batch_size):
            batch = data_tuples[i:i+batch_size]
            execute_values(cur, upsert_query, batch, page_size=100)
            batch_num = i // batch_size + 1
            print(f"‚úÖ Batch {batch_num}/{total_batches} uploaded ({len(batch)} records)")

        conn.commit()
    except Exception as e:
        # Only a failure before or during commit is an upload failure
        conn.rollback()
        print(f"\n‚ùå Upload failed: {e}")
        raise

    print(f"\n‚úÖ All data committed to database!")

    # Check final count. This runs after the commit and outside the rollback
    # handler, so an error here is not misreported as a failed upload.
    cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
    after_count = cur.fetchone()[0]

    print(f"\nüìä Upload Summary:")
    print(f"Before: {before_count} rows")
    print(f"After: {after_count} rows")
    # ':+d' prints the sign itself, so a negative change is not shown as '+-N'
    print(f"Net change: {after_count - before_count:+d} rows")

finally:
    cur.close()
    conn.close()
    print(f"\nüîí Database connection closed")

## Verify Upload

In [None]:
# Reconnect and verify (the upload cell closed its connection)
conn = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=DB_PORT
)

# Sample queries, run against the same table the upload wrote to
# NOTE(review): the external_id::bigint cast errors if any stored external_id
# is not numeric text.
queries = [
    ("Total cars", f"SELECT COUNT(*) FROM {TABLE_NAME}"),
    ("Cars with external_id", f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE external_id IS NOT NULL"),
    ("Cars with images", f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE image_path IS NOT NULL"),
    ("Highest external_id", f"SELECT MAX(external_id::bigint) FROM {TABLE_NAME}"),
    ("Brands", f"SELECT COUNT(DISTINCT brand) FROM {TABLE_NAME}"),
    ("Fuel types", f"SELECT fuel_type, COUNT(*) FROM {TABLE_NAME} GROUP BY fuel_type ORDER BY COUNT(*) DESC LIMIT 5")
]

print("\nüîç Database Verification:\n")
try:
    for description, query in queries:
        result = pd.read_sql(query, conn)
        print(f"{description}:")
        print(result)
        print()
finally:
    # Close the connection even when one of the queries raises
    conn.close()

print("‚úÖ Verification complete!")