Extracting Data from Spotify_API

In [0]:
playlist_id=dbutils.widgets.text("playlist_id","")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType , LongType , DoubleType  , BooleanType , TimestampType , IntegerType , DateType
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials


In [0]:
client_id=dbutils.secrets.get("Spotify_auth_details","client_id")
client_secret=dbutils.secrets.get("Spotify_auth_details","client_secret")

sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=client_id,
                                                            client_secret=client_secret))

try:
    token_info = sp.auth_manager.get_access_token(as_dict=True)
    if token_info and 'access_token' in token_info:
        print("SUCCESS: Access token retrieved successfully. Authentication is working.")
        print(f"Token will expire in {token_info.get('expires_in', 'N/A')} seconds.")
    else:
        print("FAILURE: Access token could NOT be retrieved. This is the root of your 403 error.")
        print("Double-check your Client ID and Client Secret on the Spotify Developer Dashboard.")
except spotipy.SpotifyException as e:
    print(f"SPOTIPY EXCEPTION during authentication test: {e}")
    print("This indicates a problem with your credentials or permissions.")
except Exception as e:
    print(f"GENERIC EXCEPTION during authentication test: {e}")

  token_info = sp.auth_manager.get_access_token(as_dict=True)


SUCCESS: Access token retrieved successfully. Authentication is working.
Token will expire in 3600 seconds.


Helper Functions

In [0]:

def get_all_playlist_items(spotipy_client: spotipy.Spotify, playlist_id: str) -> list:
    all_items = []
    offset = 0
    limit = 100
    while True:
        response = spotipy_client.playlist_items(playlist_id, limit=limit, offset=offset)
        if not response or not response.get('items'):
            break
        all_items.extend(response['items'])
        if len(response['items']) < limit: 
            break
        offset += limit
    return all_items

def get_batch_items(spotipy_client: spotipy.Spotify, func, item_ids: list, batch_size: int = 50) -> list:
    """Fetches items in batches from Spotify API using a given spotipy function."""
    all_results = []
    if not item_ids:
        return [] 

    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size] 
        try:
            response = func(batch) # Make the API call with the batch of IDs
            if response:
                if func == spotipy_client.albums:
                    if 'albums' in response and response['albums']:
                        all_results.extend([item for item in response['albums'] if item is not None])
                elif func == spotipy_client.tracks:
                    if 'tracks' in response and response['tracks']:
                        all_results.extend([item for item in response['tracks'] if item is not None])
                elif func == spotipy_client.artists:
                    if 'artists' in response and response['artists']:
                        all_results.extend([item for item in response['artists'] if item is not None])
                else:
                    print(f"Warning: Unexpected response structure for {func.__name__}")
        except spotipy.SpotifyException as e:
            print(f"Error during batch call for {func.__name__} with IDs {batch}: {e}")
    return all_results

def process_playlists(playlist_id: str) -> F.DataFrame:
    
    try:        raw_playlist_items = get_all_playlist_items(sp, playlist_id)
    except ConnectionError as e:
        print(f"Connection error occurred: {e}")
        raw_playlist_items = []


    schema = StructType([
        StructField("added_at", StringType()), 
        StructField("track", StructType([  
            StructField("artists", ArrayType(StructType([
            StructField("id", StringType()),
            StructField("name", StringType()),
            ]))),
            StructField("album",StructType([
                StructField("album_type", StringType()),
                StructField("id", StringType()),
                StructField("name", StringType()),
                StructField("release_date", StringType()),
                StructField("total_tracks", LongType()),
           
                     
            ])),
            StructField("duration_ms", LongType()),
            StructField("id", StringType()),
            StructField("name", StringType()),
            StructField("popularity", LongType()),
            StructField("explicit", BooleanType()), 
        ])), 
        StructField("added_by", StructType([
            StructField("id", StringType()), 
            StructField("name", StringType())
        ]))
        ])
    
    if not raw_playlist_items:
        print("No items found for this playlist.")
        return spark.createDataFrame([],schema) 

    df = spark.createDataFrame(raw_playlist_items,schema)
    print("Playlist DataFrame created. Inferred Schema:")
    df.printSchema()
    return df


   


