In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('walmart').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/23 12:37:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
df=spark.read.csv("./walmart_stock.csv",inferSchema=True, header=True)

                                                                                

In [11]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [12]:
#print first 5 rows
df.head()

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)

In [13]:
#check the schema
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [14]:
#print general stats
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [15]:
#Format columns to show just 2 decimal places
from pyspark.sql.functions import format_number
summary = df.describe()
summary.select(summary['summary'],
              format_number(summary['Open'].cast('float'),2).alias('Open'),
              format_number(summary['High'].cast('float'),2).alias('High'),
              format_number(summary['Low'].cast('float'),2).alias('Low'),
              format_number(summary['Close'].cast('float'),2).alias('Close'),
              format_number(summary['Volume'].cast('float'),2).alias('Volume')
              ).show()

+-------+--------+--------+--------+--------+-------------+
|summary|    Open|    High|     Low|   Close|       Volume|
+-------+--------+--------+--------+--------+-------------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.50|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,781.00|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,096.00|
+-------+--------+--------+--------+--------+-------------+



In [16]:
#Create a new dataframe with a column called HV Ratio that is the ratio of the High Price
#versus volume of stock traded for a day
df_hv = df.withColumn('HV Ratio',df['High']/df['Volume']).select(['HV Ratio'])
df_hv.show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [17]:
#Which day has the peak high in price?
df.orderBy(df['High'].desc()).select(['Date']).head(1)[0]['Date']

datetime.datetime(2015, 1, 13, 0, 0)

In [18]:
#What is the mean of the "Close" column
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [19]:
#What is the maximum and minimum value of the "Volumn" column
from pyspark.sql.functions import min, max
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [20]:
#How many days did the stocks close lower than 60 dollars?
df.filter(df['Close'] <60).count()

81

In [21]:
#What percentage of the time was the "High" greater than 80 dollars?
df.filter('High>80').count() *100/df.count()

9.141494435612083

In [23]:
#What is the Pearson correlation between "High" and "Volume"
df.corr('High','Volume')

-0.3384326061737161

In [24]:
from pyspark.sql.functions import corr
df.select(corr(df['High'],df['Volume'])).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [26]:
#What is the max "High" per year?
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)
year_df = df.withColumn('Year', year(df['Date']))
year_df.groupBy('Year').max()['Year','max(High)'].show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [27]:
#What is the average "Close" for each calender month?

#Create a new coLumn Month from existina Date coLumn
month_df = df.withColumn('Month', month(df['Date']))

#GrouD by month and take average or alL other coLumns
month_df = month_df.groupBy('Month').mean()

#sort by month
month_df = month_df.orderBy('Month') 

#Display only month and avg(Close), the desired columns
month_df['Month','avg(Close)'].show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

