In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,explode
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import StringType
from pyspark.sql.functions import split, explode, trim


In [20]:
hdfs_uri = "hdfs://namenode:8020/silver_layer/silver_artists_genres.parquet"
spark = SparkSession.builder.appName("Check Genres").getOrCreate()

In [21]:
df_genres = spark.read.parquet(hdfs_uri)

In [22]:
df_genres.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists_genres: string (nullable = true)



In [23]:
df_genres.distinct().count()

5064

In [24]:
df_genres.select("name").where(col("name")=="Drake").show(5)

+-----+
| name|
+-----+
|Drake|
|Drake|
+-----+



In [25]:
hdfs_uri_feature_bronze = "hdfs://namenode:8020/bronze_layer/feature_music.parquet"
df_feature_bronze = spark.read.parquet(hdfs_uri_feature_bronze)
df_feature_bronze.count()

2000

In [26]:
df_feature_bronze.select("artist", "genre").where(col("artist") == "Drake").show(5, truncate=False)

+------+-----------------+
|artist|genre            |
+------+-----------------+
|Drake |hip hop, pop, R&B|
|Drake |hip hop, pop, R&B|
|Drake |hip hop, pop, R&B|
|Drake |hip hop, pop, R&B|
|Drake |hip hop, pop, R&B|
+------+-----------------+
only showing top 5 rows



In [27]:
df_Drake= df_genres.select("name", "artists_genres").where(col("name") == "Drake")

In [28]:
artists_feature_genres = df_feature_bronze \
    .select("artist", "genre") \
    .withColumn("genre", explode(split(col("genre"), ",\s*"))) \
    .distinct()

In [29]:
df_Drake_feature = artists_feature_genres.select("artist", "genre").where(col("artist") == "Drake")

In [30]:
print("df ở genres artists",df_genres.count())
print("genres ở Feature", artists_feature_genres.count())

df ở genres artists 5064
genres ở Feature 1507


In [31]:
df_Drake.printSchema()

root
 |-- name: string (nullable = true)
 |-- artists_genres: string (nullable = true)



In [32]:
df_Drake_feature.printSchema()

root
 |-- artist: string (nullable = true)
 |-- genre: string (nullable = false)



In [33]:
df_genres_1 = df_genres.select(col("name").alias("artist"), col("artists_genres").alias("genre"))
df_genres_2 = df_feature_bronze.select("artist", "genre")

In [40]:
df_combined = df_genres_1.union(df_genres_2)
df_cleaned = df_combined.withColumn("genre", explode(split(col("genre"), ",")))
df_cleaned = df_cleaned.withColumn("genre", trim(col("genre")))
df_no_id = df_cleaned.dropDuplicates(["artist", "genre"])
artist_id_map = df_genres.select(col("name").alias("artist"), col("artist_id")).distinct()
df_with_id = df_no_id.join(artist_id_map, on="artist", how="left")
df_with_id = df_with_id.select("artist_id", "artist", "genre").dropDuplicates(["artist_id", "genre"])
df_with_id = df_with_id.filter(col("artist_id").isNotNull())
df_with_id.show(truncate=False)

+----------------------+----------------------+---------------+
|artist_id             |artist                |genre          |
+----------------------+----------------------+---------------+
|00YTqRClk82aMchQQpYMd5|Our Last Night        |metalcore      |
|00YTqRClk82aMchQQpYMd5|Our Last Night        |post-hardcore  |
|00me4Ke1LsvMxt5kydlMyU|Cosculluela           |latin hip hop  |
|00me4Ke1LsvMxt5kydlMyU|Cosculluela           |reggaeton      |
|00me4Ke1LsvMxt5kydlMyU|Cosculluela           |trap latino    |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |bollywood      |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |desi           |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |hindi pop      |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |kollywood      |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |tamil dance    |
|00sCATpEvwH48ays7PlQFU|Jonita Gandhi         |tamil pop      |
|00tVTdpEhQQw1bqdu8RCx2|Blue Öyster Cult      |classic rock   |
|00tVTdpEhQQw1bqdu8RCx2|Blue Öyster Cult

