# Business Scenario:
Daily data from the New York Stock Exchange is given for a few companies for the years 1970 to 2010. Dividend
data for the same companies and years is given in a separate file. Several lists or reports are to be generated
from this data as specified.

## Problem Statement:
With the given files available on HDFS, the following reports/lists need to be generated using SparkSQL.

## 1) List the companies which have the stock close price more than or equal to 200 and stock volume more than or equal to 10 million.
## 2) List the companies that have given dividends more than 50 times. The list should include the number of times they have given dividends.
## 3) List the companies along with their close price, dividends and the date when they gave dividends of more than 0.01 when their daily close price was more than or equal to 100, sorted by dividends in ascending order.
## 4) Save the above lists as comma separated files.
## 5) Generate a JSON file containing the list of companies for the year 2009 with the number of dividends and date when they gave, indicating 0 if they have not given dividends on certain dates and clubbing all the prices i.e. open, high, low, close and adjusted close into a nested tuple named price_details. Show two of the above lists:
### a. One sorted by dividends in descending order and
### b. Another sorted by dividends in ascending order.

## Importing Libraries

In [293]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import * 
from datetime import date, timedelta, datetime
import time

## Initializing SparkSession

In [294]:
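# Build (or reuse) the SparkSession. Note that `sc` holds a SparkSession, not a SparkContext.
sc = (SparkSession.builder.appName("PysparkAssignment")
      .config("spark.sql.shuffle.partitions", "50")
      .config("spark.driver.maxResultSize", "5g")
      .config("spark.sql.execution.arrow.enabled", "true")
      .getOrCreate())
from pyspark.sql import SQLContext
# SQLContext takes the underlying SparkContext first; the session is passed as the second argument.
sqlContext = SQLContext(sc.sparkContext, sc)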

## Creating Data Frames

1. First File = "NYSE_dividends.tsv"
2. Second File = "NYSE_daily.tsv"
3. **Data Dictionary for the above files**

**NYSE_daily.tsv**
1. exchange      
2. stock symbol
3. date
4. open price
5. high price
6. low price
7. close price
8. volume
9. adjusted close price

**NYSE_dividends.tsv**
1. exchange
2. stock symbol
3. date
4. dividends
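
The column order listed above can be captured once and applied while reading, instead of renaming columns one at a time. The following is a minimal sketch, assuming a SparkSession is passed in as `spark`; the helper name `read_nyse_tsv` and the two constant names are hypothetical, and the column names simply mirror the ones used in the rest of this notebook.

```python
# Column names in file order, mirroring the data dictionary above.
DAILY_COLUMNS = [
    "Exchange", "Stock_symbol", "Date", "Open_price", "High_price",
    "Low_price", "Close", "Volume", "Adjusted_close_price",
]
DIVIDEND_COLUMNS = ["Exchange", "Stock_symbol", "Date", "Dividends"]


def read_nyse_tsv(spark, path, columns):
    """Read a headerless tab-separated file and name its columns in order."""
    raw = spark.read.csv(path, sep="\t", header=False)
    # toDF(*names) renames every column positionally in a single call.
    return raw.toDF(*columns)
```

With the session created in this notebook, a call would look like `read_nyse_tsv(sc, "NYSE_daily.tsv", DAILY_COLUMNS)`. All columns still arrive as strings, so the numeric casts are still needed afterwards.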

In [295]:
NYSE_df = sc.read.csv("NYSE_daily.tsv", sep=r'\t', header=False)

In [296]:
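NYSE_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)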

In [297]:
NYSE_df.show(3)

+----+---+--------+-----+-----+-----+-----+-------+-----+
| _c0|_c1|     _c2|  _c3|  _c4|  _c5|  _c6|    _c7|  _c8|
+----+---+--------+-----+-----+-----+-----+-------+-----+
|NYSE|JEF|2/8/2010| 25.4|25.49|24.78|24.82|1134300|24.82|
|NYSE|JEF|2/5/2010|24.91|25.19|24.08|25.01|1765200|25.01|
|NYSE|JEF|2/4/2010|26.01| 26.2|24.85|24.85|1414400|24.85|
+----+---+--------+-----+-----+-----+-----+-------+-----+
only showing top 3 rows



In [298]:
NYSE_diviends = sc.read.csv("NYSE_dividends.tsv", sep=r'\t', header=False)

In [299]:
NYSE_diviends.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

In [300]:
NYSE_diviends.show(3)

+----+---+----------+-----+
| _c0|_c1|       _c2|  _c3|
+----+---+----------+-----+
|NYSE|JAH|12/30/2009|0.075|
|NYSE|JAH| 9/29/2009|0.075|
|NYSE|JGT|12/11/2009|0.377|
+----+---+----------+-----+
only showing top 3 rows



**Renaming Column Names**
1. NYSE Dataset

In [301]:
# toDF renames all nine columns positionally in one call.
NYSE = NYSE_df.toDF("Exchange", "Stock_symbol", "Date", "Open_price", "High_price",
                    "Low_price", "Close", "Volume", "Adjusted_close_price")

In [302]:
NYSE

DataFrame[Exchange: string, Stock_symbol: string, Date: string, Open_price: string, High_price: string, Low_price: string, Close: string, Volume: string, Adjusted_close_price: string]

In [303]:
NYSE.show(3)

+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
|Exchange|Stock_symbol|    Date|Open_price|High_price|Low_price|Close| Volume|Adjusted_close_price|
+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
|    NYSE|         JEF|2/8/2010|      25.4|     25.49|    24.78|24.82|1134300|               24.82|
|    NYSE|         JEF|2/5/2010|     24.91|     25.19|    24.08|25.01|1765200|               25.01|
|    NYSE|         JEF|2/4/2010|     26.01|      26.2|    24.85|24.85|1414400|               24.85|
+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
only showing top 3 rows



2. NYSE Dividends Dataset

In [304]:
# toDF renames all four columns positionally in one call.
NYSE_div = NYSE_diviends.toDF("Exchange", "Stock_symbol", "Date", "Dividends")

In [305]:
NYSE_div

DataFrame[Exchange: string, Stock_symbol: string, Date: string, Dividends: string]

In [306]:
NYSE_div.show(3)

+--------+------------+----------+---------+
|Exchange|Stock_symbol|      Date|Dividends|
+--------+------------+----------+---------+
|    NYSE|         JAH|12/30/2009|    0.075|
|    NYSE|         JAH| 9/29/2009|    0.075|
|    NYSE|         JGT|12/11/2009|    0.377|
+--------+------------+----------+---------+
only showing top 3 rows



**Changing Data Types for Computation**

In [307]:
NYSE = NYSE.withColumn('Open_price', F.col('Open_price').cast(DoubleType()))
NYSE = NYSE.withColumn('High_price', F.col('High_price').cast(DoubleType()))
NYSE = NYSE.withColumn('Low_price', F.col('Low_price').cast(DoubleType()))
NYSE = NYSE.withColumn('Close', F.col('Close').cast(DoubleType()))
NYSE = NYSE.withColumn('Volume', F.col('Volume').cast(IntegerType()))
NYSE = NYSE.withColumn('Adjusted_close_price', F.col('Adjusted_close_price').cast(DoubleType()))

In [308]:
NYSE.show(3)

+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
|Exchange|Stock_symbol|    Date|Open_price|High_price|Low_price|Close| Volume|Adjusted_close_price|
+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
|    NYSE|         JEF|2/8/2010|      25.4|     25.49|    24.78|24.82|1134300|               24.82|
|    NYSE|         JEF|2/5/2010|     24.91|     25.19|    24.08|25.01|1765200|               25.01|
|    NYSE|         JEF|2/4/2010|     26.01|      26.2|    24.85|24.85|1414400|               24.85|
+--------+------------+--------+----------+----------+---------+-----+-------+--------------------+
only showing top 3 rows



In [309]:
NYSE_div = NYSE_div.withColumn('Dividends', F.col('Dividends').cast(DoubleType()))

In [310]:
NYSE_div.show(10)

+--------+------------+----------+---------+
|Exchange|Stock_symbol|      Date|Dividends|
+--------+------------+----------+---------+
|    NYSE|         JAH|12/30/2009|    0.075|
|    NYSE|         JAH| 9/29/2009|    0.075|
|    NYSE|         JGT|12/11/2009|    0.377|
|    NYSE|         JGT| 9/11/2009|    0.377|
|    NYSE|         JGT| 6/11/2009|    0.377|
|    NYSE|         JGT| 3/11/2009|    0.377|
|    NYSE|         JGT|12/11/2008|    0.377|
|    NYSE|         JGT| 9/11/2008|    0.451|
|    NYSE|         JGT| 6/11/2008|    0.451|
|    NYSE|         JGT| 3/12/2008|    0.451|
+--------+------------+----------+---------+
only showing top 10 rows



## Problem Statement:

## 1) List the companies which have the stock close price more than or equal to 200 and stock volume more than or equal to 10 million.

In [311]:
# 10 million is 10,000,000
NYSE.filter((NYSE.Close >= 200) & (NYSE.Volume >= 10000000)).show()

+--------+------------+----------+----------+----------+---------+------+--------+--------------------+
|Exchange|Stock_symbol|      Date|Open_price|High_price|Low_price| Close|  Volume|Adjusted_close_price|
+--------+------------+----------+----------+----------+---------+------+--------+--------------------+
|    NYSE|        JNPR| 11/3/2000|     198.0|    216.88|   196.25|216.13|13424800|              216.13|
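
Since the task asks for companies rather than individual trading days, the matching rows can be reduced to distinct stock symbols. The following is a rough sketch of that step, assuming the casted daily DataFrame is passed in; the function and constant names are hypothetical.

```python
MIN_CLOSE = 200           # close price threshold from the task statement
MIN_VOLUME = 10_000_000   # 10 million shares

# SQL-style filter expression built from the two thresholds.
THRESHOLD_FILTER = f"Close >= {MIN_CLOSE} AND Volume >= {MIN_VOLUME}"


def companies_meeting_thresholds(daily_df):
    """Return one row per stock symbol that met both thresholds on any day."""
    return daily_df.filter(THRESHOLD_FILTER).select("Stock_symbol").distinct()
```

Calling `companies_meeting_thresholds(NYSE).show()` would then print each qualifying symbol once, however many days it qualified on.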

## 2) List the companies that have given dividends more than 50 times. The list should include the number of times they have given dividends.

In [312]:
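# Expose the dividends DataFrame to SQL; createOrReplaceTempView supersedes the deprecated registerTempTable.
NYSE_div.createOrReplaceTempView('NYSE_temp')
# Count the dividend payments per company and keep the companies with more than 50.
sql_query = sqlContext.sql('select Stock_symbol, count(Dividends) as Dividends_Count from NYSE_temp group by Stock_symbol having Dividends_Count > 50')
sql_query.show()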

+------------+---------------+
|Stock_symbol|Dividends_Count|
+------------+---------------+
|         JCI|             97|
|         JFP|             58|
|         JTP|             91|
|         JCP|            114|
|         JOE|             51|
|         JFR|             68|
|         JHI|             99|
|         JEF|             72|
|         JQC|             55|
|         JHP|             85|
|         JPM|            104|
|         JNJ|            160|
|         JWN|             81|
|         JHS|             88|
|         JPS|             89|
|         JPC|             60|
|         JRO|             63|
+------------+---------------+



## 3) List the companies along with their close price, dividends and the date when they gave dividends of more than 0.01 when their daily close price was more than or equal to 100, sorted by dividends in ascending order.

In [334]:
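# Match each dividend to the daily price row for the same company and date.
df_join = NYSE.join(NYSE_div, on=["Stock_symbol", "Date"], how="inner")
# orderBy takes the keyword `ascending`; an unknown keyword such as `asc` is ignored.
result = df_join.filter("Dividends > .01 and Close >= 100").select("Stock_symbol", "Date", "Close", "Dividends").orderBy("Dividends", ascending=True)
result_df = result.toPandas()
result_df.head()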

   Stock_symbol  Date        Close   Dividends
0  JNJ           8/17/1987   101.62  0.02625
1  JNJ           2/11/1992   104.37  0.05
2  JCI           12/10/2003  109.55  0.075
3  JCI           9/12/2007   107.34  0.11
4  JCI           6/13/2007   110.87  0.11


## 4) Save the above lists as comma separated files.

In [335]:
result_df.to_csv("Results.csv", header =True, sep = ',')
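
The task asks for all of the lists to be saved, and Spark's own writer can do this without collecting the data to pandas first. The following is a minimal sketch; the helper name `save_as_csv` and any output paths are hypothetical. Note that Spark writes a directory of part files at the given path rather than a single file.

```python
def save_as_csv(df, path):
    """Write a Spark DataFrame as comma-separated text with a header row."""
    (df.coalesce(1)               # one partition, so the directory holds one part file
       .write
       .mode("overwrite")         # replace any output left by an earlier run
       .option("header", "true")
       .csv(path))
```

It could be applied to each list in turn, for example `save_as_csv(sql_query, "dividend_counts_csv")` and `save_as_csv(result, "high_close_dividends_csv")`.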

## 5) Generate a JSON file containing the list of companies for the year 2009 with the number of dividends and date when they gave, indicating 0 if they have not given dividends on certain dates and clubbing all the prices i.e. open, high, low, close and adjusted close into a nested tuple named price_details. Show two of the above lists: 
### a. One sorted by dividends in descending order and
### b. Another sorted by dividends in ascending order.

**Changing Date String type to Date DateType**
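
One way to perform the conversion named in this heading is sketched below. It assumes the dates follow the `M/D/YYYY` layout seen in the samples above (for example `2/8/2010`); the helper names and the `Trade_date` column name are hypothetical. The plain-Python parser is included only to show the equivalent pattern outside Spark.

```python
from datetime import datetime

SPARK_DATE_PATTERN = "M/d/yyyy"    # Spark datetime pattern for values like 2/8/2010
PYTHON_DATE_PATTERN = "%m/%d/%Y"   # equivalent strptime pattern


def parse_trade_date(text):
    """Parse one M/D/YYYY string into a datetime.date."""
    return datetime.strptime(text.strip(), PYTHON_DATE_PATTERN).date()


def with_trade_date(df, source_col="Date", target_col="Trade_date"):
    """Add a date-typed column parsed from the string date column."""
    expr = f"to_date({source_col}, '{SPARK_DATE_PATTERN}') AS {target_col}"
    return df.selectExpr("*", expr)
```

With a real date column, the year can be selected with an expression such as `with_trade_date(NYSE).filter("year(Trade_date) = 2009")` rather than by slicing the last four characters of the string.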

In [340]:
NYSE_div.createOrReplaceTempView('NYSE_temp')
NYSE.createOrReplaceTempView('NYSE_Demo')

# 2009 rows joined with their dividends, sorted by dividends ascending.
# Price_Details here is the sum of the open, close, high and adjusted close prices.
sql_queryA = sqlContext.sql("""
    select a.Stock_symbol, a.Date, b.Dividends,
           (a.Open_price + a.Close + a.High_price + a.Adjusted_close_price) as Price_Details,
           case when b.Date is null then 0 else b.Date end as Div_status
    from NYSE_Demo as a inner join NYSE_temp as b
      on (a.Stock_symbol = b.Stock_symbol and a.Date = b.Date)
    where right(a.Date, 4) = '2009' order by b.Dividends asc
""")
sql_queryA.show()
sql_queryA.toPandas().to_json(r"ResultA.json")

+------------+----------+---------+------------------+----------+
|Stock_symbol|      Date|Dividends|     Price_Details|Div_status|
+------------+----------+---------+------------------+----------+
|         JKH| 3/25/2009|    0.003|            208.75| 3/25/2009|
|         JKK| 9/23/2009|    0.008|            254.86| 9/23/2009|
|         JMP| 5/20/2009|     0.01|28.810000000000002| 5/20/2009|
|         JMP| 3/18/2009|     0.01|              17.1| 3/18/2009|
|         JMP| 8/19/2009|     0.01|41.489999999999995| 8/19/2009|
|         JMP|11/18/2009|     0.01|              38.9|11/18/2009|
|         JFP|12/11/2009|    0.015|              9.18|12/11/2009|
|         JKK| 6/23/2009|    0.016|210.29000000000002| 6/23/2009|
|         JRN| 2/20/2009|     0.02|4.7299999999999995| 2/20/2009|
|         JFP| 9/11/2009|    0.022|             11.41| 9/11/2009|
|         JFP|11/10/2009|    0.022|              9.29|11/10/2009|
|         JFP|10/13/2009|    0.022|11.870000000000001|10/13/2009|
|         

In [339]:
NYSE_div.createOrReplaceTempView('NYSE_temp')
NYSE.createOrReplaceTempView('NYSE_Demo')

# Same query as above, sorted by dividends descending.
sql_queryB = sqlContext.sql("""
    select a.Stock_symbol, a.Date, b.Dividends,
           (a.Open_price + a.Close + a.High_price + a.Adjusted_close_price) as Price_Details,
           case when b.Date is null then 0 else b.Date end as Div_status
    from NYSE_Demo as a inner join NYSE_temp as b
      on (a.Stock_symbol = b.Stock_symbol and a.Date = b.Date)
    where right(a.Date, 4) = '2009' order by b.Dividends desc
""")
sql_queryB.show()
sql_queryB.toPandas().to_json(r"ResultB.json")

+------------+----------+---------+------------------+----------+
|Stock_symbol|      Date|Dividends|     Price_Details|Div_status|
+------------+----------+---------+------------------+----------+
|         JXI| 6/22/2009|    1.113|            165.54| 6/22/2009|
|         JBI| 8/12/2009|    0.984|             99.98| 8/12/2009|
|         JBI| 2/11/2009|    0.984|             96.56| 2/11/2009|
|         JBR| 8/27/2009|    0.875|             76.72| 8/27/2009|
|         JZK|11/10/2009|    0.875| 65.30000000000001|11/10/2009|
|         JZV|11/10/2009|    0.875|             81.13|11/10/2009|
|         JZV| 5/12/2009|    0.875|             57.33| 5/12/2009|
|         JZK| 5/12/2009|    0.875|             52.07| 5/12/2009|
|         JBN| 2/25/2009|    0.875|             46.18| 2/25/2009|
|         JBR| 2/25/2009|    0.875|              45.9| 2/25/2009|
|         JBN| 8/27/2009|    0.875| 85.32999999999998| 8/27/2009|
|         JZJ| 5/12/2009|    0.797|             89.95| 5/12/2009|
|         
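
The task statement for this report asks for a 0 on dates without a dividend and for the prices to be grouped into a nested `price_details` field. One possible way to express that is a left join combined with `coalesce` and `named_struct`. The sketch below is an interpretation of the task, not the query that produced the output shown here; it assumes the temp views `NYSE_Demo` and `NYSE_temp` are registered as in this notebook, and the function names, the `Dividend_amount` alias and the struct field names are hypothetical.

```python
def build_2009_report_query(descending=False):
    """Build the SQL text for the 2009 report, sorted by dividend amount."""
    direction = "DESC" if descending else "ASC"
    return f"""
        SELECT a.Stock_symbol,
               a.Date,
               coalesce(b.Dividends, 0) AS Dividend_amount,
               named_struct(
                   'open', a.Open_price,
                   'high', a.High_price,
                   'low', a.Low_price,
                   'close', a.Close,
                   'adjusted_close', a.Adjusted_close_price
               ) AS price_details
        FROM NYSE_Demo AS a
        LEFT JOIN NYSE_temp AS b
          ON a.Stock_symbol = b.Stock_symbol AND a.Date = b.Date
        WHERE right(a.Date, 4) = '2009'
        ORDER BY Dividend_amount {direction}
    """


def write_2009_report(spark, path, descending=False):
    """Run the query and write the result as JSON (one object per line)."""
    report = spark.sql(build_2009_report_query(descending))
    report.write.mode("overwrite").json(path)
    return report
```

The left join keeps every 2009 trading day, so days without a matching dividend row get `Dividend_amount` 0 instead of being dropped. As with CSV output, Spark writes a directory of part files at `path`.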