# <font color='blue'>**BIG** **DATA** </font>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
#1)
# instanciation

Spark = SparkSession.builder.master("local").appName("FolksDF").getOrCreate()

In [3]:
#2)
#Importation 
Walmart = Spark.read.option("header",'true').option("delimiter",",").csv("walmart_stock.csv")
Walmart.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [4]:
#3)
Walmart.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [5]:
#4) le schema 

Walmart.printSchema()
#nullable =true : autorise les valeurs nulles

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [6]:
#5)
Walmart2 = Walmart.withColumn('HV_Ratio',F.col('High')/F.col('Volume')) #Preciser que ce sont des colonnes sinon spark comprend pas
Walmart2.head()#La premiere ligne
Walmart2.show(4)#les 4 premieres lignes

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 4 rows



In [7]:
#6)
Walmart2.createOrReplaceTempView("WalmartSQL") #Transformation du data frame en table !!!!

In [8]:
#SOLUTION 1
#SQL
Spark.sql("SELECT Date FROM WalmartSQL ORDER BY High desc").first()#Montre la premiere data apres avoir trié 


Row(Date='2015-01-13')

In [9]:
#Solution 2
Spark.sql("SELECT Date from WalmartSQL ORDER BY High Desc limit 1").show()

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



In [10]:
#DSL
Walmart2.orderBy(F.col("High").desc()).select(F.col("Date")).head()

Row(Date='2015-01-13')

In [11]:
Walmart2.select(F.col("Date")).orderBy(F.col("High").desc()).head()#Il n'y a pas d'ordre en fait

Row(Date='2015-01-13')

In [12]:
#On prefere ecrire comme ceci
Walmart2.select(F.col("Date")) \
        .orderBy(F.col("High") \
        .desc()) \
        .head()

Row(Date='2015-01-13')

In [13]:
#7 la moyenne de close en sql et dsl
#SQL
Spark.sql("Select mean(Close) as Moyenne_Close from WalmartSQL").show()

+-----------------+
|    Moyenne_Close|
+-----------------+
|72.38844998012726|
+-----------------+



In [14]:
#DSL
#Solution 1
Walmart2.select(F.col("Close"))\
        .summary("mean") \
        .show()

+-------+-----------------+
|summary|            Close|
+-------+-----------------+
|   mean|72.38844998012726|
+-------+-----------------+



In [15]:
#Solution 2
Walmart2.agg(F.mean("Close")\
        .alias("Moyenne_Close"))\
        .show()

+-----------------+
|    Moyenne_Close|
+-----------------+
|72.38844998012726|
+-----------------+



In [16]:
#7 
#SQL
Spark.sql("Select min(Volume),max(Volume) from WalmartSQL").show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|   10010500|    9994400|
+-----------+-----------+



In [17]:
#DSL
#Solution 1
Walmart2.select(F.col("Volume"))\
        .summary("min","max") \
        .show()

+-------+--------+
|summary|  Volume|
+-------+--------+
|    min|10010500|
|    max| 9994400|
+-------+--------+



In [18]:
#Solution 1
Walmart2.select(F.max("Volume"),F.min("Volume"))\
        .show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|    9994400|   10010500|
+-----------+-----------+



In [19]:
#8 
#SQL
Spark.sql("Select count(Date) from WalmartSQL  where Close < '60' ").show() #'60' pas obligé sur cette version mias preferable

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



In [20]:
#DSL
#Solution 1
Walmart2.filter(F.col("Close") < '60') \
        .agg(F.count("Date"))\
        .show()

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



In [21]:
#Solution 2
Walmart2.filter(F.col("Close") < '60') \
        .count()

81

In [22]:
#9 What percentage of the time was the High greater than 80 dollars ?(In other words, (Number of Days High>80)/(Total Days in the dataset))
#SQL

Spark.sql("select round((select count(*) from WalmartSQL where High > '80')/(count(*))* 100, 2) as Percentage from WalmartSQL").show()

+----------+
|Percentage|
+----------+
|      9.14|
+----------+



In [23]:
#DSL 
Temp = Walmart2.filter(F.col("High")> '80')\
        .agg(F.count("*").alias("Comptage")) \
        .collect()[0][0]

Walmart2.agg(F.round((Temp/F.count("*")*100),2).alias("Comptage"))\
        .show()

+--------+
|Comptage|
+--------+
|    9.14|
+--------+



In [24]:
#11-What is the max High per year?
#SQL

Spark.sql("select max(High),SUBSTR(Date, 1, 4) as Annee from WalmartSQL group by Annee").show()


+---------+-----+
|max(High)|Annee|
+---------+-----+
|75.190002| 2016|
|77.599998| 2012|
|88.089996| 2014|
|81.370003| 2013|
|90.970001| 2015|
+---------+-----+



In [25]:
# DSL 

#Partie 1 : Crée une nouvelle colonne Annee qui extrait l'année depuis la date
Walmart3 = Walmart2.withColumn("Annee",F.substring("Date",1,4))
Walmart3.select("Date","Annee").show(5)#On fait pas le show direct lorsque qu'on crée un dataframe mais apres
#Partie 2 :  Retrouver le maximum du prix pour chaque année
Walmart3.groupBy(F.col("Annee")) \
        .agg(F.max("High")) \
        .show()

+----------+-----+
|      Date|Annee|
+----------+-----+
|2012-01-03| 2012|
|2012-01-04| 2012|
|2012-01-05| 2012|
|2012-01-06| 2012|
|2012-01-09| 2012|
+----------+-----+
only showing top 5 rows

+-----+---------+
|Annee|max(High)|
+-----+---------+
| 2016|75.190002|
| 2012|77.599998|
| 2014|88.089996|
| 2013|81.370003|
| 2015|90.970001|
+-----+---------+



In [42]:
#What is the average Close for each Calendar Month? In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.
Spark.sql("select SUBSTR(Date, 1, 4) as Annee, SUBSTR(Date, 6, 2) as Mois, mean(Close) as Moy_Close from WalmartSQL group by Annee , Mois order by Annee, Mois").show()

+-----+----+------------------+
|Annee|Mois|         Moy_Close|
+-----+----+------------------+
| 2012|  01|        60.2354999|
| 2012|  02|            60.898|
| 2012|  03|60.433636818181796|
| 2012|  04|60.149000150000006|
| 2012|  05|61.456363409090905|
| 2012|  06| 67.50380961904762|
| 2012|  07| 72.40666661904763|
| 2012|  08| 73.04478265217392|
| 2012|  09| 74.18157921052631|
| 2012|  10| 75.30619061904761|
| 2012|  11| 71.10952333333333|
| 2012|  12| 69.71100009999999|
| 2013|  01| 69.09476142857143|
| 2013|  02| 70.62315857894738|
| 2013|  03| 73.43649940000002|
| 2013|  04| 77.68954572727273|
| 2013|  05| 77.81636368181817|
| 2013|  06| 74.97800020000001|
| 2013|  07| 77.11545418181818|
| 2013|  08| 75.22409204545455|
+-----+----+------------------+
only showing top 20 rows

