In [13]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import pandas as pd
import sqlalchemy
from pprint import pprint
from spotipy.oauth2 import SpotifyClientCredentials
from config import spotify_client_id,spotify_client_secret,spotify_redirect_url,db_username,db_password

In [2]:
# This is the Python file to extract the songs from Spotify, transform the data and then load it into PostgreSQL.
# It is placed into a function for my Airflow DAG to call
def spotify_etl_func():
    # Step one: extract:
    spotify_client_id=spotify_client_id
    spotify_client_secret=spotify_client_secret
    spotify_redirect_url=spotify_redirect_url

    sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=spotify_client_id,
                                                   client_secret=spotify_client_secret,
                                                   redirect_uri=spotify_redirect_url,
                                                   scope="user-read-recently-played"))
    recently_played = sp.current_user_recently_played(limit=50)
    
    #if the length of recently_played is 0 for some reason just exit the program
    if len(recently_played) ==0:
        sys.exit("No results recieved from Spotify")
    # Step 2: Transform    
    #Creating the Album Data Structure:
    album_list = []
    for row in recently_played['items']:
        album_id = row['track']['album']['id']
        album_name = row['track']['album']['name']
        album_release_date = row['track']['album']['release_date']
        album_total_tracks = row['track']['album']['total_tracks']
        album_url = row['track']['album']['external_urls']['spotify']
        album_element = {'album_id':album_id,'album_name':album_name,'release_date':album_release_date,
                        'total_tracks':album_total_tracks,'album_url':album_url}
        album_list.append(album_element)
        
    #Creating the Artist Data Structure:
    #As we can see here this is another way to store data with using a dictionary of lists. Personally, for this project
    #I think using the strategy with the albums dicts(lists) is better. It allows for more functionality if we have to sort for example.
    # Additionally we do not need to make the temporary lists. There may be a more pythonic method to creating this but it is not my preferred method
    artist_dict = {}
    id_list = []
    name_list = []
    url_list = []
    for item in recently_played['items']:
        for key,value in item.items():
            if key == "track":
                for data_point in value['artists']:
                    id_list.append(data_point['id'])
                    name_list.append(data_point['name'])
                    url_list.append(data_point['external_urls']['spotify'])
    artist_dict = {'artist_id':id_list,'artist_name':name_list,'artist_url':url_list}
    
    
    
    #Creating the Track(Song) Data Structure:
    song_list = []
    for row in recently_played['items']:
        song_id = row['track']['id']
        song_name = row['track']['name']
        song_duration = row['track']['duration_ms']
        song_url = row['track']['external_urls']['spotify']
        song_popularity = row['track']['popularity']
        song_time_played = row['played_at']
        album_id = row['track']['album']['id']
        artist_id = row['track']['album']['artists'][0]['id']
        song_element = {'song_id':song_id,'song_name':song_name,'duration_ms':song_duration,'song_url':song_url,
                        'popularity':song_popularity,'date_time_played':song_time_played,'album_id':album_id,
                        'artist_id':artist_id
                       }
        song_list.append(song_element)
        
        
    #Now that we have these two lists and one dictionary ready lets convert them to DataFrames
    #We will need to do some cleaning and add our Unique ID for the Track
    #Then load into PostgresSQL from the dataframe

    #Album = We can also just remove duplicates here. We dont want to load two of the same albums just to have SQL drop it later
    album_df = pd.DataFrame.from_dict(album_list)
    album_df = album_df.drop_duplicates(subset=['album_id'])

    #Artist = We can also just remove duplicates here. We dont want to load two of the same artists just to have SQL drop it later
    artist_df = pd.DataFrame.from_dict(artist_dict)
    artist_df = artist_df.drop_duplicates(subset=['artist_id'])
    
    #Song Dataframe
    song_df = pd.DataFrame.from_dict(song_list)
    #date_time_played is an object (data type) changing to a timestamp
    song_df['date_time_played'] = pd.to_datetime(song_df['date_time_played'])
    #converting to my timezone of Central
    song_df['date_time_played'] = song_df['date_time_played'].dt.tz_convert('US/Central')
    #I have to remove the timezone part from the date/time/timezone.
    song_df['date_time_played'] = song_df['date_time_played'].astype(str).str[:-7]
    song_df['date_time_played'] = pd.to_datetime(song_df['date_time_played'])
    #Creating a Unix Timestamp for Time Played. This will be one half of our unique identifier
    song_df['UNIX_Time_Stamp'] = (song_df['date_time_played'] - pd.Timestamp("1970-01-01"))//pd.Timedelta('1s')
    # I need to create a new unique identifier column because we dont want to be insterting the same song played at the same song
    # I can have the same song multiple times in my database but I dont want to have the same song played at the same time
    song_df['unique_identifier'] = song_df['song_id'] + "-" + song_df['UNIX_Time_Stamp'].astype(str)
    song_df = song_df[['unique_identifier','song_id','song_name','duration_ms','song_url','popularity','date_time_played','album_id','artist_id']]
    song_df.to_csv("spotify_etl.csv")

    # Step 3: Load
    # Create Engine
    engine = sqlalchemy.create_engine(f"postgresql://{db_username}:{db_password}@localhost/spotify_etl")
    # Load table 1
    album_df.to_sql("spotify_albums",engine, index=False, if_exists="append")
    # Load table 2
    artist_df.to_sql("spotify_artists",engine, index=False, if_exists="append")
    # Load Table 3
    song_df.to_sql("spotify_tracks",engine, index=False, if_exists="append")
    
