# Streaming Music Service and it's User's Behavior Analysis


## Phase 1
### 1. Consuming user activity data from Kafka and user metadata from S3
### 2. Tranformation and enrichment of user activity data
### 3. Partitioning and saving of transformed data in S3 for use by other downstream systems


In [2]:
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession, streaming}
import scala.concurrent.duration.DurationInt
import org.apache.spark.sql.functions._
import spark.implicits._



import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession, streaming}
import scala.concurrent.duration.DurationInt
import org.apache.spark.sql.functions._
import spark.implicits._


In [3]:

spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIA2OWOK4NJIL5XP7H7")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "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")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sql("set spark.sql.legacy.allowUntypedScalaUDF = true")
  


res1: org.apache.spark.sql.DataFrame = [key: string, value: string]



### Schema definition of music listening activity event stream
#### 1. User music listening activity has 5 columns
##### a. eventID : ID of captured event. It is unique across complete dataset
##### b. customerID : ID of user.
##### c. songID : ID of song whcih user is listening.
##### d. time : time of user activity.
##### e. isMobile : platform used. i.e Mobile or Desktop


In [3]:
 val activityEventsSchema =  new StructType()
        .add("eventID", IntegerType, nullable=true)
        .add("customerID", IntegerType, nullable=true)
        .add("songID", StringType, nullable=true)
        .add("time", StringType, nullable=true)
        .add("isMobile", IntegerType, nullable=true)

activityEventsSchema.printTreeString()

root
 |-- eventID: integer (nullable = true)
 |-- customerID: integer (nullable = true)
 |-- songID: string (nullable = true)
 |-- time: string (nullable = true)
 |-- isMobile: integer (nullable = true)



activityEventsSchema: org.apache.spark.sql.types.StructType = StructType(StructField(eventID,IntegerType,true), StructField(customerID,IntegerType,true), StructField(songID,StringType,true), StructField(time,StringType,true), StructField(isMobile,IntegerType,true))



### Reading data from realtime pipeline
#### 1. Reading user activity stream from kafka
#### 2. Parsing Json
#### 3. Creation of spark dataframe


In [4]:
   
    val eventsDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "useractivityevents")
      .option("startingOffsets", "latest")
      .load()
      .select($"value" cast "string" as "json")
      .select(from_json($"json", activityEventsSchema) as "data")
      .select(col("data.*"))   

eventsDF.printSchema()


root
 |-- eventID: integer (nullable = true)
 |-- customerID: integer (nullable = true)
 |-- songID: string (nullable = true)
 |-- time: string (nullable = true)
 |-- isMobile: integer (nullable = true)



eventsDF: org.apache.spark.sql.DataFrame = [eventID: int, customerID: int ... 3 more fields]



### Reading user metadata from S3
#### 1. User metadata is used to enrich streaming data. Is is present in s3 in json format 
#### 2. It has 4 columns
##### a. customerID : ID of customer. This field will be used for joining with streaming data
##### b. Gender : Gender of User. i.e Male or Female
##### c. subscriptionStatus : Is Subsription Active or Expired ?
##### d. subscriptionType : Level of Subscription i.e Free, Gold or Premium


In [5]:
val customerInfo = spark.read.json("s3a://ds-mum-sparkdemoproject/usermetadata/user.json")

customerInfo.printSchema()

root
 |-- customerID: long (nullable = true)
 |-- gender: long (nullable = true)
 |-- subscriptionStatus: long (nullable = true)
 |-- subscriptionType: long (nullable = true)



customerInfo: org.apache.spark.sql.DataFrame = [customerID: bigint, gender: bigint ... 2 more fields]



### User metadata sample


In [6]:
customerInfo.show(5)

+----------+------+------------------+----------------+
|customerID|gender|subscriptionStatus|subscriptionType|
+----------+------+------------------+----------------+
|         0|     0|                 1|               1|
|         1|     0|                 1|               1|
|         2|     0|                 1|               0|
|         3|     0|                 1|               1|
|         4|     0|                 1|               0|
+----------+------+------------------+----------------+
only showing top 5 rows




### Transformation of user activity data
#### 1. Addition of phase of day i.e Morning,Afternoon,Evening,Night based on user activity at particular hour of day
#### 2. Addition of year,month,day,hour for partitioning purpose
#### 3. Updating deviceType and level of subscription id's with meaningful literals


In [7]:
 
