In [None]:
#!pip install openpyxl
import findspark
findspark.init()

import os

import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col, date_format, to_date

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

In [None]:
# Spark handles shared by all cells; create_spark_variables() fills them in.
sc = ss = dfr = None

# Results table; create_results_df() replaces this with an empty pandas DataFrame.
df_results = None

# Per-student state; initialize_global_variables() fills these in.
mySYMBOL = mySET = myKEY = None
mySYMBOLDataFile = None  # e.g. "/home/hduser/spark/endsem/company_data/<SYMBOL>.csv"

def initialize_global_variables(lRoll_No, lfile, lset_no):
    """Populate the per-student globals for one allotted company file.

    Parameters
    ----------
    lRoll_No : roll number; converted with ``str`` when the key is built.
    lfile    : data file name such as ``"ABC.csv"``.
    lset_no  : question-set label; stored as given, stringified in the key.

    Sets ``mySYMBOL`` (the file name without its extension), ``mySET``,
    ``myKEY`` (``"<roll>-<symbol>-<set>"``) and ``mySYMBOLDataFile``.
    """
    global mySYMBOL, mySET, myKEY, mySYMBOLDataFile
    # splitext removes only the final extension, so a symbol that itself
    # contains a dot ("AB.CD.csv") stays intact; split(".")[0] cut it to "AB".
    mySYMBOL = os.path.splitext(lfile)[0]
    mySET = lset_no
    myKEY = "-".join((str(lRoll_No), mySYMBOL, str(mySET)))
    mySYMBOLDataFile = "/home/hduser/spark/endsem/company_data/" + mySYMBOL + ".csv"

def release_global_variables():
    """Reset the per-student globals (symbol, set, key, data-file path) to None.

    Safe to call repeatedly and before initialize_global_variables() has run.
    """
    global mySYMBOL, mySET, myKEY, mySYMBOLDataFile
    # Rebinding already drops the old values. The former ``del`` calls added
    # nothing and raised NameError whenever one of the names was not bound.
    mySYMBOL = mySET = myKEY = mySYMBOLDataFile = None
    

In [None]:
def create_spark_variables():
    """Create (or reuse) the Spark handles and store them in the globals.

    Sets ``sc`` (SparkContext, requested app name "Set-3"), ``ss``
    (SparkSession) and ``dfr`` (the session's DataFrame reader).
    """
    global sc, ss, dfr
    # getOrCreate() hands back a context that is still alive (for example
    # after an earlier run in the same kernel failed before cleanup) instead
    # of failing because only one SparkContext may run at a time.
    sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("Set-3"))
    # The builder is the documented way to obtain a session; it attaches to
    # the context obtained above.
    ss = pyspark.sql.SparkSession.builder.getOrCreate()
    dfr = ss.read
#######################################################
def release_spark_variables():
    """Stop the Spark session and context (when present) and clear the globals.

    The globals ``ss``, ``sc`` and ``dfr`` are reset to None even if one of
    the ``stop()`` calls raises.
    """
    global ss, sc, dfr
    try:
        # Either handle is still None when create_spark_variables() never ran
        # or failed part-way; calling .stop() on None would raise.
        if ss is not None:
            ss.stop()
        if sc is not None:
            sc.stop()
    finally:
        ss = None
        sc = None
        dfr = None
#######################################################
def create_results_df():
    """Start a fresh, empty results table in the ``df_results`` global."""
    global df_results
    empty_table = pd.DataFrame()
    df_results = empty_table

def add_to_results_df(row_key, col_key, avalue):
    """Store ``avalue`` in the results table at row ``row_key``, column ``col_key``."""
    global df_results
    cell = (row_key, col_key)
    df_results.at[cell] = avalue

def write_results_df(ldir, lfile):
    """Write the results table to ``lfile`` inside ``ldir`` as CSV.

    Parameters
    ----------
    ldir  : target directory, with or without a trailing path separator.
    lfile : file name to create inside ``ldir``.

    The row index (the per-student keys) is written as the first column.
    """
    # os.path.join adds the separator only when it is missing; plain string
    # concatenation produced "<dir><file>" for a directory without one.
    lpath = os.path.join(ldir, lfile)
    print(f"Writing results into: {lpath}")
    df_results.to_csv(lpath)  # write the index too
