In [1]:
# Point findspark at the local Spark 2.4.3 installation before pyspark is
# imported (presumably so the pyspark import below resolves against it).
import findspark
findspark.init('/home/osboxes/SparkClass/spark-2.4.3-bin-hadoop2.7')
import pyspark
import os

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Name under which this notebook registers its Spark application.
app_name = 'exercises'

# Attach to an existing session when one is running, otherwise create it.
spark = (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
)

In [7]:
# Folder holding the exercise notebooks and the CSV they analyse.
path = '/home/osboxes/SparkClass/Spark_DataFrame_Project_Exercise/'

# Switch the working directory to that folder, then list what is in it
# (the listing is the cell's displayed value).
os.chdir(path)
os.listdir('.')

['Spark DataFrames Project Exercise.ipynb',
 'walmart_stock.csv',
 'Spark DataFrames Project Exercise - SOLUTIONS.ipynb']

In [8]:
# Load the Walmart stock prices. The first CSV line supplies the column
# names (header=True) and Spark samples the data to pick column types
# (inferSchema=True).
# os.path.join is used instead of `path + name` so the file is still found
# if `path` is ever written without its trailing separator.
df = spark.read.csv(os.path.join(path, 'walmart_stock.csv'),
                    inferSchema=True, header=True)

# Quick sanity check: display the first three rows.
df.show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



In [9]:
# Print each column's name, inferred type and nullability.

df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [10]:
# List the column names as a plain Python list.

df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [13]:
# Display the first five rows.
#
# Option 1 would be the tabular view: df.show(5)
#
# Option 2 (used here): fetch the five Row objects and print each one,
# followed by two blank lines.

for row in df.head(5):
    print(row, end='\n\n\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




In [14]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.

df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

In [15]:
from pyspark.sql.functions import format_number


In [19]:
# Inspect the types of the summary frame: every statistic comes back as a string.
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [20]:
# describe() returns its statistics as string columns, so they must be cast
# to numeric types before they can be formatted as numbers.

result = df.describe()

In [29]:
# Re-compute the summary and present it in a readable form: the four price
# columns are cast from string to float and rendered with two decimals,
# while Volume is cast to an integer.
result = df.describe()
result.select(
    result['summary'],
    *[format_number(result[name].cast('float'), 2).alias(name)
      for name in ('Open', 'High', 'Low', 'Close')],
    result['Volume'].cast('int').alias('Volume')
).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [34]:
# Derive a new DataFrame that adds 'HV Ratio': the day's High price divided
# by the day's traded Volume.

df2 = df.withColumn('HV Ratio', df.High / df.Volume)

# Show only the derived column (first 20 rows by default).
df2.select(df2['HV Ratio']).show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [37]:
# Which day had the highest High price?
# Sort by High in descending order and keep the top row.

df.orderBy('High', ascending=False).head(1)

[Row(Date=datetime.datetime(2015, 1, 13, 0, 0), Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)]

In [41]:
# Mean of the Close column, rendered with two decimals.

from pyspark.sql.functions import mean

df.select(
    format_number(mean(df['Close']), 2).alias('Avg Close')
).show()

+---------+
|Avg Close|
+---------+
|    72.39|
+---------+



In [43]:
# What are the max and min of the Volume column?

# Reach the aggregates through a module alias. Importing them by name
# (`from pyspark.sql.functions import max, min`) would rebind Python's
# built-in max()/min() for every cell executed afterwards.
from pyspark.sql import functions as F

df.select(F.max('Volume').alias('Max Vol'), F.min('Volume').alias('Min Vol')).show()

+--------+-------+
| Max Vol|Min Vol|
+--------+-------+
|80898100|2094900|
+--------+-------+



In [44]:
# On how many days did the stock close below 60 dollars?
# Variant 1: condition given as a SQL expression string.

df.where('Close < 60').count()


81

In [45]:
# Variant 2: the same condition built from a Column expression.

df.filter(df.Close < 60).count()

81

In [46]:
from pyspark.sql.functions import count

In [47]:
# Variant 3: keep the rows that closed below 60, then count them with the
# count() aggregate so the answer is shown as a one-row table.
result = df.where(df['Close'] < 60)
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



In [48]:
# What percentage of the time was the High greater than 80 dollars?
# (number of days with High > 80) / (total days in the dataset) * 100

df.filter(df['High'] > 80).count() / df.count() * 100

9.141494435612083

In [50]:
# Pearson correlation between High and Volume, rendered with two decimals.
from pyspark.sql.functions import corr

df.select(
    format_number(corr(df['High'], df['Volume']), 2).alias('Pearson Corr')
).show()

+------------+
|Pearson Corr|
+------------+
|       -0.34|
+------------+



In [57]:
# Highest High price in each calendar year.
from pyspark.sql.functions import year

# Tag every row with the year taken from its Date.
yeardf = df.withColumn('Year', year('Date'))

# Calling max() with no arguments on the grouped data aggregates the numeric
# columns and names each result 'max(<column>)', hence 'max(High)' below.
max_df = yeardf.groupBy('Year').max()

max_df.select('Year', 'max(High)').orderBy('Year').show()


+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [58]:
# what is the average close for each calendar month

from pyspark.sql.functions import month

In [68]:
# Tag every row with the calendar month (1-12) taken from its Date.
month_df = df.withColumn('Month', month('Date'))

# Average per month. The aggregated column is named 'avg(Close)', which is
# the name the final select below reads.

avg_df = month_df.select('Month', 'Close').groupBy('Month').mean()

(
    avg_df
    .select('Month', format_number('avg(Close)', 2).alias('Avg Close'))
    .orderBy('Month')
    .show()
)

+-----+---------+
|Month|Avg Close|
+-----+---------+
|    1|    71.45|
|    2|    71.31|
|    3|    71.78|
|    4|    72.97|
|    5|    72.31|
|    6|    72.50|
|    7|    74.44|
|    8|    73.03|
|    9|    72.18|
|   10|    71.58|
|   11|    72.11|
|   12|    72.85|
+-----+---------+

