### SETUP

In [1]:
%pip install fastavro pandas
%pip install faker
%pip install timezonefinder
%pip install fastavro kafka-python
%pip install -q pyspark

Collecting fastavro
  Downloading fastavro-1.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: fastavro
Successfully installed fastavro-1.9.4
Collecting faker
  Downloading Faker-24.9.0-py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: faker
Successfully installed faker-24.9.0
Collecting timezonefinder
  Downloading timezonefinder-6.5.0.tar.gz (49.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.4/49.4 MB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting h3<4,>=3.7.6 (from timezonefinder)
  Do

In [2]:
%%writefile environment.sh
#!/usr/bin/bash
export KAFKA_BINARY_VERSION='3.7.0'
export SCALA_BINARY_VERSION='2.13'
export KAFKA_BINARY_VERSION=$KAFKA_BINARY_VERSION
export SCALA_BINARY_VERSION=$SCALA_BINARY_VERSION
export PATH=$PATH:$PWD/kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION/bin

Writing environment.sh


## KAFKA SETUP

In [3]:
%%writefile kafka_setup.sh

source ./environment.sh
echo kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION
echo $PATH

# Java Setup
wget -O- https://apt.corretto.aws/corretto.key | sudo apt-key add -
sudo add-apt-repository 'deb https://apt.corretto.aws stable main' -y
sudo apt-get -y update; sudo apt-get install -y java-11-amazon-corretto-jdk

# Kafka Setup
wget https://downloads.apache.org/kafka/${KAFKA_BINARY_VERSION}/kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}.tgz
tar xzf kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}.tgz

UUID=$(./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-storage.sh random-uuid)
echo "export UUID=$UUID" >> ./environment.sh
cat environment.sh

# Start Kafka Broker

echo kafka_$SCALA_BINARY_VERSION-$KAFKA_BINARY_VERSION

# offsets.retention.minutes determines how long Kafka retains the commit offsets for consumer groups.
echo "offsets.retention.minutes=300" >> ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties

./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-storage.sh format -t ${UUID} -c ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties
nohup ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/bin/kafka-server-start.sh ./kafka_${SCALA_BINARY_VERSION}-${KAFKA_BINARY_VERSION}/config/kraft/server.properties > kafka_server.log &

Writing kafka_setup.sh


In [4]:
%%shell
source kafka_setup.sh
sleep 10
tail -20 kafka_server.log

kafka_2.13-3.7.0
/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/content/kafka_2.13-3.7.0/bin
--2024-04-15 19:29:35--  https://apt.corretto.aws/corretto.key
Resolving apt.corretto.aws (apt.corretto.aws)... 18.66.255.23, 18.66.255.118, 18.66.255.63, ...
Connecting to apt.corretto.aws (apt.corretto.aws)|18.66.255.23|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1695 (1.7K) [binary/octet-stream]
Saving to: ‘STDOUT’


2024-04-15 19:29:35 (4.38 MB/s) - written to stdout [1695/1695]

OK
Repository: 'deb https://apt.corretto.aws stable main'
Description:
Archive for codename: stable components: main
More info: https://apt.corretto.aws
Adding repository.
Adding deb entry to /etc/apt/sources.list.d/archive_uri-https_apt_corretto_aws-jammy.list
Adding disabled deb-src entry to /etc/apt/sources.list.d/archive_uri-https_apt_corretto_aws-jammy.list
Hit:1 http://ar



In [5]:
%%shell

source ./environment.sh

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic spotifyWrapped --create --partitions 3 --replication-factor 1

Created topic spotifyWrapped.




In [6]:
%%writefile check_kafka_consumers.sh
#!/usr/bin/env bash
source ./environment.sh

echo "Active Consumer Groups"
while true
do
date
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
sleep 1
done

Writing check_kafka_consumers.sh


In [7]:
%%shell
chmod +x ./check_kafka_consumers.sh
nohup ./check_kafka_consumers.sh > kafka_consumers.log &

nohup: redirecting stderr to stdout




## AVRO PRODUCER FILE GENERATION

This file contains the data classes that describes the 8 different personalities with preferences, also the distributions extracted from the datasets, and the data generation.

In [8]:
%%writefile avro_producer.py
#!/usr/bin/python

import fastavro
from fastavro.schema import load_schema
from fastavro import parse_schema, writer
import pandas as pd
import uuid
from google.colab import files
from datetime import datetime, timedelta, timezone
import random
from faker import Faker
from timezonefinder import TimezoneFinder
import pytz
import numpy as np
from dataclasses import dataclass
from typing import Tuple
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col
import sys
from kafka import KafkaConsumer
import io
import os
import time

