# In [None]:
import csv
import sys
csv.field_size_limit(sys.maxsize)
import json
import psycopg2
import pandas as pd
from datetime import datetime
import os
import glob
import re

##########################################
# PostgreSQL database connection
##########################################
# Connection settings are read from the standard libpq environment variables
# (PGDATABASE, PGUSER, PGPASSWORD, PGHOST) so credentials do not have to live
# in the source file. The previously hard-coded values are kept as defaults,
# so the script behaves as before when none of the variables is set.
conn = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "stocks_db"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "postgres"),
    host=os.environ.get("PGHOST", "db"),
)
cur = conn.cursor()

# Map lower-cased column name -> column name as stored, for every column that
# information_schema reports for a table named `stocks`.
# NOTE(review): the query does not filter on table_schema, so a `stocks` table
# in another schema would contribute its columns too — confirm this is fine.
cur.execute("SELECT column_name FROM information_schema.columns WHERE table_name = 'stocks';")
db_columns_map = {row[0].lower(): row[0] for row in cur.fetchall()}

##########################################
# Expected columns and helper data
##########################################

# CSV column names the loader is willing to import, spelled with the casing
# used in this list. Anything not listed here is ignored by the insert loop.
expected_columns = {
    # company profile
    "ticker",
    "longName",
    "shortName",
    "sector",
    "industry",
    "country",
    "fullTimeEmployees",
    "city",
    "state",
    "zip",
    "website",
    "phone",
    "longBusinessSummary",
    "exchange",
    "quoteType",
    # fundamentals and ratios
    "marketCap",
    "enterpriseValue",
    "forwardEps",
    "trailingPE",
    "dividendRate",
    "dividendYield",
    "beta",
    "priceToBook",
    "pegRatio",
    "fiftyDayAverage",
    "twoHundredDayAverage",
    "fiftyTwoWeekHigh",
    "fiftyTwoWeekLow",
    "52WeekChange",
    "SandP52WeekChange",
    "sharesOutstanding",
    "floatShares",
    "bookValue",
    "exDividendDate",
    "earningsTimestamp",
    "earningsQuarterlyGrowth",
    "revenueQuarterlyGrowth",
    "lastFiscalYearEnd",
    "nextFiscalYearEnd",
    "mostRecentQuarter",
    "shortRatio",
    "sharesShort",
    "sharesPercentSharesOut",
    # quote fields
    "priceHint",
    "regularMarketOpen",
    "regularMarketDayHigh",
    "regularMarketDayLow",
    "regularMarketVolume",
    "open",
    "high",
    "low",
    "close",
    "volume",
    # loader bookkeeping
    "indice",
    "historical_start",
    "historical_end",
}

# Case-insensitive lookup: lower-cased name -> name as spelled above.
expected_columns_map = {name.lower(): name for name in expected_columns}

# Lower-cased names of the columns whose values are run through convert_date.
date_columns = {
    "exdividenddate",
    "lastfiscalyearend",
    "nextfiscalyearend",
    "mostrecentquarter",
    "historical_start",
    "historical_end",
}

def convert_date(value):
    """Normalize a date-like value to a ``YYYY-MM-DD`` string.

    Accepted inputs, tried in this order:

    1. ``YYYY-MM-DD`` text.
    2. Compact ``YYYYMMDD`` text (exactly eight digits forming a valid date).
    3. A Unix epoch number, as int, float or numeric text. Magnitudes of
       1e10 and above are taken to be milliseconds. The timestamp is
       interpreted in UTC so the result does not depend on the host timezone.
    4. Any other string understood by ``datetime.fromisoformat`` (a trailing
       ``Z`` is accepted as UTC).

    Args:
        value: The raw value; non-strings are converted with ``str()``.

    Returns:
        The date as ``YYYY-MM-DD``, or ``None`` when the value is missing
        (``None``, blank, ``N/A``, ``null``, ``nan`` ...) or cannot be parsed.
        Unparseable values are also reported on stdout.
    """
    from datetime import timezone

    if value is None:
        return None
    text = (value if isinstance(value, str) else str(value)).strip()
    if text.lower() in {"", "n/a", "null", "none", "nan", "nat"}:
        return None

    try:
        return datetime.strptime(text, "%Y-%m-%d").date().isoformat()
    except ValueError:
        pass

    # An 8-digit epoch would fall in 1970-1973, so an all-digit string of that
    # length is far more plausibly a compact calendar date.
    if len(text) == 8 and text.isascii() and text.isdigit():
        try:
            return datetime.strptime(text, "%Y%m%d").date().isoformat()
        except ValueError:
            pass

    # Numeric values must be handled BEFORE fromisoformat: since Python 3.11
    # fromisoformat accepts compact ISO forms and would read an epoch-ms value
    # such as 1704067200000 as the year-1704 date "17040406T0000".
    try:
        ts = float(text)
    except ValueError:
        ts = None
    if ts is not None:
        if abs(ts) >= 1e10:
            ts /= 1000  # milliseconds -> seconds
        try:
            return datetime.fromtimestamp(ts, tz=timezone.utc).date().isoformat()
        except (ValueError, OverflowError, OSError) as e:
            print(f"Erreur lors de la conversion du timestamp {value}: {e}")
            return None

    # Older fromisoformat implementations reject the "Z" UTC designator.
    iso_text = text[:-1] + "+00:00" if text.endswith(("Z", "z")) else text
    try:
        return datetime.fromisoformat(iso_text).date().isoformat()
    except ValueError as e:
        print(f"Erreur lors de la conversion du timestamp {value}: {e}")
        return None

