In [2]:
import pandas as pd
import os
from pyspark.sql.functions import countDistinct
from pyspark.sql import SparkSession


### Initalizing dataframes and looking at their schemas

In [3]:
## Initialize the SparkSession
# appName is the name of the application
# getOrCreate() creates a new session or retrieves an existing one
spark = SparkSession.builder \
    .appName("MuseDash PySpark Learning") \
        .getOrCreate()

## Verify that SparkSession is created
spark

25/05/04 14:23:38 WARN Utils: Your hostname, Zipcoders-MacBook-Pro-5.local resolves to a loopback address: 127.0.0.1; using 192.168.68.70 instead (on interface en0)
25/05/04 14:23:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/04 14:23:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/04 14:23:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
try:
    df_auth = spark.read.json ('./Data/auth_events')
    print('Data loaded successfully')
except Exception as e:
    print(f'Error loading data: {e}')

df_auth.show(10)

Data loaded successfully
+----------------+---------+------+-------------+--------+---------+-----+-----------+-------------+---------+-----+-------+-------------+--------------------+------+-----+
|            city|firstName|gender|itemInSession|lastName|      lat|level|        lon| registration|sessionId|state|success|           ts|           userAgent|userId|  zip|
+----------------+---------+------+-------------+--------+---------+-----+-----------+-------------+---------+-----+-------+-------------+--------------------+------+-----+
|    Philadelphia|      Mia|     F|            1|Thompson|40.096485| paid| -74.983219|1713838246433|     1257|   PA|   true|1714575065433|Mozilla/5.0 (Wind...|   782|19154|
|       Owensboro| Anakaren|     F|           16|Crawford|37.735778| paid| -87.249397|1714087326433|     1886|   KY|   true|1714577211433|"Mozilla/5.0 (Win...|   278|42301|
|      New London|     NULL|  NULL|            1|    NULL|39.579374| free| -91.369252|         NULL|      319|

In [5]:
try:
    df_listen = spark.read.json ('./Data/listen_events')
    print('Data loaded successfully')
except Exception as e:
    print(f'Error loading data: {e}')

df_listen.show(10)

                                                                                

Data loaded successfully
+--------------------+---------+---------+---------+---------+------+-------------+--------+---------+-----+----------+-------------+---------+--------------------+-----+-------------+--------------------+------+-----+
|              artist|     auth|     city| duration|firstName|gender|itemInSession|lastName|      lat|level|       lon| registration|sessionId|                song|state|           ts|           userAgent|userId|  zip|
+--------------------+---------+---------+---------+---------+------+-------------+--------+---------+-----+----------+-------------+---------+--------------------+-----+-------------+--------------------+------+-----+
|John Fred & His P...|Logged In|   Covert|171.80688|     Vera|     F|            2| Stanley|42.285992| paid|-86.268503|1713855719433|     1498|    Judy In Disguise|   MI|1714574635433|Mozilla/5.0 (Wind...|   851|49043|
|    Transvision Vamp|Logged In| Gas City|209.91956|   Landon|     M|           17|  Lucero|40.4798

In [6]:
try:
    df_page = spark.read.json ('./Data/page_view_events')
    print('Data loaded successfully')
except Exception as e:
    print(f'Error loading data: {e}')

df_page.show(10)

                                                                                

Data loaded successfully
+--------------------+----------+---------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+-------------+---------+--------------------+-----+------+-------------+--------------------+------+-----+
|              artist|      auth|     city| duration|firstName|gender|itemInSession|lastName|      lat|level|       lon|method|    page| registration|sessionId|                song|state|status|           ts|           userAgent|userId|  zip|
+--------------------+----------+---------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+-------------+---------+--------------------+-----+------+-------------+--------------------+------+-----+
|John Fred & His P...| Logged In|   Covert|171.80688|     Vera|     F|            2| Stanley|42.285992| paid|-86.268503|   PUT|NextSong|1713855719433|     1498|    Judy In Disguise|   MI|   200|1714574635433|Mozilla/5.0 (Wind...|   851|49043|
|  

In [7]:
try:
    df_status = spark.read.json ('./Data/status_change_events')
    print('Data loaded successfully')
except Exception as e:
    print(f'Error loading data: {e}')

df_status.show(10)

Data loaded successfully
+---------+-------------------+---------+------+-------------+--------+---------+-----+-----------+-------------+---------+-----+-------------+--------------------+------+-----+
|     auth|               city|firstName|gender|itemInSession|lastName|      lat|level|        lon| registration|sessionId|state|           ts|           userAgent|userId|  zip|
+---------+-------------------+---------+------+-------------+--------+---------+-----+-----------+-------------+---------+-----+-------------+--------------------+------+-----+
|Logged In|       Conshohocken|    Kezia|     F|            5|  Fowler|40.080337| paid| -75.300461|1714382893433|     1702|   PA|1714577869433|Mozilla/5.0 (Maci...|   813|19428|
|Logged In|          Las Vegas|     Jace|     M|            3| Patrick|36.111833| free|-115.211676|1714224142433|     1326|   NV|1714581991433|"Mozilla/5.0 (X11...|   691|89103|
|Logged In|          Milwaukee|  Zachary|     M|            3|Gonzalez|43.049005| fre

In [8]:
df_auth.printSchema()

root
 |-- city: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- level: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- state: string (nullable = true)
 |-- success: boolean (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: long (nullable = true)
 |-- zip: string (nullable = true)



In [9]:
df_listen.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- city: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- level: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- state: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: long (nullable = true)
 |-- zip: string (nullable = true)



In [10]:
df_page.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- city: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- level: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: long (nullable = true)
 |-- zip: string (nullable = true)



In [11]:
df_status.printSchema()

root
 |-- auth: string (nullable = true)
 |-- city: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- level: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- state: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: long (nullable = true)
 |-- zip: string (nullable = true)



### Starting the Clean

##### auth

In [12]:
# rearranging database columns
auth_cleaned = df_auth
auth_cleaned = auth_cleaned.select('userId','lastName','firstName', 'gender','city','state','zip', 'lat','lon', \
                                   'level', 'sessionId','itemInSession', 'registration', 'success', 'userAgent', 'ts')
auth_cleaned.show()

+------+--------+---------+------+----------------+-----+-----+---------+-----------+-----+---------+-------------+-------------+-------+--------------------+-------------+
|userId|lastName|firstName|gender|            city|state|  zip|      lat|        lon|level|sessionId|itemInSession| registration|success|           userAgent|           ts|
+------+--------+---------+------+----------------+-----+-----+---------+-----------+-----+---------+-------------+-------------+-------+--------------------+-------------+
|   782|Thompson|      Mia|     F|    Philadelphia|   PA|19154|40.096485| -74.983219| paid|     1257|            1|1713838246433|   true|Mozilla/5.0 (Wind...|1714575065433|
|   278|Crawford| Anakaren|     F|       Owensboro|   KY|42301|37.735778| -87.249397| paid|     1886|           16|1714087326433|   true|"Mozilla/5.0 (Win...|1714577211433|
|  NULL|    NULL|     NULL|  NULL|      New London|   MO|63459|39.579374| -91.369252| free|      319|            1|         NULL|  fals

In [13]:
# finding rows with null values
auth_cleaned.filter(auth_cleaned['userId'].isNull()).show()

+------+--------+---------+------+----------------+-----+-----+---------+-----------+-----+---------+-------------+------------+-------+--------------------+-------------+
|userId|lastName|firstName|gender|            city|state|  zip|      lat|        lon|level|sessionId|itemInSession|registration|success|           userAgent|           ts|
+------+--------+---------+------+----------------+-----+-----+---------+-----------+-----+---------+-------------+------------+-------+--------------------+-------------+
|  NULL|    NULL|     NULL|  NULL|      New London|   MO|63459|39.579374| -91.369252| free|      319|            1|        NULL|  false|"Mozilla/5.0 (Win...|1714577284433|
|  NULL|    NULL|     NULL|  NULL|            Erie|   PA|16511|42.166262|  -79.99017| free|      499|            2|        NULL|  false|"Mozilla/5.0 (Mac...|1714578974433|
|  NULL|    NULL|     NULL|  NULL|            Erie|   PA|16511|42.166262|  -79.99017| free|      499|            3|        NULL|  false|"Moz

In [14]:
# filtering out rows where userid is null
auth_cleaned2 = auth_cleaned.filter(auth_cleaned['userId'].isNotNull())

In [15]:
auth_cleaned2.select('success').distinct().show()

+-------+
|success|
+-------+
|   true|
+-------+



If i get rid of the nulls, we loose all the failed log ins so, yea the data is clean

##### Page view events

In [16]:
# List all columns in the DataFrame
columns = df_page.columns
print("Columns in the DataFrame:", columns)

Columns in the DataFrame: ['artist', 'auth', 'city', 'duration', 'firstName', 'gender', 'itemInSession', 'lastName', 'lat', 'level', 'lon', 'method', 'page', 'registration', 'sessionId', 'song', 'state', 'status', 'ts', 'userAgent', 'userId', 'zip']


In [17]:
#reorder the columns in the DataFrame
new_column_order = ['userID', 'lastName', 'firstName', 'gender', 'level', 'auth', 'page', 'sessionId', 'itemInSession', 'artist', 'song', 'duration', 'ts', 'city', 'state', 'zip', 'lat', 'lon', 'method', 'status', 'registration', 'userAgent']
reordered_df = df_page.select(new_column_order)
reordered_df.show(5)

+------+--------+---------+------+-----+----------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|level|      auth|    page|sessionId|itemInSession|              artist|                song| duration|           ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+-----+----------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F| paid| Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|171.80688|1714574635433|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mozilla/5.0 (Wind...|
|  NULL|    NULL|     NULL| 

In [18]:
# Count nulls in each column
from pyspark.sql.functions import col, when, sum

null_counts = df_page.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_page.columns]
)

# Show the null counts
null_counts.show()

[Stage 14:>                                                         (0 + 8) / 8]

+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------+----+------------+---------+------+-----+------+---+---------+------+---+
|artist|auth|city|duration|firstName|gender|itemInSession|lastName|lat|level|lon|method|page|registration|sessionId|  song|state|status| ts|userAgent|userId|zip|
+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------+----+------------+---------+------+-----+------+---+---------+------+---+
|177714|   0|   0|  177714|    50859| 50859|            0|   50859|  0|    0|  0|     0|   0|       50859|        0|177714|    0|     0|  0|        0| 50859|  0|
+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------+----+------------+---------+------+-----+------+---+---------+------+---+



                                                                                

In [19]:
#Drop rows with null values
no_nulls_df = reordered_df.dropna()
no_nulls_df.show(5)

+------+--------+---------+------+-----+---------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|level|     auth|    page|sessionId|itemInSession|              artist|                song| duration|           ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+-----+---------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F| paid|Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|171.80688|1714574635433|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mozilla/5.0 (Wind...|
|    24|  Lucero|   Landon|     

In [20]:
#Rename the 'level' column to 'subscription'
rename_column_df = no_nulls_df.withColumnRenamed('level', 'subscription') 
rename_column_df.show(5)

+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|subscription|     auth|    page|sessionId|itemInSession|              artist|                song| duration|           ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+---------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F|        paid|Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|171.80688|1714574635433|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mozilla/5.0 (Wind...|
|   

In [21]:
#rounding decimlal values to 2 decimal places
from pyspark.sql.functions import round

rounded_df = rename_column_df.withColumn('duration', round(col('duration'), 2)) 
rounded_df.show(5)

+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|subscription|     auth|    page|sessionId|itemInSession|              artist|                song|duration|           ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+-------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F|        paid|Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|  171.81|1714574635433|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mozilla/5.0 (Wind...|
|    24|

In [22]:
# convert the 'ts' column to a timestamp
from pyspark.sql.functions import to_timestamp
timestamp_df = rounded_df.withColumn('ts', to_timestamp(col('ts') / 1000))
timestamp_df.show(5)

+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+--------------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|subscription|     auth|    page|sessionId|itemInSession|              artist|                song|duration|                  ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+--------------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F|        paid|Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|  171.81|2024-05-01 10:43:...|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mo

In [23]:
# Rename timestamp_df to clean_page_view_events
clean_page_view_events = timestamp_df

# Show the renamed DataFrame
clean_page_view_events.show(5)

+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+--------------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|userID|lastName|firstName|gender|subscription|     auth|    page|sessionId|itemInSession|              artist|                song|duration|                  ts|     city|state|  zip|      lat|       lon|method|status| registration|           userAgent|
+------+--------+---------+------+------------+---------+--------+---------+-------------+--------------------+--------------------+--------+--------------------+---------+-----+-----+---------+----------+------+------+-------------+--------------------+
|   851| Stanley|     Vera|     F|        paid|Logged In|NextSong|     1498|            2|John Fred & His P...|    Judy In Disguise|  171.81|2024-05-01 10:43:...|   Covert|   MI|49043|42.285992|-86.268503|   PUT|   200|1713855719433|Mo

In [24]:
# Save the DataFrame to JSON file
#in case you needed to save the DataFrame to a JSON file, you can use the following code:


# clean_page_view_events.coalesce(1).write.json("clean_page_view_events.json", mode="overwrite")

##### listening events

In [25]:
listen_df = df_listen

listen_df.head(5)

[Row(artist='John Fred & His Playboy Band', auth='Logged In', city='Covert', duration=171.80688, firstName='Vera', gender='F', itemInSession=2, lastName='Stanley', lat=42.285992, level='paid', lon=-86.268503, registration=1713855719433, sessionId=1498, song='Judy In Disguise', state='MI', ts=1714574635433, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId=851, zip='49043'),
 Row(artist='Transvision Vamp', auth='Logged In', city='Gas City', duration=209.91956, firstName='Landon', gender='M', itemInSession=17, lastName='Lucero', lat=40.479845, level='paid', lon=-85.582288, registration=1714232178433, sessionId=23, song='I Want Your Love', state='IN', ts=1714574750433, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"', userId=24, zip='46933'),
 Row(artist='Florence + The Machine', auth='Logged In', city='Rochester', duration=219.66322, firstName='Mia', gender='F', itemInSession

In [26]:
listen_df = listen_df.select('userId', 'lastName', 'firstName', 'gender', 'song', 'artist', 'duration', 'sessionId', 'itemInSession', 'auth', 'level', 'city', 'state', 'zip', 'lat', 'lon', 'registration', 'userAgent', 'ts')
listen_df.head(5)

[Row(userId=851, lastName='Stanley', firstName='Vera', gender='F', song='Judy In Disguise', artist='John Fred & His Playboy Band', duration=171.80688, sessionId=1498, itemInSession=2, auth='Logged In', level='paid', city='Covert', state='MI', zip='49043', lat=42.285992, lon=-86.268503, registration=1713855719433, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', ts=1714574635433),
 Row(userId=24, lastName='Lucero', firstName='Landon', gender='M', song='I Want Your Love', artist='Transvision Vamp', duration=209.91956, sessionId=23, itemInSession=17, auth='Logged In', level='paid', city='Gas City', state='IN', zip='46933', lat=40.479845, lon=-85.582288, registration=1714232178433, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"', ts=1714574750433),
 Row(userId=916, lastName='Jimenez', firstName='Mia', gender='F', song='Dog Days Are Over (Radio Edit)', artist='Florence + The Machine'

In [27]:
listen_df = listen_df.withColumnRenamed("level", "subscription")
listen_df.head(5)

[Row(userId=851, lastName='Stanley', firstName='Vera', gender='F', song='Judy In Disguise', artist='John Fred & His Playboy Band', duration=171.80688, sessionId=1498, itemInSession=2, auth='Logged In', subscription='paid', city='Covert', state='MI', zip='49043', lat=42.285992, lon=-86.268503, registration=1713855719433, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', ts=1714574635433),
 Row(userId=24, lastName='Lucero', firstName='Landon', gender='M', song='I Want Your Love', artist='Transvision Vamp', duration=209.91956, sessionId=23, itemInSession=17, auth='Logged In', subscription='paid', city='Gas City', state='IN', zip='46933', lat=40.479845, lon=-85.582288, registration=1714232178433, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"', ts=1714574750433),
 Row(userId=916, lastName='Jimenez', firstName='Mia', gender='F', song='Dog Days Are Over (Radio Edit)', artist='Florence 

In [28]:
from pyspark.sql.functions import from_unixtime, col
# Convert milliseconds to seconds
listen_df = listen_df.withColumn("ts_seconds", col("ts") / 1000)
# Convert the Unix timestamp (in seconds) to a readable timestamp format
readable_ts = from_unixtime(col("ts_seconds"), "yyyy-MM-dd HH:mm:ss")
# Replace the original 'ts' column with the readable timestamp
listen_df = listen_df.withColumn("ts", readable_ts).drop("ts_seconds")
# Show the modified DataFrame
listen_df.show(truncate=False)

+------+--------+---------+------+----------------------------------------------------------------------------+----------------------------------------------------------------------+---------+---------+-------------+---------+------------+-----------+-----+-----+---------+----------+-------------+----------------------------------------------------------------------------------------------------------------------------------+-------------------+
|userId|lastName|firstName|gender|song                                                                        |artist                                                                |duration |sessionId|itemInSession|auth     |subscription|city       |state|zip  |lat      |lon       |registration |userAgent                                                                                                                         |ts                 |
+------+--------+---------+------+------------------------------------------------------------------

In [38]:
listen_df.select('userID','song','artist','ts').orderBy(desc("ts"))
listen_df.show()

+------+--------+---------+------+--------------------+--------------------+---------+---------+-------------+---------+------------+-----------+-----+-----+---------+----------+-------------+--------------------+-------------------+
|userId|lastName|firstName|gender|                song|              artist| duration|sessionId|itemInSession|     auth|subscription|       city|state|  zip|      lat|       lon| registration|           userAgent|                 ts|
+------+--------+---------+------+--------------------+--------------------+---------+---------+-------------+---------+------------+-----------+-----+-----+---------+----------+-------------+--------------------+-------------------+
|   851| Stanley|     Vera|     F|    Judy In Disguise|John Fred & His P...|171.80688|     1498|            2|Logged In|        paid|     Covert|   MI|49043|42.285992|-86.268503|1713855719433|Mozilla/5.0 (Wind...|2024-05-01 10:43:55|
|    24|  Lucero|   Landon|     M|    I Want Your Love|    Trans

In [29]:
from functools import reduce
rows_with_nulls = df_listen.filter(
    reduce(lambda x, y: x | y, [col(c).isNull() for c in df_listen.columns])
)
print(f"Number of rows with at least one null value: {rows_with_nulls.count()}")
rows_with_nulls.show()
#Checking for any rows with a null value

                                                                                

Number of rows with at least one null value: 0


                                                                                

+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------------+---------+----+-----+---+---------+------+---+
|artist|auth|city|duration|firstName|gender|itemInSession|lastName|lat|level|lon|registration|sessionId|song|state| ts|userAgent|userId|zip|
+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------------+---------+----+-----+---+---------+------+---+
+------+----+----+--------+---------+------+-------------+--------+---+-----+---+------------+---------+----+-----+---+---------+------+---+



                                                                                

no null values

In [36]:
from pyspark.sql.functions import col, count, desc, asc

top_10_artist_df = listen_df.groupBy("artist") \
                      .agg(count("*").alias("count")) \
                      .orderBy(desc("count")) \
                      .limit(10) \
                      .select(col("artist"), col("count"))

top_10_artist_df.show()

# To get the top 10 mode values as a list:
top_10_artist_df = [row["artist"] for row in top_10_artist_df.collect()]
print(f"Top 10 modes of 'artist': {top_10_artist_df}")

                                                                                

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|       Kings Of Leon| 9644|
|            Coldplay| 9318|
|Florence + The Ma...| 6548|
|       Dwight Yoakam| 6043|
|            BjÃÂ¶rk| 5967|
|                Muse| 5964|
|      The Black Keys| 5828|
|       Justin Bieber| 5738|
|        Jack Johnson| 5254|
|              Eminem| 4687|
+--------------------+-----+



[Stage 48:>                                                         (0 + 8) / 8]

Top 10 modes of 'artist': ['Kings Of Leon', 'Coldplay', 'Florence + The Machine', 'Dwight Yoakam', 'BjÃ\x83Â¶rk', 'Muse', 'The Black Keys', 'Justin Bieber', 'Jack Johnson', 'Eminem']


                                                                                

In [None]:
listen_df.select('userID','song','artist','ts').orderBy(desc("ts")).show()

[Stage 55:>                                                         (0 + 8) / 8]

+------+--------------------+---------------+-------------------+
|userID|                song|         artist|                 ts|
+------+--------------------+---------------+-------------------+
|   522|       Addams Groove|      MC Hammer|2025-04-30 10:43:53|
|   694|The Dynamo Of Vol...|     Jason Mraz|2025-04-30 10:43:29|
|   259|         Todo Cambio|         Camila|2025-04-30 10:43:02|
|   970|         Don't Panic|       Coldplay|2025-04-30 10:42:36|
|   971|Blooze feat. Warr...| Peter Frampton|2025-04-30 10:42:32|
|   482|Have A Talk With God|  Stevie Wonder|2025-04-30 10:42:10|
|   245|     Human Behaviour|       BjÃÂ¶rk|2025-04-30 10:42:04|
|   186|         Bulletproof|        La Roux|2025-04-30 10:42:01|
|   272|       Coffee And TV|           Blur|2025-04-30 10:41:57|
|   526|           Fade Away|      12 Stones|2025-04-30 10:41:46|
|   775|Numb (Album Version)|      Disturbed|2025-04-30 10:41:25|
|   619|Don't Sweat The T...|Eric B. & Rakim|2025-04-30 10:40:54|
|   245|  

                                                                                

25/05/04 18:01:55 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 208277 ms exceeds timeout 120000 ms
25/05/04 18:01:55 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/04 18:02:01 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

In [31]:
# you don't need to add the select clause
tops_10_artist_df = listen_df.groupBy("artist") \
                      .agg(count("*").alias("count")) \
                      .orderBy(desc("count")) \
                      .limit(10) 
tops_10_artist_df.show(10)

[Stage 38:>                                                         (0 + 8) / 8]

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|       Kings Of Leon| 9644|
|            Coldplay| 9318|
|Florence + The Ma...| 6548|
|       Dwight Yoakam| 6043|
|            BjÃÂ¶rk| 5967|
|                Muse| 5964|
|      The Black Keys| 5828|
|       Justin Bieber| 5738|
|        Jack Johnson| 5254|
|              Eminem| 4687|
+--------------------+-----+



                                                                                

In [32]:
def get_top_10_artists_by_state(df_listen, selected_state):
    """
    Finds the top 10 artists for a given state in the DataFrame.

    Returns:
        A PySpark DataFrame containing the top 10 artists and their counts for the selected state.
    """
    top_10_artists_df = df_listen.filter(col("state") == selected_state) \
                           .groupBy("artist") \
                           .agg(count("*").alias("count")) \
                           .orderBy(desc("count")) \
                           .limit(10) \
                           .select(col("artist"), col("count"))
    return top_10_artists_df

# Select the state you want to analyze
selected_state = "MI"  # Replace with the desired state

# Get the top 10 artists for the selected state
top_10_artists_for_state_df = get_top_10_artists_by_state(df_listen, selected_state)

# Show the resulting DataFrame
print(f"Top 10 artists in {selected_state}:")
top_10_artists_for_state_df.show()

Top 10 artists in MI:


[Stage 41:>                                                         (0 + 8) / 8]

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|       Kings Of Leon|  480|
|            Coldplay|  477|
|Florence + The Ma...|  349|
|      The Black Keys|  291|
|       Dwight Yoakam|  280|
|       Justin Bieber|  275|
|            BjÃÂ¶rk|  271|
|                Muse|  267|
|        Jack Johnson|  222|
|         Evanescence|  220|
+--------------------+-----+



                                                                                

## before fragments code

In [None]:
import streamlit as st
import numpy as np
import plotly.express as px
import kunle_engine
import plotly.graph_objects as go

# fig = px.choropleth(locations=["CA", "TX", "NY",'AK'], locationmode="USA-states", color=[1,2,3,4], scope="usa")


# st.plotly_chart(fig)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Museh PySpark Learning") \
        .getOrCreate()

## Verify that SparkSession is created

try:
    df_listen = spark.read.json ('/Users/kunle/Python Projects/Kunles_Muse/Data/listen_events')
    print('Data loaded successfully')
except Exception as e:
    print(f'Error loading data: {e}')


clean_listen = kunle_engine.clean(df=df_listen)
# list of artists with a certain number
artist_list = kunle_engine.get_artist_over_1000(df=clean_listen,number_of_lis=1000)

# makes page wide
st.set_page_config(layout = 'wide')
#selected_artist = st.sidebar.selectbox()
col_table = st.columns((5, 10), gap='medium')                                  



location = 'Nationwide'
#option = None
#creating tabs
tab1, tab2 = st.tabs(["Map", "2nd for giggles"])

map_container = st.container(border=True)
df_container = st.container(border=True)

with tab1:

    with col_table[1]:
        with map_container:
            # option = st.selectbox(
            # 'Select an Artist',
            # artist_list,
            # #index=None,
            # placeholder="Chosen Artist",
            # accept_new_options = True
            # )

            if 'option' not in st.session_state:
                pass
            else:

                st.write("You selected: ", st.session_state.option)



                # if not option:
                #     pass
                # else:
                # creating the dataframe of listens for specific artists
                b = kunle_engine.get_artist_state_listen(df=clean_listen, artist=st.session_state.option)

                # filtering data to what is needed to make map
                c = kunle_engine.map_prep_df(df=b)
                c_max = c['listens'].max()
                c_min = c['listens'].min()
                ## creating the maps
                fig = go.Figure(data=go.Choropleth(
                    locations=c.state, # Spatial coordinates
                    z = c.listens, # Data to be color-coded
                    locationmode = 'USA-states', # set of locations match entries in `locations`
                    colorscale = 'Blues',
                    #range_color=(c_min, c_max),
                    colorbar_title = "Number of\n Listens"
                ))

                # adding context to the map
                fig.update_layout(
                    title_text = f'Number of {st.session_state.option} Listens \n 2024-2025',
                    geo_scope='usa', # limit map scope to USA
                )

                fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})

                event = st.plotly_chart(fig, on_select="rerun", selection_mode=["points","box","lasso"])

                points = event["selection"].get("points", [])
                
                #sigla = first_point["properties"].get("sigla", None)
                #st.plotly_chart(fig)  points = event["selection"].get("points", [])
                if points:
                    first_point = points[0]
                    location = first_point['location']
                    st.write("You selected: ", location)
                else:
                    st.write("You selected: ", 'Nationwide')

                #event = st.plotly_chart(fig, on_select="rerun", selection_mode=["points","box","lasso"])

                #st.subheader("You selected: ", event)

                # st.dataframe(b.toPandas(), hide_index=True)
            # if not location:
            #     selected_state = "Nationwide"
            # else:
            #     selected_state = location
    selected_state = location
    # titles depending on state selected
    if selected_state == 'Nationwide' :
        top_10_header = "Top 10 National Artists"
        pie_title = "National Subscription Type Distribution"
        paid_title = 'Top Songs for Paid Users'
        free_title = 'Top Songs for Free Users'
    else:
        top_10_header = f"Top 10 Artists in {location}"
        pie_title = f"Subscription Type Distribution in {location}"
        paid_title = f'Top Songs for Paid Users in {location}'
        free_title = f'Top Songs for Free Users in {location}'
    
    with col_table[0]:
        with df_container:
            top_10 = kunle_engine.get_top_10_artists(df=clean_listen, state=location)
            st.header(top_10_header)
            selected_row = st.dataframe(top_10,
                                        use_container_width=True,
                                        selection_mode='single-row',
                                        on_select='rerun',
                                        hide_index=True)
            
            row_item = selected_row['selection'].get('rows')[0]
            
            if not st.session_state.option or not row_item == st.session_state.option:
                st.session_state.option = top_10.artist[row_item]
                st.rerun()



with tab2:
    option = None
    if st.button("Send balloons!"):
        st.balloons()