# Starbucks Customer Segmentation

# Importing libraries

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
import time
from pyspark import SparkFiles

In [None]:
# Create (or reuse) the local SparkSession used by the rest of the notebook.
# The placeholder "spark.some.config.option" entry from the Spark quick-start
# template was removed: it is not a real Spark option and had no effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Starbucks Customer Segmentation")
    .getOrCreate()
)

# Portfolio

In [None]:
# Load the offer portfolio: register the remote JSON with Spark, then read
# the copy that SparkFiles resolves by file name.
url1 = "https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/portfolio.json"
spark.sparkContext.addFile(url1)
portfolio_path = SparkFiles.get("portfolio.json")
portfolio = spark.read.option("inferSchema", "true").json(portfolio_path)

In [None]:
# Display the DataFrame repr (column names and their types).
portfolio

DataFrame[channels: array<string>, difficulty: bigint, duration: double, id: string, offer_type: string, reward: bigint]

In [None]:
# Print the inferred schema as a tree, including the array element type of `channels`.
portfolio.printSchema()

root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- difficulty: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- reward: long (nullable = true)



In [None]:
# Preview the first 5 offers.
portfolio.show(5)

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
+--------------------+----------+--------+--------------------+-------------+------+
only showing top 5 rows



In [None]:
# Number of rows (offers) in the portfolio.
portfolio.count()

10

# Profile

In [None]:
# Load the customer profiles: register the remote JSON with Spark, then read
# the copy that SparkFiles resolves by file name.
url2 = "https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/profile.json"
spark.sparkContext.addFile(url2)
profile_path = SparkFiles.get("profile.json")
profile = spark.read.option("inferSchema", "true").json(profile_path)

In [None]:
# Display the DataFrame repr (column names and their types).
profile

DataFrame[age: bigint, became_member_on: string, gender: string, id: string, income: bigint]

In [None]:
# Print the inferred schema; note that `became_member_on` is read as a string.
profile.printSchema()

root
 |-- age: long (nullable = true)
 |-- became_member_on: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- income: long (nullable = true)



In [None]:
# Preview the first 5 customer profiles.
profile.show(5)

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
|118|        20170212|  null|68be06ca386d4c319...|  null|
| 55|        20170715|     F|0610b486422d4921a...|112000|
|118|        20180712|  null|38fe809add3b4fcf9...|  null|
| 75|        20170509|     F|78afa995795e4d85b...|100000|
|118|        20170804|  null|a03223e636434f42a...|  null|
+---+----------------+------+--------------------+------+
only showing top 5 rows



In [None]:
# Number of rows (customers) in the profile data.
profile.count()

17000

# Transcript

In [None]:
# Load the event transcript: register the remote JSON with Spark, then read
# the copy that SparkFiles resolves by file name.
url3 = "https://raw.githubusercontent.com/seifip/starbucks-customer-segmentation/master/data/transcript.json"
spark.sparkContext.addFile(url3)
transcript_path = SparkFiles.get("transcript.json")
transcript = spark.read.option("inferSchema", "true").json(transcript_path)

In [None]:
# Display the DataFrame repr; `value` is a struct with amount / offer id / offer_id / reward fields.
transcript

DataFrame[event: string, person: string, time: bigint, value: struct<amount:double,offer id:string,offer_id:string,reward:bigint>]

In [None]:
# Print the inferred schema, including the nested fields of the `value` struct.
transcript.printSchema()

