<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 400px">
</div>

# ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Abandoned Carts Lab
Find the cart items abandoned by users who have an email address on file but never made a purchase.
1. Get emails of converted users from transactions
2. Join emails with user IDs
3. Get cart item history for each user
4. Join cart item history with emails
5. Filter for emails with abandoned cart items

##### Methods
- DataFrame (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html" target="_blank">Scala</a>): `join`
- Built-In Functions (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=functions#module-pyspark.sql.functions" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html" target="_blank">Scala</a>): `lit`
- DataFrameNaFunctions (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=nafunctions#pyspark.sql.DataFrameNaFunctions" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameNaFunctions.html" target="_blank">Scala</a>): `fill`

### Setup
Run the cells below to create DataFrames **`salesDF`**, **`usersDF`**, and **`eventsDF`**.

In [0]:
%run ./Includes/Classroom-Setup

In [0]:
# sale transactions at BedBricks
salesDF = spark.read.parquet(salesPath)
display(salesDF.limit(5))

order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
257437,kmunoz@powell-duran.com,1592194221828900,1,1995.0,1,"List(List(null, M_PREM_K, Premium King Mattress, 1995.0, 1995.0, 1))"
282611,bmurillo@hotmail.com,1592504237604072,1,940.5,1,"List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))"
257448,bradley74@gmail.com,1592200438030141,1,945.0,1,"List(List(null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1))"
257440,jameshardin@campbell-morris.biz,1592197217716495,1,1045.0,1,"List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))"
283949,whardin@hotmail.com,1592510720760323,1,535.5,1,"List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))"


In [0]:
# user IDs and emails at BedBricks
usersDF = spark.read.parquet(usersPath)
display(usersDF.limit(5))

user_id,user_first_touch_timestamp,email
UA000000102357305,1592182691348767,
UA000000102357308,1592183287634953,
UA000000102357309,1592183302736627,
UA000000102357321,1592184604178702,david23@orozco-parker.com
UA000000102357325,1592185154063628,


In [0]:
print(f"The number of recorded users: {usersDF.count()}")

In [0]:
# events logged on the BedBricks website
eventsDF = spark.read.parquet(eventsPath)
display(eventsDF.limit(5))

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
macOS,"List(null, null, null)",warranty,1593878899217692.0,1593878946592107,"List(Montrose, MI)",List(),google,1593878899217692,UA000000107379500
Windows,"List(null, null, null)",press,1593876662175340.0,1593877011756535,"List(Northampton, MA)",List(),google,1593876662175340,UA000000107359357
macOS,"List(null, null, null)",add_item,1593878792892652.0,1593878815459100,"List(Salinas, CA)","List(List(null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1))",youtube,1593878455472030,UA000000107375547
iOS,"List(null, null, null)",mattresses,1593878178791663.0,1593878809276923,"List(Everett, MA)",List(),facebook,1593877903116176,UA000000107370581
Windows,"List(null, null, null)",mattresses,,1593878628143633,"List(Cottage Grove, MN)",List(),google,1593878628143633,UA000000107377108


### 1. Get emails of converted users from transactions
- Select **`email`** column in **`salesDF`** and remove duplicates
- Add new column **`converted`** with value **`True`** for all rows

Save result as **`convertedUsersDF`**.

In [0]:
from pyspark.sql.functions import col, collect_set, explode, lit

# Distinct purchaser emails, each flagged as converted
convertedUsersDF = (salesDF.select("email")
                    .dropDuplicates()
                    .withColumn("converted", lit(True))
)
display(convertedUsersDF.limit(5))

email,converted
zacharyfisher@brown.com,True
flowersrhonda@paul.com,True
tanya8857@yahoo.com,True
serranoerika@brooks-lawson.com,True
bishopamber@yahoo.com,True


In [0]:
print(f"The number of unique emails of recorded users that make purchases: {convertedUsersDF.count()}")
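
To make this step concrete without a cluster, here is a minimal plain-Python sketch of what selecting the email column, dropping duplicates, and adding a literal `True` column produce. The sample rows and the helper name `converted_users` are invented for illustration; this models the result only, not how Spark executes it (Spark makes no promise about row order after `dropDuplicates`).

```python
# Plain-Python model of step 1 (hypothetical sample data, not the BedBricks dataset).
def converted_users(sales_rows):
    """Return one {'email', 'converted'} record per distinct purchaser email."""
    seen = set()
    result = []
    for row in sales_rows:
        email = row["email"]
        if email not in seen:  # models dropDuplicates(): keep each email once
            seen.add(email)
            result.append({"email": email, "converted": True})  # models lit(True)
    return result


sales = [
    {"order_id": 1, "email": "a@example.com"},
    {"order_id": 2, "email": "b@example.com"},
    {"order_id": 3, "email": "a@example.com"},  # repeat purchaser
]
converted = converted_users(sales)
print(converted)
```

Three sales rows collapse to two converted emails because the repeat purchaser is counted once.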

### 2. Join emails with user IDs
- Perform an outer join on **`convertedUsersDF`** and **`usersDF`** on the **`email`** field
- Filter for users where **`email`** is not null
- Fill null values in **`converted`** as **`False`**

Save result as **`conversionsDF`**.

In [0]:
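# Full outer join keeps users without purchases and purchaser emails without a user record
conversionsDF = (usersDF.join(convertedUsersDF, on="email", how="full_outer")
                 .filter(col("email").isNotNull())
                 .fillna(False, subset=["converted"])  # no purchase on record means converted = False
)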
display(conversionsDF.limit(5))

email,user_id,user_first_touch_timestamp,converted
aacevedo@moss-young.com,UA000000103755561,1592671212475050,False
aacosta11@gmail.com,UA000000106362980,1593540790039008,False
aadams9@gmail.com,UA000000103384927,1592575968245258,False
aadams@coleman.org,UA000000107105749,1593795399348718,False
aadams@howard.biz,UA000000104562958,1592928837244180,False


In [0]:
print("The number of rows in conversionsDF: {}".format(conversionsDF.count()))

In [0]:
print("The number of unique emails in conversionsDF: {}".format(conversionsDF.select("email").dropDuplicates().count()))
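
The three operations in this step interact in a way that is easy to miss, so the following plain-Python sketch models them on toy data. The helper name `join_conversions` and the sample records are hypothetical, and the model assumes the converted emails themselves are never null. It shows which rows survive: users with a null email are dropped, users with no purchase get `converted = False`, and purchaser emails with no user record are kept with a null `user_id`.

```python
# Plain-Python model of step 2 (hypothetical sample data).
def join_conversions(users, converted):
    """Model: full outer join on email, drop null emails, fill converted with False."""
    converted_by_email = {c["email"]: c["converted"] for c in converted}
    matched = set()
    rows = []
    for user in users:
        email = user["email"]
        # A null join key never matches in a Spark join; model that explicitly.
        flag = converted_by_email.get(email) if email is not None else None
        if email in converted_by_email:
            matched.add(email)
        rows.append({"email": email, "user_id": user["user_id"], "converted": flag})
    # The outer join also keeps converted emails that matched no user.
    for email, flag in converted_by_email.items():
        if email not in matched:
            rows.append({"email": email, "user_id": None, "converted": flag})
    # Models filter(col("email").isNotNull())
    rows = [r for r in rows if r["email"] is not None]
    # Models fillna(False, subset=["converted"])
    for r in rows:
        if r["converted"] is None:
            r["converted"] = False
    return rows


users = [
    {"user_id": "U1", "email": "a@example.com"},
    {"user_id": "U2", "email": None},
    {"user_id": "U3", "email": "c@example.com"},
]
converted = [
    {"email": "a@example.com", "converted": True},
    {"email": "z@example.com", "converted": True},
]
conversions = join_conversions(users, converted)
for row in conversions:
    print(row)
```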

### 3. Get cart item history for each user
- Explode **`items`** field in **`eventsDF`**
- Group by **`user_id`**
  - Collect set of all **`items.item_id`** objects for each user and alias with "cart"
  
Save result as **`cartsDF`**.

In [0]:
cartsDF = (eventsDF.withColumn("items", explode("items"))
           .groupBy("user_id")
           .agg(collect_set("items.item_id").alias("cart"))
)
display(cartsDF)

user_id,cart
UA000000102360011,List(M_STAN_Q)
UA000000102362166,List(M_STAN_K)
UA000000102368105,List(M_STAN_T)
UA000000102370324,List(M_STAN_K)
UA000000102371333,List(M_STAN_T)
UA000000102377152,List(M_STAN_F)
UA000000102386796,List(M_STAN_Q)
UA000000102390929,List(M_STAN_K)
UA000000102398119,List(M_STAN_T)
UA000000102400606,List(M_PREM_Q)
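
As a rough illustration of what exploding and then collecting a set does, here is a plain-Python sketch on invented events. The helper name `carts_by_user` is hypothetical. Two details carry over to the Spark version: an event with an empty `items` list contributes no rows after the explode, so a user with only such events does not appear in the result at all, and the order of elements in a collected set is not guaranteed (the sketch sorts them only to make the output deterministic).

```python
# Plain-Python model of step 3 (hypothetical sample data).
from collections import defaultdict


def carts_by_user(events):
    """Map each user_id to the distinct item_ids seen in that user's events."""
    carts = defaultdict(set)
    for event in events:
        for item in event["items"]:  # models explode: one row per item
            carts[event["user_id"]].add(item["item_id"])  # models collect_set
    return {user_id: sorted(ids) for user_id, ids in carts.items()}


events = [
    {"user_id": "U1", "items": []},  # browsing only, nothing added
    {"user_id": "U2", "items": [{"item_id": "M_STAN_Q"}]},
    {"user_id": "U2", "items": [{"item_id": "M_STAN_Q"}, {"item_id": "P_FOAM_S"}]},
]
carts = carts_by_user(events)
print(carts)
```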


### 4. Join cart item history with emails
- Perform a left join on **`conversionsDF`** and **`cartsDF`** on the **`user_id`** field

Save result as **`emailCartsDF`**.

In [0]:
# Left join keeps every row of conversionsDF; users with no cart history get a null cart
emailCartsDF = conversionsDF.join(cartsDF, on="user_id", how="left")
display(emailCartsDF.limit(5))

user_id,email,user_first_touch_timestamp,converted,cart
UA000000102357285,ianortiz@francis.com,1592169133135185,False,
UA000000102357324,mtorres@gmail.com,1592185107111059,False,
UA000000102357753,kimberly84@strickland.biz,1592196381720379,False,
UA000000102357771,nwilson2@yahoo.com,1592196573947350,False,
UA000000102357938,choidaniel@price.com,1592198013565458,True,
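
The empty `cart` values in the output above come from the left join: every row on the left side is kept, and rows with no match on the right get null. A minimal plain-Python sketch of that behavior, using a hypothetical helper `left_join_carts` and invented records:

```python
# Plain-Python model of step 4 (hypothetical sample data).
def left_join_carts(conversions, carts):
    """Attach each user's cart; users absent from `carts` get cart=None."""
    return [{**row, "cart": carts.get(row["user_id"])} for row in conversions]


conversions = [
    {"user_id": "U1", "email": "a@example.com", "converted": True},
    {"user_id": "U2", "email": "b@example.com", "converted": False},
]
carts = {"U2": ["M_STAN_Q"]}  # U1 never added an item
email_carts = left_join_carts(conversions, carts)
for row in email_carts:
    print(row)
```

The row count equals the number of input conversions, which is a quick sanity check worth repeating on the real DataFrames.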


### 5. Filter for emails with abandoned cart items
- Filter **`emailCartsDF`** for users where **`converted`** is False
- Filter for users with non-null carts

Save result as **`abandonedCartsDF`**.

In [0]:
# Keep non-purchasers who have at least one item in their cart history
abandonedCartsDF = (emailCartsDF.filter(col("converted") == False)
                    .filter(col("cart").isNotNull())
)
display(abandonedCartsDF.limit(5))

user_id,email,user_first_touch_timestamp,converted,cart
UA000000102386796,lukemiller@hotmail.com,1592221721420940,False,List(M_STAN_Q)
UA000000102415866,klee@richardson-williams.net,1592228636702037,False,"List(M_PREM_Q, M_STAN_F)"
UA000000102454481,hdavis30@yahoo.com,1592235363612845,False,List(M_STAN_Q)
UA000000102456729,lucaseric@yahoo.com,1592235717304674,False,List(P_FOAM_S)
UA000000102457451,rubiokatherine@yahoo.com,1592235835162487,False,List(M_PREM_T)
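
One subtlety of this filter is worth a small sketch: in Spark, comparing a null to `False` yields null, and a filter drops rows whose condition is null. That is why filling `converted` with `False` in step 2 matters. The plain-Python model below uses a hypothetical helper `abandoned_carts` and invented rows, with `is False` standing in for that null behavior.

```python
# Plain-Python model of step 5 (hypothetical sample data).
def abandoned_carts(email_carts):
    """Keep rows where converted is exactly False and a cart exists."""
    return [
        row for row in email_carts
        # `is False` mirrors Spark's null semantics: a null flag fails the comparison
        if row["converted"] is False and row["cart"] is not None
    ]


email_carts = [
    {"user_id": "U1", "converted": True, "cart": ["M_STAN_Q"]},   # purchased
    {"user_id": "U2", "converted": False, "cart": ["M_PREM_T"]},  # abandoned cart
    {"user_id": "U3", "converted": False, "cart": None},          # no cart history
    {"user_id": "U4", "converted": None, "cart": ["P_FOAM_S"]},   # unfilled null flag
]
abandoned = abandoned_carts(email_carts)
print([row["user_id"] for row in abandoned])
```

Only `U2` survives. `U4` would have been a valid abandoned cart if its flag had been filled with `False` first.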


### Bonus: Plot number of abandoned cart items by product

In [0]:
abandonedCartsDF.printSchema()

In [0]:
abandonedItemsDF = (abandonedCartsDF
                    .select(explode("cart").alias("Product"))  # one row per abandoned item
                    .groupBy("Product").count()
                    .withColumnRenamed("count", "Number_of_abandoned_items")
                    .sort("Product")
)
display(abandonedItemsDF)

Product,Number_of_abandoned_items
M_PREM_F,6363
M_PREM_K,9839
M_PREM_Q,11976
M_PREM_T,12527
M_STAN_F,25761
M_STAN_K,38765
M_STAN_Q,47008
M_STAN_T,50315
P_DOWN_K,2987
P_DOWN_S,6917
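
The counts above are one row per product after flattening every abandoned cart. As a hedged plain-Python sketch of the same aggregation, with a hypothetical helper `abandoned_item_counts` and invented carts:

```python
# Plain-Python model of the bonus aggregation (hypothetical sample data).
from collections import Counter


def abandoned_item_counts(carts):
    """Count how many abandoned carts contain each product, sorted by product."""
    counts = Counter(item for cart in carts for item in cart)  # explode, then count
    return sorted(counts.items())


carts = [
    ["M_STAN_Q"],
    ["M_PREM_Q", "M_STAN_F"],
    ["M_STAN_Q", "P_FOAM_S"],
]
item_counts = abandoned_item_counts(carts)
print(item_counts)
```

Because each cart is a set of distinct item IDs, a product is counted at most once per user, so the totals measure users who abandoned a product rather than units.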


### Clean up classroom

In [0]:
%run ./Includes/Classroom-Cleanup