val transformedDF = eventsDF
      .select(col("eventID"), col("customerID"), col("songID"), col("time"), col("isMobile"),
        hour(col("time")).alias("hour"),
        when((hour(col("time")) < 4) || (hour(col("time")) >= 21), lit("Night"))
          .when((hour(col("time")) >= 4) && (hour(col("time")) < 11), lit("Morning"))
          .when((hour(col("time")) >= 11) && (hour(col("time")) < 16), lit("Afternoon"))
          .when((hour(col("time")) >= 16) && (hour(col("time")) < 21), lit("Evening"))
          .alias("phaseOfDay"),
        when(col("isMobile") === 1, lit("mobile")).otherwise(lit("desktop")).alias("deviceType")
      ).withColumn("year",year(col("time")))
      .withColumn("month",month(col("time")))
      .withColumn("day",dayofmonth(col("time")))
      .withColumn("hour",hour(col("time")))
    val updatesubScriptionValue = udf((level: Long) => {
      val map:Map[Long, String] = Map(0L -> "Free", 1L -> "Gold", 2L -> "Premium")
      map.get(level).get
    },StringType)

transformedDF.printSchema()

root
 |-- eventID: integer (nullable = true)
 |-- customerID: integer (nullable = true)
 |-- songID: string (nullable = true)
 |-- time: string (nullable = true)
 |-- isMobile: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- phaseOfDay: string (nullable = true)
 |-- deviceType: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



transformedDF: org.apache.spark.sql.DataFrame = [eventID: int, customerID: int ... 9 more fields]
updatesubScriptionValue: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3930/0x000000084172d840@1bcfab3e,StringType,List(),None,None,true,true)



### Joining user activity stream with customer metadata DF


In [8]:
    val joinedDF = transformedDF.join(customerInfo, Seq("customerID"), "inner")
      .withColumn("gender", when((col("gender") === 1), "Female").otherwise("Male"))
      .withColumn("subscriptionStatus", when((col("subscriptionStatus") === 1), "Active").otherwise("Expired"))
      .withColumn("subscriptionType", updatesubScriptionValue(col("subscriptionType")))
      .drop("time")

joinedDF.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- eventID: integer (nullable = true)
 |-- songID: string (nullable = true)
 |-- isMobile: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- phaseOfDay: string (nullable = true)
 |-- deviceType: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- gender: string (nullable = false)
 |-- subscriptionStatus: string (nullable = false)
 |-- subscriptionType: string (nullable = true)



joinedDF: org.apache.spark.sql.DataFrame = [customerID: int, eventID: int ... 11 more fields]



### Writing tranformed data to S3 for use by other downstream systems
#### 1. Data is partitioned by deviceType (Mobile or Desktop), year,month,day,hour
#### 2. Data is saved in json format
#### 3. A Query Monitoring interface is also added to get progress updates of streaming batches


In [9]:
 class MonitorListener extends StreamingQueryListener {

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }

    override def onQueryProgress(event: QueryProgressEvent): Unit = {
      println(s"""numInputRows: ${event.progress.numInputRows}""")
      println(s"""processedRowsPerSecond: ${event.progress.processedRowsPerSecond}""")
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
  }

spark.streams.addListener(new MonitorListener)

val partitionColumns = Seq("deviceType","year","month","day","hour")
    joinedDF.coalesce(1)
      .writeStream
      .partitionBy(partitionColumns:_*)
      .format("json")
      .option("path", "s3a://ds-mum-sparkdemoproject/UserBehaviorAnalysis/")
      .option("checkpointLocation", "/tmp/checkpoint-c1")
      .start().awaitTermination(2 * 60 * 1000) 

numInputRows: 0
processedRowsPerSecond: 0.0
numInputRows: 100
processedRowsPerSecond: 4.855075981939117
numInputRows: 100
processedRowsPerSecond: 5.430650591940914
numInputRows: 100
processedRowsPerSecond: 5.372589050663515
numInputRows: 100
processedRowsPerSecond: 5.267871253226571
numInputRows: 100
processedRowsPerSecond: 5.339598462195642


defined class MonitorListener
partitionColumns: Seq[String] = List(deviceType, year, month, day, hour)
res7: Boolean = false


numInputRows: 0
processedRowsPerSecond: 0.0
numInputRows: 0
processedRowsPerSecond: 0.0
numInputRows: 0
processedRowsPerSecond: 0.0



