# A demo notebook : Sample ecommerce dataset from AWS

## This demo illustrates the following 

* Defining spark external tables on data 
* Defining the star schema between the data tables 
* Loading a set of tables into SNAP ( Indexing Process)
* Sample queries on SNAP

### First let us setup the notebook

In [1]:
from pyhive import hive
from pprint import pprint
import pandas as pd
import os
from altair import *

import IPython.display
def draw(spec):
    IPython.display.display({
        'application/vnd.vegalite.v1+json': spec.to_dict()
    }, raw=True)

pd.set_option('display.max_colwidth', -1) # dont truncate table columns
cwd = os.getcwd()
cwd="/data/shared/snap-samples/Redshift"

c = hive.Connection(host="0.0.0.0",port=10000,auth='NOSASL')
pd.read_sql('show tables',c)

def sql(q, explain=False) :
    # silly hack to handle filesystem prefix for us when creating local tables
    if "{prefix}" in q:
        q = q.replace('{prefix}',cwd)
    df=pd.read_sql(q,c)
    return df

def explain(q):
    df = sql("explain " + q)
    plan = df['plan'][0]
    pprint(plan)
    

sql('show tables')

Unnamed: 0,tableName,isTemporary
0,adsnap,False
1,adtech_demo,False
2,bi_crn_f_retail_depl_mvw_vw,False
3,campaigns,False
4,category,False
5,custnation,False
6,customer,False
7,customer_address,False
8,customer_demographics,False
9,customer_ds,False


## Drop all tables and recreate them

Note these are external tables - so drop will only delete the metadata not the data

In [2]:
table_names = ["users","venue","category","ddate","event","listing","sales"]
drop = """
drop table if exists {table_name}
"""
for table_name in table_names:
    pd.read_sql(
        drop.format(table_name=table_name)
    ,c) 

### External table : Users

In [3]:
users = """
create table if not exists users (
	userid integer ,
	username string,
	firstname string,
	lastname string,
	city string,
	state string,
	email string,
	phone string,
	likesports string,
	liketheatre string,
	likeconcerts string,
	likejazz string,
	likeclassical string,
	likeopera string,
	likerock string,
	likevegas string,
	likebroadway string,
	likemusicals string)
    using csv
    options (path "{prefix}/allusers_pipe.txt", delimiter "|")
"""
sql(users)

Unnamed: 0,Result


### External table : Venue

In [4]:
venue = """
create table if not exists venue(
	venueid integer,
	venuename string,
	venuecity string,
	venuestate string,
	venueseats integer)
    using csv
    options (path "{prefix}/venue_pipe.txt", delimiter "|")
"""
sql(venue)


Unnamed: 0,Result


### External table : Category

In [5]:
category="""
create table if not exists category(
	catid integer,
	catgroup string,
	catname string,
	catdesc string)    
    using csv
    options (path "{prefix}/category_pipe.txt", delimiter "|")
"""
sql(category)

Unnamed: 0,Result


### External table: Date

In [6]:
ddate = """
create table if not exists ddate(
	dateid integer ,
	caldate date,
	day string,
	week integer,
	month string,
	qtr string,
	year integer,
	holiday string)    
    using csv
    options (path "/data/shared/snap-samples/Redshift/date2008_pipe.txt", delimiter "|")
"""
sql(ddate)

Unnamed: 0,Result


### External table : Event

In [7]:
event = """
create table if not exists event(
	eventid integer ,
	venueid integer,
	catid integer,
	dateid integer ,
	eventname string,
	starttime timestamp)    
    using csv
    options (path "{prefix}/allevents_pipe.txt", delimiter "|")
"""
sql(event)

Unnamed: 0,Result


### External table: Listing

In [8]:
listing = """
create table if not exists listing(
	listid integer  ,
	sellerid integer ,
	eventid integer ,
	dateid integer ,
	numtickets integer ,
	priceperticket decimal(8,2),
	totalprice decimal(8,2),
	listtime timestamp)    
    using csv
    options (path "{prefix}/listings_pipe.txt", delimiter "|")
"""