#######################################################

In [None]:
def read_allotted_data_into_spark():
    """Load the allotted company's CSV through the Spark reader.

    Reads ``mySYMBOLDataFile`` with a header row and schema inference and
    stores the resulting DataFrame in the ``df_spark_subset_sorted`` global.
    """
    global df_spark_subset_sorted
    source_path = mySYMBOLDataFile
    print(f"Reading file into SPARK: {source_path}")
    csv_options = {"inferSchema": True, "header": True}
    df_spark_subset_sorted = dfr.csv(source_path, **csv_options)

def release_allotted_data_spark():
    """Drop the reference to the allotted company's Spark DataFrame.

    Safe to call before read_allotted_data_into_spark() has ever run.
    """
    global df_spark_subset_sorted
    # Rebinding to None is sufficient. The former ``del`` raised NameError
    # when the global was not bound yet, which can happen when main() calls
    # this from its error handler before any file has been read.
    df_spark_subset_sorted = None
    
def read_allotted_data_into_pd():
    """Load the allotted company's CSV (``mySYMBOLDataFile``) into the ``pd_df`` global."""
    global pd_df
    # pandas counterpart of the Spark frame read from the same file
    loaded = pd.read_csv(mySYMBOLDataFile)
    pd_df = loaded

def release_allotted_data_pd():
    """Drop the reference to the allotted company's pandas DataFrame.

    Safe to call before read_allotted_data_into_pd() has ever run.
    """
    global pd_df
    # Rebinding to None is sufficient. The former ``del`` raised NameError
    # when the global was not bound yet, which can happen when main() calls
    # this from its error handler before any file has been read.
    pd_df = None

In [None]:
# 1(a) Using SPARK
def process_1a_spark():
    """Inter-quartile range of CLOSE using Spark's approximate quantiles.

    Records the IQR under "1a.spark" and returns ``(Q1, Q3, IQR)``.
    """
    lower_q, upper_q = df_spark_subset_sorted.approxQuantile("CLOSE", [0.25, 0.75], 0.0001)
    spread = upper_q - lower_q
    add_to_results_df(myKEY, "1a.spark", spread)
    return lower_q, upper_q, spread
#######################################################
# 1(a) using PANDAS
def process_1a_pd():
    """Inter-quartile range of CLOSE using pandas quantiles.

    Records the IQR under "1a.pd" and returns ``(Q1, Q3, IQR)``.
    """
    close_prices = pd_df["CLOSE"]
    first_quartile, third_quartile = (close_prices.quantile(p) for p in (0.25, 0.75))
    spread = third_quartile - first_quartile

    add_to_results_df(myKEY, "1a.pd", spread)
    return first_quartile, third_quartile, spread
#######################################################
# 1(b) Using SPARK
def process_1b_spark(Q1, IQR):
    """Count rows whose CLOSE lies below ``Q1 - 1.5 * IQR`` (Spark); records "1b.spark"."""
    frame = df_spark_subset_sorted
    lower_fence = Q1 - 1.5 * IQR
    below_fence = frame.filter(frame["CLOSE"] < lower_fence)

    add_to_results_df(myKEY, "1b.spark", below_fence.count())
#######################################################
# 1(b) using Pandas
def process_1b_pd(Q1, IQR):
    """Count rows whose CLOSE lies below ``Q1 - 1.5 * IQR`` (pandas); records "1b.pd"."""
    lower_fence = Q1 - 1.5 * IQR
    is_below = pd_df["CLOSE"] < lower_fence
    how_many = len(pd_df.loc[is_below])

    add_to_results_df(myKEY, "1b.pd", how_many)
#######################################################
# 1(c) using SPARK
def process_1c_spark(Q3):
    """Percentage of rows with CLOSE >= ``Q3`` (Spark); records "1c.spark"."""
    frame = df_spark_subset_sorted
    at_or_above_q3 = frame.filter(frame["CLOSE"] >= Q3).count()
    total_rows = frame.count()
    # An empty frame is reported as 0 rather than dividing by zero.
    share = at_or_above_q3 / total_rows * 100 if total_rows != 0 else 0

    add_to_results_df(myKEY, "1c.spark", share)