fake = Faker()

from dataclasses import dataclass
from typing import Tuple

@dataclass
class ListeningPersonality:
    name: str
    genre_distribution: dict[str, float]
    active_hours: Tuple[int, int]
    interactions: dict
    inter_prob: float

personalities = [
    ListeningPersonality(
        name="RapRockJazzLover",
        genre_distribution={"hip-hop": 0.6, "rock": 0.3, "jazz": 0.1},
        active_hours=(17, 23),  # Active from 5 PM to 11 PM
        interactions={"skip_probability": 0.1, "like_probability": 0.3, "playlist_add_probability": 0.2},
        inter_prob=0.3
    ),
    ListeningPersonality(
        name="IndieSoulFan",
        genre_distribution={"indie": 0.5, "soul": 0.5},
        active_hours=(9, 17),  # Active from 9 AM to 5 PM
        interactions={"skip_probability": 0.15, "like_probability": 0.25, "playlist_add_probability": 0.1},
        inter_prob=0.7
    ),
    ListeningPersonality(
        name="ElectronicExplorer",
        genre_distribution={"electronic": 0.7, "house": 0.2, "techno": 0.1},
        active_hours=(22, 4),  # Active late night to early morning
        interactions={"skip_probability": 0.05, "like_probability": 0.2, "playlist_add_probability": 0.25},
        inter_prob=0.5
    ),
    ListeningPersonality(
        name="PopPunkPerson",
        genre_distribution={"pop": 0.4, "punk": 0.6},
        active_hours=(15, 22),
        interactions={"skip_probability": 0.2, "like_probability": 0.5, "playlist_add_probability": 0.3},
        inter_prob=0.8
    ),
    ListeningPersonality(
        name="ClassicalConnoisseur",
        genre_distribution={"classical": 1.0},
        active_hours=(8, 20),
        interactions={"skip_probability": 0.05, "like_probability": 0.4, "playlist_add_probability": 0.15},
        inter_prob=0.9
    ),
    ListeningPersonality(
        name="JazzJunkie",
        genre_distribution={"jazz": 0.9, "blues": 0.1},
        active_hours=(18, 24),
        interactions={"skip_probability": 0.1, "like_probability": 0.35, "playlist_add_probability": 0.25},
        inter_prob=0.3
    ),
    ListeningPersonality(
        name="CountryCruiser",
        genre_distribution={"country": 0.8, "folk": 0.2},
        active_hours=(10, 18),
        interactions={"skip_probability": 0.12, "like_probability": 0.3, "playlist_add_probability": 0.18},
        inter_prob=0.4

    ),
    ListeningPersonality(
        name="ReggaeRelaxer",
        genre_distribution={"reggae": 0.7, "dancehall": 0.3},
        active_hours=(16, 23),
        interactions={"skip_probability": 0.08, "like_probability": 0.4, "playlist_add_probability": 0.22},
        inter_prob=0.6
    )
]

user_data = pd.read_excel('user_data.xlsx')
music_data_loaded = pd.read_json('music_data.json', orient='records', lines=True)
music_data = music_data_loaded.to_dict('records')

gender_counts = user_data['Gender'].value_counts(normalize=True)
gender_options = gender_counts.index.tolist()
gender_probabilities = gender_counts.values.tolist()

age_counts = user_data['Age'].value_counts(normalize=True)
age_options = age_counts.index.tolist()
age_probabilities = age_counts.values.tolist()

subscription_plan_counts = user_data['spotify_subscription_plan'].value_counts(normalize=True)
subscription_plan_options = subscription_plan_counts.index.tolist()
subscription_plan_probabilities = subscription_plan_counts.values.tolist()

listening_device_counts = user_data['spotify_listening_device'].value_counts(normalize=True)
listening_device_options = listening_device_counts.index.tolist()
listening_device_probabilities = listening_device_counts.values.tolist()

schema = {
  "type": "record",
  "name": "SpotifyWrappedData",
  "namespace": "com.spotify.wrapped",
  "fields": [
    {"name": "UserId", "type": "string"},
    {"name": "Age", "type": "string"},
    {"name": "Gender", "type": "string"},
    {"name": "ListeningDevice", "type": "string"},
    {"name": "SubscriptionPlan", "type": "string"},
    {"name": "MusicTimeSlot", "type": "string"},
    {"name": "Location", "type": "string"},
    {"name": "SongId", "type": "string"},
    {"name": "TrackName", "type": "string"},
    {"name": "Artist", "type": "string"},
    {"name": "Genre", "type": "string"},
    {"name": "SongStart", "type": "string"},
    {"name": "SongEnd", "type": "string"},
    {"name": "Length", "type": "int"},
    {"name": "InteractionType", "type": { "type": "enum", "name": "InteractionTypeEnum",
    "symbols": ["PLAY", "PAUSE", "SKIP", "LIKE", "ADDED_TO_PLAYLIST"]}},
    {"name": "InteractionTimestamp", "type": "string"}
  ]
}

