In [17]:
import pandas as pd
import mysql.connector
from mysql.connector import Error
from dotenv import load_dotenv
import os
from datetime import datetime
from glob import glob
import csv

# Load environment variables
load_dotenv()

FILESS_MYSQL_HOST = os.getenv("FILESS_MYSQL_HOST")
FILESS_MYSQL_PORT = int(os.getenv("FILESS_MYSQL_PORT"))
FILESS_MYSQL_USER = os.getenv("FILESS_MYSQL_USER")
FILESS_MYSQL_PASSWORD = os.getenv("FILESS_MYSQL_PASSWORD")
FILESS_MYSQL_DATABASE = os.getenv("FILESS_MYSQL_DATABASE")
FILESS_MYSQL_TABLE = os.getenv("FILESS_MYSQL_DELHI_TABLE")  # table name for all data

csv_folder_path = "/home/neosoft/Desktop/wheather_analytics_trends/data_lake/raw/"
csv_file_paths = glob(os.path.join(csv_folder_path, "*.csv"))

if not csv_file_paths:
    raise FileNotFoundError(f"No CSV files found in folder: {csv_folder_path}")

# ---------- Helper functions ----------
def detect_delimiter(file_path):
    """Detect CSV delimiter dynamically."""
    with open(file_path, 'r', newline='', encoding='utf-8') as f:
        sample = f.read(2048)
        return csv.Sniffer().sniff(sample).delimiter

def clean_value(val):
    if pd.isna(val):
        return None
    val_str = str(val).strip().strip('"').strip("'").replace("(", "").replace(")", "").strip()
    return val_str if val_str != "" else None

def clean_number(val):
    val_clean = clean_value(val)
    if val_clean is None:
        return None
    try:
        return float(val_clean)
    except ValueError:
        print(f"[WARN] Could not convert to float: {val} → cleaned: {val_clean}")
        return None

def parse_date(val):
    val_clean = clean_value(val)
    if not val_clean:
        return None
    try:
        return datetime.strptime(val_clean, '%Y-%m-%d').date()
    except ValueError:
        print(f"[WARN] Could not parse date: {val} → cleaned: {val_clean}")
        return None

