### General Instructions
In this assignment, you will complete the code samples where indicated to accomplish the given objectives. **Be sure to run all cells** and export this notebook as an HTML file with results included. Upload the exported HTML file to Canvas by the assignment deadline.

#### Assignment
Complete the following Python script according to the instructions at the top of each code block. Look for the *# MODIFY THIS LINE* comment, which marks where you need to change the code. Do not add or remove any lines of code; every task can be completed within the number of lines provided.

In [3]:
# set directory variables
input_dir_name = 'wasbs://downloads@smithbc.blob.core.windows.net/nyse/'
output_dir_name = '/tmp/ibm_highest_1990s'

In [4]:
# define a schema for the nyse pricing data
from pyspark.sql.types import *
nyse_schema = StructType([
  StructField('exchange', StringType()),
  StructField('symbol', StringType()),
  StructField('date', DateType()),
  StructField('price_open', DoubleType()),
  StructField('price_high', DoubleType()),
  StructField('price_low', DoubleType()),
  StructField('price_close', DoubleType()),
  StructField('volume', IntegerType()),
  StructField('price_adj_close', DoubleType())  
  ]) # MODIFY THIS LINE

# load the nyse pricing data into a dataframe applying the schema 
# created in the previous step
df = spark.read.csv(  
  input_dir_name, 
  header=True, 
  schema=nyse_schema,
  dateFormat='yyyy-MM-dd'
  )# MODIFY THIS LINE

# using the programmatic sql api, add a field named year to your dataframe which 
# will be assigned the year value from the date field
from pyspark.sql.functions import year
from pyspark.sql.types import *
df2 = ( 
 df
  .withColumn('year', year(df.date))  
  )# MODIFY THIS LINE
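
As a quick sanity check of what the `dateFormat='yyyy-MM-dd'` pattern and the `year()` function are expected to produce, the same transformation can be sketched in plain Python. This is only an analogue and does not run on Spark: the sample date strings are made up for illustration, and `extract_year` is a hypothetical helper that is not part of the assignment or of PySpark.

```python
from datetime import datetime

def extract_year(date_text):
    """Parse a 'yyyy-MM-dd' style string and return its calendar year."""
    # '%Y-%m-%d' is the strptime counterpart of the 'yyyy-MM-dd' pattern
    return datetime.strptime(date_text, '%Y-%m-%d').date().year

# hypothetical sample values, not taken from the NYSE files
sample_dates = ['1990-01-02', '1999-12-31']
sample_years = [extract_year(d) for d in sample_dates]
print(sample_years)
```

If the `year` column in `df2` shows nulls, a mismatch between the date pattern and the raw text is one likely cause worth checking.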

In [5]:
# question 1, between 1990 and 1999, what was the highest closing price for IBM stock
# across any year? use the programmatic sql api to construct this result.
results1 = (df2
            .where((df2.year >= 1990) & (df2.year <= 1999) & (df2.symbol == 'IBM') )
            .select('year', 'price_close')
            .groupby('year').agg({'price_close':'max'})
            .withColumnRenamed('max(price_close)', 'max_price_close')
            .orderBy('max_price_close', ascending=False)
            .limit(1)            
           ) # MODIFY THIS LINE

results1.show()
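
The cell above finds the per-year maximum first and then keeps the largest of those rows. The sketch below, in plain Python with made-up numbers, suggests why that should give the same price as a single maximum over all filtered rows, while also reporting the year in which it occurred. The `rows` list is hypothetical and is not NYSE data.

```python
# hypothetical (year, price_close) pairs standing in for the filtered IBM rows
rows = [(1990, 10.0), (1990, 12.5), (1995, 30.0), (1999, 28.0), (1999, 41.5)]

# approach used in the cell: max per year, then the largest of those maxima
max_by_year = {}
for yr, price in rows:
    max_by_year[yr] = max(max_by_year.get(yr, price), price)
top_year, top_price = max(max_by_year.items(), key=lambda kv: kv[1])

# direct approach: one max over every row, with no grouping
overall_max = max(price for _, price in rows)

print(top_year, top_price, overall_max)
```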

In [6]:
# question 2, between 1990 and 1999, what was the highest closing price for IBM stock
# by year? display your answer on the screen with data ordered by year in ascending order
# use a SQL statement to construct this result

df2.createOrReplaceTempView('pricing')

sql_statement = '''
SELECT year, max(price_close) AS max_price_close
FROM pricing
WHERE year BETWEEN 1990 AND 1999 AND symbol = 'IBM'
GROUP BY year
ORDER BY year 
''' # MODIFY THIS LINE

results2 =  spark.sql(sql_statement)# MODIFY THIS LINE

results2.show()
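
To try the filter and grouping logic of this query without a cluster, a similar statement can be run against an in-memory SQLite table. This is a rough sketch under stated assumptions: SQLite is a different engine from Spark SQL, the table holds only three of the columns, and every row below is invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE pricing (symbol TEXT, year INTEGER, price_close REAL)')

# hypothetical rows: some fall outside the year range or belong to another symbol
conn.executemany('INSERT INTO pricing VALUES (?, ?, ?)', [
    ('IBM', 1989, 50.0),   # before the range, should be excluded
    ('IBM', 1990, 10.0),
    ('IBM', 1990, 12.5),
    ('IBM', 1999, 41.5),
    ('XYZ', 1995, 99.0),   # made-up symbol, should be excluded
    ('IBM', 2000, 80.0),   # after the range, should be excluded
])

query = '''
SELECT year, max(price_close) AS max_price_close
FROM pricing
WHERE year BETWEEN 1990 AND 1999 AND symbol = 'IBM'
GROUP BY year
ORDER BY year
'''
sqlite_rows = conn.execute(query).fetchall()
conn.close()
print(sqlite_rows)
```

`BETWEEN` is inclusive at both ends, so rows for 1990 and 1999 are kept while 1989 and 2000 are dropped.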

In [7]:
# delete output dir
dbutils.fs.rm(output_dir_name, recurse=True)

In [8]:
# save your results from question 2 to a new directory named homework_out under 
# your /tmp/ibm_highest_1990s directory in the parquet format

results2.write.parquet(output_dir_name + '/homework_out', mode='overwrite') # MODIFY THIS LINE
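
One way to avoid retyping the directory, and the kind of typo that can creep into a hand-written path, is to derive the target from `output_dir_name`. The sketch below shows the idea in plain Python; `homework_out_path` is a hypothetical variable name, and `output_dir_name` is repeated here only so the snippet runs on its own.

```python
import posixpath

# same value as the directory variable set at the top of the notebook
output_dir_name = '/tmp/ibm_highest_1990s'

# posixpath.join always uses '/' separators, which suits DBFS-style paths
homework_out_path = posixpath.join(output_dir_name, 'homework_out')
print(homework_out_path)
```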

In [9]:
# display output files
display( dbutils.fs.ls(output_dir_name + '/homework_out') )

path,name,size
dbfs:/tmp/ibm_highest_1990s/homework_out/_SUCCESS,_SUCCESS,0
dbfs:/tmp/ibm_highest_1990s/homework_out/_committed_4807510008535196280,_committed_4807510008535196280,1034
dbfs:/tmp/ibm_highest_1990s/homework_out/_started_4807510008535196280,_started_4807510008535196280,0
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00000-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6050-1-c000.snappy.parquet,part-00000-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6050-1-c000.snappy.parquet,699
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00001-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6051-1-c000.snappy.parquet,part-00001-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6051-1-c000.snappy.parquet,699
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00002-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6052-1-c000.snappy.parquet,part-00002-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6052-1-c000.snappy.parquet,699
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00003-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6053-1-c000.snappy.parquet,part-00003-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6053-1-c000.snappy.parquet,699
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00004-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6054-1-c000.snappy.parquet,part-00004-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6054-1-c000.snappy.parquet,698
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00005-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6055-1-c000.snappy.parquet,part-00005-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6055-1-c000.snappy.parquet,699
dbfs:/tmp/ibm_highest_1990s/homework_out/part-00006-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6056-1-c000.snappy.parquet,part-00006-tid-4807510008535196280-f4824eda-57bd-47fe-af64-b5a89f75a358-6056-1-c000.snappy.parquet,699
