In [229]:
from google.cloud import storage, bigquery
import pandas as pd
import os
import logging

In [281]:
# Configurations GCP

storage_client = storage.Client()
bq_client = bigquery.Client()

PROJECT_ID = "isi-group-m2-dsia"
BUCKET_NAME = "m2dsia-sylla-mame-diarra-data"
DATASET_ID = "dataset_mame_diarra_sylla"
TABLE_ID = f"{bq_client.project}.{DATASET_ID}.transactions"

In [282]:
bucket = storage_client.bucket(BUCKET_NAME)


In [283]:
logging.basicConfig(level=logging.INFO)

In [284]:
blobs = bucket.list_blobs(prefix="input/")
for blob in blobs:
    print(blob.name)

input/transactions.csv


In [285]:
def list_files_in_folder(folder):
    """Liste les fichiers dans un dossier GCS."""
    return [blob.name for blob in bucket.list_blobs(prefix=folder) if not blob.name.endswith("/")]

In [286]:
def download_file(gcs_path, local_path):
    """Télécharge un fichier de GCS vers le local."""
    blob = bucket.blob(gcs_path)
    blob.download_to_filename(local_path)
    logging.info(f"Téléchargé : {gcs_path} -> {local_path}")


In [287]:
# Fonction pour uploader un fichier local vers GCS
def upload_file(local_path, gcs_path):
    """Upload un fichier local vers GCS."""
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(local_path)
    logging.info(f"Uploadé : {local_path} -> {gcs_path}")

In [270]:
upload_blob('files/transactions.csv','input/transactions.csv')

Uploaded files/transactions.csv to input/transactions.csv


In [288]:
#Q3
def copy_from_blob(blob, folder):
    bucket.copy_blob(blob, bucket, folder+'/'+blob.name.split('/')[-1])


In [289]:
def copy_from_file(file_name, folder):
    blob = bucket.blob(file_name)
    bucket.copy_blob(blob, bucket, folder+'/'+file_name.split('/')[-1])


In [290]:
# Fonction pour valider et nettoyer les données
def validate_and_clean_data(local_file):
    """Valide et nettoie les données."""
    try:
        df = pd.read_csv(local_file)

        # Validation des colonnes requises
        required_columns = [
            "transaction_id", "product_name", "category", "price", "quantity", "date",
            "customer_name", "customer_email"
        ]
        if not all(col in df.columns for col in required_columns):
            raise ValueError("Colonnes manquantes dans le fichier d'entrée.")

        # Remplir les valeurs manquantes dans les colonnes obligatoires
        df["product_name"] = df["product_name"].fillna("Produit inconnu")
        df["category"] = df["category"].fillna("Catégorie inconnue")

        # Conversion des types de données
        df["price"] = pd.to_numeric(df["price"], errors="coerce")
        df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(0).astype(int)
        df["transaction_id"] = pd.to_numeric(df["transaction_id"], errors="coerce").fillna(0).astype(int)

        # Filtrer les lignes avec des dates invalides
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d", errors="coerce")
        valid_data = df.dropna(subset=["price", "quantity", "date"])
        invalid_data = df[~df.index.isin(valid_data.index)]

        # Enregistrer les fichiers nettoyés et rejetés
        cleaned_file = local_file.replace(".csv", "_cleaned.csv")
        error_file = local_file.replace(".csv", "_errors.csv")
        valid_data.to_csv(cleaned_file, index=False)
        invalid_data.to_csv(error_file, index=False)

        logging.info(f"Fichier nettoyé : {cleaned_file}")
        logging.info(f"Fichier d'erreurs : {error_file}")
        return cleaned_file, error_file
    except Exception as e:
        logging.error(f"Erreur de validation/cleaning : {e}")
        return None, None

In [291]:
# Fonction pour charger les données dans BigQuery
def load_to_bigquery(file_path):
    """Charge un fichier dans BigQuery."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        schema=[
            bigquery.SchemaField("transaction_id", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("product_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("category", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("price", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("quantity", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("customer_name", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("customer_email", "STRING", mode="NULLABLE"),
        ],
        write_disposition="WRITE_APPEND"  # Ajout des données sans écraser
    )
    uri = f"gs://{BUCKET_NAME}/{file_path}"
    load_job = bq_client.load_table_from_uri(uri, TABLE_ID, job_config=job_config)
    load_job.result()  # Attendre la fin du job
    logging.info(f"Données chargées dans BigQuery depuis : {uri}")


In [292]:
# Fonction pour déplacer un fichier vers le dossier done/
def move_to_done(file_path):
    """Déplace un fichier vers le dossier done/."""
    source_blob = bucket.blob(file_path)
    new_blob = bucket.blob(file_path.replace("clean", "done"))
    bucket.copy_blob(source_blob, bucket, new_blob.name)
    source_blob.delete()
    logging.info(f"Fichier déplacé : {file_path} -> {new_blob.name}")


In [296]:
import tempfile
import os

def process_files():
    """Traite les fichiers du dossier input/."""
    files = list_files_in_folder("input/")
    for file_path in files:
        # Créer un fichier temporaire
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
            local_input_path = temp_file.name

        # Télécharger le fichier dans le répertoire temporaire
        download_file(file_path, local_input_path)

        # Valider et nettoyer les données
        cleaned_file, error_file = validate_and_clean_data(local_input_path)
        
        if cleaned_file:
            upload_file(cleaned_file, f"clean/{os.path.basename(cleaned_file)}")
            load_to_bigquery(f"clean/{os.path.basename(cleaned_file)}")
            move_to_done(f"clean/{os.path.basename(cleaned_file)}")
        if error_file:
            upload_file(error_file, f"error/{os.path.basename(error_file)}")

        # Supprimer les fichiers temporaires
        os.remove(local_input_path)
        if cleaned_file and os.path.exists(cleaned_file):
            os.remove(cleaned_file)
        if error_file and os.path.exists(error_file):
            os.remove(error_file)

In [297]:
process_files()

INFO:root:Téléchargé : input/transactions.csv -> C:\Users\DIARRA\AppData\Local\Temp\tmpqmjsu9uu.csv
INFO:root:Fichier nettoyé : C:\Users\DIARRA\AppData\Local\Temp\tmpqmjsu9uu_cleaned.csv
INFO:root:Fichier d'erreurs : C:\Users\DIARRA\AppData\Local\Temp\tmpqmjsu9uu_errors.csv
INFO:root:Uploadé : C:\Users\DIARRA\AppData\Local\Temp\tmpqmjsu9uu_cleaned.csv -> clean/tmpqmjsu9uu_cleaned.csv
INFO:root:Données chargées dans BigQuery depuis : gs://m2dsia-sylla-mame-diarra-data/clean/tmpqmjsu9uu_cleaned.csv
INFO:root:Fichier déplacé : clean/tmpqmjsu9uu_cleaned.csv -> done/tmpqmjsu9uu_doneed.csv
INFO:root:Uploadé : C:\Users\DIARRA\AppData\Local\Temp\tmpqmjsu9uu_errors.csv -> error/tmpqmjsu9uu_errors.csv


In [None]:
# #fonction pour supprimer
# def supprimer_blob(nom_fichier_cloud):
#     blob = bucket.blob(nom_fichier_cloud)
#     blob.delete()

