In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("Walmart_DA").getOrCreate())

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType

In [3]:
schema = StructType([
        StructField('Date',TimestampType(), True),
        StructField('Open',DoubleType(), True),
        StructField('High',DoubleType(), True),
        StructField('Low',DoubleType(), True),
        StructField('Close',DoubleType(), True),
        StructField('Adj Close',DoubleType(), True),
        StructField('Volume',IntegerType(), True),
])

In [4]:
df = spark.read.csv(path='Walmart-Data/WMT.csv', schema=schema, header=True)

In [5]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [6]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

In [7]:
df.show(10, False)

+-------------------+--------+--------+--------+--------+---------+-------+
|Date               |Open    |High    |Low     |Close   |Adj Close|Volume |
+-------------------+--------+--------+--------+--------+---------+-------+
|1972-08-25 00:00:00|0.063477|0.064697|0.063477|0.064453|0.036447 |2508800|
|1972-08-28 00:00:00|0.064453|0.064941|0.064209|0.064209|0.036309 |972800 |
|1972-08-29 00:00:00|0.063965|0.063965|0.063477|0.063477|0.035895 |1945600|
|1972-08-30 00:00:00|0.063477|0.063477|0.062988|0.063477|0.035895 |409600 |
|1972-08-31 00:00:00|0.062988|0.062988|0.0625  |0.0625  |0.035343 |870400 |
|1972-09-01 00:00:00|0.062744|0.062988|0.062744|0.062988|0.035619 |256000 |
|1972-09-05 00:00:00|0.062988|0.062988|0.0625  |0.0625  |0.035343 |563200 |
|1972-09-06 00:00:00|0.062988|0.062988|0.062988|0.062988|0.035619 |256000 |
|1972-09-07 00:00:00|0.062988|0.062988|0.062744|0.062744|0.035481 |1177600|
|1972-09-08 00:00:00|0.0625  |0.0625  |0.062256|0.062256|0.035205 |665600 |
+-----------

In [8]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|summary|             Open|              High|               Low|             Close|         Adj Close|           Volume|
+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|  count|            12491|             12491|             12491|             12491|             12491|            12491|
|   mean|35.47919765615241|35.811310182371315|35.151301605716114| 35.48537980233778|28.948680092146382|7540559.743735489|
| stddev|37.20707114804321| 37.51941889361142| 36.91418633394463|37.214409106712566|34.585374876991864|6237161.700819969|
|    min|         0.015625|          0.015625|          0.014404|          0.015625|          0.008889|                0|
|    max|       153.600006|        153.660004|        151.660004|        152.789993|        150.842651|        131833600|
+-------+---------------

In [9]:
from pyspark.sql.functions import format_number

In [10]:
summary = df.describe()
summary.select(summary['summary'],
               format_number(summary['Open'].cast('float'), 2).alias('Open'),
               format_number(summary['High'].cast('float'), 2).alias('High'),
               format_number(summary['Low'].cast('float'), 2).alias('Low'),
               format_number(summary['Close'].cast('float'), 2).alias('Close'),
               format_number(summary['Volume'].cast('int'),0).alias('Volume')
 ).show()

+-------+---------+---------+---------+---------+-----------+
|summary|     Open|     High|      Low|    Close|     Volume|
+-------+---------+---------+---------+---------+-----------+
|  count|12,491.00|12,491.00|12,491.00|12,491.00|     12,491|
|   mean|    35.48|    35.81|    35.15|    35.49|  7,540,559|
| stddev|    37.21|    37.52|    36.91|    37.21|  6,237,161|
|    min|     0.02|     0.02|     0.01|     0.02|          0|
|    max|   153.60|   153.66|   151.66|   152.79|131,833,600|
+-------+---------+---------+---------+---------+-----------+



#### HV ratio. High Price Vs Volume of Stock

In [11]:
hv_df = df.withColumn('HV Ratio',df['High']/df['Low']).select(['HV Ratio'])
hv_df.show()

+------------------+
|          HV Ratio|
+------------------+
|1.0192195598405722|
| 1.011400270990048|
|1.0076878239362286|
|1.0077633835016193|
|          1.007808|
|1.0038888180543162|
|          1.007808|
|               1.0|
|1.0038888180543162|
|1.0039193009509124|
|               1.0|
|               1.0|
|               1.0|
|          1.011712|
|1.0038888180543162|
|1.0038737537308693|
|1.0078386019018246|
|               1.0|
|1.0243760302369336|
|1.0337533053351997|
+------------------+
only showing top 20 rows



#### mean closing price of the stock

In [12]:
from pyspark.sql.functions import mean

In [13]:
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|35.48537980233778|
+-----------------+



#### mean opening price of the stock

In [14]:
df.select(mean('Open')).show()

+-----------------+
|        avg(Open)|
+-----------------+
|35.47919765615241|
+-----------------+



#### avg volume traded

In [15]:
df.select(mean('Volume')).show()

+-----------------+
|      avg(Volume)|
+-----------------+
|7540559.743735489|
+-----------------+



#### Max and Min values for Volume, Opening and Closing price traded

In [16]:
from pyspark.sql.functions import min, max

In [17]:
# Volume
df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|  131833600|          0|
+-----------+-----------+



In [18]:
# Opening
df.select(max('Open'), min('Open')).show()

+----------+---------+
| max(Open)|min(Open)|
+----------+---------+
|153.600006| 0.015625|
+----------+---------+



In [19]:
# Closing
df.select(max('Close'), min('Close')).show()

+----------+----------+
|max(Close)|min(Close)|
+----------+----------+
|152.789993|  0.015625|
+----------+----------+



#### number of days Close lower than 60 $ 

In [20]:
df.filter(df['Close'] < 60).count()

9903

#### Number of days Open greater than 90 $

In [21]:
df.filter(df['Open'] > 90).count()

972

#### What percent of time was High greater than 90 $

In [22]:
df.filter('High > 90').count()/ df.count() * 100

7.885677687935313

In [23]:
# another approach
df.filter(df['High'] > 90).count()/ df.count() * 100

7.885677687935313

#### Correlation between high and volume

In [24]:
df.corr('High', 'Volume')

0.3241577762793653