spotify_etl_func()


In [8]:
# This is the Python file to extract the daily top 50 songs in the US from Spotify, transform the data and then load it into PostgreSQL.
# It is placed into a function for my Airflow DAG to call

def spotify_top50usa_func(creator, playlist_id):
    
    spotify_client_id=spotify_client_id
    spotify_client_secret=spotify_client_secret
    spotify_redirect_url=spotify_redirect_url
    
    client_credentials_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret)
    sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)
    
    #step1 : create columns 
    playlist_features_list = ["artist_name","album_name","song_name",  "song_id","danceability","energy","key","loudness","mode", "speechiness","instrumentalness","liveness","valence","tempo", "duration_ms","time_signature"]
    # Create a DataFrame that will hold the columns
    playlist_df = pd.DataFrame(columns = playlist_features_list)
    
    #step2
    
    playlist = sp.user_playlist_tracks(creator, playlist_id, limit=50)["items"]
    for track in playlist:
        # Create empty dict
        playlist_features = {}
        # Get metadata
        playlist_features["artist_name"] = track["track"]["album"]["artists"][0]["name"]
        playlist_features["album_name"] = track["track"]["album"]["name"]
        playlist_features["song_name"] = track["track"]["name"]
        playlist_features["song_id"] = track["track"]["id"]
        
        # Get audio features
        audio_features = sp.audio_features(playlist_features["song_id"])[0]
        for feature in playlist_features_list[4:]:
            playlist_features[feature] = audio_features[feature]
        
        # Concat the dfs
        track_df = pd.DataFrame(playlist_features, index = [0])
        playlist_df = pd.concat([playlist_df, track_df], ignore_index = True)
        playlist_df["unique_identifier"] = playlist_df["song_id"]
        playlist_df = playlist_df[["unique_identifier", "artist_name","song_name", "song_id","danceability","energy","key","loudness","mode", "speechiness","instrumentalness","liveness","valence","tempo", "duration_ms","time_signature"]]
        playlist_df.to_csv("top50_songs_usa.csv")
        
    # Step 3: Load into PostgreSQL
    
    engine = sqlalchemy.create_engine(f"postgresql://{db_username}:{db_password}@localhost/spotify_etl")
    
    playlist_df.to_sql("usa_top_50",engine, index=False, if_exists="append")

spotify_top50usa_func("spotify","37i9dQZEVXbLRQDuF5jeBp")



In [None]:
try:
    spotify_top50usa_func("spotify","37i9dQZEVXbLRQDuF5jeBp")
except Exception as e:
    print('Something broke...', e)
finally:
    engine.dispose()
