In [1]:
# Start coding here... 

In [4]:
# Importing SparkContext
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [5]:
# Pick up the SparkContext that is already running in this kernel, or start one.
sc = SparkContext.getOrCreate()
# SQL entry point: the cells below use it to read the CSV files and run sql().
sql_context = SQLContext(sparkContext=sc)

In [None]:
# Loading csv files as DataFrame
def _load_stock_csv(path):
    """Read one stock-price CSV file into a Spark DataFrame.

    The first row of the file is used as the header and column types are
    inferred from the data, so all three stock files are read with the
    same options.

    Args:
        path: Location of the CSV file, e.g. 'Spark/AMZN.csv'.

    Returns:
        The DataFrame returned by ``sql_context.read.load``.
    """
    return sql_context.read.load(path,
                                 format='com.databricks.spark.csv',
                                 header='true',
                                 inferSchema='true')


amzn_df = _load_stock_csv('Spark/AMZN.csv')
goog_df = _load_stock_csv('Spark/GOOG.csv')
tsla_df = _load_stock_csv('Spark/TSLA.csv')

In [35]:
# printSchema() writes the schema to stdout itself and returns None, so it is
# called directly; wrapping it in print() only appended a stray "None" line.
amzn_df.printSchema()
goog_df.printSchema()
tsla_df.printSchema()

In [24]:
# show() prints the rows itself and returns None, so it is called directly;
# wrapping it in print() only appended a stray "None" line after each table.
amzn_df.show(2)
goog_df.show(2)
tsla_df.show(2)

### Exploring and Querying Data
#### DataFrame Operations

In [7]:
from pyspark.sql.functions import year, month

In [None]:
# Mean closing price for Tesla in each calendar year, oldest year first.
tsla_close_with_year = tsla_df.select(year('Date').alias('year'), 'Close')
(
    tsla_close_with_year
    .groupBy('year')
    .avg('Close')
    .orderBy('year')
    .show()
)

In [None]:
# Mean daily low for Google in each (year, month), in chronological order.
goog_low_with_period = goog_df.select(
    year('Date').alias('year'),
    month('Date').alias('month'),
    'Low',
)
(
    goog_low_with_period
    .groupBy('year', 'month')
    .avg('Low')
    .orderBy('year', 'month')
    .show()
)

### Analysis with Spark SQL

In [None]:
# Make each stock DataFrame queryable from sql_context.sql() under its own
# table name.
for stock_df, table_name in (
    (amzn_df, 'amazon_stocks_data'),
    (goog_df, 'google_stocks_data'),
    (tsla_df, 'tesla_stocks_data'),
):
    stock_df.registerTempTable(table_name)

In [None]:
# Average closing price per month for Tesla stocks
# ORDER BY keeps the months chronological, matching the sorted DataFrame
# queries earlier in the notebook; without it the grouped rows come back in
# arbitrary order and show() prints only the first 20 of them.
tesla_monthly_close_sql = """
    SELECT year(Date) AS year,
           month(Date) AS month,
           ROUND(avg(Close), 2) AS avg_closing_price
    FROM tesla_stocks_data
    GROUP BY year, month
    ORDER BY year, month
"""
sql_context.sql(tesla_monthly_close_sql).show()

#### On which days did Google stock close more than $2 above or below its opening price?

In [None]:
# Days on which Google's close differed from its open by more than $2.
# The filter previously used 4, which contradicted the "$2" stated in the
# question this cell answers.
google_big_move_sql = """
    SELECT Date, Open, Close, ROUND(abs(Close - Open), 2) AS diff
    FROM google_stocks_data
    WHERE abs(Close - Open) > 2
"""
sql_context.sql(google_big_move_sql).show()

#### Minimum and Maximum Adjusted Closing price per year for Tesla

In [None]:
# Lowest and highest adjusted closing price for Tesla in each year.
# The query is a triple-quoted string because the old backslash continuation
# had a trailing space after the first "\", which left the string literal
# unterminated (SyntaxError), so the cell could not run.
# NOTE(review): assumes the loaded CSV exposes the column as `AdjClose`
# (no space) — confirm against tsla_df.printSchema().
tesla_adj_close_range_sql = """
    SELECT year(Date) AS year,
           MIN(AdjClose), MAX(AdjClose)
    FROM tesla_stocks_data
    GROUP BY year
"""
sql_context.sql(tesla_adj_close_range_sql).show()

In [None]:
# Closing prices of the three stocks side by side, matched on trading date.
# Tesla is joined to Google and then to Amazon with plain JOINs, so only dates
# present in all three tables appear in the result.
closing_prices_query = 'SELECT tesla_stocks_data.Date,tesla_stocks_data.Close AS tesla_closing,\
                                 amazon_stocks_data.Close AS amazon_closing, \
                                 google_stocks_data.Close AS google_closing \
                                 FROM tesla_stocks_data JOIN google_stocks_data \
                                 ON tesla_stocks_data.Date=google_stocks_data.Date \
                                 Join amazon_stocks_data \
                                 ON amazon_stocks_data.Date=tesla_stocks_data.Date'
join_Close_df = sql_context.sql(closing_prices_query)
join_Close_df.show()

### Saving Spark DataFrames as Parquet files

In [None]:
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write raises an error once the 'joins_stock' output already exists, which
# breaks a second "Run All".
join_Close_df.write.mode('overwrite').format('parquet').save('joins_stock')