## Spark  DataFrames Project 
### Walmart Stock 
@Author - Amruta Abhyankar

In [3]:
# Start a Simple Spark Session
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('project').getOrCreate()

In [6]:
#Load the Walmart Stock CSV File, have Spark infer data types.
df = spark.read.csv("/Users/amrutaabhyankar/Downloads/Python-and-Spark-for-Big-Data-master/Spark_DataFrame_Project_Exercise/walmart_stock.csv",
                   header=True,inferSchema=True)

In [8]:
# Get the column names
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [9]:
# Get the schema 
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [10]:
# Printing firt five rows
df.head(5)


[Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [20]:
for row in df.head(5):
    print(row)
    print("\n")

Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




In [21]:
# Learn about the DataFrame
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      null|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

In [22]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [23]:
from pyspark.sql.functions import format_number

In [24]:
result = df.describe()

In [27]:
result.select(result['summary'],
             format_number(result['Open'].cast('float'),2).alias('Open'),
             format_number(result['High'].cast('float'),2).alias('High'),
             format_number(result['Low'].cast('float'),2).alias('Low'),
             format_number(result['Close'].cast('float'),2).alias('Close'),
             format_number(result['Volume'].cast('int'),2).alias('Volume')).show()

+-------+--------+--------+--------+--------+-------------+
|summary|    Open|    High|     Low|   Close|       Volume|
+-------+--------+--------+--------+--------+-------------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.00|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,780.00|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,100.00|
+-------+--------+--------+--------+--------+-------------+



### Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of a stock traded for a day

In [28]:
df2 = df.withColumn("HV Ratio",df["High"]/df["Volume"])
df2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



### What day had the Peak High Price?

In [29]:
df.orderBy(df['High'].desc()).show()

+----------+-----------------+-----------------+-----------------+-----------------+--------+-----------------+
|      Date|             Open|             High|              Low|            Close|  Volume|        Adj Close|
+----------+-----------------+-----------------+-----------------+-----------------+--------+-----------------+
|2015-01-13|        90.800003|        90.970001|            88.93|        89.309998| 8215400|        83.825448|
|2015-01-08|        89.209999|90.66999799999999|            89.07|        90.470001|12713600|84.91421600000001|
|2015-01-09|            90.32|        90.389999|            89.25|        89.349998| 8522500|        83.862993|
|2015-01-12|        89.360001|        90.309998|        89.220001|        90.019997| 7372500|        84.491846|
|2015-01-23|88.41999799999999|        89.260002|        87.889999|        88.510002| 7565800|83.07458100000001|
|2015-01-26|        88.309998|        89.160004|        88.120003|        88.629997| 4666700|        83.

In [30]:
df.orderBy(df['High'].desc()).head(1)

[Row(Date='2015-01-13', Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)]

In [31]:
df.orderBy(df['High'].desc()).head(1)[0]

Row(Date='2015-01-13', Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)

In [32]:
df.orderBy(df['High'].desc()).head(1)[0][0]

'2015-01-13'

### Mean of the Close Column

In [33]:
from pyspark.sql.functions import mean
df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



### Calculating the Max and Mean of the Volume Column

In [35]:
from pyspark.sql.functions import max,min
df.select(max("Volume"),min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



### How many days was the Close lower than 60 dollars?

In [36]:
df.filter('Close < 60').count()

81

In [37]:
# Alternate way
df.filter(df['Close']<60).count()

81

In [38]:
from pyspark.sql.functions import count

In [39]:
result = df.filter(df['Close']<60)

In [41]:
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



### What percentage of the time was the High greater than 80 dolllars ?

In [43]:
(df.filter(df["High"]>80).count()/df.count())*100

9.141494435612083

### What is the Pearson correaltion between high and Volume?

In [44]:
from pyspark.sql.functions import corr
df.select(corr('High','Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



### What is the max high per year?

In [45]:
from pyspark.sql.functions import year
yeardf = df.withColumn("Year",year(df['Date']))

In [46]:
maxdf = yeardf.groupBy('Year').max()

In [48]:
maxdf.select('Year',"max(High)").show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [49]:
maxdf.select('Year',"max(High)").orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



### What is the average Close for each Calendar Month ?

In [50]:
from pyspark.sql.functions import month

In [51]:
monthdf = df.withColumn('Month',month('Date'))

In [52]:
monthavgs = monthdf.select(['Month','Close']).groupBy('Month').mean()

In [53]:
monthavgs.select(['Month','avg(Close)']).orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

