In [1]:
# Importation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Instantiation
spark = SparkSession.builder.master("local").appName("FolksDF").getOrCreate()


Q1 : Importation du fichier

In [2]:
# Importation des fichiers
Walmart = spark.read.option("header",'true')\
                    .csv("Input/walmart_stock.csv")
Walmart.show(5)
# donc les noms de colonnes sont: Date, Open, Hight, Low, ..

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



Q2 : afficher les colonnes d'un data frame

In [3]:
Walmart.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

Q3 : le schéma

In [4]:
Walmart.printSchema()
# nullable = true : autorise les valeurs nulles

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



Q4 : Create a new dataframe with a column called HV_Ratio that is the ratio of the High Price versus volume of stock traded for a day

In [5]:
Walmart2 = Walmart.withColumn("HV_Ratio", F.col("High")/F.col("Volume"))
Walmart2.head() # juste la premiere ligne
Walmart2.show(4) # les 4 premieres lignes

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
only showing top 4 rows



Q5 : What day had the Peak High in Price?

In [6]:
# SQL
Walmart2.createOrReplaceTempView("WalmartSQL") # transformation du data frame en table !!!!
#spark.sql("""select Date from WalmartSQL order by High desc""").first() # solution 1
spark.sql("""select Date from WalmartSQL order by High desc limit 1""").show() # solution 2


+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



In [7]:
# DSL donc en spark
# Walmart2.orderBy(F.col("High").desc()).select(F.col("Date")).head() # solution 1
Walmart2.select(F.col("Date")) \
        .orderBy(F.col("High").desc()) \
        .head() # solution 1 bis ( ressemble au SQL)

Row(Date='2015-01-13')

Q6 : What is the mean of the Close column?

In [8]:
# SQL
spark.sql("""select mean(Close) as Moyenne from WalmartSQL""").show()

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



In [9]:
# DSL
# Walmart2.select("Close") \
#        .summary("mean") \
#        .show()  # solution 1

Walmart2.agg(F.mean("Close") \
        .alias("Moyenne")) \
        .show() # solution 2 (on peut changer mean par avg)

+-----------------+
|          Moyenne|
+-----------------+
|72.38844998012726|
+-----------------+



Q 7 : What is the max and min of the Volume column?

In [10]:
# SQL
spark.sql("""select max(Volume), min(Volume) from WalmartSQL""").show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|    9994400|   10010500|
+-----------+-----------+



In [11]:
# DSL
Walmart2.agg(F.max("Volume"),F.min("Volume")) \
        .show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|    9994400|   10010500|
+-----------+-----------+



Q 8 : How many days was the Close lower than 60 dollars?

In [12]:
# SQL
spark.sql("""select count(Date) from WalmartSQL where Close < '60' """).show()

+-----------+
|count(Date)|
+-----------+
|         81|
+-----------+



In [13]:
# DSL
#Walmart2.filter(F.col("Close") < '60') \
#        .agg(F.count("Date")) \ # faire l'aggregation apres le filter
#        .show()  # solution 1

Walmart2.filter(F.col("Close") < '60') \
        .count()   # solution 2

81

Q 10 : What percentage of the time was the High greater than 80 dollars ?(In other words, (Number of Days High>80)/(Total Days in the dataset))

In [14]:
# SQL
spark.sql(""" select round((select count(*) from WalmartSQL
                where High>='80')/(count(*)) * 100, 2 ) as Percentage  from WalmartSQL """).show()


+----------+
|Percentage|
+----------+
|      9.14|
+----------+



In [15]:
# DSL
Temp = Walmart2.filter(F.col("High")>'80') \
        .agg(F.count("*").alias("Comptage")) \
        .collect()[0][0] 
Walmart2.agg(F.round((Temp/F.count("*")*100),2).alias("Percentage")) \
        .show()

+----------+
|Percentage|
+----------+
|      9.14|
+----------+



Q 11 : What is the max High per year?

In [16]:
# SQL
spark.sql(""" select max(High) as max_High, substr(Date,1,4) as Annee from WalmartSQL group by Annee """).show() 

+---------+-----+
| max_High|Annee|
+---------+-----+
|75.190002| 2016|
|77.599998| 2012|
|88.089996| 2014|
|81.370003| 2013|
|90.970001| 2015|
+---------+-----+



In [17]:
# DSL
Walmart2.groupBy(F.year("Date").alias("Année"))\
        .agg(F.max("High").alias("Max_High"))\
        .sort(F.year("Date"))\
        .show()

+-----+---------+
|Année| Max_High|
+-----+---------+
| 2012|77.599998|
| 2013|81.370003|
| 2014|88.089996|
| 2015|90.970001|
| 2016|75.190002|
+-----+---------+



Q12- What is the average Close for each Calendar Month? In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.

In [18]:
# SQL
spark.sql("""select month(Date) as Month, mean(Close) as Mean_Close 
                        from WalmartSQL group by Month order by Month""").show()

+-----+-----------------+
|Month|       Mean_Close|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



In [19]:
# DSL
Walmart2.groupBy(F.month("Date").alias("Month"))\
        .agg(F.mean(F.col("Close")).alias("Mean_Close"))\
        .orderBy("Month")\
        .show()

+-----+-----------------+
|Month|       Mean_Close|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

