In [8]:
from google.cloud import storage
import pandas as pd 
from google.cloud import bigquery 

In [9]:
PROJECT_ID = "isi-group-m2-dsia"
BUCKET_NAME = "m2dsia-attoisse-mohamed-data"
TABLE_ID = f"{PROJECT_ID}.dataset_attoisse_mohamed.transactions"
client = storage.Client(project=PROJECT_ID)
bucket = client.get_bucket(BUCKET_NAME)
storage_client = storage.Client()
bigquery_client = bigquery.Client()

In [10]:
blobs = bucket.list_blobs(prefix="input/")
for blob in blobs:
    print(blob.name)

input/
input/error1.csv
input/error2.csv
input/error3.csv
input/transaction.csv
input/transactions.csv


In [11]:
def upload_file(file_path, destination_blob_name):

    blob = bucket.blob(destination_blob_name)
    

    blob.upload_from_filename(file_path)
    
    print(f"Fichier '{file_path}' uploadé vers {destination_blob_name}.")

In [25]:
upload_file('files/errors/error3.csv','input/error3.csv')

Fichier 'files/errors/error3.csv' uploadé vers input/error3.csv.


In [13]:
def delete_file(filename):
    try:
        blob = bucket.blob(filename)
        if blob.exists():
            blob.delete()  
            print(f"Le fichier {filename} a été supprimé avec succès.")
        else:
            print(f"Le fichier {filename} n'existe pas dans le bucket.")
    except Exception as e:
        print(f"Une erreur s'est produite lors de la suppression du fichier {filename}: {e}")

In [None]:
delete_file('input/transactions.csv')

In [14]:
# 📥 Téléchargement d'un fichier depuis Cloud Storage
def download_blob(source_blob_name, destination_file_name):
    """Télécharge un fichier depuis Cloud Storage vers un fichier local temporaire."""
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    print(f"📥 {source_blob_name} téléchargé en local.")

In [15]:
# 📤 Upload d'un fichier ou d'un contenu texte dans Cloud Storage
def upload_blob(content, destination_blob_name):
    """Upload un fichier ou une donnée string dans Cloud Storage."""
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(destination_blob_name)
    
    if isinstance(content, str):  
        blob.upload_from_string(content, content_type='text/csv')
    else:  
        blob.upload_from_string(content)
    
    print(f"📤 {destination_blob_name} uploadé dans {BUCKET_NAME}.")

In [16]:
def clean_data(df):
    """Nettoie les données du DataFrame en évitant les avertissements Pandas."""
    required_columns = ['transaction_id', 'product_name', 'category', 'price', 'quantity', 'date']
    
    if df[required_columns].isnull().any().any():
        raise ValueError("❌ Champs obligatoires manquants. Impossible de nettoyer les données.")

    df = df.copy()  
    df['customer_name'] = df['customer_name'].fillna(df['customer_name'].mode()[0])
    df['customer_email'] = df['customer_email'].fillna(df['customer_email'].mode()[0])

    df['transaction_id'] = df['transaction_id'].astype(int)
    df['product_name'] = df['product_name'].astype(str)
    df['category'] = df['category'].astype(str)
    df['price'] = df['price'].astype(float)
    df['quantity'] = df['quantity'].astype(int)
    df['date'] = pd.to_datetime(df['date']).dt.date

    df.drop_duplicates(inplace=True)

    return df

In [17]:
# 🛡️ Validation des données
def validate_data(df):
    """Vérifie que les données sont valides."""
    required_columns = ['transaction_id', 'product_name', 'category', 'price', 'quantity', 'date']
    if not all(col in df.columns for col in required_columns):
        return False
    
    if df[required_columns].isnull().any().any():
        return False
    
    try:
        df['transaction_id'] = df['transaction_id'].astype(int)
        df['product_name'] = df['product_name'].astype(str)
        df['category'] = df['category'].astype(str)
        df['price'] = df['price'].astype(float)
        df['quantity'] = df['quantity'].astype(int)
        df['date'] = pd.to_datetime(df['date']).dt.date
    except ValueError:
        return False
    
    return True