Transformation of data

In [0]:
playlistid=dbutils.widgets.get("playlist_id")

def get_most_popular_genres(playlist_id: str):
    """
    Retrieves the most popular genres from artists associated with tracks in a playlist.
    """
    tracks_df = process_playlists(playlist_id)

    if tracks_df.isEmpty():
        return spark.createDataFrame([], StructType([StructField("genre", StringType()), StructField("count", StringType())]))

    artists_details_df = get_artist_details_for_enrichment(tracks_df)
    
    artists_details_df.printSchema()
    artists_details_df.show(5, truncate=False) 

    if artists_details_df.isEmpty():
        print("No artist details found to extract genres from.")
        return spark.createDataFrame([], StructType([StructField("genre", StringType()), StructField("count", LongType())]))
    artists_details_df.printSchema()

   
    # An artist can have multiple genres, so we explode the 'genres' array.
    # Each genre becomes a new row for that artist.
    genres_df = artists_details_df.select(F.explode("genres").alias("genre")) \
                                  .filter(F.col("genre").isNotNull()) \
                                  .groupBy("genre").count() \
                                  .orderBy(F.desc("count")) 

    return genres_df

def get_artist_details_for_enrichment(tracks_df: F.DataFrame):
   
    if tracks_df.isEmpty():
        return spark.createDataFrame([], StructType([
            StructField("artist_id", StringType(), True),
            StructField("ArtistGenres", ArrayType(StringType()), True),
            StructField("ArtistPopularity", IntegerType(), True),
            StructField("ArtistFollowers", LongType(), True)
        ]))

    artist_ids_df = tracks_df.select(F.explode("track.artists").alias("artist")) \
                              .select(F.col("artist.id").alias("artist_id")) \
                              .filter(F.col("artist_id").isNotNull()) \
                              .dropDuplicates()

    artist_ids_list_row = artist_ids_df.select(F.collect_list("artist_id")).first()
    artist_ids_list = artist_ids_list_row[0] if artist_ids_list_row else []

    if not artist_ids_list:
        print("No artists found in the playlist to get genres from.")
        return spark.createDataFrame([], StructType([
            StructField("artist_id", StringType(), True),
            StructField("ArtistGenres", ArrayType(StringType()), True),
            StructField("ArtistPopularity", IntegerType(), True),
            StructField("ArtistFollowers", LongType(), True)
        ]))

    print(f"Fetching details for {len(artist_ids_list)} unique artists...")
    raw_artist_details = get_batch_items(sp, sp.artists, artist_ids_list)

    raw_artist_schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("genres", ArrayType(StringType()), True),
        StructField("popularity", IntegerType(), True),
        StructField("followers", StructType([StructField("total", LongType(), True)]), True),
    ])

    artists_details_df = spark.createDataFrame([item for item in raw_artist_details if item is not None], schema=raw_artist_schema)

    artists_for_join_df = artists_details_df.select(
        F.col("id").alias("artist_id"),
        F.col("genres").alias("ArtistGenres"),
        F.col("popularity").alias("ArtistPopularity"),
        F.col("followers.total").alias("ArtistFollowers")
    ).filter(F.col("artist_id").isNotNull())

    return artists_for_join_df



In [0]:
def get_artists_name_ids(playlist_id: str):
    """
    Retrieves distinct artist names and IDs from a playlist.
    Simplifies the original posexplode logic for better readability and efficiency.
    """
    tracks_df = process_playlists(playlist_id)

    if tracks_df.isEmpty():
        return spark.createDataFrame([], StructType([StructField("artist_id", StringType(), True), StructField("artist_name", StringType(), True)]))

   
    artists_df = tracks_df.select(F.explode("track.artists").alias("artist")) \
                          .select(
                              F.col("artist.id").alias("artist_id"),
                              F.col("artist.name").alias("artist_name")
                          ) \
                          .filter(F.col("artist_id").isNotNull()) \
                          .dropDuplicates(["artist_id", "artist_name"]) 

   
    print("Artists Names and IDs DataFrame created. Inferred Schema:")
    artists_df.printSchema()
    return artists_df