def read_csv_with_fallback(file_path):
    """Read CSV, auto-detect delimiter, and handle bad header cases."""
    # First, try normal read with sniffed delimiter
    delimiter = detect_delimiter(file_path)
    df = pd.read_csv(file_path, delimiter=delimiter)

    # If only one column, try splitting manually on comma
    if len(df.columns) == 1:
        print(f"[WARN] File {file_path} has only 1 column after reading. Trying manual split.")
        with open(file_path, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()
            # Try guessing delimiter from the first line directly
            possible_delims = [',', ';', '\t', '|']
            for delim in possible_delims:
                if delim in first_line:
                    print(f"[INFO] Overriding delimiter to '{delim}' for {file_path}")
                    df = pd.read_csv(file_path, delimiter=delim)
                    break

    return df

def parse_time(val):
    val_clean = clean_value(val)
    if not val_clean:
        return None
    # Try ISO time format first
    try:
        return datetime.strptime(val_clean, "%Y-%m-%dT%H:%M:%S").time()
    except ValueError:
        pass
    # Try space-separated datetime
    try:
        return datetime.strptime(val_clean, "%Y-%m-%d %H:%M:%S").time()
    except ValueError:
        print(f"[WARN] Could not parse time: {val} → cleaned: {val_clean}")
        return None
# --------------------------------------

try:
    connection = mysql.connector.connect(
        host=FILESS_MYSQL_HOST,
        database=FILESS_MYSQL_DATABASE,
        user=FILESS_MYSQL_USER,
        password=FILESS_MYSQL_PASSWORD,
        port=FILESS_MYSQL_PORT
    )

    if connection.is_connected():
        print("✅ Connected to MySQL Server successfully!")
        cursor = connection.cursor()

        for csv_file_path in csv_file_paths:
            print(f"\n📂 Processing file: {csv_file_path}")
            # data = read_csv_with_fallback(csv_file_path)
            # print(f"📊 {len(data)} records found. Columns: {list(data.columns)}")

            # Detect delimiter
            delimiter = detect_delimiter(csv_file_path)
            print(f"🔍 Detected delimiter: '{delimiter}'")

            # Read CSV with correct delimiter
            data = read_csv_with_fallback(csv_file_path)
            total_records = len(data)
            print(f"📊 {len(data)} records found. Columns: {list(data.columns)}")
            print(f"📊 {total_records} records found. Columns: {list(data.columns)}")

            for row_index, row in data.iterrows():
                record = (
                    clean_value(row.get('name')),
                    parse_date(row.get('datetime')),
                    clean_number(row.get('tempmax')),
                    clean_number(row.get('tempmin')),
                    clean_number(row.get('temp')),
                    clean_number(row.get('feelslikemax')),
                    clean_number(row.get('feelslikemin')),
                    clean_number(row.get('feelslike')),
                    clean_number(row.get('dew')),
                    clean_number(row.get('humidity')),
                    clean_number(row.get('precip')),
                    clean_number(row.get('precipprob')),
                    clean_number(row.get('precipcover')),
                    clean_value(row.get('preciptype')),
                    clean_number(row.get('snow')),
                    clean_number(row.get('snowdepth')),
                    clean_number(row.get('windgust')),
                    clean_number(row.get('windspeed')),
                    clean_number(row.get('winddir')),
                    clean_number(row.get('sealevelpressure')),
                    clean_number(row.get('cloudcover')),
                    clean_number(row.get('visibility')),
                    clean_number(row.get('solarradiation')),
                    clean_number(row.get('solarenergy')),
                    clean_number(row.get('uvindex')),
                    clean_number(row.get('severerisk')),
                    parse_time(row.get('sunrise')),
                    parse_time(row.get('sunset')),
                    clean_number(row.get('moonphase')),
                    clean_value(row.get('conditions')),
                    clean_value(row.get('description')),
                    clean_value(row.get('icon')),
                    clean_value(row.get('stations'))
                )

                # Debug output
                print(f"🔍 Row {row_index+1} cleaned record: {record}")

                insert_query = f"""
                INSERT INTO {FILESS_MYSQL_TABLE} (
                    name, datetime, tempmax, tempmin, temp, feelslikemax, feelslikemin, feelslike,
                    dew, humidity, precip, precipprob, precipcover, preciptype, snow, snowdepth,
                    windgust, windspeed, winddir, sealevelpressure, cloudcover, visibility,
                    solarradiation, solarenergy, uvindex, severerisk, sunrise, sunset, moonphase,
                    conditions, description, icon, stations
                ) VALUES ({', '.join(['%s'] * len(record))});
                """

                try:
                    cursor.execute(insert_query, record)
                except Error as e:
                    print(f"[ERROR] Failed to insert row {row_index+1}: {e}")

            connection.commit()
            print(f"✅ Inserted {total_records} records from file {csv_file_path}")

except Error as e:
    print("❌ Error while connecting to MySQL or inserting data:", e)

finally:
    if 'connection' in locals() and connection.is_connected():
        cursor.close()
        connection.close()
        print("🔒 MySQL connection is closed.")


✅ Connected to MySQL Server successfully!

📂 Processing file: /home/neosoft/Desktop/wheather_analytics_trends/data_lake/raw/Delhi.csv
🔍 Detected delimiter: ','
[WARN] File /home/neosoft/Desktop/wheather_analytics_trends/data_lake/raw/Delhi.csv has only 1 column after reading. Trying manual split.
[INFO] Overriding delimiter to ',' for /home/neosoft/Desktop/wheather_analytics_trends/data_lake/raw/Delhi.csv
📊 378 records found. Columns: ['name,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,precip,precipprob,precipcover,preciptype,snow,snowdepth,windgust,windspeed,winddir,sealevelpressure,cloudcover,visibility,solarradiation,solarenergy,uvindex,severerisk,sunrise,sunset,moonphase,conditions,description,icon,stations']
📊 378 records found. Columns: ['name,datetime,tempmax,tempmin,temp,feelslikemax,feelslikemin,feelslike,dew,humidity,precip,precipprob,precipcover,preciptype,snow,snowdepth,windgust,windspeed,winddir,sealevelpressure,cloudcover,visibility,solar