In [18]:
# 🚀 Chargement des données dans BigQuery
def load_to_bigquery(df):
    """Charge un DataFrame Pandas dans une table BigQuery."""
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND  # Ajoute les nouvelles données
    )
    
    job = bigquery_client.load_table_from_dataframe(df, TABLE_ID, job_config=job_config)
    job.result()  
    
    print(f"✅ Données chargées dans BigQuery : {TABLE_ID}")

In [31]:
import tempfile
import os

# 🔄 Traitement d'un fichier Cloud Storage
def process_cloud_file(cloud_file_path):
    """Télécharge, traite et charge un fichier dans BigQuery."""
    local_tmp_dir = tempfile.gettempdir()  
    local_file_path = os.path.join(local_tmp_dir, cloud_file_path.split('/')[-1])
    download_blob(cloud_file_path, local_file_path)  # Téléchargement

    try:
        df = pd.read_csv(local_file_path)

        if not validate_data(df):
            raise ValueError("Données invalides. Champs obligatoires manquants ou types incorrects.")

        df_cleaned = clean_data(df)

        clean_file_path = f"clean/{cloud_file_path.split('/')[-1]}"
        upload_blob(df_cleaned.to_csv(index=False), clean_file_path)

        load_to_bigquery(df_cleaned)

        delete_file(clean_file_path)

        done_file_path = f"done/{cloud_file_path.split('/')[-1]}"
        upload_blob(open(local_file_path, 'rb').read(), done_file_path)

        print(f"✅ {cloud_file_path} traité et déplacé vers clean/ et done/.")
        
    except Exception as e:
        error_file_path = f"error/{cloud_file_path.split('/')[-1]}"
        upload_blob(open(local_file_path, 'rb').read(), error_file_path)

        print(f"❌ Erreur lors du traitement de {cloud_file_path}. Déplacé vers error/. Erreur : {e}")

In [32]:
# 🔍 Lister les fichiers dans `input/`
def list_files_in_input():
    """Liste tous les fichiers dans le dossier 'input/' du bucket."""
    bucket = storage_client.bucket(BUCKET_NAME)
    blobs = bucket.list_blobs(prefix="input/")  # Liste les fichiers dans input/
    
    files = [blob.name for blob in blobs if not blob.name.endswith("/")]
    
    return files

In [33]:
# 🏁 MAIN
def main():
    files = list_files_in_input()

    if not files:
        print("🔎 Aucun fichier à traiter dans input/.")
        return
    i = 0
    for file in files:
        i += 1
        print('processus du fichier',i)
        process_cloud_file(file)
        print()
    print('Fin du traitement')

In [34]:
main()

processus du fichier 1
📥 input/error1.csv téléchargé en local.
📤 error/error1.csv uploadé dans m2dsia-attoisse-mohamed-data.
❌ Erreur lors du traitement de input/error1.csv. Déplacé vers error/. Erreur : Données invalides. Champs obligatoires manquants ou types incorrects.

processus du fichier 2
📥 input/error2.csv téléchargé en local.
📤 error/error2.csv uploadé dans m2dsia-attoisse-mohamed-data.
❌ Erreur lors du traitement de input/error2.csv. Déplacé vers error/. Erreur : Données invalides. Champs obligatoires manquants ou types incorrects.

processus du fichier 3
📥 input/error3.csv téléchargé en local.
📤 error/error3.csv uploadé dans m2dsia-attoisse-mohamed-data.
❌ Erreur lors du traitement de input/error3.csv. Déplacé vers error/. Erreur : Données invalides. Champs obligatoires manquants ou types incorrects.

processus du fichier 4
📥 input/transaction.csv téléchargé en local.
📤 clean/transaction.csv uploadé dans m2dsia-attoisse-mohamed-data.
✅ Données chargées dans BigQuery : isi-g