# Walmart's stock price analysis using PySpark

## Introduction

#### Use the walmart_stock.csv file to answer and complete the tasks below!
https://www.kaggle.com/datasets/amandam1/walmart-stock-20122016

## Mandatory steps to follow:

1. **Starting the Spark session:** In this segment we will set up the Spark session so that we can use it with Python's distribution of Spark, i.e. **PySpark**.


2. **Reading the dataset:** In this section we will read the Walmart stock price data, which I downloaded from **Kaggle**. The file covers the years **2012 to 2016**, and we will analyze only those years in this blog.

**Let's get started and analyze the Walmart stock using PySpark**

## Setting up a simple Spark Session

This is one of the mandatory things to do before getting started with PySpark: we have to **set up an environment for Spark** so that we can use Python's PySpark library and all of its resources in that session.

**Note:** Before importing anything, don't forget to install the PySpark library. The command to install it is **!pip install pyspark**.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=a3412aa65aab592354c67ba92994d77a34ce7f642617a3884b41f66ddd6bd632
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from pyspark.sql import SparkSession

spark_walmart = SparkSession.builder.appName("Walmart_Stock_Price").getOrCreate()
spark_walmart

**Code breakdown**

Here we will break down the above code and understand each element required for creating and starting the Spark environment.

1. First, we imported the **SparkSession** class from the pyspark.sql module; this class is the entry point to the functions we need.

2. Then, using the SparkSession class, we access its **builder** attribute, which **builds** the session. We give the session a specific name using **appName()**, and at the end we create the environment using the **getOrCreate()** function.

3. Finally, we display the session object, which summarizes the necessary information about the Spark environment, such as its **version**, the **master** and the **AppName**, along with a link to the **Spark UI**.

## Reading the Walmart stock price data

In this section we will read Walmart's stock price data using PySpark and store it in a variable for further analysis. Just as **pandas** provides the **read_csv()** function, **PySpark** provides the **read.csv()** function. Let's look at it in more detail.


In [None]:
df = spark_walmart.read.csv('walmart_stock.csv',header=True,inferSchema=True)
df.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

**Inference:** Here we can see that the **show()** function has returned the **top 20 rows** of the dataset. Note that we have set **header to True** so that Spark treats the first row as the header, and **inferSchema** is also set to **True** so that each column is returned with its actual data type.
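To build some intuition for what **inferSchema=True** does, here is a minimal plain-Python sketch of type inference for a single CSV value. It is only a conceptual illustration: the helper `guess_type` is hypothetical, and Spark's real inference logic looks at the data as a whole and handles many more types.

```python
def guess_type(value: str) -> str:
    """Guess a type name for one raw CSV string (hypothetical helper)."""
    # Try the narrowest type first, then fall back to wider ones.
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


# Raw values taken from the first data row shown above.
sample = {"Date": "2012-01-03", "Open": "59.970001", "Volume": "12668800"}
inferred = {name: guess_type(raw) for name, raw in sample.items()}
print(inferred)
```

Without **inferSchema**, Spark reads every CSV column as a string, so numeric operations would first need an explicit cast.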

## Understanding about dataset

In this section we will use the relevant PySpark functions to **analyze the dataset**. By analyzing I mean that we will see **what our dataset looks like**, **what its structure is** and **what formatting needs to be done** as part of cleaning.

Here are the things that we will cover:

1. Looking at the **names of the columns** that the Walmart data holds.
2. Understanding the **schema of the dataset**.
3. Looping through a **specific number of rows** in the data.
4. Understanding the **statistical information** of our data.

**Let's have a look at the column names.**

In [None]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

**Inference:** From the above output we can see that it returned a list of values, where each value is the **name of a column** present in the dataset; for that we used the **columns** attribute.

**Now we will see what the schema of the dataset looks like!**

In [None]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



**Inference:** By using the **printSchema()** function we are looking at the data type of each column of the dataset. We can also note **"nullable = true"**, which signifies that the column can hold **NULL** values.

**Looping through the data and fetching the top 5 rows.**

In [None]:
for row in df.head(5):
    print(row)
    print('\n')

Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




**Inference:** In the above output we can see that it returned "**Row**" objects, and each Row object holds one of the top 5 records (because we iterated over the **top 5 records using the head() function**). This is one of the ways to extract one or more records separately.

**Note:** This step is completely optional in this analysis; the concept is useful when we want to hold each row in a different variable and work with individual data points.
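A **Row** can be read by position or by field name, much like a named tuple. The sketch below uses Python's `collections.namedtuple` as a stand-in (an analogy only, not PySpark's `Row` class); the `StockRow` name and the subset of fields are chosen purely for illustration.

```python
from collections import namedtuple

# Stand-in for a Spark Row; 'Adj Close' is left out because it is not a valid Python identifier.
StockRow = namedtuple("StockRow", ["Date", "Open", "High", "Low", "Close", "Volume"])

# Values copied from the first row printed above.
first = StockRow("2012-01-03", 59.970001, 61.060001, 59.869999, 60.330002, 12668800)

date_by_position = first[0]   # positional access
date_by_name = first.Date     # access by field name
as_dict = first._asdict()     # convert the record to a dictionary
print(date_by_position, date_by_name, as_dict["Volume"])
```

With a real PySpark `Row`, the corresponding calls are `row[0]`, `row['Date']` and `row.asDict()`.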

**Using the describe() function to see the statistical information of the dataset.**

In [None]:
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

## Formatting the dataset

In this section we will **format our dataset** to make it clearer and more precise. Clean data is easier to analyze than the data in its current state, where the long decimal values **do not give a clear picture of the results.**

**Hence we will first format some of the columns to two decimal places, and then add one more column so that we don't have to repeat the calculation later.**

So let's format our dataset and perform each step one by one:

Before changing the formatting of the data points, let us first see which columns need those changes. For that we will use the **combination of the printSchema and describe functions**.

In [None]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



**Inference:** In the above output we can see the data type of each column returned by the describe function. All the columns hold string values, which is not good if we want to format and further analyze them.

Hence our task is to first change the data type of these columns (specifically for **mean** and **stddev**) and then format them for better understanding.

Now that we know what to do, it's time to see how to do it. We will use the **cast** method together with the **format_number** function from the "**functions**" package:

* **cast** changes the data type of the column to the relevant numeric type.
* **format_number** rounds the values to the given number of decimal places (two here) and returns them as formatted strings.

In [None]:
from pyspark.sql.functions import format_number

In [None]:
result = df.describe()
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')
             ).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



**Code breakdown:**

1. In the first step of formatting the data we imported the **format_number** function from the **functions** package of pyspark.
2. Then, as we already knew that we have to change the data of **mean and stddev**, the **describe** method was called and its result stored in a separate variable.
3. Then comes the main part, where a select statement is used (equivalent to SQL's select statement), and here:
    * the **cast** function changes the data type (type casting), while **format_number** sets the number of digits after the decimal point.
    * the **alias** function changes the column name (not permanently, only in the displayed result).
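As a rough plain-Python analogue of what **format_number** with two decimals produces, the sketch below uses Python's built-in `format`. The helper name `format_like_spark` is hypothetical, and this is not Spark's implementation; it only reproduces the visible effect on the values above.

```python
def format_like_spark(value: float, decimals: int) -> str:
    """Rough plain-Python analogue of format_number (hypothetical helper)."""
    # ',' adds thousands separators; '.{decimals}f' fixes the number of decimal places.
    return format(value, f",.{decimals}f")


# Values taken from the describe() output above.
count_str = format_like_spark(1258.0, 2)
mean_open_str = format_like_spark(72.35785375357709, 2)
print(count_str, mean_open_str)
```

Note that the result is a string, which explains why the count appears as `1,258.00` in the formatted columns but as `1258` in the Volume column, where only **cast** was applied.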

Now let's create an altogether new DataFrame with one extra column named **HV Ratio**, which represents the ratio of the **High Price** to the **Total Volume** of stock traded in a day.

In [None]:
# Add the new column, then display only that column
df2 = df.withColumn("HV Ratio", df["High"] / df["Volume"])
df2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



**Inference**

In the output we can see that this new DataFrame holds the ratio of the two fields discussed above. We introduced the column with the help of the **"withColumn()"** function, performed the **ratio operation** inside it, and displayed the result with a combination of the **select and show** statements.
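As a quick sanity check, the first value of the **HV Ratio** column can be reproduced in plain Python from the first row of the dataset. This is only a cross-check of the arithmetic, not part of the Spark pipeline.

```python
import math

# High and Volume copied from the first row of the dataset (2012-01-03).
high = 61.060001
volume = 12668800

# The ratio is tiny because Volume is counted in millions of shares.
hv_ratio = high / volume
print(hv_ratio)

# First value shown in the HV Ratio column above.
reported = 4.819714653321546e-06
matches = math.isclose(hv_ratio, reported, rel_tol=1e-6)
print(matches)
```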

Enough of preparing the dataset! We are supposed to analyze the **Walmart Stock Price**, and now it's time for it!

## Questions to analyze using PySpark

In this section we will answer some questions proposed to us by a firm, to give us a feel for data science consulting projects. We will draw the insights using PySpark's data preprocessing techniques.

1. On what day was the stock price the highest?
2. What is the average closing price?
3. What are the maximum and minimum volumes of stock traded?
4. For how many days was the closing value less than 60 dollars?
5. What is the maximum High value for each year?

**On what day was the stock price the highest?**

In [None]:
df.orderBy(df["High"].desc()).head(1)[0][0]

'2015-01-13'

**Inference:** In the above output we got the date on which the stock price was highest by using the orderBy function in **descending** order. We then used the **head** function with a little bit of **indexing** to fetch the **date value** (a string, since the Date column was read as a string) from the first row.

**What is the average closing price?**

In [None]:
from pyspark.sql.functions import mean
df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



**Inference:** From the above output we can say that the average closing price is **72.38844998012726**. We fetched this value by using the **select** statement together with the **mean** function to show the **mean closing stock price**.

**Note:** Here we could also have used the describe method, but I wanted you to know the operations that are possible with PySpark.

**What are the maximum and minimum volumes of stock traded?**

In [None]:
from pyspark.sql.functions import max,min

In [None]:
df.select(max("Volume"),min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



**Inference:** To get the maximum and minimum volume of the stock we first imported the "max" and "min" functions from the **"functions"** package, and from there it's a piece of cake: we simply used these functions inside a select statement to get the desired results. Here too we could have used the **describe** function.

**For how many days was the closing value less than 60 dollars?**

In [None]:
df.filter("Close < 60").count()

81

In [None]:
df.filter(df['Close'] < 60).count()

81

In [None]:
from pyspark.sql.functions import count
result = df.filter(df['Close'] < 60)
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



**Inference:** To count the days on which the closing value was less than 60 dollars, we follow **two steps**:

1. Using the **filter operation** we keep only the rows whose **closing value is less than 60**, and then the count function counts the filtered records. The condition can be written either as a SQL-style string or as a column expression; both return the same result.
2. Alternatively, we store the filtered DataFrame and put the **count** function inside a **select** statement.

**What is the maximum High value for each year?**

In [None]:
from pyspark.sql.functions import year
yeardf = df.withColumn("Year",year(df["Date"]))

In [None]:
max_df = yeardf.groupBy('Year').max()

In [None]:
max_df.select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



**Inference:** To get the maximum value of the stock price for each year (which is more informative as a summary to analyze) we follow **three steps**:

1. First, we imported the **"year"** function from the functions package, then created a new DataFrame with a new column named **Year**, extracted from the Date column.

2. Then, using the groupBy function, we grouped the rows by the Year column and applied the max aggregate function.

3. Finally, we displayed the maximum value per year using the show function. Note that the aggregated column is named **max(High)**, so that is the name we pass to select.

## Conclusion

In this section we will recap what we have learned so far in this blog, starting from setting up the Spark session .....

#### What percentage of the time was the High greater than 80 dollars?
#### In other words, (Number of Days High>80)/(Total Days in the dataset)

In [None]:
# 9.14 percent of the time it was over 80
# Many ways to do this
(df.filter(df["High"]>80).count()*1.0/df.count())*100

9.141494435612083
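The expression above is simply a ratio of two row counts scaled to a percentage. As a plain-Python sketch: the total of 1258 rows comes from the **describe()** output, while the figure of 115 matching days is an assumption, back-calculated from the reported percentage rather than printed anywhere in the notebook.

```python
total_days = 1258      # row count reported by describe()
days_above_80 = 115    # assumption: back-calculated from the 9.14% result, not printed in the notebook

# In Python 3, '/' is already true division, so the '*1.0' trick is not required.
percentage = days_above_80 / total_days * 100
print(round(percentage, 2))
```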

#### What is the Pearson correlation between High and Volume?
#### [Hint](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions.corr)

In [None]:
from pyspark.sql.functions import corr
df.select(corr("High","Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+
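For reference, the Pearson coefficient divides the sum of products of deviations by the square root of the product of the summed squared deviations. Here is a minimal plain-Python sketch of that formula, applied to the first five rows printed earlier; the `pearson` helper is written for illustration and is not how Spark computes `corr` internally.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations from the means (covariance numerator).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations for each variable.
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# High and Volume from the first five rows printed earlier (a tiny sample, not the full dataset).
highs = [61.060001, 60.349998, 59.619999, 59.450001, 59.549999]
volumes = [12668800, 9593300, 12768200, 8069400, 6679300]
sample_r = pearson(highs, volumes)
print(sample_r)
```

Because this uses only five rows, the value it prints will differ from the full-dataset result of about -0.34 shown above; the coefficient always lies between -1 and 1.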



#### What is the average Close for each Calendar Month?
#### In other words, across all the years, what is the average Close price for Jan, Feb, Mar, etc.? Your result will have a value for each of these months.

In [None]:
from pyspark.sql.functions import month
monthdf = df.withColumn("Month",month("Date"))
monthavgs = monthdf.select("Month","Close").groupBy("Month").mean()
monthavgs.select("Month","avg(Close)").orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