sql(listing)

Unnamed: 0,Result


### External table: Sales

In [9]:
sales = """
create table if not exists  sales(
	salesid integer ,
	listid integer,
	sellerid integer,
	buyerid integer,
	eventid integer,
	dateid integer,
	qtysold integer,
	pricepaid decimal(8,2),
	commission decimal(8,2),
	saletime timestamp)
    using csv
    options (
        path "{prefix}/sales_tab.txt"
        ,delimiter "\t"
        ,timestampFormat "d/MM/yyyy HH:mm:ss"
        ,inferSchema "true"
        )
"""
sql(sales)


Unnamed: 0,Result


### Define the star schema in SNAP

In [10]:
create_star_schema = """alter star schema on sales as
many_to_one join of sales with listing on sales.listid = listing.listid
many_to_one join of sales with event on sales.eventid = event.eventid
many_to_one join of sales with ddate on sales.dateid = ddate.dateid
many_to_one join of sales with users on sales.buyerid = users.userid
many_to_one join of event with category on event.catid = category.catid
many_to_one join of event with venue on event.venueid = venue.venueid
"""
sql(create_star_schema)

Unnamed: 0,Unnamed: 1


### Define the SNAP Index 

In [13]:
salessnap="""
drop olap index salessnap on sales
"""
sql(salessnap)

Unnamed: 0,Result


In [14]:
salessnap="""

create olap index salessnap on sales
timestamp dimension starttime 
timestamp dimension listtime 
timestamp dimension saletime
timestamp dimension caldate
metric priceperticket aggregator doubleSum is nullable nullvalue "0.0" 
metric totalprice aggregator doubleSum is nullable nullvalue "0.0"
metric numtickets aggregator longSum is nullable nullvalue "0"
metric qtysold aggregator longSum is nullable nullvalue "0"
metric pricepaid aggregator doubleSum is nullable nullvalue "0.0"
metric commission aggregator doubleSum is nullable nullvalue "0.0"
dimension holiday is not nullable 
dimensions "username,city, state, likesports, liketheatre,likeconcerts, likejazz , \
likeclassical, likeopera, likerock, likevegas, likebroadway, likemusicals, \
venuename, venuecity, venuestate, catgroup, catname, catdesc \
, day, week, month, qtr, year, eventname , \
sales.eventid, sales.buyerid, sales.listid, sales.salesid" 

OPTIONS (        
    path "/data/shared/snap-samples/Redshift/snap",
    avgSizePerPartition  "40mb",
    avgNumRowsPerPartition "10000",
    preferredSegmentSize "20mb",
    rowFlushBoundary "10000",
    defaultNullValue "0"
)
"""

sql(salessnap)

Unnamed: 0,Result


### Load data into SNAP

In [237]:
insert=""" insert overwrite olap index salessnap of sales """

sql(insert)

### Query 1

Note after loading data into SNAP you can query the original tables sales, ddate with the join keys. The query will get rewritten to use the SNAP Index

In [15]:
query1 = """
SELECT sum(qtysold) 
FROM   sales as sales, ddate
WHERE  sales.dateid = ddate.dateid 
AND    ddate.caldate = '2008-01-05'
"""
sql(query1)

Unnamed: 0,sum(qtysold)
0,203


In [18]:
explain(query1)

u'== Physical Plan ==\n*Project [alias-1#808L AS sum(qtysold)#807L]\n+- *HashAggregate(keys=[], functions=[sum(alias-1#808L)])\n   +- Exchange SinglePartition\n      +- *HashAggregate(keys=[], functions=[partial_sum(alias-1#808L)])\n         +- *Scan spmd{\n  "jsonClass" : "TimeSeriesQuerySpec",\n  "queryType" : "timeseries",\n  "dataSource" : "spmd",\n  "intervals" : [ "1900-01-01T00:00:00.000+05:53:20/2100-01-01T00:23:20.000+05:53:20" ],\n  "granularity" : "all",\n  "filter" : {\n    "jsonClass" : "LogicalFilterSpec",\n    "type" : "and",\n    "fields" : [ {\n      "jsonClass" : "JavascriptFilterSpec",\n      "type" : "javascript",\n      "dimension" : "caldate",\n      "function" : "function (caldate) {\\n            \\n            var v2 = org.joda.time.format.ISODateTimeFormat.dateTimeParser();\\n            var v4 = (org.joda.time.LocalDate.parse(caldate, v2).toString(\\"yyyy-MM-dd\\"));\\n\\n            return((((v4 != null) ? v4.toString() : \\"\\"))  ==  (\\"2008-01-05\\"));\\

