In [148]:
!pip install google-cloud-storage google-cloud-bigquery


Defaulting to user installation because normal site-packages is not writeable


In [26]:
from google.cloud import storage, bigquery
import pandas as pd
import os
import tempfile

---
# Étape 1 : Configuration des ressources GCP

In [5]:
# Base configuration: project, bucket and BigQuery identifiers.
PROJECT_ID = "isi-group-m2-dsia"
BUCKET_NAME = "m2dsia-ndao-ibrahima-data"
DATASET_ID = f"{PROJECT_ID}.dataset_ndao_ibrahima"
TABLE_ID = f"{DATASET_ID}.auchan_sales"

# Bind both clients to PROJECT_ID explicitly instead of relying on whatever
# default project the ambient credentials/environment resolve to.
storage_client = storage.Client(project=PROJECT_ID)
bq_client = bigquery.Client(project=PROJECT_ID)


In [3]:
# Build a storage client bound explicitly to PROJECT_ID and look up the
# configured bucket through it.
client = storage.Client(project=PROJECT_ID)
bucket = client.get_bucket(BUCKET_NAME)

In [4]:
# Display the bucket object as a quick check of the `bucket` variable.
bucket

<Bucket: m2dsia-ndao-ibrahima-data>

In [29]:
# Validate and clean a raw sales DataFrame
def validate_and_clean_data(df):
    """Validate and clean a raw sales frame before it is loaded.

    Rows with a missing value in any required column are dropped, ``price`` is
    cast to float, ``qty`` to int and ``date`` is parsed to ``datetime.date``.
    Rows whose date cannot be parsed, or that still contain a null in any
    column, are dropped as well.

    Args:
        df: DataFrame expected to contain the columns ``date``, ``price``,
            ``category``, ``qty``, ``product`` and ``revenue``. It is never
            modified.

    Returns:
        tuple: ``(cleaned_df, True)`` on success, or ``(df, False)`` - the
        untouched input frame - when a required column is missing or a cast
        fails. The error is printed, not raised.
    """
    required_columns = ["date", "price", "category", "qty", "product", "revenue"]
    try:
        cleaned = (
            df.dropna(subset=required_columns)
            # assign() returns a new frame instead of writing columns into the
            # dropna() result, so nothing derived from the input is mutated.
            .assign(
                price=lambda frame: frame["price"].astype(float),
                qty=lambda frame: frame["qty"].astype(int),
                # Unparseable dates are coerced to NaT and removed just below.
                date=lambda frame: pd.to_datetime(frame["date"], errors="coerce").dt.date,
            )
            .dropna()
        )
    except Exception as e:
        print("Erreur de validation:", e)
        # Hand back the caller's original frame, not a half-processed copy.
        return df, False
    return cleaned, True


In [30]:
# Move an object to another folder of the bucket
def move_blob(blob_name, destination_folder):
    """Move ``blob_name`` into ``destination_folder`` within BUCKET_NAME.

    The object keeps its base file name. The move is done as a copy to
    ``<destination_folder>/<file name>`` followed by a delete of the source.

    Args:
        blob_name: Full name of the source object in the bucket.
        destination_folder: Target folder name, without a trailing slash.
    """
    target_bucket = storage_client.bucket(BUCKET_NAME)
    source = target_bucket.blob(blob_name)
    file_name = os.path.basename(blob_name)
    target_bucket.copy_blob(source, target_bucket, f"{destination_folder}/{file_name}")
    # Remove the source only after the copy call has returned.
    source.delete()

In [34]:
# File processing: validate every CSV under input/, load the valid ones into
# BigQuery, and file each object under done/ or error/.
bucket = storage_client.bucket(BUCKET_NAME)

# The load settings are the same for every file, so build them once.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
    skip_leading_rows=1,
)

# Snapshot the listing first: objects are moved out of input/ while we iterate.
blobs = [b for b in bucket.list_blobs(prefix="input/") if b.name.endswith(".csv")]
failed_files = []

# A private scratch directory avoids name clashes in the shared temp dir and is
# removed on exit even when a file fails half-way through.
with tempfile.TemporaryDirectory() as work_dir:
    for blob in blobs:
        file_name = os.path.basename(blob.name)
        local_filename = os.path.join(work_dir, file_name)
        current_name = blob.name  # where the object currently lives in the bucket
        loaded = False
        try:
            # Download the CSV file
            blob.download_to_filename(local_filename)
            df = pd.read_csv(local_filename)

            # Validation and cleaning
            df_cleaned, is_valid = validate_and_clean_data(df)
            if is_valid:
                df_cleaned.to_csv(local_filename, index=False)
                # NOTE(review): the object moved to clean/ (and later done/) is
                # the raw upload; the cleaned rows exist only in the local file
                # and in BigQuery - confirm this is intended.
                move_blob(current_name, "clean")
                current_name = f"clean/{file_name}"

                # Load into BigQuery
                with open(local_filename, "rb") as source_file:
                    job = bq_client.load_table_from_file(source_file, TABLE_ID, job_config=job_config)
                job.result()  # block until the load job finishes; raises if it failed
                move_blob(current_name, "done")
                loaded = True
        except Exception as e:
            # One unreadable file or failed load must not abort the whole run
            # or leave the object stranded in input/ or clean/.
            print(f"Erreur lors du traitement de {blob.name}:", e)

        if not loaded:
            failed_files.append(blob.name)
            move_blob(current_name, "error")

# Only claim success when every file actually made it to done/.
if failed_files:
    print(f"Pipeline de traitement terminé avec {len(failed_files)} fichier(s) en erreur :", failed_files)
else:
    print("Pipeline de traitement terminé avec succès !")

Pipeline de traitement terminé avec succès !
