# Importation des bibliothèques et modules

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf 
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

# Instancier la session Spark (SparkSession)

In [2]:
# Build ONE Spark session from a single SparkConf and derive the SparkContext from it.
# Creating SparkContext(...) first and then calling SparkSession.builder...getOrCreate()
# means the builder's own master/appName settings have no effect (the session reuses the
# context that is already running). Calling SparkContext(...) directly also fails if this
# cell is re-run while a context is alive; getOrCreate() makes the cell idempotent.
# The settings that were actually in effect before (master "local", app name "myAppName",
# driver host 127.0.0.1) are kept unchanged.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("myAppName")
    .set("spark.driver.host", "127.0.0.1")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# `sc` is used by the table-creation cells (sc.parallelize), so keep exposing it.
sc = spark.sparkContext

# Création des tables

In [3]:
# Address table: id -> city. Ids are strings, so "06" keeps its leading zero.
# Presumably these ids are the agency codes used in Vente.Agence — verify.
# The column name "Adress" is spelled as in the original schema and left unchanged.
adresse_rows = [["75", "Paris"], ["51", "Reims"], ["06", "Nice"]]
Adresse = sc.parallelize(adresse_rows).toDF(("id", "Adress"))
Adresse.show()
# Register the frame so SQL queries can refer to it as `Adresse_sql`.
Adresse.createOrReplaceTempView("Adresse_sql")

+---+------+
| id|Adress|
+---+------+
| 75| Paris|
| 51| Reims|
| 06|  Nice|
+---+------+



In [4]:
# Price table: product id (IDM) -> unit price (Prix).
# Both columns are strings; the arithmetic in later cells relies on Spark's
# implicit string-to-number casts (hence results such as 50.0).
prix_rows = [["1", "10"], ["2", "20"], ["3", "70"]]
Prix = sc.parallelize(prix_rows).toDF(("IDM", "Prix"))
Prix.show()
# Register the frame so SQL queries can refer to it as `Prix_sql`.
Prix.createOrReplaceTempView("Prix_sql")

+---+----+
|IDM|Prix|
+---+----+
|  1|  10|
|  2|  20|
|  3|  70|
+---+----+



In [5]:
# Product table: product id (IDM) -> brand (Marque) and product name (Produit).
produit_rows = [
    ["1", "Sony", "Souris"],
    ["2", "Samsung", "Clavier"],
    ["3", "Apple", "Ecran"],
]
Produit = sc.parallelize(produit_rows).toDF(("IDM", "Marque", "Produit"))
Produit.show()
# Register the frame so SQL queries can refer to it as `Produit_sql`.
Produit.createOrReplaceTempView("Produit_sql")

+---+-------+-------+
|IDM| Marque|Produit|
+---+-------+-------+
|  1|   Sony| Souris|
|  2|Samsung|Clavier|
|  3|  Apple|  Ecran|
+---+-------+-------+



In [6]:
# Sales table, one row per sale line: Article (the product IDM), Agence, Client,
# Quantite and Annee. Every value is stored as a string, including the numeric ones.
vente_rows = [
    ["3", "75", "toto", "2", "2021"],
    ["2", "75", "toto", "1", "2020"],
    ["1", "51", "toto", "5", "2021"],
    ["2", "51", "fofo", "6", "2021"],
]
Vente = sc.parallelize(vente_rows).toDF(("Article", "Agence", "Client", "Quantite", "Annee"))
Vente.show()
# Register the frame so SQL queries can refer to it as `Vente_sql`.
Vente.createOrReplaceTempView("Vente_sql")

+-------+------+------+--------+-----+
|Article|Agence|Client|Quantite|Annee|
+-------+------+------+--------+-----+
|      3|    75|  toto|       2| 2021|
|      2|    75|  toto|       1| 2020|
|      1|    51|  toto|       5| 2021|
|      2|    51|  fofo|       6| 2021|
+-------+------+------+--------+-----+



# Déterminer le prix de chaque produit

In [7]:
#DSL
# Attach its price to every product: equi-join on the product id, then drop the
# key so only Marque, Produit and Prix remain.
prix_par_produit = Produit.join(Prix, on=["Idm"], how="inner").drop(col("Idm"))
prix_par_produit.show()


