# Analyzing Customer-Music Data using Apache Spark

The original Drill- and Tableau-based tutorial is at https://mapr.com/blog/real-time-user-profiles-spark-drill-and-mapr-db/. I have converted it to Jupyter Scala notebooks for Spark 2.4. I have also added many more Spark-based data analysis sections, along with side-by-side comparisons of the DataFrame (DF) API and Spark SQL constructs that realize the same use case. The Jupyter notebooks are also used for data visualization.

A special section for working with RDDs is also included.

Users are continuously connecting to the service and listening to tracks that they like -- this generates our main data set. The behaviors captured in these events, over time, represent the highest level of detail about actual behaviors of customers as they consume the service by listening to music. In addition to the events of listening to individual tracks, we have a few other data sets representing all the information we might normally have in such a service. In this post we will make use of the following three data sets.

## 1. Understanding the Data Set

**Individual customers listening to individual tracks: (tracks.csv)** - a collection of events, one per line, where each event is a client listening to a track.

This data is approximately 1M lines and contains simulated listener events over several months.

<table>
  <tr>
    <th><strong>Field Name</strong></th>
    <th>Event ID</th>
    <th>Customer ID</th>
    <th>Track ID</th>
    <th>Datetime</th>
    <th>Mobile</th>
    <th>Listening Zip</th>
  </tr>
  <tr>
    <td><strong>Type</strong></td>
    <td>Integer</td>
    <td>Integer</td>
    <td>Integer</td>
    <td>String</td>
    <td>Integer</td>
    <td>Integer</td>
  </tr>
  <tr>
    <td><strong>Example Value</strong></td>
    <td>9999767</td>
    <td>2597</td>
    <td>788</td>
    <td>2014-12-01 09:54:09</td>
    <td>0</td>
    <td>11003</td>
  </tr>
</table>

The event, customer and track IDs tell us what occurred (a customer listened to a certain track), while the other fields tell us some associated information, like whether the customer was listening on a mobile device and a guess about their location while they were listening. With many customers listening to many tracks, this data can get very large and will be the input into our Spark job.
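To make the event layout concrete, here is a minimal plain-Scala sketch that models one line of tracks.csv. The names `ListenEvent` and `parseListenEvent` are hypothetical and not part of the notebook. The parser assumes six comma-separated fields without quoting, and it keeps the track ID as a string because the notebook's schema reads it that way.

```scala
// Hypothetical model of one tracks.csv line; field order follows the table above.
final case class ListenEvent(
    eventId: Int,
    customerId: Int,
    trackId: String,
    datetime: String,
    isMobile: Boolean,
    zip: Int)

// Assumption: six comma-separated fields, no quoting, no embedded commas.
// Returns None for lines that do not fit that shape.
def parseListenEvent(line: String): Option[ListenEvent] =
  line.split(",", -1).map(_.trim) match {
    case Array(e, c, t, d, m, z) =>
      scala.util.Try(ListenEvent(e.toInt, c.toInt, t, d, m.toInt == 1, z.toInt)).toOption
    case _ => None
  }

// The example values from the table above.
val sample = parseListenEvent("9999767,2597,788,2014-12-01 09:54:09,0,11003")
```

Returning an `Option` keeps malformed lines from throwing, which is one possible design choice; the notebook itself lets Spark's CSV reader handle parsing.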

**Customer information: (cust.csv)** - information about individual customers.

<table>
  <tr>
    <th><strong>Field Name</strong></th>
    <th>Customer ID</th>
    <th>Name</th>
    <th>Gender</th>
    <th>Address</th>
    <th>ZIP</th>
    <th>Sign Date</th>    
    <th>Status</th>
    <th>Level</th>
    <th>Campaign</th>
    <th>Linked with Apps?</th>
  </tr>
  <tr>
    <td><strong>Type</strong></td>
    <td>Integer</td>
    <td>String</td>
    <td>Integer</td>
    <td>String</td>
    <td>Integer</td>
    <td>String</td>
    <td>Integer</td>
    <td>Integer</td>
    <td>Integer</td>
    <td>Integer</td>    
  </tr>
  <tr>
    <td><strong>Example Value</strong></td>
    <td>10</td>
    <td>Joshua Threadgill</td>
    <td>0</td>
    <td>10084 Easy Gate Bend</td>
    <td>66216</td>
    <td>01/13/2013</td>
    <td>0</td>
    <td>1</td>
    <td>1</td>
    <td>1</td>
  </tr>
</table>


The fields are defined as follows:
```
Customer ID: a unique identifier for that customer
Name, gender, address, zip: the customer’s associated information
Sign date: the date of addition to the service
Status: indicates whether or not the account is active (0 = closed, 1 = active)
Level: indicates what level of service -- 0, 1, 2 for Free, Silver and Gold, respectively
Campaign: indicates the campaign under which the user joined, defined as the following (fictional) campaigns driven by our (also fictional) marketing team:
NONE - no campaign
30DAYFREE - a ‘30 days free’ trial offer
SUPERBOWL - a Superbowl-related program
RETAILSTORE - an offer originating in brick-and-mortar retail stores
WEBOFFER - an offer for web-originated customers
```
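The numeric codes for status and level can be turned back into labels with simple lookups. The sketch below is plain Scala and uses only the mappings stated in the definitions above; `describeAccount` is a hypothetical helper. The numeric codes for campaigns are not given here, so they are left out rather than guessed.

```scala
// Code-to-label lookups taken from the field definitions above.
val statusLabels: Map[Int, String] = Map(0 -> "closed", 1 -> "active")
val levelLabels: Map[Int, String]  = Map(0 -> "Free", 1 -> "Silver", 2 -> "Gold")

// Hypothetical helper: describe a customer's status and level, falling back to
// "unknown" for codes that the definitions do not cover.
def describeAccount(status: Int, level: Int): String = {
  val s = statusLabels.getOrElse(status, "unknown")
  val l = levelLabels.getOrElse(level, "unknown")
  s"$s / $l"
}

println(describeAccount(status = 1, level = 2)) // active / Gold
```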

**Previous ad clicks: (clicks.csv)** - a collection of user click events indicating which ad was played to the user and whether or not they clicked on it.

<table>
  <tr>
    <th><strong>Field Name</strong></th>
    <th><strong>EventID</strong></th>
    <th>CustID</th>
    <th>AdClicked</th>
    <th>Localtime</th>
  </tr>
  <tr>
    <td><strong>Type</strong></td>
    <td>Integer</td>
    <td>Integer</td>
    <td>String</td>
    <td>String</td>
  </tr>
  <tr>
    <td><strong>Example Value</strong></td>
    <td>0</td>
    <td>109</td>
    <td>ADV_FREE_REFERRAL</td>
    <td>2014-12-01 09:54:09</td>
  </tr>
</table>

The fields that interest us are the foreign key identifying the customer (CustID), a string indicating which ad they clicked (AdClicked), and the time when it happened (Localtime). Note that we could use a lot more features here, such as basic information about the customer (gender, etc.), but to keep things simple for the example we’ll leave that as a future exercise.

In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{col, udf, asc, desc, when, array, struct}
import org.apache.spark.sql.functions.{sum, avg, count, countDistinct, hour, lit, format_number, explode}

import scala.collection.mutable.Set

## 2. Creating the Spark Session

In [2]:
val spark: SparkSession = (SparkSession
                           .builder
                           .master("local[*]")
                           .appName("music-customer-analysis-with-spark")
                           .getOrCreate())

spark = org.apache.spark.sql.SparkSession@6f4f4ba2


## 3. Load the data from files into DataFrames

In [3]:
val MUSIC_TRACKS_DATA: String = "data/tracks.csv"
val CUSTOMER_DATA: String     = "data/cust.csv"
val CLICKS_DATA: String       = "data/clicks.csv"

MUSIC_TRACKS_DATA = data/tracks.csv
CUSTOMER_DATA = data/cust.csv
CLICKS_DATA = data/clicks.csv



In [4]:
//define the schema, corresponding to a line in the csv data file for music
val music_schema: StructType = new StructType(
                                    Array(
                                        new StructField("event_id", IntegerType, nullable=true),
                                        new StructField("customer_id", IntegerType, nullable=true),
                                        new StructField("track_id", StringType, nullable=true),
                                        new StructField("datetime", StringType, nullable=true),
                                        new StructField("is_mobile", IntegerType, nullable=true),
                                        new StructField("zip", IntegerType, nullable=true)
                                    ))

music_schema = StructType(StructField(event_id,IntegerType,true), StructField(customer_id,IntegerType,true), StructField(track_id,StringType,true), StructField(datetime,StringType,true), StructField(is_mobile,IntegerType,true), StructField(zip,IntegerType,true))



In [5]:
//define the schema, corresponding to a line in the csv data file for customer
val cust_schema: StructType = new StructType(
                                    Array(
                                        new StructField("customer_id", IntegerType, nullable=true),
                                        new StructField("name", StringType, nullable=true),
                                        new StructField("gender", IntegerType, nullable=true),
                                        new StructField("address", StringType, nullable=true),
                                        new StructField("zip", IntegerType, nullable=true),
                                        new StructField("sign_date", StringType, nullable=true),
                                        new StructField("status", IntegerType, nullable=true),
                                        new StructField("level", IntegerType, nullable=true),
                                        new StructField("campaign", IntegerType, nullable=true),
                                        new StructField("lnkd_with_apps", IntegerType, nullable=true)
                                    ))

cust_schema = StructType(StructField(customer_id,IntegerType,true), StructField(name,StringType,true), StructField(gender,IntegerType,true), StructField(address,StringType,true), StructField(zip,IntegerType,true), StructField(sign_date,StringType,true), StructField(status,IntegerType,true), StructField(level,IntegerType,true), StructField(campaign,IntegerType,true), StructField(lnkd_with_apps,IntegerType,true))



In [6]:
//define the schema, corresponding to a line in the csv data file for ad click
val click_schema: StructType = StructType(
                                  Array(
                                    new StructField("event_id", IntegerType, nullable=true),
                                    new StructField("customer_id", IntegerType, nullable=true),
                                    new StructField("ad_clicked", StringType, nullable=true),
                                    new StructField("datetime", StringType, nullable=true)
                                ))

click_schema = StructType(StructField(event_id,IntegerType,true), StructField(customer_id,IntegerType,true), StructField(ad_clicked,StringType,true), StructField(datetime,StringType,true))



In [7]:
//Load data
val music_df: DataFrame = spark.read.schema(music_schema).csv(path=MUSIC_TRACKS_DATA).cache()
music_df.createOrReplaceTempView("music")

val cust_df: DataFrame = spark.read.schema(cust_schema).option("header", "true").csv(path=CUSTOMER_DATA).cache()
cust_df.createOrReplaceTempView("cust")

val click_df: DataFrame = spark.read.schema(click_schema).option("header", "false").csv(path=CLICKS_DATA).cache()
click_df.createOrReplaceTempView("click")

music_df = [event_id: int, customer_id: int ... 4 more fields]
cust_df = [customer_id: int, name: string ... 8 more fields]
click_df = [event_id: int, customer_id: int ... 2 more fields]



In [8]:
//How many music data rows
println(music_df.count())

1000000


In [9]:
music_df.show(5)

+--------+-----------+--------+-------------------+---------+-----+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|
+--------+-----------+--------+-------------------+---------+-----+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|
+--------+-----------+--------+-------------------+---------+-----+
only showing top 5 rows



In [10]:
//How many customer data rows
println(cust_df.count())

5000


In [11]:
cust_df.show(5)

+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
|customer_id|         name|gender|             address|  zip| sign_date|status|level|campaign|lnkd_with_apps|
+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
|          0|Gregory Koval|     0|13004 Easy Cider ...|72132|06/04/2013|     1|    1|       1|             0|
|          1|Robert Gordon|     0|10497 Thunder Hic...|17307|07/27/2013|     1|    1|       1|             0|
|          2|Paula Peltier|     0|10084 Easy Gate Bend|66216|01/13/2013|     1|    0|       4|             1|
|          3|Francine Gray|     0|54845 Bent Pony H...|36690|07/11/2013|     1|    1|       1|             1|
|          4| David Garcia|     0|8551 Tawny Fox Villa|61377|09/09/2012|     1|    0|       1|             1|
+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
only showing top 5 rows

In [12]:
//How many ads click data rows
println(click_df.count())

65711


In [13]:
click_df.show(5)

+--------+-----------+--------------------+-------------------+
|event_id|customer_id|          ad_clicked|           datetime|
+--------+-----------+--------------------+-------------------+
|   76611|       2488|   ADV_FREE_REFERRAL|2014-12-25 05:08:59|
|  305706|       2476|ADV_DONATION_CHARITY|2014-11-26 22:24:21|
|  156074|       1307|   ADV_FREE_REFERRAL|2014-10-15 03:52:40|
|  192762|       1733|   ADV_LIKE_FACEBOOK|2014-10-20 14:55:08|
|   76106|          2|   ADV_LIKE_FACEBOOK|2014-11-19 00:22:13|
+--------+-----------+--------------------+-------------------+
only showing top 5 rows



## 4. Data Exploration

### 4.1 Compute Hourly Summary profile of each customer:

We will now look at customers' listening behaviour across the hours of the day, for example whether they tend to listen more in the morning or at night.

**Add a new Hour Column to the Music data:**

In [14]:
var hourly_music_df: DataFrame = music_df.withColumn("hour", hour(col("datetime"))).cache()

hourly_music_df = [event_id: int, customer_id: int ... 5 more fields]



In [15]:
hourly_music_df.show(5)

+--------+-----------+--------+-------------------+---------+-----+----+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|hour|
+--------+-----------+--------+-------------------+---------+-----+----+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|   3|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|  18|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|  15|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|   2|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|   7|
+--------+-----------+--------+-------------------+---------+-----+----+
only showing top 5 rows
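Because `datetime` is stored as a string, `hour` has to interpret each value as a timestamp before it can extract the hour. As a rough plain-Scala illustration of that step for a single value, the sketch below parses the same format with `java.time`; `eventTimeFormat` and `hourOf` are hypothetical names, and the pattern is inferred from the sample rows.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Pattern matching the sample values, e.g. "2014-10-23 03:26:20".
val eventTimeFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Hypothetical helper: hour of day (0-23) for one datetime string.
def hourOf(datetime: String): Int =
  LocalDateTime.parse(datetime, eventTimeFormat).getHour

println(hourOf("2014-10-23 03:26:20")) // 3
```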



**Divide the entire day into four time buckets based on the hour:**  

Assign each listen event to a time-of-day bucket (night, morning, afternoon or evening) by marking the matching bucket column with 1 and the other bucket columns with 0.

In [16]:
hourly_music_df = (hourly_music_df
      .withColumn("night", when((col("hour") < 5) || (col("hour") >= 22), 1).otherwise(0))
      .withColumn("morn", when((col("hour") >= 5) && (col("hour") < 12), 1).otherwise(0))
      .withColumn("aft", when((col("hour") >= 12) && (col("hour") < 17), 1).otherwise(0))
      .withColumn("eve", when((col("hour") >= 17) && (col("hour") < 22), 1).otherwise(0))
      .cache())

hourly_music_df = [event_id: int, customer_id: int ... 9 more fields]
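The bucket boundaries are easier to check outside Spark. The following plain-Scala sketch mirrors the `when(...)` conditions above; `bucketOf` and `hoursPerBucket` are hypothetical names used only for this illustration.

```scala
// Same boundaries as the when(...) expressions above.
def bucketOf(hour: Int): String =
  if (hour < 5 || hour >= 22) "night"
  else if (hour < 12) "morn"
  else if (hour < 17) "aft"
  else "eve"

// Count how many of the 24 hours land in each bucket.
val hoursPerBucket: Map[String, Int] =
  (0 until 24).groupBy(bucketOf).map { case (bucket, hours) => bucket -> hours.size }

println(hoursPerBucket("night")) // 7
```

Counting the hours shows that the buckets are not equally wide: night and morn cover seven hours each, while aft and eve cover five each. That is worth keeping in mind when comparing per-bucket totals.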



#### 4.1.1 Compute Customer Hourly Summary using DF API:

Now we're ready to compute a summary profile for each user. We will use Spark SQL functions to compute some high-level data:

+ Number of tracks listened to during each period of the day: morning, afternoon, evening, and night. We arbitrarily define the time ranges in the code.
+ Total unique tracks listened to by that user, i.e. the number of distinct track IDs.
+ Total mobile tracks listened to by that user, i.e. the count of tracks that had their mobile flag set.

In [17]:
val cust_profile_df: DataFrame = (hourly_music_df
                                  .select("customer_id", "track_id", "night", "morn", "aft", "eve", "is_mobile")
                                  .groupBy("customer_id")
                                  .agg(countDistinct("track_id"), sum("night"),sum("morn"),sum("aft"),sum("eve"), sum("is_mobile")
                                  )).cache()

cust_profile_df = [customer_id: int, count(DISTINCT track_id): bigint ... 5 more fields]
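The generated column names such as `count(DISTINCT track_id)` are awkward to reference later. One option, sketched below on a tiny made-up sample, is to alias each aggregate. The names `session`, `sampleDf`, `namedProfileDf` and the alias names are hypothetical; the notebook itself keeps the default names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{countDistinct, sum}

// getOrCreate reuses an already running session if there is one.
val session: SparkSession = SparkSession.builder.master("local[*]").getOrCreate()
import session.implicits._

// Made-up rows with the same columns that the aggregation above selects.
val sampleDf: DataFrame = Seq(
  (1, "10", 1, 0, 0, 0, 1),
  (1, "10", 0, 1, 0, 0, 0),
  (1, "11", 0, 0, 1, 0, 1),
  (2, "10", 0, 0, 0, 1, 0)
).toDF("customer_id", "track_id", "night", "morn", "aft", "eve", "is_mobile")

// Same aggregation as above, but every result column gets a plain name.
val namedProfileDf: DataFrame = sampleDf
  .groupBy("customer_id")
  .agg(
    countDistinct("track_id").alias("unique_tracks"),
    sum("night").alias("night"),
    sum("morn").alias("morn"),
    sum("aft").alias("aft"),
    sum("eve").alias("eve"),
    sum("is_mobile").alias("mobile"))

namedProfileDf.show()
```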



In [18]:
cust_profile_df.show(10)

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|        148|                     443|       149|      170|     109|     124|           476|
|        463|                     306|       103|       99|      84|      76|           176|
|       1591|                     171|        47|       64|      36|      40|            85|
|       2366|                     143|        55|       46|      30|      25|           113|
|       4101|                     100|        31|       28|      26|      22|            85|
|       1342|                     173|        53|       60|      36|      42|           102|
|       2659|                     119|        42|       43|      22|      22|            59|
|       1238|                     191|        72|       64|      30|  

#### 4.1.2 Compute Customer Hourly Summary using SQL:

In the previous section we used only the DF API to calculate the hourly profiles. We can get the same results with pure Spark SQL, which some readers may find more concise. The query computes the same high-level data:

+ Number of tracks listened to during each period of the day: morning, afternoon, evening, and night. We arbitrarily define the time ranges in the code.
+ Total unique tracks listened to by that user, i.e. the number of distinct track IDs.
+ Total mobile tracks listened to by that user, i.e. the count of tracks that had their mobile flag set.

**Divide the entire day into four time buckets based on the hour:**  

Assign each listen event to a time-of-day bucket (night, morning, afternoon or evening) by marking the matching bucket column with 1 and the other bucket columns with 0.

In [19]:
spark.sql(
"""
SELECT  *,
      HOUR(datetime) as hour,
      CASE WHEN HOUR(datetime) < 5 OR HOUR(datetime) >= 22 THEN 1 ELSE 0 END AS night,
      CASE WHEN HOUR(datetime) >= 5 AND HOUR(datetime) < 12 THEN 1 ELSE 0 END AS morn,
      CASE WHEN HOUR(datetime) >= 12 AND HOUR(datetime) < 17 THEN 1 ELSE 0 END AS aft,
      CASE WHEN HOUR(datetime) >= 17 AND HOUR(datetime) < 22 THEN 1 ELSE 0 END AS eve
FROM music
""").show(10)

+--------+-----------+--------+-------------------+---------+-----+----+-----+----+---+---+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|hour|night|morn|aft|eve|
+--------+-----------+--------+-------------------+---------+-----+----+-----+----+---+---+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|   3|    1|   0|  0|  0|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|  18|    0|   0|  0|  1|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|  15|    0|   0|  1|  0|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|   2|    1|   0|  0|  0|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|   7|    0|   1|  0|  0|
|       5|       1811|       6|2014-11-18 02:00:48|        1|20115|   2|    1|   0|  0|  0|
|       6|       3644|      24|2014-12-12 15:24:02|        1|15330|  15|    0|   0|  1|  0|
|       7|        250|     726|2014-10-07 09:48:53|        0|33570|   9|    0|  

**Compute the hourly profiles:**  

We can combine the bucketing above and the hourly summary in a single SQL query as follows.

In [20]:
spark.sql(
"""
SELECT customer_id, COUNT(DISTINCT track_id), SUM(night), SUM(morn), SUM(aft), SUM(eve), SUM(is_mobile)
FROM(
  SELECT  *,
          HOUR(datetime) as hour,
          CASE WHEN HOUR(datetime) < 5 OR HOUR(datetime) >= 22 THEN 1 ELSE 0 END AS night,
          CASE WHEN HOUR(datetime) >= 5 AND HOUR(datetime) < 12 THEN 1 ELSE 0 END AS morn,
          CASE WHEN HOUR(datetime) >= 12 AND HOUR(datetime) < 17 THEN 1 ELSE 0 END AS aft,
          CASE WHEN HOUR(datetime) >= 17 AND HOUR(datetime) < 22 THEN 1 ELSE 0 END AS eve
  FROM music)
GROUP BY customer_id
""").show(10)

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|        148|                     443|       149|      170|     109|     124|           476|
|        463|                     306|       103|       99|      84|      76|           176|
|       1591|                     171|        47|       64|      36|      40|            85|
|       2366|                     143|        55|       46|      30|      25|           113|
|       4101|                     100|        31|       28|      26|      22|            85|
|       1342|                     173|        53|       60|      36|      42|           102|
|       2659|                     119|        42|       43|      22|      22|            59|
|       1238|                     191|        72|       64|      30|  

We can see the result is the same as the result from the DF API.
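Comparing two printed tables by eye only covers the rows that `show` displays. A programmatic check is possible with `Dataset.except`; the helper below is a minimal sketch, and `sameDistinctRows` is a hypothetical name. It assumes both DataFrames have the same number of columns with compatible types.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: true when both frames contain the same set of distinct rows.
// Dataset.except is a set difference, so how often a duplicate row occurs is not compared.
def sameDistinctRows(a: DataFrame, b: DataFrame): Boolean =
  a.except(b).count() == 0 && b.except(a).count() == 0
```

To use it here, the SQL result would first have to be assigned to a value instead of only being shown, and then passed in together with `cust_profile_df`.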

### 4.2 Summary Statistics:

Since we have the summary data readily available, we can compute some basic statistics on it.

In [21]:
//Referring to cust_profile_df from section 4.1.1 we can use the describe() function to get the summary statistics
cust_profile_df.select(cust_profile_df.columns.filter(c => !c.equals("customer_id")).map(col): _*).describe().show()

+-------+------------------------+-----------------+----------------+-----------------+-----------------+------------------+
|summary|count(DISTINCT track_id)|       sum(night)|       sum(morn)|         sum(aft)|         sum(eve)|    sum(is_mobile)|
+-------+------------------------+-----------------+----------------+-----------------+-----------------+------------------+
|  count|                    5000|             5000|            5000|             5000|             5000|              5000|
|   mean|                 170.295|          58.3032|         58.2908|          41.6434|          41.7626|           121.553|
| stddev|      117.04437556828793|67.27232404842705|67.3964412370437|47.87538247251274|48.01370329792189|148.79537090743347|
|    min|                      68|               15|              16|                9|                9|                32|
|    max|                    1617|             2139|            2007|             1460|             1480|              5093|


In [22]:
// store the describe dataframe temporarily
var summary_stats_df: DataFrame = cust_profile_df.select(cust_profile_df.columns.filter(c => !c.equals("customer_id")).map(col): _*).describe()

summary_stats_df = [summary: string, count(DISTINCT track_id): string ... 5 more fields]



#### 4.2.1 Prettifying Summary Statistics:

The mean and stddev values in the describe() DataFrame have too many decimal places. We can format the numbers to show only two decimal places. Pay careful attention to the data type that describe() returns: every column is a String, so we need to cast it to a float before we can format it. We use cast() and format_number() on individual columns to reformat them.

In [23]:
summary_stats_df.select(summary_stats_df("summary"),
  format_number(summary_stats_df("count(DISTINCT track_id)").cast("float"), 2).alias("count(DISTINCT track_id)"),
  format_number(summary_stats_df("sum(night)").cast("float"), 2).alias("sum(night)"),
  format_number(summary_stats_df("sum(morn)").cast("float"), 2).alias("sum(morn)"),
  format_number(summary_stats_df("sum(aft)").cast("float"), 2).alias("sum(aft)"),
  format_number(summary_stats_df("sum(is_mobile)").cast("float"), 2).alias("sum(is_mobile)"))
.show()

+-------+------------------------+----------+---------+--------+--------------+
|summary|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(is_mobile)|
+-------+------------------------+----------+---------+--------+--------------+
|  count|                5,000.00|  5,000.00| 5,000.00|5,000.00|      5,000.00|
|   mean|                  170.29|     58.30|    58.29|   41.64|        121.55|
| stddev|                  117.04|     67.27|    67.40|   47.88|        148.80|
|    min|                   68.00|     15.00|    16.00|    9.00|         32.00|
|    max|                1,617.00|  2,139.00| 2,007.00|1,460.00|      5,093.00|
+-------+------------------------+----------+---------+--------+--------------+



#### 4.2.2 Prettifying Summary Statistics - Even Smarter:

Real-life data sets often have too many columns to list individually in code. We can use a Scala for loop over the column names to do this more compactly, and we can exclude any columns we don't want to format.

**Apply for loop on formatting columns and excluding the summary column:**

In [24]:
for(col_name <- summary_stats_df.columns.filter(col_name => !col_name.equals("summary"))) {
  summary_stats_df = summary_stats_df.withColumn(col_name, format_number(col(col_name).cast("float"), 2))
}

In [25]:
summary_stats_df.show()

+-------+------------------------+----------+---------+--------+--------+--------------+
|summary|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-------+------------------------+----------+---------+--------+--------+--------------+
|  count|                5,000.00|  5,000.00| 5,000.00|5,000.00|5,000.00|      5,000.00|
|   mean|                  170.29|     58.30|    58.29|   41.64|   41.76|        121.55|
| stddev|                  117.04|     67.27|    67.40|   47.88|   48.01|        148.80|
|    min|                   68.00|     15.00|    16.00|    9.00|    9.00|         32.00|
|    max|                1,617.00|  2,139.00| 2,007.00|1,460.00|1,480.00|      5,093.00|
+-------+------------------------+----------+---------+--------+--------+--------------+
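The loop above reassigns a `var` on every iteration. As an alternative sketch of the same step, `foldLeft` can thread the DataFrame through the column names without mutation. `formatAllExcept` is a hypothetical helper name; the cast and `format_number` call are the same as in the loop.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, format_number}

// Hypothetical helper: format every column except `keep` to `decimals` places,
// passing the intermediate DataFrame along instead of reassigning a var.
def formatAllExcept(df: DataFrame, keep: String, decimals: Int = 2): DataFrame =
  df.columns
    .filter(_ != keep)
    .foldLeft(df) { (acc, name) =>
      acc.withColumn(name, format_number(col(name).cast("float"), decimals))
    }
```

Applied to the unformatted output of `describe()`, a call such as `formatAllExcept(df, "summary")` should produce the same kind of table as the loop.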



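Interpreting the summary statistics:
> On average, customers listen to the most tracks in the night (58.30) and morning (58.29) buckets. Those two buckets span seven hours each, against five hours each for afternoon and evening, so the per-hour listening rate is nearly the same across the day (roughly 8.3 tracks per hour in every bucket).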

### 4.3 An Ode to RDD - Compute Customer Hourly Summary using Custom Group Function:

If you have to work with RDDs instead of DataFrames, we can compute a summary profile for each user by passing a function we'll write to mapValues to compute the same high-level data:

+ Number of tracks listened to during each period of the day: morning, afternoon, evening, and night. We arbitrarily define the time ranges in the code.
+ Total unique tracks listened to by that user, i.e. the number of distinct track IDs.
+ Total mobile tracks listened to by that user, i.e. the count of tracks that had their mobile flag set.

In [26]:
//let's select only the original columns
val music_rdd: RDD[Row] = music_df.select("customer_id", "track_id", "datetime", "is_mobile", "zip").rdd.cache()

music_rdd = MapPartitionsRDD[117] at rdd at <console>:42



In [27]:
music_rdd.take(5).foreach(println)

[48,453,2014-10-23 03:26:20,0,72132]
[1081,19,2014-10-15 18:32:14,1,17307]
[532,36,2014-12-10 15:33:16,1,66216]
[2641,822,2014-10-20 02:24:55,1,36690]
[2251,338,2014-11-18 07:16:05,1,61377]


**Use customer_id as the key:**

In [28]:
//Use customer_id as the key, we will later group by on this column
music_rdd.map(record => (record(0), record)).take(5).foreach(println)

(48,[48,453,2014-10-23 03:26:20,0,72132])
(1081,[1081,19,2014-10-15 18:32:14,1,17307])
(532,[532,36,2014-12-10 15:33:16,1,66216])
(2641,[2641,822,2014-10-20 02:24:55,1,36690])
(2251,[2251,338,2014-11-18 07:16:05,1,61377])


**Develop the User Stats function:**

We loop over the tracks of each customer and count the unique tracks they listened to and how many times they listened during each period of the day.

In [29]:
def compute_stats_byuser(tracks: Iterable[Row]) : (Double, Double, Double, Double, Double, Double) = {

  // per-customer counters: mobile plays and plays per time-of-day bucket
  var mcount, morn, aft, eve, night = 0
  // distinct track IDs seen for this customer
  val tracklist: Set[String] = Set()

  for(track <- tracks) {
    // row layout: (customer_id, track_id, datetime, is_mobile, zip)
    val trackid = track.getAs[String](1)
    // datetime looks like "2014-10-23 03:26:20"; keep only the hour part
    val hour = track.getAs[String](2).split(" ")(1).split(":")(0).toInt
    val mobile = track.getAs[Int](3)

    tracklist.add(trackid)

    mcount += mobile

    // same boundaries as the DataFrame buckets: night wraps around midnight
    if (hour < 5) {
      night += 1
    } else if (hour < 12) {
      morn += 1
    } else if (hour < 17) {
      aft += 1
    } else if (hour < 22) {
      eve += 1
    } else {
      night += 1
    }

  }

  // note the order: (unique tracks, morn, aft, eve, night, mobile)
  (tracklist.size, morn, aft, eve, night, mcount)
}

compute_stats_byuser: (tracks: Iterable[org.apache.spark.sql.Row])(Double, Double, Double, Double, Double, Double)
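The per-customer logic can be tried without Spark on a handful of made-up listens. The sketch below is plain Scala and is not the notebook's function; `Listen` and `summarize` are hypothetical names. It returns the counts in the same order as `compute_stats_byuser`.

```scala
// Hypothetical plain-Scala record: one listen reduced to the fields the stats need.
final case class Listen(trackId: String, hour: Int, mobile: Int)

// Returns (unique tracks, morn, aft, eve, night, mobile).
def summarize(listens: Seq[Listen]): (Int, Int, Int, Int, Int, Int) = {
  // number of listens whose hour satisfies the given predicate
  def plays(p: Int => Boolean): Int = listens.count(l => p(l.hour))
  (
    listens.map(_.trackId).distinct.size,
    plays(h => h >= 5 && h < 12),
    plays(h => h >= 12 && h < 17),
    plays(h => h >= 17 && h < 22),
    plays(h => h < 5 || h >= 22),
    listens.map(_.mobile).sum
  )
}

// Made-up listens for one customer.
val demo = summarize(Seq(
  Listen("453", 3, 0),
  Listen("453", 23, 1),
  Listen("19", 9, 1),
  Listen("36", 15, 0)
))
println(demo) // (3,1,1,0,2,2)
```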


In [30]:
val cust_profile_rdd = (music_rdd.map(record => (record(0), record))
                                .groupByKey().mapValues(tracks => compute_stats_byuser(tracks)))

cust_profile_rdd = MapPartitionsRDD[121] at mapValues at <console>:46



In [31]:
cust_profile_rdd.cache()

MapPartitionsRDD[121] at mapValues at <console>:46

In [32]:
cust_profile_rdd.take(10).foreach(println)

(4904,(106.0,40.0,20.0,20.0,33.0,72.0))
(4552,(105.0,24.0,22.0,33.0,32.0,91.0))
(3456,(96.0,29.0,23.0,25.0,23.0,47.0))
(4680,(91.0,25.0,21.0,22.0,29.0,49.0))
(1080,(185.0,71.0,49.0,29.0,58.0,98.0))
(320,(313.0,108.0,68.0,92.0,124.0,328.0))
(752,(260.0,87.0,70.0,66.0,78.0,158.0))
(3272,(112.0,38.0,20.0,27.0,35.0,53.0))
(408,(272.0,101.0,71.0,58.0,91.0,231.0))
(4352,(104.0,33.0,26.0,24.0,31.0,86.0))


**Compare the Results that we got from RDD and previously from DF methods:**

In [33]:
cust_profile_rdd.filter(record => record._1 == 48).take(1)

[(48,(696.0,310.0,217.0,223.0,277.0,503.0))]

In [34]:
cust_profile_df.filter(col("customer_id") === 48).show()

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|         48|                     696|       277|      310|     217|     223|           503|
+-----------+------------------------+----------+---------+--------+--------+--------------+



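Woo Hoo! We can clearly see that the values match those from the DataFrame. Note that the order differs: the RDD tuple is (unique tracks, morn, aft, eve, night, mobile), while the DataFrame columns list night before morn, aft and eve. We are on the right track!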


**Summary Statistics:**

Since we have the summary data readily available, we can compute some basic statistics on it. Because we are working with an RDD, we cannot use the DataFrame `describe()` method. Instead we will use the `colStats` function from `org.apache.spark.mllib.stat.Statistics`.

In [35]:
import org.apache.spark.mllib.stat.Statistics

//compute aggregate stats for entire track history
val summary_stats_ml = Statistics.colStats(cust_profile_rdd.map(x => Vectors.dense(Array(x._2._1, x._2._2, x._2._3, x._2._4, x._2._5, x._2._6))))

summary_stats_ml = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@598e2a85



In [36]:
println(summary_stats_ml.count)

5000


In [37]:
println(summary_stats_ml.mean)

[170.29499999999996,58.29079999999999,41.64339999999999,41.762599999999985,58.3032,121.55300000000003]


In [38]:
println(summary_stats_ml.max)

[1617.0,2007.0,1460.0,1480.0,2139.0,5093.0]


In [39]:
println(summary_stats_ml.min)

[68.0,16.0,9.0,9.0,15.0,32.0]
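
The summary returned by `colStats` is positional: each statistic is a vector with one entry per input column, in the order the values were passed to `Vectors.dense`. The following is a minimal sketch on a tiny hand-made RDD that pairs each entry with a label. The `session` value, the sample vectors, and the label names are assumptions made for illustration only; `variance` is shown as one more statistic available on the same summary object.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// getOrCreate() reuses a session that is already running (e.g. the notebook's),
// otherwise it starts a local one.
val session = SparkSession.builder().master("local[*]").appName("colstats-sketch").getOrCreate()

// Two hypothetical customers with two columns each: (unique tracks, mobile plays).
val sample = session.sparkContext.parallelize(Seq(
  Vectors.dense(100.0, 40.0),
  Vectors.dense(200.0, 60.0)
))

val stats = Statistics.colStats(sample)

// Labels follow the column order used when building the vectors.
val labels = Seq("unique_tracks", "mobile_plays")
val meanByColumn: Map[String, Double] = labels.zip(stats.mean.toArray).toMap
val varianceByColumn: Map[String, Double] = labels.zip(stats.variance.toArray).toMap

println(meanByColumn)
println(varianceByColumn)
```

Keeping the labels next to the vector construction makes it harder to misread a position, which matters here because the RDD profile and the DataFrame profile order their columns differently.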


### 4.4 PIVOT Tables With Multiple WHENs - Compute Customer Hourly Summary:

If you are willing to use more advanced Spark functions, the `pivot` function achieves what we have done so far in far fewer steps.

First we extract the hour, convert that hour into several buckets and then pivot on those buckets.
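
The bucketing rule can be checked in isolation before handing it to Spark. The following is a minimal plain-Scala sketch (no Spark needed). `bucketOf` and the sample `events` are hypothetical names introduced only for illustration, and the hour boundaries are the ones used in the `when` chain of this section.

```scala
// Plain-Scala sketch of the hour-to-bucket rule.
// `bucketOf` is an illustrative helper, not part of the notebook.
def bucketOf(hour: Int): String =
  if (hour < 5 || hour >= 22) "night"
  else if (hour < 12) "morn"
  else if (hour < 17) "aft"
  else "eve"

// Hypothetical (customer_id, hour) events standing in for rows of music_df.
val events = Seq((48, 3), (48, 7), (48, 23), (1081, 18), (532, 15), (532, 16))

// "Pivot" by hand: count events per customer and per bucket.
val pivoted: Map[Int, Map[String, Int]] =
  events
    .groupBy { case (customerId, _) => customerId }
    .map { case (customerId, rows) =>
      val perBucket = rows
        .groupBy { case (_, hour) => bucketOf(hour) }
        .map { case (bucket, rs) => bucket -> rs.size }
      customerId -> perBucket
    }

println(pivoted(48))
```

Every hour from 0 to 23 falls into exactly one bucket, which is what lets the pivot produce one count column per bucket.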

In [40]:
music_df.select(
  col("event_id"),
  col("customer_id"),
  col("track_id"),
  col("datetime"),
  col("is_mobile"),
  col("zip"),
  hour(col("datetime")).alias("hour")
).show(10)

+--------+-----------+--------+-------------------+---------+-----+----+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|hour|
+--------+-----------+--------+-------------------+---------+-----+----+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|   3|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|  18|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|  15|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|   2|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|   7|
|       5|       1811|       6|2014-11-18 02:00:48|        1|20115|   2|
|       6|       3644|      24|2014-12-12 15:24:02|        1|15330|  15|
|       7|        250|     726|2014-10-07 09:48:53|        0|33570|   9|
|       8|       1782|     442|2014-12-30 15:27:31|        1|41240|  15|
|       9|       2932|     775|2014-11-12 07:45:55|        0|63565|   7|
+--------+-----------+--------+-------------------+---------+-----+----+
only showing top 10 rows

In [41]:
// Create the hour buckets
music_df
  .select(col("event_id"), col("customer_id"), col("track_id"), col("datetime"), col("is_mobile"), col("zip"),
   hour(col("datetime")).alias("hour"),
   when((hour(col("datetime")) < 5) || (hour(col("datetime")) >= 22), lit("night"))
   .when((hour(col("datetime")) >= 5) && (hour(col("datetime")) < 12), lit("morn"))
   .when((hour(col("datetime")) >= 12) && (hour(col("datetime")) < 17), lit("aft"))
   .when((hour(col("datetime")) >= 17) && (hour(col("datetime")) < 22), lit("eve"))
   .alias("bucket")
).show(10)

+--------+-----------+--------+-------------------+---------+-----+----+------+
|event_id|customer_id|track_id|           datetime|is_mobile|  zip|hour|bucket|
+--------+-----------+--------+-------------------+---------+-----+----+------+
|       0|         48|     453|2014-10-23 03:26:20|        0|72132|   3| night|
|       1|       1081|      19|2014-10-15 18:32:14|        1|17307|  18|   eve|
|       2|        532|      36|2014-12-10 15:33:16|        1|66216|  15|   aft|
|       3|       2641|     822|2014-10-20 02:24:55|        1|36690|   2| night|
|       4|       2251|     338|2014-11-18 07:16:05|        1|61377|   7|  morn|
|       5|       1811|       6|2014-11-18 02:00:48|        1|20115|   2| night|
|       6|       3644|      24|2014-12-12 15:24:02|        1|15330|  15|   aft|
|       7|        250|     726|2014-10-07 09:48:53|        0|33570|   9|  morn|
|       8|       1782|     442|2014-12-30 15:27:31|        1|41240|  15|   aft|
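|       9|       2932|     775|2014-11-12 07:45:55|        0|63565|   7|  morn|
+--------+-----------+--------+-------------------+---------+-----+----+------+
only showing top 10 rows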

In [42]:
// Create the hour buckets and then pivot on the hour buckets
val hourly_pivot_df = music_df
                        .select(col("event_id"), col("customer_id"), col("track_id"), col("datetime"), 
                                col("is_mobile"), col("zip"), hour(col("datetime")).alias("hour"),
                          when((hour(col("datetime")) < 5) || (hour(col("datetime")) >= 22), lit("night"))
                           .when((hour(col("datetime")) >= 5) && (hour(col("datetime")) < 12), lit("morn"))
                           .when((hour(col("datetime")) >= 12) && (hour(col("datetime")) < 17), lit("aft"))
                           .when((hour(col("datetime")) >= 17) && (hour(col("datetime")) < 22), lit("eve"))
                           .alias("bucket"))
                        .select("customer_id", "bucket")
                        .groupBy("customer_id")
                        .pivot("bucket", Array("night", "morn", "aft", "eve"))
                        .agg(count(col("bucket"))
                        ).cache()

hourly_pivot_df = [customer_id: int, night: bigint ... 3 more fields]


[customer_id: int, night: bigint ... 3 more fields]

In [43]:
hourly_pivot_df.show(10)

+-----------+-----+----+---+---+
|customer_id|night|morn|aft|eve|
+-----------+-----+----+---+---+
|        471|   84|  96| 60| 73|
|       3175|   35|  28| 25| 21|
|        833|   70|  75| 48| 63|
|       1088|   69|  62| 41| 46|
|        463|  103|  99| 84| 76|
|       1238|   72|  64| 30| 46|
|       1645|   55|  42| 54| 35|
|       1342|   53|  60| 36| 42|
|       1959|   42|  43| 34| 24|
|       2366|   55|  46| 30| 25|
+-----------+-----+----+---+---+
only showing top 10 rows



**Compare the Profile Summary that we got from Multi Step DF API and SQL above and the Pivot operation:**

In [44]:
hourly_pivot_df.filter(col("customer_id") === 48).show()

+-----------+-----+----+---+---+
|customer_id|night|morn|aft|eve|
+-----------+-----+----+---+---+
|         48|  277| 310|217|223|
+-----------+-----+----+---+---+



In [45]:
cust_profile_df.filter(col("customer_id") === 48).show()

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|         48|                     696|       277|      310|     217|     223|           503|
+-----------+------------------------+----------+---------+--------+--------+--------------+



**YAY!** The results from the pivot operation match the results we got from the DF API and SQL constructs. To reproduce the profile exactly, one final touch is needed.

Gather the number of unique tracks and the `is_mobile` count separately, then join them with the pivot table.

In [46]:
val tracks_summary_df = (music_df
  .select("customer_id", "track_id", "is_mobile")
  .groupBy("customer_id")
  .agg(countDistinct("track_id"), sum("is_mobile"))
).cache()

tracks_summary_df = [customer_id: int, count(DISTINCT track_id): bigint ... 1 more field]


[customer_id: int, count(DISTINCT track_id): bigint ... 1 more field]

In [47]:
tracks_summary_df.show(10)

+-----------+------------------------+--------------+
|customer_id|count(DISTINCT track_id)|sum(is_mobile)|
+-----------+------------------------+--------------+
|        148|                     443|           476|
|        463|                     306|           176|
|       1591|                     171|            85|
|       2366|                     143|           113|
|       4101|                     100|            85|
|       1342|                     173|           102|
|       2659|                     119|            59|
|       1238|                     191|           158|
|       4519|                     103|            54|
|       1580|                     162|           134|
+-----------+------------------------+--------------+
only showing top 10 rows



In [48]:
(tracks_summary_df
      .join(hourly_pivot_df, hourly_pivot_df("customer_id") === tracks_summary_df("customer_id"), "inner")
      .select(hourly_pivot_df("customer_id"), col("count(DISTINCT track_id)"),
      col("night").alias("sum(night)"),
      col("morn").alias("sum(morn)"),
      col("aft").alias("sum(aft)"),
      col("eve").alias("sum(eve)"),
        col("sum(is_mobile)"))
      .filter(col("customer_id") === 48)
    ).show()

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|         48|                     696|       277|      310|     217|     223|           503|
+-----------+------------------------+----------+---------+--------+--------+--------------+



In [49]:
cust_profile_df.filter(col("customer_id") === 48).show()

+-----------+------------------------+----------+---------+--------+--------+--------------+
|customer_id|count(DISTINCT track_id)|sum(night)|sum(morn)|sum(aft)|sum(eve)|sum(is_mobile)|
+-----------+------------------------+----------+---------+--------+--------+--------------+
|         48|                     696|       277|      310|     217|     223|           503|
+-----------+------------------------+----------+---------+--------+--------+--------------+



In [50]:
hourly_pivot_df.unpersist()
tracks_summary_df.unpersist()

[customer_id: int, count(DISTINCT track_id): bigint ... 1 more field]

### 4.5 PIVOT & UNPIVOT Tables:

Often we need to UNPIVOT a table. This is the reverse of the PIVOT function: it converts a wide format into a narrow (long) format, similar to the pandas `melt` function.

We can achieve this with a combination of struct, array and explode transformations: `explode(array({struct(<col name>,<col val>)}*))`.

First we extract the hour, convert that hour into several buckets and then pivot on those buckets to create the hourly_pivot DataFrame.

In [51]:
// Create the hour buckets and then pivot on the hour buckets
val hourly_pivot_df = (music_df.select($"event_id", $"customer_id", $"track_id", $"datetime", $"is_mobile", $"zip", 
             hour($"datetime").alias("hour"),
             when((hour($"datetime") < 5) or (hour($"datetime") >= 22), lit("night"))
              .when((hour($"datetime") >= 5) and (hour($"datetime") < 12), lit("morn"))
              .when((hour($"datetime") >= 12) and (hour($"datetime") < 17), lit("aft"))
              .when((hour($"datetime") >= 17) and (hour($"datetime") < 22), lit("eve"))
              .alias("bucket"))
              .select("customer_id", "bucket")
              .groupBy("customer_id")
              .pivot("bucket", Array("night", "morn", "aft", "eve"))
              .agg(count($"bucket"))
           ).cache()

hourly_pivot_df = [customer_id: int, night: bigint ... 3 more fields]


[customer_id: int, night: bigint ... 3 more fields]

In [52]:
hourly_pivot_df.show(10)

+-----------+-----+----+---+---+
|customer_id|night|morn|aft|eve|
+-----------+-----+----+---+---+
|        471|   84|  96| 60| 73|
|       3175|   35|  28| 25| 21|
|        833|   70|  75| 48| 63|
|       1088|   69|  62| 41| 46|
|        463|  103|  99| 84| 76|
|       1238|   72|  64| 30| 46|
|       1645|   55|  42| 54| 35|
|       1342|   53|  60| 36| 42|
|       1959|   42|  43| 34| 24|
|       2366|   55|  46| 30| 25|
+-----------+-----+----+---+---+
only showing top 10 rows



Then we convert each column into a struct column and combine those struct columns into an array of structs. It is important to give the fields the same names in every struct; otherwise the `array` function will complain that its elements are not all of the same type, e.g. `struct(lit("night").alias("bucket"), col("night").alias("count"))`.

In [53]:
(hourly_pivot_df
    .select($"customer_id", 
        array(
            struct(lit("night").alias("bucket"), col("night").alias("count")), 
            struct(lit("morn").alias("bucket"), col("morn").alias("count")),
            struct(lit("aft").alias("bucket"), col("aft").alias("count")),
            struct(lit("eve").alias("bucket"), col("eve").alias("count"))
          ).alias("array_of_struct_bucket_count")
        )).show(10, false)

+-----------+------------------------------------------------+
|customer_id|array_of_struct_bucket_count                    |
+-----------+------------------------------------------------+
|471        |[[night, 84], [morn, 96], [aft, 60], [eve, 73]] |
|3175       |[[night, 35], [morn, 28], [aft, 25], [eve, 21]] |
|833        |[[night, 70], [morn, 75], [aft, 48], [eve, 63]] |
|1088       |[[night, 69], [morn, 62], [aft, 41], [eve, 46]] |
|463        |[[night, 103], [morn, 99], [aft, 84], [eve, 76]]|
|1238       |[[night, 72], [morn, 64], [aft, 30], [eve, 46]] |
|1645       |[[night, 55], [morn, 42], [aft, 54], [eve, 35]] |
|1342       |[[night, 53], [morn, 60], [aft, 36], [eve, 42]] |
|1959       |[[night, 42], [morn, 43], [aft, 34], [eve, 24]] |
|2366       |[[night, 55], [morn, 46], [aft, 30], [eve, 25]] |
+-----------+------------------------------------------------+
only showing top 10 rows



We then explode the array-of-structs column so that each struct becomes its own row.

In [54]:
(hourly_pivot_df
    .select($"customer_id",    
        explode(
            array(
                struct(lit("night").alias("bucket"), col("night").alias("count")), 
                struct(lit("morn").alias("bucket"), col("morn").alias("count")),
                struct(lit("aft").alias("bucket"), col("aft").alias("count")),
                struct(lit("eve").alias("bucket"), col("eve").alias("count"))
            )
          ).alias("exploded_struct_bucket_count")
        )).show(10, false)

+-----------+----------------------------+
|customer_id|exploded_struct_bucket_count|
+-----------+----------------------------+
|471        |[night, 84]                 |
|471        |[morn, 96]                  |
|471        |[aft, 60]                   |
|471        |[eve, 73]                   |
|3175       |[night, 35]                 |
|3175       |[morn, 28]                  |
|3175       |[aft, 25]                   |
|3175       |[eve, 21]                   |
|833        |[night, 70]                 |
|833        |[morn, 75]                  |
+-----------+----------------------------+
only showing top 10 rows



And finally, we break the exploded struct column into its individual components and extract them as separate columns. For brevity, this cell unpivots only the `night` and `morn` columns.

In [55]:
(hourly_pivot_df
    .withColumn("exploded_struct_bucket_count",    
        explode(
            array(
                struct(lit("night").alias("bucket"), col("night").alias("count")), 
                struct(lit("morn").alias("bucket"), col("morn").alias("count"))
            )
          )         
        )
     .selectExpr("customer_id", "exploded_struct_bucket_count.bucket as bucket", "exploded_struct_bucket_count.count as count")
).show(10, false)

+-----------+------+-----+
|customer_id|bucket|count|
+-----------+------+-----+
|471        |night |84   |
|471        |morn  |96   |
|3175       |night |35   |
|3175       |morn  |28   |
|833        |night |70   |
|833        |morn  |75   |
|1088       |night |69   |
|1088       |morn  |62   |
|463        |night |103  |
|463        |morn  |99   |
+-----------+------+-----+
only showing top 10 rows
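
As an alternative to `explode(array(struct(...)))`, Spark SQL also provides the `stack` generator function, which can unpivot in a single expression. The following is a minimal sketch, not the approach used in this notebook. The `session` value and the two-row `wide` DataFrame are assumptions made for illustration, with values copied from the first two rows of the pivot output above.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses a session that is already running (e.g. the notebook's),
// otherwise it starts a local one.
val session = SparkSession.builder().master("local[*]").appName("unpivot-stack-sketch").getOrCreate()
import session.implicits._

// A small wide table shaped like hourly_pivot_df.
val wide = Seq(
  (471, 84L, 96L, 60L, 73L),
  (3175, 35L, 28L, 25L, 21L)
).toDF("customer_id", "night", "morn", "aft", "eve")

// stack(n, label1, value1, ..., labelN, valueN) emits n rows per input row;
// the alias list names the two generated columns.
val narrow = wide.selectExpr(
  "customer_id",
  "stack(4, 'night', night, 'morn', morn, 'aft', aft, 'eve', eve) as (bucket, count)"
)

narrow.show(false)
```

All value columns passed to `stack` should share one type (here `bigint`), for the same reason the structs in the `array` approach need matching fields.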



In [56]:
hourly_pivot_df.unpersist()

[customer_id: int, night: bigint ... 3 more fields]

### 4.6 Average Number of Tracks Listened to by Customers of Different Levels at Different Times of the Day:

In [57]:
cust_df.show(5)

+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
|customer_id|         name|gender|             address|  zip| sign_date|status|level|campaign|lnkd_with_apps|
+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
|          0|Gregory Koval|     0|13004 Easy Cider ...|72132|06/04/2013|     1|    1|       1|             0|
|          1|Robert Gordon|     0|10497 Thunder Hic...|17307|07/27/2013|     1|    1|       1|             0|
|          2|Paula Peltier|     0|10084 Easy Gate Bend|66216|01/13/2013|     1|    0|       4|             1|
|          3|Francine Gray|     0|54845 Bent Pony H...|36690|07/11/2013|     1|    1|       1|             1|
|          4| David Garcia|     0|8551 Tawny Fox Villa|61377|09/09/2012|     1|    0|       1|             1|
+-----------+-------------+------+--------------------+-----+----------+------+-----+--------+--------------+
only showing top 5 rows

In [58]:
// Define a udf to map from the level number to the actual level name
val udfIndexTolevel: UserDefinedFunction = udf((level: Int) => {
  val level_map: Map[Int, String] = Map(0 -> "Free", 1 -> "Silver", 2 -> "Gold")
  level_map.get(level)
}, StringType)

udfIndexTolevel = UserDefinedFunction(<function1>,StringType,None)


UserDefinedFunction(<function1>,StringType,None)

In [59]:
var result_df: DataFrame = 
(cust_df.join(cust_profile_df, cust_df("customer_id") === cust_profile_df("customer_id"), "inner")
  .select(udfIndexTolevel(col("level")).alias("level"), 
          col("sum(night)"), col("sum(morn)"), col("sum(aft)"), col("sum(eve)"))
  .groupBy("level")
  .agg(avg("sum(aft)").alias("Afternoon"),
       avg("sum(eve)").alias("Evening"),
       avg("sum(morn)").alias("Morning"),
       avg("sum(night)").alias("Night")
   )
)

result_df = [level: string, Afternoon: double ... 3 more fields]


[level: string, Afternoon: double ... 3 more fields]

In [60]:
result_df.show()

+------+------------------+------------------+-----------------+------------------+
| level|         Afternoon|           Evening|          Morning|             Night|
+------+------------------+------------------+-----------------+------------------+
|Silver| 42.12979890310786|42.409506398537474|59.01401584399756| 59.16209628275442|
|  Gold|39.868173258003765| 40.22975517890772|56.35969868173258|55.685499058380415|
|  Free|  41.6944837340877|41.675035360678926|58.23373408769449|  58.2963224893918|
+------+------------------+------------------+-----------------+------------------+



### 4.7 Distribution of Customers By Level:

In [61]:
result_df = 
(cust_df.select(col("level"), when(col("gender") === 0, "Male").otherwise("Female").alias("gender"))
    .groupBy(col("level"))
    .pivot("gender")
    .count()
    .orderBy(desc("level")))

result_df = [level: int, Female: bigint ... 1 more field]


[level: int, Female: bigint ... 1 more field]

In [62]:
result_df.show(5)

+-----+------+----+
|level|Female|Male|
+-----+------+----+
|    2|   201| 330|
|    1|   670| 971|
|    0|  1145|1683|
+-----+------+----+



### 4.8 Top 10 Zip Codes: Which regions consume most from this service:

In [63]:
result_df = cust_df.groupBy("zip").count().orderBy(desc("count")).limit(10)

result_df = [zip: int, count: bigint]


[zip: int, count: bigint]

In [64]:
result_df.show()

+-----+-----+
|  zip|count|
+-----+-----+
| 5341|    4|
|80821|    4|
|71458|    3|
|31409|    3|
|70446|    3|
|20098|    3|
|80459|    3|
|57445|    3|
|78754|    3|
|47577|    3|
+-----+-----+



### 4.9 Distribution of Customers By SignUp Campaign:

In [65]:
// Define a udf to Map from campaign number to actual campaign description
val udfIndexToCampaign: UserDefinedFunction = udf((camptype: Int) => {
  val campaign_map: Map[Int, String] = Map(0 -> "None", 1 -> "30DaysFree", 2 -> "SuperBowl",  3 -> "RetailStore", 4 -> "WebOffer")
  campaign_map.get(camptype)
}, StringType)

udfIndexToCampaign = UserDefinedFunction(<function1>,StringType,None)


UserDefinedFunction(<function1>,StringType,None)

In [66]:
result_df = (cust_df
             .select(udfIndexToCampaign(col("campaign")).alias("campaign"))
             .groupBy("campaign")
             .count()
             .orderBy("count"))

result_df = [campaign: string, count: bigint]


[campaign: string, count: bigint]

In [67]:
result_df.show()

+-----------+-----+
|   campaign|count|
+-----------+-----+
|  SuperBowl|  240|
|RetailStore|  489|
|       None|  608|
|   WebOffer|  750|
| 30DaysFree| 2913|
+-----------+-----+



### 4.10 Average Unique Track Count By Customer Level:

In [68]:
result_df = (music_df.select("customer_id", "track_id")
             .groupBy("customer_id")
             .agg(countDistinct("track_id").alias("unique_track_count"))
             .join(cust_df, music_df("customer_id") === cust_df("customer_id"), "inner")
             .select(udfIndexTolevel(col("level")).alias("level"), col("unique_track_count"))
             .groupBy("level")
             .agg(avg("unique_track_count").alias("avg_unique_track_count")))

result_df = [level: string, avg_unique_track_count: double]


[level: string, avg_unique_track_count: double]

In [69]:
result_df.show()

+------+----------------------+
| level|avg_unique_track_count|
+------+----------------------+
|Silver|     170.2772699573431|
|  Gold|    166.85310734463278|
|  Free|     170.9515558698727|
+------+----------------------+



### 4.11 Mobile Tracks Count By Customer Level:

In [70]:
result_df = (music_df
             .filter(col("is_mobile") === 1)   // keep mobile events only, before dropping the column
             .select("customer_id", "track_id")
             .groupBy("customer_id")
             .count()
             .withColumnRenamed("count", "mobile_track_count")
             .join(cust_df, music_df("customer_id") === cust_df("customer_id"), "inner")
             .select(udfIndexTolevel(col("level")).alias("level"), col("mobile_track_count"))
             .groupBy("level")
             .agg(avg("mobile_track_count").alias("avg_mobile_track_count"))
             .orderBy("avg_mobile_track_count"))

result_df = [level: string, avg_mobile_track_count: double]


[level: string, avg_mobile_track_count: double]

In [71]:
result_df.show()

+------+----------------------+
| level|avg_mobile_track_count|
+------+----------------------+
|  Free|    100.01308345120226|
|Silver|     146.1614868982328|
|  Gold|    160.22033898305085|
+------+----------------------+



## 5. Destroying the Spark Session & Cleaning Up

In [72]:
music_df.unpersist()
cust_df.unpersist()
click_df.unpersist()

[event_id: int, customer_id: int ... 2 more fields]

In [73]:
spark.stop()