# Start a SparkSession
This will start a local Spark session.

In [None]:
import findspark
findspark.init('/opt/spark') # This is specific to the virtual lab. Use `findspark.init()`
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Reading and formatting dates

You have already seen how to read files into Spark. Here we focus on reading data that contains dates and making sure the dates are parsed into the right type.

In this file, the most important columns for us are:  
1) Date: the date of trade  
2) Close: the value at the end of the day  

This read puts the data directly into a dataframe.  
Data source:  https://in.finance.yahoo.com/q/hp?s=AAPL

In [None]:
# read csv file
dfstock2011 = spark.read.csv('aapl.csv',header=True)

In [None]:
dfstock2011.head()

In [None]:
# select only the "Date" and "Close" columns
dfstock2011 = dfstock2011.select('Date','Close')

# inspect the first 10 rows
dfstock2011.show(10)


# the printSchema() method tells you the data type of each column
dfstock2011.printSchema()

From the above we see that both the `Date` and `Close` columns were read as strings.

We need to tell Spark what types they are. `Close` should be a float and `Date` should be a date.

Just as we parsed dates in pandas, Spark lets us do something similar.

Spark has the function `to_timestamp`, which parses date-time strings, e.g. "*2018-02-02 18:43:00*", and `to_date`, which parses date-only strings, e.g. "*2018-02-02*". It can also convert epoch dates.

Our data contains only a date, so we use the `to_date` function.
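
To make the difference between a date-time and a date concrete, here is a small plain-Python sketch (standard library only, not Spark) that parses the two sample strings from the text above. Note that Python's `strptime` uses directives such as `%Y-%m-%d`, whereas Spark uses patterns such as `yyyy-MM-dd`, so the format strings are not interchangeable.

```python
from datetime import datetime

# A date-time string carries a time of day ...
as_timestamp = datetime.strptime("2018-02-02 18:43:00", "%Y-%m-%d %H:%M:%S")

# ... while a date string only identifies a calendar day.
as_date = datetime.strptime("2018-02-02", "%Y-%m-%d").date()

print(as_timestamp)  # 2018-02-02 18:43:00
print(as_date)       # 2018-02-02
```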

In [None]:
from pyspark.sql.functions import datediff, to_date

# using the to_date function, convert the string to a date, giving it the format the strings are in
dfstock2011 = dfstock2011.withColumn("Date",to_date(dfstock2011["Date"],'yyyy-MM-dd'))

# while we are at it, also convert the Close column to a float
dfstock2011 = dfstock2011.withColumn("Close",dfstock2011["Close"].cast('float'))

dfstock2011.show(10)
dfstock2011.printSchema()

Viewing the schema of the dataframe again, both columns now have the right types.

To convert epoch dates, Spark provides the function `from_unixtime`.

In the example below we create a dummy dataframe with epoch dates (seconds since 1970-01-01 UTC) and convert them to timestamp strings of the form `yyyy-MM-dd HH:mm:ss`.

In [None]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime

df = spark.createDataFrame([("1524460623",), ("1524560699",), ("151456057",)], ['date_str'])
df.show(10)

df2 = df.select('date_str', from_unixtime('date_str').alias('timestamp'))

df2.show(10)

# Date Formatting
Spark also allows us to reformat a date or timestamp column to another date format.

In [None]:
# The time part (00:00:00) is of no use to us.
# Suppose we want to format the column so that it shows only the date, in a format we like
from pyspark.sql.functions import date_format

dfstock2011.select(date_format('Date', "dd-MMM-yyyy").alias("date_type1")).show(5)

dfstock2011.select(date_format('Date', "dd/MM/yy").alias("date_type2")).show(5)



# Subtracting Dates

Just as we were able to subtract two dates before, we can do the same in Spark. For that we use the function `datediff(end, start)`, which returns the number of days from `start` to `end`.

See documentation [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.datediff)
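
To know what to expect from `datediff`, the day counts for the dummy dates in the next cell can be worked out in plain Python (standard library only, not Spark). This is only a sanity check, assuming `datediff(end, start)` counts whole days from `start` to `end`.

```python
from datetime import date

# (start_date, end_date) pairs matching the dummy dataframe in the next cell.
pairs = [
    (date(2005, 11, 23), date(2005, 11, 23)),
    (date(2010, 10, 25), date(2010, 12, 31)),
    (date(2009, 6, 30), date(2010, 6, 30)),
]

# end - start, mirroring the argument order of datediff(end, start).
differences = [(end - start).days for start, end in pairs]

print(differences)  # [0, 67, 365]
```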

In [None]:
#create dummy dataframe
df = spark.createDataFrame(
    [("2005-11-23","2005-11-23"),
     ("2010-10-25","2010-12-31"), 
     ("2009-06-30","2010-06-30")], 
    ['start_date','end_date'])

# convert dates to date object
df = df.withColumn("start_date",to_date(df["start_date"],'yyyy-MM-dd'))
df = df.withColumn("end_date",  to_date(df["end_date"],  'yyyy-MM-dd'))
df.show(10)


# use the datediff function to find the difference in days between the two dates
df.withColumn("difference_in_days",datediff(df.end_date,df.start_date)).show()

# Adding to and subtracting from dates

We can add days and months to dates in Spark (a year is simply 12 months). For this we use:


*   **Adding days** - `date_add(start, days)`
*   **Subtracting days** - `date_sub(start, days)`
*   **Adding months** - `add_months(start, months)`
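
The day arithmetic can be checked by hand with plain Python (standard library only, not Spark). The sketch below uses the same three dates as the next cell and the same offsets of 10 and 60 days.

```python
from datetime import date, timedelta

# The dates used in the dummy dataframe of the next cell.
dates = [date(2005, 11, 23), date(2010, 10, 25), date(2009, 6, 30)]

# Shift every date forward by 10 days and backward by 60 days.
plus_10_days = [d + timedelta(days=10) for d in dates]
minus_60_days = [d - timedelta(days=60) for d in dates]

for d, later, earlier in zip(dates, plus_10_days, minus_60_days):
    print(d, later, earlier)
```

Month boundaries are handled for you: 2005-11-23 plus 10 days rolls over into December.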


In [None]:
from pyspark.sql.functions import to_date,date_add,date_sub
#create dummy dataframe
df = spark.createDataFrame(
    [("2005-11-23",),
     ("2010-10-25",), 
     ("2009-06-30",)], 
    ['date',])

# convert dates to date object
df = df.withColumn("date",to_date(df["date"],'yyyy-MM-dd'))
df.show(10)

df = df.withColumn("plus_10_days",date_add(df["date"],10))
df = df.withColumn("minus_60_days",date_sub(df["date"],60))
df.show(10)

# Windowing in Spark



Read more about Spark window functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

You can also find the official documentation [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window)


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count, round

windowSpec = Window.partitionBy(dfstock2011["Close"]).orderBy(dfstock2011.Date.asc())

counts = count(dfstock2011.Close).over(windowSpec)

dfstock2011.select(round(dfstock2011.Close, 0), counts.alias("Counts")).show(20)

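Using the same understanding of windowing from pandas, we can do the same in Spark.

With Spark you first create a window, then create columns that perform calculations over the window, then append them to your original data.

In the example below we want to find the moving average and cumulative sum of the `Close` values. Because the window is ordered but has no explicit frame, each calculation runs from the first row up to the current row, so the average is a running average over all rows so far.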
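
The arithmetic behind an ordered window can be sketched in plain Python (standard library only, not Spark). The prices below are hypothetical. The sketch contrasts a running average, which grows from the first row to the current row, with a fixed 3-row moving average; in Spark the latter would need an explicit frame such as `rowsBetween(-2, 0)` on the window.

```python
from itertools import accumulate

# Hypothetical closing prices, already ordered by date.
closes = [10.0, 20.0, 30.0, 40.0]

# Cumulative sum: each row holds the total of all rows up to and including it.
cumulative_sum = list(accumulate(closes))

# Running average: cumulative sum divided by the number of rows seen so far.
running_average = [total / (i + 1) for i, total in enumerate(cumulative_sum)]

# Fixed 3-row moving average: the current row and up to two rows before it.
moving_average_3 = []
for i in range(len(closes)):
    frame = closes[max(0, i - 2): i + 1]
    moving_average_3.append(sum(frame) / len(frame))

print(cumulative_sum)    # [10.0, 30.0, 60.0, 100.0]
print(running_average)   # [10.0, 15.0, 20.0, 25.0]
print(moving_average_3)  # [10.0, 15.0, 20.0, 30.0]
```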



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import  round,avg,sum

# create a window on the data ordered by date
windowSpec = Window.orderBy(dfstock2011.Date.asc())

# calculate average and sum over the window
avgs = avg(dfstock2011.Close).over(windowSpec)
sums = sum(dfstock2011.Close).over(windowSpec)

# add the calculated columns to the original data and show
dfstock2011.withColumn("moving_average",round(avgs,2)).withColumn("cumulative_sum",round(sums,2)).show(10)



Now, if we want the monthly moving average and cumulative sum, we need to create a column holding just the month of the year and partition the window by that column. This makes sure every calculation on the window is done on each partition in isolation, so the values restart at the beginning of each month. Since `month` returns only the month number (1 to 12), this assumes the data covers a single year.
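
What partitioning does to a cumulative sum can be sketched in plain Python (standard library only, not Spark). The rows below are hypothetical; the point is that each month keeps its own running total, which starts again from zero when the month changes.

```python
from collections import defaultdict
from datetime import date

# Hypothetical (date, close) rows spanning two months.
rows = [
    (date(2011, 1, 3), 10.0),
    (date(2011, 1, 4), 20.0),
    (date(2011, 2, 1), 30.0),
    (date(2011, 2, 2), 50.0),
]

# One running total per month, playing the role of the window partition.
running_totals = defaultdict(float)
result = []
for day, close in sorted(rows):  # ordered by date, like orderBy on the window
    running_totals[day.month] += close
    result.append((day, close, running_totals[day.month]))

for row in result:
    print(row)
```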

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import round,avg,sum,month

# create column to hold month of the year
dfstock2011 = dfstock2011.withColumn("month",month(dfstock2011.Date))

# create and partition window by month and order by date
windowSpec = Window.partitionBy(dfstock2011.month).orderBy(dfstock2011.Date.asc())

# calculate average and sum over the window
avgs = avg(dfstock2011.Close).over(windowSpec)
sums = sum(dfstock2011.Close).over(windowSpec)

# add the calculated columns to the original data and show
dfstock2011.withColumn("moving_average",round(avgs,2)).withColumn("cumulative_sum",round(sums,2)).show(10)



# Assessment

Using the data in `internet_traffic.csv`, provide the following statistics:

* Average hourly bits transferred
* Total daily bits transferred

The data shows, in megabits, the amount of data transferred by an ISP at 5-minute intervals.

In [None]:
# Write code here

In [None]:
spark.stop()