### General Instructions
Complete the following Python script per the instructions provided at the top of each code block.

In [0]:
# notebook config
# read the 'user' tag from the current notebook context
_notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
USER_NAME = _notebook_context.tags().apply('user')
# root of this user's folder under the shared uploads area
FILE_STORE_ROOT = '/FileStore/shared_uploads/' + USER_NAME

In [0]:
# set directory variables
# where results are written
output_dir_name = '/tmp/ibm_highest_1990s'
# where the nyse input files live, under the user's file store root
input_dir_name = FILE_STORE_ROOT + '/nyse/'

In [0]:
# define a schema for the nyse pricing data
from pyspark.sql.types import *
import pyspark.sql.functions as f

# column names and types applied to the nyse pricing csv files when reading
# NOTE(review): IntegerType is a 32-bit signed integer; confirm no volume value
# in the data exceeds 2,147,483,647, otherwise LongType would be needed
nyse_schema = StructType([
  StructField('exchange', StringType()),
  StructField('symbol', StringType()),
  StructField('date', DateType()),
  StructField('price_open', DoubleType()),
  StructField('price_high', DoubleType()),
  StructField('price_low', DoubleType()),
  StructField('price_close', DoubleType()),
  StructField('volume', IntegerType()),
  StructField('price_adj_close', DoubleType())
  ])

# load the nyse pricing data into a dataframe applying the schema
# created in the previous step
# the path comes from input_dir_name so the input location is defined in
# exactly one place (it resolves to FILE_STORE_ROOT + '/nyse/')
df = (
  spark
    .read
    .csv(
      input_dir_name,
      header=True,
      schema=nyse_schema,
      dateFormat='yyyy-MM-dd'
      )
  )

# using the programmatic sql api, add a field named year to your dataframe which
# will be assigned the year value from the date field
# (hint: use the year() pyspark sql function)
df2 = (
  df
    .withColumn('year', f.year(df.date))
)

display(df2)

exchange,symbol,date,price_open,price_high,price_low,price_close,volume,price_adj_close,year
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,
exchange,stock_symbol,,,,,,,,


In [0]:
# question 1, between 1990 and 1999, what was the highest closing price for IBM stock
# across any year? use the programmatic sql api to construct this result.
results1 = (
  df2
    # keep only IBM rows whose year falls in 1990..1999 (both ends inclusive)
    .filter(df2['symbol'] == 'IBM')
    .filter(df2['year'].between(1990, 1999))
    .select('symbol', 'price_close')
    # single group (IBM), reduced to its maximum closing price
    .groupBy('symbol')
    .agg(f.max('price_close').alias('highest_price_close'))
  )

display(results1)

symbol,highest_price_close
IBM,246.0


In [0]:
# question 2, between 1990 and 1999, what was the highest closing price for IBM stock
# by year? display your answer on the screen with data ordered by year in ascending order
# use a SQL statement to construct this result

# maximum closing price per year for IBM, limited to 1990-1999, sorted by year
sql_statement = '''
SELECT
  year,
  MAX(price_close) as highest_price_close
FROM pricing
WHERE
  year >= 1990 AND
  year <= 1999 AND
  symbol = 'IBM'
GROUP BY year
ORDER BY year ASC
'''

# register df2 under the view name the query reads from, then run the query
df2.createOrReplaceTempView('pricing')
results2 = spark.sql(sql_statement)

display(results2)

year,highest_price_close
1990,122.37
1991,139.5
1992,100.25
1993,59.75
1994,76.12
1995,113.62
1996,163.0
1997,179.25
1998,189.25
1999,246.0


In [0]:
# save your results from question 2 to a new directory named homework_out under
# your /tmp/ibm_highest_1990s directory in the parquet format
# the target is derived from output_dir_name so it stays in step with the
# directory that is listed when the output files are displayed
(
  results2
    .write
    .format('parquet')
    .mode('overwrite')  # replace any output left by a previous run
    .save(output_dir_name + '/homework_out')
  )

In [0]:
# display output files
# list the contents of the output directory, then render the listing
output_listing = dbutils.fs.ls(output_dir_name)
display(output_listing)

path,name,size,modificationTime
dbfs:/tmp/ibm_highest_1990s/homework_out/,homework_out/,0,0
