In [1]:

import findspark
findspark.init("C:\Spark")
import os
import sys
import findspark
findspark.init()
import pyspark 
from pyspark import SparkContext
from pyspark import SparkConf

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import format_number
from pyspark.sql.functions import max, min
from pyspark.sql.functions import mean
from pyspark.sql.functions import corr
from pyspark.sql.functions import year
from pyspark.sql.functions import month

#### 1) Start a simple Spark Session

In [2]:
spark = SparkSession.builder.appName('walmart_stock').getOrCreate()
sc = spark.sparkContext # Lancement de la session Spark

In [6]:
#### 2ème variante du lancement de la session Spark
spark=SparkSession.builder\
                .master("local[*]")\
                .appName("walmark_stock")\
                .getOrCreate()

#### 2) Load the Walmart Stock CSV File

In [7]:

df = spark.read.option("header", True).csv(r"C:\Users\rbamb\Downloads\Cours M2 SEP\CM Outils BIG DATA\Outils Big Data\walmart_stock.csv")
df.show(3) # On charge la base de données et on lance les trois premières lignes pour être sûre que tout fonctionne

df.createOrReplaceTempView('Table')

spark.sql("""SELECT * FROM Table""").show() 

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+-----------------

#### 3) What are the column names ?

In [8]:

print("Les noms des colonnes sont :", df.columns) # On va ressortir les noms de colonnes

Les noms des colonnes sont : ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


#### 4) What does the Schema look like ?

In [9]:

df.printSchema() 

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



#### 5) Create a new dataframe with a column called HV_Ratio that is the ratio of the High Priceversus volume of stock traded for a day

In [10]:

df_ratio = df.withColumn('HV_Ratio', df['High']/df['Volume']).select(['HV_Ratio'])
df_ratio.show()

#### SQL method
spark.sql("""SELECT High/Volume as HV_Ratio FROM Table""").show() 

+--------------------+
|            HV_Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows

+--------------------+
|            HV_Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.04144

#### 6) What day had the Peak High in Price?

In [11]:

print(u"Le jour avec le prix le plus élevé est le :", df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date'])

# SQL method
spark.sql("""SELECT ROUND(MAX(High), 2 ) as Prix FROM Table""").show()

Le jour avec le prix le plus élevé est le : 2015-01-13
+-----+
| Prix|
+-----+
|90.97|
+-----+



#### 7) What is the mean of the Close column?

In [19]:

df.select(mean('Close')).show()

# SQL method
spark.sql("""SELECT ROUND(MEAN(Close), 2) as Moyenne FROM Table""").show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+

+-------+
|Moyenne|
+-------+
|  72.39|
+-------+



#### 8) What is the max and min of the Volume column?

In [22]:

print("What is the max and min of the Volume column?")
df.select(max('Volume'),min('Volume')).show()

# SQL method
spark.sql("""SELECT MAX(Volume) as MAX, MIN(Volume) as MIN FROM Table""").show()

What is the max and min of the Volume column?
+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|    9994400|   10010500|
+-----------+-----------+

+-------+--------+
|    MAX|     MIN|
+-------+--------+
|9994400|10010500|
+-------+--------+



#### 9) How many days was the Close lower than 60 dollars?


In [12]:

days = df.filter(df['Close'] < 60).count()
print("Il y avait", days ,"jours durant lesquels la valeur Close était inférieur à 60.")

# SQL method
spark.sql("""SELECT COUNT(Date) as Jours FROM Table WHERE Close < 60""").show()

Il y avait 81 jours durant lesquels la valeur Close était inférieur à 60.
+-----+
|Jours|
+-----+
|   81|
+-----+



#### 10) What percentage of the time was the High greater than 80 dollars ?

In [13]:

percent = df.filter('High > 80').count() * 100/df.count()
print("Il y avait", round(percent, 2), "% de temps durant lequel la valeur High était supérieur à 80 dollars.")

# SQL method
spark.sql("""SELECT ROUND((SELECT COUNT(High) FROM Table WHERE High > 80)*100/COUNT(High), 2) as Max_High FROM Table""").show()

Il y avait 8.43 % de temps durant lequel la valeur High était supérieur à 80 dollars.
+--------+
|Max_High|
+--------+
|    8.43|
+--------+



#### 11) What is the max High per year?

In [14]:

# SQL method
spark.sql("""SELECT YEAR(Date) as Year, ROUND(MAX(High), 2) as Max_High FROM Table GROUP BY Year ORDER BY Year""").show()

+----+--------+
|Year|Max_High|
+----+--------+
|2012|    77.6|
|2013|   81.37|
|2014|   88.09|
|2015|   90.97|
|2016|   75.19|
+----+--------+



#### 12) What is the average Close for each Calendar Month?

In [61]:

# SQL method
spark.sql("""SELECT MONTH(Date) as Month, ROUND(AVG(Close), 2) as AVG_Close FROM Table GROUP BY MONTH(Date) ORDER BY MONTH(Date)""").show()

+-----+---------+
|Month|AVG_Close|
+-----+---------+
|    1|    71.45|
|    2|    71.31|
|    3|    71.78|
|    4|    72.97|
|    5|    72.31|
|    6|     72.5|
|    7|    74.44|
|    8|    73.03|
|    9|    72.18|
|   10|    71.58|
|   11|    72.11|
|   12|    72.85|
+-----+---------+