#######################################################
# 1(c) using Pandas
def process_1c_pd(Q3):
    """Percentage of rows with CLOSE >= ``Q3`` (pandas); records "1c.pd"."""
    at_or_above_q3 = len(pd_df.loc[pd_df["CLOSE"] >= Q3])
    total_rows = len(pd_df)
    # An empty frame is reported as 0 rather than dividing by zero.
    share = at_or_above_q3 / total_rows * 100 if total_rows != 0 else 0

    add_to_results_df(myKEY, "1c.pd", share)
#######################################################
# 2(a) using SPARK
# Solution using SPARK
def process_2a_spark():
    """Standard error of OPEN over a 30-row sample (Spark); records "2a.spark".

    The sample is whatever ``limit(30)`` returns for the allotted frame. The
    standard error is the sample standard deviation divided by the square
    root of the number of non-null OPEN values actually in the sample.
    """
    sample = df_spark_subset_sorted.select("OPEN").limit(30)
    # One aggregation yields both the stddev and the count of values behind
    # it, so the divisor stays correct when the sample holds fewer than 30
    # usable values (short file or null OPEN entries).
    stats = sample.agg(F.stddev("OPEN"), F.count("OPEN")).first()
    sample_stdev, sample_count = stats[0], stats[1]
    # The stddev comes back as None when it is undefined (too few values);
    # record NaN instead of failing on ``None / number``.
    if sample_stdev is None:
        stderr = float("nan")
    else:
        stderr = sample_stdev / np.sqrt(sample_count)

    add_to_results_df(myKEY, "2a.spark", stderr)
    return
#######################################################
# 2(a) using PANDAS
# Calculate the standard error of the "OPEN" column for the 30-row sample
def process_2a_pd():
    """Standard error of OPEN over the first 30 rows (pandas); records "2a.pd".

    The standard error is the sample standard deviation divided by the square
    root of the number of non-null values in the sample.
    """
    sample = pd_df["OPEN"].head(30)
    # std() ignores NaN, so divide by the number of values it actually used;
    # this is 30 for a full sample and smaller for a short or gappy one.
    sample_count = sample.count()
    stderr = sample.std() / np.sqrt(sample_count)

    add_to_results_df(myKEY, "2a.pd", stderr)
    return
#######################################################
from scipy.stats import norm

def process_2b():
    """Width of the 95% normal confidence interval for the mean of OPEN.

    Uses the first 30 rows of ``pd_df``; records the width under "2b".
    """
    sample = pd_df["OPEN"].head(30)
    sample_mean = sample.mean()
    # mean()/std() ignore NaN, so the standard error divides by the number of
    # values actually used (30 for a full sample, fewer otherwise).
    sample_stderr = sample.std() / np.sqrt(sample.count())

    # 2.5% and 97.5% points of Normal(mean, stderr) bound the central 95%.
    lower_limit_95 = norm.ppf(0.025, sample_mean, sample_stderr)
    higher_limit_95 = norm.ppf(0.975, sample_mean, sample_stderr)
    width_95 = higher_limit_95 - lower_limit_95

    add_to_results_df(myKEY, "2b", width_95)
    return
#######################################################
# 2(c)

def process_2c():
    """90% normal confidence interval for the mean of OPEN (first 30 rows).

    Records the lower bound under "2c.1" and the interval width under "2c.2".
    """
    sample = pd_df["OPEN"].head(30)
    sample_mean = sample.mean()
    # mean()/std() ignore NaN, so the standard error divides by the number of
    # values actually used (30 for a full sample, fewer otherwise).
    sample_stderr = sample.std() / np.sqrt(sample.count())

    # 5% and 95% points of Normal(mean, stderr) bound the central 90%.
    lower_value_90_interval = norm.ppf(0.05, sample_mean, sample_stderr)
    higher_value_90_interval = norm.ppf(0.95, sample_mean, sample_stderr)
    width_90 = higher_value_90_interval - lower_value_90_interval

    add_to_results_df(myKEY, "2c.1", lower_value_90_interval)
    add_to_results_df(myKEY, "2c.2", width_90)
