In [7]:
import findspark
findspark.init("D:/spark")
import pyspark 
findspark.find()

'D:/spark'

## Importation 

In [8]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Question1 : Start a simple Spark Session


In [9]:
spark = SparkSession.builder.master("local").appName("FolksDF").getOrCreate()

## Question2 : Load the Walmart Stock CSV File


In [11]:
Walmart_stock =spark.read.option("header",True).csv('Data/walmart_stock.csv')
Walmart_stock.show(3)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



## Question3 : What are the column names?


In [16]:
# DSL
Walmart_stock.columns
# SQL
Walmart_stock.createOrReplaceTempView("Walmart_stockSQL") 
spark.sql("""SHOW COLUMNS 
    from Walmart_stockSQL
""").show() 

+---------+
| col_name|
+---------+
|     Date|
|     Open|
|     High|
|      Low|
|    Close|
|   Volume|
|Adj Close|
+---------+



## Question4 : What does the Schema look like?


In [17]:
Walmart_stock.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



## Question5 : Create a new dataframe with a column called HV_Ratio that is the ratio of the High Price versus volume of stock traded for a day


In [18]:
Walmart_stock_=Walmart_stock.withColumn("HV_Ratio",F.col("High")/F.col("Volume"))
Walmart_stock.show(2)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 2 rows



In [19]:
Walmart_stock.createOrReplaceTempView("DFSQL")
spark.sql(""" 
select *, High/Volume as HV_Ratio 
from DFSQL""").show(2)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 2 rows



## Question6 : What day had the Peak High in Price? 


In [20]:
Walmart_stock_.orderBy(F.col("High").desc()).select(F.col("Date")).head(1)

[Row(Date='2015-01-13')]

In [21]:
spark.sql("""
select Date 
from DFSQL
order by High desc""").show(1)

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+
only showing top 1 row



## Question7 : What is the mean of the Close column?


In [45]:
Walmart_stock_.select(F.mean('Close').alias('moyenne')).show()

+-----------------+
|          moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



In [53]:
DF2.agg(F.mean(F.col('Close'))).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [38]:
spark.sql("""
select avg(Close) as moyenne
from DFSQL""").show()

+-----------------+
|          moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



## Question8 : What is the max and min of the Volume column?


In [54]:
Walmart_stock_.agg(F.min(F.col('Volume')).alias('minimum'),F.max(F.col('Volume')).alias('maximum')).show()

+--------+-------+
| minimum|maximum|
+--------+-------+
|10010500|9994400|
+--------+-------+



In [56]:
spark.sql("""
select max(Volume) as maximum, min(Volume) as minimum
from DFSQL
""").show()

+-------+--------+
|maximum| minimum|
+-------+--------+
|9994400|10010500|
+-------+--------+



## Question9 : How many days was the Close lower than 60 dollars?


In [69]:
spark.sql("""
select count(Date) as N_Days
from DFSQL
where Close < 60
""").show()

+------+
|N_Days|
+------+
|    81|
+------+



In [81]:
Walmart_stock_.filter(F.col('Close')<60).select(F.count(F.col('Date')).alias('N_Days')).show()

+------+
|N_Days|
+------+
|    81|
+------+



## Question10 : What percentage of the time was the High greater than 80 dollars ?


In [21]:
spark.sql("""
SELECT ROUND((SELECT COUNT(High) FROM DFSQL WHERE High > 80)*100/COUNT(High), 2) as freq FROM DFSQL
""").show()

+----+
|freq|
+----+
|8.43|
+----+



In [109]:
Walmart_stock.filter(F.col('High')>80).select(F.round((F.count(F.col('Date'))*100/DF.count()),2).alias('freq')).show()

+----+
|freq|
+----+
|8.43|
+----+



## Question11 : What is the max High per year?


In [112]:
Walmart_stock = Walmart_stock.withColumn('years',F.year(F.to_timestamp('Date', 'yyyy-MM-dd')))
Walmart_stock.show(2)

+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|years|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996| 2012|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475| 2012|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
only showing top 2 rows



In [116]:
Walmart_stock.groupBy(F.col('years')).agg(F.max(F.col('High')).alias('maximum par an')).show()

+-----+--------------+
|years|maximum par an|
+-----+--------------+
| 2015|     90.970001|
| 2013|     81.370003|
| 2014|     88.089996|
| 2012|     77.599998|
| 2016|     75.190002|
+-----+--------------+



In [124]:
Walmart_stock.createOrReplaceTempView("DFSQL")
spark.sql("""
select years, max(High) as maximum
from DFSQL
group by years""").show()

+-----+---------+
|years|  maximum|
+-----+---------+
| 2015|90.970001|
| 2013|81.370003|
| 2014|88.089996|
| 2012|77.599998|
| 2016|75.190002|
+-----+---------+



## Question12 : What is the average Close for each Calendar Month? 

In [22]:
Walmart_stock = Walmart_stock.withColumn('month',F.month(F.to_timestamp('Date', 'yyyy-MM-dd')))
Walmart_stock.show(2)

+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|month|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|    1|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|    1|
+----------+------------------+---------+---------+------------------+--------+------------------+-----+
only showing top 2 rows



In [125]:
Walmart_stock.groupBy(F.col('month')).agg(F.mean(F.col('Close')).alias('moyenne par an')).show()

+-----+-----------------+
|month|   moyenne par an|
+-----+-----------------+
|   12|72.84792478301885|
|    1|71.44801958415842|
|    6| 72.4953774245283|
|    3|71.77794377570092|
|    5|72.30971688679247|
|    9|72.18411785294116|
|    4|72.97361900952382|
|    8|73.02981855454546|
|    7|74.43971943925233|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|    2|  71.306804443299|
+-----+-----------------+



In [126]:
Walmart_stock.createOrReplaceTempView("DFSQL")
spark.sql("""
select month, mean(Close) as moyenne
from DFSQL
group by month""").show()

+-----+-----------------+
|month|          moyenne|
+-----+-----------------+
|   12|72.84792478301885|
|    1|71.44801958415842|
|    6| 72.4953774245283|
|    3|71.77794377570092|
|    5|72.30971688679247|
|    9|72.18411785294116|
|    4|72.97361900952382|
|    8|73.02981855454546|
|    7|74.43971943925233|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|    2|  71.306804443299|
+-----+-----------------+



In [23]:
# Close Spark Session
spark.stop()