In [15]:
import os
from io import StringIO

import boto3
import pandas as pd
import psycopg2
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL


## Récupération des connexions

In [16]:
# Load the local secrets file so its entries become environment variables.
load_dotenv("secrets.env")

# AWS settings, read back from the environment in one pass.
AWS_KEY, AWS_SECRET, BUCKET_NAME = (
    os.getenv(var_name)
    for var_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_BUCKET_NAME")
)

# S3 client built with the explicit key pair loaded above.
s3 = boto3.client(
    "s3",
    aws_secret_access_key=AWS_SECRET,
    aws_access_key_id=AWS_KEY,
)


## Export des CSV vers le bucket S3

In [None]:
rep_csv = "data"  # local folder holding the CSV files to export

# Reuse the `s3` client created in the credentials cell instead of building a
# second, unconfigured one here: rebinding `s3` would silently discard the
# explicit key pair for this cell and every cell after it.
# Sorting makes the upload order (and the printed log) reproducible, since
# os.listdir returns entries in arbitrary order.
for file in sorted(os.listdir(rep_csv)):
    if not file.endswith(".csv"):
        continue

    local_path = os.path.join(rep_csv, file)
    s3_path = f"export_csv/{file}"  # destination key under the export_csv/ prefix

    s3.upload_file(local_path, BUCKET_NAME, s3_path)
    print(f"Upload de {file} terminé vers s3://{BUCKET_NAME}/{s3_path}")


## Vérification de la connexion à NeonDB

In [18]:
# NeonDB connection settings, taken from the process environment
# (each one is None when the variable is not set).
NEON_USER, NEON_PASSWORD, NEON_HOST, NEON_DBNAME = (
    os.environ.get(setting)
    for setting in ("NEON_USER", "NEON_PASSWORD", "NEON_HOST", "NEON_DBNAME")
)

In [19]:
# Connectivity check: psycopg2.connect raises if the host, credentials or TLS
# settings are wrong. The connection is not used for anything else in this
# notebook (later cells go through the SQLAlchemy engine and rebind `conn`),
# so close it right away instead of leaving an idle session open on the server.
conn = psycopg2.connect(
    dbname=NEON_DBNAME,
    user=NEON_USER,
    password=NEON_PASSWORD,
    host=NEON_HOST,  # same value the settings cell already loaded
    port="5432",
    sslmode="require",
)
conn.close()

## Connexion à NeonDB via SQLAlchemy

In [20]:
# Build the URL from its components rather than formatting a string:
# URL.create escapes special characters in the credentials (a password
# containing '@', '/', ':' or '%' would otherwise yield a broken or misparsed
# URL). sslmode=require matches the direct psycopg2 connection settings.
engine = create_engine(
    URL.create(
        "postgresql+psycopg2",
        username=NEON_USER,
        password=NEON_PASSWORD,
        host=NEON_HOST,
        database=NEON_DBNAME,
        query={"sslmode": "require"},
    )
)

## Création ou chargement des données depuis S3

In [None]:
# Walk every object under the export prefix. A single list_objects_v2 call
# returns at most 1000 keys, so use a paginator to avoid silently skipping
# files once the prefix grows past that limit.
paginator = s3.get_paginator("list_objects_v2")

for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix="export_csv/"):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if not key.endswith(".csv"):
            continue

        # The table is named after the file, without folder or extension.
        table_name = os.path.splitext(os.path.basename(key))[0]
        print(f"\nFichier détecté : {key} → Table '{table_name}'")

        # Read the CSV from S3 (semicolon-separated, UTF-8).
        csv_obj = s3.get_object(Bucket=BUCKET_NAME, Key=key)
        body = csv_obj["Body"].read().decode("utf-8")
        df = pd.read_csv(StringIO(body), sep=";", engine="python")

        # Quote the identifier the same way SQLAlchemy does when to_sql creates
        # the table: the DROP then targets exactly that table (mixed-case names
        # included) and a file name can never inject SQL.
        quoted_table = engine.dialect.identifier_preparer.quote(table_name)

        # One transaction for check + drop + reload: if the insert fails, the
        # previous table is rolled back into place instead of being lost.
        with engine.begin() as db_conn:
            # Compare against the exact name (no lower-casing): to_sql keeps
            # the case of the name it is given, so that is what is stored.
            exists = db_conn.execute(
                text(
                    "SELECT EXISTS (SELECT FROM information_schema.tables "
                    "WHERE table_schema = current_schema() AND table_name = :t)"
                ),
                {"t": table_name},
            ).scalar()

            if exists:
                db_conn.execute(text(f"DROP TABLE IF EXISTS {quoted_table} CASCADE;"))
                print(f"Ancienne table '{table_name}' supprimée.")

            df.to_sql(table_name, db_conn, index=False)

        print(f"Table '{table_name}' créée et données insérées.")