def get_playlists(playlist_id: str):
    """
    Retrieves detailed and flattened playlist track information, matching the original output structure.
    """
    raw_tracks_df = process_playlists(playlist_id)
    if raw_tracks_df.isEmpty():
        empty_playlist_schema = StructType([
            StructField("album_type", StringType(), True),
            StructField("album_id", StringType(), True),
            StructField("album_name", StringType(), True),
            StructField("release_date", StringType(), True),
            StructField("total_tracks", LongType(), True),
            StructField("artist_id", StringType(), True), 
            StructField("artist_name", StringType(), True),
            StructField("duration_ms", LongType(), True),
            StructField("track_id", StringType(), True),
            StructField("track_name", StringType(), True),
            StructField("popularity", LongType(), True),
            StructField("track_number", LongType(), True),
            StructField("explicit", BooleanType(), True),

        ])
        return spark.createDataFrame([], empty_playlist_schema)

   
    # F.explode(F.col("track.artists.name")) is used to get one row per artist name per track.
    playlists_df = raw_tracks_df.select(
        F.col("track.album.album_type").alias("album_type"),
        F.col("track.album.id").alias("album_id"),
        F.col("track.album.name").alias("album_name"),
        F.col("track.album.release_date").alias("release_date"),
        F.col("track.album.total_tracks").alias("total_tracks"),
        F.explode(F.col("track.artists")).alias("exploded_artist"),
        F.col("track.duration_ms").alias("duration_ms"),
        F.col("track.id").alias("track_id"),
        F.col("track.name").alias("track_name"),
        F.col("track.popularity").alias("popularity"),
        F.col("track.explicit").alias("explicit")
         ).select( # Select again after explode to get specific artist fields
        F.col("album_type"),
        F.col("album_id"),
        F.col("album_name"),
        F.col("release_date"),
        F.col("total_tracks"),
        F.col("exploded_artist.name").alias("artist_name"),
        F.col("exploded_artist.id").alias("artist_id"),     
       F.col("duration_ms"),
        F.col("track_id"),
        F.col("track_name"),
        F.col("popularity"),
        F.col("explicit")
    )


    
    print("Flattened Playlist Tracks DataFrame created. Inferred Schema:")
    playlists_df.printSchema()
    return playlists_df


