In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
import os
from datetime import datetime
import glob

In [16]:
def create_spark_session():
    """Crée une session Spark"""
    return SparkSession.builder \
        .appName("LogProcessing") \
        .getOrCreate()


In [17]:
def get_hour_folders(base_path):
    """Récupère tous les dossiers horaires"""
    # Les dossiers sont au format YYYYMMDDHH
    return [d for d in os.listdir(base_path) if len(d) == 10 and d.isdigit()]

In [19]:
def process_log_file(spark, file_path):
    """Traite un fichier log individuel"""
    try:
        # Lit le fichier avec Spark
        df = spark.read.option("delimiter", "|").csv(file_path)
        
        # Renomme les colonnes pour correspondre au format des logs
        df = df.toDF("timestamp", "id", "product_name", "currency", "price", "action")
        
        # Filtre les lignes avec action 'purchase' et convertit le prix en double
        df = df.filter(df.action == "purchase") \
              .withColumn("price", df.price.cast("double"))
        
        # Groupe par produit et somme les prix
        result_df = df.groupBy("product_name") \
                     .agg(sum("price").alias("total_price"))
        
        return result_df
    except Exception as e:
        print(f"Erreur lors du traitement du fichier {file_path}: {str(e)}")
        return None

In [20]:
def format_datetime(folder_name):
    """Formate le nom du dossier en date lisible"""
    # Convertit YYYYMMDDHH en YYYY/MM/DD HH
    year = folder_name[:4]
    month = folder_name[4:6]
    day = folder_name[6:8]
    hour = folder_name[8:10]
    return f"{year}/{month}/{day} {hour}"

In [14]:
def main():
    # Configuration du logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    # Paramètres
    base_input_path = "./logs"
    output_path = "./output"
    
    # Obtention de l'heure courante au format YYYYMMDDHH
    current_hour = datetime.now().strftime("%Y%m%d%H")
    
    # Création de la session Spark
    spark = create_spark_session()
    
    try:
        # Construction du chemin d'entrée
        input_path = get_input_path(base_input_path, current_hour)
        
        # Traitement des logs
        process_logs(spark, input_path, output_path, current_hour)
        
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

2024-11-10 11:59:50,242 - ERROR - Erreur lors du traitement pour 2024111011: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/work/logs/2024/11/10/11/*.log.


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/work/logs/2024/11/10/11/*.log.