## E10-2
#### This Notebook illustrates the use of SPARK Dataframe functions to process nsedata.csv
- Review <b>Part-1</b> to understand the code by referring to SPARK documentation.
- Add your comment to each cell, to explain its purpose
- Add code / create additional cells for debugging purpose, and comment them too 
- Write SPARK code to solve the problem stated in <b>Part-2</b> (do not use the createTempView function in your solution!)

<b>Submission</b>
- Create and upload a PDF of this Notebook.
- <b> BEFORE CONVERTING TO PDF ENSURE THAT YOU REMOVE / TRIM LENGTHY DEBUG OUTPUTS </b>.
- Short debug outputs of up to 5 lines are acceptable.


## <b>Part 1</b>

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [3]:
sc = pyspark.SparkContext(appName="E9-2")

24/11/07 04:00:13 WARN Utils: Your hostname, maverick resolves to a loopback address: 127.0.1.1; using 192.168.206.245 instead (on interface wlp0s20f3)
24/11/07 04:00:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/07 04:00:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
ss = pyspark.sql.SparkSession(sc)

In [5]:
dfr = ss.read

24/11/07 04:00:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
schemaStruct = StructType()
schemaStruct.add("SYMBOL", StringType(), True)
schemaStruct.add("SERIES", StringType(), True)
schemaStruct.add("OPEN", DoubleType(), True)
schemaStruct.add("HIGH", DoubleType(), True)
schemaStruct.add("LOW", DoubleType(), True)
schemaStruct.add("CLOSE", DoubleType(), True)
schemaStruct.add("LAST", DoubleType(), True)
schemaStruct.add("PREVCLOSE", DoubleType(), True)
schemaStruct.add("TOTTRDQTY", LongType(), True)
schemaStruct.add("TOTTRDVAL", DoubleType(), True)
schemaStruct.add("TIMESTAMP", StringType(), True)
schemaStruct.add("ADDNL", StringType(), True)


StructType([StructField('SYMBOL', StringType(), True), StructField('SERIES', StringType(), True), StructField('OPEN', DoubleType(), True), StructField('HIGH', DoubleType(), True), StructField('LOW', DoubleType(), True), StructField('CLOSE', DoubleType(), True), StructField('LAST', DoubleType(), True), StructField('PREVCLOSE', DoubleType(), True), StructField('TOTTRDQTY', LongType(), True), StructField('TOTTRDVAL', DoubleType(), True), StructField('TIMESTAMP', StringType(), True), StructField('ADDNL', StringType(), True)])

In [None]:
df = dfr.csv("/home/hduser/spark/nsedata.csv", schema=schemaStruct, header=True)

### <b>Basics : Using SPARK for analysis</b>

In [None]:
def create_subset_from_df(company_code):
    tcode = company_code.lower()
    df_subset = df.select(\
                    F.col("OPEN").alias("OPEN_" + tcode),\
                    F.col("HIGH").alias("HIGH_"+ tcode),\
                    F.col("LOW").alias("LOW_"+ tcode),\
                    F.col("CLOSE").alias("CLOSE_" + tcode),\
                    F.col("TIMESTAMP")).\
                    where(F.col("SYMBOL") == company_code)
    return(df_subset)

In [None]:
# Why do we need to use the alias function, above? What happens if we do not alias / rename the columns?

In [None]:
df_infy = create_subset_from_df("INFY")
df_infy.show(5)
df_infy.describe().show()

In [None]:
df_tcs = create_subset_from_df("TCS")
df_tcs.show(5)
df_tcs.describe().show()

In [None]:
df_join = df_tcs.join(df_infy,"TIMESTAMP").select("TIMESTAMP","CLOSE_tcs","CLOSE_infy")
df_join.show(5)

In [None]:
df_join.select(F.abs(df_join["CLOSE_tcs"] - df_join["CLOSE_infy"]).alias("PriceDiff")).describe().show()

In [None]:
df_join.filter(F.abs(df_join["CLOSE_tcs"] - df_join["CLOSE_infy"]) < 180).show()

In [None]:
from pyspark.sql.functions import col, date_format, to_date

df1 = df.withColumn("TIMESTAMP2", date_format(to_date(col("TIMESTAMP"), "dd-MMM-yyyy"), "yyyy-MM"))

In [None]:
df1.printSchema()

In [None]:
df1.show(5)

In [None]:
from pyspark.sql import functions as F

df_t1 = df1.groupBy("SYMBOL","TIMESTAMP2").agg(F.min("OPEN"), F.max("OPEN"), F.avg("OPEN"),\
                                       F.stddev("OPEN"), F.count("OPEN"))

In [None]:
df_t1.show(5)