#######################################################
def assign_bins(ldf, colname, boundaries):
    """Label the rows of ``ldf`` with rank-based bins in a new ``BIN`` column.

    ``boundaries`` holds positional cut points into the frame once it is
    sorted ascending by ``colname`` (expected form: ``[0, ..., total length]``).
    Rows at sorted positions ``boundaries[i]:boundaries[i+1]`` get the label
    ``"B<i+1>"``; rows covered by no slice keep ``"NOVAL"``. ``ldf`` is
    modified in place and is left ordered by its index.
    """
    # Rank the rows by the column so positional slices map to value ranges.
    ldf.sort_values(by=colname, inplace=True)

    ldf['BIN'] = "NOVAL"
    bin_position = ldf.columns.get_loc('BIN')

    # Walk consecutive (start, stop) pairs of cut points, numbering from 1.
    for number, (start, stop) in enumerate(zip(boundaries[:-1], boundaries[1:]), start=1):
        ldf.iloc[start:stop, bin_position] = "B" + str(number)

    # Put the rows back in index order.
    ldf.sort_index(inplace=True)
#######################################################
# 3(a) 

def process_3a():
    """Bin the rows by HIGH rank and record each bin's lowest and highest HIGH.

    Bins cover sorted positions 0-300, 300-600, 600-900 and 900-end. For each
    bin label the minimum goes to "3a.<label>.Start" and the maximum to
    "3a.<label>.End". Adds the ``BIN`` column to ``pd_df``.
    """
    total_rows = len(pd_df)
    assign_bins(pd_df, "HIGH", [0, 300, 600, 900, total_rows])

    # Labels are visited in the order they first appear in the frame.
    for label in pd_df['BIN'].unique():
        highs_in_bin = pd_df.loc[pd_df['BIN'] == label, 'HIGH']
        lowest, highest = highs_in_bin.min(), highs_in_bin.max()
        prefix = "3a." + str(label)

        add_to_results_df(myKEY, prefix + ".Start", lowest)
        add_to_results_df(myKEY, prefix + ".End", highest)

In [None]:
def get_tp_fn(conf_df, class_label):
    """Return ``(tp, fn)`` for ``class_label`` from a labelled confusion matrix.

    ``tp`` is the diagonal cell of the class; ``fn`` is the rest of that
    class's row, i.e. the row total minus the diagonal cell.
    """
    true_positives = conf_df.loc[class_label, class_label]
    row_total = conf_df.loc[class_label].sum()
    false_negatives = row_total - true_positives
    return true_positives, false_negatives

In [None]:
# 3(b) 
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix

def process_3b():
    """Predict each day's HIGH bin from the previous day's prices and record
    per-bin true positives and false negatives.

    Expects ``pd_df`` to already carry the ``BIN`` column. Adds the columns
    ``prev_open``, ``prev_high``, ``prev_low`` and ``prev_close`` to
    ``pd_df``. A logistic regression is fitted on all rows without missing
    values and evaluated on those same rows. For each of B1..B4 the true
    positives go to "3b.<bin>.Value-1" and the false negatives to
    "3b.<bin>.Value-2".
    """
    features = ['prev_open', 'prev_high', 'prev_low', 'prev_close']
    target = 'BIN_encoded'

    # Previous-day features: shift(1) moves every value down by one row.
    for feature, source in zip(features, ['OPEN', 'HIGH', 'LOW', 'CLOSE']):
        pd_df[feature] = pd_df[source].shift(1)

    # Drop rows with NaN values (the first row has no previous day). The
    # explicit copy gives an independent frame, so adding the encoded column
    # below does not raise SettingWithCopyWarning.
    pd_df_LR = pd_df.dropna().copy()

    # Encode the categorical variable BIN
    label_encoder = LabelEncoder()
    pd_df_LR[target] = label_encoder.fit_transform(pd_df_LR['BIN'])

    # classes_ lists the labels in encoded order without assuming that
    # exactly four of them exist.
    encoded_labels = label_encoder.classes_

    # Train a Logistic Regression model and predict on the training rows
    clf = LogisticRegression(max_iter=100000)
    clf.fit(pd_df_LR[features], pd_df_LR[target])
    y_pred = clf.predict(pd_df_LR[features])

    # Decode back to bin labels
    y_pred_decoded = label_encoder.inverse_transform(y_pred)
    y_true_decoded = label_encoder.inverse_transform(pd_df_LR[target])

    # Passing labels= pins the row/column order to the labels used as the
    # DataFrame index and columns.
    conf_matrix = confusion_matrix(y_true_decoded, y_pred_decoded, labels=encoded_labels)
    conf_df = pd.DataFrame(conf_matrix, columns=encoded_labels, index=encoded_labels)

    for bin_label in ("B1", "B2", "B3", "B4"):
        tp, fn = get_tp_fn(conf_df, bin_label)
        add_to_results_df(myKEY, "3b." + bin_label + ".Value-1", tp)
        add_to_results_df(myKEY, "3b." + bin_label + ".Value-2", fn)
    ##########################################################################################