#SQL
# Note: this uses a LEFT JOIN while the DSL version is an inner join; both give the
# same rows here only because every product has a price.
requete_prix = """SELECT Marque, Produit, Prix from Produit_sql
    left join Prix_sql
    on Produit_sql.IDM=Prix_sql.IDM"""
spark.sql(requete_prix).show()

+-------+-------+----+
| Marque|Produit|Prix|
+-------+-------+----+
|  Apple|  Ecran|  70|
|   Sony| Souris|  10|
|Samsung|Clavier|  20|
+-------+-------+----+

+-------+-------+----+
| Marque|Produit|Prix|
+-------+-------+----+
|  Apple|  Ecran|  70|
|   Sony| Souris|  10|
|Samsung|Clavier|  20|
+-------+-------+----+



# Déterminer les articles que toto a achetés en 2021

In [11]:
#DSL
# Describe each product bought by toto in 2021 (one row per sale line).
achats_toto_2021 = (
    Vente.join(Produit, col("Article") == col("Idm"))
    .select("Client", "Produit", "Marque", "Annee")
    .where((col("Annee") == "2021") & (col("Client") == "toto"))
)
achats_toto_2021.show()

#SQL
# The WHERE clause filters on Vente_sql columns, so the LEFT JOIN from Produit_sql
# behaves like an inner join (unsold products are discarded).
requete_achats_toto = """SELECT Client, Produit, Marque, Annee from Produit_sql
        left join Vente_sql
        on Produit_sql.IDM=Vente_sql.Article
        WHERE Vente_sql.Client="toto"
        and Vente_sql.Annee="2021" """
spark.sql(requete_achats_toto).show()

+------+-------+------+-----+
|Client|Produit|Marque|Annee|
+------+-------+------+-----+
|  toto|  Ecran| Apple| 2021|
|  toto| Souris|  Sony| 2021|
+------+-------+------+-----+

+------+-------+------+-----+
|Client|Produit|Marque|Annee|
+------+-------+------+-----+
|  toto|  Ecran| Apple| 2021|
|  toto| Souris|  Sony| 2021|
+------+-------+------+-----+



# Montant dépensé par toto dans chaque agence

In [23]:
#DSL
# Amount spent by toto in each agency: sum of quantity x unit price over his sales.
# `.show()` only prints and returns None, so the DataFrame is bound to a name first
# and displayed separately. Binding the result of `.show()` would store None.
montant_toto_par_agence = (
    Vente.join(Prix, col("Article") == col("IDM"))
    .filter(col("Client") == "toto")
    .withColumn("PrixTotal", col("Quantite") * col("Prix").cast(IntegerType()))
    .groupBy("Agence")
    .agg(F.sum("PrixTotal").alias("Montant"))
)
montant_toto_par_agence.show()

#SQL
# Inner join, matching the DSL version: a sale without a price must not produce
# an agency row with a NULL amount.
spark.sql("""SELECT Client, Agence, sum(Prix*Quantite) as Montant from Vente_sql
        join Prix_sql
        on Vente_sql.Article=Prix_sql.IDM
        WHERE Vente_sql.Client="toto"
        GROUP BY Vente_sql.agence, Vente_sql.client""").show()

+------+-------+
|Agence|Montant|
+------+-------+
|    51|   50.0|
|    75|  160.0|
+------+-------+

+------+------+-------+
|Client|Agence|Montant|
+------+------+-------+
|  toto|    51|   50.0|
|  toto|    75|  160.0|
+------+------+-------+



# Déterminer le chiffre d'affaires (CA) de chaque agence en 2021

In [29]:
#DSL
# 2021 revenue (CA) per agency: sum of quantity x unit price over that year's sales.
ca_par_agence_2021 = (
    Vente.join(Prix, col("Article") == col("Idm"))
    .where(col("Annee") == 2021)
    .groupBy("Agence")
    .agg(F.sum(col("quantite") * col("Prix")).alias("CA"))
)
ca_par_agence_2021.show()

