In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sfn

In [2]:
spark = SparkSession \
    .builder \
    .appName("Final project") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "10") \
    .getOrCreate()

24/06/07 18:30:31 WARN Utils: Your hostname, arch resolves to a loopback address: 127.0.1.1; using 192.168.1.39 instead (on interface enp3s0)
24/06/07 18:30:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/07 18:30:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/07 18:30:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
artist_df_json = spark.read.json("datasets/musicbrainz/artist.json")

24/06/05 19:47:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
artist_df_json.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+----------+--------------------+--------------------+--------------------+-------+--------------------+--------+--------+------+--------------------+--------------------+--------------------+----+------------------+--------------------+-------------------+---------+--------------------+--------------------+--------------------+------+--------------------+
|             aliases|annotation|                area|          begin-area|          begin_area|country|      disambiguation|end-area|end_area|gender|           gender-id|              genres|                  id|ipis|             isnis|           life-span|               name|   rating|           relations|           sort-name|                tags|  type|             type-id|
+--------------------+----------+--------------------+--------------------+--------------------+-------+--------------------+--------+--------+------+--------------------+--------------------+--------------------+----+------------------+---

                                                                                

In [None]:
artist_df_json.printSchema()

In [6]:
artist_df_filtered = artist_df_json \
    .select("name", "country") \
    .dropDuplicates(["name"]) \
    .filter(
        (sfn.col("name").isNotNull()) &
        (sfn.col("country").isNotNull())
    )

In [7]:
artist_df_filtered.count()

                                                                                

810575

In [3]:
spotify_df_parquet = spark.read.parquet("datasets/huggingface/0000.parquet")

                                                                                

In [9]:
spotify_df_parquet.count()

114000

In [10]:
spotify_df_parquet.show(5, truncate=False)

+----------+----------------------+----------------------+------------------------------------------------------+--------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|Unnamed: 0|track_id              |artists               |album_name                                            |track_name                |popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |time_signature|track_genre|
+----------+----------------------+----------------------+------------------------------------------------------+--------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|0         |5SuOikwiRyPMVoIQDJUgSV|Gen Hoshino           |Comedy                     

 Take artists column whose values are of the form artis1:artist2:artist3 and make it contain only the first artist


In [11]:
spotify_df = spotify_df_parquet.withColumn("name", sfn.split(sfn.col("artists"), ";")[0])
spotify_df = spotify_df.drop("Unnamed: 0", "artists", "album_name")

Look for duplicates and nulls

In [12]:
spotify_df_no_dups = spotify_df.dropDuplicates(["track_id"])

In [13]:
spotify_df_no_dups.select([sfn.count(sfn.when(sfn.isnull(c), c)).alias(c) for c in spotify_df_no_dups.columns]).show()

                                                                                

+--------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----+
|track_id|track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|track_genre|name|
+--------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----+
|       0|         1|         0|          0|       0|           0|     0|  0|       0|   0|          0|           0|               0|       0|      0|    0|             0|          0|   1|
+--------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----+



In [15]:
spotify_df_clean = spotify_df_no_dups \
    .drop("track_id") \
    .filter(
        sfn.col("track_name").isNotNull() & 
        sfn.col("name").isNotNull()
    )

spotify_df_clean.count()

89740

In [16]:
from pyspark import StorageLevel

joined_df = spotify_df_clean.join(artist_df_filtered, "name", "inner")
antijoined_df = spotify_df_clean.join(artist_df_filtered, "name", "left_anti")

joined_df.persist(StorageLevel.MEMORY_AND_DISK)
antijoined_df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[name: string, track_name: string, popularity: bigint, duration_ms: bigint, explicit: boolean, danceability: double, energy: double, key: bigint, loudness: double, mode: bigint, speechiness: double, acousticness: double, instrumentalness: double, liveness: double, valence: double, tempo: double, time_signature: bigint, track_genre: string]

In [17]:
joined_df.show(5,truncate=False)
joined_df.count()

                                                                                

+----+---------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------+-------+
|name|track_name                 |popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |time_signature|track_genre   |country|
+----+---------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------+-------+
|ANNA|Forever Ravers - Radio Edit|42        |200930     |false   |0.507       |0.92  |6  |-8.151  |0   |0.0561     |0.00629     |0.627           |0.668   |0.225  |129.013|4             |minimal-techno|JP     |
|ANNA|Cosmovision                |53        |213390     |false   |0.644       |0.757 |1  |-8.204  |1   |0.0388     |0.0677      |0.864           |0.103   |0.046

58230

In [18]:
antijoined_df.show(5,truncate=False)
antijoined_df.count()

                                                                                

+-------------------+-----------------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------+
|name               |track_name                         |popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |time_signature|track_genre   |
+-------------------+-----------------------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------+
|Thunderstorms HD   |Thunderstorm: Heavy Rain - Loopable|38        |175104     |false   |0.271       |0.961 |1  |-14.121 |1   |0.116      |1.89E-5     |0.97            |0.525   |0.0776 |86.095 |3             |sleep         |
|Shannon & The Clams|You Can Come Over                  |23        |146786     |false   |0.374      

31510

In [19]:
print(joined_df.groupBy("name").count().count())
print(antijoined_df.groupBy("name").count().count())

                                                                                

