In [None]:
#Importing all necessary libraries

import pandas, numpy, os
import boto3
import findspark
findspark.init()  # locate the Spark installation before any pyspark module is imported
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType

In [None]:
# Build the Spark session that every later cell uses.

# hadoop-aws supplies the s3a:// filesystem; the anonymous credentials provider
# selects unauthenticated S3 access.
conf = (
    SparkConf()
    .setAppName("Purush_ETL")
    .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')
    .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
)

spark = (
    SparkSession.builder
    .config(conf=conf)
    .config("spark.dynamicAllocation.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)
# getOrCreate() may hand back an already-running session, so clear its cache.
spark.catalog.clearCache()

In [None]:
# Base s3a:// location; the CSV file names read in later cells are appended to it.
S3_BUCKET_PATH = "s3a://purushstockdata/data/"

## Reading the data from S3

### Reading the Metadata

In [None]:
# Symbol metadata schema: every column is read as a nullable string.
sym_meta_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("Symbol", "Name", "Country", "Sector", "Industry", "Address")
])

# The first line of the CSV is a header row, not data.
sym_meta = (
    spark.read
    .schema(sym_meta_schema)
    .option("header", True)
    .csv(S3_BUCKET_PATH + "symbol_metadata.csv")
)

# Show the resulting schema and a sample of rows.
sym_meta.printSchema()
sym_meta.show()

In [None]:
## Stock data is read only for the companies/symbols listed in the metadata file,
## so collect the "Symbol" column to the driver as a plain Python list.
# sym_meta["Symbol"] is a Column expression and has no .rdd; select the column
# into a one-column DataFrame and collect that instead.
# dict.fromkeys drops duplicate symbols while keeping their original order;
# null/empty symbols are skipped because they cannot be turned into a file path.
sym_list = list(dict.fromkeys(
    row["Symbol"] for row in sym_meta.select("Symbol").collect() if row["Symbol"]
))

## Reading the Stock data

In [None]:
# One CSV per symbol, named "<Symbol>.csv" under S3_BUCKET_PATH.
stock_data_paths = [S3_BUCKET_PATH + symbol + ".csv" for symbol in sym_list]

# NOTE(review): stock_data_schema is not defined in any cell visible in this
# notebook; it must be defined before this cell runs or this raises NameError.
# Each row is tagged with its symbol, taken from the name of the file the row
# was read from (".../AAPL.csv" -> "AAPL"). Built as a single chain so that
# re-running the cell always starts from the raw files.
stock_data = (
    spark.read
    .schema(stock_data_schema)
    .csv(stock_data_paths, header=True)
    .withColumn(
        "Symbol",
        F.regexp_replace(F.input_file_name(), r'^.*/([^/]+)\.csv$', '$1'),
    )
)
stock_data.show()

In [None]:
# Attach company metadata to every stock row. Symbols are compared
# case-insensitively, and the left join keeps stock rows that have no matching
# metadata row (their metadata columns are then null).
stock_data_full = (
    stock_data
    .join(sym_meta, F.upper(stock_data["Symbol"]) == F.upper(sym_meta["Symbol"]), "left")
    .select(
        stock_data["*"],
        sym_meta["Name"],
        sym_meta["Country"],
        sym_meta["Sector"],
        sym_meta["Industry"],
    )
    # sym_meta's own "Symbol" column is not selected above, so "Symbol" here
    # refers to the column that came from stock_data.
    .repartition(F.col("Symbol"))
)

stock_data_full.printSchema()
stock_data_full.show()

### Summary Report (All Time)

In [None]:
def summary_report_all_func(stock_data_full, industries):
    """Aggregate price and volume statistics per sector over all available rows.

    Args:
        stock_data_full: DataFrame with at least the columns "Sector", "open",
            "close", "high", "low" and "volume".
        industries: Collection of names matched against the "Sector" column;
            rows whose sector is not in it are dropped. The parameter name is
            kept as-is for backward compatibility.

    Returns:
        DataFrame with one row per matching sector and the columns "Sector",
        "Avg Open Price", "Avg Close Price", "Max High Price", "Min Low Price"
        and "Avg Volume".

    Side effects:
        Prints the schema of the result to stdout.
    """
    # Filter on the argument that was passed in, not on a notebook-level global.
    # The aggregates come from pyspark.sql.functions (F); Python's builtin
    # max/min cannot be applied to a Column.
    summary_report_output__all_time = (
        stock_data_full
        .filter(stock_data_full["Sector"].isin(industries))
        .groupBy(stock_data_full["Sector"])
        .agg(
            F.avg(stock_data_full["open"]).alias("Avg Open Price"),
            F.avg(stock_data_full["close"]).alias("Avg Close Price"),
            F.max(stock_data_full["high"]).alias("Max High Price"),
            F.min(stock_data_full["low"]).alias("Min Low Price"),
            F.avg(stock_data_full["volume"]).alias("Avg Volume"),
        )
    )
    summary_report_output__all_time.printSchema()
    return summary_report_output__all_time

In [None]:
# Ask which sectors the all-time summary report should cover.
no_industries = int(input("Enter the number of industries for which you wanted Summary Report (All Time):"))
arr = input("Enter the sector names, separated by commas:")   # one line holding all the names
# Split on ',' and drop blank entries (e.g. from a trailing comma) so that an
# empty string never ends up in the filter list.
sectors = [sector.strip() for sector in arr.split(',') if sector.strip()]
if len(sectors) != no_industries:
    print("Warning: expected {} name(s) but received {}: {}".format(no_industries, len(sectors), sectors))

summary_report_all_func(stock_data_full, sectors).show(n = len(sym_list), truncate = False)



### Summary Report (Period)

