### Executar no kernel do Spark

#### Captura de dados da API

In [None]:
import requests
from pyspark.sql.types import StringType 
from requests.adapters import HTTPAdapter 
from urllib3.util.retry import Retry
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from time import sleep

In [None]:
# Create the Spark session for this notebook, or reuse one that already
# exists. It runs against the 'local[*]' master under the application name
# 'api_calling'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('api_calling')
    .getOrCreate()
)

In [None]:
# Explicit eight-column schema. Only `id` is declared non-nullable; every
# other column may hold nulls.
schema_df = (
    StructType()
    .add('id', IntegerType(), nullable=False)
    .add('ts', DateType(), nullable=True)
    .add('ms_played', IntegerType(), nullable=True)
    .add('track_name', StringType(), nullable=True)
    .add('artist_name', StringType(), nullable=True)
    .add('album_name', StringType(), nullable=True)
    .add('user_name', StringType(), nullable=True)
    .add('track_id', StringType(), nullable=True)
)


In [None]:
# Dataset locations, written relative to the current working directory.
folder = "./datasets"
temp_folder = "./datasets/temp_csv"
destiny = "./datasets/return.csv"

# Load the CSV with the explicit schema. The "header" option is "false", so
# the first line of the file is treated as data rather than column names.
df = (
    spark.read
    .format("csv")
    .schema(schema_df)
    .option("header", "false")
    .option("encoding", "utf-8")
    .load(f"{folder}/limited_dataset.csv")
)

# Quick visual check of the first five rows.
df.show(5)

In [None]:
# Discard every row that contains a null in any column.
df = df.dropna()

# Collapse to one row per track_id, keeping one track_name value per group.
df_grouped = df.groupBy("track_id").agg(
    F.first("track_name").alias("track_name")
)
df_grouped.show(5)

# Redistribute the per-track rows over 36 partitions.
df_grouped = df_grouped.repartition(36)

In [None]:
# ReccoBeats endpoint templates. Each contains a literal ":id" marker
# (presumably substituted with a concrete id before the request is sent).
url_get_id = "https://api.reccobeats.com/v1/track?ids=:id"
url_get_info = "https://api.reccobeats.com/v1/track/:id/audio-features"

# Request body and headers; the API is asked to answer in JSON.
payload = {}
headers = {'Accept': 'application/json'}


In [None]:
# NOTE(review): this collects the track_id of every row to the driver and
# the result is not used anywhere in the visible notebook. Confirm whether
# it is still needed.
track_ids = df.select("track_id").collect()

# Module-level list of result tuples.
results = []

# Retry policy: up to 5 retries with backoff factor 1 when the server
# answers 500, 502, 503 or 504.
retries = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504],
)

# One HTTP session whose https:// traffic goes through the retrying adapter.
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=retries))


def process_partition(partition):
    """Fetch ReccoBeats audio features for every row of a partition.

    For each row the function first resolves the row's ``track_id`` to the
    ReccoBeats internal id, then requests that track's audio features.
    It relies on the module-level ``session``, ``headers``, ``payload``,
    ``url_get_id`` and ``url_get_info`` objects.

    Args:
        partition: Iterable of rows supporting ``row["track_id"]``.

    Returns:
        A new list with one 9-tuple per input row:
        ``(track_id, acousticness, danceability, energy, instrumentalness,
        liveness, loudness, speechiness, valence)``. The eight feature
        slots are ``None`` when the track is unknown to the API, the
        features request does not answer 200, or a request fails.
    """
    feature_keys = (
        "acousticness",
        "danceability",
        "energy",
        "instrumentalness",
        "liveness",
        "loudness",
        "speechiness",
        "valence",
    )
    no_features = (None,) * len(feature_keys)

    # Accumulate in a list owned by this call. Appending to a module-level
    # list would leak rows from one invocation into the next.
    partition_results = []

    for row in partition:
        track_id = row["track_id"]
        features = no_features

        try:
            # 1. Resolve the ReccoBeats internal id for this track.
            url_get_track_id = url_get_id.replace(":id", track_id)
            resp = session.get(
                url_get_track_id,
                headers=headers,
                data=payload,
                verify=True,
                timeout=30,  # never let a stalled connection hang the task
            ).json()
            print(resp)

            if resp.get("content"):
                recco_track_id = resp["content"][0]["id"]

                # 2. Fetch the audio features of the resolved track.
                url_get_track_info = url_get_info.replace(":id", recco_track_id)
                info_resp = session.get(
                    url_get_track_info,
                    headers=headers,
                    data=payload,
                    verify=True,
                    timeout=30,
                )
                print(info_resp.content)

                if info_resp.status_code == 200:
                    info = info_resp.json()
                    features = tuple(info.get(key) for key in feature_keys)
        except (requests.RequestException, ValueError) as exc:
            # Network failure, exhausted retries or a non-JSON body: keep the
            # row with empty features instead of aborting the whole job.
            print(f"track {track_id}: request failed ({exc})")

        partition_results.append((track_id, *features))

        # Pause between tracks to throttle the request rate.
        sleep(1)

    return partition_results

In [None]:
# Run process_partition over each partition of the per-track rows and turn
# the resulting tuples back into a DataFrame with named columns.
df_out = (
    df_grouped.rdd
    .mapPartitions(process_partition)
    .toDF([
        "track_id",
        "acousticness",
        "danceability",
        "energy",
        "instrumentalness",
        "liveness",
        "loudness",
        "speechiness",
        "valence",
    ])
)

df_out.show(5)


In [None]:
# Attach the per-track feature columns to the cleaned rows. The inner join
# keeps only rows whose track_id is present on both sides.
df_completed = df.join(df_out, "track_id", "inner")

df_completed.show()

In [None]:
# Remove the `id` column from the joined result, then preview it.
df_completed = df_completed.drop("id")
df_completed.show()


In [None]:
# Collapse to a single partition and write the result as CSV with a header
# row, replacing any previous output at "return.csv".
# NOTE(review): this path is relative to the working directory and differs
# from `destiny` ("./datasets/return.csv"), which is what the pandas step
# reads later. Confirm the file is moved or downloaded as intended.
(
    df_completed
    .coalesce(1)
    .write
    .mode("overwrite")
    .csv("return.csv", header=True)
)

### Realize o download do arquivo no site local do Spark

#### Tratamento de dados

##### Conecte-se ao kernel do seu python local

In [None]:
import pandas as pd

In [None]:
# Load the exported CSV into pandas, decoding it as UTF-8.
df = pd.read_csv(
    "./datasets/return.csv",
    encoding="utf-8",
)

display(df)

In [None]:
# Keep only rows without missing values (the dropna defaults, spelled out).
df_cleaned = df.dropna(axis=0, how="any")
display(df_cleaned)

In [None]:
# Use track_id as the index of the cleaned frame.
df_cleaned = df_cleaned.set_index("track_id")

display(df_cleaned)

# Save the cleaned data as UTF-8 CSV; the track_id index is written as the
# first column.
df_cleaned.to_csv(
    "./datasets/return_corrected.csv",
    encoding="utf-8",
)
