# Spark Walmart Data Analysis Project Exercise

Author: **Redouane ESSAMMAA**

Let's get some quick practice with your new Spark DataFrame skills. You will be asked some basic questions about stock market data, in this case Walmart stock from 2012 to 2016. Unlike the machine learning exercises, which will be a little looser and take the form of "Consulting Projects", this exercise simply asks a series of questions.

In [1]:
# Import libraries
import findspark
findspark.init()  # locate the local Spark installation so that pyspark can be imported

from pyspark.sql import SparkSession

### 1- Start a simple Spark Session

In [2]:
# Create a Spark session (getOrCreate reuses an existing one if present)
spark = SparkSession.builder.appName('walmart').getOrCreate()

### 2- Load the Walmart Stock CSV File

In [20]:
df = spark.read.csv('walmart_stock.csv', inferSchema=True, header=True)
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [21]:
# SQL method: register the DataFrame as a temporary view named 'Table'

df.createOrReplaceTempView('Table')

spark.sql("""SELECT * FROM Table""").show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### 3- What are the column names?

In [24]:
print("Column names:", df.columns)

Column names: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


### 4- What does the schema look like?

In [5]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### 5- Create a new DataFrame with a column called HV_Ratio that is the ratio of the High price to the volume of stock traded for a day

In [6]:
df_hv = df.withColumn('HV_Ratio', df['High'] / df['Volume']).select('HV_Ratio')
df_hv.show()

+--------------------+
|            HV_Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [25]:
# SQL method
spark.sql("""SELECT High/Volume as HV_Ratio FROM Table""").show()

+--------------------+
|            HV_Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows
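As a quick sanity check on the formula, this plain-Python sketch (no Spark required) divides High by Volume for the five rows printed by `df.show(5)`. The input values are copied from that output, and the results agree with the first five ratio values shown above.

```python
# First five rows of walmart_stock.csv, as printed by df.show(5)
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

# HV_Ratio = High / Volume for each trading day
hv_ratios = [high / volume for high, volume in zip(highs, volumes)]

for ratio in hv_ratios:
    print(f"{ratio:.6e}")
```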



### 6- What day had the Peak High in Price?

In [8]:
print("Day with the highest High price:", df.orderBy(df['High'].desc()).select('Date').head(1)[0]['Date'])

Day with the highest High price: 2015-01-13


In [31]:
# SQL method
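# Register the DataFrame as a temporary SQL view named "Walmart"
df.createOrReplaceTempView("Walmart")
# ORDER BY sorts ascending by default; DESC puts the peak High first
spark.sql("""SELECT Date FROM Walmart ORDER BY High DESC""").show(1)

+----------+
|      Date|
+----------+
|2015-01-13|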
+----------+
only showing top 1 row
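`ORDER BY` sorts in ascending order unless `DESC` is given, so taking the first row of an ascending sort returns the day with the lowest High, not the peak. This plain-Python sketch on the five sample rows from `df.show(5)` contrasts the two orderings; `max()` with a key is shown as an equivalent argmax that avoids a full sort. The helper `high` is a hypothetical name for illustration.

```python
# (Date, High) pairs from the five rows printed by df.show(5)
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

def high(row):
    """Hypothetical sort key: the High price of a (Date, High) pair."""
    return row[1]

# Descending sort, first row: the peak High (like ORDER BY High DESC)
peak_date, peak_high = sorted(rows, key=high, reverse=True)[0]

# Ascending sort, first row: the lowest High (like a bare ORDER BY High)
lowest_date, lowest_high = sorted(rows, key=high)[0]

print(peak_date, peak_high)      # 2012-01-03 61.060001
print(lowest_date, lowest_high)  # 2012-01-06 59.450001

# max() with a key finds the same peak row without sorting the whole list
print(max(rows, key=high))       # ('2012-01-03', 61.060001)
```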



### 7- What is the mean of the Close column?

In [30]:
from pyspark.sql.functions import mean

df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [29]:
# SQL method
spark.sql("""SELECT ROUND(MEAN(Close), 2) as Moyenne FROM Table""").show()

+-------+
|Moyenne|
+-------+
|  72.39|
+-------+
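Both `mean()` and `AVG()` compute the arithmetic mean: the sum of the values divided by their count. A minimal plain-Python illustration on the five `Close` values shown by `df.show(5)`, assuming no missing values (Spark's aggregate skips nulls, while this sketch does not handle them):

```python
# Close values from the five rows printed by df.show(5)
closes = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

# Arithmetic mean: sum of the values divided by how many there are
mean_close = sum(closes) / len(closes)

# Counterpart of ROUND(MEAN(Close), 2) in the SQL cell
print(round(mean_close, 2))  # 59.53
```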



### 8- What is the max and min of the Volume column?

In [35]:
from pyspark.sql import functions as F

# F.max / F.min avoid shadowing Python's built-in max() and min()
df.select(F.max('Volume'), F.min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [32]:
# SQL method
spark.sql("""SELECT MAX(Volume) as MAX, MIN(Volume) as MIN FROM Table""").show()

+--------+-------+
|     MAX|    MIN|
+--------+-------+
|80898100|2094900|
+--------+-------+
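An import such as `from pyspark.sql.functions import min, max` rebinds the names `min` and `max` to Spark column functions for the rest of the notebook. Python's own versions stay reachable through the `builtins` module, as this plain-Python sketch on the five `Volume` values from `df.show(5)` shows:

```python
import builtins

# Volume values from the five rows printed by df.show(5)
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]

# After `from pyspark.sql.functions import min, max`, the bare names refer to
# Spark column functions; the builtins module still exposes Python's versions
max_volume = builtins.max(volumes)
min_volume = builtins.min(volumes)

print(max_volume, min_volume)  # 12768200 6679300
```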



### 9- How many days was the Close lower than 60 dollars?

In [13]:
print("Number of days with Close below $60:", df.filter(df['Close'] < 60).count())

Number of days with Close below $60: 81


In [36]:
# SQL method
spark.sql("""SELECT COUNT(Date) as Jours FROM Table WHERE Close < 60""").show()

+-----+
|Jours|
+-----+
|   81|
+-----+
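Both methods count the rows that satisfy a predicate. Note that `COUNT(Date)` counts non-null `Date` values, whereas `COUNT(*)` counts every row. The same predicate applied in plain Python to the five `Close` values from `df.show(5)`:

```python
# Close values from the five rows printed by df.show(5)
closes = [60.330002, 59.709998999999996, 59.419998, 59.0, 59.18]

# Keep the values that satisfy the predicate, then count them
days_below_60 = sum(1 for close in closes if close < 60)

print(days_below_60)  # 4
```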



### 10- What percentage of the time was the High greater than 80 dollars? In other words, (number of days with High > 80) / (total number of days in the dataset), expressed as a percentage

In [14]:
df.filter('High > 80').count() * 100/df.count()

9.141494435612083

In [37]:
# SQL method
spark.sql("""SELECT ROUND((SELECT COUNT(High) FROM Table WHERE High > 80) * 100 / COUNT(High), 2) AS Pct_High_Over_80 FROM Table""").show()

+----------------+
|Pct_High_Over_80|
+----------------+
|            9.14|
+----------------+
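The percentage is the number of matching days times 100, divided by the total number of days. Spark SQL's `/` operator returns a fractional result even for integer operands, so the integer counts are not truncated. None of the five rows shown by `df.show(5)` has a High above 80, so this plain-Python sketch (with a hypothetical helper `pct_above`) also uses a threshold of 60 for illustration:

```python
# High values from the five rows printed by df.show(5)
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]

def pct_above(values, threshold):
    """Hypothetical helper: percentage of values strictly greater than threshold."""
    matches = sum(1 for value in values if value > threshold)
    return matches * 100 / len(values)

print(pct_above(highs, 80))  # 0.0
print(pct_above(highs, 60))  # 40.0
```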



### 11- What is the max High per year?

In [16]:
from pyspark.sql.functions import year

year_df = df.withColumn('Year', year(df['Date']))

year_df.groupBy('Year').max('High').orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [38]:
# SQL method
spark.sql("""SELECT YEAR(Date) as Year, ROUND(MAX(High), 2) as Max_High FROM Table GROUP BY Year ORDER BY Year""").show()

+----+--------+
|Year|Max_High|
+----+--------+
|2012|    77.6|
|2013|   81.37|
|2014|   88.09|
|2015|   90.97|
|2016|   75.19|
+----+--------+
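Grouping by year amounts to keeping a running maximum per key. Because `inferSchema` read `Date` as a string, the year has to be parsed out first (Spark's `year()` performs that cast implicitly). A plain-Python sketch on the five sample rows from `df.show(5)`, which all fall in 2012:

```python
from datetime import date

# (Date, High) pairs from the five rows printed by df.show(5)
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

max_high_by_year = {}
for date_str, high in rows:
    # Date was inferred as a string, so parse it before taking the year
    year = date.fromisoformat(date_str).year
    # Keep the running maximum High for this year
    max_high_by_year[year] = max(max_high_by_year.get(year, high), high)

print(max_high_by_year)  # {2012: 61.060001}
```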



### 12- What is the average Close for each calendar month? In other words, across all the years, what is the average Close price for Jan, Feb, Mar, etc.?

In [18]:
from pyspark.sql.functions import month

# Create a new Month column from the existing Date column
month_df = df.withColumn('Month', month(df['Date']))

# Group by month and average the numeric columns
month_df = month_df.groupBy('Month').mean()

# Sort by month
month_df = month_df.orderBy('Month')

# Display only the Month and avg(Close) columns
month_df['Month', 'avg(Close)'].show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



In [39]:
# SQL method
spark.sql("""SELECT MONTH(Date) as Month, ROUND(AVG(Close), 2) as AVG_Close FROM Table GROUP BY MONTH(Date) ORDER BY MONTH(Date)""").show()

+-----+---------+
|Month|AVG_Close|
+-----+---------+
|    1|    71.45|
|    2|    71.31|
|    3|    71.78|
|    4|    72.97|
|    5|    72.31|
|    6|     72.5|
|    7|    74.44|
|    8|    73.03|
|    9|    72.18|
|   10|    71.58|
|   11|    72.11|
|   12|    72.85|
+-----+---------+
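Grouping by `MONTH(Date)` pools the same calendar month across all years, so January 2012 and January 2016 land in the same group. This plain-Python sketch accumulates a sum and a count per month and then divides, using the five sample rows from `df.show(5)` (all from January 2012):

```python
from collections import defaultdict
from datetime import date

# (Date, Close) pairs from the five rows printed by df.show(5)
rows = [
    ("2012-01-03", 60.330002),
    ("2012-01-04", 59.709998999999996),
    ("2012-01-05", 59.419998),
    ("2012-01-06", 59.0),
    ("2012-01-09", 59.18),
]

# Running sum and count per calendar month; the year is ignored on purpose
totals = defaultdict(float)
counts = defaultdict(int)
for date_str, close in rows:
    month = date.fromisoformat(date_str).month
    totals[month] += close
    counts[month] += 1

# Average per month, rounded like ROUND(AVG(Close), 2) and ordered by month
avg_close_by_month = {m: round(totals[m] / counts[m], 2) for m in sorted(totals)}

print(avg_close_by_month)  # {1: 59.53}
```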