In [None]:
df_t2 = df_t1.sort(F.asc("SYMBOL"), F.asc("TIMESTAMP2"))

In [None]:
df_t2.show(5)

In [None]:
# Uncomment the following statement to generate the output, and analyze it
# Write your observations in the next cell

#df_t2.write.csv("monthly_stats.csv")

In [None]:
# Your observation related to monthly_stats

###  <b>SPARK based solutions for stock analysis and portfolio management: An Example</b>

## Problem Statement
Based on equity (EQ) data contained in nsedata.csv, you are tasked with the responsibility to identify a set of 10 stocks to invest in based on the following steps:

- You have to process the data for one entire year, and then make investment decisions for the following year. You can shoose 2012 as the past year and make recommendations for 2013.
- Assume that you are doing this analysis on Jan 1, 2013. 
- You are required to draw up an initial list of 10 stocks based on the following preliminary analysis: 
    - The stocks should be liquid. That is, they should be traded in large volumes almost every day and the trading volume should be high.
    - You have to filter those stocks that have shown maximum overall growth over the past year. The hope is that they will continue to grow in the future.
- Select 5 pairs of stocks from these filtered stocks based on the following further analysis.
    - You should ensure that volatility and negative market movements in the coming year will not adversely affect the total investment, substantially.
    - One way to achieve this involves selecting <b>stock pairs that are negatively correlated</b>, so that if one stock loses value its partner will most likely gain value - thereby reducing the overall impact of fall in stock prices. As all these stocks are high growth stocks, anyway, the expectation is that there also will be overall growth of the portfolio. 
    - Purchase 1 unit of each of these stock pairs on the first trading day of the next year (i.e. 2013)
- Once you have selected the 5 pairs and made the above investments, you should further do the following
    - Report the performance of your portfolio as on 31/12/2013 (or the nearest traded date, if 31/12/2013 was a non traded day) in terms of the:
        - Overall growth of your portfolio
        - Report which stocks in your portfolio grew in value, which of them reduced in value, an whether the pairing strategy worked.
        - How did the overall market perform during the same period? This can be assessed as follows:
            - If you had blindly selected 1 stock each of the top 25 highly traded, high growth stocks, what would have been the performance of this portfolio
            - How did the implemented strategy of selecting highly traded, high growth stocks, but in pairs having <b>negative correlation</b>, perform in comparion? Did the strategy work?

In [None]:
# Here are some suggested steps to solve the problem

# First of all select only EQUITY related data
# Create a dataframe of stocks that have traded in during the year 2012
# Find out the average total traded quantity of each of these stocks
# Identify stocks that high trade volumes: average daily volume ranging between 5L and 10L
# Find out the price difference in each of these stocks between the 'last traded day of 2012' and 'first traded day of 2012'
# Sort the stocks in descending order using traded quantity and price difference as the criteria
# Select the top 10 stocks for further analysis
# Create a new dataframe containing pairs of stocks traded on the same day 
#   - join the selected stocks by using the criteria that stock names in the resulting dataframe are different
# Sort the dataframe in ascending order
# Establish the criteria for selecting the final pairs of stocks, and select them
# Calculate your total investment value
# ... likewise state and complete the rest of the steps 

In [None]:
df_2012 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2012'")

In [None]:
df_2012_avgqty = df_2012.groupBy("SYMBOL").avg("TOTTRDQTY")\
                        .filter(F.col("avg(TOTTRDQTY)") < 10000000)\
                        .filter(F.col("avg(TOTTRDQTY)") > 500000)\
                        .orderBy("avg(TOTTRDQTY)", ascending=False)
df_2012_avgqty.show(10)

In [None]:
top10 = df_2012_avgqty.limit(10)

In [None]:
t1 = top10.select("SYMBOL").rdd.flatMap(lambda x: x).collect()
t2 = df_2012.filter(F.col("SYMBOL").isin(t1))
t3 = t2.select(F.col("SYMBOL").alias("S1"), F.col("CLOSE").alias("Close1"), "TIMESTAMP")
t4 = t2.select(F.col("SYMBOL").alias("S2"), F.col("CLOSE").alias("Close2"), "TIMESTAMP")
df_for_corr = t3.join(t4,"TIMESTAMP")

In [None]:
df_for_corr.show(5)

In [None]:
wrklist = df_for_corr.select("S1","S2").filter("S1 != S2").distinct().collect()
wrklist[0:10]

In [None]:
print(len(wrklist))

In [None]:
# THIS CELL TAKES QUITE SOME TIME TO EXECUTE - BE PATIENT!
tcorr = []
tlen = len(wrklist)
for i in range(tlen):
    s1 = wrklist[i][0]
    s2 = wrklist[i][1]
    corr = df_for_corr.filter((F.col('S1') == s1) & (F.col('S2') == s2)).corr("Close1","Close2")
    tcorr.append([s1,s2,corr])
    if(i%20 ==0):
        print(f"Processed: {i} of {tlen}")

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql import Row