In [None]:
# 4(a)
def read_all_data_into_spark():
    """Load the full NSE CSV and keep only the rows whose SERIES is "EQ".

    The filtered Spark DataFrame is stored in the ``df_all_spark`` global.
    """
    global df_all_spark

    nse_path = "/home/hduser/spark/nsedata.csv"
    everything = dfr.csv(nse_path, inferSchema=True, header=True)
    equity_only = everything["SERIES"] == "EQ"
    df_all_spark = everything.filter(equity_only)
###################
def process_4a():
    """Record the two symbols with the highest median HIGH and those medians.

    Writes "4a.SYMBOL-1"/"4a.Value-1" for the top symbol and
    "4a.SYMBOL-2"/"4a.Value-2" for the runner-up.
    """
    per_symbol = df_all_spark.groupBy("SYMBOL").agg(F.median("HIGH").alias("HIGH_MEDIAN"))
    ranked = per_symbol.orderBy(per_symbol["HIGH_MEDIAN"].desc())
    top_two = ranked.take(2)

    for position in (0, 1):
        row = top_two[position]
        suffix = str(position + 1)
        add_to_results_df(myKEY, "4a.SYMBOL-" + suffix, row[0])
        add_to_results_df(myKEY, "4a.Value-" + suffix, row[1])
    return
###################

In [None]:
# 4(b)
def process_4b():
    """Count the symbols whose median HIGH exceeds the median HIGH of all rows; records "4b"."""
    market_median = df_all_spark.select(F.median("HIGH")).first()[0]

    symbol_medians = df_all_spark.groupBy("SYMBOL").agg(F.median("HIGH").alias("HIGH_MEDIAN"))
    above_market = symbol_medians.where(symbol_medians["HIGH_MEDIAN"] > market_median)

    add_to_results_df(myKEY, "4b", above_market.count())

In [None]:
# 4(c)
def process_4c():
    """Find the stock whose median HIGH is closest to mySYMBOL's and count the
    shared timestamps on which the two HIGH values differ by less than 75.

    Records the closest symbol under "4c.SYMBOL" and the count under
    "4c.Count".

    Raises
    ------
    ValueError
        If ``mySYMBOL`` has no rows in ``df_all_spark`` or no other symbol
        exists to compare against.
    """
    df_high_median = df_all_spark.groupBy("SYMBOL").agg(F.median("HIGH").alias("HIGH_MEDIAN"))

    own_median_rows = (
        df_high_median
        .filter(df_high_median["SYMBOL"] == mySYMBOL)
        .select("HIGH_MEDIAN")
        .collect()
    )
    # Indexing an empty result used to fail with a bare IndexError.
    if not own_median_rows:
        raise ValueError(f"No rows for symbol {mySYMBOL!r} in the all-stocks data")
    mySYMBOL_high_median = own_median_rows[0][0]

    df_abs_diff = (
        df_high_median
        .withColumn("ABS_DIFF", F.abs(df_high_median["HIGH_MEDIAN"] - mySYMBOL_high_median))
        .filter(df_high_median["SYMBOL"] != mySYMBOL)
    )

    closest_row = df_abs_diff.orderBy("ABS_DIFF").select("SYMBOL").first()
    if closest_row is None:
        raise ValueError(f"No other symbol available to compare with {mySYMBOL!r}")
    closest_stock = closest_row[0]

    # Join the two stocks on TIMESTAMP to compare their HIGH values per day.
    # Parentheses replace the backslash continuations; the last one used to
    # dangle onto a whitespace-only line.
    t1 = (
        df_all_spark
        .filter(df_all_spark["SYMBOL"] == mySYMBOL)
        .withColumnRenamed("HIGH", "HIGH_1")
        .withColumnRenamed("SYMBOL", "SYMBOL_1")
    )
    t2 = (
        df_all_spark
        .filter(df_all_spark["SYMBOL"] == closest_stock)
        .withColumnRenamed("HIGH", "HIGH_2")
        .withColumnRenamed("SYMBOL", "SYMBOL_2")
    )
    joined_df = t1.join(t2, "TIMESTAMP")

    high_difference_below_75 = joined_df.filter(F.abs(joined_df["HIGH_1"] - joined_df["HIGH_2"]) < 75)
    count_high_difference_below_75 = high_difference_below_75.count()

    add_to_results_df(myKEY, "4c.SYMBOL", closest_stock)
    add_to_results_df(myKEY, "4c.Count", count_high_difference_below_75)

