In [1]:
import pandas as pd
from sqlalchemy import create_engine
import requests
import base64
import datetime
import os

In [2]:
def get_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database named by the environment.

    Reads three environment variables:

    * ``DATASOURCE_URL`` - only the part after the first ``//`` is used
      (presumably a JDBC-style URL such as ``jdbc:postgresql://host:5432/db``;
      confirm against the deployment configuration).
    * ``DATASOURCE_USERNAME``
    * ``DATASOURCE_PASSWORD``

    Returns:
        The engine returned by ``create_engine`` for
        ``postgresql://<user>:<password>@<host part of DATASOURCE_URL>``.

    Raises:
        RuntimeError: if any of the three variables is not set.
        ValueError: if ``DATASOURCE_URL`` contains no ``//`` separator.
    """
    from urllib.parse import quote

    required = ("DATASOURCE_URL", "DATASOURCE_USERNAME", "DATASOURCE_PASSWORD")
    missing = [name for name in required if os.getenv(name) is None]
    if missing:
        raise RuntimeError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    raw_url = os.getenv("DATASOURCE_URL")
    # Drop the scheme prefix; keep everything after the first "//" so a later
    # "//" in the path or query string does not truncate the location.
    _, separator, location = raw_url.partition("//")
    if not separator:
        raise ValueError("DATASOURCE_URL must contain '//' before the host")

    # Percent-encode the credentials so characters such as '@', '/' or ':' in
    # the password cannot be mistaken for URL delimiters.
    user = quote(os.getenv("DATASOURCE_USERNAME"), safe="")
    password = quote(os.getenv("DATASOURCE_PASSWORD"), safe="")

    connection_string = f"postgresql://{user}:{password}@{location}"
    return create_engine(connection_string)

In [3]:
def get_users(engine):
    """Load every row of the ``users`` table and add an empty token column.

    Args:
        engine: Database connection/engine accepted by ``pandas.read_sql``.

    Returns:
        A DataFrame with all columns of ``users`` plus an ``access_token``
        column set to ``None`` on every row.
    """
    query = "SELECT * FROM users"
    return pd.read_sql(query, engine).assign(access_token=None)

In [4]:
def add_access_tokens(users):
    """Exchange each user's refresh token for an access token, in place.

    For every row of ``users`` a ``refresh_token`` grant is POSTed to the
    Spotify accounts token endpoint; on HTTP 200 the returned token is stored
    in that row's ``access_token`` column. Rows whose request fails (or that
    have no refresh token) keep their existing ``access_token`` value and an
    error line is printed, so one bad user does not stop the others.

    Args:
        users: DataFrame with a ``refresh_token`` column and an
            ``access_token`` column. It is modified in place.

    Returns:
        None.
    """
    # NOTE(review): credentials should not live in source control. The
    # environment variables take precedence; the embedded values are kept only
    # so existing deployments keep working and should be rotated and removed.
    client_id = os.getenv("SPOTIFY_CLIENT_ID", "6a6355fbeb044695930d74e002d91214")
    client_secret = os.getenv("SPOTIFY_CLIENT_SECRET", "2722c709f97248889ba35a4f33069ced")

    # HTTP Basic credentials: base64("<client_id>:<client_secret>").
    auth_base64 = base64.b64encode(
        f"{client_id}:{client_secret}".encode('utf-8')
    ).decode('utf-8')

    url = "https://accounts.spotify.com/api/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": f"Basic {auth_base64}"
    }
    timeout_seconds = 30  # avoid hanging the pipeline on an unresponsive endpoint

    for row_label, user in users.iterrows():
        # Use this row's refresh token (not row 0's) so every user gets a token.
        refresh_token = user['refresh_token']
        if pd.isna(refresh_token):
            print(f"Error: no refresh token for user row {row_label}")
            continue

        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        response = requests.post(url, headers=headers, data=data, timeout=timeout_seconds)
        if response.status_code == 200:
            # Write back to the row being processed.
            users.at[row_label, 'access_token'] = response.json()['access_token']
        else:
            print(f"Error: {response.content}")

In [5]:
def get_top_artists(users, engine):
    """Fetch each user's current top artists and merge them with stored rows.

    Reads the existing ``user_top_artist`` table, requests
    ``/v1/me/top/artists?time_range=short_term`` once per user with that
    user's own access token, and appends one record per returned artist
    stamped with the current year and month (as zero-padded strings).

    Users without an access token, and users whose request does not return
    HTTP 200, are skipped with a printed message instead of aborting the run.

    Args:
        users: DataFrame with ``id`` and ``access_token`` columns.
        engine: Database connection/engine accepted by ``pandas.read_sql``.

    Returns:
        DataFrame of existing plus new rows, de-duplicated on
        ``(user_id, artist_id, year, month)``. Because existing rows come
        first, an already-stored row wins over a newly fetched duplicate.
    """
    top_artists = pd.read_sql("SELECT * FROM user_top_artist", engine)
    url = "https://api.spotify.com/v1/me/top/artists?time_range=short_term"
    now = datetime.datetime.now()
    year = now.strftime("%Y")
    month = now.strftime("%m")
    timeout_seconds = 30  # avoid hanging the pipeline on an unresponsive endpoint

    # Collect plain dicts and build one DataFrame at the end; concatenating a
    # one-row frame per artist copies the whole table on every iteration.
    records = []
    for _, user in users.iterrows():
        access_token = user['access_token']
        if pd.isna(access_token):
            print(f"Skipping user {user['id']}: no access token")
            continue

        # Authenticate as the user being processed, not always the first row.
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(url, headers=headers, timeout=timeout_seconds)
        if response.status_code != 200:
            print(f"Error: {response.content}")
            continue

        artists = response.json().get('items', [])
        for rank, artist in enumerate(artists, start=1):
            records.append({
                "rank": rank,
                "user_id": user['id'],
                "year": year,
                "month": month,
                "artist_id": artist['id'],
                "artist_name": artist['name'],
            })

    if records:
        top_artists = pd.concat(
            [top_artists, pd.DataFrame(records)], ignore_index=True
        )
    top_artists = top_artists.drop_duplicates(['user_id', 'artist_id', 'year', 'month'])
    return top_artists

In [6]:
def main():
    """Run the pipeline: load users, refresh tokens, fetch and store top artists.

    Steps:
        1. Build the database engine and read the users.
        2. If there are no users, print a message and stop.
        3. Fill in access tokens, fetch top artists, and write the merged
           result to ``user_top_artist`` (the table is replaced, not appended,
           because the merged frame already contains the previously stored rows).

    The engine is always disposed on the way out, including on the early
    return and on errors, so repeated runs in one kernel do not accumulate
    open connection pools.
    """
    print("Pipeline Started")

    engine = get_engine()
    try:
        users = get_users(engine)
        if users.empty:
            print("No users found")
            return
        add_access_tokens(users)
        top_artists = get_top_artists(users, engine)
        top_artists.to_sql('user_top_artist', engine, index=False, if_exists='replace')
    finally:
        # Release pooled connections held by this run's engine.
        engine.dispose()

    print("Pipeline Finished")