In [21]:
import os
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery

In [22]:
# Configuration
BUCKET_NAME = "m2-dsia-mamadou-moustapha-diallo-data"
PROJECT_ID = "isi-group-m2-dsia"
DATASET_ID = "dataset_mamadou_mustafa_diallo"
TABLE_ID = "transactions"

In [23]:
# Clients GCP
storage_client = storage.Client(project=PROJECT_ID)
bigquery_client = bigquery.Client(project=PROJECT_ID)
bucket_client = storage_client.get_bucket(BUCKET_NAME)

In [24]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    # Télécharge un fichier vers un bucket GCS.
    blob = bucket_client.blob(destination_blob_name)
    
    blob.upload_from_filename(source_file_name)
    
    print(f"Fichier {source_file_name} téléchargé vers {destination_blob_name} dans le bucket {bucket_name}.")

In [25]:
# Télécharger le fichier
upload_to_gcs("m2-dsia-mamadou-moustapha-diallo-data",
              "files/transactions.csv",
              "input/transactions.csv")

Fichier files/transactions.csv téléchargé vers input/transactions.csv dans le bucket m2-dsia-mamadou-moustapha-diallo-data.


In [26]:
def validate_and_clean(file_path):
    try:
        # Charger le fichier
        df = pd.read_csv(file_path)
        
        # Validation des colonnes
        required_columns = ["transaction_id", "product_name", "category", "price", "quantity", "date"]
        if not all(column in df.columns for column in required_columns):
            return None, "Colonnes manquantes"
        
        # Nettoyage des données
        df = df.dropna(subset=required_columns)
        
        # Vérifier que le prix et la quantité ne sont pas nuls ou égaux à zéro
        df = df[(df["price"] > 0) & (df["quantity"] > 0)]
        
        # Convertir la colonne "date" en format date et supprimer les lignes invalides
        df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date
        df = df[df["date"].notna()]
        
        return df, None
    except Exception as e:
        return None, str(e)

In [27]:
def move_file(source_folder, destination_folder, file_name):
    # Déplace un fichier dans un autre dossier.
    blob = bucket_client.blob(f"{source_folder}/{file_name}")
    new_blob = bucket_client.rename_blob(blob, f"{destination_folder}/{file_name}")
    print(f"Fichier {file_name} déplacé vers {destination_folder}.")

In [28]:
def load_to_bigquery(df):
    # Charge les données nettoyées dans BigQuery.
    table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV,
    )
    
    # Charger le DataFrame directement dans BigQuery
    job = bigquery_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Attendre la fin du job
    print(f"Données chargées dans BigQuery.")

In [30]:
def process_files():
    # Traite les fichiers dans le dossier input.
    blobs = bucket_client.list_blobs(prefix="input/")
    
    for blob in blobs:
        file_name = os.path.basename(blob.name)
        print(file_name)
        # Lire le fichier depuis GCS
        file_content = blob.download_as_text()
        print(file_content)
        
        # Valider et nettoyer les données
        df, error = validate_and_clean(file_content)
        if error:
            print(f"Erreur dans le fichier {file_name}: {error}")
            move_file("input", "error", file_name)
        else:
            # Sauvegarder les données nettoyées dans GCS
            clean_file_name = f"clean/{file_name}"
            clean_blob = bucket_client.blob(clean_file_name)
            clean_blob.upload_from_string(df.to_csv(index=False), content_type="text/csv")
            
            # Déplacer le fichier original vers "done"
            move_file("input", "done", file_name)
            
            # Charger les données dans BigQuery
            load_to_bigquery(df)

In [31]:
if __name__ == "__main__":
    process_files()

transactions.csv
transaction_id,product_name,category,price,quantity,date,customer_name,customer_email
1,Running Shoes,Sports,560.97,9,2024-03-10,Cameron Martin,akeller@yahoo.com
2,Office Chair,Furniture,1371.39,7,2024-12-31,Joshua Ellis,riveranicholas@hotmail.com
3,Tablet,Furniture,620.46,5,2024-06-22,Jessica Price,nathanhill@johnson-montes.com
4,Headphones,Electronics,827.16,7,2024-01-30,,smithlawrence@gmail.com
5,Gaming Laptop,Accessories,1663.99,7,2024-11-11,Melissa Snow,nicolenguyen@thompson.net
6,Gaming Laptop,Accessories,700.67,10,2024-08-16,,melissaadams@rogers.org
7,Tablet,Accessories,868.09,10,2024-11-05,Adam Hughes,ocarr@hotmail.com
8,Wireless Mouse,Stationery,1053.81,7,2024-07-21,Brandon Hess,miranda35@contreras.org
9,Gaming Laptop,Stationery,1891.53,9,2024-09-03,,vancerichard@gmail.com
10,Wireless Mouse,Electronics,1547.25,4,2024-01-28,Elizabeth Cook,olyons@shelton-harvey.org
11,Gaming Laptop,Appliances,1692.78,2,2024-09-14,Taylor Herman,novaktimothy@hotmail.com
12,Gaming 