In [1]:
# require to wrap all our import libraries within the spotify function 
from airflow.decorators import dag, task
from airflow import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago 
import pathlib
import pandas as pd 
import sqlalchemy 
import configparser as ConfigParser
import spotipy
from spotipy.oauth2 import SpotifyOAuth

In [2]:
def check_if_valid_data(df:pd.DataFrame) -> bool:
        # check if dataframe is empty, it means there were no songs listened to
        if df.empty:
            print('No songs downloaded. Finishing execution')
            return False 

        # Primary key check 
        if pd.Series(df['played_at_list']).is_unique:       # this check helps us understand that there are no duplicate rows in our database
            pass 
        else: 
            raise Exception("Primary Key check if violated")  # here pipeline fail, maybe can send email to me
        yesterday = datetime.now() - timedelta(days = 1)
        # just check yesterday's date at 0 hour, 0 minute, 0 second and 0 microseconds
        yesterday = yesterday.replace(hour = 0,minute = 0,second =0,microsecond =0 )
        timestamps = df['timestamps'].tolist()

        for timestamp in timestamps:
            # strptime --> converts string to time
             # if we catch records that are not yesterday, we want the pipeline to raise exception 
            if datetime.strptime(timestamp,"%Y-%m-%d")!= yesterday:
                raise Exception("At least one of the songs does not come within last 24 hours")
        return True

## Spotipy documentation: 

https://spotipy.readthedocs.io/en/master/#

In [3]:
client_id = 'bf76bf609f1a4be6b656c55e4d77abd4'
client_secret = '8949a39342a74eddb3405b7a4f747852'

In [4]:
redirect_uri = 'https://localhost:8888/callback/'
scope= "user-read-recently-played"

In [13]:
current_directory = !pwd

In [14]:
# CURRENT_PATH_DIR= pathlib.Path(!pwd).absolute()
GRANDPARENT_PATH = pathlib.Path(current_directory[0])
CONF_PATH = GRANDPARENT_PATH.joinpath("configuration/config.ini")
cf_parser= ConfigParser.ConfigParser()
cf_parser.read(CONF_PATH)


['/home/maxong/spotify_airflow/configuration/config.ini']

In [15]:
    # TOKEN = cf_parser.get('spotify_details','token')
WEBSITE = cf_parser.get('spotify_details','website')
CLIENT_ID = cf_parser.get('spotify_details','client_id')
CLIENT_SECRET = cf_parser.get('spotify_details','client_secret')
REDIRECT_URL = cf_parser.get('spotify_details','redirect_url')
SCOPE = cf_parser.get('spotify_details','scope')
DB_LOCATION = cf_parser.get('database','db_location_postgres')

In [16]:
config_dictionary = {
'website':WEBSITE,
'client_id':CLIENT_ID,
'client_secret':CLIENT_SECRET,
'redirect_url':REDIRECT_URL,
'scope':SCOPE,
'db_location':DB_LOCATION
}

## Following this stackoverflow to auto refresh the token 

https://stackoverflow.com/questions/48883731/refresh-token-spotipy#:~:text=So%20it%20will%20be%20refreshed,access%20token%20%2F%20refresh%20token%20previously.

In [17]:
def create_spotify_api_details(config_dictionary):
    auth_manager=SpotifyOAuth(scope=config_dictionary['scope'],
                                client_id =config_dictionary['client_id'] ,
                                client_secret = config_dictionary['client_secret'],
                                redirect_uri = config_dictionary['redirect_url'])
    spotify = spotipy.Spotify(auth_manager=auth_manager)
    return auth_manager,spotify

   

In [18]:
def refresh_spotify_api_details(auth_manager, spotify,config_dictionary):
   token_info = auth_manager.cache_handler.get_cached_token()
   if auth_manager.is_token_expired(token_info):
       auth_manager, spotify = create_spotify(config_dictionary)
   return auth_manager, spotify

In [19]:
today = datetime.now().replace(hour = 0,second = 0,minute =0,microsecond=0)

# because everyday we want to see the songs we've listed to for the 
# previous 24 hrs
yesterday = today - timedelta(days =1)
# unix timestamp in miliseconds, that's why need to * 1000
yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