parsed_schema = fastavro.parse_schema(schema)


def generate_listening_session(user_id, user_device, user_country, track_duration, interaction_type, session_start, track_end):
    record = {
          "UserId": user_id,
          "Age": str(np.random.choice(age_options, p=age_probabilities)),
          "Gender": np.random.choice(gender_options, p=gender_probabilities),
          "ListeningDevice": user_device,
          "SubscriptionPlan": np.random.choice(subscription_plan_options, p=subscription_plan_probabilities),
          "MusicTimeSlot": "Morning" if 5 <= session_start.hour < 12 else "Afternoon" if 12 <= session_start.hour < 18 else "Evening",
          "Location": user_country,
          "SongId": song['track_id'],
          "TrackName": song['track_name'],
          "Artist": song['artists'],
          "Genre": song['track_genre'],
          "Length": track_duration,
          "SongStart": session_start.strftime('%Y-%m-%dT%H:%M:%S%z'),
          "SongEnd": track_end.strftime('%Y-%m-%dT%H:%M:%S%z'),
          "InteractionType": interaction_type,
          "InteractionTimestamp": session_start.strftime('%Y-%m-%dT%H:%M:%S%z')
      }

    return record

def serialize(message):
    print("Serialize:" + str(message))
    schemaless_bytes_writer = io.BytesIO()
    fastavro.schemaless_writer(schemaless_bytes_writer, schema, message)
    return schemaless_bytes_writer.getvalue()

producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_serializer=serialize
)

topic_name_default="spotifyWrapped"
if len(sys.argv) > 1:
    topic_name = sys.argv[1]
else:
    topic_name = topic_name_default

start_time = datetime.strptime('2024-04-01T00:00:00-05:00', '%Y-%m-%dT%H:%M:%S%z')
end_time = datetime.strptime('2024-04-01T23:59:59-05:00', '%Y-%m-%dT%H:%M:%S%z')

for i in range(10000):
    user_country = fake.country()
    user_device = np.random.choice(listening_device_options, p=listening_device_probabilities)

    session_start = datetime.now()
    user_id = str(uuid.uuid4())
    personality = random.choice(personalities)

    genre = random.choice(list(personality.genre_distribution.keys()))

    for j in range(round(50*personality.inter_prob)):

        filtered_songs = [song for song in music_data if song['track_genre'] == genre]
        if filtered_songs:
            song = random.choice(filtered_songs)
        else:
            print(f"No songs found for genre {genre}, selecting a random song instead.")
            song = random.choice(music_data)

        # Decide interaction type based on personality
        interaction_probabilities = [personality.interactions['skip_probability'],
                                        personality.interactions['like_probability'],
                                        personality.interactions.get('playlist_add_probability', 0)]
        interaction_types = ['SKIP', 'LIKE', 'ADDED_TO_PLAYLIST']
        interaction_type = random.choices(interaction_types + ['PLAY'], weights=interaction_probabilities + [1-sum(interaction_probabilities)], k=1)[0]

        track_duration = song['duration_ms']
        if interaction_type == 'SKIP':
            track_duration = track_duration*random.random()

        track_end = (start_time + timedelta(seconds=track_duration))

        message = generate_listening_session(user_id, user_device, user_country, track_duration, interaction_type, session_start, track_end)
        start_time = end_time

        print(message)
        producer.send(topic_name, value=message)
        time.sleep(1)

# Flush the producer
producer.flush()

Writing avro_producer.py


In [9]:
!nohup python avro_producer.py > avro_producer.log &

nohup: redirecting stderr to stdout


In [10]:
!sleep 5
!tail -20 avro_producer.log

## AVRO CONSUMER FILE GENERATION

In [11]:
%%writefile avro_consumer.py

from kafka import KafkaConsumer
import fastavro
from fastavro import parse_schema
import io
import time
import sys

