In [None]:
%run
./connect_sql

In [None]:
import os
import base64
import requests
import json

In [None]:
# get token
# Spotify application credentials, read from the environment variables of the same names.
client_id = os.getenv('client_id')
client_secret = os.getenv('client_secret')

def get_token():
    """Request a Spotify access token with the client-credentials grant.

    The module-level ``client_id`` and ``client_secret`` are base64-encoded as
    ``client_id:client_secret`` and sent as HTTP Basic auth to the Spotify
    accounts token endpoint.

    Returns:
        The ``access_token`` value of the JSON reply.

    Raises:
        ValueError: if ``client_id`` or ``client_secret`` is missing or empty.
        requests.HTTPError: if the token endpoint answers with an error status.
    """
    # Fail early with a clear message instead of sending "None:None" to the API.
    if not client_id or not client_secret:
        raise ValueError(
            "Spotify credentials are missing: set the 'client_id' and "
            "'client_secret' environment variables"
        )

    auth_base64 = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()

    url = 'https://accounts.spotify.com/api/token'

    headers = {
        "Authorization": "Basic" + " " + auth_base64,
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {
        "grant_type": "client_credentials"
    }

    response = requests.post(url, headers=headers, data=data, timeout=30)
    # Surface HTTP failures here rather than as a KeyError on 'access_token'.
    response.raise_for_status()

    return response.json()['access_token']

In [None]:
# get bearer token
def get_bearer_token(token):
    """Return the request-header dict carrying ``token`` as a Bearer credential."""
    authorization_value = ' '.join(('Bearer', token))
    return {'Authorization': authorization_value}

In [None]:
#get artist id by name
def get_artist(artist_name):
    """Look up the Spotify artist id for each name via the search endpoint.

    Each name is sent as the search query with ``type=artist`` and ``limit=1``,
    and the id of the single returned item is collected.

    Args:
        artist_name: iterable of artist names (search terms).

    Returns:
        list: artist ids in input order. A name whose search returns no item
        is reported with ``print`` and skipped.

    Raises:
        requests.HTTPError: if a search request answers with an error status.
    """
    names = list(artist_name)
    if not names:
        return []

    search_url = 'https://api.spotify.com/v1/search'

    # Fetch the token once for the whole batch instead of once per artist.
    headers = get_bearer_token(get_token())

    resp = []
    for artist in names:
        # `params` lets requests URL-encode the search term (spaces, '&', ...).
        response = requests.get(
            search_url,
            headers=headers,
            params={'q': artist, 'type': 'artist', 'limit': 1},
            timeout=30,
        )
        response.raise_for_status()

        items = response.json()['artists']['items']
        if not items:
            # An empty result list would otherwise raise IndexError on items[0].
            print('no artist found for: ', artist)
            continue
        resp.append(items[0]['id'])
    return resp

In [None]:
# Fetch the top tracks of every band listed in "artist_name".
# The top-tracks endpoint is called once per artist id and every returned
# track is flattened into one row of `lista`, which then becomes a Spark DataFrame.
# NOTE(review): 'foofigthers' looks like a misspelling of 'foofighters'; left unchanged — confirm.
artist_name = ['acdc', 'angra', 'audioslave', 'foofigthers', 'gorillaz', 'ironmaiden', 'muse', 'redhotchilipeppers', 'oasis', 'theoffspring', 'thesmiths', 'thestrokes', 'u2']

# Keep the ids under their own name: rebinding `get_artist` to the result would
# replace the function with a list and make a second run of this cell fail.
artist_ids = get_artist(artist_name)

# One token is reused for every top-tracks request.
headers = get_bearer_token(get_token())

lista = []
for artist_id in artist_ids:
    url = f'https://api.spotify.com/v1/artists/{artist_id}/top-tracks?country=US'

    response = requests.get(url, headers=headers, timeout=30)

    # Check the status before reading the payload; an error reply is reported
    # and this artist is skipped.
    if response.status_code != 200:
        print('request error: ', response.status_code)
        continue

    # Parse the body once per response.
    for track in response.json()['tracks']:
        album = track['album']
        track_row = {
            'id': track['id'],
            'band': album['artists'][0]['name'],
            'song_name': track['name'],
            'popularity': track['popularity'],
            'type': track['type'],
            'release_date': album['release_date'],
            'album_name': album['name'],
            'total_tracks': album['total_tracks']
        }
        lista.append(track_row)

# Spark cannot infer a schema from an empty list; stop with a clear message instead.
if not lista:
    raise RuntimeError('no tracks were retrieved from Spotify; nothing to load')

df = spark.createDataFrame(lista)

In [None]:
# some data treatment
from pyspark.sql.functions import col, trim

# Trim every text column.
df_trim = df
for text_column in ('album_name', 'band', 'id', 'song_name', 'type'):
    df_trim = df_trim.withColumn(text_column, trim(col(text_column)))

# Cast the numeric columns to int.
df_int = df_trim
for number_column in ('popularity', 'total_tracks'):
    df_int = df_int.withColumn(number_column, col(number_column).cast('int'))

# Cast release_date to a date column.
release_date_as_date = col('release_date').cast('date')
df_date = df_int.withColumn('release_date', release_date_as_date)

In [None]:
# add load date
from datetime import datetime

import pytz
from pyspark.sql.functions import lit

# Current time in the "Brazil/East" zone, rendered as 'YYYY-MM-DD HH:MM:SS' text.
date_time = pytz.timezone("Brazil/East")
load_date = f"{datetime.now(date_time):%Y-%m-%d %H:%M:%S}"
print(load_date)

# Every row receives the same literal load timestamp.
load_date_column = lit(load_date)
df_load_date = df_date.withColumn("load_date", load_date_column)

display(df_load_date)

In [None]:
# save as delta
# Target location of the Delta table; existing content is replaced on each run.
save_path = 'dbfs:/FileStore/tables/datalake/delta/spotify.delta'

(
    df_load_date.write
    .format('delta')
    .mode('overwrite')
    .save(save_path)
)


In [None]:
%sql
-- Register the Delta files at the location below as table default.spotify
-- (skipped when the table already exists).
CREATE TABLE IF NOT EXISTS default.spotify (
  album_name    STRING,
  band          STRING,
  id            STRING,
  popularity    INTEGER,
  release_date  DATE,
  song_name     STRING,
  total_tracks  INTEGER,
  type          STRING,
  load_date     STRING
)
USING DELTA
LOCATION 'dbfs:/FileStore/tables/datalake/delta/spotify.delta'

In [None]:
# save table in the azure sql
# Column definitions passed to insert_sql_table for the 'Sales.spotify_top' table.
columns = "album_name varchar(40)\
    , band varchar(30)\
    , id varchar(25)\
    , popularity int\
    , release_date date\
    , song_name varchar(65)\
    , total_tracks int\
    , type varchar(6)\
    , load_date varchar(20)"

try:
    insert_sql_table(df_load_date, 'Sales.spotify_top', columns)
except Exception as error:
    # Only SQL Server JDBC failures are tolerated: they are printed and the
    # notebook continues. Any other exception propagates.
    if 'com.microsoft.sqlserver.jdbc.SQLServerException' not in str(error):
        raise
    print(error)
else:
    print('table inserted in azure sql')

In [None]:
# update the created table in metastore
# Explicit import instead of a wildcard so the notebook namespace stays predictable.
from delta.tables import DeltaTable

# Handle on the Delta table stored at the path written earlier in the notebook.
load_delta = DeltaTable.forPath(spark, 'dbfs:/FileStore/tables/datalake/delta/spotify.delta')

# A Delta merge fails when several source rows match the same target row, so
# keep a single source row per track id (the merge key).
merge_source = df_load_date.dropDuplicates(['id'])

# Upsert by track id: matched rows get their descriptive columns refreshed,
# unmatched rows are inserted with all columns.
# NOTE(review): load_date is not refreshed for matched rows — confirm this is intended.
(
    load_delta.alias("load_delta")
    .merge(
        merge_source.alias("df_load_date"),
        "load_delta.id = df_load_date.id",
    )
    .whenMatchedUpdate(set={
        "album_name": "df_load_date.album_name",
        "band": "df_load_date.band",
        "popularity": "df_load_date.popularity",
        "release_date": "df_load_date.release_date",
        "song_name": "df_load_date.song_name",
        "total_tracks": "df_load_date.total_tracks",
        "type": "df_load_date.type"
    })
    .whenNotMatchedInsertAll()
    .execute()
)