In [20]:
yesterday = today - timedelta(days =1)
yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

In [21]:
today_unix_timestamp= int(today.timestamp() * 1000)

In [22]:
yesterday_unix_timestamp

1666540800000

In [23]:
today.strftime("%Y-%m-%d")

'2022-10-25'

In [24]:
auth_manager, spotify = create_spotify_api_details(config_dictionary)


In [25]:
auth_manager, spotify = refresh_spotify_api_details(auth_manager, spotify,config_dictionary)
data = spotify.current_user_recently_played(after =yesterday_unix_timestamp)

In [62]:
data['items'][0]['played_at'][:10]

'2022-10-25'

In [26]:
while True:
        auth_manager, spotify = refresh_spotify_api_details(auth_manager, spotify,config_dictionary)
        data = spotify.current_user_recently_played(after =yesterday_unix_timestamp)

        try:
            # if python not equals to zero 
            if data['items'] != []:
                artist_name = []
                song_names = []
                played_at_list = []
                timestamps = []         
                for i in data['items']:
                    if i['played_at'][:10]!= today.strftime("%Y-%m-%d"):
                        song_names.append(i['track']['name'])
                        played_at_list.append(i['played_at'])
                        timestamps.append(i['played_at'][:10])
                        artist_name.append(i['track']['artists'][0]['name'])            
                # data is in dataframe format now
                songs_table = pd.DataFrame([played_at_list,timestamps,artist_name,song_names]).T
                songs_table.columns = ['played_at_list','timestamps','artist_name','song_names']            

                # update database
                # update_database(DB_LOCATION,songs_table)
                print(songs_table)

                # once you updated the database, break out of the loop
                break
            # if there are no data that day, don't even open up the database, skip that day    
            else:
                print("No songs played yesterday")
                # if there are no songs, break out 
                break 
        except:
            print("Error with database or spotify data returned")

             played_at_list  timestamps      artist_name  \
0  2022-10-24T15:06:30.612Z  2022-10-24  Hillsong UNITED   
1  2022-10-24T12:27:41.508Z  2022-10-24   Olivia Rodrigo   

                                song_names  
0  Oceans (Where Feet May Fail) - Acoustic  
1             1 step forward, 3 steps back  


In [27]:
songs_table

Unnamed: 0,played_at_list,timestamps,artist_name,song_names
0,2022-10-24T15:06:30.612Z,2022-10-24,Hillsong UNITED,Oceans (Where Feet May Fail) - Acoustic
1,2022-10-24T12:27:41.508Z,2022-10-24,Olivia Rodrigo,"1 step forward, 3 steps back"


In [65]:
check_if_valid_data(songs_table)

True

In [77]:
import sqlalchemy

In [78]:
engine = sqlalchemy.create_engine('postgresql://dev:pandaburp94*@localhost:5433/postgres')


In [79]:
    sql_query = """
    CREATE TABLE IF NOT EXISTS james_played_tracks(
        played_at_list VARCHAR(200), 
        timestamps VARCHAR(200),
        artist_name VARCHAR(200), 
        song_names VARCHAR(200),
        CONSTRAINT primary_key_constraint PRIMARY KEY (played_at_list)
    )
    """
    engine.execute(sql_query)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f0c5aedeac0>

In [80]:
songs_table

Unnamed: 0,played_at_list,timestamps,artist_name,song_names
0,2022-10-24T15:06:30.612Z,2022-10-24,Hillsong UNITED,Oceans (Where Feet May Fail) - Acoustic
1,2022-10-24T12:27:41.508Z,2022-10-24,Olivia Rodrigo,"1 step forward, 3 steps back"


In [81]:
songs_table.to_sql(name = 'james_played_tracks',con = engine, if_exists= 'append',index = False)


ImportError: Unable to find a usable engine; tried using: 'sqlalchemy'.
A suitable version of sqlalchemy is required for sql I/O support.
Trying to import the above resulted in these errors:
 - Pandas requires version '1.4.16' or newer of 'sqlalchemy' (version '1.4.9' currently installed).

In [None]:
pip install sqlalchemy==1.4.16