In [None]:
def process_file(lRoll_No, ldirectory, lfile):
    """Run every question of set "SET3" for one student/company pair.

    Parameters
    ----------
    lRoll_No   : roll number, used in the results row key.
    ldirectory : not used by this function; kept so existing callers work.
    lfile      : company data file name, e.g. ``"ABC.csv"``.

    The per-file state (both allotted-data frames and the per-student
    globals) is released whether or not a step raises; an exception from a
    step still propagates to the caller.
    """
    initialize_global_variables(lRoll_No, lfile, "SET3")
    try:
        read_all_data_into_spark()
        read_allotted_data_into_spark()
        read_allotted_data_into_pd()
        ##
        Q1_spark, Q3_spark, IQR_spark = process_1a_spark()
        Q1_pd, Q3_pd, IQR_pd = process_1a_pd()
        process_1b_spark(Q1_spark, IQR_spark)
        process_1b_pd(Q1_pd, IQR_pd)
        process_1c_spark(Q3_spark)
        process_1c_pd(Q3_pd)
        process_2a_spark()
        process_2a_pd()
        process_2b()
        process_2c()
        process_3a()  # adds the BIN column that process_3b reads
        process_3b()
        process_4a()
        process_4b()
        process_4c()
        ##
    finally:
        # Without this, a failing step left the frames and key of the failed
        # file in the globals.
        release_allotted_data_spark()
        release_allotted_data_pd()
        release_global_variables()

In [None]:
def process_row(arow):
    """Process one allotment row: announce it, then run process_file() for its company.

    ``arow`` must provide the keys 'Roll', 'Company_Name' and 'Set_No'.
    """
    roll, company, set_no = arow['Roll'], arow['Company_Name'], arow['Set_No']
    print(f"Processing: {roll}, {company}, {set_no}")
    process_file(roll, '/home/hduser/spark/endsem/', company + ".csv")

def main():
    """Run the Set-3 processing for every matching student row and save the results.

    Reads the allotment workbook, keeps the rows whose ``Set_No`` equals 3
    and calls process_row() on each, in order. Processing stops at the first
    row that raises; whatever was collected up to that point is still written
    to ``results-set3.csv``. The Spark handles are released even when reading
    the workbook or writing the results fails.
    """
    create_spark_variables()
    try:
        create_results_df()

        directory = '/home/hduser/spark/endsem/'
        filepath = os.path.join(directory, 'student-and-question-data-for-endsem.xlsx')

        loc_df_in = pd.read_excel(filepath)
        df_set3 = loc_df_in[loc_df_in['Set_No']==3]

        try:
            # A plain loop: process_row is called for its side effects only.
            for _, arow in df_set3.iterrows():
                process_row(arow)
        except Exception as e:
            print("An error occurred: ", e)
            # Attempt every cleanup step independently. A failure in one of
            # them used to escape the handler, skipping both the results
            # file and the Spark shutdown.
            for cleanup in (release_allotted_data_spark,
                            release_allotted_data_pd,
                            release_global_variables):
                try:
                    cleanup()
                except Exception as cleanup_error:
                    print("Cleanup step failed: ", cleanup_error)

        write_results_df(directory, "results-set3.csv")
    finally:
        release_spark_variables()

# Entry point: run the whole batch when this code executes as the main module.
if __name__ == "__main__":
    main()