## E9-2
#### This Notebook illustrates the use of SPARK Dataframe functions to process nsedata.csv
- Review <b>Part-1</b> to understand the code by referring to SPARK documentation.
- Add your comment to each cell, to explain its purpose
- Add code / create additional cells for debugging purpose, and comment them too 
- Write SPARK code to solve the problem stated in <b>Part-2</b> (do not use the createTempView function in your solution!)

<b>Submission</b>
- Create and upload a PDF of this Notebook.
- <b> BEFORE CONVERTING TO PDF ENSURE THAT YOU REMOVE / TRIM LENGTHY DEBUG OUTPUTS </b>.
- Short debug outputs of up to 5 lines are acceptable.


## <b>Part 1</b>

In [63]:
import findspark  # Importing the findspark library to locate the Spark installation
findspark.init()  # Initializing findspark to set up the Spark environment

In [64]:
import pyspark  # Importing the pyspark library for Spark functionality
from pyspark.sql.types import *  # Importing data types from pyspark.sql.types module
from pyspark.sql import functions as F  # Importing Spark SQL functions and aliasing it as F

In [None]:
sc = pyspark.SparkContext(appName="E9-2")  # Creating a SparkContext with the specified application name "E9-2"

In [66]:
ss = pyspark.sql.SparkSession(sc)  # Creating a SparkSession using the existing SparkContext

In [67]:
dfr = ss.read  # Creating a DataFrameReader object to read data into Spark DataFrame

In [68]:
# Defining the schemaStruct using StructType to specify the structure of the DataFrame
schemaStruct = StructType()  # Creating an empty StructType object

# Adding fields to the schemaStruct with their name, data type, and nullable flag
schemaStruct.add("SYMBOL", StringType(), True)
schemaStruct.add("SERIES", StringType(), True)
schemaStruct.add("OPEN", DoubleType(), True)
schemaStruct.add("HIGH", DoubleType(), True)
schemaStruct.add("LOW", DoubleType(), True)
schemaStruct.add("CLOSE", DoubleType(), True)
schemaStruct.add("LAST", DoubleType(), True)
schemaStruct.add("PREVCLOSE", DoubleType(), True)
schemaStruct.add("TOTTRDQTY", LongType(), True)
schemaStruct.add("TOTTRDVAL", DoubleType(), True)
schemaStruct.add("TIMESTAMP", StringType(), True)
schemaStruct.add("ADDNL", StringType(), True)

StructType([StructField('SYMBOL', StringType(), True), StructField('SERIES', StringType(), True), StructField('OPEN', DoubleType(), True), StructField('HIGH', DoubleType(), True), StructField('LOW', DoubleType(), True), StructField('CLOSE', DoubleType(), True), StructField('LAST', DoubleType(), True), StructField('PREVCLOSE', DoubleType(), True), StructField('TOTTRDQTY', LongType(), True), StructField('TOTTRDVAL', DoubleType(), True), StructField('TIMESTAMP', StringType(), True), StructField('ADDNL', StringType(), True)])

In [69]:
df = dfr.csv("/home/hduser/spark/nsedata.csv", schema=schemaStruct, header=True)
# Reading the CSV file located at "/home/hduser/spark/nsedata.csv" into a DataFrame(df) with the specified schemaStruct and header=True

### <b>Basics : Using SPARK for analysis</b>

In [8]:
def create_subset_from_df(company_code):
    """
    Function to create a subset DataFrame for a specific company code.
    
    Args:
        company_code (str): The company code for which the subset DataFrame is created.
        
    Returns:
        DataFrame: Subset DataFrame containing columns for the specified company code.
    """
    tcode = company_code.lower()  # Converting company_code to lowercase
    # Selecting specific columns and renaming them with the company code
    df_subset = df.select(\
                    F.col("OPEN").alias("OPEN_" + tcode),\
                    F.col("HIGH").alias("HIGH_"+ tcode),\
                    F.col("LOW").alias("LOW_"+ tcode),\
                    F.col("CLOSE").alias("CLOSE_" + tcode),\
                    F.col("TIMESTAMP")).\
                    where(F.col("SYMBOL") == company_code)  # Filtering rows for the specified company code
    return(df_subset)


In [9]:
# Why do we need to use the alias function, above? What happens if we do not alias / rename the columns?

# Aliasing gives each company's price columns a distinct name (for example CLOSE_tcs and CLOSE_infy).
# Without it, joining two subsets would produce duplicate column names (OPEN, HIGH, LOW, CLOSE),
# and any later reference to one of those columns would be ambiguous.

In [10]:
# Creating a subset DataFrame for company code "INFY"
df_infy = create_subset_from_df("INFY")
# Showing the first 5 rows of the subset DataFrame
df_infy.show(5)
# Generating summary statistics for the subset DataFrame
df_infy.describe().show()

                                                                                

+---------+---------+--------+----------+-----------+
|OPEN_infy|HIGH_infy|LOW_infy|CLOSE_infy|  TIMESTAMP|
+---------+---------+--------+----------+-----------+
|  2910.75|  2953.55| 2910.75|    2944.2|01-APR-2013|
|  3283.05|   3325.0|  3275.0|   3313.95|01-APR-2014|
|   2198.9|   2199.5|  2157.7|   2173.95|01-APR-2015|
|   2780.1|   2823.8|  2780.1|    2815.1|01-AUG-2011|
|   2215.0|   2230.4| 2200.75|   2218.55|01-AUG-2012|
+---------+---------+--------+----------+-----------+
only showing top 5 rows



24/05/01 00:59:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+-----------------+------------------+------------------+-----------+
|summary|         OPEN_infy|        HIGH_infy|          LOW_infy|        CLOSE_infy|  TIMESTAMP|
+-------+------------------+-----------------+------------------+------------------+-----------+
|  count|              1023|             1023|              1023|              1023|       1023|
|   mean|2707.2617302052795|2735.799413489735|2679.3007331378312|2708.2897849462406|       NULL|
| stddev| 641.7956780031782|647.6486140678146| 639.1258920145511| 643.8471963450963|       NULL|
|    min|             941.0|            952.1|            932.65|             937.5|01-APR-2013|
|    max|            4387.0|           4402.2|            4343.4|            4365.9|31-OCT-2014|
+-------+------------------+-----------------+------------------+------------------+-----------+



                                                                                

In [11]:
# Creating a subset DataFrame for company code "TCS"
df_tcs = create_subset_from_df("TCS")
# Showing the first 5 rows of the subset DataFrame
df_tcs.show(5)
# Generating summary statistics for the subset DataFrame
df_tcs.describe().show()


+--------+--------+-------+---------+-----------+
|OPEN_tcs|HIGH_tcs|LOW_tcs|CLOSE_tcs|  TIMESTAMP|
+--------+--------+-------+---------+-----------+
|  1185.0| 1198.75|1172.55|  1180.15|01-APR-2011|
|  1565.0|  1573.7|1551.25|  1556.85|01-APR-2013|
|  2145.0|  2185.0| 2144.9|   2176.7|01-APR-2014|
|  2558.0|  2563.6|2522.25|  2542.65|01-APR-2015|
|  1142.4|  1149.9| 1125.1|  1135.25|01-AUG-2011|
+--------+--------+-------+---------+-----------+
only showing top 5 rows





+-------+------------------+------------------+------------------+-----------------+-----------+
|summary|          OPEN_tcs|          HIGH_tcs|           LOW_tcs|        CLOSE_tcs|  TIMESTAMP|
+-------+------------------+------------------+------------------+-----------------+-----------+
|  count|              1240|              1240|              1240|             1240|       1240|
|   mean|1678.2334677419353|1697.3971370967743|1658.5663306451613|1677.914798387098|       NULL|
| stddev| 598.9262836758081| 603.7783546396355| 592.8736960983055|597.8815093696456|       NULL|
|    min|             838.0|             847.7|             830.2|            837.3|01-APR-2011|
|    max|            2788.0|            2839.7|            2737.0|           2776.0|31-OCT-2014|
+-------+------------------+------------------+------------------+-----------------+-----------+



                                                                                

In [12]:
# Joining the subset DataFrames for TCS and INFY on the "TIMESTAMP" column
df_join = df_tcs.join(df_infy, "TIMESTAMP").select("TIMESTAMP", "CLOSE_tcs", "CLOSE_infy")
# Selecting specific columns ("TIMESTAMP", "CLOSE_tcs", "CLOSE_infy") from the joined DataFrame
df_join.show(5)  # Showing the first 5 rows of the joined DataFrame


+-----------+---------+----------+
|  TIMESTAMP|CLOSE_tcs|CLOSE_infy|
+-----------+---------+----------+
|02-FEB-2012|   1148.0|    2757.0|
|04-DEC-2014|  2637.95|    2101.8|
|01-AUG-2013|   1815.4|   2974.65|
|01-DEC-2014|  2692.95|   4349.85|
|08-FEB-2012|  1219.65|   2769.15|
+-----------+---------+----------+
only showing top 5 rows



                                                                                

In [13]:
# Calculating the absolute difference between the "CLOSE_tcs" and "CLOSE_infy" columns and aliasing it as "PriceDiff"
price_diff = F.abs(df_join["CLOSE_tcs"] - df_join["CLOSE_infy"]).alias("PriceDiff")
# Selecting the "PriceDiff" column and generating summary statistics for it
df_join.select(price_diff).describe().show()

                                                                                

+-------+------------------+
|summary|         PriceDiff|
+-------+------------------+
|  count|              1025|
|   mean|1163.6446341463443|
| stddev|  366.989701532277|
|    min|150.95000000000027|
|    max|            1804.9|
+-------+------------------+



In [14]:
# Filtering the joined DataFrame to include rows where the absolute difference between "CLOSE_tcs" and "CLOSE_infy" is less than 180
filtered_df = df_join.filter(F.abs(df_join["CLOSE_tcs"] - df_join["CLOSE_infy"]) < 180)
# Showing the filtered DataFrame
filtered_df.show()

                                                                                

+-----------+---------+----------+
|  TIMESTAMP|CLOSE_tcs|CLOSE_infy|
+-----------+---------+----------+
|12-FEB-2015|  2462.15|    2311.2|
|11-FEB-2015|   2459.9|   2284.85|
|10-FEB-2015|  2441.15|    2278.3|
+-----------+---------+----------+



In [15]:
from pyspark.sql.functions import col, date_format, to_date  # Importing necessary functions

# Adding a new column "TIMESTAMP2" to the DataFrame(df) by converting "TIMESTAMP" to date format and then formatting it to "yyyy-MM"
df1 = df.withColumn("TIMESTAMP2", date_format(to_date(col("TIMESTAMP"), "dd-MMM-yyyy"), "yyyy-MM"))
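
As a quick sanity check of the conversion above, the same mapping from "dd-MMM-yyyy" to "yyyy-MM" can be sketched in plain Python. This is only an illustration outside Spark; the helper name `month_key` is hypothetical, and the sketch assumes the default English month abbreviations.

```python
from datetime import datetime


def month_key(timestamp: str) -> str:
    """Convert a 'dd-MMM-yyyy' string such as '01-APR-2011' to 'yyyy-MM'."""
    # %b matches the abbreviated month name; Python matches it case-insensitively,
    # so the upper-case 'APR' used in nsedata.csv parses in the default locale.
    parsed = datetime.strptime(timestamp, "%d-%b-%Y")
    return parsed.strftime("%Y-%m")


print(month_key("01-APR-2011"))  # prints 2011-04
```

The result matches the TIMESTAMP2 value that Spark shows for rows dated 01-APR-2011.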

In [16]:
# Printing the schema of the DataFrame df1 to display its structure
df1.printSchema()

root
 |-- SYMBOL: string (nullable = true)
 |-- SERIES: string (nullable = true)
 |-- OPEN: double (nullable = true)
 |-- HIGH: double (nullable = true)
 |-- LOW: double (nullable = true)
 |-- CLOSE: double (nullable = true)
 |-- LAST: double (nullable = true)
 |-- PREVCLOSE: double (nullable = true)
 |-- TOTTRDQTY: long (nullable = true)
 |-- TOTTRDVAL: double (nullable = true)
 |-- TIMESTAMP: string (nullable = true)
 |-- ADDNL: string (nullable = true)
 |-- TIMESTAMP2: string (nullable = true)



In [17]:
# Showing the first 5 rows of the DataFrame df1
df1.show(5)

+----------+------+------+-------+------+------+------+---------+---------+-------------+-----------+-----+----------+
|    SYMBOL|SERIES|  OPEN|   HIGH|   LOW| CLOSE|  LAST|PREVCLOSE|TOTTRDQTY|    TOTTRDVAL|  TIMESTAMP|ADDNL|TIMESTAMP2|
+----------+------+------+-------+------+------+------+---------+---------+-------------+-----------+-----+----------+
| 20MICRONS|    EQ| 37.75|  37.75| 36.35| 37.45|  37.3|    37.15|    38638|    1420968.1|01-APR-2011|    0|   2011-04|
|3IINFOTECH|    EQ| 43.75|   45.3| 43.75|  44.9|  44.8|    43.85|  1239690|5.531120435E7|01-APR-2011|    0|   2011-04|
|   3MINDIA|    EQ|3374.0|3439.95|3338.0|3397.5|3400.0|   3364.7|      871|   2941547.35|01-APR-2011|    0|   2011-04|
|    A2ZMES|    EQ| 281.8| 294.45| 279.8| 289.2| 287.2|    281.3|   140643| 4.02640755E7|01-APR-2011|    0|   2011-04|
|AARTIDRUGS|    EQ| 127.0|  132.0|126.55| 131.3| 130.6|    127.6|     2972|     384468.2|01-APR-2011|    0|   2011-04|
+----------+------+------+-------+------+------+------+---------+---------+-------------+-----------+-----+----------+

24/05/01 00:59:31 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 14, schema size: 12
CSV file: file:///home/hduser/spark/nsedata.csv


In [18]:
from pyspark.sql import functions as F  # Importing necessary functions

# Grouping the DataFrame df1 by "SYMBOL" and "TIMESTAMP2", and aggregating various statistics for the "OPEN" column
df_t1 = df1.groupBy("SYMBOL", "TIMESTAMP2").agg(
    F.min("OPEN"), F.max("OPEN"), F.avg("OPEN"),  # Calculating min, max, and average values
    F.stddev("OPEN"),  # Calculating standard deviation
    F.count("OPEN")  # Counting the number of values
)

In [19]:
# Showing the first 5 rows of the DataFrame df_t1
df_t1.show(5)



+----------+----------+---------+---------+------------------+------------------+-----------+
|    SYMBOL|TIMESTAMP2|min(OPEN)|max(OPEN)|         avg(OPEN)|      stddev(OPEN)|count(OPEN)|
+----------+----------+---------+---------+------------------+------------------+-----------+
|  AREVAT&D|   2011-04|   246.15|   292.95|274.73055555555555|12.609988324957133|         18|
| CHEMPLAST|   2011-04|      6.3|     8.25| 7.172222222222222|0.5570991627387202|         18|
|FIRSTLEASE|   2011-04|     68.3|   106.05| 93.05277777777778|10.687822540330412|         18|
|    FORTIS|   2011-04|    152.0|    163.4|159.50833333333333|2.7349723087102324|         18|
| GOLDINFRA|   2011-04|    16.85|    20.15|17.924999999999997|0.7857648952379039|         18|
+----------+----------+---------+---------+------------------+------------------+-----------+
only showing top 5 rows



                                                                                

In [20]:
# Sorting the DataFrame df_t1 in ascending order based on "SYMBOL" and "TIMESTAMP2" columns
df_t2 = df_t1.sort(F.asc("SYMBOL"), F.asc("TIMESTAMP2"))

In [21]:
# Showing the first 5 rows of the sorted DataFrame df_t2
df_t2.show(5)



+---------+----------+---------+---------+------------------+------------------+-----------+
|   SYMBOL|TIMESTAMP2|min(OPEN)|max(OPEN)|         avg(OPEN)|      stddev(OPEN)|count(OPEN)|
+---------+----------+---------+---------+------------------+------------------+-----------+
|20MICRONS|   2010-08|     51.6|     54.0| 52.81666666666667|0.9266876496425305|          9|
|20MICRONS|   2010-09|     54.9|     64.3| 59.11428571428571| 2.514614426564382|         21|
|20MICRONS|   2010-10|    55.05|     60.0|57.166666666666664|1.3035848009751156|         21|
|20MICRONS|   2010-11|     53.6|    61.75| 55.98809523809524|2.2001650370997603|         21|
|20MICRONS|   2010-12|     38.8|     61.0| 45.66590909090909| 5.796599708606606|         22|
+---------+----------+---------+---------+------------------+------------------+-----------+
only showing top 5 rows



                                                                                

In [23]:
# Uncomment the following statement to generate the output, and analyze it
# Write your observations in the next cell

# Writing the DataFrame df_t2 to a CSV file named "monthly_stats.csv"
df_t2.write.csv("monthly_stats.csv")

                                                                                

# <b>Observations of monthly_stats.csv</b>

- Spark writes monthly_stats.csv as a directory of part files rather than a single file; together, these files contain monthly statistics for each symbol. By default, no header row is written.
- Each row represents one combination of symbol and month (TIMESTAMP2), giving the statistics for that symbol in that month.
- The statistics are the minimum, maximum, average, and standard deviation of the opening prices, along with the count of opening prices.
- The data is sorted in ascending order by symbol and then by month.


###  <b>SPARK based solutions for stock analysis and portfolio management: An Example</b>

## Problem Statement
Based on equity (EQ) data contained in nsedata.csv, you are tasked with the responsibility to identify a set of 10 stocks to invest in based on the following steps:

- You have to process the data for one entire year, and then make investment decisions for the following year. You can choose 2012 as the past year and make recommendations for 2013.
- Assume that you are doing this analysis on Jan 1, 2013. 
- You are required to draw up an initial list of 10 stocks based on the following preliminary analysis: 
    - The stocks should be liquid. That is, they should be traded in large volumes almost every day and the trading volume should be high.
    - You have to filter those stocks that have shown maximum overall growth over the past year. The hope is that they will continue to grow in the future.
- Select 5 pairs of stocks from these filtered stocks based on the following further analysis.
    - You should ensure that volatility and negative market movements in the coming year will not substantially harm the total investment.
    - One way to achieve this involves selecting <b>stock pairs that are negatively correlated</b>, so that if one stock loses value its partner will most likely gain value - thereby reducing the overall impact of fall in stock prices. As all these stocks are high growth stocks, anyway, the expectation is that there also will be overall growth of the portfolio. 
    - Purchase 1 unit of each of these stock pairs on the first trading day of the next year (i.e. 2013)
- Once you have selected the 5 pairs and made the above investments, you should further do the following
    - Report the performance of your portfolio as on 31/12/2013 (or the nearest traded date, if 31/12/2013 was a non traded day) in terms of the:
        - Overall growth of your portfolio
        - Report which stocks in your portfolio grew in value, which of them reduced in value, and whether the pairing strategy worked.
        - How did the overall market perform during the same period? This can be assessed as follows:
            - If you had blindly selected 1 stock each of the top 25 highly traded, high growth stocks, what would have been the performance of this portfolio?
            - How did the implemented strategy of selecting highly traded, high growth stocks, but in pairs having <b>negative correlation</b>, perform in comparison? Did the strategy work?

In [10]:
# Here are some suggested steps to solve the problem

# First of all select only EQUITY related data
# Create a dataframe of stocks that have traded during the year 2012
# Find out the average total traded quantity of each of these stocks
# Identify stocks that have high trade volumes: average daily volume ranging between 5L and 10L
# Find out the price difference in each of these stocks between the 'last traded day of 2012' and 'first traded day of 2012'
# Sort the stocks in descending order using traded quantity and price difference as the criteria
# Select the top 10 stocks for further analysis
# Create a new dataframe containing pairs of stocks traded on the same day 
#   - join the selected stocks by using the criteria that stock names in the resulting dataframe are different
# Sort the dataframe in ascending order
# Establish the criteria for selecting the final pairs of stocks, and select them
# Calculate your total investment value
# ... likewise state and complete the rest of the steps 

In [21]:
# Filtering the DataFrame to select only EQUITY related data for the year 2012
df_2012 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2012'")

In [22]:
# Calculating the average total traded quantity for each symbol in the DataFrame df_2012
# Filtering symbols with average total traded quantity between 5L (500,000) and 100L (10,000,000), the bounds used in the code below
# Sorting the DataFrame in descending order based on average total traded quantity
df_2012_avgqty = df_2012.groupBy("SYMBOL").avg("TOTTRDQTY")\
                        .filter(F.col("avg(TOTTRDQTY)") < 10000000)\
                        .filter(F.col("avg(TOTTRDQTY)") > 500000)\
                        .orderBy("avg(TOTTRDQTY)", ascending=False)
# Showing the top 10 symbols with high average total traded quantity
df_2012_avgqty.show(10)



+--------+-----------------+
|  SYMBOL|   avg(TOTTRDQTY)|
+--------+-----------------+
|GMRINFRA|8600963.744939271|
|HINDALCO|8189032.975708502|
|  RENUKA|7831300.113360324|
|    STER|7603208.680161944|
|    IDFC|7106657.668016194|
|     DLF|6902130.275303644|
|ASHOKLEY|6823850.761133603|
|     ITC|6474399.400809716|
|ALOKTEXT|6395331.052631579|
|    NHPC|6060316.226720648|
+--------+-----------------+
only showing top 10 rows



                                                                                

In [27]:
# Selecting the top 10 symbols with high average total traded quantity
top10 = df_2012_avgqty.limit(10)

In [29]:
# Collecting the symbols of the top 10 stocks (DataFrame top10) into a Python list
t1 = top10.select("SYMBOL").rdd.flatMap(lambda x: x).collect()

# Filtering the DataFrame df_2012 to include only the selected symbols
t2 = df_2012.filter(F.col("SYMBOL").isin(t1))

# Creating DataFrame t3 with columns SYMBOL as S1, CLOSE as Close1, and TIMESTAMP
t3 = t2.select(F.col("SYMBOL").alias("S1"), F.col("CLOSE").alias("Close1"), "TIMESTAMP")

# Creating DataFrame t4 with columns SYMBOL as S2, CLOSE as Close2, and TIMESTAMP
t4 = t2.select(F.col("SYMBOL").alias("S2"), F.col("CLOSE").alias("Close2"), "TIMESTAMP")

# Joining DataFrames t3 and t4 on TIMESTAMP to create DataFrame df_for_corr
df_for_corr = t3.join(t4, "TIMESTAMP")

                                                                                

In [31]:
# Showing the first 5 rows of the DataFrame df_for_corr
df_for_corr.show(5)


+-----------+--------+------+------+------+
|  TIMESTAMP|      S1|Close1|    S2|Close2|
+-----------+--------+------+------+------+
|02-FEB-2012|ALOKTEXT|  20.2|  STER| 124.1|
|02-FEB-2012|ALOKTEXT|  20.2|RENUKA| 38.85|
|02-FEB-2012|ALOKTEXT|  20.2|  NHPC| 20.85|
|02-FEB-2012|ALOKTEXT|  20.2|   ITC|199.05|
|02-FEB-2012|ALOKTEXT|  20.2|  IDFC|131.45|
+-----------+--------+------+------+------+
only showing top 5 rows



                                                                                

In [34]:
# Collecting all distinct pairs of symbols where S1 is not equal to S2 into a list
# Displaying the first 10 pairs of that list
wrklist = df_for_corr.select("S1","S2").filter("S1 != S2").distinct().collect()
wrklist[0:10]

                                                                                

[Row(S1='IDFC', S2='NHPC'),
 Row(S1='RENUKA', S2='IDFC'),
 Row(S1='ALOKTEXT', S2='ASHOKLEY'),
 Row(S1='STER', S2='ITC'),
 Row(S1='RENUKA', S2='STER'),
 Row(S1='HINDALCO', S2='IDFC'),
 Row(S1='ITC', S2='ALOKTEXT'),
 Row(S1='ALOKTEXT', S2='NHPC'),
 Row(S1='DLF', S2='ALOKTEXT'),
 Row(S1='GMRINFRA', S2='ITC')]

In [35]:
# Printing the length of the wrklist, which represents the number of pairs in the list
print(len(wrklist))

90
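
The count of 90 corresponds to 10 x 9 ordered pairs, so every pair appears twice, once as (A, B) and once as (B, A). Because correlation is symmetric, only 45 unordered pairs actually need to be evaluated. The plain-Python sketch below illustrates the counts using the ten symbols listed earlier; the variable names are illustrative only. In Spark, a filter such as "S1 < S2" in place of "S1 != S2" would be one way to keep a single row per pair.

```python
from itertools import combinations, permutations

# The ten symbols shown in the df_2012_avgqty output above.
symbols = ["GMRINFRA", "HINDALCO", "RENUKA", "STER", "IDFC",
           "DLF", "ASHOKLEY", "ITC", "ALOKTEXT", "NHPC"]

# Ordered pairs: (A, B) and (B, A) are counted separately.
ordered_pairs = list(permutations(symbols, 2))

# Unordered pairs: each combination of two symbols appears once.
unordered_pairs = list(combinations(sorted(symbols), 2))

print(len(ordered_pairs), len(unordered_pairs))  # prints 90 45
```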


In [36]:
# Initializing an empty list tcorr to store correlation values for stock pairs
tcorr = []
# Getting the length of wrklist
tlen = len(wrklist)
# Iterating through each pair of stocks in wrklist
for i in range(tlen):
    # Extracting the first and second stock symbols from the pair
    s1 = wrklist[i][0]
    s2 = wrklist[i][1]
    # Calculating the correlation between the closing prices of the two stocks
    corr = df_for_corr.filter((F.col('S1') == s1) & (F.col('S2') == s2)).corr("Close1", "Close2")
    # Appending the stock symbols and correlation value to tcorr list
    tcorr.append([s1, s2, corr])
    # Printing progress message for every 20 iterations
    if i % 20 == 0:
        print(f"Processed: {i} of {tlen}")

                                                                                

Processed: 0 of 90


                                                                                

Processed: 20 of 90


                                                                                

Processed: 40 of 90


                                                                                

Processed: 60 of 90


                                                                                

Processed: 80 of 90
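
The `corr` call in the loop above computes the Pearson correlation coefficient of the two closing-price columns. A minimal plain-Python sketch of that formula is given below for reference; the function name `pearson` and the two sample series are made up for illustration and are not taken from nsedata.csv.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length with at least 2 values")
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance numerator).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations for each series.
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant series")
    return cov / sqrt(var_x * var_y)


# Hypothetical series: one rises while the other falls linearly.
rising = [1.0, 2.0, 3.0, 4.0]
falling = [8.0, 6.0, 4.0, 2.0]
print(pearson(rising, falling))  # prints -1.0
```

A value close to -1 means the two series tend to move in opposite directions, which is the property the pairing strategy looks for.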


                                                                                

In [37]:
# Importing necessary modules and classes
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql import Row

# Defining the schema for the DataFrame
schema = StructType([
    StructField("Symbol1", StringType(), True),
    StructField("Symbol2", StringType(), True),
    StructField("Corr", FloatType(), True)
])

# Creating an RDD from the list of correlations tcorr
rdd = sc.parallelize(tcorr)

# Creating a DataFrame df_corr from the RDD
df_corr = ss.createDataFrame(rdd.map(lambda x: Row(Symbol1=x[0], Symbol2=x[1], Corr=float(x[2]))), schema)

# Showing the first 5 rows of the DataFrame df_corr
df_corr.show(5)

+--------+--------+----------+
| Symbol1| Symbol2|      Corr|
+--------+--------+----------+
|    IDFC|    NHPC| 0.7452768|
|  RENUKA|    IDFC| 0.2969912|
|ALOKTEXT|ASHOKLEY|0.47407645|
|    STER|     ITC|-0.3065829|
|  RENUKA|    STER| 0.6944879|
+--------+--------+----------+
only showing top 5 rows



In [38]:
# Keeping only non-positive correlations. Each pair occurs twice, as (A, B) and (B, A), with the same
# correlation value, so dropping duplicates on the "Corr" column keeps one row per pair
# Sorting the DataFrame in ascending order based on "Corr" column
df_corr_neg = df_corr.filter(F.col("Corr") <= 0.0).dropDuplicates(["Corr"]).orderBy(F.col("Corr").asc())

# Counting the number of rows in the DataFrame df_corr_neg
df_corr_neg.count()

12

In [39]:
# Showing the DataFrame df_corr_neg
df_corr_neg.show()

+--------+--------+-------------+
| Symbol1| Symbol2|         Corr|
+--------+--------+-------------+
|     ITC|ALOKTEXT|  -0.90314275|
|GMRINFRA|     ITC|   -0.7135044|
|    IDFC|ALOKTEXT|   -0.6409445|
|HINDALCO|     ITC|  -0.62534785|
|ALOKTEXT|    NHPC|  -0.33097458|
|     ITC|ASHOKLEY|   -0.3144176|
|    STER|     ITC|   -0.3065829|
|GMRINFRA|    IDFC|  -0.28986531|
|     ITC|  RENUKA|  -0.21256758|
|     DLF|ALOKTEXT|  -0.16802602|
|GMRINFRA|    NHPC| -0.048641354|
|HINDALCO|    IDFC|-0.0068381117|
+--------+--------+-------------+



In [40]:
# Filtering the DataFrame to select only EQUITY related data for the year 2013
df_2013 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2013'")

# Extracting the first and last trading days of 2013
first_day_2013 = (df_2013.select("TIMESTAMP")
                  .filter("TIMESTAMP like '%JAN-2013'")
                  .distinct()
                  .orderBy("TIMESTAMP")
                  .first())[0]
last_day_2013 = (df_2013.select("TIMESTAMP")
                 .filter("TIMESTAMP like '%DEC-2013'")
                 .distinct()
                 .orderBy("TIMESTAMP", ascending=False)
                 .first())[0]

# Printing the first and last trading days of 2013
print(first_day_2013, last_day_2013)


01-JAN-2013 31-DEC-2013
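
The lookup above orders TIMESTAMP as a string. That works here only because each filter restricts the rows to a single month, so the zero-padded day decides the order. Across months, "dd-MMM-yyyy" strings do not sort chronologically. The plain-Python sketch below shows the difference on a few hypothetical dates; the helper name `first_and_last_day` is an illustration, not part of the notebook.

```python
from datetime import datetime


def first_and_last_day(timestamps):
    """Return the chronologically first and last 'dd-MMM-yyyy' strings."""
    # Parse each string into a real date before comparing.
    ordered = sorted(set(timestamps),
                     key=lambda t: datetime.strptime(t, "%d-%b-%Y"))
    return ordered[0], ordered[-1]


# Hypothetical trading days spanning several months.
sample_days = ["02-JAN-2013", "01-FEB-2013", "30-DEC-2013", "31-DEC-2013"]

first_day, last_day = first_and_last_day(sample_days)
print(first_day, last_day)  # prints 02-JAN-2013 31-DEC-2013
print(min(sample_days))     # prints 01-FEB-2013 (string order, not date order)
```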


                                                                                

In [41]:
def get_price_on_day(loc_stock, loc_date):
    """
    Function to get the closing price of a stock on a specific date.
    
    Args:
        loc_stock (str): Symbol of the stock.
        loc_date (str): Date in the format "dd-MMM-yyyy".
    
    Returns:
        float: Closing price of the stock on the specified date.
    """
    # Filtering the DataFrame to select the closing price of the specified stock on the specified date
    loc_price = df_2013.where(F.col("TIMESTAMP") == loc_date).where(F.col("SYMBOL") == loc_stock).select("CLOSE").collect()[0]
    # Extracting the closing price from the collected result
    return loc_price[0]

In [43]:
# Selected stocks, based on the analysis
# |     ITC|ALOKTEXT|  -0.90314275|
# |GMRINFRA|     ITC|   -0.7135044|
# |    IDFC|ALOKTEXT|   -0.6409445|
# |HINDALCO|     ITC|  -0.62534785|
# |ALOKTEXT|    NHPC|  -0.33097458|

stock_list = ["ITC","ALOKTEXT","GMRINFRA","IDFC","HINDALCO","NHPC"]
multiplier = [3,3,1,1,1,1]

# Initializing lists to store prices and calculating total profit and investment
prices = []
total_profit = 0
total_investment = 0

# Looping through each stock and multiplier
for the_stock, the_multiplier in zip(stock_list, multiplier):
    # Getting the price of the stock on the first and last trading days of 2013
    first_day_price = get_price_on_day(the_stock, first_day_2013)
    last_day_price = get_price_on_day(the_stock, last_day_2013)
    
    # Calculating the difference in price and total difference
    diff = (last_day_price - first_day_price)
    total_diff = diff * the_multiplier
    
    # Adding to total profit and investment
    total_profit += total_diff
    total_investment += (first_day_price * the_multiplier)
    
    # Appending stock, first day price, last day price, price difference, and total difference to prices list
    prices.append([the_stock, first_day_price, last_day_price, diff, total_diff])

                                                                                

In [44]:
# Printing the prices list
prices

[['ITC', 287.25, 321.85, 34.60000000000002, 103.80000000000007],
 ['ALOKTEXT', 11.35, 8.45, -2.9000000000000004, -8.700000000000001],
 ['GMRINFRA', 20.3, 24.8, 4.5, 4.5],
 ['IDFC', 173.65, 109.6, -64.05000000000001, -64.05000000000001],
 ['HINDALCO', 134.15, 122.6, -11.550000000000011, -11.550000000000011],
 ['NHPC', 25.35, 19.55, -5.800000000000001, -5.800000000000001]]

In [45]:
# Printing the total investment and total profit
print(total_investment, total_profit)

1249.25 18.200000000000042
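
The totals printed above can be cross-checked, and turned into a percentage return, with a few lines of plain Python. The prices and multipliers below are copied from the `prices` output and the `multiplier` list; the names `holdings` and `summarize` are illustrative.

```python
# (symbol, units held, close on first trading day, close on last trading day)
holdings = [
    ("ITC", 3, 287.25, 321.85),
    ("ALOKTEXT", 3, 11.35, 8.45),
    ("GMRINFRA", 1, 20.3, 24.8),
    ("IDFC", 1, 173.65, 109.6),
    ("HINDALCO", 1, 134.15, 122.6),
    ("NHPC", 1, 25.35, 19.55),
]


def summarize(positions):
    """Return (amount invested, profit, percentage return) for the positions."""
    invested = sum(units * buy for _, units, buy, _ in positions)
    final_value = sum(units * sell for _, units, _, sell in positions)
    profit = final_value - invested
    return invested, profit, 100.0 * profit / invested


invested, profit, pct = summarize(holdings)
print(f"invested={invested:.2f} profit={profit:.2f} return={pct:.2f}%")
```

The percentage is the figure to compare against the benchmark portfolio, since the two portfolios differ in size.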


In [25]:
# Selecting the top 25 symbols with high average total traded quantity
top25 = df_2012_avgqty.limit(25)

# Collecting the symbols from the DataFrame top25
t1 = top25.select("SYMBOL").rdd.flatMap(lambda x: x).collect()

# Filtering the DataFrame df_2013 to include only the symbols present in t1
t2 = df_2013.filter(F.col("SYMBOL").isin(t1))

# Showing the first 10 rows of the filtered DataFrame t2
t2.show(10)


+----------+------+------+------+------+------+------+---------+---------+---------------+-----------+------+
|    SYMBOL|SERIES|  OPEN|  HIGH|   LOW| CLOSE|  LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP| ADDNL|
+----------+------+------+------+------+------+------+---------+---------+---------------+-----------+------+
|  ALOKTEXT|    EQ|  8.45|   9.0|  8.25|  8.85|  8.95|      8.2|  5003130|   4.31233725E7|01-APR-2013|  5614|
|  ASHOKLEY|    EQ|  21.9| 22.25|  21.9| 22.15|  22.1|    21.85|  1420748|  3.134177775E7|01-APR-2013|  5424|
|BHARTIARTL|    EQ| 288.0| 294.9| 288.0|293.65|293.45|   291.75|  1452020|  4.245766875E8|01-APR-2013| 19825|
|      BHEL|    EQ|178.95| 184.4| 178.8| 182.5| 181.9|    177.0|  3082804|  5.617141141E8|01-APR-2013| 33593|
|     CAIRN|    EQ| 274.4| 286.9| 274.2|286.05| 286.3|   272.45|  3067330| 8.6449004395E8|01-APR-2013| 55242|
|    DISHTV|    EQ|  68.1| 69.15|  67.5|  68.0| 67.95|     67.1|  1833635|  1.249482505E8|01-APR-2013| 12735|
|       DL

24/05/01 04:44:40 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 14, schema size: 12
CSV file: file:///home/hduser/spark/nsedata.csv


In [47]:
# Calculating the total closing price on the first trading day of 2013 for all selected symbols
first_day_overall = t2.where(F.col("TIMESTAMP") == first_day_2013).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total closing price on the last trading day of 2013 for all selected symbols
last_day_overall = t2.where(F.col("TIMESTAMP") == last_day_2013).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total profit for all selected symbols during 2013
total_profit_overall = last_day_overall[0][0] - first_day_overall[0][0]

                                                                                

In [48]:
# Printing the investment value on the first trading day of 2013, value on the last trading day of 2013,
# and the corresponding profit/loss and percentage change
print(f"Amount: {first_day_overall[0][0]} invested on the first trading day of 2013\n\
has a value: {last_day_overall[0][0]} on the last trading day of 2013\n\
The profit/loss is : {total_profit_overall:.2f} corresponding to {total_profit_overall/first_day_overall[0][0]*100:.2f}%")

Amount: 5119.3 invested on the first trading day of 2013
has a value: 4226.45 on the last trading day of 2013
The profit/loss is : -892.85 corresponding to -17.44%


##### <b>Performance of the strategy</b>
- Investing in all the top 25 stocks, without applying the negative-correlation strategy, would have produced a loss of 892.85 on an investment of 5119.30 (a 17.44% loss).
- By contrast, the 'select based on negative correlation' strategy produced a profit of 18.2 on an investment of 1249 (about 1.5%).
- In conclusion, for 2013 the strategy avoided the loss suffered by the broader basket and preserved capital during a bad year.
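
The percentages quoted above follow from a simple return calculation. A minimal sketch in plain Python, using the totals reported in this notebook; the helper name `pct_return` is illustrative and not part of the notebook:

```python
def pct_return(invested, final_value):
    """Return the percentage gain (positive) or loss (negative) on an investment."""
    return (final_value - invested) / invested * 100

# All selected stocks, no correlation filter (totals printed by the cell above)
basket = pct_return(5119.3, 4226.45)

# Negative-correlation selection: profit of 18.2 on an investment of 1249
neg_corr = pct_return(1249, 1249 + 18.2)

print(f"basket: {basket:.2f}%, negative correlation: {neg_corr:.2f}%")
```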

In [49]:
ss.stop()
sc.stop()

## <b>Part 2: Problem to solve</b>
1. Which of the following is better, if you have 10 Lakhs to invest for a year: 
    - Identify the 5 top-performing stocks of the previous year and invest in them, or
    - Spread your investment across a basket of 25 stocks, with investments equally distributed among them, or
    - Employ strategies like 'negative correlation' to select your stocks
    - What if you use 'positive correlation' instead? Carry out analysis to understand the portfolio's performance.
2. Do your analysis over multiple years (2011-2012, 2012-2013, etc.) to make your final recommendations 
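
For the 'equally distributed' option, one way to read the requirement is to give every stock the same rupee amount rather than buying one share of each. A minimal sketch, assuming fractional shares are allowed and using made-up prices; `equal_weight_value` and the sample symbols are hypothetical:

```python
def equal_weight_value(capital, prices):
    """Split capital equally across stocks and return the end-of-year value.

    prices maps symbol -> (first_day_close, last_day_close).
    Assumes fractional shares can be bought (an illustrative simplification).
    """
    budget = capital / len(prices)          # same rupee amount per stock
    return sum(budget * last / first for first, last in prices.values())

# Hypothetical prices, for illustration only
sample = {"AAA": (100.0, 110.0), "BBB": (50.0, 45.0)}
final_value = equal_weight_value(1_000_000, sample)   # 10 lakh
print(f"Final value: {final_value:.2f}")
```

With equal weights the portfolio return is the plain average of the individual stock returns (+10% and -10% in this sample), whereas summing one share of each stock weights expensive stocks more heavily.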


## 1. Comparison of Investment Strategies
To determine the better investment strategy for an amount of 10 Lakhs invested for a year, we conducted the following analyses:

### Strategy 1: Identify 5 Top Performing Stocks of the Previous Year
Investing in the top 5 stocks of the previous year (ranked in the code below by average traded quantity in 2011) resulted in a profit percentage of approximately 8.89% in 2012.

### Strategy 2: Spread Investment Across a Basket of 25 Stocks
Holding one share of each of 25 randomly selected stocks yielded a significantly higher profit percentage of about 52.65% in 2012.

### Strategy 3: Employing Negative Correlation
Using a strategy based on negative correlation among selected stocks returned a profit percentage of around 30.20%.

### Strategy 4: Employing Positive Correlation
Utilizing a strategy based on positive correlation among selected stocks led to a profit percentage of approximately 46.59%.

## 2. Analysis Over Multiple Years

Overall, in the one holding year analysed below (2012, with stocks selected from 2011 data), the basket of 25 randomly selected stocks was the most profitable, yielding the highest profit percentage of the four strategies. Because the basket is drawn at random without a fixed seed and only a single year is covered, this result is provisional. Repeating the analysis for other years, and accounting for factors such as risk tolerance, market volatility, and investment goals, is needed before making a final recommendation.
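
To extend the comparison to several years, the same selection and evaluation steps can be repeated for each (selection year, holding year) pair. A minimal sketch that only generates the year pairs and the `LIKE` patterns used by the filters in this notebook; the function name and the year range are assumptions:

```python
def year_pairs(first_holding_year, last_holding_year):
    """Yield (selection_year, holding_year, selection_pattern, holding_pattern)."""
    for holding in range(first_holding_year, last_holding_year + 1):
        selection = holding - 1              # stocks are picked from the previous year
        yield selection, holding, f"%{selection}", f"%{holding}"

pairs = list(year_pairs(2012, 2014))         # year range chosen for illustration
for selection, holding, sel_pat, hold_pat in pairs:
    # In the notebook these patterns would go into
    # df.filter("SERIES=='EQ'").filter(f"TIMESTAMP like '{sel_pat}'")
    print(f"select on {selection} ({sel_pat}), hold during {holding} ({hold_pat})")
```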

In [30]:
# Holding year = 2012
# Strategy 1: top 5 stocks of the previous year (2011), ranked by average traded quantity

# Filtering the DataFrame to select only EQUITY-related data for the year 2011
df_2011 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2011'")

# Calculating the average total traded quantity for each symbol in the DataFrame df_2011
# Keeping symbols whose average total traded quantity lies between 5 lakh (500,000) and 1 crore (10,000,000)
# Sorting the DataFrame in descending order based on average total traded quantity
df_2011_avgqty = df_2011.groupBy("SYMBOL").avg("TOTTRDQTY")\
                        .filter(F.col("avg(TOTTRDQTY)") < 10000000)\
                        .filter(F.col("avg(TOTTRDQTY)") > 500000)\
                        .orderBy("avg(TOTTRDQTY)", ascending=False)
# Showing the top 10 symbols with high average total traded quantity
df_2011_avgqty.show(10)

# Selecting the top 5 symbols with high average total traded quantity
top5 = df_2011_avgqty.limit(5)

# Collecting the symbols from the DataFrame top5
t1 = top5.select("SYMBOL").rdd.flatMap(lambda x: x).collect()

# Filtering the DataFrame df_2012 (defined in cell In [29]) to include only the symbols present in t1
t2 = df_2012.filter(F.col("SYMBOL").isin(t1))

# Showing the first 10 rows of the filtered DataFrame t2
t2.show(10)

                                                                                

+----------+-----------------+
|    SYMBOL|   avg(TOTTRDQTY)|
+----------+-----------------+
|  ALOKTEXT|8677144.275303643|
|    GVKPIL|8149384.765182186|
|  HINDALCO|7992351.906882592|
|      RCOM|7713584.137651822|
|    RENUKA|7459392.910931174|
|SHREEASHTA|7339390.076923077|
|       ITC|7325373.246963562|
|      IDFC|7102852.137651822|
|      HDIL|6585712.372469636|
|TATAMOTORS|6425309.267206478|
+----------+-----------------+
only showing top 10 rows





+--------+------+-----+------+------+-----+------+---------+---------+---------------+-----------+-----+
|  SYMBOL|SERIES| OPEN|  HIGH|   LOW|CLOSE|  LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP|ADDNL|
+--------+------+-----+------+------+-----+------+---------+---------+---------------+-----------+-----+
|ALOKTEXT|    EQ|15.75|  15.8|  15.4| 15.5|  15.5|    15.75|  8534303|  1.329065281E8|01-AUG-2012|22213|
|  GVKPIL|    EQ| 13.0| 13.25|  12.9|13.15|  13.2|    12.95|  7620847| 1.0011849285E8|01-AUG-2012| 9278|
|HINDALCO|    EQ|120.0| 120.8|118.35|119.2|118.85|    120.1|  3328980| 3.9755439225E8|01-AUG-2012|26926|
|    RCOM|    EQ| 56.0| 57.25|  55.5|56.45| 56.35|     56.1|  9125929|  5.154176282E8|01-AUG-2012|32026|
|  RENUKA|    EQ| 31.0| 32.15|  30.6| 31.9|  31.9|     30.9|  6345401| 1.9965185815E8|01-AUG-2012|16635|
|ALOKTEXT|    EQ|20.25| 20.65| 19.95|20.55| 20.55|    20.05|  6174610| 1.2570505405E8|01-FEB-2012|14346|
|  GVKPIL|    EQ| 16.0| 17.05|  15.4| 16.7| 16.95|     

In [29]:
# Filtering the DataFrame to select only EQUITY-related data for the year 2012
df_2012 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2012'")

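# Extracting the first and last trading days of 2012
# (ordering the TIMESTAMP strings is sufficient here because each query is restricted to a single month)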
first_day_2012 = (df_2012.select("TIMESTAMP")
                  .filter("TIMESTAMP like '%JAN-2012'")
                  .distinct()
                  .orderBy("TIMESTAMP")
                  .first())[0]
last_day_2012 = (df_2012.select("TIMESTAMP")
                 .filter("TIMESTAMP like '%DEC-2012'")
                 .distinct()
                 .orderBy("TIMESTAMP", ascending=False)
                 .first())[0]

# Printing the first and last trading days of 2012
print(first_day_2012, last_day_2012)

# Calculating the total closing price on the first trading day of 2012 for all selected symbols
first_day_overall = t2.where(F.col("TIMESTAMP") == first_day_2012).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total closing price on the last trading day of 2012 for all selected symbols
last_day_overall = t2.where(F.col("TIMESTAMP") == last_day_2012).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total profit for all selected symbols during 2012
total_profit_overall = last_day_overall[0][0] - first_day_overall[0][0]

                                                                                

02-JAN-2012 31-DEC-2012
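
Ordering `TIMESTAMP` as a string gives the right answer here only because each query is restricted to a single month, where the `dd` prefix sorts correctly. A minimal sketch of a more general approach in plain Python, assuming the distinct timestamps have already been collected into a list; the month table and helper names are illustrative:

```python
from datetime import date

# Explicit month table avoids any dependence on the system locale
MONTHS = {m: i for i, m in enumerate(
    ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
     "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"], start=1)}

def parse_ts(ts):
    """Convert a 'dd-MMM-yyyy' string such as '02-JAN-2012' into a date."""
    day, mon, year = ts.split("-")
    return date(int(year), MONTHS[mon.upper()], int(day))

def first_and_last(timestamps):
    """Return the earliest and latest trading day among the given strings."""
    ordered = sorted(timestamps, key=parse_ts)
    return ordered[0], ordered[-1]

# Sample values in the notebook's format; a plain string sort would put 01-AUG first
days = ["01-AUG-2012", "31-DEC-2012", "02-JAN-2012", "01-FEB-2012"]
first_day, last_day = first_and_last(days)
print(first_day, last_day)
```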


                                                                                

In [31]:
# Printing the investment value on the first trading day of 2012, value on the last trading day of 2012,
# and the corresponding profit/loss and percentage change
print(f"Amount: {first_day_overall[0][0]} invested on the first trading day of 2012\n\
has a value: {last_day_overall[0][0]} on the last trading day of 2012\n\
The profit/loss is : {total_profit_overall:.2f} corresponding to {total_profit_overall/first_day_overall[0][0]*100:.2f}%")

Amount: 239.5 invested on the first trading day of 2012
has a value: 260.8 on the last trading day of 2012
The profit/loss is : 21.30 corresponding to 8.89%


In [32]:
#Spreading across 25 random stocks

In [33]:
# Holding year = 2012
# Strategy 2: candidate pool for the 25-stock basket, built from 2011 data

# Filtering the DataFrame to select only EQUITY-related data for the year 2011
df_2011 = df.filter("SERIES=='EQ'").filter("TIMESTAMP like '%2011'")

# Calculating the average total traded quantity for each symbol in the DataFrame df_2011
# Keeping symbols whose average total traded quantity lies between 5 lakh (500,000) and 1 crore (10,000,000)
# Sorting the DataFrame in descending order based on average total traded quantity
df_2011_avgqty = df_2011.groupBy("SYMBOL").avg("TOTTRDQTY")\
                        .filter(F.col("avg(TOTTRDQTY)") < 10000000)\
                        .filter(F.col("avg(TOTTRDQTY)") > 500000)\
                        .orderBy("avg(TOTTRDQTY)", ascending=False)
# Showing the top 10 symbols with high average total traded quantity
df_2011_avgqty.show(10)


+----------+-----------------+
|    SYMBOL|   avg(TOTTRDQTY)|
+----------+-----------------+
|  ALOKTEXT|8677144.275303643|
|    GVKPIL|8149384.765182186|
|  HINDALCO|7992351.906882592|
|      RCOM|7713584.137651822|
|    RENUKA|7459392.910931174|
|SHREEASHTA|7339390.076923077|
|       ITC|7325373.246963562|
|      IDFC|7102852.137651822|
|      HDIL|6585712.372469636|
|TATAMOTORS|6425309.267206478|
+----------+-----------------+
only showing top 10 rows



                                                                                

In [34]:
# Importing necessary module
import random

# Selecting 25 random stocks from df_2011_avgqty
random_stocks = random.sample(df_2011_avgqty.select("SYMBOL").rdd.flatMap(lambda x: x).collect(), 25)

# Displaying the randomly selected stocks
print(random_stocks)


                                                                                

['ESSAROIL', 'SKUMARSYNF', 'CIPLA', 'HINDPETRO', 'ASHOKLEY', 'KSOILS', 'ONELIFECAP', 'HDIL', 'KFA', 'RELCAPITAL', 'NUTEK', 'TAKSHEEL', 'INDOTHAI', 'KOTAKBANK', 'RESURGERE', 'BALLARPUR', 'KWALITY', 'RANBAXY', 'AMTEKINDIA', 'SUNTV', 'BAJAJHIND', 'ZEELEARN', 'RUCHISOYA', 'APOLLOTYRE', 'IBWSL']
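
Because `random.sample` is called without a seed, each run of the cell above draws a different basket, so the reported return cannot be reproduced exactly. A minimal sketch of a repeatable draw, assuming the symbols have already been collected into a Python list; `sample_symbols`, the short symbol pool, and the seed value are illustrative:

```python
import random

def sample_symbols(symbols, k, seed):
    """Draw k symbols reproducibly: same inputs and seed give the same basket."""
    rng = random.Random(seed)                # private generator, global state untouched
    return rng.sample(sorted(symbols), k)    # sort first so input order does not matter

pool = ["ITC", "RCOM", "IDFC", "HDIL", "CIPLA", "SUNTV", "KFA", "RENUKA"]
basket_a = sample_symbols(pool, 3, seed=2012)
basket_b = sample_symbols(reversed(pool), 3, seed=2012)
print(basket_a)
```

Sorting before sampling matters because the order of rows returned by `collect()` is not guaranteed to be stable when several symbols share the same sort key.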


In [37]:
# Filtering the DataFrame df_2012 to include only the symbols present in random_stocks
t2 = df_2012.filter(F.col("SYMBOL").isin(random_stocks))

# Showing the first 10 rows of the filtered DataFrame t2
t2.show(10)

+----------+------+-----+-----+------+-----+-----+---------+---------+---------------+-----------+-----+
|    SYMBOL|SERIES| OPEN| HIGH|   LOW|CLOSE| LAST|PREVCLOSE|TOTTRDQTY|      TOTTRDVAL|  TIMESTAMP|ADDNL|
+----------+------+-----+-----+------+-----+-----+---------+---------+---------------+-----------+-----+
|AMTEKINDIA|    EQ| 90.6| 92.5|  90.6| 91.6| 91.9|    91.35|   237030|  2.171735195E7|01-AUG-2012| 1517|
|APOLLOTYRE|    EQ| 77.4|78.75| 76.55| 78.3| 78.5|    77.75|  2401348| 1.8671335015E8|01-AUG-2012|19979|
|  ASHOKLEY|    EQ|21.85| 22.3|  21.8|22.05|22.05|     22.3|  5920768| 1.3014597305E8|01-AUG-2012|14778|
| BAJAJHIND|    EQ|30.55|32.15| 30.25| 31.9| 31.8|     30.4|  2634337|    8.2736978E7|01-AUG-2012| 8563|
| BALLARPUR|    EQ|19.05|19.55|  18.6|18.75|18.85|     19.2|   188511|      3571311.3|01-AUG-2012|  973|
|     CIPLA|    EQ|360.0|363.6| 350.1|354.0|353.7|    338.6|  6571518| 2.3408779386E9|01-AUG-2012|77205|
|  ESSAROIL|    EQ| 55.0|56.55| 54.15|56.05| 56.0|     



In [38]:
# Calculating the total closing price on the first trading day of 2012 for all selected symbols
first_day_overall = t2.where(F.col("TIMESTAMP") == first_day_2012).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total closing price on the last trading day of 2012 for all selected symbols
last_day_overall = t2.where(F.col("TIMESTAMP") == last_day_2012).select("CLOSE").agg(F.sum("CLOSE")).collect()

# Calculating the total profit for all selected symbols during 2012
total_profit_overall = last_day_overall[0][0] - first_day_overall[0][0]

                                                                                

In [39]:
# Printing the investment value on the first trading day of 2012, value on the last trading day of 2012,
# and the corresponding profit/loss and percentage change
print(f"Amount: {first_day_overall[0][0]} invested on the first trading day of 2012\n\
has a value: {last_day_overall[0][0]} on the last trading day of 2012\n\
The profit/loss is : {total_profit_overall:.2f} corresponding to {total_profit_overall/first_day_overall[0][0]*100:.2f}%")

Amount: 2705.1 invested on the first trading day of 2012
has a value: 4129.2 on the last trading day of 2012
The profit/loss is : 1424.10 corresponding to 52.65%


In [40]:
#Employing Negative correlation

In [41]:
# Selecting the top 10 symbols with high average total traded quantity
top10 = df_2011_avgqty.limit(10)

In [42]:
# Collecting the symbols of the top 10 stocks (top10) into a Python list
t1 = top10.select("SYMBOL").rdd.flatMap(lambda x: x).collect()

# Filtering the DataFrame df_2011 to include only the selected symbols
t2 = df_2011.filter(F.col("SYMBOL").isin(t1))

# Creating DataFrame t3 with columns SYMBOL as S1, CLOSE as Close1, and TIMESTAMP
t3 = t2.select(F.col("SYMBOL").alias("S1"), F.col("CLOSE").alias("Close1"), "TIMESTAMP")

# Creating DataFrame t4 with columns SYMBOL as S2, CLOSE as Close2, and TIMESTAMP
t4 = t2.select(F.col("SYMBOL").alias("S2"), F.col("CLOSE").alias("Close2"), "TIMESTAMP")

# Joining DataFrames t3 and t4 on TIMESTAMP to create DataFrame df_for_corr
df_for_corr = t3.join(t4, "TIMESTAMP")

                                                                                

In [43]:
# Showing the first 5 rows of the DataFrame df_for_corr
df_for_corr.show(5)


+-----------+--------+------+----------+------+
|  TIMESTAMP|      S1|Close1|        S2|Close2|
+-----------+--------+------+----------+------+
|12-JUL-2011|ALOKTEXT| 24.45|TATAMOTORS|1024.9|
|12-JUL-2011|ALOKTEXT| 24.45|SHREEASHTA|   4.8|
|12-JUL-2011|ALOKTEXT| 24.45|    RENUKA| 70.65|
|12-JUL-2011|ALOKTEXT| 24.45|      RCOM|  96.8|
|12-JUL-2011|ALOKTEXT| 24.45|       ITC|201.65|
+-----------+--------+------+----------+------+
only showing top 5 rows



                                                                                

In [44]:
# Selecting distinct pairs of symbols where S1 is not equal to S2 and collecting all of them as a list
# Displaying the first 10 pairs
wrklist = df_for_corr.select("S1","S2").filter("S1 != S2").distinct().collect()
wrklist[0:10]

                                                                                

[Row(S1='ITC', S2='SHREEASHTA'),
 Row(S1='GVKPIL', S2='IDFC'),
 Row(S1='RENUKA', S2='IDFC'),
 Row(S1='RCOM', S2='GVKPIL'),
 Row(S1='TATAMOTORS', S2='ITC'),
 Row(S1='HINDALCO', S2='IDFC'),
 Row(S1='ALOKTEXT', S2='HDIL'),
 Row(S1='IDFC', S2='TATAMOTORS'),
 Row(S1='ITC', S2='ALOKTEXT'),
 Row(S1='HDIL', S2='ALOKTEXT')]

In [45]:
# Printing the length of the wrklist, which represents the number of pairs in the list
print(len(wrklist))

90
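
The count of 90 is the number of ordered pairs of 10 symbols (10 × 9). Since correlation is symmetric, evaluating only the 45 unordered pairs would halve the number of Spark jobs in the loop that follows. A minimal sketch of the two pair counts in plain Python, using the ten symbols listed earlier in this notebook:

```python
from itertools import combinations, permutations

symbols = ["ALOKTEXT", "GVKPIL", "HINDALCO", "RCOM", "RENUKA",
           "SHREEASHTA", "ITC", "IDFC", "HDIL", "TATAMOTORS"]

ordered_pairs = list(permutations(symbols, 2))     # (A, B) and (B, A) both present
unordered_pairs = list(combinations(symbols, 2))   # each pair appears once

print(len(ordered_pairs), len(unordered_pairs))
```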


In [46]:
# Initializing an empty list tcorr to store correlation values for stock pairs
tcorr = []
# Getting the length of wrklist
tlen = len(wrklist)
# Iterating through each pair of stocks in wrklist
for i in range(tlen):
    # Extracting the first and second stock symbols from the pair
    s1 = wrklist[i][0]
    s2 = wrklist[i][1]
    # Calculating the correlation between the closing prices of the two stocks
    corr = df_for_corr.filter((F.col('S1') == s1) & (F.col('S2') == s2)).corr("Close1", "Close2")
    # Appending the stock symbols and correlation value to tcorr list
    tcorr.append([s1, s2, corr])
    # Printing progress message for every 20 iterations
    if i % 20 == 0:
        print(f"Processed: {i} of {tlen}")

                                                                                

Processed: 0 of 90
Processed: 20 of 90
Processed: 40 of 90
Processed: 60 of 90
Processed: 80 of 90
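
`DataFrame.corr` computes the Pearson correlation coefficient of the two columns. A minimal sketch of the same calculation in plain Python, for checking a pair by hand; the price series are made up for illustration:

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Made-up closing prices for illustration
rising = [10.0, 11.0, 12.0, 13.0]
falling = [40.0, 38.0, 36.0, 34.0]
r_same = pearson(rising, rising)         # series moving together
r_opposite = pearson(rising, falling)    # series moving in opposite directions
print(r_same, r_opposite)
```

A value near +1 means the two prices tend to move together, and a value near -1 means they tend to move in opposite directions, which is the property the negative-correlation strategy looks for.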


                                                                                

In [47]:
# Importing necessary modules and classes
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql import Row

# Defining the schema for the DataFrame
schema = StructType([
    StructField("Symbol1", StringType(), True),
    StructField("Symbol2", StringType(), True),
    StructField("Corr", FloatType(), True)
])

# Creating an RDD from the list of correlations tcorr
rdd = sc.parallelize(tcorr)

# Creating a DataFrame df_corr from the RDD
df_corr = ss.createDataFrame(rdd.map(lambda x: Row(Symbol1=x[0], Symbol2=x[1], Corr=float(x[2]))), schema)

# Showing the first 5 rows of the DataFrame df_corr
df_corr.show(5)

+----------+----------+-----------+
|   Symbol1|   Symbol2|       Corr|
+----------+----------+-----------+
|       ITC|SHREEASHTA|  -0.518378|
|    GVKPIL|      IDFC| 0.87741727|
|    RENUKA|      IDFC| 0.82532847|
|      RCOM|    GVKPIL|  0.9186302|
|TATAMOTORS|       ITC|-0.62484014|
+----------+----------+-----------+
only showing top 5 rows



In [48]:
# Keeping only the non-positive correlations; each pair occurs twice, as (A, B) and (B, A), with the same value,
# so dropping duplicates on the "Corr" column leaves one row per pair
# Sorting the DataFrame in ascending order based on "Corr" column
df_corr_neg = df_corr.filter(F.col("Corr") <= 0.0).dropDuplicates(["Corr"]).orderBy(F.col("Corr").asc())

# Counting the number of rows in the DataFrame df_corr_neg
df_corr_neg.count()

9

In [49]:
# Showing the DataFrame df_corr_neg
df_corr_neg.show()

+----------+----------+-----------+
|   Symbol1|   Symbol2|       Corr|
+----------+----------+-----------+
|  HINDALCO|       ITC| -0.7588449|
|    GVKPIL|       ITC| -0.7576902|
|       ITC|    RENUKA|  -0.627603|
|TATAMOTORS|       ITC|-0.62484014|
|       ITC|      RCOM| -0.5900066|
|       ITC|      IDFC| -0.5551699|
|       ITC|SHREEASHTA|  -0.518378|
|       ITC|      HDIL|-0.48173952|
|       ITC|  ALOKTEXT|-0.32108563|
+----------+----------+-----------+
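
Dropping duplicates on `Corr` works here because the mirrored rows (A, B) and (B, A) carry the same value, but it would also discard two unrelated pairs that happened to share a correlation value. A minimal sketch of deduplicating on the pair itself, applied to a plain list shaped like `tcorr`; the function name and sample rows are illustrative:

```python
def unique_pairs(rows):
    """Keep one row per unordered symbol pair.

    rows is a list of [symbol1, symbol2, corr] entries, like tcorr in the notebook.
    """
    seen = {}
    for s1, s2, corr in rows:
        key = tuple(sorted((s1, s2)))        # same key for A-B and B-A
        seen.setdefault(key, corr)           # first occurrence wins
    return [[a, b, c] for (a, b), c in seen.items()]

# Illustrative values: the first two rows are the same pair in both orders,
# the last two are different pairs that happen to share a correlation value
rows = [["ITC", "RCOM", -0.59], ["RCOM", "ITC", -0.59],
        ["AAA", "BBB", 0.5], ["CCC", "DDD", 0.5]]
deduped = unique_pairs(rows)
print(deduped)
```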



In [50]:
def get_price_on_day(loc_stock, loc_date):
    """
    Function to get the closing price of a stock on a specific date.
    
    Args:
        loc_stock (str): Symbol of the stock.
        loc_date (str): Date in the format "dd-MMM-yyyy".
    
    Returns:
        float: Closing price of the stock on the specified date.
    """
    # Filtering the DataFrame to select the closing price of the specified stock on the specified date
    loc_price = df_2012.where(F.col("TIMESTAMP") == loc_date).where(F.col("SYMBOL") == loc_stock).select("CLOSE").collect()[0]
    # Extracting the closing price from the collected result
    return loc_price[0]
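
Note that `collect()[0]` raises an `IndexError` when a stock has no row on the requested date, and every call launches a separate Spark job. A minimal sketch of an alternative, assuming the needed (symbol, date, close) rows have already been collected once into a Python list; the helper names are hypothetical and the sample closes are the ITC values reported in this notebook:

```python
def build_price_lookup(rows):
    """Map (symbol, date) -> closing price from (symbol, date, close) tuples."""
    return {(symbol, day): close for symbol, day, close in rows}

def price_on_day(lookup, symbol, day):
    """Return the closing price, or None when the stock did not trade that day."""
    return lookup.get((symbol, day))

rows = [("ITC", "02-JAN-2012", 198.65), ("ITC", "31-DEC-2012", 286.8)]
lookup = build_price_lookup(rows)
itc_first = price_on_day(lookup, "ITC", "02-JAN-2012")
missing = price_on_day(lookup, "ITC", "01-JAN-2012")   # no such row
print(itc_first, missing)
```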

In [52]:
# Selected stocks, based on the analysis
# |  HINDALCO|       ITC| -0.7588449|
# |    GVKPIL|       ITC| -0.7576902|
# |       ITC|    RENUKA|  -0.627603|
# |TATAMOTORS|       ITC|-0.62484014|
# |       ITC|      RCOM| -0.5900066|

stock_list = ["HINDALCO","ITC","GVKPIL","RENUKA","TATAMOTORS","RCOM"]
multiplier = [5,1,1,1,1,1]

# Initializing the list of per-stock prices and the running totals for profit and investment
prices = []
total_profit = 0
total_investment = 0

# Looping through each stock and multiplier
for the_stock, the_multiplier in zip(stock_list, multiplier):
    # Getting the price of the stock on the first and last trading days of 2012
    first_day_price = get_price_on_day(the_stock, first_day_2012)
    last_day_price = get_price_on_day(the_stock, last_day_2012)
    
    # Calculating the difference in price and total difference
    diff = (last_day_price - first_day_price)
    total_diff = diff * the_multiplier
    
    # Adding to total profit and investment
    total_profit += total_diff
    total_investment += (first_day_price * the_multiplier)
    
    # Appending stock, first day price, last day price, price difference, and total difference to prices list
    prices.append([the_stock, first_day_price, last_day_price, diff, total_diff])

                                                                                

In [53]:
# Printing the prices list
prices

[['HINDALCO', 112.25, 130.5, 18.25, 91.25],
 ['ITC', 198.65, 286.8, 88.15, 88.15],
 ['GVKPIL', 12.4, 13.55, 1.1500000000000004, 1.1500000000000004],
 ['RENUKA', 24.75, 31.75, 7.0, 7.0],
 ['TATAMOTORS', 183.95, 312.65, 128.7, 128.7],
 ['RCOM', 72.1, 73.9, 1.8000000000000114, 1.8000000000000114]]

In [54]:
# Printing the total investment and total profit
print(total_investment, total_profit)

1053.1 318.05
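
The two totals can be cross-checked by hand from the `prices` list and the multipliers. A minimal sketch in plain Python, with the values copied from the output above; `portfolio_totals` is an illustrative helper, not part of the notebook:

```python
def portfolio_totals(positions):
    """Return (investment, profit) for (first_price, last_price, quantity) positions."""
    investment = sum(first * qty for first, last, qty in positions)
    profit = sum((last - first) * qty for first, last, qty in positions)
    return investment, profit

# (first-day close, last-day close, multiplier) copied from the output above
positions = [
    (112.25, 130.5, 5),    # HINDALCO
    (198.65, 286.8, 1),    # ITC
    (12.4, 13.55, 1),      # GVKPIL
    (24.75, 31.75, 1),     # RENUKA
    (183.95, 312.65, 1),   # TATAMOTORS
    (72.1, 73.9, 1),       # RCOM
]
investment, profit = portfolio_totals(positions)
print(f"Return: {profit / investment * 100:.2f}%")
```

This reproduces the roughly 30.20% return quoted for the negative-correlation strategy.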


In [55]:
#Positive correlation

In [58]:
# Filtering the DataFrame df_corr to include only positive correlations and removing duplicates based on "Corr" column
# Sorting the DataFrame in descending order based on "Corr" column
df_corr_pos = df_corr.filter(F.col("Corr") >= 0.0).dropDuplicates(["Corr"]).orderBy(F.col("Corr").desc())

# Counting the number of rows in the DataFrame df_corr_pos
df_corr_pos.count()

36

In [59]:
# Showing the DataFrame df_corr_pos
df_corr_pos.show()

+----------+----------+----------+
|   Symbol1|   Symbol2|      Corr|
+----------+----------+----------+
|    GVKPIL|  HINDALCO| 0.9386468|
|      RCOM|    GVKPIL| 0.9186302|
|  HINDALCO|TATAMOTORS|0.91859823|
|      HDIL|TATAMOTORS| 0.9049866|
|    RENUKA|    GVKPIL|  0.888308|
|  HINDALCO|      IDFC| 0.8815824|
|    GVKPIL|      IDFC|0.87741727|
|    RENUKA|  HINDALCO|0.87294793|
|  HINDALCO|      RCOM| 0.8715104|
|      RCOM|      IDFC| 0.8642946|
|  HINDALCO|      HDIL| 0.8599854|
|      HDIL|      IDFC| 0.8481832|
|      RCOM|    RENUKA| 0.8473944|
|    RENUKA|      HDIL|0.82943594|
|    RENUKA|      IDFC|0.82532847|
|  ALOKTEXT|      HDIL| 0.8222778|
|TATAMOTORS|    GVKPIL| 0.8216821|
|    GVKPIL|      HDIL| 0.7959564|
|    RENUKA|TATAMOTORS|0.79437506|
|      IDFC|TATAMOTORS|0.79226977|
+----------+----------+----------+
only showing top 20 rows



In [60]:
# Selected stocks, based on the analysis
# |    GVKPIL|  HINDALCO| 0.9386468|
# |      RCOM|    GVKPIL| 0.9186302|
# |  HINDALCO|TATAMOTORS|0.91859823|
# |      HDIL|TATAMOTORS| 0.9049866|
# |    RENUKA|    GVKPIL|  0.888308|

stock_list = ["GVKPIL","HINDALCO","RCOM","TATAMOTORS","HDIL","RENUKA"]
multiplier = [3,2,1,2,1,1]

# Initializing the list of per-stock prices and the running totals for profit and investment
prices = []
total_profit = 0
total_investment = 0

# Looping through each stock and multiplier
for the_stock, the_multiplier in zip(stock_list, multiplier):
    # Getting the price of the stock on the first and last trading days of 2012
    first_day_price = get_price_on_day(the_stock, first_day_2012)
    last_day_price = get_price_on_day(the_stock, last_day_2012)
    
    # Calculating the difference in price and total difference
    diff = (last_day_price - first_day_price)
    total_diff = diff * the_multiplier
    
    # Adding to total profit and investment
    total_profit += total_diff
    total_investment += (first_day_price * the_multiplier)
    
    # Appending stock, first day price, last day price, price difference, and total difference to prices list
    prices.append([the_stock, first_day_price, last_day_price, diff, total_diff])

                                                                                

In [61]:
# Printing the prices list
prices

[['GVKPIL', 12.4, 13.55, 1.1500000000000004, 3.450000000000001],
 ['HINDALCO', 112.25, 130.5, 18.25, 36.5],
 ['RCOM', 72.1, 73.9, 1.8000000000000114, 1.8000000000000114],
 ['TATAMOTORS', 183.95, 312.65, 128.7, 257.4],
 ['HDIL', 54.05, 111.5, 57.45, 57.45],
 ['RENUKA', 24.75, 31.75, 7.0, 7.0]]

In [62]:
# Printing the total investment and total profit
print(total_investment, total_profit)

780.4999999999999 363.59999999999997
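
The trailing digits in the totals above are binary floating-point rounding artefacts, not real price differences. A minimal sketch that repeats the calculation with `decimal.Decimal` to obtain exact totals, using the prices and multipliers from the positive-correlation cell; passing prices as strings is an assumption made so that `Decimal` sees the exact printed values:

```python
from decimal import Decimal

# (first-day close, last-day close, multiplier) as strings so Decimal stays exact
positions = [
    ("12.4", "13.55", 3),     # GVKPIL
    ("112.25", "130.5", 2),   # HINDALCO
    ("72.1", "73.9", 1),      # RCOM
    ("183.95", "312.65", 2),  # TATAMOTORS
    ("54.05", "111.5", 1),    # HDIL
    ("24.75", "31.75", 1),    # RENUKA
]

investment = sum(Decimal(first) * qty for first, last, qty in positions)
profit = sum((Decimal(last) - Decimal(first)) * qty for first, last, qty in positions)
pct = (profit / investment * 100).quantize(Decimal("0.01"))
print(investment, profit, pct)
```

For display purposes alone, formatting the existing floats with `f"{total_investment:.2f}"` would also be sufficient.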
