In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


In [2]:
# Create (or reuse) the Spark session shared by every cell of this notebook.
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark quick-start example -- confirm whether it is actually needed.
spark = (
    SparkSession.builder
    .appName("Projet Spark")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


In [3]:
# Explicit schema used to read the raw sales CSV: every column is nullable,
# descriptive columns are strings and numeric measures are doubles.
schema1 = StructType([
    StructField(nom_colonne, type_colonne, True)
    for nom_colonne, type_colonne in [
        ('Invoice_ID', StringType()),
        ('Branch', StringType()),
        ('City', StringType()),
        ('Customer_type', StringType()),
        ('Gender', StringType()),
        ('Product_line', StringType()),
        ('Unit_price', DoubleType()),
        ('Quantity', DoubleType()),
        ('Tax', DoubleType()),
        ('Total', DoubleType()),
        ('Date', StringType()),
        ('Time', StringType()),
        ('Payment', StringType()),
        ('cogs', DoubleType()),
        ('gross_margin_percentage', DoubleType()),
        ('gross_income', DoubleType()),
        ('Rating', DoubleType()),
    ]
])

In [4]:
# Batch-load the comma-separated sales file (first line is a header) using
# the explicit schema1 instead of schema inference.
sales = (
    spark.read
    .format('csv')
    .schema(schema1)
    .options(header=True, delimiter=",")
    .load(r'supermarket_sales.csv')
)
        

In [39]:
# Derive two grouping keys from the Date string by splitting it on "/":
#   month_day = "<1st part>-<2nd part>"   and   month = "<1st part>".
# NOTE(review): presumably Date is formatted month/day/year -- confirm
# against the CSV.
date_parts = split(sales["Date"], '/')
sales = (
    sales
    .withColumn("month_day", concat_ws("-", date_parts.getItem(0), date_parts.getItem(1)))
    .withColumn("month", concat_ws("-", date_parts.getItem(0)))
)

In [6]:
# Bring the distinct month_day keys back to the driver as a list of Rows;
# the export loop iterates over them.
liste_dates = (
    sales
    .select("month_day")
    .distinct()
    .collect()
)

In [None]:
# Export one CSV output per distinct day under sales_dates/, which is the
# folder the streaming reader consumes later in the notebook.
for row in liste_dates:
    jour = row["month_day"]
    # Build the destination once and reuse it; Spark's save() creates a
    # directory at this path that holds the part file.
    ppath = 'sales_dates//' + jour + '.csv'
    df_day = sales.filter(sales.month_day == jour)

    # repartition(1) -> a single part file per day; "overwrite" makes the
    # cell safe to re-run.
    (
        df_day.repartition(1).write
        .mode("overwrite")
        .format("csv")
        .option("header", "true")
        .save(ppath)
    )

In [40]:
# Schema of the per-day CSV files read by the streaming source: the raw
# sales columns of schema1 followed by the two derived grouping keys.
# Built from schema1.fields so the two schemas cannot drift apart.
schema2 = StructType(
    schema1.fields
    + [
        StructField('month_day', StringType(), True),
        StructField('month', StringType(), True),
    ]
)

In [41]:
# File-based streaming source over the per-day CSV outputs, read with the
# explicit schema2 (streaming file sources need the schema up front).
sales_streaming = (
    spark.readStream
    .format('csv')
    .schema(schema2)
    .options(recursiveFileLookup="true", header="true")
    .csv("sales_dates/*.csv")
)

In [42]:
# Sanity check: report whether sales_streaming is a streaming DataFrame.
print("spark is streaming " , sales_streaming.isStreaming)

spark is streaming  True


In [43]:
# Running count of rows per Branch, restricted to the city "Mandalay".
nb_magasins_branche_Mandalay = (
    sales_streaming
    .where(col("City") == "Mandalay")
    .groupBy("Branch")
    .count()
)

In [44]:
# Start the streaming query: the full aggregated table is re-printed to the
# console sink on every trigger ("complete" output mode).
query_magasin_branche_Mandalay = (
    nb_magasins_branche_Mandalay.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)


In [13]:
# Heading printed before the per-branch count query.
# NOTE(review): the aggregation in the following cell counts rows per Branch;
# confirm that this is what "nombre de magasins" is meant to report.
print("Le nombre de magasins par branche : ")

Le nombre de magasins par branche : 


In [14]:
# Running count of rows per Branch over the whole stream.
nb_magasins_branche = (
    sales_streaming
    .groupBy("Branch")
    .count()
)

# Print the complete aggregated table to the console on every trigger.
query_magasin_branche = (
    nb_magasins_branche.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)


In [15]:
# Heading printed before the customer-type (member / non-member) share query.
print("Pourcentage des clients membres / non membres : ")

Pourcentage des clients membres / non membres : 


In [28]:
# Share of sales per customer type, expressed as a true percentage (0-100).
# The denominator is the row count of the batch `sales` frame (the data the
# streamed files were exported from) rather than a hard-coded 1000, so the
# cell stays correct if the input file changes size.
nb_ventes_total = sales.count()
membre_pourcentage = (
    sales_streaming
    .groupBy("Customer_type")
    .count()
    .withColumn("pourcentage", col("count") * 100 / nb_ventes_total)
)

# Print the complete aggregated table to the console on every trigger.
query_membership = (
    membre_pourcentage.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)


In [26]:
# Heading printed before the gender share query.
print("Pourcentage des clients hommes / femmes : ")

Pourcentage des clients hommes / femmes : 


In [30]:
# Share of sales per gender, expressed as a true percentage (0-100).
# The denominator is the row count of the batch `sales` frame (the data the
# streamed files were exported from) rather than a hard-coded 1000.
nb_ventes_total = sales.count()
gender_pourcentage = (
    sales_streaming
    .groupBy("Gender")
    .count()
    .withColumn("pourcentage", col("count") * 100 / nb_ventes_total)
)

# Keep this query under its own handle: assigning it to `query_membership`
# would overwrite the handle of the customer-type query and make that query
# impossible to stop or inspect by name.
query_gender = (
    gender_pourcentage.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

In [32]:
# Heading printed before the average pre-tax / tax / total price query.
print("Prix moyen HT/des taxes/TTC : ")

Prix moyen HT/des taxes/TTC : 


In [35]:
# Global (ungrouped) averages over the stream, each rounded to 2 decimals:
# pre-tax amount (Quantity * Unit_price), tax, and total.
prix_moyen = sales_streaming.selectExpr(*[
    "round(avg(Quantity * Unit_price), 2) AS HT_average",
    "round(avg(Tax), 2) AS Tax_average",
    "round(avg(Total),2) AS TTC_average",
])

# Print the single-row result to the console on every trigger.
query_prix = (
    prix_moyen.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

In [None]:
# Heading for the daily gross-income query. The aggregation that follows is a
# sum per month_day, so the label says "Somme" (sum) rather than "moyenne".
print("Somme gross_income par jour :")

In [55]:
# Running total of gross_income per month_day; the result column is named
# "sum(gross_income)".
income_date = (
    sales_streaming
    .groupBy("month_day")
    .sum("gross_income")
)

# Print the complete aggregated table to the console on every trigger.
query_income = (
    income_date.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

In [None]:
# Heading printed before the average-rating-per-month query.
print("Moyenne rating par mois : ")

In [52]:
# Running average of Rating per month key.
rating_mois = (
    sales_streaming
    .groupBy("month")
    .avg("Rating")
)

# Print the complete aggregated table to the console on every trigger.
query_rating = (
    rating_mois.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

In [None]:
# Heading for the ranking query. The ranked frame (income_date) is grouped by
# month_day, i.e. one row per day, so the label says "jours" (days).
print("Top 10 des jours les plus rentables : ")

In [57]:
# Rank the per-day gross-income totals from most to least profitable.
# Sorting a streaming aggregate is only allowed in "complete" output mode.
top_10 = income_date.sort(col("sum(gross_income)").desc())

# The console sink prints 20 rows by default; numRows=10 restricts the
# display to the ten best rows so the output is an actual top 10.
query_top_10 = (
    top_10.writeStream
    .outputMode("complete")
    .format("console")
    .option("numRows", 10)
    .start()
)

In [None]:
# Heading printed before the per-payment-method totals query.
print("Répartition des achats par moyen de paiement")

In [59]:
# Running sum of Total per payment method.
moyens_paiments = (
    sales_streaming
    .groupBy("Payment")
    .sum("Total")
)

# Print the complete aggregated table to the console on every trigger.
query_moyens_paiments = (
    moyens_paiments.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)