def get_albums(albums_ids: list):
    """
    Retrieves album details for a list of album IDs and flattens them.
    Uses batching for efficient API calls.
    """
    if not albums_ids:
        empty_album_track_schema = StructType([
            StructField("album_name", StringType(), True),
            StructField("release_date", StringType(), True),
            StructField("type", StringType(), True),
            StructField("artist_name", StringType(), True),
            StructField("track_id", StringType(), True),
            StructField("duration_ms", LongType(), True),
            StructField("popularity", LongType(), True),
            StructField("track_number", LongType(), True),

        ])
        return spark.createDataFrame([], empty_album_track_schema)

    print(f"Fetching details for {len(albums_ids)} unique albums...")
    raw_albums_data = get_batch_items(sp, sp.albums, albums_ids)

    if not raw_albums_data:
        print("No album details fetched.")
        return spark.createDataFrame([], empty_album_track_schema)

    album_schema = StructType([
    StructField("name", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("album_type", StringType(), True),
    StructField("artists", ArrayType(StructType([
        StructField("name", StringType(), True)
    ])), True),
    StructField("tracks", StructType([
        StructField("items", ArrayType(StructType([
            StructField("id", StringType(), True),
            StructField("duration_ms", LongType(), True),
            StructField("track_number", LongType(), True)

        ])), True)
    ]), True),
    StructField("popularity", LongType(), True),
    StructField("explicit", BooleanType(), True), 

])

    albums_df = spark.createDataFrame(raw_albums_data,album_schema)
    print("Albums Details DataFrame created. Inferred Schema:")
    albums_df.printSchema()

    # Flatten the nested track items within each album
    albums_flattened_df = albums_df.select(
        F.col("name").alias("album_name"),
        F.col("release_date"),
        F.col("album_type").alias("type"),
        F.element_at(F.col("artists.name"), 1).alias("artist_name"),
        F.explode(F.col("tracks.items")).alias("track_item"),
        F.col("popularity")
    ).select(
        F.col("album_name"),
        F.col("release_date"),
        F.col("type"),
        F.col("artist_name"),
        F.col("track_item.id").alias("track_id"),
        F.col("track_item.duration_ms").alias("duration_ms"),
        F.col("track_item.track_number").alias("track_number"),
        F.col("popularity")
    ).dropDuplicates(["track_id"])

    print("Flattened Albums DataFrame created. Inferred Schema:")
    albums_flattened_df.printSchema()
    return albums_flattened_df

def get_albums_from_playlist(playlist_id: str):
    """
    Based on the playlist ID, retrieves a list of all album IDs,
    and then returns all the tracks from those albums.
    """
    playlist_tracks_df = get_playlists(playlist_id)
    if playlist_tracks_df.isEmpty():
        empty_album_track_schema = StructType([
            StructField("album_name", StringType(), True),
            StructField("release_date", StringType(), True),
            StructField("type", StringType(), True),
            StructField("artist_name", StringType(), True),
            StructField("track_id", StringType(), True),
            StructField("popularity", LongType(), True),
        ])
        return spark.createDataFrame([], empty_album_track_schema)
    
    display(playlist_tracks_df)

    album_ids = [row.album_id for row in playlist_tracks_df.select("album_id").distinct().collect()]
    album_ids = [aid for aid in album_ids if aid is not None] 

    if not album_ids:
        print("No unique album IDs found in the playlist.")
        return spark.createDataFrame([], empty_album_track_schema)

    print(f"Found {len(album_ids)} unique album IDs. Fetching album tracks...")
    albums_df = get_albums(album_ids)
    
    return albums_df



In [0]:
def get_tracks(track_lists: list):
    if not track_lists:
        return spark.createDataFrame([], StructType([StructField("id", StringType(), True), StructField("name", StringType(), True)]))

    print(f"Fetching details for {len(track_lists)} unique tracks...")
    raw_tracks_data = get_batch_items(sp, sp.tracks, track_lists)

    if not raw_tracks_data:
        print("No track details fetched.")
        return spark.createDataFrame([], StructType([StructField("id", StringType(), True), StructField("name", StringType(), True)]))

    track_schema=StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("popularity", StringType(), True),
        StructField("duration_ms", StringType(), True),
        StructField("explicit", StringType(), True),
        StructField("track_number", StringType(), True)
    ])


    df_tracks = spark.createDataFrame(raw_tracks_data,track_schema)
    print("Detailed Tracks DataFrame created. Inferred Schema:")
    df_tracks.printSchema()
    return df_tracks


def get_tracks_list(playlist_id: str):
    """
    Retrieves a Python list of unique track IDs from albums associated with a playlist.
    """
    
    albums_from_playlist_df = get_albums_from_playlist(playlist_id)

    if albums_from_playlist_df.isEmpty():
        return [] 
    
    tracks_list = [row.track_id for row in albums_from_playlist_df.select("track_id").distinct().collect()]

    tracks_list = [tid for tid in tracks_list if tid is not None]
    
    return tracks_list


In [0]:

def get_playlist_album_tracks_details(playlist_id: str):
    """
    Combines album-level track information with detailed track-level information
    for all tracks from albums associated with a given playlist.
    """
    albums_df = get_albums_from_playlist(playlist_id)

    empty_schema = StructType([ 
        StructField("album_name", StringType(), True),
        StructField("release_date", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("track_id", StringType(), True),
        StructField("track_name", StringType(), True),
        StructField("duration_seconds", DoubleType(), True),
        StructField("popularity", LongType(), True),
        StructField("track_number", LongType(), True),
        StructField("Explicit", BooleanType(), True), 
        StructField("ArtistGenres", ArrayType(StringType()), True), 
        StructField("ArtistPopularity", IntegerType(), True), 
        StructField("ArtistFollowers", LongType(), True), 
        StructField("AlbumType", StringType(), True), 
        StructField("AlbumPopularity", IntegerType(), True), 
        StructField("ReleaseYear", IntegerType(), True), 
        StructField("ReleaseMonth", IntegerType(), True), 
        StructField("ReleaseDayOfWeek", IntegerType(), True), 
        StructField("ReleaseDecade", IntegerType(), True)
        ])

    if albums_df.isEmpty():
        return spark.createDataFrame([], empty_schema)

    print("\n--- Schema of final_combined_df before final select ---")
    albums_df.printSchema()
    print("\n--- Sample data from final_combined_df (first 5 rows) ---")
    albums_df.show(5, truncate=False)

    tracks_list = get_tracks_list(playlist_id) 

    if not tracks_list: 
        print("No track IDs found for detailed track fetching.")
        return spark.createDataFrame([], empty_schema)

    tracks_details_df = get_tracks(tracks_list)
    if tracks_details_df.isEmpty(): 
        print("No detailed track information fetched.")
        return spark.createDataFrame([], empty_schema)
    
    print("\n--- Schema of tracks_details_df ---")
    tracks_details_df.printSchema()

    base_tracks_from_playlist = process_playlists(playlist_id) 

    artist_details_df = get_artist_details_for_enrichment(base_tracks_from_playlist)
    album_details_df = albums_df.select(
        F.col("track_id"), 
        F.col("type").alias("AlbumType"),
        F.col("popularity").alias("AlbumPopularity")
    ).distinct() 

    print("\n--- Schema of artist_details_df ---")
    artist_details_df.printSchema()

    print("\n--- Schema of album_details_df ---")
    album_details_df.printSchema()
    
    tracks_details_df_renamed = tracks_details_df.select(
        F.col("id").alias("track_id_from_details"), 
        F.col("name").alias("track_name_from_details"),
        F.col("popularity").alias("track_popularity_from_details"), 
        F.col("explicit").alias("explicit_from_details"),
        F.col("duration_ms").alias("duration_ms_from_details"),
        F.col("track_number").alias("track_number_from_details")
    )
    
    print("\n--- DEBUG: Schema of tracks_details_df_renamed before join ---")
    tracks_details_df_renamed.printSchema()
    tracks_details_df_renamed.show(5, truncate=False)

    albums_tracks_joined = albums_df.join(
        tracks_details_df_renamed,
        (albums_df["track_id"] == tracks_details_df_renamed["track_id_from_details"]),
        how='inner'
    )

    
    final_combined_df = albums_tracks_joined.join(
        base_tracks_from_playlist.select(
            F.col("track.id").alias("original_track_id"),
            F.explode("track.artists").alias("artist_exploded"), 
            F.col("track.explicit").alias("Explicit_Flag") 
        ),
        (albums_tracks_joined["track_id"] == F.col("original_track_id")),
        how='left'
    ).drop("original_track_id")
     
    final_combined_df = final_combined_df.join(
        artist_details_df,
        (final_combined_df["artist_exploded.id"] == artist_details_df["artist_id"]),
        how='left'
    ).drop("artist_exploded.id")
    
    print("\n--- Schema of final_combined_df before final select ---")
    final_combined_df.printSchema()
    print("\n--- Sample data from final_combined_df (first 5 rows) ---")
    final_combined_df.show(5, truncate=False)
    
    

    albums_tracks_joined_final = (
        final_combined_df
        .select(
            F.col("album_name").alias("AlbumName"),
            F.col("release_date").cast(DateType()).alias("ReleaseDate"),
            F.col("artist_exploded.name").alias("ArtistName"), 
            F.col("artist_exploded.id").alias("ArtistId"),
            F.col("track_id").alias("TrackId"),
            F.col("track_name_from_details").alias("TrackName"), 
            (F.col("duration_ms") / 1000).alias('DurationSeconds'),
            F.from_unixtime(F.col("duration_ms") / 1000, 'mm:ss').alias('Duration'), 
            final_combined_df["popularity"].alias("Popularity"), 
            F.col("track_number"),
            F.col("Explicit_Flag").alias("Explicit"), 
            F.col("ArtistGenres"),
            F.col("ArtistPopularity"),
            F.col("ArtistFollowers"),
            F.col("type").alias("AlbumType"), 
            albums_df["popularity"].alias("AlbumPopularity"), 
            F.year(F.col("release_date")).alias("ReleaseYear"),
            F.month(F.col("release_date")).alias("ReleaseMonth"),
            F.dayofweek(F.col("release_date")).alias("ReleaseDayOfWeek"),
            (F.floor(F.year(F.col("release_date")) / 10) * 10).alias("ReleaseDecade")
        )
    )
    print("Combined Albums and Tracks DataFrame created. Inferred Schema:")
    albums_tracks_joined_final.printSchema()
    return albums_tracks_joined_final
 


Loading the Data

In [0]:
%sql

CREATE DATABASE IF NOT EXISTS spotify;


CREATE TABLE IF NOT EXISTS spotify.
full_albums_enriched (
  AlbumName STRING,
  ReleaseDate DATE,
  ArtistName STRING,
  ArtistId STRING,          
  TrackId STRING,
  TrackName STRING,
  DurationSeconds DECIMAL(8, 2),
  Duration STRING,
  Popularity INT,          
  TrackNumber INT,
  Explicit BOOLEAN,         
  ArtistGenres ARRAY<STRING>,
  ArtistPopularity INT,    
  ArtistFollowers BIGINT,   
  AlbumType STRING,         
  AlbumPopularity INT,      
  ReleaseYear INT,
  ReleaseMonth INT,
  ReleaseDayOfWeek INT,
  ReleaseDecade INT
) PARTITIONED BY (ArtistName);

In [0]:
full_albums_from_playlist_enriched = get_playlist_album_tracks_details(playlistid)

full_albums_from_playlist_enriched.createOrReplaceTempView("full_album_enriched_temp_view") 

full_albums_from_playlist_enriched.write.mode("overwrite").insertInto("spotify.full_albums_enriched")
print("Data loaded into spotify.full_albums_enriched.")

Playlist DataFrame created. Inferred Schema:
root
 |-- added_at: string (nullable = true)
 |-- track: struct (nullable = true)
 |    |-- artists: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- album: struct (nullable = true)
 |    |    |-- album_type: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- release_date: string (nullable = true)
 |    |    |-- total_tracks: long (nullable = true)
 |    |-- duration_ms: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- popularity: long (nullable = true)
 |    |-- explicit: boolean (nullable = true)
 |-- added_by: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)

