# Playing with Walmart Stock Data

In this notebook, I try out some basic operations with PySpark.

First, I start a simple Spark session:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("dfbasics").getOrCreate()
```

The dataset used in this notebook is stored in `walmart_stock.csv`. I am not sure whether I am allowed to share it, so I am not publishing it for now.

The next step is to load this Walmart stock CSV file and let Spark infer the data types. Note that `inferSchema=True` requires one extra pass over the data.

```python
file = "walmart_stock.csv"
df = spark.read.csv(file, header=True, inferSchema=True)
```
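To get an intuition for what `inferSchema=True` does, here is a deliberately simplified sketch in plain Python. It is not Spark's actual inference algorithm: for each column it just tries to parse every value as an integer, then as a float, and otherwise falls back to a string. The sample rows are the first five rows of the dataset as displayed later in this notebook, and the helpers `infer_type` and `infer_schema` are hypothetical names.

```python
import csv
import io

# First five rows of walmart_stock.csv, as displayed later in this notebook
SAMPLE = """Date,Open,High,Low,Close,Volume,Adj Close
2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215
"""


def infer_type(values):
    """Hypothetical helper: return the narrowest of 'integer', 'double'
    and 'string' that can represent every value of a column."""
    for name, parse in (("integer", int), ("double", float)):
        try:
            for v in values:
                parse(v)
            return name
        except ValueError:
            continue  # some value did not parse, try the next wider type
    return "string"


def infer_schema(text):
    """Hypothetical helper: read CSV text with a header row and infer
    one type per column."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    rows = list(reader)
    columns = zip(*rows)  # transpose the rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}


for name, kind in infer_schema(SAMPLE).items():
    print(f"{name}: {kind}")
```

On these rows the toy rules land on the same types that `printSchema()` reports below; Spark's real inference handles many more types and options.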

Let's take a first look at this dataset with PySpark. How many columns does it have, and what are their names?

```python
print("This dataset has {} columns. Their names are:\n  {}".format(len(df.columns), df.columns))
```

```
This dataset has 7 columns. Their names are:
  ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']
```


Now, what type of values does each column contain?

```python
df.printSchema()
```

```
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
```



We see that all columns except Date (string values) and Volume (integer values) contain doubles.

Now I want to see what this dataset looks like, so I print its first 5 rows:

```python
df.limit(5).toPandas()
```

| | Date | Open | High | Low | Close | Volume | Adj Close |
|---|---|---|---|---|---|---|---|
| 0 | 2012-01-03 | 59.970001 | 61.060001 | 59.869999 | 60.330002 | 12668800 | 52.619235 |
| 1 | 2012-01-04 | 60.209999 | 60.349998 | 59.470001 | 59.709999 | 9593300 | 52.078475 |
| 2 | 2012-01-05 | 59.349998 | 59.619999 | 58.369999 | 59.419998 | 12768200 | 51.825539 |
| 3 | 2012-01-06 | 59.419998 | 59.450001 | 58.869999 | 59.0 | 8069400 | 51.45922 |
| 4 | 2012-01-09 | 59.029999 | 59.549999 | 58.919998 | 59.18 | 6679300 | 51.616215 |


When the result to display is small, I prefer to convert the Spark DataFrame to a pandas DataFrame with .toPandas(). Keep in mind that .toPandas() collects all the data into the driver's memory, so it is only suitable for small results, and it is not the only way to print a DataFrame. The next two cells do the same thing with .show(5) and with spark.sql(...).show().

```python
df.show(5)
```

```
+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows
```



```python
df.registerTempTable('dataset')
spark.sql('SELECT * FROM dataset LIMIT 5').show()
```

```
+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
```



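Before running the SQL query, I register the DataFrame as a temporary table named `dataset` with registerTempTable('dataset'), so that besides the DataFrame methods provided by Spark I can also run SQL queries on it. Since Spark 2.0, registerTempTable is deprecated in favor of createOrReplaceTempView('dataset'), which does the same job.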

Next, I use **describe()** to get some more information about the dataframe: count, mean, standard deviation, minimum and maximum for each column:

```python
df.describe().toPandas()
```

| | summary | Date | Open | High | Low | Close | Volume | Adj Close |
|---|---|---|---|---|---|---|---|---|
| 0 | count | 1258 | 1258.0 | 1258.0 | 1258.0 | 1258.0 | 1258.0 | 1258.0 |
| 1 | mean | | 72.35785375357709 | 72.83938807631165 | 71.9186009594594 | 72.38844998012726 | 8222093.481717011 | 67.23883848728146 |
| 2 | stddev | | 6.76809024470826 | 6.768186808159218 | 6.744075756255496 | 6.756859163732991 | 4519780.8431556 | 6.722609449996857 |
| 3 | min | 2012-01-03 | 56.389999 | 57.060001 | 56.299999 | 56.419998 | 2094900.0 | 50.363689 |
| 4 | max | 2016-12-30 | 90.800003 | 90.970001 | 89.25 | 90.470001 | 80898100.0 | 84.91421600000001 |
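As a rough illustration of what `describe()` reports, the plain-Python sketch below computes the same five statistics for the **Close** values of the five sample rows shown earlier (not the full dataset). I am assuming that Spark's `stddev` here is the sample standard deviation (dividing by n - 1, as in `stddev_samp`), which is why `statistics.stdev` is used rather than `statistics.pstdev`.

```python
import statistics

# Close prices of the first five rows shown above
close = [60.330002, 59.709999, 59.419998, 59.0, 59.18]

summary = {
    "count": len(close),
    "mean": statistics.mean(close),
    "stddev": statistics.stdev(close),  # sample standard deviation (n - 1)
    "min": min(close),
    "max": max(close),
}

for stat, value in summary.items():
    print(f"{stat:>6}: {value}")
```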


You may find the way all values were displayed in the previous tables ugly (more than 6 decimal places? Awful!). So now I format the summary so that it shows only two decimal places. This one was a little hard for a beginner, and it was my teacher José Manuel Moya who solved it, so all credit goes to him!

```python
from pyspark.sql.functions import format_number

desc = df.describe()
desc.select('summary', 'Date', 
            format_number(desc['Open'].cast('float'), 2).alias('Open'),
            format_number(desc['High'].cast('float'), 2).alias('High'),
            format_number(desc['Low'].cast('float'), 2).alias('Low'),
            format_number(desc['Close'].cast('float'), 2).alias('Close'),
            format_number(desc['Volume'].cast('float'), 2).alias('Volume'),
            format_number(desc['Adj Close'].cast('float'), 2).alias('Adj Close')
           ).toPandas()
```

| | summary | Date | Open | High | Low | Close | Volume | Adj Close |
|---|---|---|---|---|---|---|---|---|
| 0 | count | 1258 | 1258.0 | 1258.0 | 1258.0 | 1258.0 | 1258.0 | 1258.0 |
| 1 | mean | | 72.36 | 72.84 | 71.92 | 72.39 | 8222093.5 | 67.24 |
| 2 | stddev | | 6.77 | 6.77 | 6.74 | 6.76 | 4519781.0 | 6.72 |
| 3 | min | 2012-01-03 | 56.39 | 57.06 | 56.3 | 56.42 | 2094900.0 | 50.36 |
| 4 | max | 2016-12-30 | 90.8 | 90.97 | 89.25 | 90.47 | 80898096.0 | 84.91 |


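The description DataFrame looks much better now. One caveat: `format_number` returns strings, and casting to `'float'` (a 32-bit type) loses precision on large values. The maximum **Volume**, 80898100 in the unformatted summary, shows up as 80898096.0 here; casting to `'double'` instead would preserve it.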
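The precision loss comes from the 32-bit float type, whose significand has 24 bits. Between 2^26 and 2^27, consecutive 32-bit floats are 8 apart, so 80898100 cannot be stored exactly. The sketch below uses Python's `struct` module to round-trip values through a 32-bit float; it only mimics what `cast('float')` does, it is not Spark code, and the helper name `to_float32` is hypothetical.

```python
import struct


def to_float32(x):
    """Hypothetical helper: round a 64-bit Python float to the nearest
    32-bit float and convert it back."""
    return struct.unpack("f", struct.pack("f", x))[0]


# Volume statistics before the cast (from the unformatted describe() output)
volume_max = 80898100.0
volume_mean = 8222093.481717011

# 80898100 sits exactly halfway between the 32-bit floats 80898096 and 80898104
print(to_float32(volume_max))
print(to_float32(volume_mean))

# Formatting the full 64-bit value keeps every digit
print(f"{volume_max:,.2f}")
```

Both rounded values match the Volume column of the formatted table above (80898096.0 and 8222093.5).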

In the following code cell I create a new DataFrame with a column called **HV Ratio**, the ratio of the High price to the Volume of stock traded on each day. The method **.withColumn()** returns a new DataFrame with the extra column added; the original `df` is left unchanged.

```python
new_df = df.withColumn("HV Ratio", df["High"]/df["Volume"])
```

```python
new_df.limit(5).toPandas()
```

| | Date | Open | High | Low | Close | Volume | Adj Close | HV Ratio |
|---|---|---|---|---|---|---|---|---|
| 0 | 2012-01-03 | 59.970001 | 61.060001 | 59.869999 | 60.330002 | 12668800 | 52.619235 | 5e-06 |
| 1 | 2012-01-04 | 60.209999 | 60.349998 | 59.470001 | 59.709999 | 9593300 | 52.078475 | 6e-06 |
| 2 | 2012-01-05 | 59.349998 | 59.619999 | 58.369999 | 59.419998 | 12768200 | 51.825539 | 5e-06 |
| 3 | 2012-01-06 | 59.419998 | 59.450001 | 58.869999 | 59.0 | 8069400 | 51.45922 | 7e-06 |
| 4 | 2012-01-09 | 59.029999 | 59.549999 | 58.919998 | 59.18 | 6679300 | 51.616215 | 9e-06 |
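`withColumn()` evaluates its expression independently for every row. The plain-Python sketch below does the same division for the five rows in the table, which shows why the ratios are so small: a price of about 60 dollars divided by roughly 10 million shares is on the order of 1e-6, which the table rounds to values like 5e-06.

```python
# (Date, High, Volume) for the first five rows of the dataset
rows = [
    ("2012-01-03", 61.060001, 12668800),
    ("2012-01-04", 60.349998, 9593300),
    ("2012-01-05", 59.619999, 12768200),
    ("2012-01-06", 59.450001, 8069400),
    ("2012-01-09", 59.549999, 6679300),
]

# HV Ratio = High / Volume, computed row by row
hv_ratio = {date: high / volume for date, high, volume in rows}

for date, ratio in hv_ratio.items():
    print(f"{date}: {ratio:.2e}")
```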


Now I want to know which day had the peak High. First, I store the maximum value of **High** as a float, going through the DataFrame's underlying resilient distributed dataset (.rdd). With this value stored, it is easy to filter the DataFrame and select the **Date** with the maximum **High** value. If several days shared the maximum, the filter would keep all of them and .head() would return only the first.

```python
max_high = df.select("High").rdd.max()[0]

df.filter(df["High"] == max_high).select("Date").head()[0]
```

```
'2015-01-13'
```

A simpler way is a single SQL query that orders the dataset by **High** in descending order and keeps only the first **Date**.

```python
spark.sql('SELECT Date FROM dataset ORDER BY High DESC LIMIT 1').head()[0]
```

```
'2015-01-13'
```
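The two approaches above can be compared on a small scale in plain Python. This sketch uses only the (Date, High) pairs of the five sample rows, so its answer is not the dataset's peak day. The first approach mirrors max-then-filter and returns every date that reaches the maximum (ties included); the second mirrors `ORDER BY High DESC LIMIT 1` and returns exactly one date.

```python
# (Date, High) for the first five rows only
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

# Approach 1: compute the maximum first, then filter on it (keeps ties)
max_high = max(high for _, high in rows)
peak_dates = [date for date, high in rows if high == max_high]

# Approach 2: sort by High in descending order and keep the first row
top_date = sorted(rows, key=lambda r: r[1], reverse=True)[0][0]

print(peak_dates, top_date)
```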

Another question: what is the mean of a given column? For example, **Close**.

```python
import math

one_possible_answer = df.agg({"Close": "mean"}).head()[0]
another_possible_answer = spark.sql('SELECT MEAN(Close) FROM dataset').head()[0]

# Compare the two floating-point results with a tolerance instead of ==
if math.isclose(one_possible_answer, another_possible_answer):
    print(one_possible_answer)
```

```
72.38844998012726
```


So this question can be answered both with the **.agg()** method and with a SQL query.
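Comparing two floating-point aggregates with `==` is fragile. Spark aggregates partition by partition and then merges the partial results, so the order in which values are added can vary, and floating-point addition is not associative. A tiny plain-Python illustration of the effect:

```python
import math

values = [0.1, 0.2, 0.3]

# The same three numbers, summed in two different orders
left_to_right = (values[0] + values[1]) + values[2]
right_to_left = (values[2] + values[1]) + values[0]

print(left_to_right == right_to_left)              # exact comparison
print(math.isclose(left_to_right, right_to_left))  # tolerance-based comparison
```

The exact comparison fails while `math.isclose` succeeds, which is why a tolerance is the safer check when two code paths compute the same mean.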

And what if I wanted to know the maximum (or minimum) value of a given column? For example, **Volume**.

```python
print("Using .agg() Spark method:\n")
df.agg(F.min(F.col("Volume")), F.max(F.col("Volume"))).show()

print("Using a SQL query:\n")
spark.sql('SELECT MIN(Volume), MAX(Volume) FROM dataset').show()
```

```
Using .agg() Spark method:

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+

Using a SQL query:

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+
```



Here too, the .agg() method and the SQL query give the same result.

Next question: on how many days was the Close lower than 60 dollars?

```python
one_possible_answer = df.where(df["Close"] < 60).count()
another_possible_answer = spark.sql('SELECT COUNT(CASE WHEN Close < 60 THEN 1 END) FROM dataset').head()[0]

if one_possible_answer == another_possible_answer:
    print("There were {} days where Close was lower than 60 dollars.".format(one_possible_answer))
```

```
There were 81 days where Close was lower than 60 dollars.
```
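The SQL version relies on `COUNT(expr)` ignoring NULLs: `CASE WHEN Close < 60 THEN 1 END` yields 1 for matching rows and NULL for the rest (there is no ELSE branch), so only matching rows are counted. A plain `WHERE` filter gives the same number. The sketch below checks this with Python's built-in `sqlite3` module as a stand-in for Spark SQL, on the five sample rows only; the table name `dataset` just mirrors the temporary table registered above.

```python
import sqlite3

# Close prices of the first five rows only
closes = [60.330002, 59.709999, 59.419998, 59.0, 59.18]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (Close REAL)")
conn.executemany("INSERT INTO dataset VALUES (?)", [(c,) for c in closes])

# COUNT skips NULLs, so only rows where the CASE matched are counted
with_case = conn.execute(
    "SELECT COUNT(CASE WHEN Close < 60 THEN 1 END) FROM dataset"
).fetchone()[0]

# Equivalent formulation with a WHERE filter
with_where = conn.execute(
    "SELECT COUNT(*) FROM dataset WHERE Close < 60"
).fetchone()[0]

conn.close()
print(with_case, with_where)
```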


Some more questions:

On what percentage of days was **High** greater than 80 dollars?

```python
print("%.0f%%" % (df.filter(df["High"] > 80).count() / df.count() * 100))
```

```
9%
```
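Each `.count()` call above is a separate Spark action, so Spark runs two jobs to get this percentage. The same percentage can also be seen as the mean of a 0/1 indicator column, which needs only one pass. Here is that idea in plain Python on the five sample rows. None of them has a High above 80 dollars, so the threshold below is a hypothetical 60 dollars, chosen only so the toy data gives a nonzero result.

```python
# High prices of the first five rows only
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
threshold = 60  # hypothetical threshold for this toy example (the notebook uses 80)

# The mean of a 0/1 indicator is the fraction of rows above the threshold
indicator = [1 if h > threshold else 0 for h in highs]
percentage = sum(indicator) / len(indicator) * 100

print("%.0f%%" % percentage)
```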


What is the Pearson correlation between High and Volume?

```python
print("The Pearson correlation between High and Volume columns is %.2f" % df.stat.corr("High", "Volume"))
```

```
The Pearson correlation between High and Volume columns is -0.34
```


These two variables have a low negative correlation.
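`df.stat.corr` computes the Pearson coefficient, r = cov(X, Y) / (σ_X · σ_Y). A direct plain-Python translation of that formula is sketched below; the function name `pearson` is hypothetical. Applied to only the five sample rows it gives a positive value, a reminder that such a tiny sample says little about the -0.34 obtained on the full dataset.

```python
import math


def pearson(xs, ys):
    """Hypothetical helper: Pearson correlation of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    dx = [x - mean_x for x in xs]
    dy = [y - mean_y for y in ys]
    # The 1/n factors of the covariance and the variances cancel out
    cov = sum(a * b for a, b in zip(dx, dy))
    return cov / math.sqrt(sum(a * a for a in dx) * sum(b * b for b in dy))


# High and Volume for the first five rows only
high = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volume = [12668800, 9593300, 12768200, 8069400, 6679300]

print("%.2f" % pearson(high, volume))
```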

What is the max High per year?

```python
spark.sql('SELECT year(Date) AS Year, max(High) FROM dataset GROUP BY year(Date) ORDER BY year(Date)').show()
```

```
+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+
```
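Spark SQL's `year()` extracts the year from the `Date` strings before grouping. SQLite has no `year()` function, so the sketch below expresses the same GROUP BY query with `strftime('%Y', Date)` instead, again using Python's built-in `sqlite3` as a stand-in for Spark SQL. It only loads the five sample rows, which all fall in 2012, so it returns a single group.

```python
import sqlite3

# (Date, High) for the first five rows only
rows = [
    ("2012-01-03", 61.060001),
    ("2012-01-04", 60.349998),
    ("2012-01-05", 59.619999),
    ("2012-01-06", 59.450001),
    ("2012-01-09", 59.549999),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (Date TEXT, High REAL)")
conn.executemany("INSERT INTO dataset VALUES (?, ?)", rows)

# strftime('%Y', Date) plays the role of Spark SQL's year(Date)
result = conn.execute(
    "SELECT CAST(strftime('%Y', Date) AS INTEGER) AS Year, MAX(High) "
    "FROM dataset GROUP BY Year ORDER BY Year"
).fetchall()
conn.close()

print(result)
```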



Finally, what is the average Close for each calendar month?

```python
spark.sql('SELECT month(Date) AS Month, mean(Close) FROM dataset GROUP BY month(Date) ORDER BY month(Date)').show()
```

```
+-----+-----------------+
|Month|      mean(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+
```
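Grouping by calendar month ignores the year, so, for example, January 2012 and January 2016 fall into the same group. The plain-Python sketch below spells out that GROUP BY: it collects the Close values by the month number parsed from each date string and averages each group. It only uses the five sample rows, so every row lands in month 1.

```python
from collections import defaultdict
from datetime import date

# (Date, Close) for the first five rows only
rows = [
    ("2012-01-03", 60.330002),
    ("2012-01-04", 59.709999),
    ("2012-01-05", 59.419998),
    ("2012-01-06", 59.0),
    ("2012-01-09", 59.18),
]

# Collect Close values per calendar month (the year is ignored on purpose)
by_month = defaultdict(list)
for day, close in rows:
    by_month[date.fromisoformat(day).month].append(close)

# Average each group and order by month, like ORDER BY month(Date)
monthly_mean = {m: sum(v) / len(v) for m, v in sorted(by_month.items())}
print(monthly_mean)
```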



In this notebook, I have introduced some basic Spark functionality for working with datasets. These exercises were proposed by José Manuel Moya, my instructor in the Big Data Engineering course.

I hope you enjoy this work!