# AVRO schema definition to the consumer
schema = {
  "type": "record",
  "name": "SpotifyWrappedData",
  "namespace": "com.spotify.wrapped",
  "fields": [
    {"name": "UserId", "type": "string"},
    {"name": "Age", "type": "string"},
    {"name": "Gender", "type": "string"},
    {"name": "ListeningDevice", "type": "string"},
    {"name": "SubscriptionPlan", "type": "string"},
    {"name": "MusicTimeSlot", "type": "string"},
    {"name": "Location", "type": "string"},
    {"name": "SongId", "type": "string"},
    {"name": "TrackName", "type": "string"},
    {"name": "Artist", "type": "string"},
    {"name": "Genre", "type": "string"},
    {"name": "SongStart", "type": "string"},
    {"name": "SongEnd", "type": "string"},
    {"name": "Length", "type": "int"},
    {"name": "InteractionType","type": { "type": "enum", "name": "InteractionTypeEnum",
        "symbols": ["PLAY", "PAUSE", "SKIP", "LIKE", "ADDED_TO_PLAYLIST"]}},
    {"name": "InteractionTimestamp", "type": "string"}
  ]
}

parsed_schema = parse_schema(schema)

def deserialize(message):
  # print("Deserialize:" + str(message))
  schemaless_bytes_reader = io.BytesIO(message)
  try:
    record=fastavro.schemaless_reader(schemaless_bytes_reader, schema)
    return record
  except Exception as ex:
    print(ex)
    return "pass"

topic_name_default="spotifyWrapped"
if len(sys.argv) > 1:
  topic_name = sys.argv[1]
else:
  topic_name = topic_name_default

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='AVRO_Consumer',
    value_deserializer=deserialize #lambda v: fastavro.schemaless_reader(io.BytesIO(v), schema)
)

# Consume messages from the topic and print them
for message in consumer:
    print("="*10)
    print(message.value)

Writing avro_consumer.py


In [12]:
!nohup python avro_consumer.py > avro_consumer.log &

nohup: redirecting stderr to stdout


In [13]:
!sleep 5
!tail -20 avro_consumer.log

In [14]:
!ps -ef |grep avro

root        4866       1 34 19:30 ?        00:00:03 python3 avro_producer.py
root        5206       1  4 19:30 ?        00:00:00 python3 avro_consumer.py
root        5235     213  0 19:30 ?        00:00:00 /bin/bash -c ps -ef |grep avro
root        5237    5235  0 19:30 ?        00:00:00 grep avro


In [15]:
%%shell
source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

AVRO_Consumer




In [16]:
%%shell

source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group AVRO_Consumer


GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
AVRO_Consumer   spotifyWrapped  0          6               7               1               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2
AVRO_Consumer   spotifyWrapped  1          0               0               0               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2
AVRO_Consumer   spotifyWrapped  2          2               4               2               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2




In [17]:
%%shell

source ./environment.sh
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups


GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
AVRO_Consumer   spotifyWrapped  0          7               7               0               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2
AVRO_Consumer   spotifyWrapped  1          0               2               2               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2
AVRO_Consumer   spotifyWrapped  2          5               8               3               kafka-python-2.0.2-afe0164a-7b3c-4d1a-94ae-b10a2d50271c /127.0.0.1      kafka-python-2.0.2




## SPARK CONNECTIONS

In [18]:
spark_release='spark-3.5.1'
hadoop_version='hadoop3'
import time
import os

start=time.time()
os.environ['SPARK_RELEASE']=spark_release
os.environ['HADOOP_VERSION']=hadoop_version
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_release}-bin-{hadoop_version}"

In [19]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it
!pip install -q findspark
import findspark

# install findspark
# findspark find your Spark Distribution and sets necessary environment variables

findspark.init()

# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.1


In [20]:
kafka_brokers="127.0.0.1:9092" # Can be a comma-separated list of brokers
topic_name="spotifyWrapped"

schema = """
{
  "type": "record",
  "name": "SpotifyWrappedData",
  "namespace": "com.spotify.wrapped",
  "fields": [
    {"name": "UserId", "type": "string"},
    {"name": "Age", "type": "string"},
    {"name": "Gender", "type": "string"},
    {"name": "ListeningDevice", "type": "string"},
    {"name": "SubscriptionPlan", "type": "string"},
    {"name": "MusicTimeSlot", "type": "string"},
    {"name": "Location", "type": "string"},
    {"name": "SongId", "type": "string"},
    {"name": "TrackName", "type": "string"},
    {"name": "Artist", "type": "string"},
    {"name": "Genre", "type": "string"},
    {"name": "SongStart", "type": "string"},
    {"name": "SongEnd", "type": "string"},
    {"name": "Length", "type": "int"},
    {"name": "InteractionType", "type": { "type": "enum", "name": "InteractionTypeEnum",
        "symbols": ["PLAY", "PAUSE", "SKIP", "LIKE", "ADDED_TO_PLAYLIST"]}},
    {"name": "InteractionTimestamp", "type": "string"}
  ]
}
"""

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

