# Imports

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import count
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from databricks.connect import DatabricksSession
import seaborn as sns
from pyspark.sql.functions import split,element_at
from pyspark.sql.types import IntegerType, FloatType, DoubleType
import warnings
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.functions import col, upper, min
from pyspark.sql.functions import max as spark_max
import requests


## Get secrets

In [0]:
client_id = dbutils.secrets.get(scope="market-mood", key="spotify-client-id")
client_secret = dbutils.secrets.get(scope="market-mood", key="spotify-client-secret")
print(f"Client ID: {'✅' if client_id else '❌'}")
print(f"Client Secret: {'✅' if client_secret else '❌'}")
url = "https://accounts.spotify.com/api/token"
headers = {
    "Content-Type": "application/x-www-form-urlencoded"
}
data = {
    "grant_type":"client_credentials",
    "client_id":client_id,
    "client_secret":client_secret
}
r = requests.post(url, headers=headers, data=data)
print(r.status_code)
print(r.json())
token = r.json()['access_token']

# Extract data from spotify API

In [0]:
spotify_id='2aaf5ba374464196'
url = 'https://api.spotify.com/v1/playlists/2ZfnLoLsOpit8LraQ6nyaP'


headers = {
    'Authorization': f'Bearer {token}'
}
r = requests.get(url, headers=headers)

print(r.status_code)
print(r.json())

In [0]:

headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
    "https://api.spotify.com/v1/search",
    headers=headers,
    params={"q": "Top 50 Global", "type": "playlist", "limit": 5}
)
print(response.json())
for playlist in response.json()["playlists"]["items"]:
    print(f"Nombre: {playlist['name']}")
    print(f"ID: {playlist['id']}")
    print(f"Owner: {playlist['owner']['display_name']}")
    print("---")


# Get data from kaggle dataset

In [0]:
spark = SparkSession.getActiveSession()
if spark is None:
    spark = SparkSession.builder.getOrCreate()

In [0]:
# spark = SparkSession.builder \
#     .appName('Practise') \
#     .config("spark.driver.host", "localhost") \
#     .getOrCreate()


In [0]:
spotify_charts = spark.read.csv(
    "/Volumes/market-mood/raw/spotify/charts.csv",
    header=True,
    inferSchema=True
)
spotify_charts.printSchema()

# Complete data using the spotify api

In [0]:
spotify_charts.select(count('url')).show()

In [0]:
spotify_charts = spotify_charts.na.drop(how='any',subset=['url'])


In [0]:
spotify_charts = spotify_charts.withColumn(
    "track_id", 
    element_at(split(col("url"), "/"), -1)#se usa el element_at porque getItem no existe en pyspark
)
spotify_charts.show()

In [0]:
spotify_charts.select('track_id').show()


In [0]:
# spotify_charts.collect()[0]["track_id"]#collect trae la fila en el índice indicado, no usar porque se trae todo el df jajaja
spotify_charts.select('track_id').first()[0]

In [0]:
track_id = spotify_charts.select('track_id').first()[0]
url = 'https://api.spotify.com/v1/audio-features/'+track_id
print(url)

headers = {
    'Authorization': f'Bearer '
}
r = requests.get(url, headers=headers)

print(r.status_code)
print(r.json())

# Complete data using another dataset

In [0]:
track_data = spark.read.csv(
    "/Volumes/market-mood/raw/spotify/dataset.csv",
    header=True,
    inferSchema=True
)
track_data_m = spark.read.csv(
    "/Volumes/market-mood/raw/spotify/tracks_features.csv",
    header=True,
    inferSchema=True
)
track_data.printSchema()

# Transformation


In [0]:
global_charts = spotify_charts.filter(spotify_charts['region']=='Global')

In [0]:
global_charts.show()

In [0]:
track_list = [row[0] for row in track_data_m.select('id').collect()]
tracks = global_charts.filter(global_charts['track_id'].isin(track_list)).select('track_id')
track_list = [row[0] for row in track_data.select('track_id').collect()]
tracks_m = global_charts.filter(global_charts['track_id'].isin(track_list)).select('track_id')

In [0]:
track_list = [row[0] for row in tracks.select('track_id').collect()] +[row[0] for row in tracks_m.select('track_id').collect()]

In [0]:
len(track_list)

In [0]:
global_charts.filter(global_charts['track_id'].isin(track_list)).count()/global_charts.count()*100

In [0]:
len(track_list)

In [0]:
track_data_m = track_data_m.withColumnRenamed('id','track_id')

In [0]:
audio_cols = ['track_id','key','mode','time_signature','danceability','energy','loudness','speechiness','acousticness','instrumentalness','liveness','valence','tempo']
combined_data = track_data.select(audio_cols).union(track_data_m.select(audio_cols))#union es el concat de pyspark

In [0]:
combined_data.printSchema()

In [0]:
global_charts = global_charts.filter(global_charts['track_id'].isin(track_list))

In [0]:
full_df = global_charts.join(combined_data, on ='track_id',how='inner')


In [0]:
full_df.printSchema()

In [0]:
full_df.describe()

In [0]:
from pyspark.sql.functions import col

In [0]:
int_cols = ['rank','streams','key','mode','time_signature']
float_cols = ['danceability','energy','loudness','speechiness','acousticness','instrumentalness','liveness','valence','tempo']
for c in int_cols:
    full_df = full_df.withColumn(c,full_df[c].try_cast('integer'))
for c in float_cols:
    full_df = full_df.withColumn(c,full_df[c].try_cast('double'))
full_df.printSchema()


In [0]:
full_df.show(5)

In [0]:
full_df.filter(~col("rank").rlike("^[0-9]+$")).select("rank").show(10, truncate=False)


In [0]:
full_df.printSchema()
full_df.select("rank", "valence", "energy", "danceability").show(3)

In [0]:
from pyspark.sql.functions import sum as spark_sum

numeric_cols = ["valence", "energy", "danceability", "acousticness"]
total = full_df.count()

for c in numeric_cols:
    nulls = full_df.filter(col(c).isNull()).count()
    print(f"{c}: {nulls} nulls ({nulls/total*100:.1f}%)")

In [0]:
full_df = full_df.withColumn('weight',full_df['rank']/spark_max(full_df['rank']))