![MLTrain logo](https://mltrain.cc/wp-content/uploads/2017/11/mltrain_logo-4.png "MLTrain logo")

----------------
# Preparations #
Check the environment for basic functionality

[Readonly public link](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2554702015877214/632957142081533/8550007600826264/latest.html)

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import sklearn as skl
import datetime as dt

from os import linesep as endl

with plt.style.context('ggplot'):
  fig, [ax1, ax2] = plt.subplots(1, 2, figsize = [8, 3])
  ax1.plot(np.random.randn(100))

  df = pd.DataFrame(np.random.randn(10, 4), columns = ['one', 'two', 'three', 'four'])
  ax2.plot(df.two)
  display()




# TL;DR #
The high-level Spark APIs (DataFrame, Dataset and Spark SQL) are made available through a cluster-level SparkSession object.  
In what follows we use a SparkSession to construct Spark DataFrames, register them as tables and execute Spark SQL queries over those tables.  
  
__Databricks__ comes with a SparkSession object instantiated as `spark`

In [4]:
sses = spark.builder.appName("ssapp01").getOrCreate()

### Upload a csv file to your Databricks cluster ###  
In Databricks the .csv file has to be uploaded manually:  
1. In the sidebar on the left, select 'Data' > 'Table +'.  
2. Drag and drop the file appleStocks.csv, found in the PfBDAaML GitHub repository, onto the rectangle on the right.  
  
The file is stored on your cluster under `/FileStore/tables`  
We use the SparkSession's csv reader to create a Spark DataFrame object from the csv file:

In [6]:
df = sses.read.csv('/FileStore/tables/appleStocks.csv', header = True)
print(type(df))
df.show(n = 10, truncate = True)

__NB:__ The csv file carries no type information, so every column is parsed as a string.  
That makes numeric and date transformations on the data unreliable.  
We fix this below by declaring an explicit _schema_ for the file.

In [8]:
print('df fields and their types:')
df.printSchema()

In [9]:
from pyspark.sql.types import StructField, StringType, IntegerType, DateType, FloatType, StructType

dataSchema = [
    StructField("Date", DateType(), nullable = True), 
    StructField("Open", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("Adj Close", StringType(), True)]

dataSchema = StructType(fields = dataSchema)
# header = True keeps the header row from being parsed as a data row
df = spark.read.csv('/FileStore/tables/appleStocks.csv', header = True, schema = dataSchema)

df.printSchema()
df.show(n = 10, truncate = True)

# Projections and selections #

Slicing rows and columns of PySpark DataFrames is similar to pandas:

In [11]:
# import pandas as pd
# df[col.isin(*pd.date_range(dt.datetime(2016, 1, 1), dt.datetime(2016, 2, 1)))].collect()

df[['Date', 'Open']].show(5)

# select() in PySpark is a relational-algebra projection (it picks columns, not rows):
df.select(['Open', 'Close']).show(5)

In [12]:
# Select the volumes of stocks exchanged after 01 Jan 2016
df.filter(df['Date'] > dt.datetime(2016, 1, 1)).select(['Volume']).show(5)
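
For comparison, the pandas counterparts of the projection and the date filter above might look like the following. The small frame `pdf` holds made-up rows and is purely illustrative.

```python
import datetime as dt
import pandas as pd

# Made-up rows standing in for the stock data.
pdf = pd.DataFrame({
    "Date": pd.to_datetime(["2015-12-31", "2016-01-04", "2016-01-05"]),
    "Open": [105.0, 102.6, 105.7],
    "Volume": [40000000, 67000000, 55000000],
})

# Projection: keep a subset of the columns.
projected = pdf[["Date", "Open"]]

# Selection followed by projection: volumes traded after 01 Jan 2016.
volumes = pdf.loc[pdf["Date"] > dt.datetime(2016, 1, 1), ["Volume"]]

print(projected.shape)
print(volumes["Volume"].tolist())
```

The main difference is evaluation: pandas computes each step immediately, while the Spark versions only run when an action such as `show` or `collect` is called.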

# Adding new columns #

We use the `month` column function from `pyspark.sql.functions` (many such column functions are defined there).  
Then we create a binned column 'Month' to use in the aggregations below.

In [14]:
import pyspark.sql.functions as pf

df = df.withColumn('Month', pf.month(df.Date))
df.select('Date', 'Month').sample(False, 0.1).show(5)

----------------------------------------------
# Grouping and aggregating #

As in pandas, the `groupby` method returns an intermediate grouped object (a `GroupedData` in PySpark) that we can aggregate on:

In [16]:
applesGroup = df.groupby('Month')

In [17]:
applesGroup.mean('Open').show(2)
applesGroup.max('Open').show(2)
applesGroup.count().show(2)

### Simultaneous aggregations ###

In [19]:
applesGroup.agg({'High': 'min', 'Low': 'max', 'Open': 'mean'}).show(5)

# OrderBy method #

In [21]:
df.orderBy("Month").show(5)

In [22]:
# More complex orderings are possible
df.orderBy(pf.year(df.Date).desc()).show(5)

## Functions
Many more functions can be imported from `pyspark.sql.functions`.  
See the [Spark documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) for the full list.

In [24]:
from pyspark.sql.functions import countDistinct, avg, stddev, year

df.select(countDistinct(year(df.Date))).show()
df.select(avg('Open')).show()
df.select(stddev('Low')).show()

-----------------------
# Exporting to pandas #

In [26]:
import datetime as dt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

pd.set_option('display.width', 144)

pandf = df.select('Date', 'High', 'Low', 'Open', 'Close').toPandas()
pandf['Date'] = pd.to_datetime(pandf['Date'], errors = 'raise')
pandf = pandf.sort_values(axis = 'index', by = 'Date')

for col in pandf.columns[1:]:
    pandf[col] = pandf[col].astype(float)

# Log-transform the prices; scale High up and Low down by 5% to widen the plotted band
pandf.High = pandf.High.map(lambda _: np.log(_ + 1.) * 1.05)
pandf.Low = pandf.Low.map(lambda _: np.log(_ + 1.) * .95)
pandf[['Open', 'Close']] = pandf[['Open', 'Close']].apply(lambda _: np.log(_ + 1.))

print(pandf.dtypes)
# print np.sort(pandf.Open.unique())[:10]

pandfs = pandf['Open'].sample(100, random_state = 101)

ax = plt.figure().add_subplot(111)
ax.fill_between(x = pandf.Date.values, y1 = pandf.High.values, y2 = pandf.Low.values, alpha = .3)
ax.plot(pandf.Date.values, pandf.Open.values, pandf.Date.values, pandf.Close.values - .1)
display()
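
The log transform in the cell above is applied element by element through `map` and `apply`. As a sketch of an equivalent vectorized form, NumPy's `log1p` computes log(1 + x) on a whole Series in one call. The values below are made up.

```python
import numpy as np
import pandas as pd

# Made-up prices standing in for one column of the exported frame.
prices = pd.Series([0.0, 9.0, 99.0], name="Open")

mapped = prices.map(lambda x: np.log(x + 1.))   # element by element, as in the cell above
vectorized = np.log1p(prices)                   # one vectorized call, same result

same = bool(np.allclose(mapped, vectorized))
print(same)

# expm1 inverts log1p, recovering the original prices up to floating-point error.
recovered = np.expm1(vectorized)
print(np.allclose(recovered, prices))
```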