# New York Stock Exchange Code (using pySpark)

In [1]:
# Bootstrap step executed before pyspark is imported.
# NOTE(review): presumably findspark.init() has to run before `import pyspark`
# so that the pyspark package can be found — confirm for this environment.
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession

# Configure the application name first, then obtain the session: an existing
# session is reused if there is one, otherwise a new one is created.
session_builder = SparkSession.builder.appName("Python Spark SQL example")
spark = session_builder.getOrCreate()

In [3]:
# Both inputs are read with the same settings: CSV reader, tab separator,
# no header row, column types inferred from the data.
tsv_read_options = dict(format="csv", sep="\t", inferSchema="true", header="false")

NYSE_daily = spark.read.load("/user/NYSE_ASS/NYSE_daily.tsv", **tsv_read_options)
NYSE_dividends = spark.read.load("/user/NYSE_ASS/NYSE_dividends.tsv", **tsv_read_options)

### Business Scenario:
Daily trading data from the New York Stock Exchange is provided for a few companies, covering the years 1970 to 2010.
The dividend data for the same companies and years is provided in a separate file. The lists (reports) specified in the
questions below are to be generated from these two data sets.

In [4]:
# Expose both DataFrames to Spark SQL as the views `daily` and `dividends`.
# At this point the columns have not been renamed yet (the next cells rename
# `_c0`, `_c1`, ...); the views are registered again after the renaming.
for view_name, frame in (("daily", NYSE_daily), ("dividends", NYSE_dividends)):
    frame.createOrReplaceTempView(view_name)

In [5]:
# Map each default column name of the daily file to a descriptive name.
daily_column_names = {
    '_c0': 'exchange',
    '_c1': 'stock_symbol',
    '_c2': 'date',
    '_c3': 'open_price',
    '_c4': 'high_price',
    '_c5': 'low_price',
    '_c6': 'close_price',
    '_c7': 'volume',
    '_c8': 'adjusted_close_price',
}

# Apply the renames one at a time, rebinding NYSE_daily to the renamed frame.
for default_name, descriptive_name in daily_column_names.items():
    NYSE_daily = NYSE_daily.withColumnRenamed(default_name, descriptive_name)

In [6]:
# Map each default column name of the dividends file to a descriptive name.
dividends_column_names = {
    '_c0': 'exchange',
    '_c1': 'stock_symbol',
    '_c2': 'date',
    '_c3': 'dividends',
}

# Apply the renames one at a time, rebinding NYSE_dividends to the renamed frame.
for default_name, descriptive_name in dividends_column_names.items():
    NYSE_dividends = NYSE_dividends.withColumnRenamed(default_name, descriptive_name)

In [7]:
# Register the renamed DataFrames as the `daily` and `dividends` views so the
# SQL queries below can refer to columns by their descriptive names.
renamed_frames = {"daily": NYSE_daily, "dividends": NYSE_dividends}
for view_name, frame in renamed_frames.items():
    frame.createOrReplaceTempView(view_name)

In [8]:
# Same two registrations as the preceding cell, repeated. Each call creates the
# named view or replaces the one already registered under that name.
NYSE_dividends.createOrReplaceTempView("dividends")
NYSE_daily.createOrReplaceTempView("daily")

## Question 1

List the companies whose stock close price was greater than or equal to 200 while the stock volume was greater than
or equal to 10 million.

In [19]:
# Select symbol, close price and volume for every row of the `daily` view with
# a close price of at least 200 and a volume of at least 10,000,000.
# The result has one row per matching daily record, so a symbol can repeat.
high_close_high_volume_sql = "SELECT stock_symbol,close_price, volume FROM daily WHERE close_price >=200 and volume >=10000000"
q1 = spark.sql(high_close_high_volume_sql)
q1.show()

+------------+-----------+--------+
|stock_symbol|close_price|  volume|
+------------+-----------+--------+
|        JNPR|     216.13|13424800|
|        JNPR|     232.58|11323800|
|        JNPR|     213.88|15463100|
|        JNPR|     229.19|16734200|
|        JNPR|      243.0|17288400|
|        JNPR|      228.5|19565000|
|        JNPR|      206.0|16487000|
|        JNPR|     205.94|12586900|
|        JNPR|     207.95|11819100|
|        JNPR|     201.44|13457300|
|        JNPR|     206.13|10621700|
|        JNPR|     225.64|12392300|
|        JNPR|     220.06|12225300|
|        JNPR|     209.92|13482800|
|        JNPR|     209.69|15197300|
|        JNPR|      237.0|12235000|
|        JNPR|      238.0|13748200|
|        JNPR|     224.69|11566000|
|        JNPR|      214.5|10173400|
|        JNPR|     204.13|12961600|
+------------+-----------+--------+
only showing top 20 rows



