In [1]:
#Start a simple Spark Session
import findspark
findspark.init("C:/spark")
import pyspark 
findspark.find()

'C:/spark'

In [2]:
#importer spark Session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
#1- Start a simple Spark Session
spark = SparkSession.builder \
                    .master("local") \
                    .appName("FolksDF") \
                    .getOrCreate()


In [4]:
#2 Load the Walmart Stock CSV File
Walmart = spark.read \
                .option("header",'true') \
                .option("delimiter",",") \
                .csv("walmart_stock.csv")
Walmart.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [5]:
#3-the column names
Walmart.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [6]:
#4-the Schema look like:
Walmart.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [7]:
#5-Create a new dataframe with a column called "HV_Ratio" that is the ratio of the High Price versus volume of stock traded for a day
WalmartNew = Walmart\
            .withColumn('HV_Ratio', col('High')/col('Volume'))


NameError: name 'col' is not defined

In [8]:
#5-Create a new dataframe with a column called "HV_Ratio" that is the ratio of the High Price versus volume of stock traded for a day
WalmartNew = Walmart.withColumn("HV_Ratio", F.col("High")/F.col("Volume"))
WalmartNew.head() 


Row(Date='2012-01-03', Open='59.970001', High='61.060001', Low='59.869999', Close='60.330002', Volume='12668800', Adj Close='52.619234999999996', HV_Ratio=4.819714653321546e-06)

In [9]:
#show the new dataframe
WalmartNew.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

In [10]:
#6-The day  that had the Peak High in Price
#en SQL 
# transformation du dataframe en table
WalmartNew.createOrReplaceTempView("WalmartSQL") 
spark.sql("""select Date from WalmartSQL order by High desc limit 1""").show()

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



In [11]:
#en DSL 
WalmartNew.select(F.col("Date")) \
          .orderBy(F.col("High").desc()) \
          .head()  

Row(Date='2015-01-13')

In [12]:
#7-The mean of the Close column
# en SQL
spark.sql("""select mean(Close) as Moyenne from WalmartSQL""").show()

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



In [13]:
# en DSL
WalmartNew.agg(F.mean("Close") \
          .alias("Moyenne")) \
          .show()    

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



In [14]:
#8-The max and min of the Volume column
#en SQL
spark.sql("""select max(Volume) as Max, min(Volume) sas Min from WalmartSQL""").show()

ParseException: 
mismatched input 'Min' expecting {<EOF>, ';'}(line 1, pos 43)

== SQL ==
select max(Volume) as Max, min(Volume) sas Min from WalmartSQL
-------------------------------------------^^^


In [15]:
#8-The max and min of the Volume column
#en SQL
spark.sql("""select max(Volume) as Max, min(Volume) as Min from WalmartSQL""").show()

+-------+--------+
|    Max|     Min|
+-------+--------+
|9994400|10010500|
+-------+--------+



In [16]:
#en DSL
WalmartNew.agg(F.max("Volume").alias("Max"),F.min("Volume").alias("Min")) \
          .show()


+-------+--------+
|    Max|     Min|
+-------+--------+
|9994400|10010500|
+-------+--------+



In [17]:
#9-The days that were the Close lower than 60 dollars
#en SQL
spark.sql("""select count(Date) as Nb from WalmartSQL where Close < '60' """).show()

#en DSL
WalmartNew.filter(F.col("Close") < '60') \
          .count()          

+---+
| Nb|
+---+
| 81|
+---+



81

In [18]:
#9-The days that were the Close lower than 60 dollars
#en SQL
spark.sql("""select count(Date) as Nb from WalmartSQL where Close < '60' """).show()

+---+
| Nb|
+---+
| 81|
+---+



In [19]:
#10-The percentage of the time that was the High greater than 80 dollars.(In other words, (Number of Days High>80)/(Total Days in the dataset))
#en SQL
spark.sql(""" select round(
                            (select count(*) from WalmartSQL
                                where High>='80') / (count(*))* 100
                        , 2)  as Pourcentage from WalmartSQL""").show()

#en DSL
Temporaire = WalmartNew.filter(F.col("High")>'80') \
                        .agg(F.count("*").alias ("Comptage")) \
                        .collect()[0][0] 
WalmartNew.agg(F.round((Temporaire/F.count("*")*100),2).alias("Pourcentage")) \
          .show()



+-----------+
|Pourcentage|
+-----------+
|       9.14|
+-----------+

+-----------+
|Pourcentage|
+-----------+
|       9.14|
+-----------+



In [20]:
#11-The max High per year
#en SQL
WalmartNew.agg(F.round((Temporaire/F.count("*")*100),2).alias("Pourcentage")) \
          .show()
        
#en DSL
WalmartNew = WalmartNew.withColumn("Annee",F.substring("Date",1,4))
WalmartNew.select("Date", "Annee").show(4)

WalmartNew.groupBy(F.col("Annee")) \
          .agg(F.max("High")) \
          .orderBy(F.col("Annee").asc()) \
          .show()

+-----------+
|Pourcentage|
+-----------+
|       9.14|
+-----------+

+----------+-----+
|      Date|Annee|
+----------+-----+
|2012-01-03| 2012|
|2012-01-04| 2012|
|2012-01-05| 2012|
|2012-01-06| 2012|
+----------+-----+
only showing top 4 rows

+-----+---------+
|Annee|max(High)|
+-----+---------+
| 2012|77.599998|
| 2013|81.370003|
| 2014|88.089996|
| 2015|90.970001|
| 2016|75.190002|
+-----+---------+



In [21]:
# 12-The average Close for each Calendar Month, In other words, across all the years, the average Close price for Jan,Feb, Mar, etc... the result will have a value for each of these months.
#en SQL
spark.sql(""" select  substr(Date,6,2) as Mois, mean(Close) as Moyenne from WalmartSQL group by Mois order by Mois asc """).show()

#en DSL
WalmartNew = WalmartNew.withColumn("Mois",F.substring("Date",6,2))

WalmartNew.groupBy(F.col("Mois")) \
          .agg(F.mean("Close").alias("Moyenne")) \
          .orderBy(F.col("Mois").asc()) \
          .show()

+----+-----------------+
|Mois|          Moyenne|
+----+-----------------+
|  01|71.44801958415842|
|  02|  71.306804443299|
|  03|71.77794377570092|
|  04|72.97361900952382|
|  05|72.30971688679247|
|  06| 72.4953774245283|
|  07|74.43971943925233|
|  08|73.02981855454546|
|  09|72.18411785294116|
|  10|71.57854545454543|
|  11| 72.1110893069307|
|  12|72.84792478301885|
+----+-----------------+

+----+-----------------+
|Mois|          Moyenne|
+----+-----------------+
|  01|71.44801958415842|
|  02|  71.306804443299|
|  03|71.77794377570092|
|  04|72.97361900952382|
|  05|72.30971688679247|
|  06| 72.4953774245283|
|  07|74.43971943925233|
|  08|73.02981855454546|
|  09|72.18411785294116|
|  10|71.57854545454543|
|  11| 72.1110893069307|
|  12|72.84792478301885|
+----+-----------------+