spark = SparkSession \
    .builder \
    .appName("AVRO-Kafka_Streaming") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

In [22]:
# Kafka Configuration for reading from Kafka/Event Hub
# Kafka source will create a unique group id for each query automatically. The user can set the prefix of the automatically
# generated group.id’s via the optional source option groupIdPrefix, default value is “spark-kafka-source”.
kafkaConf = {
    "kafka.bootstrap.servers": kafka_brokers,
    # Below settins required if kafka is secured:
    # "kafka.sasl.mechanism": "PLAIN",
    # "kafka.security.protocol": "SASL_SSL",
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://eventhubname.servicebus.windows.net/;SharedAccessKeyName=listenpolicyforspark;SharedAccessKey=ckNkSjcyXKGN8FCIRIS3qtkKvW+AEhB6QPaM=;EntityPath=instructortest";',
    "subscribe": topic_name, # to read from specific partitions use option: "assign": {topic_name:[0,1]})
    "startingOffsets": "latest", # "earliest", "latest"
    "enable.auto.commit": "true ",
    "groupIdPrefix": "Stream_Analytics_",
    "auto.commit.interval.ms": "5000"
}


# Read from Event Hub using Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafkaConf)

In [23]:
df = df.load()
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [24]:
from_avro(df.value, schema).alias("SpotifyWrappedData")

Column<'from_avro(value) AS SpotifyWrappedData'>

In [25]:
df = df.select(from_avro(df.value, schema).alias("SpotifyWrappedData"))
df.printSchema()

root
 |-- SpotifyWrappedData: struct (nullable = true)
 |    |-- UserId: string (nullable = false)
 |    |-- Age: string (nullable = false)
 |    |-- Gender: string (nullable = false)
 |    |-- ListeningDevice: string (nullable = false)
 |    |-- SubscriptionPlan: string (nullable = false)
 |    |-- MusicTimeSlot: string (nullable = false)
 |    |-- Location: string (nullable = false)
 |    |-- SongId: string (nullable = false)
 |    |-- TrackName: string (nullable = false)
 |    |-- Artist: string (nullable = false)
 |    |-- Genre: string (nullable = false)
 |    |-- SongStart: string (nullable = false)
 |    |-- SongEnd: string (nullable = false)
 |    |-- Length: integer (nullable = false)
 |    |-- InteractionType: string (nullable = false)
 |    |-- InteractionTimestamp: string (nullable = false)



In [26]:
from pyspark.sql.functions import col

# Assuming 'df' is your DataFrame loaded with data conforming to the flattened Avro schema

df = df.select(
    col("SpotifyWrappedData.UserId"),
    col("SpotifyWrappedData.Age"),
    col("SpotifyWrappedData.Gender"),
    col("SpotifyWrappedData.ListeningDevice"),
    col("SpotifyWrappedData.SubscriptionPlan"),
    col("SpotifyWrappedData.MusicTimeSlot"),
    col("SpotifyWrappedData.Location"),
    col("SpotifyWrappedData.SongId"),
    col("SpotifyWrappedData.TrackName"),
    col("SpotifyWrappedData.Artist"),
    col("SpotifyWrappedData.Genre"),
    col("SpotifyWrappedData.Length"),
    col("SpotifyWrappedData.SongEnd"),
    col("SpotifyWrappedData.SongStart"),
    col("SpotifyWrappedData.InteractionType"),
    col("SpotifyWrappedData.InteractionTimestamp")
)

df.printSchema()

root
 |-- UserId: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ListeningDevice: string (nullable = true)
 |-- SubscriptionPlan: string (nullable = true)
 |-- MusicTimeSlot: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- SongId: string (nullable = true)
 |-- TrackName: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Length: integer (nullable = true)
 |-- SongEnd: string (nullable = true)
 |-- SongStart: string (nullable = true)
 |-- InteractionType: string (nullable = true)
 |-- InteractionTimestamp: string (nullable = true)



# Analyses Implementations

### Query # 1

In [28]:
from pyspark.sql.functions import col, sum as sql_sum, when

artist_popularity_query = df \
    .groupBy(col("Artist")) \
    .agg(
        sql_sum(when(col("InteractionType") == "PLAY", 1).otherwise(0)).alias("Total_Plays"),
        sql_sum(when(col("InteractionType") == "LIKE", 1).otherwise(0)).alias("Total_Likes")
    ) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("artist_popularity") \
    .start()

time.sleep(30)