## Question 2 

List the companies that have given dividends more than 50 times. The list should include the number of times
they have given dividends.

In [20]:
# Count the rows of the `dividends` view per stock symbol and keep only the
# symbols with more than 50 rows; the count is returned as `dividend_count`.
frequent_dividends_sql = "SELECT stock_symbol, count(*) as dividend_count FROM dividends GROUP BY stock_symbol HAVING dividend_count>50"
q2 = spark.sql(frequent_dividends_sql)
q2.show()

+------------+--------------+
|stock_symbol|dividend_count|
+------------+--------------+
|         JCP|           114|
|         JEF|            72|
|         JPM|           104|
|         JRO|            63|
|         JFP|            58|
|         JHI|            99|
|         JNJ|           160|
|         JHS|            88|
|         JTP|            91|
|         JOE|            51|
|         JQC|            55|
|         JHP|            85|
|         JPS|            89|
|         JPC|            60|
|         JCI|            97|
|         JFR|            68|
|         JWN|            81|
+------------+--------------+



## Question 3

List the companies that gave a dividend of more than 0.01 on a day when their daily close price was greater than or
equal to 100. For each, show the close price, the dividend and the date, sorted by dividend in ascending order.

In [21]:
# Left join on symbol and date: every daily row is kept, and rows without a
# dividend record for that symbol and date show null in the dividend columns.
join_keys = ["stock_symbol","date"]
q3a = NYSE_daily.join(NYSE_dividends, how="left", on=join_keys)
q3a.show()

+------------+---------+--------+----------+----------+---------+-----------+-------+--------------------+--------+---------+
|stock_symbol|     date|exchange|open_price|high_price|low_price|close_price| volume|adjusted_close_price|exchange|dividends|
+------------+---------+--------+----------+----------+---------+-----------+-------+--------------------+--------+---------+
|         JEF| 2/8/2010|    NYSE|      25.4|     25.49|    24.78|      24.82|1134300|               24.82|    null|     null|
|         JEF| 2/5/2010|    NYSE|     24.91|     25.19|    24.08|      25.01|1765200|               25.01|    null|     null|
|         JEF| 2/4/2010|    NYSE|     26.01|      26.2|    24.85|      24.85|1414400|               24.85|    null|     null|
|         JEF| 2/3/2010|    NYSE|     26.23|     26.76|    26.22|      26.29|1066000|               26.29|    null|     null|
|         JEF| 2/2/2010|    NYSE|     26.08|     26.86|    25.78|      26.46|1496400|               26.46|    null|   

In [22]:
# Keep joined rows with a dividend above 0.01 on a day whose close price was
# at least 100, project the four report columns, and sort by dividend.
# The sort direction must be passed as `ascending`; the previous `asc=True`
# is not a keyword that orderBy reads, so it had no effect on the ordering.
q3 = (
    q3a.filter("dividends > .01 and close_price >=100")
    .select("stock_symbol","date","close_price","dividends")
    .orderBy("dividends", ascending=True)
)
q3.show()

+------------+----------+-----------+---------+
|stock_symbol|      date|close_price|dividends|
+------------+----------+-----------+---------+
|         JNJ| 8/17/1987|     101.62|  0.02625|
|         JNJ| 2/11/1992|     104.37|     0.05|
|         JCI|12/10/2003|     109.55|    0.075|
|         JCI| 9/12/2007|     107.34|     0.11|
|         JCI| 6/13/2007|     110.87|     0.11|
|         JNJ|11/12/1999|     103.75|     0.14|
|         JNJ| 5/18/2001|      101.0|     0.18|
|         JPM|  1/2/1998|     110.62|  0.20667|
|         JPM| 10/2/1997|     120.44|  0.20667|
|         JPM|  4/2/1998|     139.31|     0.24|
|         JLL| 5/11/2007|     117.09|     0.35|
+------------+----------+-----------+---------+



## Question 4

Save the above lists as comma separated files.

In [26]:
import pandas as pd
# Convert the Question 1 result to a pandas DataFrame and write it as CSV.
# index=False keeps the file to the query columns only; by default to_csv
# would add pandas' row-number index as an unnamed leading column.
q1.toPandas().to_csv('answer1.csv', index=False)

In [27]:
# Convert the Question 2 result to a pandas DataFrame and write it as CSV.
# index=False keeps the file to the query columns only; by default to_csv
# would add pandas' row-number index as an unnamed leading column.
q2.toPandas().to_csv('answer2.csv', index=False)

In [28]:
# Convert the Question 3 result to a pandas DataFrame and write it as CSV.
# index=False keeps the file to the query columns only; by default to_csv
# would add pandas' row-number index as an unnamed leading column.
q3.toPandas().to_csv('answer3.csv', index=False)