In [42]:
df_with_id.select("artist").count()


5459

<h1> Prepare Gold Layer </h1>

In [36]:
artists = "hdfs://namenode:8020/silver_layer/silver_artists.parquet"
albums = "hdfs://namenode:8020/silver_layer/silver_album.parquet"
artists_genres = "hdfs://namenode:8020/silver_layer/silver_artists_genres.parquet"
feature_music = "hdfs://namenode:8020/silver_layer/silver_feature_music.parquet"
tracks = "hdfs://namenode:8020/silver_layer/tracks_data.parquet"

spark = SparkSession.builder.appName("Gold_Layer_Processing").getOrCreate()

In [37]:
artists_df = spark.read.parquet(artists)
albums_df = spark.read.parquet(albums)
artists_genres_df = spark.read.parquet(artists_genres)
feature_music_df = spark.read.parquet(feature_music)
tracks_df = spark.read.parquet(tracks)

In [10]:
artists_df.show(2, vertical=True, truncate=False)
albums_df.show(2, vertical=True, truncate=False)
artists_genres_df.show(2, vertical=True, truncate=False)
feature_music_df.show(2, vertical=True, truncate=False)
tracks_df.show(2, vertical=True, truncate=False)

-RECORD 0---------------------------------------------------------------------------------
 id                    | 00FQb4jTyendYWaN8pK0wa                                           
 name                  | Lana Del Rey                                                     
 popularity            | 91                                                               
 type                  | artist                                                           
 uri                   | spotify:artist:00FQb4jTyendYWaN8pK0wa                            
 artist_id             | 00FQb4jTyendYWaN8pK0wa                                           
 external_urls_artists | https://open.spotify.com/artist/00FQb4jTyendYWaN8pK0wa           
 followers number      | 45985411                                                         
 images_artists        | https://i.scdn.co/image/ab6761610000e5ebb99cacf8acd5378206767261 
-RECORD 1---------------------------------------------------------------------------------

In [None]:
artists_data.show(2, vertical=True, truncate=False)

<h1> gold_track_metadata </h1>

In [19]:
tracks = spark.read.parquet("hdfs://namenode:8020/silver_layer/tracks_data.parquet") \
                   .withColumnRenamed("name", "track")
albums = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_album.parquet")

artists = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_artists.parquet") \
                    .withColumnRenamed("name", "artist_name")
genre = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_artists_genres.parquet")

df = (
    tracks.join(albums, on="album_id", how="left")
          .join(artists, on="artist_id", how="left")
          .join(genre, on="artist_id", how="left")
)