# Lower-cased names of columns that hold free text or identifiers. Their values
# must never be coerced to numbers: a zip code such as "02139" would lose its
# leading zero, and a ticker such as "NAN" or "INF" would parse as a float.
# NOTE(review): the list is derived from the column names, not from the table
# definition — confirm against the `stocks` schema.
_TEXT_COLUMNS = frozenset({
    "ticker", "longname", "shortname", "sector", "industry", "country",
    "city", "state", "zip", "website", "phone", "longbusinesssummary",
    "exchange", "quotetype", "indice",
})


def convert_value(key, value):
    """Convert one raw CSV cell to the Python value to store.

    Args:
        key: Column name; compared case-insensitively against the date and
            text column sets.
        value: Raw cell value, usually a string.

    Returns:
        ``None`` for missing/blank input and for non-finite numbers
        (NaN, +/-inf); the result of ``convert_date`` for date columns; the
        value unchanged for text columns and for anything that is not
        numeric; otherwise an ``int`` when the number is integral, else a
        ``float``.
    """
    import math

    if value is None:
        return None
    if isinstance(value, str) and value.strip() == "":
        return None

    column = key.lower() if isinstance(key, str) else key
    if column in date_columns:
        return convert_date(value)
    if column in _TEXT_COLUMNS:
        return value

    candidate = value.strip() if isinstance(value, str) else value
    try:
        num = float(candidate)
    except (TypeError, ValueError, OverflowError):
        return value

    # NaN / infinity carry no information for a numeric column; store NULL.
    if not math.isfinite(num):
        return None
    if num.is_integer():
        if isinstance(candidate, str):
            # Parse plain digit strings directly so integers beyond 2**53
            # are not rounded by the detour through float.
            try:
                return int(candidate)
            except ValueError:
                pass  # e.g. "12.0" or "1e3": fall back to the float value
        return int(num)
    return num

##########################################
# Fonctions pour le traitement des fichiers
##########################################

def extract_date_from_filename(filepath, base_pattern):
    """Extract the ``YYYYMMDD`` stamp from ``<base_pattern>_<YYYYMMDD>.csv``.

    Args:
        filepath: Path to the file; only its base name is inspected.
        base_pattern: Literal file-name prefix that precedes ``_<date>.csv``.

    Returns:
        The stamp as a ``datetime`` (midnight), or ``None`` when the name
        does not match or the eight digits do not form a real calendar date.
    """
    filename = os.path.basename(filepath)
    match = re.search(re.escape(base_pattern) + r"_(\d{8})\.csv$", filename)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y%m%d")
    except ValueError:
        # Eight digits that are not a date (e.g. 20241399): treat the file as
        # non-matching instead of aborting the whole run.
        return None

