In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.0-bin-hadoop3.2')

In [2]:
from pyspark.sql import SparkSession

In [15]:
# Check out some date functions
from pyspark.sql.functions import format_number


In [3]:
spark = SparkSession.builder.appName('exercise').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/23 17:00:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read-in data and explore

In [4]:
df = spark.read.csv('../data/Spark_DataFrame_Project_Exercise/walmart_stock.csv', 
                     inferSchema=True, 
                     header=True)

                                                                                

In [5]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [8]:
df.collect()[:5]

[Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date='2012-01-04', Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

In [31]:
desc_df = df.describe()
desc_df.select([desc_df['summary'], desc_df['date']]+[desc_df[i].cast('integer') for i in desc_df.columns[2:]]).show()

+-------+----------+----+----+----+-----+--------+---------+
|summary|      date|Open|High| Low|Close|  Volume|Adj Close|
+-------+----------+----+----+----+-----+--------+---------+
|  count|      1258|1258|1258|1258| 1258|    1258|     1258|
|   mean|      null|  72|  72|  71|   72| 8222093|       67|
| stddev|      null|   6|   6|   6|    6| 4519780|        6|
|    min|2012-01-03|  56|  57|  56|   56| 2094900|       50|
|    max|2016-12-30|  90|  90|  89|   90|80898100|       84|
+-------+----------+----+----+----+-----+--------+---------+



## Analysis

In [39]:
#High Price vs. volume (HV) ratio
hv_ratio = df.select(
    (df['High'] / df['Volume']).alias('HV Ratio'))

In [40]:
hv_ratio.show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [50]:
# Day of highest price?
max_price = df.agg({'High':'max'}).collect()[0][0]
df.filter(df['High']==max_price).select('Date').collect()

[Row(Date='2015-01-13')]

In [56]:
# Mean close?
df.agg({'Close': 'mean'}).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [57]:
# Min / max volume?
from pyspark.sql.functions import min, max
df.agg(min('Volume'), max('Volume')).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+



In [59]:
# Days with close less than 60?
df.filter('Close < 60').count()

81

In [61]:
# Percent of time with High > 80?

df.filter('High > 80').count() / df.count()

0.09141494435612083

In [63]:
# Pearson's corr between volume and high?
from pyspark.sql.functions import corr
df.select(corr(df['High'], df['Volume'])).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [68]:
# Max high per year?
from pyspark.sql.functions import month
(df.withColumn("Month", month("Date"))
 .groupby('Month')
 .mean()
 .select('Month','avg(Close)')
 .orderBy('Month')
 .show())

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