#SQL
requete_ca_2021 = """SELECT Annee, Agence, sum(Prix*Quantite) as CA from Vente_sql
     left join Prix_sql
     on Vente_sql.Article=Prix_sql.IDM
     WHERE Vente_sql.Annee="2021"
     GROUP BY Vente_sql.agence, Vente_sql.Annee """
spark.sql(requete_ca_2021).show()

+------+-----+
|Agence|   CA|
+------+-----+
|    51|170.0|
|    75|140.0|
+------+-----+

+-----+------+-----+
|Annee|Agence|   CA|
+-----+------+-----+
| 2021|    75|140.0|
| 2021|    51|170.0|
+-----+------+-----+



# Quel est le produit le plus acheté en 2021 (en quantité totale vendue) ?

In [36]:
#DSL
# Best-selling product of 2021 = highest TOTAL quantity sold during that year.
# Taking F.max over individual rows would answer "largest single sale" instead.
# Quantite is stored as a string, and max() on strings is lexicographic ("10" < "9"),
# so it is cast to int before aggregating.
# Ties for first place are broken arbitrarily by limit(1).
(
    Vente.join(Produit, col("Article") == col("IDM"))
    .filter(col("Annee") == "2021")
    .groupBy("Produit")
    .agg(F.sum(col("Quantite").cast("int")).alias("Quantite"))
    .sort("Quantite", ascending=False)
    .limit(1)
    .show()
)

#SQL
# Same logic in SQL: aggregate per product within 2021, then keep the top total.
# A subquery such as (SELECT MAX(Quantite) FROM Vente_sql) would span all years
# (and could then match no 2021 row), compare strings, and work per row rather
# than per product.
spark.sql("""SELECT Produit, SUM(CAST(Quantite AS INT)) AS Quantite
        FROM Vente_sql
        JOIN Produit_sql
        ON Vente_sql.Article=Produit_sql.IDM
        WHERE Vente_sql.Annee="2021"
        GROUP BY Produit
        ORDER BY Quantite DESC
        LIMIT 1""").show()

+-------+--------+
|Produit|Quantite|
+-------+--------+
|Clavier|       6|
+-------+--------+

+-------+--------+
|Produit|Quantite|
+-------+--------+
|Clavier|       6|
+-------+--------+



# Quel est le client qui a acheté le plus d’articles ?

In [43]:
#DSL
# Total number of units bought per client; keep the top client.
# NOTE: the alias "Article" holds a quantity total, not an article id.
# Ties for first place are broken arbitrarily by limit(1).
quantite_par_client = (
    Vente.groupBy("Client")
    .agg(F.sum("quantite").alias("Article"))
    .orderBy(col("Article").desc())
    .limit(1)
)
quantite_par_client.show()


#SQL
requete_quantite_client = """SELECT Client, sum(Quantite) as Article  from Vente_sql
          GROUP BY Client
          ORDER BY Article DESC LIMIT 1"""
spark.sql(requete_quantite_client).show()


+------+-------+
|Client|Article|
+------+-------+
|  toto|    8.0|
+------+-------+

+------+-------+
|Client|Article|
+------+-------+
|  toto|    8.0|
+------+-------+



# Qui est le client qui a dépensé le plus d’argent ? 

In [47]:
#DSL
# Total spent per client (quantity x unit price summed over all sales); keep the top one.
# Ties for first place are broken arbitrarily by limit(1).
depense_par_client = (
    Vente.join(Prix, col("Article") == col("Idm"))
    .groupBy("Client")
    .agg(F.sum(col("quantite") * col("Prix")).alias("Montant"))
    .orderBy(col("Montant").desc())
    .limit(1)
)
depense_par_client.show()

#SQL
requete_meilleur_client = """SELECT Client, sum(Quantite*Prix) as Montant from Vente_sql
        left join Prix_sql
        on Vente_sql.Article=Prix_sql.IDM
        GROUP BY Vente_sql.client
        ORDER BY Montant DESC LIMIT 1 """
spark.sql(requete_meilleur_client).show()

+------+-------+
|Client|Montant|
+------+-------+
|  toto|  210.0|
+------+-------+

+------+-------+
|Client|Montant|
+------+-------+
|  toto|  210.0|
+------+-------+

