In [None]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the CSV below is loaded with a relative path ('walmart_stock.csv'),
# not from /content/drive — confirm whether this mount is actually needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Setting up PySpark

In [None]:
!pip install pyspark

# Import SparkSession
from pyspark.sql import SparkSession

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 44.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=eee71422941a71b6e2d991dd42c8e371f6ca4338db96da4776b658ad73cd2cc7
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# Start a Spark session named 'walmart', or reuse one if it already exists.
spark = (
    SparkSession.builder
    .appName('walmart')
    .getOrCreate()
)

In [None]:
# Load the daily stock prices: the first CSV line supplies column names and
# Spark infers each column's type from the data (see printSchema below).
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('walmart_stock.csv')
)
df.show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000

In [None]:
# Print each column's name and the type Spark inferred for it.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
# Column names, in schema order.
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [None]:
# Print the first five rows, each followed by a blank-line separator.
first_rows = df.head(5)
for record in first_rows:
    print(record)
    print('\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns;
# the Date column does not appear in the result.
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

In [None]:
from pyspark.sql.functions import format_number

In [None]:
# Same summary statistics, with the price columns rounded to two decimals and
# Volume shown as a whole number. describe() returns its statistics as strings,
# so every column is cast to a numeric type before formatting. 'double' is used
# rather than 32-bit 'float' to avoid precision loss before rounding.
result = df.describe()
result.select(
    result['summary'],
    format_number(result['Open'].cast('double'), 2).alias('Open'),
    format_number(result['High'].cast('double'), 2).alias('High'),
    format_number(result['Low'].cast('double'), 2).alias('Low'),
    # Previously mislabelled 'Volume', which duplicated that column name.
    format_number(result['Close'].cast('double'), 2).alias('Close'),
    format_number(result['Adj Close'].cast('double'), 2).alias('Adj Close'),
    result['Volume'].cast('int').alias('Volume'),
).show()

+-------+--------+--------+--------+--------+---------+--------+
|summary|    Open|    High|     Low|  Volume|Adj Close|  Volume|
+-------+--------+--------+--------+--------+---------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00| 1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39|    67.24| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76|     6.72| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42|    50.36| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|    84.91|80898100|
+-------+--------+--------+--------+--------+---------+--------+



In [None]:
# Add the ratio of the day's High price to that day's traded Volume,
# then display just the new column.
df2 = df.withColumn('HV Ratio', df.High / df.Volume)
df2.select(df2['HV Ratio']).show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [None]:
# Date of the trading day with the highest High price.
# Date is looked up by name rather than by position, so the result stays
# correct even if the column order of df changes.
df.orderBy(df['High'].desc()).select('Date').first()['Date']

datetime.datetime(2015, 1, 13, 0, 0)

In [None]:
from pyspark.sql.functions import mean

# Average closing price over every row of the dataset.
df.agg(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [None]:
# Smallest and largest daily trading Volume.
# Import the Spark aggregates under aliases: a plain
# `from pyspark.sql.functions import min, max` would shadow Python's built-in
# min()/max() for every later cell in this notebook.
from pyspark.sql.functions import min as spark_min, max as spark_max
df.select(spark_min('Volume'), spark_max('Volume')).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+



In [None]:
# Number of trading days on which the Close price was below 60 dollars.
df.where(df.Close < 60).count()

81

In [None]:
from pyspark.sql.functions import count

# The same count as the previous cell, computed with an aggregate over the
# filtered rows instead of DataFrame.count().
result = df.where(df.Close < 60)
result.agg(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



In [None]:
# Percentage of trading days whose High price exceeded 80 dollars.
day_80 = df.where(df.High > 80).count()
percentage_80 = day_80 / df.count() * 100
percentage_80

9.141494435612083

In [None]:
from pyspark.sql.functions import corr

# Pearson correlation coefficient between the daily High price and Volume.
df.agg(corr('High', 'Volume')).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [None]:
# Highest High price reached in each calendar year.
from pyspark.sql.functions import year
df_year = df.withColumn('Year', year(df['Date']))
# groupBy().max() with no arguments aggregates every numeric column; only
# max(High) is displayed below.
group_year = df_year.groupBy('Year').max()
# A grouped result has no guaranteed row order, so sort by Year for a stable,
# readable table (matching the per-month cell).
group_year.select('Year', 'max(High)').orderBy('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [None]:
from pyspark.sql.functions import month

# Average closing price per calendar month (1 = January), pooled across all
# years in the dataset.
df_month = df.withColumn('Month', month(df.Date))
av_month = df_month[['Month', 'Close']].groupBy('Month').avg()
av_month.sort('Month').select('Month', 'avg(Close)').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