Flattened Playlist Tracks DataFrame created. Infe

album_type,album_id,album_name,release_date,total_tracks,artist_name,artist_id,duration_ms,track_id,track_name,popularity,explicit
compilation,2pANdqPvxInB0YvcDiw4ko,Progressive Psy Trance Picks Vol.8,2012-04-02,20,Odiseo,6eSdhw46riw2OUHgMwR8B5,376000,4rzfv0JLZfVhOhbSQ8o5jZ,Api,1,False
compilation,6nlfkk5GoXRL1nktlATNsy,Wellness & Dreaming Source,2015-01-09,25,Vlasta Marek,5VQE4WOzPu9h3HnGLuBoA6,730066,5o3jMYOSbaVz3tkgwhELSV,Is,0,False
album,4hnqM0JK4CM1phwfq1Ldyz,This Is Happening,2010-05-17,9,LCD Soundsystem,066X20Nz7iquqkkCW6Jxy6,401440,4Cy0NHJ8Gh0xMdwyM9RkQm,All I Want,49,False
album,2usKFntxa98WHMcyW6xJBz,Glenn Horiuchi Trio / Gelenn Horiuchi Quartet: Mercy / Jump Start / Endpoints / Curl Out / Earthworks / Mind Probe / Null Set / Another Space (A),2011-04-01,8,Glenn Horiuchi,1cZW3NKv2zT57wB9ZFsCSw,358760,6hvFrZNocdt2FcKGCSY5NI,Endpoints,0,False
album,2usKFntxa98WHMcyW6xJBz,Glenn Horiuchi Trio / Gelenn Horiuchi Quartet: Mercy / Jump Start / Endpoints / Curl Out / Earthworks / Mind Probe / Null Set / Another Space (A),2011-04-01,8,Glenn Horiuchi Trio,272ArH9SUAlslQqsSgPJA2,358760,6hvFrZNocdt2FcKGCSY5NI,Endpoints,0,False
album,0ivM6kSawaug0j3tZVusG2,All The Best (Spanish Version),2007-01-01,18,Zucchero,2KftmGt9sk1yLjsAoloC3M,176093,2E2znCPaS8anQe21GLxcvJ,You Are So Beautiful,0,False