In [None]:
def summary_report_period_func(stock_data_full, sectors, start_date, end_date):
    """Aggregate price and volume statistics per sector within a date range.

    Args:
        stock_data_full: DataFrame with at least the columns "Sector",
            "timestamp", "open", "close", "high", "low" and "volume".
        sectors: Collection of sector names to include.
        start_date: First day of the period, as a "yyyy-MM-dd" string.
        end_date: Last day of the period, as a "yyyy-MM-dd" string.

    Returns:
        DataFrame with one row per matching sector and the columns "Sector",
        "Avg Open Price", "Avg Close Price", "Max High Price", "Min Low Price"
        and "Avg Volume".
    """
    # Use separate names for the date Columns instead of rebinding the string
    # parameters.
    period_start = F.to_date(F.lit(start_date), "yyyy-MM-dd")
    period_end = F.to_date(F.lit(end_date), "yyyy-MM-dd")
    trade_date = F.to_date(stock_data_full["timestamp"], "yyyy-MM-dd")

    # Column.between is inclusive at both ends of the range.
    in_scope = (stock_data_full["Sector"].isin(sectors)) & (trade_date.between(period_start, period_end))

    # The aggregates come from pyspark.sql.functions (F); Python's builtin
    # max/min cannot be applied to a Column.
    summary_report_output__period = (
        stock_data_full
        .filter(in_scope)
        .groupBy(stock_data_full["Sector"])
        .agg(
            F.avg(stock_data_full["open"]).alias("Avg Open Price"),
            F.avg(stock_data_full["close"]).alias("Avg Close Price"),
            F.max(stock_data_full["high"]).alias("Max High Price"),
            F.min(stock_data_full["low"]).alias("Min Low Price"),
            F.avg(stock_data_full["volume"]).alias("Avg Volume"),
        )
    )
    return summary_report_output__period

In [None]:
# Ask for the reporting period and the sectors the period summary should cover.
start_date = input("Enter the start date of the period for Summary Report [yyyy-MM-dd]:")
end_date = input("Enter the end date of the period for Summary Report [yyyy-MM-dd]:")
no_sectors = int(input("Enter the number of sectors for which you wanted Summary Report (Given Period):"))
arr = input("Enter the sector names, separated by commas:")   # one line holding all the names
# Split on ',' and drop blank entries (e.g. from a trailing comma) so that an
# empty string never ends up in the filter list.
sectors = [sector.strip() for sector in arr.split(',') if sector.strip()]
if len(sectors) != no_sectors:
    print("Warning: expected {} name(s) but received {}: {}".format(no_sectors, len(sectors), sectors))

summary_report_period_func(stock_data_full, sectors, start_date, end_date).show(n = len(sym_list), truncate = False)


### Detailed Reports (Period)

In [None]:
def detailed_report_period_func(stock_data_full, sectors, start_date, end_date):
    """Aggregate price and volume statistics per company within a date range.

    Args:
        stock_data_full: DataFrame with at least the columns "Symbol", "Name",
            "Sector", "timestamp", "open", "close", "high", "low" and "volume".
        sectors: Collection of sector names whose companies are included.
        start_date: First day of the period, as a "yyyy-MM-dd" string.
        end_date: Last day of the period, as a "yyyy-MM-dd" string.

    Returns:
        DataFrame with one row per ("Symbol", "Name") pair and the columns
        "Symbol", "Name", "Avg Open Price", "Avg Close Price",
        "Max High Price", "Min Low Price" and "Avg Volume".
    """
    # Use separate names for the date Columns instead of rebinding the string
    # parameters.
    period_start = F.to_date(F.lit(start_date), "yyyy-MM-dd")
    period_end = F.to_date(F.lit(end_date), "yyyy-MM-dd")
    trade_date = F.to_date(stock_data_full["timestamp"], "yyyy-MM-dd")

    # Column.between is inclusive at both ends of the range.
    in_scope = (stock_data_full["Sector"].isin(sectors)) & (trade_date.between(period_start, period_end))

    # The aggregates come from pyspark.sql.functions (F); Python's builtin
    # max/min cannot be applied to a Column.
    detailed_report_output__period = (
        stock_data_full
        .filter(in_scope)
        .groupBy(stock_data_full["Symbol"], stock_data_full["Name"])
        .agg(
            F.avg(stock_data_full["open"]).alias("Avg Open Price"),
            F.avg(stock_data_full["close"]).alias("Avg Close Price"),
            F.max(stock_data_full["high"]).alias("Max High Price"),
            F.min(stock_data_full["low"]).alias("Min Low Price"),
            F.avg(stock_data_full["volume"]).alias("Avg Volume"),
        )
    )
    return detailed_report_output__period

In [None]:
# Ask for the reporting period and the sectors the detailed report should cover.
start_date = input("Enter the start date of the period for Detailed Reports [yyyy-MM-dd]:")
end_date = input("Enter the end date of the period for Detailed Reports [yyyy-MM-dd]:")
no_sectors = int(input("Enter the number of sectors for which you wanted Detailed Report (Given Period):"))
arr = input("Enter the sector names, separated by commas:")   # one line holding all the names
# Split on ',' and drop blank entries (e.g. from a trailing comma) so that an
# empty string never ends up in the filter list.
sectors = [sector.strip() for sector in arr.split(',') if sector.strip()]
if len(sectors) != no_sectors:
    print("Warning: expected {} name(s) but received {}: {}".format(no_sectors, len(sectors), sectors))

detailed_report_period_func(stock_data_full, sectors, start_date, end_date).show(n = len(sym_list), truncate = False)

In [None]:
# Stop the Spark session created earlier in this notebook.
spark.stop()