In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=0657ea4152ce86be573cd1987fcc54ade95c5d257943e6ef5fdcce101719c57b
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


Problem statement:
You work as a Big Data Engineer at GrapeVine Pvt. Ltd. Your company is currently working as a 
Data Analytics consultant for a hedge fund. Due to the size of the available dataset, the 
company requires you to increase computational efficiency using Apache Spark. You have been 
assigned certain tasks for the fulfillment of this analysis through stock market backtesting.

Dataset description:
The dataset used for this assignment is ‘Reliance NSE Stock Market Data.’ The relevant fields 
that will be put to use in further analysis are as follows:
time – The timestamp of the data record (separated by 5-minute intervals)
open – The opening price of the stock
high – The highest point of the stock in the last 5-minute interval
low – The lowest point of the stock in the last 5-minute interval 
close – The price of the stock at the end of the 5-minute interval
The rest of the fields or columns can be ignored

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NSE").getOrCreate()
df=spark.read.format('csv')\
.option("header","true")\
.option("inferSchema","true")\
.load("/content/NSE_RELIANCE_5_1.csv")
df.show(5)

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|               time|       open|       high|        low|      close|        MA5|        MA6|MA7|MA8|MA9|MA10|Volume|Volume MA|   Histogram|        MACD|      Signal|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|2020-04-03 07:20:00|1053.771557|1056.297519|1051.790411|1053.424857|1056.784037|1055.267738|NaN|NaN|NaN| NaN|354666| 140161.0|-0.038974088|-1.535973794|-1.496999706|
|2020-04-03 07:25:00|1053.573443|1055.455532|  1053.3258|1054.266844|1056.555201|1055.017514|NaN|NaN|NaN| NaN| 59611|133630.15|-0.051333212| -1.56116622|-1.509833009|
|2020-04-03 07:30:00|1053.969672|1057.535735|1053.078156|1056.941391|1056.590309|1055.498484|NaN|NaN|NaN| NaN| 88783|129284.15| 0.156916075|-1.313687914| -1.47060399

In [5]:
# Create a temporary view of the DataFrame
df.createOrReplaceTempView("NSE_Data")

# Query the table and show the result
result = spark.sql("SELECT * FROM NSE_Data")
#result.unpersist()
result.show()

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|               time|       open|       high|        low|      close|        MA5|        MA6|MA7|MA8|MA9|MA10|Volume|Volume MA|   Histogram|        MACD|      Signal|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|2020-04-03 07:20:00|1053.771557|1056.297519|1051.790411|1053.424857|1056.784037|1055.267738|NaN|NaN|NaN| NaN|354666| 140161.0|-0.038974088|-1.535973794|-1.496999706|
|2020-04-03 07:25:00|1053.573443|1055.455532|  1053.3258|1054.266844|1056.555201|1055.017514|NaN|NaN|NaN| NaN| 59611|133630.15|-0.051333212| -1.56116622|-1.509833009|
|2020-04-03 07:30:00|1053.969672|1057.535735|1053.078156|1056.941391|1056.590309|1055.498484|NaN|NaN|NaN| NaN| 88783|129284.15| 0.156916075|-1.313687914| -1.47060399

In [7]:
# Find out the average ‘close’ price of Reliance throughout the duration of the dataset
Avg_Close_Price=spark.sql("select Avg(close) as avg_close_price from NSE_Data ")
Avg_Close_Price.show()

+-----------------+
|  avg_close_price|
+-----------------+
|1414.248173455824|
+-----------------+



In [9]:
#If a Reliance stock was bought at the beginning of the trading day, ‘2020-04-07’ (YYYY-MM-DD), 
#at the close price of the first 5-minute window, scan the dataset to find out the point to sell the 
#stock to maximize profits. You are required to print the specific timestamp
time_stamp=spark.sql("select time from NSE_DATA where time>='2020-04-07' and close=(select max(close) from NSE_Data where time>='2020-04-07')")
time_stamp.show()

+-------------------+
|               time|
+-------------------+
|2020-06-05 03:55:00|
+-------------------+



In [10]:
#Find out the net profit or net loss to be accumulated if one stock of Reliance is bought at the 
#opening of every 5-minute slot and sold at the lowest possible point in that 5-minute slot

lowest_possible_point=spark.sql("select sum(open-low) as net_profit_loss from NSE_Data")
lowest_possible_point.show()

+-----------------+
|  net_profit_loss|
+-----------------+
|8877.819898999991|
+-----------------+



In [12]:
# Find out the net profit or net loss to be accumulated if one stock of Reliance is bought at the 
#opening of every 5-minute slot and sold at the highest possible point in that 5-minute slot
highest_possible_point=spark.sql("select sum(high-open) as net_profit_high from NSE_Data")
highest_possible_point.show()

+----------------+
| net_profit_high|
+----------------+
|8896.90313300003|
+----------------+

