# Atwater Customer Recommendations

#### A demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, and Jupyter Notebooks that uses large volumes of customer data to recommend items to customers with a high degree of accuracy

### Things To Set Up
* Install DSE: https://docs.datastax.com/en/install/doc/install60/installTOC.html
* Start a DSE Analytics cluster
* Use Python 2.7
* Use DSE Analytics 6
* Use the latest version of Jupyter
* Find the full path to <>/lib/pyspark.zip
* Find the full path to <>/lib/py4j-0.10.4-src.zip
* Start Jupyter with DSE to pick up all environment variables: dse exec jupyter notebook
* Make sure that all the CSV files are in the same location as this notebook
* Make sure that all \*.cql files are in the same location as this notebook
* !pip install cassandra-driver
* !pip install pattern
* !pip install pandas
* Counterintuitively, do not install pyspark! The DSE version of pyspark is used instead.

#### Set path variables to locate the DSE version of pyspark. Edit these variables to match your installation.

In [3]:
pysparkzip = "/usr/share/dse/spark/python/lib/pyspark.zip"
py4jzip = "/usr/share/dse/spark/python/lib/py4j-0.10.4-src.zip"

In [4]:
# Needed to be able to find pyspark libraries
import sys
sys.path.append(pysparkzip)
sys.path.append(py4jzip)
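
A wrong path here only surfaces later as an `ImportError` on `import pyspark`. As a minimal sketch (the helper name `add_existing_paths` is hypothetical, not part of DSE or PySpark), the paths can be checked before they are appended:

```python
import os
import sys


def add_existing_paths(paths):
    """Append each existing path to sys.path; return the ones that are missing."""
    missing = []
    for path in paths:
        if os.path.exists(path):
            # Avoid adding the same entry twice when the cell is re-run
            if path not in sys.path:
                sys.path.append(path)
        else:
            missing.append(path)
    return missing
```

Calling `add_existing_paths([pysparkzip, py4jzip])` should return an empty list when both archives were found; any path it returns needs to be corrected before importing pyspark.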

#### Import python packages -- all are required

In [6]:
import pandas
import cassandra
import pyspark
import re
import os
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pattern.en import sentiment, positive

#### Helper function to have nicer formatting of Spark DataFrames

In [7]:
# Helper for pretty formatting of Spark DataFrames
def showDF(df, limitRows=5, truncate=True):
    # Cap the column width when truncating; -1 means no limit in the
    # pandas versions available for Python 2.7
    if truncate:
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    # Only limitRows rows are collected to the driver before display
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

### Creating Tables, Pulling Tweets, and Loading Tables

#### Connect to DSE Analytics Cluster

In [8]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1']) #If you have a locally installed DSE cluster
session = cluster.connect()

#### Create Demo Keyspace -- the replication factor is 1 because this demo cluster has only one node. Outside of a demo, a replication factor of 3 is recommended, with consistency levels chosen so that Write Consistency + Read Consistency > Replication Factor

In [9]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS demo 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x69a3410>
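
The consistency rule in the heading above is simple arithmetic. The sketch below is illustrative only (the function names are hypothetical and not part of the cassandra-driver API): a read is guaranteed to overlap the latest write when the number of replicas written plus the number of replicas read exceeds the replication factor.

```python
def quorum(replication_factor):
    """Number of replicas in a QUORUM: a majority of the replication factor."""
    return replication_factor // 2 + 1


def reads_see_latest_write(write_replicas, read_replicas, replication_factor):
    """True when every read set must overlap every write set (W + R > RF)."""
    return write_replicas + read_replicas > replication_factor
```

With a replication factor of 3, QUORUM writes and QUORUM reads touch 2 replicas each, so `reads_see_latest_write(2, 2, 3)` holds, while writing and reading at ONE (`1 + 1`) does not. On the single-node demo cluster the rule holds trivially, since `1 + 1 > 1`.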

#### Set keyspace 

In [10]:
session.set_keyspace('demo')

### Create the customer transaction table in DSE (this is for completed transactions).  This table will be updated with about 1000 transactions a minute (Atwater has around 200,000 transactions a day on their website)
#### Our partition key will be state (limiting our analysis to just the US), and our clustering columns will be gender, age, and the transaction id. Consider your data model and query patterns when choosing your primary key. This key gives us a good distribution of the data and a unique row for each transaction. We will also be able to create models around age, gender, and state to give the best possible recommendations.

##### For this demo this table will store 1 million records

In [11]:
query = "CREATE TABLE IF NOT EXISTS customer_transactions (id int, \
                                                            customer_name text, \
                                                            gender text, age int, \
                                                            state text, home_store int, \
                                                            items list<text>, year int, \
                                                            month int, rewards_member text, \
                                                            PRIMARY KEY ((state), gender, age, id))"
session.execute(query)


<cassandra.cluster.ResultSet at 0x69ac450>
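
To see what this key does to the data layout, here is a rough in-memory analogy in plain Python. It is not how Cassandra is implemented, and the `group_by_partition` helper and the sample rows are made up for illustration. Rows that share a `state` land in the same partition, and within a partition they are ordered by `gender`, `age`, and `id`.

```python
from collections import defaultdict


def group_by_partition(rows):
    """Group rows by partition key (state), sorted by clustering columns."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["state"]].append(row)
    for state_rows in partitions.values():
        state_rows.sort(key=lambda r: (r["gender"], r["age"], r["id"]))
    return dict(partitions)


# Hypothetical sample rows, shaped like customer_transactions
rows = [
    {"id": 3, "state": "CA", "gender": "M", "age": 30},
    {"id": 1, "state": "CA", "gender": "F", "age": 45},
    {"id": 2, "state": "TX", "gender": "F", "age": 18},
    {"id": 4, "state": "CA", "gender": "F", "age": 22},
]
partitions = group_by_partition(rows)
ca_ids = [r["id"] for r in partitions["CA"]]  # ids in clustering order
```

A query that filters on `state` only has to read one such partition, which is why `state` is a usable `WHERE` clause for this table.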

### Create the live customer table in DSE - this represents live customers that are currently logged in on the site and what they have in their shopping cart. 
#### We will use that information to get a prediction of what we should recommend for them.  The data model is the same as above. 
##### For ease of the demo, each cart will hold just 1 item. For the demo, this table will hold 10K records

In [12]:
query = "CREATE TABLE IF NOT EXISTS customer_live (id int, \
                                                            customer_name text, \
                                                            gender text, age int, \
                                                            state text, home_store int, \
                                                            items list<text>, year int, \
                                                            month int, rewards_member text, \
                                                            PRIMARY KEY ((state), gender, age, id))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x2f4ae50>

### Create the Customer Recommendation Table in DSE
#### This table will be used with the inventory table to show the correct, in-stock items on the website. In reality this information probably would not be written back to a Cassandra table, as it doesn't need to be stored long-term. It is written here for the sake of the demo, to show fast reads and fast writes.

In [41]:
query = "CREATE TABLE IF NOT EXISTS customer_recommend (id int, \
                                                            customer_name text, \
                                                            gender text, age int, \
                                                            state text, home_store int, \
                                                            items list<text>, year int, \
                                                            month int, rewards_member text,\
                                                            prediction list<text>,\
                                                            PRIMARY KEY ((id, state), gender, age))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x7984e90>

### Create the Inventory Table in DSE 
#### Our primary key is built from the item type (pants, shirts, blender), the location of the items, whether the item is on backorder, and the sku. While customers may want to look at items that are on backorder, we do not want to recommend them, as this would only cause frustration. This table would have around 6 million entries at any one time, with daily inserts and deletions.

In [67]:
query = "CREATE TABLE IF NOT EXISTS inventory (sku int, \
                                               item_name text, item_type text, \
                                               stock_loc text, num_items int, \
                                               backorder text, \
                                               PRIMARY KEY (item_type, stock_loc, backorder, sku))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x78d9310>

#### Load CSV files into DSE for Customer Transactions, Customer Live/Shopping Cart and Inventory Tables
##### Note: you could also use a bulk loader or a loop with insert statements

In [15]:
!head -n 2 customer.csv
!cat loadCustomer.cql
!time cqlsh -f loadCustomer.cql

1|Customer1|M|20|AL|0|['tshirt', 'jeans']|2017|6|Y
2|Customer2|F|50|AK|1|['shoes', 'heels', 'dress']|2017|6|N
COPY demo.customer_transactions( id, customer_name, gender, age, state, home_store, items, year, month, rewards_member) FROM 'customer.csv' WITH DELIMITER = '|';
Using 3 child processes

Starting copy of demo.customer_transactions with columns [id, customer_name, gender, age, state, home_store, items, year, month, rewards_member].


Processed: 15000 rows; Rate:   22385 rows/s; Avg. rate:   14091 rows/s
Processed: 20000 rows; Rate:   18639 rows/s; Avg. rate:   14283 rows/s
...
Processed: 960000 rows; Rate:   40238 rows/s; Avg. rate:   19528 rows/s
Processed: 965000 rows; Rate:   24554 rows/s; Avg. rate:   19407 rows/s

In [16]:
!head -n 2 customerTest.csv
!cat loadCustomerTest.cql
!time cqlsh -f loadCustomerTest.cql

1|Customer1|F|21|AL|0|['jeans']|2016|6|Y
2|Customer2|M|36|AK|1|['jeans']|2017|6|Y
COPY demo.customer_live( id, customer_name, gender, age, state, home_store, items, year, month, rewards_member) FROM 'customerTest.csv' WITH DELIMITER = '|';
Using 3 child processes

Starting copy of demo.customer_live with columns [id, customer_name, gender, age, state, home_store, items, year, month, rewards_member].
Processed: 5000 rows; Rate:    3797 rows/s; Avg. rate:    3796 rows/s
Processed: 10000 rows; Rate:    9238 rows/s; Avg. rate:    6033 rows/s
Processed: 10000 rows; Rate:    4619 rows/s; Avg. rate:    5678 rows/s
10000 rows imported from 1 files in 1.761 seconds (0 skipped).

real	0m2.767s
user	0m2.039s
sys	0m0.516s


In [68]:
!head -n 2 inventory.csv
!cat loadInventory.cql
!cqlsh -f loadInventory.cql

2|Fancy Dress Shirt 1|dressShirt|AL|4|N
4|Fancy Dress 1|dress|AL|133|N
COPY demo.inventory( sku, item_name, item_type, stock_loc, num_items, backorder) FROM 'inventory.csv' WITH DELIMITER = '|';
Using 3 child processes

Starting copy of demo.inventory with columns [sku, item_name, item_type, stock_loc, num_items, backorder].
Processed: 751 rows; Rate:    2297 rows/s; Avg. rate:    2296 rows/s
Processed: 751 rows; Rate:    1148 rows/s; Avg. rate:    1748 rows/s
751 rows imported from 1 files in 0.430 seconds (0 skipped).
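
If the rows are loaded from Python in a loop rather than with `COPY`, each pipe-delimited line first has to be converted into typed values. A minimal sketch, assuming the column order shown for `customer.csv` above (the `parse_customer_line` helper is hypothetical):

```python
import ast


def parse_customer_line(line):
    """Turn one pipe-delimited customer line into a tuple of typed values."""
    (cid, name, gender, age, state, home_store,
     items, year, month, rewards_member) = line.rstrip("\n").split("|")
    return (int(cid), name, gender, int(age), state, int(home_store),
            ast.literal_eval(items),  # "['tshirt', 'jeans']" -> list of str
            int(year), int(month), rewards_member)


row = parse_customer_line("1|Customer1|M|20|AL|0|['tshirt', 'jeans']|2017|6|Y")
```

Each tuple could then be bound to an `INSERT` statement prepared once with `session.prepare(...)` and run with `session.execute(prepared, row)`. For a million rows this is likely to be slower than `COPY`, so it is best suited to small loads.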


#### Run a select * on the customer transaction table to verify that the values have been inserted into the DSE table. Because "state" is our partition key, we can use it in our WHERE clause.

In [19]:
query = 'SELECT * FROM customer_transactions WHERE state=\'CA\' limit 10'
rows = session.execute(query)
for user_row in rows:
    print (user_row.id, user_row.items)

(155, [u'shoes', u'heels', u'dress'])
(405, [u'dress', u'heels'])
(2205, [u'dress', u'heels'])
(2405, [u'dress', u'heels'])
(2705, [u'dress', u'heels'])
(3305, [u'dress', u'heels'])
(7105, [u'tshirt', u'jeans'])
(7405, [u'tshirt', u'jeans'])
(8205, [u'dress', u'heels'])
(9405, [u'dress', u'heels'])


### Finally time for Some Analytics!

#### Create a spark session that is connected to Cassandra. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [20]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()

tableDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="customer_transactions", keyspace="demo").load()

testDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="customer_live", keyspace="demo").load()


print "Table Train Count: "
print tableDF.count()
showDF(tableDF)

print "Table Test Count: "
print testDF.count()
showDF(testDF)

Table Train Count: 
1000000


| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TX | F | 18 | 20794 | Customer20794 | 43 | [tshirt, jeans] | 4 | Y | 2016 |
| 1 | TX | F | 18 | 26994 | Customer26994 | 43 | [shoes, heels, dress] | 5 | N | 2016 |
| 2 | TX | F | 18 | 50594 | Customer50594 | 43 | [shoes, heels, dress] | 11 | N | 2017 |
| 3 | TX | F | 18 | 50944 | Customer50944 | 43 | [tshirt, jeans] | 1 | N | 2016 |
| 4 | TX | F | 18 | 54344 | Customer54344 | 43 | [shoes, heels, dress] | 1 | Y | 2016 |


Table Test Count: 
10000


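| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TX | F | 18 | 4694 | Customer4694 | 43 | [jeans] | 2 | Y | 2016 |
| 1 | TX | F | 18 | 6894 | Customer6894 | 43 | [jeans] | 11 | N | 2017 |
| 2 | TX | F | 19 | 6694 | Customer6694 | 43 | [jeans] | 8 | N | 2016 |
| 3 | TX | F | 20 | 1044 | Customer1044 | 43 | [jeans] | 5 | N | 2016 |
| 4 | TX | F | 20 | 5294 | Customer5294 | 43 | [jeans] | 3 | N | 2017 |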


### With Spark we can also quickly create new DataFrames with just the customers aged 18-23.
#### We can use this DataFrame to create another model and score our live data against it.
#### Models++ = Strong Accuracy --> $$

In [22]:
tableMilDF = tableDF.where(tableDF.age < 24)
print "Table Milennial Train Count: "
print tableMilDF.count()
showDF(tableMilDF)
testMilDF = testDF.where(testDF.age < 24)
print "Table Milennial Test Count:"
print testMilDF.count()
showDF(testMilDF)

Table Milennial Train Count: 
312268


| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | OR | F | 18 | 3938 | Customer3938 | 37 | [shoes, heels, dress] | 11 | Y | 2017 |
| 1 | OR | F | 18 | 15588 | Customer15588 | 37 | [shoes, heels, dress] | 7 | N | 2017 |
| 2 | OR | F | 18 | 16138 | Customer16138 | 37 | [tshirt, jeans] | 5 | N | 2016 |
| 3 | OR | F | 18 | 25288 | Customer25288 | 37 | [tshirt, jeans] | 6 | Y | 2016 |
| 4 | OR | F | 18 | 27588 | Customer27588 | 37 | [shoes, heels, dress] | 5 | Y | 2017 |


Table Milennial Test Count:
3127


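| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | OR | F | 18 | 9288 | Customer9288 | 37 | [jeans] | 8 | Y | 2016 |
| 1 | OR | F | 19 | 4188 | Customer4188 | 37 | [jeans] | 2 | N | 2017 |
| 2 | OR | F | 20 | 7138 | Customer7138 | 37 | [jeans] | 5 | Y | 2016 |
| 3 | OR | F | 21 | 9588 | Customer9588 | 37 | [jeans] | 8 | Y | 2017 |
| 4 | OR | F | 22 | 7388 | Customer7388 | 37 | [jeans] | 2 | N | 2017 |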


In [23]:
table24DF = tableDF.where(tableDF.age > 24)
print "Table Everyone Else Train Count: "
print table24DF.count()
showDF(table24DF)
print "Table Everyone Else Test Count:"
test24DF = testDF.where(testDF.age > 24)
print test24DF.count()
showDF(test24DF)

Table Everyone Else Train Count: 
677195


| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | OR | F | 25 | 5888 | Customer5888 | 37 | [shoes, heels, dress] | 2 | N | 2017 |
| 1 | OR | F | 25 | 8438 | Customer8438 | 37 | [shoes, heels, dress] | 10 | N | 2016 |
| 2 | OR | F | 25 | 11938 | Customer11938 | 37 | [shoe, socks] | 6 | N | 2017 |
| 3 | OR | F | 25 | 16338 | Customer16338 | 37 | [shoes, heels, dress] | 5 | Y | 2016 |
| 4 | OR | F | 25 | 24038 | Customer24038 | 37 | [shoes, heels, dress] | 11 | Y | 2016 |


Table Everyone Else Test Count:
6767


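| | state | gender | age | id | customer_name | home_store | items | month | rewards_member | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TX | F | 26 | 1894 | Customer1894 | 43 | [dress] | 7 | Y | 2017 |
| 1 | TX | F | 27 | 5244 | Customer5244 | 43 | [jeans] | 6 | Y | 2016 |
| 2 | TX | F | 28 | 3944 | Customer3944 | 43 | [jacket] | 6 | N | 2017 |
| 3 | TX | F | 28 | 7594 | Customer7594 | 43 | [heels] | 6 | N | 2017 |
| 4 | TX | F | 29 | 3844 | Customer3844 | 43 | [heels] | 1 | Y | 2017 |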
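
Note that the two filters, `age < 24` and `age > 24`, leave out customers who are exactly 24. This is consistent with the counts above: 312268 + 677195 = 989463 of the 1000000 training rows. A small plain-Python sketch of the same split (the ages are made up) shows the gap:

```python
def split_by_age(ages, cutoff=24):
    """Mimic the two filters above: strictly below and strictly above cutoff."""
    younger = [a for a in ages if a < cutoff]
    older = [a for a in ages if a > cutoff]
    return younger, older


ages = [18, 23, 24, 24, 25, 60]  # hypothetical sample
younger, older = split_by_age(ages)
# Rows that fall into neither group (here, the two customers aged 24)
dropped = len(ages) - len(younger) - len(older)
```

Changing one of the comparisons to `<=` or `>=` would assign those customers to a group, at the cost of changing the counts shown above.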


### FPGROWTH for Customer Shopping Cart Recommendations
#### Use Apache Spark MLlib with FPGrowth to find recommendations -- train the model on the customer transaction dataset, then use the live customer shopping carts as the test dataset
#### https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html
#### https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth

#### Let's look at Non-Millennials First

In [24]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.2)
model = fpGrowth.fit(table24DF)
recommendDF=model.transform(test24DF)
recommendDF.show()

+-----+------+---+----+-------------+----------+------------+-----+--------------+----+--------------+
|state|gender|age|  id|customer_name|home_store|       items|month|rewards_member|year|    prediction|
+-----+------+---+----+-------------+----------+------------+-----+--------------+----+--------------+
|   IA|     F| 25|1416| Customer1416|        15|     [socks]|    2|             Y|2017|        [shoe]|
|   IA|     F| 25|5066| Customer5066|        15|    [jacket]|    9|             Y|2016|        [coat]|
|   IA|     F| 25|7266| Customer7266|        15|    [jacket]|    7|             Y|2016|        [coat]|
|   IA|     F| 27|2616| Customer2616|        15|     [dress]|    7|             Y|2017|[heels, shoes]|
|   IA|     F| 27|3566| Customer3566|        15|     [heels]|   11|             N|2016|[shoes, dress]|
|   IA|     F| 27|3966| Customer3966|        15|     [socks]|    6|             Y|2017|        [shoe]|
|   IA|     F| 27|4166| Customer4166|        15|      [coat]|    7|      
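
For intuition about the `minSupport` and `minConfidence` parameters: support is the fraction of transactions that contain an itemset, and the confidence of a rule A -> B is support(A and B) / support(A). The sketch below computes both in plain Python on made-up baskets. It is not the FP-growth algorithm itself (which avoids enumerating candidates by building a prefix tree), only the two measures it thresholds on.

```python
def support(baskets, itemset):
    """Fraction of baskets that contain every item in itemset."""
    itemset = set(itemset)
    hits = sum(1 for basket in baskets if itemset <= set(basket))
    return hits / float(len(baskets))


def confidence(baskets, antecedent, consequent):
    """support(antecedent + consequent) / support(antecedent)."""
    both = support(baskets, set(antecedent) | set(consequent))
    return both / support(baskets, antecedent)


# Hypothetical baskets using item names from this demo
baskets = [
    ["tshirt", "jeans"],
    ["tshirt", "jeans"],
    ["dress", "heels"],
    ["jeans"],
]
jeans_support = support(baskets, ["jeans"])             # 3 of 4 baskets
rule_conf = confidence(baskets, ["jeans"], ["tshirt"])  # 2 of those 3
```

With `minSupport=0.1` and `minConfidence=0.2` as used above, both the itemset {jeans, tshirt} and the rule jeans -> tshirt in this toy data would pass the thresholds, so a cart containing only jeans would get tshirt as its prediction.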

### Let's build a model just for our young Millennial Customers
#### Did we see anything different?

In [38]:
fpGrowth1 = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.1)
model1 = fpGrowth1.fit(tableMilDF)
recommendDF1=model1.transform(testMilDF)
recommendDF1.show()

+-----+------+---+----+-------------+----------+-------+-----+--------------+----+----------+
|state|gender|age|  id|customer_name|home_store|  items|month|rewards_member|year|prediction|
+-----+------+---+----+-------------+----------+-------+-----+--------------+----+----------+
|   TX|     F| 18|4694| Customer4694|        43|[jeans]|    2|             Y|2016|  [tshirt]|
|   TX|     F| 18|6894| Customer6894|        43|[jeans]|   11|             N|2017|  [tshirt]|
|   TX|     F| 19|6694| Customer6694|        43|[jeans]|    8|             N|2016|  [tshirt]|
|   TX|     F| 20|1044| Customer1044|        43|[jeans]|    5|             N|2016|  [tshirt]|
|   TX|     F| 20|5294| Customer5294|        43|[jeans]|    3|             N|2017|  [tshirt]|
|   TX|     F| 21| 194|  Customer194|        43|[jeans]|    8|             Y|2017|  [tshirt]|
|   TX|     F| 21|6294| Customer6294|        43|[jeans]|    7|             N|2016|  [tshirt]|
|   TX|     F| 21|9044| Customer9044|        43|[jeans]|    

In [42]:
from pyspark.sql.functions import col, size

dfPrediction1=recommendDF1.where(size(col("prediction")) > 0)
dfPrediction=recommendDF.where(size(col("prediction")) > 0)

### Load the Recommendations DataFrame, including Predictions, back into a Cassandra Table

In [43]:
dfPrediction1.write.format("org.apache.spark.sql.cassandra").options(table="customer_recommend", keyspace="demo").save(mode="append")

In [44]:
dfPrediction.write.format("org.apache.spark.sql.cassandra").options(table="customer_recommend", keyspace="demo").save(mode="append")

### Get Recommended Items from Inventory Table
#### For each customer in the recommendations table (sorry, this is a select * on all rows, but it must be done!) and for each item type that was recommended (bed, sweater, pants), query the inventory table for that item type, the warehouse location in the same state as the user (one per state), and items that are not on backorder. Whatever specific item names match this query are what will be recommended to the user.

In [74]:
query = 'SELECT * FROM customer_recommend limit 10'
rows = session.execute(query)
# Let the driver bind the values instead of formatting them into the CQL string
inventory_query = "SELECT * FROM inventory WHERE item_type=%s AND stock_loc=%s AND backorder='N'"
for user_row in rows:
    for item in user_row.prediction:
        items = session.execute(inventory_query, (item, user_row.state))
        for item_row in items:
            print "Customer: " + user_row.customer_name + " **Shopping Cart: " + str(user_row.items) + "** --> Recommendations: Type: " + item + " Name: " + item_row.item_name

Customer: Customer6924 **Shopping Cart: [u'coat']** --> Recommendations: Type: jacket Name: Active
Customer: Customer6924 **Shopping Cart: [u'coat']** --> Recommendations: Type: jacket Name: Varsity
Customer: Customer5555 **Shopping Cart: [u'dress']** --> Recommendations: Type: heels Name: High Heels
Customer: Customer5555 **Shopping Cart: [u'dress']** --> Recommendations: Type: heels Name: High Heels
Customer: Customer5555 **Shopping Cart: [u'dress']** --> Recommendations: Type: shoes Name: Vans
Customer: Customer5555 **Shopping Cart: [u'dress']** --> Recommendations: Type: shoes Name: Vans
Customer: Customer9286 **Shopping Cart: [u'heels']** --> Recommendations: Type: shoes Name: Oxford
Customer: Customer9286 **Shopping Cart: [u'heels']** --> Recommendations: Type: shoes Name: Vans
Customer: Customer9286 **Shopping Cart: [u'heels']** --> Recommendations: Type: dress Name: Red Dress
Customer: Customer9286 **Shopping Cart: [u'heels']** --> Recommendations: Type: dress Name: Purple Dres
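
The lookup logic can be tried without a cluster. The following is a plain-Python stand-in for the inventory query (the `find_in_stock` helper is hypothetical; the two sample rows are the ones shown from `inventory.csv` earlier). It indexes rows by `(item_type, stock_loc, backorder)`, mirroring the leading columns of the table's primary key.

```python
from collections import defaultdict

# (sku, item_name, item_type, stock_loc, num_items, backorder)
inventory_rows = [
    (2, "Fancy Dress Shirt 1", "dressShirt", "AL", 4, "N"),
    (4, "Fancy Dress 1", "dress", "AL", 133, "N"),
]

# Build an index keyed the same way the inventory query filters
index = defaultdict(list)
for sku, item_name, item_type, stock_loc, num_items, backorder in inventory_rows:
    index[(item_type, stock_loc, backorder)].append(item_name)


def find_in_stock(item_type, state):
    """Names of items of this type stocked in this state and not on backorder."""
    return index.get((item_type, state, "N"), [])
```

For example, `find_in_stock("dress", "AL")` returns the one matching item name, while a state with no matching stock yields an empty list, in which case nothing is recommended for that item type.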

In [66]:
#session.execute("drop table customer_live")
#session.execute("drop table customer_recommend")
#session.execute("drop table inventory")
#session.execute("drop table customer_transactions")

<cassandra.cluster.ResultSet at 0x7f32746bcc90>