Sommaire :
 - <a href="#1">Importation des packages du sparkSession et de la base </a>
 - <a href="#2">Questions </a>

<a name="1">Importation des packages du sparkSession et de la base </a>

In [67]:
# Importation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Instantiation
spark = SparkSession.builder \
                    .master("local") \
                    .appName("FolksDF") \
                    .getOrCreate()

<a name="2">Questions </a>

**Questions 1&2 : Start a Spark Session, load the Walmart StockFile**


In [68]:
# Importation du fichier
Walmart = spark.read \
                .option("header",'true') \
                .option("delimiter",",") \
                .csv("walmart_stock.csv")
Walmart.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

**Question 3 : Column names**

In [5]:
Walmart.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

<font color="blue"> Donc les noms de colonnes sont: Date, Open, Hight, Low, .. 

**Question 4 : Schema** 

In [6]:
Walmart.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



<font color="blue"> Cette commande affiche chaque nom de variable avec son type ansi que 'nullable = true' : autorise les valeurs nulles 

**Question 5 : Create a new dataframe WalmartNew with a column called HV_Ratio that is the ratio of the High Price versus volume of stock traded for a day**

In [7]:
WalmartNew = Walmart.withColumn("HV_Ratio", F.col("High")/F.col("Volume"))
WalmartNew.head() # juste la premiere ligne

Row(Date='2012-01-03', Open='59.970001', High='61.060001', Low='59.869999', Close='60.330002', Volume='12668800', Adj Close='52.619234999999996', HV_Ratio=4.819714653321546e-06)

<font color="green"> **Remarque**: WalmartNew.show(4) affiche les 4 premieres lignes </font>

**Question 6: What day had the Peak High in Price?**

- En SQL :


In [8]:
WalmartNew.createOrReplaceTempView("WalmartSQL") # transformation du data frame en table

In [9]:
#spark.sql("""select Date from WalmartSQL order by High desc""").first() # solution 1

spark.sql("""select Date from WalmartSQL order by High desc limit 1""").show() # solution 2

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



- En DSL : (donc en spark)

In [10]:
#WalmartNew.orderBy(F.col("High").desc()) \
#          .select(F.col("Date")) \
#          .head()                                    # solution 1

WalmartNew.select(F.col("Date")) \
          .orderBy(F.col("High").desc()) \
          .head()                                    # solution 1 bis ( ressemble au SQL)

Row(Date='2015-01-13')

**Question 6 : What is the mean of the Close column?**

- En SQL :

In [11]:
spark.sql("""select mean(Close) as Moyenne from WalmartSQL""").show()

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



- En DSL

In [12]:
#WalmartNew.select("Close") \
#          .summary("mean") \
#          .show()                                # solution 1

WalmartNew.agg(F.mean("Close") \
          .alias("Moyenne")) \
          .show()                                # solution 2

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



<font color="green"> **Remarque** : on peut changer mean par avg

**Question 7 : What is the max and min of the Volume column?**

- En SQL :

In [14]:
spark.sql("""select max(Volume) as Max, min(Volume) as Min from WalmartSQL""").show()

+-------+--------+
|    Max|     Min|
+-------+--------+
|9994400|10010500|
+-------+--------+



- En DSL :

In [16]:
WalmartNew.agg(F.max("Volume").alias("Max"),F.min("Volume").alias("Min")) \
          .show()

+-------+--------+
|    Max|     Min|
+-------+--------+
|9994400|10010500|
+-------+--------+



**Question 8 : How many days was the Close lower than 60 dollars?**

- En SQL :

In [17]:
spark.sql("""select count(Date) as Nb from WalmartSQL where Close < '60' """).show()

+---+
| Nb|
+---+
| 81|
+---+



- En DSL :

In [18]:
#WalmartNew.filter(F.col("Close") < '60') \
#          .agg(F.count("Date")) \ # faire l'aggregation apres le filter
#          .show()                                             # solution 1

WalmartNew.filter(F.col("Close") < '60') \
          .count()                                             # solution 2

81

<font color="green"> **Remarque:** Faire le filter avant agg 