## Phase 2
### 1. Consuming transformed and enriched user activity data from S3
### 2. Running SQL queries to get various insights on user behavior



### Reading data from S3


In [1]:
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession, streaming}
import scala.concurrent.duration.DurationInt
import org.apache.spark.sql.functions._
import spark.implicits._


spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIA2OWOK4NJIL5XP7H7")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "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")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

Intitializing Scala interpreter ...

Spark Web UI available at http://4dee036fca3c:4040
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1651993671185)
SparkSession available as 'spark'


import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, SparkSession, streaming}
import scala.concurrent.duration.DurationInt
import org.apache.spark.sql.functions._
import spark.implicits._


In [2]:

val dataDF = spark.read.json("s3a://ds-mum-sparkdemoproject/UserBehaviorAnalysis")

dataDF.show(10)

+----------+-------+------+--------+----------+------+------------------+----------------+----------+----+-----+---+----+
|customerID|eventID|gender|isMobile|phaseOfDay|songID|subscriptionStatus|subscriptionType|deviceType|year|month|day|hour|
+----------+-------+------+--------+----------+------+------------------+----------------+----------+----+-----+---+----+
|      2265|    120|Female|       0| Afternoon|   703|            Active|            Free|   desktop|2014|   12|  7|  12|
|       412|    171|Female|       0| Afternoon|   101|            Active|            Free|   desktop|2014|   12|  7|  12|
|      3704|    140|Female|       1|   Morning|   156|            Active|            Free|    mobile|2014|   10| 25|   9|
|        62|    153|Female|       1|   Morning|  1348|           Expired|            Free|    mobile|2014|   10| 25|   9|
|      3572|    232|  Male|       0|   Morning|     4|            Active|            Gold|   desktop|2014|   10|  7|   4|
|      4867|    296|  Ma

dataDF: org.apache.spark.sql.DataFrame = [customerID: bigint, eventID: bigint ... 11 more fields]


In [3]:
dataDF.count()

res2: Long = 500



### Registering as table to use in various SQL queries and printing sample data


In [4]:
dataDF.createOrReplaceTempView("UserTable")

spark.sql("""SELECT * FROM UserTable""").show(20)


+----------+-------+------+--------+----------+------+------------------+----------------+----------+----+-----+---+----+
|customerID|eventID|gender|isMobile|phaseOfDay|songID|subscriptionStatus|subscriptionType|deviceType|year|month|day|hour|
+----------+-------+------+--------+----------+------+------------------+----------------+----------+----+-----+---+----+
|      2265|    120|Female|       0| Afternoon|   703|            Active|            Free|   desktop|2014|   12|  7|  12|
|       412|    171|Female|       0| Afternoon|   101|            Active|            Free|   desktop|2014|   12|  7|  12|
|      3704|    140|Female|       1|   Morning|   156|            Active|            Free|    mobile|2014|   10| 25|   9|
|        62|    153|Female|       1|   Morning|  1348|           Expired|            Free|    mobile|2014|   10| 25|   9|
|      3572|    232|  Male|       0|   Morning|     4|            Active|            Gold|   desktop|2014|   10|  7|   4|
|      4867|    296|  Ma


### Percentage of Mobile and Desktop users


In [5]:
spark.sql(""" select deviceType,count(distinct(customerID))*100/(select count(distinct(customerID)) from usertable) as percentage from usertable group by deviceType """).show()

+----------+------------------+
|deviceType|        percentage|
+----------+------------------+
|   desktop| 43.42984409799555|
|    mobile|60.356347438752785|
+----------+------------------+



### Distribution of users listening to music during different phase of day

In [6]:
spark.sql(""" select phaseOfDay,count(*) * 100/(select count(*) from usertable) as percentage from usertable group by phaseofday """).show()

+----------+----------+
|phaseOfDay|percentage|
+----------+----------+
|   Evening|      21.6|
|   Morning|      27.6|
| Afternoon|      22.8|
|     Night|      28.0|
+----------+----------+



### Distribution of Subscription Type

In [7]:
spark.sql(""" select subscriptionType,count(distinct(customerID))*100/(select count(distinct(customerID)) from usertable) as percentage from usertable group by subscriptionType order by percentage desc """).show()

+----------------+------------------+
|subscriptionType|        percentage|
+----------------+------------------+
|            Free|58.797327394209354|
|            Gold| 28.50779510022272|
|         Premium|12.694877505567929|
+----------------+------------------+

