In [0]:
%fs
ls FileStore/tables

In [0]:
# List the entries under the DBFS directory 'dbfs:/FileStore/tables'.
dbutils.fs.ls('dbfs:/FileStore/tables')

In [0]:
# Load the streaming-history JSON export into a DataFrame.
# The 'multiline' option allows a JSON document to span several lines.
history_path = 'dbfs:/FileStore/tables/Streaming_History_Audio_2019_2024.json'
json_reader = spark.read.options(multiline='true')
df = json_reader.json(history_path)

In [0]:
# Print the schema of the loaded DataFrame.
df.printSchema()

In [0]:
# Number of columns in the raw `df` DataFrame.
len(df.columns)

In [0]:
# Total number of records in the raw `df` DataFrame.
df.count()

In [0]:
# Preview the first 5 rows.
df.show(5)

In [0]:
# Replace the verbose 'master_metadata_*' column names with short ones.
column_renames = {
    'master_metadata_album_album_name': 'album_name',
    'master_metadata_album_artist_name': 'artist_name',
    'master_metadata_track_name': 'track_name',
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
df.show(10)

In [0]:
# Show the distinct values present in the 'platform' column.
df.select('platform').distinct().show()

In [0]:
from pyspark.sql.functions import lower, when

# Collapse the free-form 'platform' strings into two labels using a
# case-insensitive substring match.
platform_lc = lower('platform')
normalized_platform = (
    when(platform_lc.contains('windows'), 'Windows')
    .when(platform_lc.contains('android'), 'Android')
    # NOTE(review): any platform matching neither keyword becomes NULL --
    # confirm that discarding the original value is intended.
    .otherwise(None)
)
df = df.withColumn('platform', normalized_platform)
df.show(10)

In [0]:
# Build the working DataFrame from only the columns needed downstream.
required_columns = [
    'album_name',
    'artist_name',
    'track_name',
    'ms_played',
    'platform',
    'reason_end',
    'reason_start',
    'shuffle',
    'skipped',
    'ts',
]
sp_df = df.select(*required_columns)

In [0]:
# Number of columns in the reduced `sp_df` DataFrame.
len(sp_df.columns)

In [0]:
# Number of records in the `sp_df` DataFrame.
sp_df.count()

In [0]:
# Preview the first 10 rows of `sp_df`.
sp_df.show(10)

In [0]:
# Display the count of NULL values in each column.
from pyspark.sql.functions import col, when, count, coalesce

# Single aggregation pass: `when(...)` yields a non-NULL value only for rows
# where the column is NULL, and `count` ignores NULLs, so each aggregate is
# that column's NULL count.
df_null_count = sp_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sp_df.columns])
df_null_count.show(10)

# Dictionary mapping each column name to its NULL count.
# It is read from the one aggregated row above rather than running a separate
# filter + count job for every column.
df_null_count_dict = df_null_count.first().asDict()
df_null_count_dict

In [0]:
# Discard rows where any of the album, artist or track name is NULL.
sp_df = sp_df.na.drop(subset=['album_name', 'artist_name', 'track_name'])

In [0]:
# Replace NULL values in the 'skipped' column with False.
# Method 1 - using the WHEN function: keep the existing value unless it is NULL.
sp_df = sp_df.withColumn('skipped', when(col('skipped').isNull(), False).otherwise(col('skipped')))

# Method 2 - using the COALESCE function (alternative, left disabled).
# NOTE: `lit` is not imported anywhere above; import it from
# pyspark.sql.functions before enabling this line.
# sp_df = sp_df.withColumn('skipped', coalesce(col('skipped'), lit(False)))

In [0]:
from pyspark.sql.functions import col, when, count, coalesce

# Recompute the per-column NULL counts to verify the cleaning steps.
null_counters = []
for column_name in sp_df.columns:
    is_missing = col(column_name).isNull()
    # count() skips NULLs, so only rows where the column is missing are tallied.
    null_counters.append(count(when(is_missing, column_name)).alias(column_name))
df_null_count = sp_df.select(null_counters)
df_null_count.show(10)

In [0]:
# Derive a 'date' column by casting the 'ts' column to a date.
listen_date = sp_df['ts'].cast('date')
sp_df = sp_df.withColumn('date', listen_date)
sp_df.show(10)

In [0]:
from pyspark.sql.functions import date_format

# Column name -> datetime pattern applied to 'date':
# 'y' = year, 'MMMM' = full month name, 'EEEE' = full day-of-week name.
calendar_patterns = {'year': 'y', 'month': 'MMMM', 'day': 'EEEE'}
for part_name, pattern in calendar_patterns.items():
    sp_df = sp_df.withColumn(part_name, date_format('date', pattern))
sp_df.show(10)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Add integer calendar columns used for sorting and grouping.
sp_df = (
    sp_df
    # Month number is taken from 'date' -- the same column the 'month' name is
    # derived from -- so the two can never disagree across a time-zone boundary.
    .withColumn('month_num', F.month('date'))
    # dayofweek() numbers days 1 = Sunday ... 7 = Saturday, matching the
    # previous hand-written mapping without depending on day-name strings.
    .withColumn('day_num', F.dayofweek('date'))
    # Hour of day comes from the full 'ts' value; hour() already returns an int.
    .withColumn('hour', F.hour(F.col('ts')))
    # 'year' was produced as a formatted string upstream; store it as an int.
    .withColumn('year', F.col('year').cast(IntegerType()))
)
sp_df.show(10)

In [0]:
# Print the schema of `sp_df` to check the column types.
sp_df.printSchema()

In [0]:
# Frequency table for each categorical column, most common value first.
for category_column in ('reason_end', 'shuffle'):
    frequency = sp_df.groupBy(category_column).count()
    frequency.orderBy('count', ascending=False).show()

In [0]:
# Persist the prepared DataFrame as the Delta table 'spotify',
# overwriting any existing contents.
table_writer = sp_df.write.format('delta').mode('overwrite')
table_writer.saveAsTable('spotify')