In [72]:
import pyspark
import json
import requests
import os

In [73]:
# Read the Songstats API key from the environment so the secret is never
# stored in the notebook.
# NOTE(review): the key that used to be committed in this cell should be rotated.
api_key = os.environ.get('SONGSTATS_API_KEY')
if not api_key:
    raise RuntimeError(
        "Set the SONGSTATS_API_KEY environment variable before running this cell."
    )

artist_info_url = "https://api.songstats.com/enterprise/v1/artists/search"
headers = {
    'Content-Type': "application/json",
    'Accept-Encoding': "gzip, deflate, br",
    'apikey': api_key
    }

# Artist name -> songstats_artist_id (filled in by the search loop below).
artists = {"Junior Mesa":None, "Carla Morrison":None, 'Ceci Bastida': None, 
          'Gaby Moreno':None}

for artist in artists:
    querystring = {"q": artist}
    response = requests.get(artist_info_url, headers=headers, params=querystring, timeout=30)
    # Fail loudly on 4xx/5xx instead of trying to parse an error payload.
    response.raise_for_status()
    results = response.json().get('results') or []
    if not results:
        raise LookupError(f"No Songstats artist found for {artist!r}")
    # The first search hit is taken as the matching artist.
    artist_id = results[0]['songstats_artist_id']
    artists[artist] = artist_id
    

In [74]:


historic_url = "https://api.songstats.com/enterprise/v1/artists/historic_stats"
path = "data/raw"
# Create the output directory up front so a fresh checkout does not fail on open().
os.makedirs(path, exist_ok=True)

# Download the historic stats of every artist and store one JSON file per artist.
for artist, artist_id in artists.items():
    params = {'songstats_artist_id':artist_id}
    response = requests.get(historic_url, headers=headers, params=params, timeout=30)
    # Abort on HTTP errors: an error payload written here would later be
    # picked up by the 'data/raw/*' read and pollute the staging data.
    response.raise_for_status()
    artist_json = response.json()
    # e.g. "Junior Mesa" -> data/raw/junior_mesa.json
    file_name = os.path.join(path, artist.lower().replace(" ", "_") + ".json")
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(artist_json, f)

In [None]:
# Consider adding a step that extracts the data from the API and
# uploads it to a bucket for all artists (1 JSON per artist)

# Save to raw/artista_xx.json -> one JSON per artist

# Create the functions for unstructured (raw) -> structured (staging (parquet))

# 1. Filter the target columns and write to BQ
# 2.

In [75]:
from pyspark.sql import SparkSession

# Build the Spark session for this notebook: local master, app name 'resonance'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('resonance')
    .getOrCreate()
)

In [87]:
from pyspark.sql.functions import explode, col, udf, when, isnan, count, isnull, to_date

In [77]:
# Read every file under data/raw into a single DataFrame.
all_json = spark.read.json('data/raw/*')

In [78]:
# Show the schema Spark inferred for the raw JSON payloads.
all_json.printSchema()

root
 |-- artist_info: struct (nullable = true)
 |    |-- avatar: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- site_url: string (nullable = true)
 |    |-- songstats_artist_id: string (nullable = true)
 |-- message: string (nullable = true)
 |-- result: string (nullable = true)
 |-- source_ids: string (nullable = true)
 |-- stats: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data: struct (nullable = true)
 |    |    |    |-- history: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- charted_tracks_current: long (nullable = true)
 |    |    |    |    |    |-- charts_total: long (nullable = true)
 |    |    |    |    |    |-- comments_total: long (nullable = true)
 |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |-- favorites_total: long (nullable = true)
 |    |    |    |    |    |-- followers_total: long (nulla

In [90]:
def explode_and_subset(df):
    """Flatten the raw Songstats payload to one row per artist, DSP and history entry.

    Args:
        df: DataFrame read from the raw JSON files. It must expose an
            `artist_info` struct and a `stats` array whose elements carry a
            `source` field and a `data.history` array.

    Returns:
        DataFrame with `artist_id`, `artist_name`, `dsp` and every field of the
        history entry as top-level columns, with `date` converted via `to_date`.
    """
    # Explode `stats` one element at a time so each history entry stays attached
    # to the source (DSP) it came from. Exploding `stats.data.history` and
    # `stats.source` independently pairs every history row with every DSP name.
    per_dsp = df.withColumn('stat', explode('stats'))
    per_entry = (per_dsp
                 .withColumn('dsp', col('stat.source'))
                 .withColumn('history', explode('stat.data.history')))
    # Subset important columns
    out_df = per_entry.select(col('artist_info.songstats_artist_id').alias('artist_id'),
                              col('artist_info.name').alias('artist_name'), 'dsp', 'history.*')
    out_df = out_df.withColumn('date', to_date('date'))
    return out_df

In [91]:
# Flatten the raw payloads into the staging DataFrame.
stage_df = explode_and_subset(all_json)

In [92]:
# Persist the staging data as parquet, replacing any previous run's output.
stage_df.write.parquet('data/stage/songstats_stage.parquet', mode='overwrite')

                                                                                

In [89]:
# explode_and_subset already converts `date` with to_date, so the staging
# schema can be printed directly without re-casting the column.
stage_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- dsp: string (nullable = true)
 |-- charted_tracks_current: long (nullable = true)
 |-- charts_total: long (nullable = true)
 |-- comments_total: long (nullable = true)
 |-- date: date (nullable = true)
 |-- favorites_total: long (nullable = true)
 |-- followers_total: long (nullable = true)
 |-- likes_total: long (nullable = true)
 |-- monthly_listeners_current: long (nullable = true)
 |-- playlist_reach_total: long (nullable = true)
 |-- playlists_total: long (nullable = true)
 |-- popularity_current: long (nullable = true)
 |-- reposts_total: long (nullable = true)
 |-- shares_total: long (nullable = true)
 |-- shazams_total: long (nullable = true)
 |-- streams_current: long (nullable = true)
 |-- streams_total: long (nullable = true)
 |-- subscribers_total: long (nullable = true)
 |-- total_support_total: long (nullable = true)
 |-- unique_support_total: long (nullable = true)
 |-- videos_t