**Question 10 : What percentage of the time was the High greater than 80 dollars ?**

- En SQL :

In [44]:
spark.sql(""" select round(
                            (select count(*) from WalmartSQL
                                where High>='80') / (count(*))* 100
                        , 2)  as Pourcentage from WalmartSQL""").show()


+-----------+
|Pourcentage|
+-----------+
|       9.14|
+-----------+



<font color="green">  **Remarque:** En SQL, il faut mettre >= et non > </font> 


- En DSL

In [45]:
Temporaire = WalmartNew.filter(F.col("High")>'80') \
                        .agg(F.count("*").alias ("Comptage")) \
                        .collect()[0][0] 

WalmartNew.agg(F.round((Temporaire/F.count("*")*100),2).alias("Pourcentage")) \
          .show()

+-----------+
|Pourcentage|
+-----------+
|       9.14|
+-----------+



**Question 11 : What is the max High per year?**

<font color="blue"> **Astuce:  per -> group by**

- En SQL :

In [73]:
spark.sql(""" select  substr(Date,1,4) as Annee, max(High) as Max_High from WalmartSQL group by Annee order by Annee asc """).show() 

+-----+---------+
|Annee| Max_High|
+-----+---------+
| 2012|77.599998|
| 2013|81.370003|
| 2014|88.089996|
| 2015|90.970001|
| 2016|75.190002|
+-----+---------+



- En DSL :

<font color="blue"> Partie 1 : Crée une nouelle colonne Annee qui extrait l'annee depuis la variable Date

In [51]:
WalmartNew = WalmartNew.withColumn("Annee",F.substring("Date",1,4))
WalmartNew.select("Date", "Annee").show(4) 


+----------+-----+
|      Date|Annee|
+----------+-----+
|2012-01-03| 2012|
|2012-01-04| 2012|
|2012-01-05| 2012|
|2012-01-06| 2012|
+----------+-----+
only showing top 4 rows



<font color="blue"> Partie 2 : Pour chaque annee, trouver le max du prix correspondant

In [74]:
WalmartNew.groupBy(F.col("Annee")) \
          .agg(F.max("High")) \
          .orderBy(F.col("Annee").asc()) \
          .show()

+-----+---------+
|Annee|max(High)|
+-----+---------+
| 2012|77.599998|
| 2013|81.370003|
| 2014|88.089996|
| 2015|90.970001|
| 2016|75.190002|
+-----+---------+



<font color="green"> **Remarque** : Encore une fois, le groupBy avant le agg

**Question 12 : What is the average Close for each Calendar Month?**

- En SQL :

In [70]:
spark.sql(""" select  substr(Date,6,2) as Mois, mean(Close) as Moyenne from WalmartSQL group by Mois order by Mois asc """).show()  

+----+-----------------+
|Mois|          Moyenne|
+----+-----------------+
|  01|71.44801958415842|
|  02|  71.306804443299|
|  03|71.77794377570092|
|  04|72.97361900952382|
|  05|72.30971688679247|
|  06| 72.4953774245283|
|  07|74.43971943925233|
|  08|73.02981855454546|
|  09|72.18411785294116|
|  10|71.57854545454543|
|  11| 72.1110893069307|
|  12|72.84792478301885|
+----+-----------------+



- En DSL :

In [72]:
WalmartNew = WalmartNew.withColumn("Mois",F.substring("Date",6,2))

WalmartNew.groupBy(F.col("Mois")) \
          .agg(F.mean("Close").alias("Moyenne")) \
          .orderBy(F.col("Mois").asc()) \
          .show()

+----+-----------------+
|Mois|          Moyenne|
+----+-----------------+
|  01|71.44801958415842|
|  02|  71.306804443299|
|  03|71.77794377570092|
|  04|72.97361900952382|
|  05|72.30971688679247|
|  06| 72.4953774245283|
|  07|74.43971943925233|
|  08|73.02981855454546|
|  09|72.18411785294116|
|  10|71.57854545454543|
|  11| 72.1110893069307|
|  12|72.84792478301885|
+----+-----------------+

