# Introduction to Apache Spark 

### Spark Tutorials - Part I
  
### Agenda
* Loading data
* Spark SQL & basic data transformations
* Writing data
* Caching data for performance

## 1.0 Loading data
### 1.1 Initialising access to AWS S3

In [3]:
# All client data should reside on Sagacity's AWS S3 storage
# We need to set up access to that location on S3

# Set the bucket name
S3_BUCKET = "sagacity-analyst-scratch"

# Pull your AWS access keys from the Databricks 'secrets' secure vault
# The security team will issue your AWS keys and install them in the 'secrets' vault
# The secret key is embedded in a URL, so any '/' in it must be URL-encoded as '%2F'
S3_PATH = "s3a://%s:%s@%s/" % (dbutils.secrets.get("cstephenson", "access_key"), dbutils.secrets.get("cstephenson", "secret_key").replace("/", "%2F"), S3_BUCKET)

# NOTE: You should NEVER store your AWS Access Keys in Notebooks or anywhere else that is not secure
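The `.replace("/", "%2F")` call exists because the secret key is embedded inside a URL, where a raw `/` would be read as a path separator. Below is a minimal plain-Python sketch of the same idea, assuming you want to escape every URL-reserved character (not only `/`) with the standard library's `urllib.parse.quote`. `build_s3a_path` is a hypothetical helper, not a Databricks or Spark API, the keys are made-up placeholders, and whether your Hadoop/S3A version decodes every escaped character correctly is an assumption worth testing.

```python
from urllib.parse import quote


def build_s3a_path(access_key: str, secret_key: str, bucket: str) -> str:
    """Build an s3a:// URL with credentials embedded (hypothetical helper).

    quote(..., safe="") percent-encodes every reserved character,
    so '/' becomes '%2F' and '+' becomes '%2B'.
    """
    return "s3a://%s:%s@%s/" % (quote(access_key, safe=""), quote(secret_key, safe=""), bucket)


# Placeholder credentials for illustration only - never hard-code real keys
print(build_s3a_path("AKIAEXAMPLE", "abc/def+ghi", "sagacity-analyst-scratch"))
# -> s3a://AKIAEXAMPLE:abc%2Fdef%2Bghi@sagacity-analyst-scratch/
```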

In [4]:
# What has this done?
# For accessing files on S3 you must specify the path as:
# s3a://[AWS_ACCESS_KEY]:[AWS_SECRET_KEY]@[S3_BUCKET_NAME]/[FOLDER_NAME]/[FILE_NAME]

print(S3_PATH)
# Note: your AWS keys are shown as [REDACTED] in the output, but behind the scenes Spark passes them to AWS
# so that your request for the data can be authenticated

### 1.2 Loading data from S3

In [6]:
# Define the filename and its location within the S3 bucket
file_path = S3_PATH + 'zeppelin_demo/appl_stock.csv'
print(file_path)

# Load the Apple stock CSV file from S3
# Format 'csv' - the file is a delimited text file
# Option 'delimiter' - the fields are separated by a comma ','; tab-delimited files use '\t'
# Option 'header' - if 'true', the first row of the file contains the column names
# Option 'inferSchema' - if 'true', Spark makes an extra pass over the data to 'guess' each column's data type
# load() - provide the path/location of the file
df_apple_stock = spark.read.format('csv'
                                  ).option('delimiter', ','
                                  ).option('header', 'true'
                                  ).option('inferSchema', 'true'
                                  ).load(file_path)
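To make `header` and `inferSchema` less of a black box, here is a simplified plain-Python approximation of the idea: take the first row as column names, then pick the narrowest type that every non-empty value in a column can be parsed as. `infer_type` and `read_csv_with_inferred_schema` are hypothetical helpers; Spark's real inference considers more types (e.g. booleans and timestamps), so treat this as an illustration of the principle, not of Spark's exact rules.

```python
import csv
import io


def infer_type(values):
    """Return 'int' if every non-empty value parses as int,
    else 'double' if every one parses as float, else 'string'."""
    non_empty = [v for v in values if v != ""]
    for type_name, caster in (("int", int), ("double", float)):
        try:
            for v in non_empty:
                caster(v)
            return type_name
        except ValueError:
            continue
    return "string"


def read_csv_with_inferred_schema(text, delimiter=","):
    """Treat the first row as the header and infer one type per column."""
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    header, data = rows[0], rows[1:]
    columns = list(zip(*data))  # transpose rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}


sample = "Date,Open,Volume\n2010-01-04,213.43,123432400\n2010-01-05,214.60,150476200\n"
print(read_csv_with_inferred_schema(sample))
# -> {'Date': 'string', 'Open': 'double', 'Volume': 'int'}
```

Because every value has to be inspected before the types are known, inference costs an extra read of the data, which is one reason an explicit schema is faster on large files.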


In [7]:
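# Code completion: type df_apple_stock. (including the dot) and press [TAB]
# to list the DataFrame's available methods and attributes
# df_apple_stock.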

More on the various formats and options for loading files can be found here
[Databricks Documentation - Read CSV](https://docs.databricks.com/spark/latest/data-sources/read-csv.html)

### 1.3 Displaying data

In [10]:
# Let's take a look at the dataframe contents ...
# Notice the default views - Grid, Charts, or Export to CSV
display(df_apple_stock)

# NOTE: The grid will only display the first 1000 rows in preview and the first 50 columns!
#       You can also download the results for analysis locally - Be careful when downloading customer sensitive data!

In [11]:
# Some other methods of viewing the data - less pretty
df_apple_stock.show()

In [12]:
# View columns and datatypes - you can also click the icon for the dataframe at the footer of the cell
df_apple_stock.printSchema()

In [13]:
# Display summary statistics on the table columns (stats for numeric values)
display(df_apple_stock.summary())

In [14]:
# Loading data quickly from your client (with health warning)

### 1.4 Loading other file formats

In [16]:
# GZ files
# --------
file_path = S3_PATH + 'zeppelin_demo/titanic.gz'

# Spark can read files compressed in the GZIP format directly into a dataframe
# It's good practice, especially if the original text file is large, to ask the provider to compress it before uploading it to S3
df_titanic = spark.read.format('csv'
                                  ).option('delimiter', ','
                                  ).option('header', 'true'
                                  ).option('inferSchema', 'true'
                                  ).load(file_path)

# Parameter 'n' sets how many rows to show; 'truncate'=True (the default) cuts long values at 20 characters, False shows the full column width
df_titanic.show(n=5, truncate=True)

# Note: Spark chooses the decompression codec from the file extension, so the file must end in '.gz'; if it is suffixed '.gzip' you will need to rename it
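Why ask the provider to compress first? Repetitive CSV text shrinks a lot under gzip, and decompression is lossless. The plain-Python sketch below uses the standard `gzip` module on a made-up, Titanic-shaped payload to show the round trip that Spark performs transparently when it reads a `.gz` file. One trade-off worth knowing: a gzip file cannot be split, so Spark reads each `.gz` file with a single task.

```python
import gzip

# A repetitive, made-up CSV payload, similar in spirit to a large client extract
csv_text = "passenger_id,survived,p_class\n" + "".join(
    f"{i},{i % 2},{i % 3 + 1}\n" for i in range(1, 1001)
)
raw_bytes = csv_text.encode("utf-8")

# Compress before upload (what we would ask the provider to do) ...
compressed = gzip.compress(raw_bytes)

# ... and decompress on read, which Spark does automatically for '.gz' files
restored = gzip.decompress(compressed).decode("utf-8")

print(len(raw_bytes), len(compressed))
```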

In [17]:
# Parquet files
# -------------
# Parquet is a compressed, columnar file format that stores the schema alongside the data
# It is highly recommended to store all your data in Parquet format as it takes less space and is MUCH faster to load
# If you receive a file from a client, one of the first things you should do is convert it to Parquet
file_path = S3_PATH + 'zeppelin_demo/appl_stock_open.parquet'

df_apple_open = spark.read.format('parquet').load(file_path)
df_apple_open.show(n=5)

More on the Parquet file format, with examples of working with it in Spark, here
[Parquet Example](https://sparkbyexamples.com/spark/spark-read-write-dataframe-parquet-example/)

In [19]:
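# JSON files
# Spark supports nested data structures and can read them directly - JSON is a commonly used format for this
# See also XML and YAML (these may need additional libraries)
# Note: by default Spark expects one JSON object per line; for a single multi-line JSON document add .option('multiLine', 'true')
file_path = S3_PATH + 'zeppelin_demo/socrata_metadata_311.json'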

df_socrata = spark.read.format('json').load(file_path)

display(df_socrata)

In [20]:
df_socrata.printSchema()
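`printSchema()` shows nested JSON objects as an indented tree of struct fields, and in PySpark you reach a nested field with a dotted name such as `col('parent.child')` (hypothetical field names). The plain-Python sketch below, using a hypothetical `flatten` helper and a made-up record, shows how those dotted paths map onto the nested structure.

```python
import json


def flatten(record, prefix=""):
    """Flatten nested dicts into dotted paths, e.g. {'a': {'b': 1}} -> {'a.b': 1}.

    Lists are kept as-is, roughly like Spark's ArrayType columns.
    """
    flat = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))  # recurse into nested objects
        else:
            flat[path] = value
    return flat


doc = json.loads('{"id": "abc", "owner": {"name": "Jo", "roles": ["admin"]}}')
print(flatten(doc))
# -> {'id': 'abc', 'owner.name': 'Jo', 'owner.roles': ['admin']}
```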

### 1.5 Using Schemas

In [22]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Define the schema up-front: Spark skips the inference pass and uses these column names and types
titanic_schema = StructType([
   StructField('passenger_id', IntegerType())
,  StructField('survived', IntegerType())
,  StructField('p_class', IntegerType())
,  StructField('name', StringType())
,  StructField('sex', StringType())
,  StructField('age', DoubleType())
,  StructField('sib_sp', IntegerType())
,  StructField('parch', IntegerType())
,  StructField('ticket', StringType())
,  StructField('fare', DoubleType())
,  StructField('cabin', StringType())
,  StructField('embarked', StringType())
])

file_path = S3_PATH + 'zeppelin_demo/titanic.gz'

# With header 'true' the file's header row is skipped and the column names come from titanic_schema
df_titanic = spark.read.format('csv').option('delimiter', ','
                                  ).option('header', 'true'
                                  ).schema(titanic_schema
                                  ).load(file_path)

display(df_titanic)

In [23]:
# Loading non-standard file formats - load each row of the data into a single column
# using the format 'text'
file_path = S3_PATH + 'zeppelin_demo/apple_stock.rpt'

# Loads the full file, splitting rows on the newline character '\n', with all of each row's data in a single column named 'value'
df_apple_stock_rpt = spark.read.format('text').load(file_path)

display(df_apple_stock_rpt)
# Note: for a solution that loads this data properly into a dataframe with the correct columns, see the Appendix below

### Note
Spark can also load data directly from relational databases using JDBC (e.g. Oracle, MySQL) and from NoSQL databases such as Cassandra via dedicated connectors.

## 2.0 Spark SQL & basic data transformations

Once you have loaded data into a dataframe, you can transform it using either:
* The DataFrame API with Python, Scala or R
* SQL, with the Spark SQL API

### 2.1 Spark SQL API

In [27]:
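# Register the DataFrame as a temporary view so it can be queried with SQL
# The view is session-scoped: it is lost when your Spark session ends (e.g. the notebook is detached or the cluster is terminated)
# It holds no data of its own - it is simply a name for the DataFrame's query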
df_titanic.createOrReplaceTempView('titanic')

In [28]:
%sql
-- Now you can use SQL interactively by putting the %sql hint (magic) at the top of the cell
-- You can now query the table you registered earlier

-- Note: in-line comments in SQL start with '--'
/* ...to start
multi-line comments,
and to finish... */
SELECT
  Sex
, COUNT(*) AS vol_total
, SUM(Survived) AS vol_survived
, ROUND(SUM(Survived) / COUNT(*), 2)  AS pct_survived
FROM
  titanic
GROUP BY
  Sex
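To see exactly what the `GROUP BY` query computes, here is the same aggregation written out in plain Python over a handful of made-up `(sex, survived)` rows, with each step labelled by the SQL it mirrors. One caveat: Python's `round()` rounds exact halves to the nearest even digit, whereas Spark's `ROUND` rounds half up, so the two can differ in edge cases.

```python
from collections import defaultdict

# Made-up rows: (sex, survived) where survived is 1 or 0
rows = [("female", 1), ("female", 1), ("female", 0),
        ("male", 0), ("male", 1), ("male", 0), ("male", 0)]

# GROUP BY Sex: one running total per distinct value
totals = defaultdict(lambda: {"vol_total": 0, "vol_survived": 0})
for sex, survived in rows:
    totals[sex]["vol_total"] += 1             # COUNT(*)
    totals[sex]["vol_survived"] += survived   # SUM(Survived)

for sex, agg in sorted(totals.items()):
    # ROUND(SUM(Survived) / COUNT(*), 2)
    agg["pct_survived"] = round(agg["vol_survived"] / agg["vol_total"], 2)
    print(sex, agg)
```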

#### Note. Local vs. Global tables   
**Local**   
> Only accessible from your current Spark session (e.g. this notebook). It is automatically **deleted** when the session ends or the Spark cluster is terminated  
> ```df.createOrReplaceTempView('my_local_table') ```  
  
**Global**  
> Accessible to all users across all Spark clusters (with the correct permissions assigned), and **saved** after the Spark cluster is terminated  
> ```df.write.mode('overwrite').saveAsTable('my_global_table') ```  
**Generally the ability to create Global tables will be denied to most users!**

If you are more familiar with SQL DDL syntax, you can use that instead  
[Spark SQL DDL Documentation here](https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html#)

In [30]:
# You can also execute SQL pulling the results back directly into a new Dataframe

# Note Python multi-line string with three double quotes """ """ or single quotes ''' '''
sql_text = """
SELECT
  Sex
, COUNT(*) AS vol_total
, SUM(Survived) AS vol_survived
, ROUND(SUM(Survived) / COUNT(*), 2)  AS pct_survived
FROM
  titanic
GROUP BY
  Sex
"""

# SparkSession.sql(sqlQuery)
#  Returns a DataFrame representing the result of the given query.
# Note: Databricks automatically makes your SparkSession available via the variable 'spark'
df_srv_sex = spark.sql(sql_text)

df_srv_sex.show()


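# Or you can build the string from adjacent string literals, using the Python line continuation character '\'
# Python joins adjacent literals with no separator, so each piece needs its own leading or trailing space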
sql_text = 'SELECT' \
' Sex' \
' , COUNT(*) AS vol_total' \
' , SUM(Survived) AS vol_survived' \
' , ROUND(SUM(Survived) / COUNT(*), 2) AS pct_survived' \
' FROM ' \
'  titanic ' \
' GROUP BY ' \
'  Sex'

df_srv_sex = spark.sql(sql_text)
df_srv_sex.show()
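The spaces at the start of each fragment above are essential, because Python glues adjacent string literals together exactly as written. This small plain-Python check shows the failure mode and two safe alternatives; the query text is a cut-down illustration.

```python
# Adjacent string literals are joined with no separator added
broken = 'SELECT' \
         'Sex' \
         'FROM titanic'
fixed = 'SELECT' \
        ' Sex' \
        ' FROM titanic'
print(broken)  # SELECTSexFROM titanic
print(fixed)   # SELECT Sex FROM titanic

# Joining a list with a single space avoids the problem altogether
joined = " ".join(["SELECT", "Sex", "FROM titanic"])
```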

Spark SQL reference here [Spark SQL Functions](https://spark.apache.org/docs/latest/api/sql/index.html)

### 2.2 Pyspark API (Python)

In [33]:
# Equivalent query using the DataFrame API and Python ...

# We first need to import the functions we want to use ...
# Note: this import shadows Python's built-in sum() and round() for the rest of the notebook
# (a common alternative is `import pyspark.sql.functions as F`, then F.sum(), F.round())
from pyspark.sql.functions import count, sum, round

df_titanic.select(
  # Need to select all the columns we will be displaying or using
  'Sex'
, 'Survived'
).groupBy(
  # The column(s) we are grouping over here
  'Sex'
).agg(
  # .agg() holds our aggregation functions & calculations; .alias() sets the column name, equivalent to "AS" in SQL
  count('*').alias('vol_total')
, sum('Survived').alias('vol_survived')
, round(sum('Survived') / count('*'), 2).alias('pct_survived')
).show() # finally show() the results


### 2.3 Revisiting file load - Loading difficult files

In [35]:
# A common problem is the initial cleansing of data and ensuring the datatypes are correct
# A common solution is to initially load the data into String/Character columns and clean/cast the data directly in the DataFrame
# Let's look at solving this using the S

# Here we are loading without the 'inferSchema' option, so every column is read as a StringType
df_citations = spark.read.format('csv'
                                ).option('delimiter', ','
                                ).option('header', 'true'
                                ).load(S3_PATH + 'zeppelin_demo/parking-citations.gz')

In [36]:
# Transformation and datatype conversion in dataframe columns
from pyspark.sql.functions import col 
from pyspark.sql.types import DateType, DoubleType

# The withColumn method adds a column, or replaces one with the same name, and returns a new DataFrame (DataFrames are immutable)
# the 'col' function returns the Column object for a column name
# the 'cast' method converts a column from one data type to another

# Cast Latitude to DoubleType
df_citations = df_citations.withColumn('Latitude', col('Latitude').cast(DoubleType()))

# Cast Longitude to DoubleType
df_citations = df_citations.withColumn('Longitude', col('Longitude').cast(DoubleType()))
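A cast that cannot parse a value does not fail the job when ANSI mode (`spark.sql.ansi.enabled`) is off, which is the default in Spark 3.x: the value silently becomes null. It is therefore worth counting nulls after a cast, e.g. `df_citations.filter(col('Latitude').isNull()).count()`. The plain-Python sketch below roughly mimics that non-ANSI behaviour with a hypothetical `cast_to_double` helper and made-up values; Python's `float()` accepts a few inputs Spark would not, so it is an approximation.

```python
def cast_to_double(value):
    """Roughly mimic a non-ANSI Spark cast to DoubleType: unparseable strings become None (null)."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


latitudes = ["34.0522", "99999", "n/a", "", None]  # made-up raw string values
cast = [cast_to_double(v) for v in latitudes]
print(cast)  # [34.0522, 99999.0, None, None, None]

# Values that were present before the cast but are null after it failed to parse
newly_null = sum(1 for before, after in zip(latitudes, cast) if before is not None and after is None)
```

Note that a value such as `99999` casts without complaint even if it is really a placeholder, so a null check alone does not prove the data is clean.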


In [37]:
# Notice the presence of nulls in 'Fine Amount'
display(
  df_citations.select('*').filter(col('Fine Amount').isNull())
)

In [38]:
# More complex transformations ...
from pyspark.sql.functions import substring, when

# using substring function
# Transform "Issue Date" removing the time portion to obtain a valid date format and cast to DateType
df_citations = df_citations.withColumn('Issue Date', substring('Issue Date', 1, 10).cast(DateType()))

# using na.fill method 
# Transform "Fine amount" to change NULL values to 0.0 
df_citations = df_citations.na.fill('0.0', 'Fine Amount')

# ... and cast column to a DoubleType
df_citations = df_citations.withColumn('Fine Amount', col('Fine Amount').cast(DoubleType()))


In [39]:
display(df_citations)

In [40]:
# The expr() function
from pyspark.sql.functions import expr
# If you know the SQL equivalent of a function, you can use expr() to embed that SQL expression in your Python code

# equivalent of ```df_citations = df_citations.withColumn('Issue Date', substring('Issue Date', 1, 10).cast(DateType()))```
# Note: in SQL, a column name containing a space is quoted with backticks; 'Issue Date' in single quotes would be a string literal
df_citations = df_citations.withColumn('Issue Date', expr("CAST(SUBSTRING(`Issue Date`, 1, 10) AS DATE)"))


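Python strings - if you need to use quotes inside a quoted string, you can enclose single quotes '' in double quotes "" or vice versa, or escape them with the backslash \ character ...
```expr('CAST(\'2016-01-01\' AS DATE)')```

Inside the SQL itself, single quotes mark a string literal, so a column name containing a space must be wrapped in backticks instead ...
```expr("CAST(SUBSTRING(`Issue Date`, 1, 10) AS DATE)")```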

#### A note on Methods and Functions

##### Functions
Functions are directly callable, take arguments/inputs and return an output<br/>
[Pyspark.sql.functions documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)<br/>
##### Methods
Methods are like functions but operate on an object<br/>
e.g. all the methods that can operate on the Column object<br/>
[Pyspark.sql.column documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column)


Often you can use either a method or a function to achieve the same result<br/>
Function version - Pyspark.sql.functions.substring()<br/>
```df_citations = df_citations.withColumn('Issue Date', substring('Issue Date', 1, 10).cast(DateType()))```

Method version - Pyspark.sql.column.substr()<br/>
```df_citations = df_citations.withColumn('Issue Date', col('Issue Date').substr(1, 10).cast(DateType()))```
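Both `substring()` and `Column.substr()` count positions from 1, unlike Python slicing, which starts at 0. The plain-Python sketch below mirrors Spark SQL's `substring(str, pos, len)` as we understand it: positive positions are 1-based, negative positions count back from the end, and position 0 is treated like the first character. `spark_substring` is a hypothetical helper; verify the edge cases (especially `pos = 0`) on your Spark version.

```python
def spark_substring(s, pos, length):
    """Plain-Python mirror of Spark SQL substring(str, pos, len).

    pos is 1-based; 0 is treated as the first character, and a negative
    pos counts back from the end of the string.
    """
    if pos > 0:
        start = pos - 1          # 1-based -> 0-based
    elif pos < 0:
        start = len(s) + pos     # count back from the end
    else:
        start = 0
    end = start + length
    start = max(start, 0)
    return s[start:end] if start < end else ""


print(spark_substring("2015-12-22T00:00:00", 1, 10))  # 2015-12-22
```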

What is Markdown?
[Markdown Reference](https://github.com/adam-p/markdown-here/wiki/Markdown-Cheatsheet)

## 3.0 Writing data

### 3.1 File formats

In [46]:

# Write back as PARQUET (RECOMMENDED)
df_citations.write.mode('overwrite').format('parquet').save(S3_PATH + 'zeppelin_demo/test_out.parquet')

# Write back as CSV with a header and pipe-delimited fields
df_citations.write.mode('overwrite'       # option to 'append' data as well
             ).format('csv'
             ).option('delimiter', '|'
             ).option('header', 'true'
             ).save(S3_PATH + 'zeppelin_demo/test_out.csv')

# Note: Spark writes each output as a folder, typically with one part-file per 'partition'. This is how Spark 'scales out' processing, by working on parts of the dataset in parallel
# Note also the difference in size between compressed PARQUET format and regular CSV, 665 KB vs 4.8 MB

### 3.2 Partitioning  
Spark 'partitions' its dataframes to gain the speed benefits of processing the dataset in parallel, i.e. each task running on the cluster processes a single partition at the same time as the other tasks work on their own partitions.
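As a rough single-machine analogy (not how Spark is implemented), the plain-Python sketch below splits made-up fine amounts into partitions round-robin, hands each partition to a separate worker thread, and combines the partial results at the end. `split_into_partitions` and `process_partition` are hypothetical helpers; on a real cluster the "workers" are tasks running on executor cores across several machines.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists (roughly like repartition(n) without columns)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def process_partition(partition):
    """The per-partition 'task': here, just sum the fine amounts it holds."""
    return sum(partition)


fines = [25.0, 68.0, 73.0, 50.0, 363.0, 93.0, 58.0]  # made-up fine amounts
partitions = split_into_partitions(fines, 3)

# Each worker handles one partition independently; partial results are combined at the end
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_sums = list(pool.map(process_partition, partitions))

total = sum(partial_sums)
print(partial_sums, total)
```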

However, what if I just want to view the file in Excel or another tool that doesn't recognise partitions?

In [48]:
# Write back to a single CSV file 

# Option 1 - Coalesce (write to a single partition)
# Note: Spark still writes a folder ('titanic_spark.csv/') containing a single part-*.csv file, plus metadata files such as _SUCCESS
df_titanic.coalesce(1).write.mode('overwrite'       # option to 'append' data as well
             ).format('csv'
             ).option('delimiter', ','
             ).option('header', 'true'
             ).save(S3_PATH + 'zeppelin_demo/titanic_spark.csv') 


# Option 2 - Use Pandas and S3FS
import pandas
import s3fs

# Convert the Spark dataframe to a Pandas dataframe
# Note: toPandas() collects ALL the data onto the driver node, so only use it for data that fits in memory
pdf_titanic = df_titanic.toPandas()
bytes_to_write = pdf_titanic.to_csv(index=False).encode()  # index=False omits the Pandas row index column


# Connect to S3 as a filesystem (you need to provide your access keys)
# The secret key is passed as-is here - the '%2F' encoding is only needed when the key is embedded in a URL
fs = s3fs.S3FileSystem(key=dbutils.secrets.get("cstephenson", "access_key"), secret=dbutils.secrets.get("cstephenson", "secret_key"))

# Open a CSV file for writing ('wb') on S3
with fs.open(S3_BUCKET + '/zeppelin_demo/titanic_pandas.csv', 'wb') as titanic_file:
    # write the file bytes as a CSV
    titanic_file.write(bytes_to_write)


#### More documentation ...  
[Pandas Documentation here](https://pandas.pydata.org/pandas-docs/stable/index.html)  
[S3FS Documentation here](https://s3fs.readthedocs.io/en/latest/)

In [50]:
# partitionBy
#
# writes the data into one sub-folder per value of the partition column, potentially reducing the number of files
# that need to be read by a query that filters on that column - known as "partition pruning"

# add a new column with the year of the ticket issue
df_citations = df_citations.withColumn("issue_year", expr("YEAR(`Issue Date`)"))

# partition by the new issue reporting period
df_citations.write.partitionBy('issue_year'
             ).mode('overwrite'
             ).format('csv'
             ).save(S3_PATH + 'zeppelin_demo/citations_by_year.csv')
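`partitionBy` writes Hive-style sub-folders named `column=value` (for example `citations_by_year.csv/issue_year=2016/`), and a query that filters on `issue_year` only needs to open the matching folder. The plain-Python sketch below models that layout with hypothetical helpers (`partition_by`, `prune`) and made-up rows. Spark also drops the partition column from the data files themselves, since its value is encoded in the folder name; the sketch keeps it in the rows for readability.

```python
from collections import defaultdict


def partition_by(rows, column):
    """Group rows under Hive-style folder names such as 'issue_year=2016' (sketch of partitionBy)."""
    folders = defaultdict(list)
    for row in rows:
        folders[f"{column}={row[column]}"].append(row)
    return dict(folders)


def prune(folders, column, value):
    """Partition pruning: only 'open' the folder matching the filter value."""
    return folders.get(f"{column}={value}", [])


citations = [  # made-up rows
    {"ticket": "A1", "issue_year": 2015},
    {"ticket": "B2", "issue_year": 2016},
    {"ticket": "C3", "issue_year": 2016},
]
folders = partition_by(citations, "issue_year")
print(sorted(folders))  # ['issue_year=2015', 'issue_year=2016']
print(prune(folders, "issue_year", 2016))
```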


#### Note: 
You should be thinking up-front about how your data are likely to be accessed before choosing what column to partition by. 

An example of a good candidate for this would be a column that contains a reporting period, assuming one typically queries one reporting period at a time - this could make a big difference to how long your query takes to execute as your data grows in size.  

Other considerations ...  

__Partition Skew__
Ideally, all your partitions should be of roughly equal size; otherwise some processes do a disproportionate amount of work (on the largest partitions) while others finish quickly, resulting in a longer overall execution time. If you partition your dataset on an unevenly distributed column, you may get partition skew when your query runs across those partitions.   
If no partitioning column is specified, Spark splits input files into roughly equal-sized chunks, and `repartition(n)` without columns distributes rows round-robin, so partitions are usually evenly sized

__Number of Partitions__
As Spark gains its performance benefits by running queries in parallel (one task per partition), you need enough partitions in your data to benefit from the parallelism the cluster provides, balanced against a partitioning scheme that lets queries read only the partitions they need!  

There are a lot of other 'tweaks' to ensure your data are processed efficiently; if you need help tuning queries please ask :)

In [52]:
# check your skew
display(df_citations.select('issue_year').groupBy('issue_year').count())
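One simple way to put a number on skew is to divide the largest partition by the mean partition size: 1.0 is perfectly even, and larger values mean one task will dominate the run time. The counts could be pulled from the query above with something like `[r['count'] for r in df_citations.groupBy('issue_year').count().collect()]`; the figures below are made up, and `skew_ratio` is a hypothetical helper rather than a Spark metric.

```python
def skew_ratio(partition_sizes):
    """Largest partition divided by the mean partition size.

    1.0 means perfectly even; a large value means one partition (and the task
    that processes it) will dominate the run time.
    """
    mean = sum(partition_sizes) / len(partition_sizes)
    return max(partition_sizes) / mean


# Made-up row counts per issue_year
counts = {2015: 1000, 2016: 1100, 2017: 950, 2018: 50}
print(round(skew_ratio(list(counts.values())), 2))
```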

## 4.0 Caching data for performance

Reading and writing data from/to S3 can be slow and inefficient. We also incur a small AWS data transfer cost for every read/write from S3.

It is therefore good practice to 'cache' your datasets locally (on the Spark cluster) if you are going to run more than a few queries on them.

### 4.1 Persist and Unpersist

In [55]:
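# 4.1 Demonstrate query performance on cached and uncached data
from pyspark import StorageLevel
# persist() only marks the DataFrame for caching; nothing is stored until an action runs
df_citations.persist(StorageLevel.MEMORY_AND_DISK)
df_citations.count()  # action: reads the data from S3 and populates the cache
# When you have finished, release the cluster memory/disk used by the cache
# df_citations.unpersist()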

### 4.2 Lazy Evaluation
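Spark transformations such as `select`, `filter` and `withColumn` are lazy: they only add steps to a query plan. Nothing is read or computed until an action such as `show()`, `count()`, `collect()` or a write runs the plan. Without caching, every action runs the plan again from the source. The toy below is an analogy in plain Python, not Spark's implementation: `LazyFrame` and `read_source` are hypothetical, and the `reads` list counts how often the "source" is actually read.

```python
class LazyFrame:
    """A toy stand-in for a Spark DataFrame (hypothetical class, not a Spark API).

    Transformations (filter, map) only record steps in a plan; the data is
    touched only when an action (collect, count) runs the plan.
    """

    def __init__(self, source, plan=None):
        self.source = source    # a callable that returns the input rows
        self.plan = plan or []  # recorded transformation steps

    def filter(self, predicate):
        return LazyFrame(self.source, self.plan + [("filter", predicate)])

    def map(self, func):
        return LazyFrame(self.source, self.plan + [("map", func)])

    def collect(self):
        rows = self.source()  # the source is read only here
        for kind, fn in self.plan:
            rows = [r for r in rows if fn(r)] if kind == "filter" else [fn(r) for r in rows]
        return rows

    def count(self):
        return len(self.collect())


reads = []  # one entry per actual read of the source


def read_source():
    reads.append(1)
    return [25.0, 68.0, None, 363.0]


fines = LazyFrame(read_source).filter(lambda f: f is not None).map(lambda f: f * 2)
print(len(reads))       # 0 - defining transformations reads nothing
print(fines.collect())  # [50.0, 136.0, 726.0]
fines.count()
print(len(reads))       # 2 - without caching, every action re-reads the source
```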

## Appendix

In [59]:
#
# Example - Reading in 'fixed-width' files
#
from pyspark.sql.functions import lit, col, trim
from pyspark.sql.types import DateType, DoubleType, IntegerType


# Load the file as text, i.e. the full row into one column named 'value'
df_txt_rpt = spark.read.format('text').load(S3_PATH + 'zeppelin_demo/apple_stock.rpt')
df_txt_rpt.show(n=5, truncate=False)

df_apple_stock = df_txt_rpt.filter(
  # Filter out unwanted rows first - separators (no leading '|') and the header row
  (col('value').substr(1, 1) == lit('|'))
  & (trim(col('value').substr(2, 19)) != lit('Date'))
).select(
  # use substrings (1-based start position, length) to pull out the elements we need ...
  trim(col('value').substr(2, 19)).cast(DateType()).alias('date')
, trim(col('value').substr(22, 18)).cast(DoubleType()).alias('open')
, trim(col('value').substr(41, 18)).cast(DoubleType()).alias('high')
, trim(col('value').substr(60, 18)).cast(DoubleType()).alias('low')
, trim(col('value').substr(79, 18)).cast(DoubleType()).alias('close')
, trim(col('value').substr(98, 9)).cast(IntegerType()).alias('vol')
, trim(col('value').substr(108, 18)).cast(DoubleType()).alias('adj_close')
)

df_apple_stock.show(n=5, truncate=False)
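The same fixed-width logic can be expressed in plain Python, which is handy for checking the column offsets against a few sample lines before running the Spark job. This sketch reuses the `(start, length)` offsets from the PySpark code, assumes the layout is `|`-separated columns of widths 19, 18, 18, 18, 18, 9 and 18, and builds its sample rows with a hypothetical `make_row` helper and made-up values.

```python
def parse_fixed_width(line, fields):
    """Slice a fixed-width line using 1-based (start, length) specs, trim and cast.

    Empty or unparseable values become None, similar to a failed (non-ANSI) Spark cast.
    """
    record = {}
    for name, start, length, caster in fields:
        raw = line[start - 1:start - 1 + length].strip()  # 1-based start -> 0-based slice
        try:
            record[name] = caster(raw) if raw else None
        except ValueError:
            record[name] = None
    return record


def is_data_row(line):
    """Keep rows that start with '|' and whose date field is not the header text 'Date'."""
    return line.startswith("|") and line[1:20].strip() != "Date"


# Same (1-based start, length) offsets as the PySpark code above
FIELDS = [
    ("date", 2, 19, lambda s: s[:10]),  # keep only the yyyy-MM-dd part
    ("open", 22, 18, float),
    ("high", 41, 18, float),
    ("low", 60, 18, float),
    ("close", 79, 18, float),
    ("vol", 98, 9, int),
    ("adj_close", 108, 18, float),
]

# Assumed column widths, used only to build sample rows
WIDTHS = [19, 18, 18, 18, 18, 9, 18]


def make_row(values):
    """Hypothetical helper: build a '|'-separated fixed-width sample row."""
    return "|" + "|".join(v.ljust(w) for v, w in zip(values, WIDTHS)) + "|"


separator = "-" * 126
header = make_row(["Date", "Open", "High", "Low", "Close", "Volume", "Adj Close"])
data = make_row(["2010-01-04 00:00:00", "213.43", "214.50", "212.38", "214.01", "123432400", "27.73"])  # made-up values

rows = [parse_fixed_width(line, FIELDS) for line in (separator, header, data) if is_data_row(line)]
print(rows)
```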