def get_new_files(directory, base_pattern, last_date_file):
    """List every ``<base_pattern>_*.csv`` file in *directory*, oldest first.

    No filtering on a "last processed" date is done: every file whose name
    carries a parseable date stamp is returned. ``last_date_file`` is accepted
    but not read.

    Returns:
        A 2-tuple ``(files, None)`` where ``files`` is a list of
        ``(date, file_path)`` tuples sorted by date. The second element is
        always ``None``.
    """
    dated_paths = []
    for candidate in glob.glob(os.path.join(directory, base_pattern + "_*.csv")):
        stamp = extract_date_from_filename(candidate, base_pattern)
        if not stamp:
            continue  # name did not carry a usable date stamp
        dated_paths.append((stamp, candidate))
    return sorted(dated_paths, key=lambda pair: pair[0]), None

##########################################
# Collect the files to process
##########################################

# --- Stock files ---
# Each entry is a (date, path, type) tuple. "sp500" is the only source type
# configured in the list below.
stocks_files = []
for base, file_type in [("sp500_stocks_info", "sp500")]:
    last_date_file = os.path.join("stocks", f"last_processed_{file_type}.txt")
    new_files, _ = get_new_files("stocks", base, last_date_file)
    stocks_files.extend(
        (found_date, found_path, file_type) for found_date, found_path in new_files
    )
# Process everything in chronological order.
stocks_files.sort(key=lambda entry: entry[0])


##########################################
# Processing and insertion into the database
##########################################

# Statement for one price row, built once instead of per record.
# `ON CONFLICT DO NOTHING` (no conflict target) skips rows that would violate
# any unique constraint on stock_prices, so re-running the loader over an
# already imported file does not fail on the first duplicate.
insert_prices_query = """
INSERT INTO stock_prices (ticker, date, open, high, low, close, volume)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
"""

try:
    # --- Stocks insertion ---
    # Each CSV row is one unit of work: its `stocks` row and its price
    # history are committed together, or rolled back together on a DB error.
    for file_date, csv_filename, file_type in stocks_files:
        with open(csv_filename, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                # Keep only columns that are both expected and present in the
                # table, keyed by the column name as stored in the database.
                stock_data = {}
                for key, value in row.items():
                    if key is None:
                        # csv.DictReader files surplus fields of an over-long
                        # row under the key None; there is no column for them.
                        continue
                    lower_key = key.lower()
                    if lower_key in expected_columns_map and lower_key in db_columns_map:
                        db_col = db_columns_map[lower_key]
                        stock_data[db_col] = convert_value(lower_key, value)
                if file_type == "tsx" and "indice" in db_columns_map:
                    stock_data[db_columns_map["indice"]] = "TSX"
                historical_json = row.get("historical", None)

                if stock_data:
                    cols = list(stock_data)
                    cols_quoted = ','.join(f'"{col}"' for col in cols)
                    values = [stock_data[col] for col in cols]
                    placeholders = ','.join(['%s'] * len(values))
                    insert_query = (
                        f"INSERT INTO stocks ({cols_quoted}) "
                        f"VALUES ({placeholders}) "
                        "ON CONFLICT (ticker) DO NOTHING;"
                    )
                    try:
                        cur.execute(insert_query, values)
                    except Exception as e:
                        print(f"Erreur lors de l'insertion dans stocks pour {row.get('ticker')}: {e}")
                        # A failed statement leaves the transaction aborted, so
                        # every later statement would fail until a rollback.
                        # Reset it and skip this row's price history.
                        conn.rollback()
                        continue

                if historical_json and historical_json != "N/A":
                    ticker_val = stock_data.get("ticker", row.get("ticker"))
                    try:
                        historical_data = json.loads(historical_json)
                        for record in historical_data:
                            date_val = convert_date(record.get("Date"))
                            if not date_val:
                                continue  # record without a usable date
                            cur.execute(insert_prices_query, (
                                ticker_val,
                                date_val,
                                record.get("Open"),
                                record.get("High"),
                                record.get("Low"),
                                record.get("Close"),
                                record.get("Volume"),
                            ))
                    except psycopg2.Error as e:
                        print(f"Erreur lors du traitement des données historiques pour {row.get('ticker')}: {e}")
                        # Database failure: discard this row's work explicitly
                        # (the `stocks` insert included) so the next row
                        # starts from a clean transaction.
                        conn.rollback()
                    except Exception as e:
                        # Malformed JSON or unexpected record shape: report it
                        # and keep whatever was inserted for this row so far.
                        print(f"Erreur lors du traitement des données historiques pour {row.get('ticker')}: {e}")
                conn.commit()
finally:
    # Release the cursor and connection even if a file or row raised.
    cur.close()
    conn.close()
print("✅ Chargement terminé avec succès.")