In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_timestamp

spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath',
            '/home/jovyan/work/postgresql-42.2.5.jar') \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df3 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('Datasets/BreadBasket_DMS.csv', schema=bakery_schema)

df3.show(100)
df3.count()

+----------+--------+-----------+-------------+
|      date|    time|transaction|         item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
|2016-10-30|10:16:55|          6|    Medialuna|
|2016-10-30|10:16:55|          6|       Pastry|
|2016-10-30|10:16:55|          6|       Muffin|
|2016-10-30|10:19:12|          7|    Medialuna|
|2016-10-30|10:19:12|          7|       Pastry|
|2016-10-30|10:19:12|          7|       Coffee|
|2016-10-30|10:19:12|          7|          Tea|
|2016-10-30|10:20:51|          8|       

21293

The below packages give the ability to Python to interact with the PostgreSQL.

In [2]:
!pip install psycopg2 psycopg2-binary

Collecting psycopg2
[?25l  Downloading https://files.pythonhosted.org/packages/bc/2a/61a8f9719bd6df5b421abd91740cb0595fc3c17b28eaf89fe4f144472ca6/psycopg2-2.7.6.1-cp36-cp36m-manylinux1_x86_64.whl (2.7MB)
[K    100% |████████████████████████████████| 2.7MB 1.7MB/s ta 0:00:01    33% |██████████▊                     | 901kB 1.7MB/s eta 0:00:02    64% |████████████████████▉           | 1.7MB 2.2MB/s eta 0:00:01
[?25hCollecting psycopg2-binary
[?25l  Downloading https://files.pythonhosted.org/packages/cd/eb/4e872a11edd82079b4163035389051668c58cd2acc30777b6bee73f5f8a3/psycopg2_binary-2.7.6.1-cp36-cp36m-manylinux1_x86_64.whl (2.7MB)
[K    100% |████████████████████████████████| 2.7MB 1.4MB/s ta 0:00:01    17% |█████▌                          | 460kB 2.2MB/s eta 0:00:02    46% |███████████████                 | 1.2MB 2.2MB/s eta 0:00:01
[?25hInstalling collected packages: psycopg2, psycopg2-binary
Successfully installed psycopg2-2.7.6.1 psycopg2-binary-2.7.6.1


### Interact with DB
The **psycopg2** and **psycopg2-binary** packages give Python the ability to interact with the PostgreSQL container.  
[psycopg2 Documentation](http://initd.org/psycopg/docs/usage.html)

In [3]:
import psycopg2

connect_str = 'host=postgres port=5432 dbname=demo user=postgres password=postgres1234'
conn = psycopg2.connect(connect_str)
conn.autocommit = True
cursor = conn.cursor()

sql_file = open('SQL.sql', 'r')
sqlFile = sql_file.read()
sql_file.close()
sqlCommands = sqlFile.split(';')
for command in sqlCommands:
    print(command)
    if command.strip() != '':
        cursor.execute(command)


DROP TABLE IF EXISTS "bakery_basket"

DROP SEQUENCE IF EXISTS bakery_basket_id_seq

CREATE SEQUENCE bakery_basket_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147486777 START 1 CACHE 1


CREATE TABLE "public"."bakery_basket" (
"id" integer DEFAULT nextval('bakery_basket_id_seq')NOT NULL,
 "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,    "item" character varying(50) NOT NULL
) WITH (oids = false)


INSERT INTO "bakery_basket" ("date", "time", "transaction", "item", "id") VALUES
('2016-10-30','09:58:11',1,'Bread',1),
('2016-10-30','10:05:34',2,'Scandinavian',2),
('2016-10-30','10:07:57',3,'Hot chocolate',3)




### Analyze Data with SparkSQL

##### Adminer (full-featured database management tool - local port 8080) is used to confirm the SQL's scripts success. 

In [4]:
#Test - Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame


properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/demo',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'bakery_basket',
}

df1 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

In [5]:
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()


In [6]:
df3.show(10)

+----------+--------+-----------+-------------+
|      date|    time|transaction|         item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
+----------+--------+-----------+-------------+
only showing top 10 rows



In [11]:
df1.createOrReplaceTempView("bakery_table")
df4 = spark.sql("SELECT * FROM bakery_table " +
               "ORDER BY transaction, date, time")
df4.show(10)
df4.count()

+---+----------+--------+-----------+-------------+
| id|      date|    time|transaction|         item|
+---+----------+--------+-----------+-------------+
|  1|2016-10-30|09:58:11|          1|        Bread|
|  1|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  3|2016-10-30|10:05:34|          2| Scandinavian|
|  4|2016-10-30|10:07:57|          3|Hot chocolate|
|  6|2016-10-30|10:07:57|          3|      Cookies|
|  3|2016-10-30|10:07:57|          3|Hot chocolate|
|  5|2016-10-30|10:07:57|          3|          Jam|
|  7|2016-10-30|10:08:41|          4|       Muffin|
+---+----------+--------+-----------+-------------+
only showing top 10 rows



21296

In [12]:
df5 = spark.sql("SELECT COUNT(DISTINCT item) AS item_count FROM bakery_table")
df5.show()

df5 = spark.sql("SELECT item, COUNT(*) AS count " +
                "FROM bakery_table " + 
                "WHERE item NOT LIKE 'NONE' " +
                "GROUP BY item ORDER BY count DESC " +
                "LIMIT 10")
df5.show()

+----------+
|item_count|
+----------+
|        95|
+----------+

+-------------+-----+
|         item|count|
+-------------+-----+
|       Coffee| 5471|
|        Bread| 3326|
|          Tea| 1435|
|         Cake| 1025|
|       Pastry|  856|
|     Sandwich|  771|
|    Medialuna|  616|
|Hot chocolate|  591|
|      Cookies|  540|
|      Brownie|  379|
+-------------+-----+



### Graph Data with BokehJS

In [9]:
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap
from bokeh.palettes import Paired12

output_notebook()

source = ColumnDataSource(data=df5.toPandas())

tooltips = [('item', '@item'), ('count', '@{count}{,}')]

items = source.data['item'].tolist()
color_map = factor_cmap(field_name='item', palette=Paired12, factors=items)
plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips)
plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map)
plot.title.text = 'Top 10 Bakery Items'
plot.xaxis.axis_label = 'Bakery Items'
plot.yaxis.axis_label = 'Total Items Sold'

show(plot)

In [16]:
df6 = spark.sql("SELECT CONCAT(date,' ',time) AS timestamp, transaction, item " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "ORDER BY transaction")

df6.show(10)
df6.count()

+-------------------+-----------+-------------+
|          timestamp|transaction|         item|
+-------------------+-----------+-------------+
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|      Cookies|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|          Jam|
|2016-10-30 10:08:41|          4|       Muffin|
+-------------------+-----------+-------------+
only showing top 10 rows



20510

In [17]:
df7 = df6.withColumn('timestamp', to_timestamp(df6.timestamp, 'yyyy-MM-dd HH:mm:ss'))
df7.printSchema()
df7.show(10)
df7.count()

root
 |-- timestamp: timestamp (nullable = true)
 |-- transaction: integer (nullable = true)
 |-- item: string (nullable = true)

+-------------------+-----------+-------------+
|          timestamp|transaction|         item|
+-------------------+-----------+-------------+
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|      Cookies|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|          Jam|
|2016-10-30 10:08:41|          4|       Muffin|
+-------------------+-----------+-------------+
only showing top 10 rows



20510

In [18]:
df7.createOrReplaceTempView("bakery_table")
df8 = spark.sql("SELECT DISTINCT * " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE'" +
                "ORDER BY transaction DESC"
                )
df8.show(100)
df8.count()

+-------------------+-----------+--------------------+
|          timestamp|transaction|                item|
+-------------------+-----------+--------------------+
|2017-04-09 15:04:24|       9684|           Smoothies|
|2017-04-09 14:57:06|       9683|              Coffee|
|2017-04-09 14:57:06|       9683|              Pastry|
|2017-04-09 14:32:58|       9682|                 Tea|
|2017-04-09 14:32:58|       9682|              Muffin|
|2017-04-09 14:32:58|       9682|        Tacos/Fajita|
|2017-04-09 14:32:58|       9682|              Coffee|
|2017-04-09 14:30:09|       9681|    Christmas common|
|2017-04-09 14:30:09|       9681|            Truffles|
|2017-04-09 14:30:09|       9681|      Spanish Brunch|
|2017-04-09 14:30:09|       9681|                 Tea|
|2017-04-09 14:24:03|       9680|               Bread|
|2017-04-09 14:08:37|       9679|               Bread|
|2017-04-09 14:08:37|       9679|      Spanish Brunch|
|2017-04-09 13:49:21|       9678|              Coffee|
|2017-04-0

18887

### Data Analysis with SciPy, NumPy, Plotly

In [10]:
!pip install plotly --quiet

In [13]:
import plotly.plotly as py
import plotly.graph_objs as go
from numpy import arange, array, ones
from scipy import stats, signal

In [26]:
df1.createOrReplaceTempView("bakery_table_temp")

df_bakery = spark.sql("SELECT date, COUNT(*) AS count " +
                      "FROM bakery_table_temp " +
                      "wHERE date >= '2017-01-01' " + 
                      "GROUP BY date " +
                      "ORDER BY date")
df_bakery.count()

98

In [31]:
df_bakery1 = df_bakery.toPandas()

xi = arange(0, len(df_bakery1.index))
slope, intercept, r_value, p_value, std_err = stats.linregress(xi, df_bakery1['count'])
line = slope*xi + intercept

layout = dict(title='2017 Bakery Sales',
              xaxis=dict(
                  title='Month',
                  showgrid=True,
                  zeroline=True,
                  showline=True,
                  ticks='outside',
                  tickangle=45,
                  showticklabels=True),
              yaxis=dict(
                  title='Items Sold/Day',
                  showgrid=True,
                  zeroline=True,
                  showline=True,
                  ticks='outside',
                  showticklabels=True))

trace1 = go.Bar(x=df_bakery1['date'], y=df_bakery1['count'], name='Items Sold')
trace2 = go.Scatter(x=df_bakery1['date'], y=line, mode='lines', name='Linear Fit')
trace3 = go.Scatter(x=df_bakery1['date'], y=signal.savgol_filter(df_bakery1['count'], 53, 3),
                    mode='lines', name='Savitzky-Golay')
data = [trace1, trace2, trace3]
fig = dict(data=data, layout=layout)
py.offline.plot(fig)
#py.iplot(fig, filename='jupyter-basic_bar.html')


Using a non-tuple sequence for multidimensional indexing is deprecated; use `arr[tuple(seq)]` instead of `arr[seq]`. In the future this will be interpreted as an array index, `arr[np.array(seq)]`, which will result either in an error or a different result.



AttributeError: module 'plotly.plotly' has no attribute 'offline'