Found 5 unique album IDs. Fetching album tracks...
Fetching details for 5 unique albums...
Albums Details DataFrame created. Inferred Schema:
root
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- album_type: string (nullable = true)
 |-- artists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- tracks: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- duration_ms: long (nullable = true)
 |    |    |    |-- track_number: long (nullable = true)
 |-- popularity: long (nullable = true)
 |-- explicit: boolean (nullable = true)

Flattened Albums DataFrame created. Inferred Schema:
root
 |-- album_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- type: string (nullable = true)
 |-- artist_name: string (nullable = true)


album_type,album_id,album_name,release_date,total_tracks,artist_name,artist_id,duration_ms,track_id,track_name,popularity,explicit
compilation,2pANdqPvxInB0YvcDiw4ko,Progressive Psy Trance Picks Vol.8,2012-04-02,20,Odiseo,6eSdhw46riw2OUHgMwR8B5,376000,4rzfv0JLZfVhOhbSQ8o5jZ,Api,1,False
compilation,6nlfkk5GoXRL1nktlATNsy,Wellness & Dreaming Source,2015-01-09,25,Vlasta Marek,5VQE4WOzPu9h3HnGLuBoA6,730066,5o3jMYOSbaVz3tkgwhELSV,Is,0,False
album,4hnqM0JK4CM1phwfq1Ldyz,This Is Happening,2010-05-17,9,LCD Soundsystem,066X20Nz7iquqkkCW6Jxy6,401440,4Cy0NHJ8Gh0xMdwyM9RkQm,All I Want,49,False
album,2usKFntxa98WHMcyW6xJBz,Glenn Horiuchi Trio / Gelenn Horiuchi Quartet: Mercy / Jump Start / Endpoints / Curl Out / Earthworks / Mind Probe / Null Set / Another Space (A),2011-04-01,8,Glenn Horiuchi,1cZW3NKv2zT57wB9ZFsCSw,358760,6hvFrZNocdt2FcKGCSY5NI,Endpoints,0,False
album,2usKFntxa98WHMcyW6xJBz,Glenn Horiuchi Trio / Gelenn Horiuchi Quartet: Mercy / Jump Start / Endpoints / Curl Out / Earthworks / Mind Probe / Null Set / Another Space (A),2011-04-01,8,Glenn Horiuchi Trio,272ArH9SUAlslQqsSgPJA2,358760,6hvFrZNocdt2FcKGCSY5NI,Endpoints,0,False
album,0ivM6kSawaug0j3tZVusG2,All The Best (Spanish Version),2007-01-01,18,Zucchero,2KftmGt9sk1yLjsAoloC3M,176093,2E2znCPaS8anQe21GLxcvJ,You Are So Beautiful,0,False