In [231]:
query2 = """
SELECT firstname, lastname, total_quantity 
FROM   (SELECT buyerid, sum(qtysold) total_quantity
        FROM  sales
        GROUP BY buyerid
        ORDER BY total_quantity desc limit 10) Q, users
WHERE Q.buyerid = userid
ORDER BY Q.total_quantity desc
"""
sql(query2)

In [24]:
sq1=""" 

SELECT SUM(`sales`.`qtysold`) AS `sum_qtysold_ok`, 
COUNT(1) AS `x__alias__0` 
FROM `default`.`sales` `sales` 
  JOIN `default`.`users` `users` ON (`sales`.`buyerid` = `users`.`userid`) 
  JOIN `default`.`event` `event` ON (`sales`.`eventid` = `event`.`eventid`) 
  JOIN `default`.`listing` `listing` ON (`sales`.`listid` = `listing`.`listid`) 
  JOIN `default`.`ddate` `ddate` ON (`sales`.`dateid` = `ddate`.`dateid`) 
  JOIN `default`.`category` `category` ON (`event`.`catid` = `category`.`catid`) 
  JOIN `default`.`venue` `venue` ON (`event`.`venueid` = `venue`.`venueid`) 
  HAVING (COUNT(1) > 0)

"""
sql(sq1)

Unnamed: 0,sum_qtysold_ok,x__alias__0
0,339796,169679


In [22]:
t10="""
select  sales.eventid, sum(sales.pricepaid) 
from sales, event
where sales.eventid = event.eventid
and sales.pricepaid > 30
group by sales.eventid
order by 2 desc
limit 10
"""
sql(t10)

Unnamed: 0,eventid,sum(pricepaid)
0,289,51846.0
1,7895,51049.0
2,1602,50301.0
3,851,49956.0
4,7315,49823.0
5,6471,47997.0
6,2118,47863.0
7,984,46780.0
8,7851,46661.0
9,5638,46280.0


### Create a derived view representing a segment of users

In [21]:
sportsandjazz = """
create or replace view sportsandjazz as 
select * from salessnap where likesports="TRUE" and likejazz="TRUE"
"""
sql(sportsandjazz)

Unnamed: 0,Result


In [20]:

q="""
select count(*) from sportsandjazz
"""
sql(q)

Unnamed: 0,count(1)
0,7721


### Compare metrics for all users vs a segment of users

In [25]:
q="""
with allusers AS ( 
select caldate adate, city,sum(qtysold) q, sum(pricepaid) p 
from salessnap group by caldate,city)
,
someusers AS (
select caldate sdate,city, sum(qtysold) a, sum(pricepaid) b 
from salessnap where likeconcerts='TRUE' AND likejazz='TRUE' group by caldate,city)

select adate,allusers.city, a, b, round(a/q,2)*100 qratio , round(b/p,2)*100 pratio
from allusers, someusers where adate=sdate order by pratio desc limit 5000
"""


In [26]:
df=sql(q)

In [27]:
df.columns

Index([u'adate', u'city', u'a', u'b', u'qratio', u'pratio'], dtype='object')

### Combine results from SNAP with Pandas for descriptive analysis

In [28]:
df[['a','b','qratio','pratio']].describe()

Unnamed: 0,a,qratio
count,5000.0,5000.0
mean,3.081,273.9516
std,1.32544,131.798955
min,1.0,67.0
25%,2.0,200.0
50%,3.0,200.0
75%,4.0,400.0
max,8.0,800.0