9646




8002


                                                                                

In [21]:
joined_df.printSchema()
antijoined_df.printSchema()
artist_df_filtered.printSchema()

root
 |-- name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: long (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: long (nullable = true)
 |-- track_genre: string (nullable = true)
 |-- country: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: long (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: doub

In [28]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DoubleType

joined_df_schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("track_name", StringType(), nullable=False),
    StructField("popularity", LongType(), nullable=False),
    StructField("duration_ms", LongType(), nullable=False),
    StructField("explicit", BooleanType(), nullable=False),
    StructField("danceability", DoubleType(), nullable=False),
    StructField("energy", DoubleType(), nullable=False),
    StructField("key", LongType(), nullable=False),
    StructField("loudness", DoubleType(), nullable=False),
    StructField("mode", LongType(), nullable=False),
    StructField("speechiness", DoubleType(), nullable=False),
    StructField("acousticness", DoubleType(), nullable=False),
    StructField("instrumentalness", DoubleType(), nullable=False),
    StructField("liveness", DoubleType(), nullable=False),
    StructField("valence", DoubleType(), nullable=False),
    StructField("tempo", DoubleType(), nullable=False),
    StructField("time_signature", LongType(), nullable=False),
    StructField("track_genre", StringType(), nullable=False),
    StructField("country", StringType(), nullable=False)
])

antijoined_df_schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("track_name", StringType(), nullable=False),
    StructField("popularity", LongType(), nullable=False),
    StructField("duration_ms", LongType(), nullable=False),
    StructField("explicit", BooleanType(), nullable=False),
    StructField("danceability", DoubleType(), nullable=False),
    StructField("energy", DoubleType(), nullable=False),
    StructField("key", LongType(), nullable=False),
    StructField("loudness", DoubleType(), nullable=False),
    StructField("mode", LongType(), nullable=False),
    StructField("speechiness", DoubleType(), nullable=False),
    StructField("acousticness", DoubleType(), nullable=False),
    StructField("instrumentalness", DoubleType(), nullable=False),
    StructField("liveness", DoubleType(), nullable=False),
    StructField("valence", DoubleType(), nullable=False),
    StructField("tempo", DoubleType(), nullable=False),
    StructField("time_signature", LongType(), nullable=False),
    StructField("track_genre", StringType(), nullable=False),
])

artist_df_schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("country", StringType(), nullable=False)
])

In [22]:
print(joined_df.select([sfn.count(sfn.when(sfn.isnull(c), c)).alias(c) for c in joined_df.columns]).show())
print(antijoined_df.select([sfn.count(sfn.when(sfn.isnull(c), c)).alias(c) for c in antijoined_df.columns]).show())
print(artist_df_filtered.select([sfn.count(sfn.when(sfn.isnull(c), c)).alias(c) for c in artist_df_filtered.columns]).show())

+----+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+-------+
|name|track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|track_genre|country|
+----+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+-------+
|   0|         0|         0|          0|       0|           0|     0|  0|       0|   0|          0|           0|               0|       0|      0|    0|             0|          0|      0|
+----+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+-------+

None
+----+----------+----------+-----------+--------+-----



+----+-------+
|name|country|
+----+-------+
|   0|      0|
+----+-------+

None


                                                                                

In [30]:
joined_df_no_null_schema = spark.createDataFrame(joined_df.rdd, schema=joined_df_schema)
anti_joined_df_no_null_schema = spark.createDataFrame(antijoined_df.rdd, schema=antijoined_df_schema)
artist_df_filtered_no_null_schema = spark.createDataFrame(artist_df_filtered.rdd, schema=artist_df_schema)

joined_df_no_null_schema.printSchema()
anti_joined_df_no_null_schema.printSchema()
artist_df_filtered_no_null_schema.printSchema()

root
 |-- name: string (nullable = false)
 |-- track_name: string (nullable = false)
 |-- popularity: long (nullable = false)
 |-- duration_ms: long (nullable = false)
 |-- explicit: boolean (nullable = false)
 |-- danceability: double (nullable = false)
 |-- energy: double (nullable = false)
 |-- key: long (nullable = false)
 |-- loudness: double (nullable = false)
 |-- mode: long (nullable = false)
 |-- speechiness: double (nullable = false)
 |-- acousticness: double (nullable = false)
 |-- instrumentalness: double (nullable = false)
 |-- liveness: double (nullable = false)
 |-- valence: double (nullable = false)
 |-- tempo: double (nullable = false)
 |-- time_signature: long (nullable = false)
 |-- track_genre: string (nullable = false)
 |-- country: string (nullable = false)

root
 |-- name: string (nullable = false)
 |-- track_name: string (nullable = false)
 |-- popularity: long (nullable = false)
 |-- duration_ms: long (nullable = false)
 |-- explicit: boolean (nullable = false)

In [32]:
joined_df_no_null_schema.write.parquet("datasets/processed/joined_df.parquet")
anti_joined_df_no_null_schema.write.parquet("datasets/processed/antijoined_df.parquet")
artist_df_filtered_no_null_schema.write.parquet("datasets/processed/artist_df.parquet")

24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/06/05 19:53:06 WARN MemoryManager: Total allocation exceeds 95.

In [33]:
spark.stop()