print("Artist Popularity and Engagement Metrics:")
spark.sql("SELECT * FROM artist_popularity ORDER BY Total_Plays DESC, Total_Likes DESC").show(truncate=False)

artist_popularity_query.stop()

Artist Popularity and Engagement Metrics:
+------------------------------------------------------------------------------------------+-----------+-----------+
|Artist                                                                                    |Total_Plays|Total_Likes|
+------------------------------------------------------------------------------------------+-----------+-----------+
|Wolfgang Amadeus Mozart;Ingrid Haebler;London Symphony Orchestra;Sir Colin Davis          |1          |0          |
|Salim–Sulaiman;Shreya Ghoshal                                                             |1          |0          |
|T. R. Mahalingam                                                                          |1          |0          |
|Yiruma                                                                                    |0          |1          |
|Wolfgang Amadeus Mozart;Danielle Laval                                                    |0          |1          |
|Wolfgang Amadeus Moza

###  Query 2

In [29]:
from pyspark.sql.functions import col, count

artist_region_popularity_query = df \
    .groupBy("Location", "Artist") \
    .agg(
        count("UserId").alias("Listener_Count")
    ) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("artist_region_popularity") \
    .start()

time.sleep(30)
print("Artist Popularity Across Regions:")
spark.sql("""
    SELECT Location, Artist, Listener_Count
    FROM artist_region_popularity
    ORDER BY Location, Listener_Count DESC
""").show(truncate=False)

artist_region_popularity_query.stop()

Artist Popularity Across Regions:
+--------+--------------------------------------------------------------------------------+--------------+
|Location|Artist                                                                          |Listener_Count|
+--------+--------------------------------------------------------------------------------+--------------+
|China   |Wolfgang Amadeus Mozart;Danielle Laval                                          |2             |
|China   |Ottorino Respighi;Berliner Philharmoniker;Herbert von Karajan                   |1             |
|China   |Chicago Symphony Orchestra;Claudio Abbado                                       |1             |
|China   |Sirkazhi Govindarajan                                                           |1             |
|China   |Johannes Brahms;Music Lab Collective                                            |1             |
|China   |Begum Akhtar                                                                    |1             |
|Ch

## Query 3

In [30]:
from pyspark.sql.functions import col, when, sum as sql_sum

engagement_query = df \
    .groupBy(col("UserId")) \
    .agg(
        sql_sum(when(col("InteractionType") == "PLAY", 1).otherwise(0)).alias("Plays"),
        sql_sum(when(col("InteractionType") == "SKIP", 1).otherwise(0)).alias("Skips"),
        sql_sum(when(col("InteractionType") == "LIKE", 1).otherwise(0)).alias("Likes")
    ) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("user_engagement") \
    .start()

time.sleep(30)

print("User Engagement:")
spark.sql("SELECT * FROM user_engagement ORDER BY Plays DESC, Likes DESC").show(truncate=False)

engagement_query.stop()

User Engagement:
+------------------------------------+-----+-----+-----+
|UserId                              |Plays|Skips|Likes|
+------------------------------------+-----+-----+-----+
|3624147e-c71a-40c3-b048-ef8baef774fc|5    |4    |6    |
|642e89a4-3899-4c9c-9c3d-e7806f264a28|1    |1    |2    |
+------------------------------------+-----+-----+-----+



## Other Considered Queries

In [31]:
from pyspark.sql.functions import col, window, to_timestamp, sum

df = df.withColumn("SongStart", to_timestamp(col("SongStart"), "yyyy-MM-dd'T'HH:mm:ssZ"))
df = df.withColumn("SongEnd", to_timestamp(col("SongEnd"), "yyyy-MM-dd'T'HH:mm:ssZ"))

df = df.withColumn("ListeningDuration", (col("SongEnd").cast("long") - col("SongStart").cast("long")))


windowDuration = "5 minutes"
slideDuration = "1 minute"

genre_location_duration_counts = df.groupBy(
    window(col("SongStart"), windowDuration, slideDuration),
    col("Genre"),
    col("Location")
).agg(
    sum("ListeningDuration").alias("TotalListeningDuration")
)

query_name = "genre_popularity_by_location_query"
genre_location_query = genre_location_duration_counts.writeStream \
    .queryName(query_name) \
    .outputMode("complete") \
    .format("memory") \
    .start()

import time
time.sleep(30)

print("Listening Duration by Genre and Location per Window:")
spark.sql(f"SELECT * FROM {query_name} ORDER BY window DESC, TotalListeningDuration DESC").show(truncate=False)

genre_location_query.stop()