Found 5 unique album IDs. Fetching album tracks...
Fetching details for 5 unique albums...
Albums Details DataFrame created. Inferred Schema:
root
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- album_type: string (nullable = true)
 |-- artists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- tracks: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- duration_ms: long (nullable = true)
 |    |    |    |-- track_number: long (nullable = true)
 |-- popularity: long (nullable = true)
 |-- explicit: boolean (nullable = true)

Flattened Albums DataFrame created. Inferred Schema:
root
 |-- album_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- type: string (nullable = true)
 |-- artist_name: string (nullable = true)


In [0]:
top_5_longest_albums = spark.sql("""
               SELECT
                    AlbumName,
                    SUM(DurationSeconds) AS AlbumDuration
                FROM
                    spotify.full_albums_enriched 
                GROUP BY
                    AlbumName
                ORDER BY AlbumDuration DESC
                LIMIT 5
                """)

albums_with_most_popular_songs = spark.sql("""
                                           SELECT
                                                AlbumName,
                                                CASE
                                                    WHEN
                                                        Popularity BETWEEN 0 AND 40 THEN 'NotPopular'
                                                    WHEN
                                                        Popularity BETWEEN 41 AND 100 THEN 'Popular'
                                                    ELSE NULL
                                                END AS PopularityCategory 
                                            FROM
                                                spotify.full_albums_enriched 
                                           """)

albums_per_popularity = (albums_with_most_popular_songs
    .groupBy("AlbumName")
    .pivot("PopularityCategory")
    .agg(F.count("PopularityCategory"))
    .na.fill(0)
)


albums_by_artist_popularity = spark.sql("""
                            SELECT
                                AlbumName,
                                CASE
                                    WHEN ArtistPopularity BETWEEN 0 AND 40 THEN 'LessPopularArtists'
                                    WHEN ArtistPopularity BETWEEN 41 AND 70 THEN 'MediumPopularArtists'
                                    WHEN ArtistPopularity BETWEEN 71 AND 100 THEN 'HighlyPopularArtists'
                                    ELSE 'Unknown'
                                END AS ArtistPopularityCategory
                            FROM
                                spotify.full_albums_enriched 
                            """)

albums_per_artist_popularity = (albums_by_artist_popularity
    .groupBy("AlbumName")
    .pivot("ArtistPopularityCategory")
    .agg(F.count("ArtistPopularityCategory"))
    .na.fill(0)
)


tracks_per_genre = spark.sql("""
                               SELECT
                                    genre_exploded AS Genre,
                                    COUNT(TrackId) AS NoOfTracksInGenre
                               FROM
                                    spotify.full_albums_enriched
                               LATERAL VIEW EXPLODE(ArtistGenres) exploded_genres AS genre_exploded -- Use new ArtistGenres
                               WHERE genre_exploded IS NOT NULL
                               GROUP BY
                                    genre_exploded
                               ORDER BY NoOfTracksInGenre DESC
                              """)





In [0]:
top_5_longest_albums.write.option("header", "true").mode('overwrite').saveAsTable('spotify.top_5_longest_albums')
albums_per_popularity.write.option("header", "true").mode('overwrite').saveAsTable('spotify.albums_per_popularity')

albums_per_artist_popularity.write.option("header", "true").mode('overwrite').saveAsTable('spotify.albums_per_artist_popularity')

tracks_per_genre.write.option("header", "true").mode('overwrite').saveAsTable('spotify.tracks_per_genre')

