In [30]:
# ! pip install pyspark

In [31]:
import pyspark

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, year
from pyspark import SparkContext, SparkConf
import sys
from datetime import datetime

# Création de la session Spark

In [33]:
spark = SparkSession.builder \
    .appName("Analyse des Ventes") \
    .master("local[*]") \
    .getOrCreate()

#### Création d'un exemple de données pour le test

In [34]:
donnees_test = """
2024-01-01 Paris Ordinateur 1200
2024-01-02 Lyon Telephone 800
2024-01-03 Paris Tablette 600
2024-01-04 Marseille Ordinateur 1100
2024-02-01 Lyon Telephone 750
2023-02-02 Paris Ordinateur 1300
"""

##### Écriture des données de test dans un fichier


In [35]:
with open("ventes.txt", "w") as f:
    f.write(donnees_test.strip())

### Exercice 1: Total des ventes par ville
#### Lecture du fichier

In [36]:
df = spark.read.csv("ventes.txt", sep=" ", schema="date STRING, ville STRING, produit STRING, prix DOUBLE")


#### Calcul du total des ventes par ville

In [37]:
total_ventes_ville = df.groupBy("ville") \
    .agg(sum("prix").alias("total_ventes")) \
    .orderBy("ville")

print("Résultat Exercice 1 - Total des ventes par ville:")
total_ventes_ville.show()

Résultat Exercice 1 - Total des ventes par ville:
+---------+------------+
|    ville|total_ventes|
+---------+------------+
|     Lyon|      1550.0|
|Marseille|      1100.0|
|    Paris|      3100.0|
+---------+------------+



### Exercice 2: Total des ventes par ville pour une année donnée


In [38]:
# Ajout d'une colonne année
df_with_year = df.withColumn("annee", year(df.date))

# Fonction pour calculer le total des ventes par ville pour une année spécifique
def calculer_ventes_annee(annee):
    resultat = df_with_year.filter(df_with_year.annee == annee) \
        .groupBy("ville") \
        .agg(sum("prix").alias(f"total_ventes_{annee}")) \
        .orderBy("ville")
    return resultat

# Exemple pour l'année 2024
print("\nRésultat Exercice 2 - Total des ventes par ville pour 2024:")
ventes_2024 = calculer_ventes_annee(2024)
ventes_2024.show()


Résultat Exercice 2 - Total des ventes par ville pour 2024:
+---------+-----------------+
|    ville|total_ventes_2024|
+---------+-----------------+
|     Lyon|           1550.0|
|Marseille|           1100.0|
|    Paris|           1800.0|
+---------+-----------------+



In [41]:
def create_test_file():
    """Crée un fichier de test avec des données d'exemple"""
    test_data = """2024-01-01 Paris Ordinateur 1200
2024-01-02 Lyon Telephone 800
2024-01-03 Paris Tablette 600
2024-02-01 Marseille Ordinateur 1100
2024-02-02 Lyon Telephone 750
2023-12-25 Paris Television 2000"""
    with open("sales_cluster.txt", "w") as f:
                f.write(test_data)

def init_spark(app_name, master=None):
    """Initialise Spark en fonction du mode (local ou cluster)"""
    if master:
        conf = SparkConf().setAppName(app_name).setMaster(master)
    else:
        conf = SparkConf().setAppName(app_name)
    
    return SparkContext(conf=conf)
def process_sales_by_city(sc, input_path):
    """Calcule le total des ventes par ville"""
    try:
        # Lecture du fichier
        sales_rdd = sc.textFile(input_path)

        # Transformation en paires (ville, prix)
        city_sales = sales_rdd \
            .map(lambda line: line.split()) \
            .map(lambda fields: (fields[1], float(fields[3]))) \
            .reduceByKey(lambda x, y: x + y)

        return city_sales
    except Exception as e:
        print(f"Erreur lors du traitement: {str(e)}")
        sys.exit(1)
def main_ex1(is_local=True):
    """Programme principal pour l'exercice 1"""
    if is_local:
        # Création du fichier de test en local
        create_test_file()
        master = "local[*]"
        input_path = "ventes.txt"
    else:
        # En mode cluster
        master = None
        input_path = "hdfs:///data/ventes.txt"

    # Initialisation de Spark
    sc = init_spark("Exercice1_VentesParVille", master)
    
    try:
        # Traitement des ventes
        city_sales = process_sales_by_city(sc, input_path)

        # Affichage des résultats
        print("\nRésultats des ventes totales par ville:")
        for city, total in city_sales.collect():
            print(f"Ville: {city}, Total des ventes: {total}€")

    finally:
        sc.stop()

# ================= Exercice 2 =================
def extract_year(date_str):
    """Extrait l'année d'une date au format YYYY-MM-DD"""
    try:
        return datetime.strptime(date_str, '%Y-%m-%d').year
    except ValueError as e:
        print(f"Erreur de format de date: {str(e)}")
        return None

def process_sales_by_city_year(sc, input_path, target_year):
    """Calcule le total des ventes par ville pour une année donnée"""
    try:
        # Lecture du fichier
        sales_rdd = sc.textFile(input_path)

        # Broadcast de l'année cible
        broadcast_year = sc.broadcast(target_year)

        # Transformation et filtrage par année
        yearly_sales = sales_rdd \
            .map(lambda line: line.split()) \
            .filter(lambda fields: extract_year(fields[0]) == broadcast_year.value) \
            .map(lambda fields: (fields[1], float(fields[3]))) \
            .reduceByKey(lambda x, y: x + y)

        return yearly_sales
    except Exception as e:
        print(f"Erreur lors du traitement: {str(e)}")
        sys.exit(1)

def main_ex2(target_year=2024, is_local=True):
    """Programme principal pour l'exercice 2"""
    if is_local:
        master = "local[*]"
        input_path = "sales_cluster.txt"
    else:
        master = None
        input_path = "hdfs:///data/sales_cluster.txt"

    # Initialisation de Spark
    sc = init_spark("Exercice2_VentesParVilleAnnee", master)
    
    try:
        # Traitement des ventes pour l'année cible
        yearly_sales = process_sales_by_city_year(sc, input_path, target_year)

        # Affichage des résultats
        print(f"\nRésultats des ventes par ville pour l'année {target_year}:")
        for city, total in yearly_sales.collect():
            print(f"Ville: {city}, Total des ventes: {total}€")

    finally:
        sc.stop()

if __name__ == "__main__":
    # Par défaut, exécution en mode local
    is_local = True
    
    if len(sys.argv) > 1:
        # Si des arguments sont fournis, on suppose une exécution sur cluster
        is_local = False
        
    # Exécution des deux exercices
    print("=== Exécution de l'exercice 1 ===")
    main_ex1(is_local)
    
    print("\n=== Exécution de l'exercice 2 ===")
    main_ex2(2024, is_local)

=== Exécution de l'exercice 1 ===


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Analyse des Ventes, master=local[*]) created by getOrCreate at C:\Users\hp\AppData\Local\Temp\ipykernel_9660\2219761829.py:4 

# *** Arrêt de la session Spark ***

In [26]:
spark.stop()