In [None]:
import os
import json
import re
from datetime import datetime
import psycopg2
from psycopg2 import OperationalError
from dotenv import load_dotenv


Chargement des variables d'environnement

In [None]:
# Load environment variables via python-dotenv so the os.getenv() lookups
# further down (paths, PostgreSQL credentials) can read them.
load_dotenv()


Configuration

In [None]:
# Locations are taken from the environment; each is None when its variable
# is unset. DATA_RAW_TRUSTPILOT is presumably where the raw Trustpilot data
# lives (confirm against the code that consumes it).
DATA_RAW_TRUSTPILOT = os.environ.get("DATA_RAW_TRUSTPILOT")
LOG_DIR = os.environ.get("LOG_DIR")

# Company names selected for processing.
SOCIETES_A_TRAITER = ["temu", "tesla", "chronopost", "vinted"]

class Logger:
    """Echo messages to stdout and buffer them for a later write to a file.

    Attributes:
        filepath: Destination path written by save().
        log_lines: Buffered message texts, one entry per print() call.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        self.log_lines = []

    def print(self, msg):
        """Print ``msg`` to stdout and keep its text for save()."""
        print(msg)
        # Keep the text form: a non-string entry (e.g. an exception object)
        # would make the join in save() raise and the whole log would be lost.
        self.log_lines.append(str(msg))

    def save(self):
        """Write every buffered message to ``filepath``, newline-separated.

        The file is opened in "w" mode, so existing content is overwritten.
        Saving is best-effort: a failure is reported on stdout, not raised.
        """
        try:
            with open(self.filepath, "w", encoding="utf-8") as f:
                f.write("\n".join(self.log_lines))
        except Exception as e:
            # Deliberately broad: a failed log write must not abort the run.
            print(f"Erreur sauvegarde log : {e}")

def ensure_log_dir():
    """Create the LOG_DIR directory, including missing parents, if needed.

    Uses ``exist_ok=True`` instead of a separate existence check, so there is
    no window in which another process can create the directory between the
    check and the creation.

    Raises:
        FileExistsError: If LOG_DIR exists but is not a directory.
        TypeError: If LOG_DIR is None (environment variable not set).
    """
    os.makedirs(LOG_DIR, exist_ok=True)

def get_log_file():
    """Return a path under LOG_DIR for a log file named after the current time.

    The file name has the form ``import_YYYYMMDD_HHMMSS.log``, built from
    ``datetime.now()``.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = "import_" + stamp + ".log"
    return os.path.join(LOG_DIR, filename)

def connect_db():
    """Open a PostgreSQL connection using settings from the environment.

    Reads POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST and
    POSTGRES_PORT via ``os.getenv``.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        RuntimeError: If psycopg2 raises ``OperationalError`` while
            connecting; the original error is attached as ``__cause__``.
    """
    try:
        return psycopg2.connect(
            dbname=os.getenv("POSTGRES_DB"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
        )
    except OperationalError as e:
        # Chain explicitly so the traceback shows the driver error as the
        # direct cause rather than as an incidental context.
        raise RuntimeError(f"Erreur connexion PostgreSQL : {e}") from e

def truncate_tables(cur, logger):
    """Empty the ``avis_trustpilot`` and ``societe`` tables with one statement.

    Args:
        cur: Object exposing ``execute(sql)``, e.g. a database cursor.
        logger: Object exposing ``print(msg)`` for progress and error output.

    Raises:
        Exception: Any error raised while executing or logging is logged
            through ``logger`` and then re-raised unchanged.
    """
    statement = "TRUNCATE TABLE avis_trustpilot, societe CASCADE;"
    try:
        cur.execute(statement)
        logger.print("🗑️ Tables vidées (avis_trustpilot et societe)")
    except Exception as exc:
        logger.print(f"Erreur TRUNCATE tables : {exc}")
        raise

def safe_int(val):
    """Convert ``val`` to an int, returning 0 when no integer can be derived.

    Args:
        val: ``None``, a number, or any object whose text form may contain a
            number mixed with other characters (e.g. ``"1 234"`` or ``"12%"``).

    Returns:
        The truncated integer value, or 0 for ``None``, NaN, infinities,
        empty input and text that does not reduce to a single valid number.
    """
    if val is None:
        return 0
    try:
        if isinstance(val, (int, float)):
            # int() raises ValueError for NaN and OverflowError for +/-inf;
            # both are handled below so the function never propagates them.
            return int(val)
        # Keep only digits, dots and minus signs. This drops spaces used as
        # thousands separators, percent signs, labels and similar noise.
        cleaned = ''.join(c for c in str(val) if c.isdigit() or c in '.-')
        if not cleaned:
            return 0
        return int(float(cleaned))
    except (ValueError, TypeError, OverflowError):
        return 0

def insert_societe(cur, societe_data, logger):
    repartition = societe_data.get("repartition_avis", {})
    
    notes = {
        '1': safe_int(repartition.get("1") or repartition.get("1 étoile") or repartition.get("1 star")),
        '2': safe_int(repartition.get("2") or repartition.get("2 étoiles") or repartition.get("2 stars")),
        '3': safe_int(repartition.get("3") or repartition.get("3 étoiles") or repartition.get("3 stars")),
        '4': safe_int(repartition.get("4") or repartition.get("4 étoiles") or repartition.get("4 stars")),
        '5': safe_int(repartition.get("5") or repartition.get("5 étoiles") or repartition.get("5 stars"))
    }

    date_extraction = None
    if societe_data.get("date_extraction"):
        try:
            date_extraction = datetime.strptime(societe_data["date_extraction"], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            logger.print(f"⚠ Format date invalide : {societe_data.get('date_extraction')}")

    try: