In [7]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("spotify-datalake") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "1024M") \
    .config("spark.ui.port", "4061") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
sc = spark.sparkContext

In [9]:
playlists_v1_path = '/shared/sampled/playlists_v1.json'
playlists_v1_df = spark.read.json(playlists_v1_path)
tracks_v1_path = '/shared/sampled/tracks_v1.json'
tracks_v1_df = spark.read.json(tracks_v1_path)

                                                                                

DataFrame[album_name: string, album_uri: string, artist_name: string, artist_uri: string, duration_ms: bigint, pid: bigint, pos: bigint, track_name: string, track_uri: string]

Task 1A:
- Silver layer:

In [12]:
from pyspark.sql import functions as F

song_info_df = tracks_v1_df.select(
    F.col("track_name").alias("name"),
    F.col("track_uri").alias("track_uri"),
    F.col("duration_ms").alias("duration_ms"),
    F.col("album_uri").alias("album_uri"),
    F.col("artist_uri").alias("artist_uri")
)

song_info_df.write.format("parquet").mode("overwrite").save("datalake/silver/song_info")

25/02/01 15:16:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

[Row(name='Buttons', track_uri='spotify:track:3BxWKCI06eQ5Od8TY2JBeA', duration_ms=225560, album_uri='spotify:album:5x8e8UcCeOgrOzSnDGuPye', artist_uri='spotify:artist:6wPhSqRtPu1UhRCDX5yaDJ'),
 Row(name='Say My Name', track_uri='spotify:track:7H6ev70Weq6DdpZyyTmUXk', duration_ms=271333, album_uri='spotify:album:283NWqNsCA9GwVHrJk59CG', artist_uri='spotify:artist:1Y8cdNmUJH7yBTd9yOvr5i'),
 Row(name='Hey Ya! - Radio Mix / Club Mix', track_uri='spotify:track:2PpruBYCo4H7WOBJ7Q2EwM', duration_ms=235213, album_uri='spotify:album:1UsmQ3bpJTyK6ygoOOjG1r', artist_uri='spotify:artist:1G9G7WwrXka3Z1r7aIDjI7'),
 Row(name='Promiscuous', track_uri='spotify:track:2gam98EZKrF9XuOkU13ApN', duration_ms=242293, album_uri='spotify:album:2yboV2QBcVGEhcRlYuPpDT', artist_uri='spotify:artist:2jw70GZXlAI8QzWeY2bgRc'),
 Row(name='Right Where You Want Me - Radio Edit Version', track_uri='spotify:track:4Y45aqo9QMa57rDsAJv40A', duration_ms=211693, album_uri='spotify:album:6022khQj4Fsvvse8f3A4lF', artist_uri='spo

In [14]:
album_info_df = tracks_v1_df.select(
    F.col("album_name").alias("name"),
    F.col("album_uri").alias("album_uri"),
    F.col("artist_uri").alias("artist_uri")
).distinct()

album_info_df.write.format("parquet").mode("overwrite").save("datalake/silver/album_info")

25/02/01 15:18:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [18]:
artist_info_df = tracks_v1_df.select(
    F.col("artist_name").alias("name"),
    F.col("artist_uri").alias("artist_uri")
).distinct()

artist_info_df.write.format("parquet").mode("overwrite").save("datalake/silver/artist_info")

In [27]:
playlist_info_df = playlists_v1_df.select(
    F.col("name").alias("name"),
    F.col("pid").alias("pid"),
    F.col("description").alias("description"),
    F.col("collaborative").alias("is_collaborative")
)

playlist_info_df.write.format("parquet").mode("overwrite").save("datalake/silver/playlist_info")

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 4be036a0-5ddc-492e-8c36-0e3f68b77eee).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- name: string (nullable = true)
-- playlist_id: long (nullable = true)
-- description: string (nullable = true)
-- is_collaborative: string (nullable = true)


Data schema:
root
-- name: string (nullable = true)
-- pid: long (nullable = true)
-- description: string (nullable = true)
-- is_collaborative: string (nullable = true)

         
To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.
         

In [22]:
playlist_tracks_df = tracks_v1_df.select(
    F.col("pid").alias("playlist_id"),
    F.col("pos").alias("position"),
    F.col("track_uri").alias("track_uri"),
    F.col("artist_uri").alias("artist_uri"),
    F.col("album_uri").alias("album_uri")
)

playlist_tracks_df.write.format("parquet").mode("overwrite").save("datalake/silver/playlist_tracks")

25/02/01 15:22:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

- Gold layer:

In [28]:
playlist_info_gold_df = tracks_v1_df.groupBy("pid").agg(
    F.count("track_uri").alias("num_tracks"),
    F.sum("duration_ms").alias("total_duration_ms"),
    F.countDistinct("artist_uri").alias("num_artists"),
    F.countDistinct("album_uri").alias("num_albums")
).join(playlist_info_df, "pid", "inner")

playlist_info_gold_df.write.format("parquet").mode("overwrite").save("datalake/gold/playlist_info")

                                                                                

In [35]:
song_info_df = song_info_df.withColumnRenamed("name", "song_name")
album_info_df = album_info_df.withColumnRenamed("name", "album_name")
artist_info_df = artist_info_df.withColumnRenamed("name", "artist_name")

playlist_tracks_gold_df = playlist_tracks_df.join(
    song_info_df, "track_uri", "inner"
).join(
    album_info_df, "album_uri", "inner"
).join(
    artist_info_df, "artist_uri", "inner"
).select(
    "playlist_id",
    "position",
    "song_name",
    "album_name",
    "artist_name"
)

Task 1B:

In [38]:
playlist_info_gold_df.write.format("json").mode("overwrite").save("datalake/gold_json/playlist_info")
playlist_tracks_gold_df.write.format("json").mode("overwrite").save("datalake/gold_json/playlist_tracks")

                                                                                

Tempo de carregamento (JSON): 0.3263206481933594 segundos


In [39]:
playlist_info_gold_df.write.format("parquet").mode("overwrite").save("datalake/gold_parquet/playlist_info")
playlist_tracks_gold_df.write.format("parquet").mode("overwrite").save("datalake/gold_parquet/playlist_tracks")

25/02/01 16:28:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/01 16:28:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

Tempo de carregamento (Parquet): 0.6755235195159912 segundos


In [42]:
import time

# tempo de load em json
start_time = time.time()
json_df = spark.read.json("datalake/gold_json/playlist_info")
json_df.count()
json_load_time = time.time() - start_time

print(f"Tempo de carregamento (JSON): {json_load_time} segundos")

# tempo de load em parquet
start_time = time.time()
parquet_df = spark.read.parquet("datalake/gold_parquet/playlist_info")
parquet_df.count()
parquet_load_time = time.time() - start_time

print(f"Tempo de carregamento (Parquet): {parquet_load_time} segundos")

Tempo de carregamento (JSON): 0.3448805809020996 segundos
Tempo de carregamento (Parquet): 0.2278423309326172 segundos


**Task 2**

In [None]:
bronze_path = "datalake/bronze/"
silver_path = "datalake/silver/"
gold_path = "datalake/gold_parquet/"

def ingest_new_data(version):
    playlists = spark.read.json(f"/shared/sampled/playlists_{version}.json")
    tracks = spark.read.json(f"/shared/sampled/tracks_{version}.json")

    playlists.write.mode("append").parquet(bronze_path + "playlists/")
    tracks.write.mode("append").parquet(bronze_path + "tracks/")

    return playlists, tracks

def update_silver_layer(playlists_v, tracks_v):
    
    song_info = spark.read.parquet(silver_path + "song_info")
    album_info = spark.read.parquet(silver_path + "album_info")
    artist_info = spark.read.parquet(silver_path + "artist_info")
    playlist_info = spark.read.parquet(silver_path + "playlist_info")
    playlist_tracks = spark.read.parquet(silver_path + "playlist_tracks")

    new_song_info = tracks_v.select(
        F.col("track_name").alias("name"),
        F.col("track_uri").alias("track_uri"),
        F.col("duration_ms").alias("duration_ms"),
        F.col("album_uri").alias("album_uri"),
        F.col("artist_uri").alias("artist_uri")
    ).distinct()
    song_info = song_info.unionByName(new_song_info, allowMissingColumns=True).dropDuplicates(["track_uri"])

    new_album_info = tracks_v.select(
        F.col("album_name").alias("name"),
        F.col("album_uri").alias("album_uri"),
        F.col("artist_uri").alias("artist_uri")
    ).distinct()
    album_info = album_info.unionByName(new_album_info, allowMissingColumns=True).dropDuplicates(["album_uri"])

    new_artist_info = tracks_v.select(
        F.col("artist_name").alias("name"),
        F.col("artist_uri").alias("artist_uri")
    ).distinct()
    artist_info = artist_info.unionByName(new_artist_info, allowMissingColumns=True).dropDuplicates(["artist_uri"])

    new_playlist_info = playlists_v.select(
        F.col("name").alias("name"),
        F.col("pid").alias("pid"),
        F.col("description").alias("description"),
        F.col("collaborative").alias("is_collaborative")
    ).distinct()
    playlist_info = playlist_info.unionByName(new_playlist_info, allowMissingColumns=True).dropDuplicates(["playlist_id"])

    new_playlist_tracks = tracks_v.select(
        F.col("pid").alias("playlist_id"),
        F.col("pos").alias("position"),
        F.col("track_uri").alias("track_uri"),
        F.col("artist_uri").alias("artist_uri"),
        F.col("album_uri").alias("album_uri")
    )
    playlist_tracks = playlist_tracks.unionByName(new_playlist_tracks, allowMissingColumns=True).dropDuplicates(["playlist_id", "track_uri"])

    playlist_info = playlist_info.withColumn(
        "name", F.when(F.col("playlist_id") == "11992", "GYM WORKOUT").otherwise(F.col("name"))
    ).withColumn(
        "collaborative", F.when(F.col("playlist_id") == "11992", True).otherwise(F.col("collaborative"))
    )

    song_info.write.mode("overwrite").parquet(silver_path + "song_info")
    album_info.write.mode("overwrite").parquet(silver_path + "album_info")
    artist_info.write.mode("overwrite").parquet(silver_path + "artist_info")
    playlist_info.write.mode("overwrite").parquet(silver_path + "playlist_info")
    playlist_tracks.write.mode("overwrite").parquet(silver_path + "playlist_tracks")

def update_gold_layer():
    playlist_info = spark.read.parquet(silver_path + "playlist_info")
    playlist_tracks = spark.read.parquet(silver_path + "playlist_tracks")

    playlists_gold = playlist_info.join(playlist_tracks, "playlist_id", "left") \
        .groupBy("playlist_id", "name", "description") \
        .agg(
            F.count("track_uri").alias("num_tracks"),
            F.sum("duration_ms").alias("total_duration_ms"),
            F.countDistinct("artist_uri").alias("num_artists"),
            F.countDistinct("album_uri").alias("num_albums")
        )

    playlist_tracks_gold = playlist_tracks.join(
        song_info_df, "track_uri", "inner"
    ).join(
        album_info_df, "album_uri", "inner"
    ).join(
        artist_info_df, "artist_uri", "inner"
    ).select(
        "playlist_id",
        "position",
        "song_name",
        "album_name",
        "artist_name"
    )

    playlists_gold.write.mode("overwrite").parquet(gold_path + "playlist_info")
    playlist_tracks_gold.write.mode("overwrite").parquet(gold_path + "playlist_tracks")


In [None]:
def run_pipeline(version_path):
    print(f"Iniciando ingestão dos dados de {version_path}...")
    playlists, tracks = ingest_new_data(version_path)

    print("Atualizando a camada Silver...")
    update_silver_layer(playlists, tracks)

    print("Atualizando a camada Gold...")
    update_gold_layer()

    print(f"Pipeline concluído para {version_path}!")

In [None]:
run_pipeline("v2")
run_pipeline("v3")