# PySpark Demo Notebook
## Demo
1. Run PostgreSQL Script
2. Load PostgreSQL Data
3. Create a New Record
4. Append New Record to Database Table
5. Read CSV-Format File
6. Overwrite Data to Database Table
7. Analyze Data with Spark SQL
8. Graph Data with BokehJS
9. Read and Write Data to Parquet Format

_Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford)   
Associated article: https://wp.me/p1RD28-61V_

## Run PostgreSQL Script
Run the SQL script to create the database schema and import data from a CSV file.

In [1]:
%run -i '03_load_sql.py'

DROP TABLE IF EXISTS "transactions"

DROP SEQUENCE IF EXISTS transactions_id_seq

CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1


CREATE TABLE "public"."transactions"
(
    "id"          integer DEFAULT nextval('transactions_id_seq') NOT NULL,
    "date"        character varying(10)                           NOT NULL,
    "time"        character varying(8)                            NOT NULL,
    "transaction" integer                                         NOT NULL,
    "item"        character varying(50)                           NOT NULL
) WITH (oids = false)

Row count: 21293
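
The contents of `03_load_sql.py` are not shown in this notebook. As a rough sketch of the same drop, create, import, and count pattern, the example below uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL so that it runs without a database server. The `load_transactions` helper, the sample rows, and the CSV header names are assumptions made for illustration, not the author's script.

```python
import csv
import io
import sqlite3

# Hypothetical sample rows standing in for the CSV file's contents.
SAMPLE_CSV = """Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
"""


def load_transactions(conn, csv_text):
    """Recreate the transactions table, import the CSV rows, return the row count."""
    cur = conn.cursor()
    cur.execute('DROP TABLE IF EXISTS transactions')
    # SQLite's AUTOINCREMENT key stands in for the PostgreSQL sequence default.
    cur.execute(
        'CREATE TABLE transactions ('
        ' id INTEGER PRIMARY KEY AUTOINCREMENT,'
        ' date TEXT NOT NULL,'
        ' time TEXT NOT NULL,'
        ' "transaction" INTEGER NOT NULL,'
        ' item TEXT NOT NULL)'
    )
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = [(r['Date'], r['Time'], int(r['Transaction']), r['Item']) for r in reader]
    cur.executemany(
        'INSERT INTO transactions (date, time, "transaction", item) VALUES (?, ?, ?, ?)',
        rows,
    )
    conn.commit()
    cur.execute('SELECT COUNT(*) FROM transactions')
    return cur.fetchone()[0]


conn = sqlite3.connect(':memory:')
row_count = load_transactions(conn, SAMPLE_CSV)
print('Row count: %d' % row_count)
```

Because the `id` column is filled in by the database, the import only supplies the other four columns.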


## Load PostgreSQL Data
Load the PostgreSQL 'transactions' table's contents into a Spark DataFrame.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [3]:
spark = SparkSession \
    .builder \
    .appName('04_notebook') \
    .config('spark.driver.extraClassPath',
            'postgresql-42.2.8.jar') \
    .getOrCreate()

In [4]:
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'transactions',
}
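
The connection settings above are hard-coded for the demo. One possible alternative, sketched here, reads them from environment variables and falls back to the demo values. The `build_jdbc_properties` helper and the `POSTGRES_*` variable names are hypothetical and not part of the original notebook.

```python
import os


def build_jdbc_properties(env=os.environ):
    """Assemble JDBC options, preferring environment variables over literals."""
    host = env.get('POSTGRES_HOST', 'postgres')
    port = env.get('POSTGRES_PORT', '5432')
    database = env.get('POSTGRES_DB', 'bakery')
    return {
        'driver': 'org.postgresql.Driver',
        'url': 'jdbc:postgresql://%s:%s/%s' % (host, port, database),
        'user': env.get('POSTGRES_USER', 'postgres'),
        # The fallback mirrors the demo value; a real deployment would set
        # POSTGRES_PASSWORD instead of relying on it.
        'password': env.get('POSTGRES_PASSWORD', 'postgres1234'),
        'dbtable': 'transactions',
    }


properties = build_jdbc_properties()
print(properties['url'])
```

Since the dictionary keys match the JDBC option names, the whole dictionary can also be passed in one call with `spark.read.format('jdbc').options(**properties).load()` rather than one `.option(...)` call per key.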

In [5]:
df1 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

In [6]:
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)

DataFrame rows: 21293
DataFrame schema: DataFrame[id: int, date: string, time: string, transaction: int, item: string]
+---+----------+--------+-----------+-------------+
|id |date      |time    |transaction|item         |
+---+----------+--------+-----------+-------------+
|1  |2016-10-30|09:58:11|1          |Bread        |
|2  |2016-10-30|10:05:34|2          |Scandinavian |
|3  |2016-10-30|10:05:34|2          |Scandinavian |
|4  |2016-10-30|10:07:57|3          |Hot chocolate|
|5  |2016-10-30|10:07:57|3          |Jam          |
|6  |2016-10-30|10:07:57|3          |Cookies      |
|7  |2016-10-30|10:08:41|4          |Muffin       |
|8  |2016-10-30|10:13:03|5          |Coffee       |
|9  |2016-10-30|10:13:03|5          |Pastry       |
|10 |2016-10-30|10:13:03|5          |Bread        |
+---+----------+--------+-----------+-------------+
only showing top 10 rows



## Create a New Record
Create a new bakery record and load it into a Spark DataFrame.

In [7]:
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

In [8]:
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
df2 = spark.createDataFrame(data, bakery_schema)

In [9]:
print('DataFrame rows: %d' % df2.count())
print('DataFrame schema: %s' % df2)
df2.show(10, False)

DataFrame rows: 1
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+------+
|date      |time    |transaction|item  |
+----------+--------+-----------+------+
|2016-10-30|10:13:27|2          |Pastry|
+----------+--------+-----------+------+



## Append New Record to Database Table
Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table.

In [10]:
df2.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

In [11]:
print('DataFrame rows: %d' % df1.count())

DataFrame rows: 21294


## Read CSV-Format File
Read a CSV-format data file into a Spark DataFrame.

In [12]:
df3 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('BreadBasket_DMS.csv', schema=bakery_schema)

In [13]:
print('DataFrame rows: %d' % df3.count())
print('DataFrame schema: %s' % df3)
df3.show(10, False)

DataFrame rows: 21293
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+-------------+
|date      |time    |transaction|item         |
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|1          |Bread        |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:07:57|3          |Hot chocolate|
|2016-10-30|10:07:57|3          |Jam          |
|2016-10-30|10:07:57|3          |Cookies      |
|2016-10-30|10:08:41|4          |Muffin       |
|2016-10-30|10:13:03|5          |Coffee       |
|2016-10-30|10:13:03|5          |Pastry       |
|2016-10-30|10:13:03|5          |Bread        |
+----------+--------+-----------+-------------+
only showing top 10 rows



## Overwrite Data to Database Table
Overwrite the contents of the 'transactions' table with the contents of the DataFrame.

In [14]:
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('truncate', 'true') \
    .mode('overwrite') \
    .save()

## Graph Data with BokehJS
Perform some simple analysis of the bakery data and plot the results with [BokehJS](https://docs.bokeh.org/en/latest/index.html).
### Questions
1. What are the best selling bakery items?
2. What are the busiest days of the week?
3. What are the busiest times of the day?

In [15]:
from math import pi
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12

output_notebook()

In [16]:
df3.createOrReplaceTempView('tmp_bakery')
sql_query = "SELECT item, count(*) as count " + \
            "FROM tmp_bakery " + \
            "WHERE item NOT LIKE 'NONE' " + \
            "GROUP BY item " + \
            "ORDER BY count DESC " + \
            "LIMIT 10"

df4 = spark.sql(sql_query)
df4.show(10, False)

+-------------+-----+
|item         |count|
+-------------+-----+
|Coffee       |5471 |
|Bread        |3325 |
|Tea          |1435 |
|Cake         |1025 |
|Pastry       |856  |
|Sandwich     |771  |
|Medialuna    |616  |
|Hot chocolate|590  |
|Cookies      |540  |
|Brownie      |379  |
+-------------+-----+



### Pie Chart

In [17]:
data = df4.toPandas()
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = data['item'].tolist()
color_map = factor_cmap(
    field_name='item', 
    palette=Paired12, 
    factors=items
)

data['angle'] = data['count'] / data['count'].sum() * 2 * pi
plot = figure(
    plot_height=375, 
    title='Top 10 Bakery Items',
    tooltips=tooltips, 
    x_range=(-0.5, 1.0)
)
plot.wedge(
    x=0, 
    y=1, 
    radius=0.4,
    start_angle=cumsum('angle', 
                       include_zero=True), 
    end_angle=cumsum('angle'),
    line_color='white', 
    fill_color=color_map, 
    legend_field='item', 
    source=data
)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None

show(plot)
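
To make the wedge geometry concrete, the sketch below reproduces the angle arithmetic in plain Python using the counts from the top-10 query output. Each item's angle is its share of the total times 2π, and the running sum gives each wedge's start and end angle, which is what `cumsum('angle', include_zero=True)` and `cumsum('angle')` supply to `plot.wedge`. The variable names are illustrative only.

```python
from itertools import accumulate
from math import isclose, pi

# Item counts copied from the top-10 query output.
counts = {
    'Coffee': 5471, 'Bread': 3325, 'Tea': 1435, 'Cake': 1025, 'Pastry': 856,
    'Sandwich': 771, 'Medialuna': 616, 'Hot chocolate': 590, 'Cookies': 540,
    'Brownie': 379,
}

total = sum(counts.values())

# Each wedge spans a fraction of the full circle proportional to its count.
angles = [count / total * 2 * pi for count in counts.values()]

# End angles are the running sum; start angles are the same sums shifted by one.
end_angles = list(accumulate(angles))
start_angles = [0.0] + end_angles[:-1]

wedges = {
    item: (start, end)
    for item, start, end in zip(counts, start_angles, end_angles)
}
for item, (start, end) in wedges.items():
    print('%-13s %.3f -> %.3f rad' % (item, start, end))
```

The last end angle comes out at 2π, so the ten wedges close the circle.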

### Horizontal Bar Chart

In [18]:
sql_query = "SELECT date_format(date, 'EEEE') as day, count(*) as count " + \
            "FROM tmp_bakery " + \
            "WHERE item NOT LIKE 'NONE' " + \
            "GROUP BY day " + \
            "ORDER BY count ASC " + \
            "LIMIT 10"

df5 = spark.sql(sql_query)
df5.show(10, False)

+---------+-----+
|day      |count|
+---------+-----+
|Wednesday|2321 |
|Monday   |2324 |
|Tuesday  |2392 |
|Thursday |2646 |
|Sunday   |3095 |
|Friday   |3124 |
|Saturday |4605 |
+---------+-----+
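
As a cross-check on what the query computes, the following plain-Python sketch mirrors its logic on a handful of rows: derive the day name from the date string (the role of `date_format(date, 'EEEE')`), skip `NONE` items, count per day, and sort by ascending count. The `day_name` helper is hypothetical, and the last two sample rows are invented for illustration.

```python
from collections import Counter
from datetime import datetime

DAY_NAMES = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
             'Friday', 'Saturday', 'Sunday']


def day_name(date_string):
    """Return the English weekday name for a 'YYYY-MM-DD' date string."""
    return DAY_NAMES[datetime.strptime(date_string, '%Y-%m-%d').weekday()]


# (date, item) pairs; the first two come from the table, the rest are made up.
rows = [
    ('2016-10-30', 'Bread'),
    ('2016-10-30', 'Scandinavian'),
    ('2016-10-30', 'NONE'),
    ('2016-10-31', 'Coffee'),
]

# Without wildcards, NOT LIKE 'NONE' behaves like a plain inequality test.
counter = Counter(day_name(date) for date, item in rows if item != 'NONE')

# ORDER BY count ASC
by_day = sorted(counter.items(), key=lambda pair: pair[1])
print(by_day)
```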



In [19]:
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = source.data['day'].tolist()
color_map = factor_cmap(
    field_name='day', 
    palette=Paired12, 
    factors=days
)
plot = figure(
    y_range=days, 
    plot_width=750, 
    plot_height=375, 
    min_border=0, 
    tooltips=tooltips
)
plot.hbar(
    y='day',
    right='count', 
    height=.9, 
    source=source, 
    fill_color=color_map
)
plot.title.text = 'Items Sold/Day'
plot.yaxis.axis_label = 'Days of the Week'
plot.xaxis.axis_label = 'Total Items Sold'

show(plot)

### Vertical Bar Chart

In [20]:
sql_query = "WITH tmp_table AS (" + \
            "  SELECT date_format(time, 'HH') as hour, count(*) as count " + \
            "  FROM tmp_bakery " + \
            "  WHERE item NOT LIKE 'NONE' " + \
            "  GROUP BY hour " + \
            "  ORDER BY hour ASC" + \
            ") " + \
            "SELECT hour, count " + \
            "FROM tmp_table " + \
            "WHERE hour BETWEEN 08 AND 20"

df6 = spark.sql(sql_query)
df6.show(24, False)

+----+-----+
|hour|count|
+----+-----+
|08  |645  |
|09  |1966 |
|10  |2666 |
|11  |3102 |
|12  |2854 |
|13  |2617 |
|14  |2640 |
|15  |2115 |
|16  |1343 |
|17  |368  |
|18  |82   |
|19  |48   |
|20  |22   |
+----+-----+
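
The hourly grouping can be sketched in plain Python as well: take the two-digit hour from each time string, keep the hours from 08 through 20 inclusive (the same bounds as `BETWEEN 08 AND 20`), and count the rows per hour. The `hourly_counts` helper is hypothetical, and the last three sample rows are invented to exercise the boundaries.

```python
from collections import Counter


def hourly_counts(rows, first_hour=8, last_hour=20):
    """Count non-NONE items per two-digit hour, keeping an inclusive hour range."""
    counts = Counter()
    for time_string, item in rows:
        hour = int(time_string[:2])
        if item != 'NONE' and first_hour <= hour <= last_hour:
            counts['%02d' % hour] += 1
    # Zero-padded hour strings sort correctly as text.
    return sorted(counts.items())


# (time, item) pairs; the first three come from the table, the rest are made up.
rows = [
    ('09:58:11', 'Bread'),
    ('10:05:34', 'Scandinavian'),
    ('10:07:57', 'Jam'),
    ('07:30:00', 'Coffee'),   # before the range, dropped
    ('20:59:59', 'Tea'),      # hour 20 is inside the inclusive range
    ('21:00:00', 'Cake'),     # after the range, dropped
]

hourly = hourly_counts(rows)
print(hourly)
```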



In [21]:
source = ColumnDataSource(data=df6.toPandas())
tooltips = [('hour', '@hour:00'), ('count', '@{count}{,}')]
hours = source.data['hour'].tolist()
plot = figure(
    x_range=hours, 
    plot_width=750, 
    plot_height=375, 
    min_border=0, 
    tooltips=tooltips
)
plot.vbar(
    x='hour', 
    bottom=0, 
    top='count', 
    source=source, 
    width=0.95
)
plot.line(
    x='hour',
    y='count',
    source=source,
    line_color='red',
    line_width=2
)
plot.title.text = 'Items Sold/Hour (8AM - 8PM)'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Items Sold'

show(plot)

## Read and Write Data to Parquet Format
Perform basic analysis of the bakery data using Spark SQL, write the resulting DataFrame to Parquet, then read it back.

In [22]:
sql_query = "SELECT transaction, CAST(CONCAT(date,' ',time) as timestamp) as timestamp, item " + \
            "FROM tmp_bakery " + \
            "WHERE item NOT LIKE 'NONE' " + \
            "ORDER BY transaction ASC, item ASC"

df7 = spark.sql(sql_query)
print('DataFrame rows: %d' % df7.count())
print('DataFrame schema: %s' % df7)
df7.show(10, False)

DataFrame rows: 20507
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
+-----------+-------------------+-------------+
|transaction|timestamp          |item         |
+-----------+-------------------+-------------+
|1          |2016-10-30 09:58:11|Bread        |
|2          |2016-10-30 10:05:34|Scandinavian |
|2          |2016-10-30 10:05:34|Scandinavian |
|3          |2016-10-30 10:07:57|Cookies      |
|3          |2016-10-30 10:07:57|Hot chocolate|
|3          |2016-10-30 10:07:57|Jam          |
|4          |2016-10-30 10:08:41|Muffin       |
|5          |2016-10-30 10:13:03|Bread        |
|5          |2016-10-30 10:13:03|Coffee       |
|5          |2016-10-30 10:13:03|Pastry       |
+-----------+-------------------+-------------+
only showing top 10 rows
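
For readers less familiar with the `CAST(CONCAT(date, ' ', time) as timestamp)` expression, here is a small plain-Python sketch of the same transformation: join the date and time strings, parse the result into a timestamp, drop `NONE` items, and sort by transaction and then item. The `to_record` helper is hypothetical; the sample rows are taken from the table output.

```python
from datetime import datetime

# (date, time, transaction, item) rows in their original, unsorted order.
rows = [
    ('2016-10-30', '10:07:57', 3, 'Hot chocolate'),
    ('2016-10-30', '10:07:57', 3, 'Jam'),
    ('2016-10-30', '10:07:57', 3, 'Cookies'),
    ('2016-10-30', '09:58:11', 1, 'Bread'),
]


def to_record(date, time, transaction, item):
    """Combine the date and time strings into one timestamp value."""
    timestamp = datetime.strptime(date + ' ' + time, '%Y-%m-%d %H:%M:%S')
    return (transaction, timestamp, item)


# ORDER BY transaction ASC, item ASC
records = sorted(
    (to_record(*row) for row in rows if row[3] != 'NONE'),
    key=lambda record: (record[0], record[2]),
)
for transaction, timestamp, item in records:
    print(transaction, timestamp, item)
```

Within transaction 3 the items come out as Cookies, Hot chocolate, Jam, matching the order in the DataFrame output.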



In [23]:
df7.write.parquet('output/bakery_parquet', mode='overwrite')

In [24]:
! ls -lh output/bakery_parquet 2>&1 | head -10
! echo 'Parquet Files:' $(ls output/bakery_parquet/*.parquet | wc -l)

total 800K
-rw-r--r-- 1 garystaf users 1.9K Dec  4 10:09 part-00000-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  4 10:09 part-00001-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.8K Dec  4 10:09 part-00002-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  4 10:09 part-00003-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  4 10:09 part-00004-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  4 10:09 part-00005-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  4 10:09 part-00006-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  4 10:09 part-00007-e82036fa-fb6b-4f9f-a956-9ebd76d9fc41-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.1K Dec  4 10:09 part-00008-e82036fa-fb6b-4f9f-a
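
The part files can also be counted from Python rather than the shell. The sketch below assumes Spark's usual `part-*.parquet` naming, as seen in the listing; the `count_part_files` helper is hypothetical, and the demo runs against a temporary directory with made-up file names so that it works without any Spark output present.

```python
import tempfile
from pathlib import Path


def count_part_files(directory):
    """Count Parquet part files, ignoring markers such as _SUCCESS."""
    return sum(1 for _ in Path(directory).glob('part-*.parquet'))


# Demo against a throwaway directory with made-up file names.
with tempfile.TemporaryDirectory() as tmp:
    for name in ('part-00000-demo.snappy.parquet',
                 'part-00001-demo.snappy.parquet',
                 '_SUCCESS'):
        (Path(tmp) / name).touch()
    demo_count = count_part_files(tmp)

print('Parquet Files: %d' % demo_count)
```

In the notebook, the call would be `count_part_files('output/bakery_parquet')`.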

In [None]:
df8 = spark.read.parquet('output/bakery_parquet')
print('DataFrame rows: %d' % df8.count())
print('DataFrame schema: %s' % df8)
df8.select('transaction', 'timestamp', 'item') \
    .sort('transaction', 'item') \
    .show(10, False)

DataFrame rows: 20507
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