Listening Duration by Genre and Location per Window:
+------+-----+--------+----------------------+
|window|Genre|Location|TotalListeningDuration|
+------+-----+--------+----------------------+
+------+-----+--------+----------------------+



In [32]:
from pyspark.sql.functions import col, window, to_timestamp, when, count

df = df.withColumn("SongStart", to_timestamp(col("SongStart"), "yyyy-MM-dd'T'HH:mm:ssZ"))
df = df.withColumn("SongEnd", to_timestamp(col("SongEnd"), "yyyy-MM-dd'T'HH:mm:ssZ"))

df_likes = df.filter(col("InteractionType") == "LIKE")

df_likes = df_likes.withWatermark("SongStart", "1 hour")

windowDuration = "5 minutes"
slideDuration = "1 minute"

song_artist_location_likes_counts = df_likes.groupBy(
    window(col("SongStart"), windowDuration, slideDuration),
    col("TrackName"),
    col("Artist"),
    col("Location")
).count().alias("LikesCount")

query_name = "song_popularity_by_artist_and_location_query"
song_artist_location_query = song_artist_location_likes_counts.writeStream \
    .queryName(query_name) \
    .outputMode("complete") \
    .format("memory") \
    .start()

import time
time.sleep(30)

print("Likes Count by Song, Artist, and Location per Window:")
spark.sql(f"SELECT * FROM {query_name} ORDER BY window DESC, count DESC").show(truncate=False)

song_artist_location_query.stop()

Likes Count by Song, Artist, and Location per Window:
+------+---------+------+--------+-----+
|window|TrackName|Artist|Location|count|
+------+---------+------+--------+-----+
+------+---------+------+--------+-----+



In [33]:

from pyspark.sql.functions import col, window

demographics_query = df.groupBy(col("Age"), col("Gender")).count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("demographics") \
    .start()

time.sleep(30)
print("Demographics Analysis:")
spark.sql("SELECT * FROM demographics ORDER BY count DESC").show(truncate=False)
demographics_query.stop()

Demographics Analysis:
+-----+------+-----+
|Age  |Gender|count|
+-----+------+-----+
|20-35|Female|18   |
|12-20|Female|4    |
|20-35|Male  |3    |
|20-35|Others|1    |
|12-20|Male  |1    |
+-----+------+-----+



In [34]:
from pyspark.sql.functions import window, avg

listening_habits_query = df.groupBy(window(col("InteractionTimestamp"), "1 hour"), col("MusicTimeSlot")).count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("listening_habits") \
    .start()

time.sleep(30)
print("Listening Habits Analysis:")
spark.sql("SELECT * FROM listening_habits ORDER BY window").show(truncate=False)
listening_habits_query.stop()

Listening Habits Analysis:
+------------------------------------------+-------------+-----+
|window                                    |MusicTimeSlot|count|
+------------------------------------------+-------------+-----+
|{2024-04-15 19:00:00, 2024-04-15 20:00:00}|Evening      |27   |
+------------------------------------------+-------------+-----+



In [35]:

genre_popularity_query = df.groupBy(col("Genre"), col("Age"), col("Gender")).count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("genre_popularity") \
    .start()

time.sleep(30)
print("Genre Popularity Analysis:")
spark.sql("SELECT * FROM genre_popularity ORDER BY count DESC").show(truncate=False)
genre_popularity_query.stop()

Genre Popularity Analysis:
+----------+-----+------+-----+
|Genre     |Age  |Gender|count|
+----------+-----+------+-----+
|electronic|20-35|Female|11   |
|electronic|20-35|Male  |3    |
|soul      |20-35|Male  |2    |
|soul      |12-20|Female|2    |
|electronic|35-60|Male  |2    |
|soul      |35-60|Male  |1    |
|electronic|20-35|Others|1    |
|soul      |20-35|Female|1    |
|electronic|12-20|Male  |1    |
|electronic|12-20|Female|1    |
+----------+-----+------+-----+



In [36]:
genre_location_query = df.groupBy(col("Genre"), col("Location")).count() \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("genre_location_popularity") \
    .start()


time.sleep(30)
print("Genre Location Analysis:")
spark.sql("SELECT * FROM genre_location_popularity ORDER BY count DESC").show(truncate=False)
genre_location_query.stop()

Genre Location Analysis:
+----------+---------------+-----+
|Genre     |Location       |count|
+----------+---------------+-----+
|folk      |North Macedonia|20   |
|pop       |Turkmenistan   |3    |
|electronic|Puerto Rico    |2    |
+----------+---------------+-----+



In [37]:
from pyspark.sql.functions import unix_timestamp, col, to_timestamp

