In [0]:
%fs
ls "/mnt/spotify"

path,name,size,modificationTime
dbfs:/mnt/spotify/raw-data/,raw-data/,0,1693827752000
dbfs:/mnt/spotify/transformed_data/,transformed_data/,0,1693832843000


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *     

In [0]:
# Loading data
ariana_grande = spark.read.csv('/mnt/spotify/raw-data/Ariana_Grande_feat.csv', header=True, inferSchema=True)
billie_eilish = spark.read.csv('/mnt/spotify/raw-data/Billie_Eillish_feats.csv', header=True, inferSchema=True)
dj_khaled = spark.read.csv('/mnt/spotify/raw-data/DJ_Khaled_feats.csv', header=True, inferSchema=True)
drake = spark.read.csv('/mnt/spotify/raw-data/Drake_feats.csv', header=True, inferSchema=True)
dua_lipa = spark.read.csv('/mnt/spotify/raw-data/Dua_Lipa_feats.csv', header=True, inferSchema=True)
ed_sheeran = spark.read.csv('/mnt/spotify/raw-data/Ed_Sheeran_feat.csv', header=True, inferSchema=True)
justin_biebar = spark.read.csv('/mnt/spotify/raw-data/Justin_Beiber_feats.csv', header=True, inferSchema=True)
taylor_swift = spark.read.csv('/mnt/spotify/raw-data/Taylor_Swift_feats.csv', header=True, inferSchema=True)
the_weeknd = spark.read.csv('/mnt/spotify/raw-data/The_Weeknd_feats.csv', header=True, inferSchema=True)
travis_scott = spark.read.csv('/mnt/spotify/raw-data/Travis_Scott_feats.csv', header=True, inferSchema=True)

## Combine Datasets

In [0]:
features_df = ariana_grande.union(billie_eilish).union(dj_khaled).union(drake).union(dua_lipa).union(ed_sheeran).union(justin_biebar).union(taylor_swift).union(the_weeknd).union(travis_scott)

In [0]:
features_df.show()

+--------------------+-----------+------+---+-------+----+------+--------+----------+-------+-------+-------+--------+--------------+
|                  id|dancebility|energy|key|loudnes|mode|speech|acoustic|instrument|livenss|valance|  tempo|duration|time_signature|
+--------------------+-----------+------+---+-------+----+------+--------+----------+-------+-------+-------+--------+--------------+
|63y6xWR4gXz7bnUGO...|      0.623| 0.734|  9| -5.948|   1| 0.107|  0.0162|   1.75E-6|  0.145|   0.37|107.853|  244453|             4|
|6RUhbFEhrvGISaQ8u...|      0.664| 0.602|  4| -5.369|   0|0.0412|  0.0529|       0.0|  0.356|  0.289|134.049|  235947|             3|
|6ocbgoVGwYJhOv1Gg...|      0.778| 0.317|  1|-10.732|   0| 0.334|   0.592|       0.0| 0.0881|  0.327|140.048|  178627|             4|
|43bCmCI0nSgcT7QdM...|       0.65| 0.736|  6|  -5.84|   0| 0.229|  0.0528|       0.0|  0.235|  0.613|159.173|  226160|             4|
|5OCJzvD7sykQEKHH7...|      0.602| 0.658|  1| -5.934|   1|0.05

In [0]:
features_df = features_df.withColumnRenamed('id', 'trackID') \
                        .withColumnRenamed('dancebility', 'danceability') \
                        .withColumnRenamed('energy', 'energy') \
                        .withColumnRenamed('loudnes', 'loudness') \
                        .withColumnRenamed('mode', 'mode') \
                        .withColumnRenamed('speech', 'speechiness') \
                        .withColumnRenamed('acoustic', 'acousticness') \
                        .withColumnRenamed('instrument', 'instrumentalness') \
                        .withColumnRenamed('livenss', 'liveness') \
                        .withColumnRenamed('valance', 'valence') \
                        .withColumnRenamed('tempo', 'tempo') \
                        .withColumnRenamed('duration', 'duration') \
                        .withColumnRenamed('time_signature', 'time_signature')

In [0]:
features_df = features_df.withColumn("duration", round(col("duration") / 1000, 2))

In [0]:
features_df.show()

+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------+--------------+
|             trackID|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration|time_signature|
+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------+--------------+
|63y6xWR4gXz7bnUGO...|       0.623| 0.734|  9|  -5.948|   1|      0.107|      0.0162|         1.75E-6|   0.145|   0.37|107.853|  244.45|             4|
|6RUhbFEhrvGISaQ8u...|       0.664| 0.602|  4|  -5.369|   0|     0.0412|      0.0529|             0.0|   0.356|  0.289|134.049|  235.95|             3|
|6ocbgoVGwYJhOv1Gg...|       0.778| 0.317|  1| -10.732|   0|      0.334|       0.592|             0.0|  0.0881|  0.327|140.048|  178.63|             4|
|43bCmCI0nSgcT7QdM...|        0.65| 0.736|  6|   -5.84|   0|      0.229|      0.0528|   

In [0]:
features_df.printSchema()

root
 |-- trackID: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- time_signature: integer (nullable = true)



In [0]:
features_df = features_df.withColumn("trackID", col("trackID").cast(StringType())) \
                        .withColumn("danceability", col("danceability").cast(DoubleType())) \
                        .withColumn('energy', col('energy').cast(DoubleType())) \
                        .withColumn('key', col('key').cast(IntegerType())) \
                        .withColumn('loudness', col('loudness').cast(DoubleType())) \
                        .withColumn('mode', col('mode').cast(IntegerType())) \
                        .withColumn('speechiness', col('speechiness').cast(DoubleType())) \
                        .withColumn('acousticness',col('acousticness').cast(DoubleType())) \
                        .withColumn('instrumentalness', col('instrumentalness').cast(DoubleType())) \
                        .withColumn('liveness', col('liveness').cast(DoubleType())) \
                        .withColumn('valence', col('valence').cast(DoubleType())) \
                        .withColumn('tempo', col('tempo').cast(DoubleType())) \
                        .withColumn('duration', col('duration').cast(IntegerType())) \
                        .withColumn('time_signature', col('time_signature').cast(IntegerType()))

In [0]:
features_df.show()

+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------+--------------+
|             trackID|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration|time_signature|
+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------+--------------+
|63y6xWR4gXz7bnUGO...|       0.623| 0.734|  9|  -5.948|   1|      0.107|      0.0162|         1.75E-6|   0.145|   0.37|107.853|     244|             4|
|6RUhbFEhrvGISaQ8u...|       0.664| 0.602|  4|  -5.369|   0|     0.0412|      0.0529|             0.0|   0.356|  0.289|134.049|     235|             3|
|6ocbgoVGwYJhOv1Gg...|       0.778| 0.317|  1| -10.732|   0|      0.334|       0.592|             0.0|  0.0881|  0.327|140.048|     178|             4|
|43bCmCI0nSgcT7QdM...|        0.65| 0.736|  6|   -5.84|   0|      0.229|      0.0528|   

## Write file to Data Lake

In [0]:
features_df.repartition(1).write.mode("overwrite").option("header","true").csv("/mnt/spotify/transformed-data/spotify_tracks_features")