In [9]:
# Importation des bibliothèques
import pandas as pd
from google.cloud import storage, bigquery

# Configuration des variables
PROJECT_ID = "isi-groupe-m2-dsia"
BUCKET_NAME = "m2dsia-dublond-junior-data"
DATASET_ID = "dataset_dublond_junior"
TABLE_ID = "transactions"

# Chemins des dossiers
input_folder = 'input/'
clean_folder = 'clean/'
error_folder = 'error/'
done_folder = 'done/'

# Clients GCP
storage_client = storage.Client(project=PROJECT_ID)
bigquery_client = bigquery.Client(project=PROJECT_ID)
bucket = storage_client.get_bucket(BUCKET_NAME)

# Message de confirmation
print("Configuration initiale terminée.")
print(f"Bucket utilisé : {BUCKET_NAME}")
print(f"Dataset BigQuery : {DATASET_ID}.{TABLE_ID}")

Configuration initiale terminée.
Bucket utilisé : m2dsia-dublond-junior-data
Dataset BigQuery : dataset_dublond_junior.transactions


In [10]:
# Fonction pour téléverser un fichier dans Cloud Storage
def upload_blob(fichier_a_transferer, le_nom_du_fichier_dans_le_cloud):
    blob = bucket.blob(le_nom_du_fichier_dans_le_cloud)
    blob.upload_from_filename(fichier_a_transferer)
    print(f"Fichier {fichier_a_transferer} téléversé avec succès vers {le_nom_du_fichier_dans_le_cloud}.")

# Fonction pour lister les fichiers dans un dossier
def get_blobs_from_bucket(folder):
    blobs = bucket.list_blobs(prefix=folder)
    print(f"Liste des fichiers dans {folder}:")
    for blob in blobs:
        print(f" - {blob.name}")
    return bucket.list_blobs(prefix=folder)  # Retourner la liste pour réutilisation

# Fonction pour copier un fichier vers un autre dossier
def copy_from_blob(blob, destination_folder):
    bucket.copy_blob(blob, bucket, destination_folder + '/' + blob.name.split('/')[-1])
    print(f"Fichier {blob.name} copié avec succès vers {destination_folder}.")

# Fonction pour supprimer un fichier
def delete_file(filename):
    blob = bucket.blob(filename)
    blob.delete()
    print(f"Fichier {filename} supprimé avec succès.")

# Message de confirmation
print("Fonctions utilitaires définies avec succès.")

Fonctions utilitaires définies avec succès.


In [11]:
# Fonction pour valider et nettoyer les données
def validate_and_clean_data(file_path):
    try:
        # Lire le fichier CSV
        df = pd.read_csv(file_path)
        
        # Validation des colonnes
        required_columns = ['transaction_id', 'product_name', 'category', 'price', 'quantity', 'date', 'customer_name', 'customer_email']
        if not all(column in df.columns for column in required_columns):
            raise ValueError("Colonnes manquantes dans le fichier.")
        
        # Nettoyage des données
        df['transaction_id'] = df['transaction_id'].astype('int64')
        df['product_name'] = df['product_name'].astype('string')
        df['category'] = df['category'].astype('string')
        df['price'] = df['price'].astype('float64')
        df['quantity'] = df['quantity'].astype('int64')
        df['date'] = pd.to_datetime(df['date']).dt.date
        df['customer_name'] = df['customer_name'].astype('string')
        df['customer_email'] = df['customer_email'].astype('string')
        
        print(f"Fichier {file_path} validé et nettoyé avec succès.")
        return df, None
    except Exception as e:
        print(f"Erreur lors de la validation ou du nettoyage du fichier {file_path}: {e}")
        return None, str(e)

# Message de confirmation
print("Fonction de validation et nettoyage définie avec succès.")

Fonction de validation et nettoyage définie avec succès.


In [12]:
# Traitement des fichiers dans le dossier input/
def process_files():
    blobs = get_blobs_from_bucket(input_folder)
    for blob in blobs:
        if blob.name.endswith('.csv'):
            file_name = blob.name.split('/')[-1]
            file_path = f"gs://{BUCKET_NAME}/{blob.name}"
            
            # Validation et nettoyage
            df, error = validate_and_clean_data(file_path)
            if df is not None:
                # Déplacer vers clean/
                copy_from_blob(blob, clean_folder)
                delete_file(blob.name)
                print(f"Fichier {file_name} traité avec succès et déplacé vers clean/.")
            else:
                # Déplacer vers error/
                copy_from_blob(blob, error_folder)
                delete_file(blob.name)
                print(f"Fichier {file_name} contient des erreurs et a été déplacé vers error/.")

# Exécution du traitement
process_files()

Liste des fichiers dans input/:


In [13]:
# Fonction pour charger les données dans BigQuery
def load_data_to_bigquery():
    blobs = get_blobs_from_bucket(clean_folder)
    for blob in blobs:
        if blob.name.endswith('.csv'):
            file_name = blob.name.split('/')[-1]
            file_path = f"gs://{BUCKET_NAME}/{blob.name}"
            
            # Charger le fichier nettoyé dans BigQuery
            table_ref = bigquery_client.dataset(DATASET_ID).table(TABLE_ID)
            job_config = bigquery.LoadJobConfig(
                source_format=bigquery.SourceFormat.CSV,
                skip_leading_rows=1,
                autodetect=True,
                write_disposition='WRITE_APPEND'
            )
            
            try:
                with open(file_path, 'rb') as source_file:
                    job = bigquery_client.load_table_from_file(source_file, table_ref, job_config=job_config)
                    job.result()  # Attendre la fin du job
                print(f"Fichier {file_name} chargé avec succès dans BigQuery.")
                
                # Déplacer le fichier vers done/
                copy_from_blob(blob, done_folder)
                delete_file(blob.name)
                print(f"Fichier {file_name} déplacé avec succès vers done/.")
            except Exception as e:
                print(f"Erreur lors du chargement du fichier {file_name} dans BigQuery: {e}")

# Exécution du chargement
load_data_to_bigquery()

Liste des fichiers dans clean/:
 - clean/


In [15]:
# Interaction utilisateur
def main():
    print("1. Traiter les fichiers dans le dossier input/")
    print("2. Charger les données dans BigQuery")
    choix = input("Choisissez une option (1 ou 2) : ")
    
    if choix == '1':
        process_files()
    elif choix == '2':
        load_data_to_bigquery()
    else:
        print("Option invalide. Veuillez choisir 1 ou 2.")

# Exécution du programme
main()

1. Traiter les fichiers dans le dossier input/
2. Charger les données dans BigQuery
Liste des fichiers dans clean/:
 - clean/