root
 |-- event: string (nullable = true)
 |-- person: string (nullable = true)
 |-- time: long (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- offer id: string (nullable = true)
 |    |-- offer_id: string (nullable = true)
 |    |-- reward: long (nullable = true)



In [None]:
# Preview the first 5 transcript events.
transcript.show(5)

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|78afa995795e4d85b...|   0|[, 9b98b8c7a33c4b...|
|offer received|a03223e636434f42a...|   0|[, 0b1e1539f2cc45...|
|offer received|e2127556f4f64592b...|   0|[, 2906b810c7d441...|
|offer received|8ec6ce2a7e7949b1b...|   0|[, fafdcd668e3743...|
|offer received|68617ca6246f4fbc8...|   0|[, 4d5c57ea9a6940...|
+--------------+--------------------+----+--------------------+
only showing top 5 rows



In [None]:
# Number of rows (events) in the transcript.
transcript.count()

306534

In [None]:
# Confirm that `transcript` is a Spark DataFrame.
type(transcript)

pyspark.sql.dataframe.DataFrame

In [None]:
# Show the portfolio rows with show()'s default row limit.
portfolio.show()

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[web, email, mobile]|         5|     7.0|9b98b8c7a33c4b65b...|         bogo|     5|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|     discount|     5|
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[web, email, mobi...|         5|     5.0|f19421c1d4aa40978...|  

In [None]:
# Show the first profile rows with show()'s default row limit.
profile.show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
|118|        20170212|  null|68be06ca386d4c319...|  null|
| 55|        20170715|     F|0610b486422d4921a...|112000|
|118|        20180712|  null|38fe809add3b4fcf9...|  null|
| 75|        20170509|     F|78afa995795e4d85b...|100000|
|118|        20170804|  null|a03223e636434f42a...|  null|
| 68|        20180426|     M|e2127556f4f64592b...| 70000|
|118|        20170925|  null|8ec6ce2a7e7949b1b...|  null|
|118|        20171002|  null|68617ca6246f4fbc8...|  null|
| 65|        20180209|     M|389bc3fa690240e79...| 53000|
|118|        20161122|  null|8974fc5686fe429db...|  null|
|118|        20170824|  null|c4863c7985cf408fa...|  null|
|118|        20150919|  null|148adfcaa27d485b8...|  null|
| 58|        20171111|     M|2eeac8d8feae4a8ca...| 51000|
| 61|        20170911|     F|aa4862eba776480b8...| 57000|
| 26|        2

In [None]:
# Show the first transcript rows with show()'s default row limit.
transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|78afa995795e4d85b...|   0|[, 9b98b8c7a33c4b...|
|offer received|a03223e636434f42a...|   0|[, 0b1e1539f2cc45...|
|offer received|e2127556f4f64592b...|   0|[, 2906b810c7d441...|
|offer received|8ec6ce2a7e7949b1b...|   0|[, fafdcd668e3743...|
|offer received|68617ca6246f4fbc8...|   0|[, 4d5c57ea9a6940...|
|offer received|389bc3fa690240e79...|   0|[, f19421c1d4aa40...|
|offer received|c4863c7985cf408fa...|   0|[, 2298d6c36e964a...|
|offer received|2eeac8d8feae4a8ca...|   0|[, 3f207df678b143...|
|offer received|aa4862eba776480b8...|   0|[, 0b1e1539f2cc45...|
|offer received|31dda685af34476ca...|   0|[, 0b1e1539f2cc45...|
|offer received|744d603ef08c4f33a...|   0|[, 0b1e1539f2cc45...|
|offer received|3d02345581554e81b...|   0|[, 0b1e1539f2cc45...|
|offer received|4b0da7e80e5945209...|   

# Data Wrangling

In [None]:
# De-duplicate all three datasets: keep a single copy of each fully identical row.
profile, portfolio, transcript = (
    frame.distinct() for frame in (profile, portfolio, transcript)
)

### Profile

In [None]:
# Null check for every profile column: show at most one row in which the
# column is null. An empty table means the column contains no nulls.
for column_name in ("age", "became_member_on", "gender", "id", "income"):
    profile.select(column_name).filter(f"{column_name} is null").show(1)

+---+
|age|
+---+
+---+

+----------------+
|became_member_on|
+----------------+
+----------------+

+------+
|gender|
+------+
|  null|
+------+
only showing top 1 row

+---+
| id|
+---+
+---+

+------+
|income|
+------+
|  null|
+------+
only showing top 1 row



In [None]:
# Handle missing values in `profile`.
from pyspark.sql.functions import mean

# String columns: replace nulls with the literal marker 'NA'.
profile = profile.na.fill({'gender': 'NA', 'became_member_on': 'NA'})

# income: impute nulls with the mean of the known incomes.
# mean() yields None when no non-null value exists and na.fill() rejects None,
# so the fill is skipped in that case.
mean_income = profile.select(mean(profile.income)).first()[0]
if mean_income is not None:
    profile = profile.na.fill(mean_income, subset=['income'])

# age: impute nulls with the mean age.
# The raw preview shows age 118 only on rows whose gender and income are null,
# so 118 looks like a placeholder for "unknown age" (TODO confirm against the
# dataset documentation). It is excluded here so it cannot inflate the mean.
MISSING_AGE = 118
mean_age = (
    profile
    .filter(profile.age != MISSING_AGE)
    .select(mean(profile.age))
    .first()[0]
)
if mean_age is not None:
    profile = profile.na.fill(mean_age, subset=['age'])

# `id` is the primary key: rows without it cannot be joined later, so drop them.
profile = profile.filter("id is not null")
profile.show()


+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 57|        20160827|     F|b3ad8755d0ac47faa...| 67000|
| 52|        20140113|     F|6cabe7c6741e4f0eb...| 45000|
| 32|        20180124|     M|938d98afcb894aeb8...| 48000|
| 75|        20171008|     M|18d0ef3f15b5463ab...| 71000|
| 54|        20141024|     M|4ecc12e5805548f88...| 46000|
| 60|        20180115|     M|fbbd60d747d642928...| 83000|
| 40|        20180309|     M|67b51df867b040ba8...| 30000|
| 54|        20180710|     F|e4b82a439eed4edea...|100000|
| 49|        20180307|     M|3b3ae55adad84178b...| 31000|
| 56|        20170909|     M|4d17f173ee4d460f9...| 86000|
| 29|        20171006|     M|7a5b9a18071a4e6fb...| 54000|
| 63|        20150925|     M|7eda9728df624f09b...| 96000|
| 57|        20170227|     M|ac3b8c9879304a9db...| 72000|
| 58|        20170113|     M|073fce5708884b30a...| 96000|
| 42|        2

In [None]:
# Keep only customers aged 75 or younger; every row with age > 75 is dropped
# (this includes the rows with age 118 seen in the raw preview).
MAX_AGE = 75
profile = profile.filter(f"age <= {MAX_AGE}")
profile.show()

# List the distinct gender values that remain after the age filter.
print("unique genders are :")
profile.select("gender").distinct().show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 57|        20160827|     F|b3ad8755d0ac47faa...| 67000|
| 52|        20140113|     F|6cabe7c6741e4f0eb...| 45000|
| 32|        20180124|     M|938d98afcb894aeb8...| 48000|
| 75|        20171008|     M|18d0ef3f15b5463ab...| 71000|
| 54|        20141024|     M|4ecc12e5805548f88...| 46000|
| 60|        20180115|     M|fbbd60d747d642928...| 83000|
| 40|        20180309|     M|67b51df867b040ba8...| 30000|
| 54|        20180710|     F|e4b82a439eed4edea...|100000|
| 49|        20180307|     M|3b3ae55adad84178b...| 31000|
| 56|        20170909|     M|4d17f173ee4d460f9...| 86000|
| 29|        20171006|     M|7a5b9a18071a4e6fb...| 54000|
| 63|        20150925|     M|7eda9728df624f09b...| 96000|
| 57|        20170227|     M|ac3b8c9879304a9db...| 72000|
| 58|        20170113|     M|073fce5708884b30a...| 96000|
| 42|        2

In [None]:
# Parse "became_member_on" from a yyyyMMdd string into a date column.
# The previous single-argument coalesce() around to_date() was a no-op
# (coalesce of one column is that column), so it has been removed.
profile = profile.withColumn("became_member_on", to_date("became_member_on", "yyyyMMdd"))
profile.show()
profile.printSchema()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 57|      2016-08-27|     F|b3ad8755d0ac47faa...| 67000|
| 52|      2014-01-13|     F|6cabe7c6741e4f0eb...| 45000|
| 32|      2018-01-24|     M|938d98afcb894aeb8...| 48000|
| 75|      2017-10-08|     M|18d0ef3f15b5463ab...| 71000|
| 54|      2014-10-24|     M|4ecc12e5805548f88...| 46000|
| 60|      2018-01-15|     M|fbbd60d747d642928...| 83000|
| 40|      2018-03-09|     M|67b51df867b040ba8...| 30000|
| 54|      2018-07-10|     F|e4b82a439eed4edea...|100000|
| 49|      2018-03-07|     M|3b3ae55adad84178b...| 31000|
| 56|      2017-09-09|     M|4d17f173ee4d460f9...| 86000|
| 29|      2017-10-06|     M|7a5b9a18071a4e6fb...| 54000|
| 63|      2015-09-25|     M|7eda9728df624f09b...| 96000|
| 57|      2017-02-27|     M|ac3b8c9879304a9db...| 72000|
| 58|      2017-01-13|     M|073fce5708884b30a...| 96000|
| 42|      201

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the cleaned profile.
# describe() also reports on the string columns, so gender and id appear in the table too.
print('Descriptive stats for age and income:')
profile.describe().show()

Descriptive stats for age and income:
+-------+------------------+------+--------------------+------------------+
|summary|               age|gender|                  id|            income|
+-------+------------------+------+--------------------+------------------+
|  count|             13175| 13175|               13175|             13175|
|   mean| 50.78998102466793|  null|2.565638242424101E31| 64636.81214421253|
| stddev|14.804504804077256|  null|                 NaN|21384.177997490653|
|    min|                18|     F|0009655768c64bdeb...|             30000|
|    max|                75|     O|ffff82501cea40309...|            120000|
+-------+------------------+------+--------------------+------------------+



In [None]:
# Preview the cleaned profile data.
profile.show()

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 57|      2016-08-27|     F|b3ad8755d0ac47faa...| 67000|
| 52|      2014-01-13|     F|6cabe7c6741e4f0eb...| 45000|
| 32|      2018-01-24|     M|938d98afcb894aeb8...| 48000|
| 75|      2017-10-08|     M|18d0ef3f15b5463ab...| 71000|
| 54|      2014-10-24|     M|4ecc12e5805548f88...| 46000|
| 60|      2018-01-15|     M|fbbd60d747d642928...| 83000|
| 40|      2018-03-09|     M|67b51df867b040ba8...| 30000|
| 54|      2018-07-10|     F|e4b82a439eed4edea...|100000|
| 49|      2018-03-07|     M|3b3ae55adad84178b...| 31000|
| 56|      2017-09-09|     M|4d17f173ee4d460f9...| 86000|
| 29|      2017-10-06|     M|7a5b9a18071a4e6fb...| 54000|
| 63|      2015-09-25|     M|7eda9728df624f09b...| 96000|
| 57|      2017-02-27|     M|ac3b8c9879304a9db...| 72000|
| 58|      2017-01-13|     M|073fce5708884b30a...| 96000|
| 42|      201

### Transcript

In [None]:
# Preview the de-duplicated transcript before cleaning it.
transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|[, 9b98b8c7a33c4b...|
|offer received|27aa749a6f5f448e9...|   0|[, 5a8bc65990b245...|
|offer received|b94c7601b17b41609...|   0|[, ae264e3637204a...|
|offer received|696493b9f616411a8...|   0|[, fafdcd668e3743...|
|offer received|56163abfe5a848deb...|   0|[, f19421c1d4aa40...|
|offer received|abf29a94ba3d46488...|   0|[, 5a8bc65990b245...|
|offer received|23264960b6724afea...|   0|[, 2298d6c36e964a...|
|offer received|7fc12dee8c9144f3b...|   0|[, fafdcd668e3743...|
|offer received|cf8dc5cf3dc84f648...|   0|[, 5a8bc65990b245...|
|offer received|269424f345f6478e8...|   0|[, 2906b810c7d441...|
|offer received|fcbf38029321416f9...|   0|[, fafdcd668e3743...|
|offer received|2f21db46b5f84be5b...|   0|[, ae264e3637204a...|
|offer received|e5c59811346840e2a...|   

In [None]:
# Null check for the columns shared by every event type: show at most one row
# in which the column is null. An empty table means there are no nulls.
for column_name in ("event", "person", "time"):
    transcript.select(column_name).filter(f"{column_name} is null").show(1)

+-----+
|event|
+-----+
+-----+

+------+
|person|
+------+
+------+

+----+
|time|
+----+
+----+



In [None]:
# Null handling for the columns shared by every transcript event.
transcript = (
    transcript
    # defaults: unknown event -> 'NA', missing time -> 0
    .na.fill({'event': 'NA', 'time': 0})
    # `person` is the key used to join with profiles, so rows without it are dropped
    .filter("person is not null")
)

transcript.show()

+--------------+--------------------+----+--------------------+
|         event|              person|time|               value|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|[, 9b98b8c7a33c4b...|
|offer received|27aa749a6f5f448e9...|   0|[, 5a8bc65990b245...|
|offer received|b94c7601b17b41609...|   0|[, ae264e3637204a...|
|offer received|696493b9f616411a8...|   0|[, fafdcd668e3743...|
|offer received|56163abfe5a848deb...|   0|[, f19421c1d4aa40...|
|offer received|abf29a94ba3d46488...|   0|[, 5a8bc65990b245...|
|offer received|23264960b6724afea...|   0|[, 2298d6c36e964a...|
|offer received|7fc12dee8c9144f3b...|   0|[, fafdcd668e3743...|
|offer received|cf8dc5cf3dc84f648...|   0|[, 5a8bc65990b245...|
|offer received|269424f345f6478e8...|   0|[, 2906b810c7d441...|
|offer received|fcbf38029321416f9...|   0|[, fafdcd668e3743...|
|offer received|2f21db46b5f84be5b...|   0|[, ae264e3637204a...|
|offer received|e5c59811346840e2a...|   

In [None]:
# List the distinct values of the `event` column.
print('Unique event types:')
transcript.select('event').distinct().show()

Unique event types:
+---------------+
|          event|
+---------------+
|    transaction|
| offer received|
|offer completed|
|   offer viewed|
+---------------+



### Splitting transactions and offers dataframes

In [None]:
# Split the transcript into purchases and offer events, flattening the
# nested `value` struct into a single plain column for each.

# Transactions: events of type 'transaction'; the amount spent is lifted out
# of value.amount. The former single-argument coalesce() was a no-op and has
# been replaced by a direct column reference.
transactions = (
    transcript
    .filter("event == 'transaction' ")
    .withColumn("trans_amount", col("value.amount"))
    .drop('value')
)
transactions.show()

# Offers: every non-transaction event. The schema carries the offer id under
# two field names ("offer_id" and "offer id"), so coalesce() takes whichever
# of the two is non-null.
offers = (
    transcript
    .filter("event != 'transaction' ")
    .withColumn("offer_id", coalesce("value.offer_id", "value.offer id"))
    .drop('value')
)
offers.show()


+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
|transaction|6851449a9192478d8...|   0|         2.3|
|transaction|d5e320154bed47159...|   0|        7.83|
|transaction|461e13a14a074077b...|   6|        7.94|
|transaction|6e0b0d6db74942b49...|   6|        8.12|
|transaction|8d59a5826cc547f58...|   6|       23.77|
|transaction|ae56c4d76ac84a639...|   6|       27.23|
|transaction|1575f73eb0274ab6b...|   6|        0.13|
|transaction|ea6cfa381e2c492e9...|   6|       13.84|
|transaction|26829118683847c8a...|  12|        7.41|
|transaction|9ca7ccb9ef3e44b4a...|  12|        3.13|
|transaction|3dbbfc8fc19d40df9...|  12|       10.98|
|transaction|3b3f484e876f475ea...|  12|        15.1|
|transaction|63b1186a539940508...|  12|        2.93|
|transaction|f422c808ac4d47d1a...|  12|       

In [None]:
# Null check on the derived columns: show at most one row where the freshly
# extracted column is null. An empty table means there are no nulls.
for frame, column_name in ((transactions, "trans_amount"), (offers, "offer_id")):
    frame.select(column_name).filter(f"{column_name} is null").show(1)

+------------+
|trans_amount|
+------------+
+------------+

+--------+
|offer_id|
+--------+
+--------+



In [None]:
# Null handling for the derived transactions and offers DataFrames.

# transactions: impute missing amounts with the mean transaction amount.
# mean() yields None when no non-null amount exists and na.fill() rejects
# None, so the fill is skipped in that case.
mean_trans = transactions.select(mean(transactions.trans_amount)).first()[0]
if mean_trans is not None:
    transactions = transactions.na.fill(mean_trans, subset=['trans_amount'])
transactions.show()


# offers: `offer_id` is the key used to join with the portfolio, so rows
# without it are dropped.
offers = offers.filter("offer_id is not null")
offers.show()


+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
|transaction|6851449a9192478d8...|   0|         2.3|
|transaction|d5e320154bed47159...|   0|        7.83|
|transaction|461e13a14a074077b...|   6|        7.94|
|transaction|6e0b0d6db74942b49...|   6|        8.12|
|transaction|8d59a5826cc547f58...|   6|       23.77|
|transaction|ae56c4d76ac84a639...|   6|       27.23|
|transaction|1575f73eb0274ab6b...|   6|        0.13|
|transaction|ea6cfa381e2c492e9...|   6|       13.84|
|transaction|26829118683847c8a...|  12|        7.41|
|transaction|9ca7ccb9ef3e44b4a...|  12|        3.13|
|transaction|3dbbfc8fc19d40df9...|  12|       10.98|
|transaction|3b3f484e876f475ea...|  12|        15.1|
|transaction|63b1186a539940508...|  12|        2.93|
|transaction|f422c808ac4d47d1a...|  12|       

### Portfolio

In [None]:
# Preview the de-duplicated portfolio before cleaning it.
portfolio.show()

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|         5|     5.0|f19421c1d4aa40978...|         bogo|     5|
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|        10|     7.0|2906b810c7d441179...|     discount|     2|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|  

In [None]:
# Re-check the portfolio column types before filling nulls.
portfolio.printSchema()

root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- difficulty: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- reward: long (nullable = true)



In [None]:
# Null check for every portfolio column: list the rows in which the column is
# null. An empty table means the column contains no nulls.
for column_name in ("channels", "difficulty", "duration", "id", "offer_type", "reward"):
    portfolio.select(column_name).filter(f"{column_name} is null").show()

+--------+
|channels|
+--------+
+--------+

+----------+
|difficulty|
+----------+
+----------+

+--------+
|duration|
+--------+
+--------+

+---+
| id|
+---+
+---+

+----------+
|offer_type|
+----------+
+----------+

+------+
|reward|
+------+
+------+



In [None]:
# Null handling for the portfolio.
portfolio = (
    portfolio
    # defaults: numeric columns -> 0 / 0.0, offer_type -> 'NA'
    .na.fill({'difficulty': 0, 'duration': 0.0, 'offer_type': 'NA', 'reward': 0})
    # `id` is the primary key, so rows without it are dropped
    .filter("id is not null")
)
portfolio.show()


+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
|[email, mobile, s...|        10|     7.0|ae264e3637204a6fb...|         bogo|    10|
|[web, email, mobi...|         5|     5.0|f19421c1d4aa40978...|         bogo|     5|
|[web, email, mobi...|         7|     7.0|2298d6c36e964ae4a...|     discount|     3|
|[web, email, mobi...|        10|    10.0|fafdcd668e3743c1b...|     discount|     2|
|[web, email, mobi...|        10|     5.0|4d5c57ea9a6940dd8...|         bogo|    10|
|[web, email, mobile]|        10|     7.0|2906b810c7d441179...|     discount|     2|
|        [web, email]|        20|    10.0|0b1e1539f2cc45b7b...|  

# Merging Dataframes

In [None]:
# Quick look at the cleaned profile columns before joining.
profile.show(2)

+---+----------------+------+--------------------+------+
|age|became_member_on|gender|                  id|income|
+---+----------------+------+--------------------+------+
| 57|      2016-08-27|     F|b3ad8755d0ac47faa...| 67000|
| 52|      2014-01-13|     F|6cabe7c6741e4f0eb...| 45000|
+---+----------------+------+--------------------+------+
only showing top 2 rows



In [None]:
# Quick look at the cleaned portfolio columns before joining.
portfolio.show(2)

+--------------------+----------+--------+--------------------+-------------+------+
|            channels|difficulty|duration|                  id|   offer_type|reward|
+--------------------+----------+--------+--------------------+-------------+------+
|[web, email, mobile]|         0|     4.0|3f207df678b143eea...|informational|     0|
|[email, mobile, s...|         0|     3.0|5a8bc65990b245e5a...|informational|     0|
+--------------------+----------+--------+--------------------+-------------+------+
only showing top 2 rows



#### Transcript (transactions and offers)

In [None]:
# Quick look at the transactions columns before joining.
transactions.show(2)

+-----------+--------------------+----+------------+
|      event|              person|time|trans_amount|
+-----------+--------------------+----+------------+
|transaction|7a1eda9ab57049068...|   0|        6.13|
|transaction|2fb4578848f34ce4b...|   0|        21.4|
+-----------+--------------------+----+------------+
only showing top 2 rows



In [None]:
# Quick look at the offers columns before joining.
offers.show(2)

+--------------+--------------------+----+--------------------+
|         event|              person|time|            offer_id|
+--------------+--------------------+----+--------------------+
|offer received|bf2c086d4c4049289...|   0|9b98b8c7a33c4b65b...|
|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|
+--------------+--------------------+----+--------------------+
only showing top 2 rows



In [None]:
# Build the enriched offers table: customer profile + offer event + offer definition.

# Step 1: attach each offer event to its customer. Inner join, so events whose
# person has no remaining profile row are dropped. The redundant key column
# profile.id is removed; `person` is kept.
customer_matches_event = profile.id == offers.person
df1 = profile.join(offers, on=customer_matches_event, how='inner').drop(profile.id)
df1.show(2)

# Step 2: attach the offer definition. The redundant key column portfolio.id
# is removed; `offer_id` is kept.
event_matches_offer = df1.offer_id == portfolio.id
allOffers = df1.join(portfolio, on=event_matches_offer, how='inner').drop(portfolio.id)
allOffers.show(2)

+---+----------------+------+------+--------------+--------------------+----+--------------------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|
+---+----------------+------+------+--------------+--------------------+----+--------------------+
| 32|      2015-10-08|     M| 38000|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|
| 62|      2016-05-02|     F| 98000|offer received|abf29a94ba3d46488...|   0|5a8bc65990b245e5a...|
+---+----------------+------+------+--------------+--------------------+----+--------------------+
only showing top 2 rows

+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|            channels|difficulty|duration|   offer_type|reward|
+---+----------------+------+------+--------------+--------

In [None]:
# Build the enriched transactions table: customer profile + purchase event.
# Inner join, so purchases whose person has no remaining profile row are
# dropped. The redundant key column profile.id is removed; `person` is kept.
customer_matches_purchase = profile.id == transactions.person
allTransactions = (
    profile
    .join(transactions, on=customer_matches_purchase, how='inner')
    .drop(profile.id)
)
allTransactions.show(2)

+---+----------------+------+------+-----------+--------------------+----+------------+
|age|became_member_on|gender|income|      event|              person|time|trans_amount|
+---+----------------+------+------+-----------+--------------------+----+------------+
| 51|      2015-12-16|     M| 81000|transaction|2fb4578848f34ce4b...|   0|        21.4|
| 40|      2017-11-14|     M| 39000|transaction|6851449a9192478d8...|   0|         2.3|
+---+----------------+------+------+-----------+--------------------+----+------------+
only showing top 2 rows



In [None]:
# Inspect the joined offers table: two sample rows, then its schema.
allOffers.show(2)
allOffers.printSchema()

+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
|age|became_member_on|gender|income|         event|              person|time|            offer_id|            channels|difficulty|duration|   offer_type|reward|
+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
| 32|      2015-10-08|     M| 38000|offer received|27aa749a6f5f448e9...|   0|5a8bc65990b245e5a...|[email, mobile, s...|         0|     3.0|informational|     0|
| 62|      2016-05-02|     F| 98000|offer received|abf29a94ba3d46488...|   0|5a8bc65990b245e5a...|[email, mobile, s...|         0|     3.0|informational|     0|
+---+----------------+------+------+--------------+--------------------+----+--------------------+--------------------+----------+--------+-------------+------+
only showing top 2 rows

root
 |--

In [None]:
# Inspect the joined transactions table: two sample rows, then its schema.
allTransactions.show(2)
allTransactions.printSchema()

+---+----------------+------+------+-----------+--------------------+----+------------+
|age|became_member_on|gender|income|      event|              person|time|trans_amount|
+---+----------------+------+------+-----------+--------------------+----+------------+
| 51|      2015-12-16|     M| 81000|transaction|2fb4578848f34ce4b...|   0|        21.4|
| 40|      2017-11-14|     M| 39000|transaction|6851449a9192478d8...|   0|         2.3|
+---+----------------+------+------+-----------+--------------------+----+------------+
only showing top 2 rows

root
 |-- age: long (nullable = true)
 |-- became_member_on: date (nullable = true)
 |-- gender: string (nullable = false)
 |-- income: long (nullable = true)
 |-- event: string (nullable = false)
 |-- person: string (nullable = true)
 |-- time: long (nullable = false)
 |-- trans_amount: double (nullable = false)



# Dumping data into MongoDB

In [None]:
from pyspark.sql import SparkSession

# Session configured for the MongoDB Spark connector.
# NOTE(review): a SparkSession already exists at this point (created near the
# top of the notebook), so getOrCreate() returns that session instead of
# building a new one. "spark.jars.packages" is presumably only honoured when
# the JVM is first launched -- confirm the connector is really on the
# classpath, or move these settings into the first session builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/")
    # Fixed typo: the host was previously spelled "locahost".
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

In [None]:
# Persist the enriched offers to MongoDB (database `starbucks`, collection `data1`).
# NOTE: mode("append") adds to the collection, so re-running this cell inserts the rows again.
allOffers.write.format("mongo").option("uri","mongodb://localhost:27017/starbucks.data1").mode("append").save()
# Row count of the DataFrame that was written.
allOffers.count()

131665

In [None]:
# Persist the enriched transactions to MongoDB (database `starbucks`, collection `data2`).
# NOTE: mode("append") adds to the collection, so re-running this cell inserts the rows again.
allTransactions.write.format("mongo").option("uri","mongodb://localhost:27017/starbucks.data2").mode("append").save()
# Row count of the DataFrame that was written.
allTransactions.count()

111086