df = df.withColumn("SongStartTimestamp", to_timestamp(col("SongStart"), "yyyy-MM-dd'T'HH:mm:ssZ"))
df = df.withColumn("SongEndTimestamp", to_timestamp(col("SongEnd"), "yyyy-MM-dd'T'HH:mm:ssZ"))


df = df.withColumn("ListeningDuration", (col("SongEnd") - col("SongStart")))

from pyspark.sql.functions import window, avg

windowDuration = "5 minutes"
slideDuration = "2 minutes"

listening_duration_query = df.groupBy(
    window(col("SongStart"), windowDuration, slideDuration), "Genre"
).agg(
    avg("ListeningDuration").alias("AverageListeningDuration")
)

query_name = "listening_duration_query"
listening_time_query = listening_duration_query.writeStream \
    .queryName(query_name) \
    .outputMode("complete") \
    .format("memory") \
    .start()

import time
time.sleep(30)

spark.sql(f"SELECT * FROM {query_name} ORDER BY window, AverageListeningDuration ASC").show(truncate=False)

listening_time_query.stop()


+------+-----+------------------------+
|window|Genre|AverageListeningDuration|
+------+-----+------------------------+
+------+-----+------------------------+



In [38]:
from pyspark.sql.functions import col
import time

interaction_counts = df.groupBy("InteractionType").count()

interaction_query = interaction_counts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("interactions") \
    .start()

time.sleep(30)

spark.sql("SELECT * FROM interactions DESC").show(truncate=False)
#print(interaction_query.lastProgress)

interaction_query.stop()

+-----------------+-----+
|InteractionType  |count|
+-----------------+-----+
|PLAY             |8    |
|LIKE             |11   |
|ADDED_TO_PLAYLIST|8    |
+-----------------+-----+



In [39]:
from pyspark.sql.functions import col, window, to_timestamp, count

df = df.withColumn("SongStart", to_timestamp(col("SongStart"), "yyyy-MM-dd'T'HH:mm:ssZ"))
df_likes = df.filter(col("InteractionType") == "LIKE")

df_likes = df_likes.withWatermark("SongStart", "2 hours")

windowDuration = "1 hour"
slideDuration = "30 minutes"

song_artist_location_likes_counts = df_likes.groupBy(
    window(col("SongStart"), windowDuration, slideDuration),
    col("TrackName"),
    col("Artist"),
    col("Location")
).count().alias("LikesCount")

query_name = "extended_song_popularity_by_artist_and_location_query"
song_artist_location_query = song_artist_location_likes_counts.writeStream \
    .queryName(query_name) \
    .outputMode("complete") \
    .format("memory") \
    .start()

import time
time.sleep(30)

print("Extended Likes Count by Song, Artist, and Location per Window:")
spark.sql(f"SELECT * FROM {query_name} ORDER BY window DESC, count DESC").show(truncate=False)

song_artist_location_query.stop()


Extended Likes Count by Song, Artist, and Location per Window:
+------+---------+------+--------+-----+
|window|TrackName|Artist|Location|count|
+------+---------+------+--------+-----+
+------+---------+------+--------+-----+



In [40]:
from pyspark.sql.functions import col, window, to_timestamp, sum
import time

df = df.withColumn("SongStart", to_timestamp(col("SongStart"), "yyyy-MM-dd'T'HH:mm:ssZ"))
df = df.withColumn("SongEnd", to_timestamp(col("SongEnd"), "yyyy-MM-dd'T'HH:mm:ssZ"))

df = df.withColumn("ListeningDuration", (col("SongEnd").cast("long") - col("SongStart").cast("long")))

df = df.withWatermark("SongStart", "1 hour")

windowDuration = "30 minutes"
slideDuration = "15 minutes"

genre_duration_counts = df.groupBy(
    window(col("SongStart"), windowDuration, slideDuration),
    col("Genre")
).agg(
    sum("ListeningDuration").alias("TotalListeningDuration")
)

query_name = "top_genres_by_listening_duration_query"
top_genres_query = genre_duration_counts.writeStream \
    .queryName(query_name) \
    .outputMode("complete") \
    .format("memory") \
    .start()

time.sleep(30)

print("Listening Duration by Genre per Window:")
spark.sql(f"SELECT * FROM {query_name} ORDER BY window DESC, TotalListeningDuration DESC").show(truncate=False)

top_genres_query.stop()


Listening Duration by Genre per Window:
+------+-----+----------------------+
|window|Genre|TotalListeningDuration|
+------+-----+----------------------+
+------+-----+----------------------+