schema = StructType([
    StructField("Symbol1", StringType(), True),
    StructField("Symbol2", StringType(), True),
    StructField("Corr", FloatType(), True)
])

rdd = sc.parallelize(tcorr)
df_corr = ss.createDataFrame(rdd.map(lambda x: Row(Symbol1=x[0], Symbol2=x[1], Corr=float(x[2]))), schema)

df_corr.show(5)

In [None]:
df_corr_neg = df_corr.filter(F.col("Corr") <= 0.0).dropDuplicates(["Corr"]).orderBy(F.col("Corr").asc())
df_corr_neg.count()

In [None]:
df_corr_neg.show()

In [None]:
df_2013 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2013'")

first_day_2013 = (df_2013.select("TIMESTAMP").filter("TIMESTAMP like '%JAN-2013'").distinct().orderBy("TIMESTAMP").first())[0]
last_day_2013 = (df_2013.select("TIMESTAMP").filter("TIMESTAMP like '%DEC-2013'").distinct().orderBy("TIMESTAMP",ascending=False).first())[0]

print(first_day_2013,last_day_2013)

In [None]:
def get_price_on_day(loc_stock, loc_date):
    loc_price = df_2013.where(F.col("TIMESTAMP")==loc_date).where(F.col("SYMBOL")==loc_stock).select("CLOSE").collect()[0]
    return((loc_price)[0])

In [None]:
# Selected stocks, based on the analysis
# |     ITC|ALOKTEXT|  -0.90314275|
# |GMRINFRA|     ITC|   -0.7135044|
# |    IDFC|ALOKTEXT|   -0.6409445|
# |HINDALCO|     ITC|  -0.62534785|
# |ALOKTEXT|    NHPC|  -0.33097458|

stock_list = ["ITC","ALOKTEXT","GMRINFRA","IDFC","HINDALCO","NHPC"]
multiplier = [3,3,1,1,1,1]

prices = []
total_profit = 0
total_investment = 0
for the_stock,the_multiplier in zip(stock_list,multiplier):
    first_day_price = get_price_on_day(the_stock,first_day_2013)
    last_day_price  = get_price_on_day(the_stock,last_day_2013)
    diff = (last_day_price - first_day_price)
    total_diff = diff * the_multiplier
    total_profit += total_diff
    total_investment += (first_day_price * the_multiplier)
    prices.append([the_stock,first_day_price,last_day_price,diff,total_diff])

In [None]:
prices

In [None]:
print(total_investment, total_profit)

In [None]:
top25 = df_2012_avgqty.limit(25)
t1 = top25.select("SYMBOL").rdd.flatMap(lambda x: x).collect()
t2 = df_2013.filter(F.col("SYMBOL").isin(t1))
t2.show(10)

In [None]:
first_day_overall = t2.where(F.col("TIMESTAMP")==first_day_2013).select("CLOSE").agg(F.sum("CLOSE")).collect()
last_day_overall = t2.where(F.col("TIMESTAMP")==last_day_2013).select("CLOSE").agg(F.sum("CLOSE")).collect()
total_profit_overall = last_day_overall[0][0] - first_day_overall[0][0]

In [None]:
print(f"Amount: {first_day_overall[0][0]} invested on the first trading day of 2013\n\
has a value: {last_day_overall[0][0]} on the last trading day of 2013\n\
The profit/loss is : {total_profit_overall:.2f} corresponding to {total_profit_overall/first_day_overall[0][0]*100:.2f}%")

##### <b>Performance of the strategy</b>
- If we had invested in all the top 25 stocks, without implementing the negative correlation strategy, 
There would have been a loss of 892 on an investment of 5119 (17.5% loss)
- As against that, by implementing the 'select based on negative correlation' strategy, 
a profit of 18.2 on an investment of 1249 (1.5% profit) has been achieved
- In conclusion, the strategy has definitely prevented portfolio value loss during a bad year. It has, in fact, preserved capital.

In [None]:
ss.stop()
sc.stop()

## <b>Part 2 : <b>Problem to solve</b></b>
1. Which of the following is better, if you have 10 Lakhs to invest for a year: 
    - identify 5 top performing stocks of the previous year and invest in them, or
    - Spread your investment across a basket of 25 stocks, with investments equally distributed among them
    - Employing strategies like 'negative correlation' to select your stocks
    - What if you use 'positive correlation' instead, carry out analysis to understand the portfolio's performance?
2. Do your analysis over multiple years (2011-1012, 2012-2013, etc.) to make your final recommendations 