df = df.select(
    "track_id", "track", "external_urls_tracks",
    "artists.artist_id", "artist_name", "followers number", "images_artists", "external_urls_artists",
    "album_id", "album_name", "release_date", "image_album", "external_urls_albums",
    "duration_ms", "explicit", "genre"
)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `artists`.`artist_id` cannot be resolved. Did you mean one of the following? [`artist_id`, `artist_id`, `artist_name`, `images_artists`, `artist`].;
'Project [track_id#2380, track#2397, external_urls_tracks#2381, 'artists.artist_id, artist_name#2460, followers number#2449, images_artists#2450, external_urls_artists#2448, album_id#2368, album_name#2426, release_date#2419, image_album#2427, external_urls_albums#2425, duration_ms#2370L, explicit#2371, genre#2473]
+- Project [artist_id#2367, album_id#2368, disc_number#2369, duration_ms#2370L, explicit#2371, id#2372, is_local#2373, track#2397, popularity#2375, preview_url#2376, track_number#2377, type#2378, uri#2379, track_id#2380, external_urls_tracks#2381, album_type#2414, artist_id#2415, id#2416, label#2417, popularity#2418, release_date#2419, release_date_precision#2420, total_tracks#2421, type#2422, ... 14 more fields]
   +- Join LeftOuter, (artist_id#2367 = artist_id#2471)
      :- Project [artist_id#2367, album_id#2368, disc_number#2369, duration_ms#2370L, explicit#2371, id#2372, is_local#2373, track#2397, popularity#2375, preview_url#2376, track_number#2377, type#2378, uri#2379, track_id#2380, external_urls_tracks#2381, album_type#2414, artist_id#2415, id#2416, label#2417, popularity#2418, release_date#2419, release_date_precision#2420, total_tracks#2421, type#2422, ... 12 more fields]
      :  +- Join LeftOuter, (artist_id#2367 = artist_id#2447)
      :     :- Project [album_id#2368, artist_id#2367, disc_number#2369, duration_ms#2370L, explicit#2371, id#2372, is_local#2373, track#2397, popularity#2375, preview_url#2376, track_number#2377, type#2378, uri#2379, track_id#2380, external_urls_tracks#2381, album_type#2414, artist_id#2415, id#2416, label#2417, popularity#2418, release_date#2419, release_date_precision#2420, total_tracks#2421, type#2422, ... 4 more fields]
      :     :  +- Join LeftOuter, (album_id#2368 = album_id#2424)
      :     :     :- Project [artist_id#2367, album_id#2368, disc_number#2369, duration_ms#2370L, explicit#2371, id#2372, is_local#2373, name#2374 AS track#2397, popularity#2375, preview_url#2376, track_number#2377, type#2378, uri#2379, track_id#2380, external_urls_tracks#2381]
      :     :     :  +- Relation [artist_id#2367,album_id#2368,disc_number#2369,duration_ms#2370L,explicit#2371,id#2372,is_local#2373,name#2374,popularity#2375,preview_url#2376,track_number#2377,type#2378,uri#2379,track_id#2380,external_urls_tracks#2381] parquet
      :     :     +- Relation [album_type#2414,artist_id#2415,id#2416,label#2417,popularity#2418,release_date#2419,release_date_precision#2420,total_tracks#2421,type#2422,uri#2423,album_id#2424,external_urls_albums#2425,album_name#2426,image_album#2427] parquet
      :     +- Project [id#2442, name#2443 AS artist_name#2460, popularity#2444, type#2445, uri#2446, artist_id#2447, external_urls_artists#2448, followers number#2449, images_artists#2450]
      :        +- Relation [id#2442,name#2443,popularity#2444,type#2445,uri#2446,artist_id#2447,external_urls_artists#2448,followers number#2449,images_artists#2450] parquet
      +- Relation [artist_id#2471,artist#2472,genre#2473] parquet


In [38]:
print("artists_DataFrame Schema ", artists_df.printSchema())
print("albums_DataFrame Schema ", albums_df.printSchema())
print("artists_genres_DataFrame Schema ", artists_genres_df.printSchema())
print("feature_music_df_DataFrame Schema ", feature_music_df.printSchema())
print("tracks_df_DataFrame Schema ", tracks_df.printSchema())


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- uri: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- external_urls_artists: string (nullable = true)
 |-- followers number: integer (nullable = true)
 |-- images_artists: string (nullable = true)

artists_DataFrame Schema  None
root
 |-- album_type: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_date_precision: string (nullable = true)
 |-- total_tracks: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- uri: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- external_urls_albums: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- image_album: string (nullable = tru

In [48]:
from pyspark.sql.functions import collect_set, concat_ws

tracks_df = spark.read.parquet("hdfs://namenode:8020/silver_layer/tracks_data.parquet")
albums_df = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_album.parquet")
artists_df = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_artists.parquet")
artists_genres_df = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_artists_genres.parquet")

# Rename schema
artists_df = artists_df.selectExpr(
    "artist_id", 
    "id as artist_spotify_id", 
    "name as artist_name", 
    "popularity as popularity_artist", 
    "type as type_artist", 
    "uri as uri_artist", 
    "external_urls_artists", 
    "`followers number`", 
    "images_artists"
)

albums_df = albums_df.selectExpr(
    "album_id", 
    "artist_id as album_artists_id", 
    "id as album_spotify_id", 
    "album_name", 
    "album_type", 
    "release_date", 
    "label", 
    "popularity as popularity_album", 
    "image_album", 
    "external_urls_albums"
)

tracks_df = tracks_df.selectExpr(
    "track_id", 
    "album_id as track_album_id", 
    "artist_id as track_artist_id", 
    "name as track_name", 
    "duration_ms", 
    "explicit", 
    "external_urls_tracks"
)

artists_genres_df = artists_genres_df.selectExpr(
    "artist_id as artist_genres_id", 
    "genre"
)

artists_genres_df = artists_genres_df.groupBy("artist_genres_id") \
                                     .agg(concat_ws(", ", collect_set("genre")).alias("genres"))
# JOINs
df = (
    tracks_df
    .join(albums_df, tracks_df.track_album_id == albums_df.album_id, how="inner")
    .join(artists_df, tracks_df.track_artist_id == artists_df.artist_id, how="inner")
    .join(artists_genres_df, tracks_df.track_artist_id == artists_genres_df.artist_genres_id, how="inner")
)

# Select columns để xuất ra Gold Layer
df = df.select(
    "track_id", "track_name", "external_urls_tracks",
    "track_artist_id", "artist_name", "`followers number`", "images_artists", "external_urls_artists",
    "track_album_id", "album_name", "release_date", "image_album", "external_urls_albums",
    "duration_ms", "explicit", "genres"
)

In [49]:
df.count()

304177

In [50]:
sample_df = df.limit(1000).toPandas()
sample_df.head()


Unnamed: 0,track_id,track_name,external_urls_tracks,track_artist_id,artist_name,followers number,images_artists,external_urls_artists,track_album_id,album_name,release_date,image_album,external_urls_albums,duration_ms,explicit,genres
0,000KKisPBgKkWwRSxIzsnE,Beat of the Music,https://open.spotify.com/track/000KKisPBgKkWwR...,0qSX3s5pJnAlSsgsCne8Cz,Brett Eldredge,1484950,https://i.scdn.co/image/ab6761610000e5ebcb0906...,https://open.spotify.com/artist/0qSX3s5pJnAlSs...,6bn1TML338LShzJsuhxbj5,Bring You Back (10-Year Anniversary Edition),2023-08-17,https://i.scdn.co/image/ab67616d0000b273c8ae31...,https://open.spotify.com/album/6bn1TML338LShzJ...,179826,False,"country, christmas"
1,000V86A5s6gAGp73ucRnmD,Sunny (feat. Connor Price),https://open.spotify.com/track/000V86A5s6gAGp7...,54R6Y0I7jGUCveDTtI21nb,Boney M.,2190336,https://i.scdn.co/image/94f8fa49e6ac6748393ff0...,https://open.spotify.com/artist/54R6Y0I7jGUCve...,2B780cgk4dXO3GK2l9NBnn,Sunny (feat. Connor Price),2023-04-14,https://i.scdn.co/image/ab67616d0000b27397e450...,https://open.spotify.com/album/2B780cgk4dXO3GK...,192434,False,disco
2,001KDkkQqFsi5UnlntI63c,Deixa Acontecer / Brilho de Cristal / Toda Noi...,https://open.spotify.com/track/001KDkkQqFsi5Un...,6vTqEFbTtTRJsuIpzZgjxi,Grupo Menos É Mais,4639139,https://i.scdn.co/image/ab6761610000e5ebc054c6...,https://open.spotify.com/artist/6vTqEFbTtTRJsu...,4yw62KXxwjJT0wQMnLDGoi,Ao Vivo No Buteco,2019-03-29,https://i.scdn.co/image/ab67616d0000b2736cbba7...,https://open.spotify.com/album/4yw62KXxwjJT0wQ...,277259,False,"samba, pagode"
3,001UkMQHw4zXfFNdKpwXAF,Brand New Man - with Luke Combs,https://open.spotify.com/track/001UkMQHw4zXfFN...,0XKOBt59crntr7HQXXO8Yz,Brooks & Dunn,3147859,https://i.scdn.co/image/ab6761610000e5eb3ef457...,https://open.spotify.com/artist/0XKOBt59crntr7...,0U3nI78LUpAwprraUf7vAS,Reboot,2019-04-05,https://i.scdn.co/image/ab67616d0000b27304b298...,https://open.spotify.com/album/0U3nI78LUpAwprr...,191053,False,"acoustic country, country"
4,004SquvWQJVJh5IC8KdQJn,I Luv This Shit,https://open.spotify.com/track/004SquvWQJVJh5I...,19Fi1Rj7kk8kyiwxpXy3yM,August Alsina,2729803,https://i.scdn.co/image/ab6761610000e5eb45ffc8...,https://open.spotify.com/artist/19Fi1Rj7kk8kyi...,3FHEN0yaab8BxVHXHyaYkk,Testimony (Deluxe),2014-01-01,https://i.scdn.co/image/ab67616d0000b273a13175...,https://open.spotify.com/album/3FHEN0yaab8BxVH...,267093,True,r&b


In [31]:
feature_music_df.printSchema()

root
 |-- acousticness: double (nullable = true)
 |-- artist: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- energy: double (nullable = true)
 |-- explicit: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- track: string (nullable = true)



In [52]:
feature_path = "hdfs://namenode:8020/silver_layer/silver_feature_music.parquet"
tracks_path = "hdfs://namenode:8020/silver_layer/tracks_data.parquet"
output_path = "hdfs://namenode:8020/gold_layer/gold_feature_matrix.parquet"

features_df = spark.read.parquet(feature_path).withColumnRenamed("track", "track_name")

tracks_df = spark.read.parquet(tracks_path).selectExpr(
    "track_id", "name as track_name", "artist_id as track_artist_id"
)

artists_df = spark.read.parquet("hdfs://namenode:8020/silver_layer/silver_artists.parquet").selectExpr(
    "artist_id", "name as artist_name"
)

tracks_joined = tracks_df.join(
    artists_df,
    tracks_df["track_artist_id"] == artists_df["artist_id"],
    how="left"
).select("track_id", "track_name", "artist_name")

df_joined = features_df.join(
    tracks_joined,
    on=(features_df["track_name"] == tracks_joined["track_name"]) &
       (features_df["artist"] == tracks_joined["artist_name"]),
    how="inner"
)

feature_cols = ['track_id', 'danceability', 'energy', 'loudness', 'speechiness',
                'acousticness', 'instrumentalness', 'liveness', 'valence',
                'tempo', 'duration_ms']

final_df = df_joined.select(feature_cols).dropDuplicates(["track_id"])

final_df.write.mode("overwrite").parquet(output_path)

final_df.show(5)


+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+-----------+
|            track_id|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|
+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+-----------+
|0W4NhJhcqKCqEP2GI...|       0.688| 0.519|  -4.285|     0.0283|       0.064|             0.0|     0.1|  0.318|116.714|     255333|
|0iGR60UzkFoyAZ1uN...|       0.495| 0.894|  -4.814|     0.0441|     0.00453|         5.96E-4|   0.103|  0.213| 126.03|     239894|
|0qcjuYtMWhBjXg0Xw...|        0.94| 0.633|   -3.56|     0.0467|      0.0581|         4.04E-5|   0.281|  0.962|121.003|     248680|
|232puZVLpayvhEMel...|       0.729| 0.675|  -6.003|     0.0312|       0.175|         1.58E-6|    0.55|  0.779|119.968|     245866|
|2aibwv5hGXSgw7Yru...|       0.427|   0.9|  -3.674|     0.0499|       0.116|       

In [55]:
final_df.select("track_id").distinct().count()

3981

In [33]:
#gold_track_search_index
track_meta = spark.read.parquet("hdfs://namenode:8020/gold_layer/gold_track_metadata.parquet")
feature = spark.read.parquet("hdfs://namenode:8020/gold_layer/gold_feature_matrix.parquet")

search_index = (
    track_meta
    .join(feature, "track_id")
    .select("track_id", "track_name", "name", "album_name", "release_date", "genre", "danceability", "energy", "valence")
)

search_index.write.mode("overwrite").parquet("hdfs://namenode:8020/gold_layer/gold_track_search_index.parquet")
