# Data Importation and exploration

In [1]:
# Install pyspark
# ---
#
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m16.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=ac1c1942f0e33d9871e0291f7aa1cddf192adaee2d150756be7f89aa62bb1bd6
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [27]:
# run local pyspark session
#
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf,col
import pandas as pd
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sqlCtx = SQLContext(sc)



In [28]:
# Load data without headers
# 
df = sqlCtx.read.option("header","true").csv("saf_stock.csv")
df.show(5, False)

+----------+------------------+---------+---------+------------------+--------+------------------+
|Date      |Open              |High     |Low      |Close             |Volume  |Adj Close         |
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|59.970001         |61.060001|59.869999|60.330002         |12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996|9593300 |52.078475         |
|2012-01-05|59.349998         |59.619999|58.369999|59.419998         |12768200|51.825539         |
|2012-01-06|59.419998         |59.450001|58.869999|59.0              |8069400 |51.45922          |
|2012-01-09|59.029999         |59.549999|58.919998|59.18             |6679300 |51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [29]:
# Show schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [30]:
#Describe the dataframe
df.describe()


DataFrame[summary: string, Date: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

# Data Preparation

In [40]:
# Format all the data to 2 decimal places i.e. format_number()
# Columns to be formated: Open, High, Low, Close, Adj Close
from pyspark.sql.types import *
# function
def format_number(vals):
  # get substring with only 2 digits after period
  # print(new_str)
  return '%.2f' % float(vals)

# udf_format_number = udf(lambda x:format_number(x), FloatType())
# df.withColumn("Open",udf_format_number(col("Open")))

# Convert to pandas
pandas_df = df.toPandas()
pandas_df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215


In [67]:
# Convert columns to 2 dec places
pandas_df['Open'] = pandas_df['Open'].apply(format_number)
pandas_df['High'] = pandas_df['High'].apply(format_number)
pandas_df['Close'] = pandas_df['Close'].apply(format_number)
pandas_df['Adj Close'] = pandas_df['Adj Close'].apply(format_number)
pandas_df['Low'] = pandas_df['Low'].apply(format_number)
pandas_df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.97,61.06,59.87,60.33,12668800,52.62
1,2012-01-04,60.21,60.35,59.47,59.71,9593300,52.08
2,2012-01-05,59.35,59.62,58.37,59.42,12768200,51.83
3,2012-01-06,59.42,59.45,58.87,59.0,8069400,51.46
4,2012-01-09,59.03,59.55,58.92,59.18,6679300,51.62


In [68]:
# Convert back to spark dataframe
spark_df = spark.createDataFrame(pandas_df)
spark_df.describe()

DataFrame[summary: string, Date: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [78]:
'''
Create a new data frame with a column called HV Ratio that is the ratio of the
High Price versus volume of stock traded for a day
'''

# Custom function
def hv_ratio(high, vol):
  return float(int(float(high)) / int(vol))

# UDF Method
udf_hv_ratio = udf(lambda x,y:hv_ratio(x,y), FloatType())
new_df = spark_df.withColumn("hv_ratio",udf_hv_ratio(col("High"), col('Volume')))
new_df.show()

+----------+-----+-----+-----+-----+--------+---------+------------+
|      Date| Open| High|  Low|Close|  Volume|Adj Close|    hv_ratio|
+----------+-----+-----+-----+-----+--------+---------+------------+
|2012-01-03|59.97|61.06|59.87|60.33|12668800|    52.62|4.8149786E-6|
|2012-01-04|60.21|60.35|59.47|59.71| 9593300|    52.08| 6.254365E-6|
|2012-01-05|59.35|59.62|58.37|59.42|12768200|    51.83|4.6208547E-6|
|2012-01-06|59.42|59.45|58.87|59.00| 8069400|    51.46| 7.311572E-6|
|2012-01-09|59.03|59.55|58.92|59.18| 6679300|    51.62| 8.833261E-6|
|2012-01-10|59.43|59.71|58.98|59.04| 6907300|    51.49| 8.541688E-6|
|2012-01-11|59.06|59.53|59.04|59.40| 6365600|    51.81| 9.268569E-6|
|2012-01-12|59.79|60.00|59.40|59.50| 7236400|    51.90| 8.291416E-6|
|2012-01-13|59.18|59.61|59.01|59.54| 7729300|    51.93| 7.633292E-6|
|2012-01-17|59.87|60.11|59.52|59.85| 8500000|    52.20|7.0588235E-6|
|2012-01-18|59.79|60.03|59.65|60.01| 5911400|    52.34| 1.014988E-5|
|2012-01-19|59.93|60.73|59.75|60.6

# Data Analysis

In [109]:
# What day had the Peak High in Price?
spark_df = spark_df.sort("High")
spark_df.tail(1)

[Row(Date='2015-01-13', Open='90.80', High='90.97', Low='88.93', Close='89.31', Volume='8215400', Adj Close='83.83')]

In [84]:
# What is the mean of the Close column?
from pyspark.sql.functions import mean as _mean
df_stats = spark_df.select(
    _mean(col('Close')).alias('mean')
).collect()

mean = df_stats[0]['mean']
mean

72.38844992050878

In [95]:
spark_df.withColumn("Volume",col("Volume").cast(IntegerType())).printSchema()
spark_df.head()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: string (nullable = true)



Row(Date='2012-01-03', Open='59.97', High='61.06', Low='59.87', Close='60.33', Volume='12668800', Adj Close='52.62')

In [112]:
# What is the max and min of the Volume column? Min
sorted_df = spark_df.sort("Volume")
sorted_df.head(2)

[Row(Date='2016-03-04', Open='66.14', High='67.28', Low='66.09', Close='66.78', Volume='10010500', Adj Close='64.42'),
 Row(Date='2013-05-14', Open='77.98', High='78.86', Low='77.67', Close='78.78', Volume='10013700', Adj Close='71.28')]

In [113]:
# What is the max and min of the Volume column? Max
sorted_df = spark_df.sort("Volume")
sorted_df.tail(2)

[Row(Date='2016-08-12', Open='73.80', High='74.12', Low='73.56', Close='73.89', Volume='9994200', Adj Close='72.83'),
 Row(Date='2012-06-14', Open='67.10', High='67.79', Low='67.09', Close='67.63', Volume='9994400', Adj Close='59.78')]

In [130]:
# How many days was the Close lower than 60 dollars?
from pyspark.sql.functions import col,sum,when

cnt_cond = lambda cond: sum(when(cond, 1).otherwise(0))
list_data = sorted_df.agg(
    cnt_cond(col('Close') < 60).alias('count')
)

sum = 0
list_data.show()

+-----+
|count|
+-----+
|   81|
+-----+

