# Dados de Entrada
* Selecione "Adicionar ao Drive"
  *   https://tinyurl.com/bd-spotify







In [None]:
# Mount Google Drive so files under /content/drive become readable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!apt install openjdk-11-jdk -y

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  session-migration x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm libnss-mdns
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  at-spi2-core fonts-dejavu-core fonts-dejavu-extra gsettings-desktop-schemas
  libatk-bridge2.0-0 libatk-wrapper-java libatk-wrapper-java-jni libatk1.0-0
  libatk1.0-data libatspi2.0-0 libxcomposite1 libxt-dev libxtst6 libxxf86dga1
  openjdk-11-jdk openjdk-11-jdk-headless openjdk-

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

from datetime import datetime

# Spark application settings: run locally, using every available core.
appName = 'Big Data Songs'
master = 'local[*]'

# Reuse an already-running session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .getOrCreate()
)

# Only log warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# Leitura de dados e Processamento Básico

In [None]:
# Load the Spotify dataset (Parquet) from the mounted Google Drive.
spotify_df = spark.read.parquet('/content/drive/MyDrive/spotify.parquet')

In [None]:
# Inspect the column names and types of the Spotify data.
spotify_df.printSchema()

root
 |-- spotify_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- daily_rank: integer (nullable = true)
 |-- daily_movement: integer (nullable = true)
 |-- weekly_movement: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- is_explicit: boolean (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: date (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable

In [None]:
# Load the country/continent lookup table (Parquet) from the mounted Google Drive.
continents_df = spark.read.parquet('/content/drive/MyDrive/continents.parquet')

In [None]:
# Note: the alpha_2 field holds the country code.
continents_df.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- alpha_2: string (nullable = true)
 |-- alpha_3: string (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- iso_3166_2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- sub_region: string (nullable = true)
 |-- intermediate_region: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- sub_region_code: integer (nullable = true)
 |-- intermediate_region_code: integer (nullable = true)



In [None]:
# Expose the country code under the name `country`.
# withColumnRenamed is a no-op when `alpha_2` is absent, so re-running this cell is safe.
continents_df = continents_df.withColumnRenamed("alpha_2", "country")

## Join

In [None]:
# Left join: every Spotify row is kept, and rows whose `country` has no match in
# continents_df get NULL in the locale columns.
spotify_df_with_locale = spotify_df.join(continents_df, on="country", how="left")

In [None]:
# Inspect the schema of the joined frame.
spotify_df_with_locale.printSchema()

root
 |-- country: string (nullable = true)
 |-- spotify_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- daily_rank: integer (nullable = true)
 |-- daily_movement: integer (nullable = true)
 |-- weekly_movement: integer (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- is_explicit: boolean (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: date (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable

## Limpeza e enriquecimento do dataframe

In [None]:
# Preview the first 10 joined rows.
spotify_df_with_locale.show(10)

+-------+--------------------+--------------------+--------------------+----------+--------------+---------------+-------------+----------+-----------+-----------+--------------------+------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+-------+------------+-------------+------+------------------+-------------------+-----------+---------------+------------------------+
|country|          spotify_id|                name|             artists|daily_rank|daily_movement|weekly_movement|snapshot_date|popularity|is_explicit|duration_ms|          album_name|album_release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|country_name|alpha_3|country_code|   iso_3166_2|region|        sub_region|intermediate_region|region_code|sub_region_code|intermediate_region_code|
+-------+--------------------+--------------------+-------

In [None]:
# Placeholder strings treated as "missing", in addition to real NULLs.
valores_ausentes = ['', ' ','NA','N/A','NULL','null','-']


def campo_invalido(nome_coluna, extras=()):
    """Build a predicate that is true when a column is NULL or holds a placeholder.

    Args:
        nome_coluna: name of the column to test.
        extras: additional sentinel values for this column, appended after the
            shared placeholders in `valores_ausentes`.

    Returns:
        A boolean pyspark Column.
    """
    coluna = col(nome_coluna)
    return coluna.isNull() | coluna.isin(valores_ausentes + list(extras))


# A row is flagged as soon as ANY of the inspected fields is missing or a placeholder.
registros_invalidos = spotify_df_with_locale.filter(
    campo_invalido('name')
    | campo_invalido('artists')
    | campo_invalido('album_name')
    | campo_invalido('album_release_date', extras=['1900-01-01'])
    | campo_invalido('snapshot_date', extras=[0])
    | campo_invalido('duration_ms', extras=[0])
)

print('Número de registros inválidos ou ausentes:', registros_invalidos.count())
print('Amostra de registros com nomes inválidos ou ausentes:')
registros_invalidos.show(5)

# Distinct ids of the flagged rows, computed with DataFrame operations (no RDD/lambda
# round-trip). NULL ids are dropped on purpose: a None inside the list would make
# `~col('spotify_id').isin(ids)` evaluate to NULL for every non-matching row, so an
# exclusion filter built from this list would silently discard the whole dataset.
registros_invalidos_ids = [
    linha['spotify_id']
    for linha in registros_invalidos.select('spotify_id').dropna().distinct().collect()
]
registros_invalidos_ids

Número de registros inválidos ou ausentes: 815
Amostra de registros com nomes inválidos ou ausentes:
+-------+--------------------+--------------------+-------------+----------+--------------+---------------+-------------+----------+-----------+-----------+----------+------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+-------+------------+-------------+--------+----------------+-------------------+-----------+---------------+------------------------+
|country|          spotify_id|                name|      artists|daily_rank|daily_movement|weekly_movement|snapshot_date|popularity|is_explicit|duration_ms|album_name|album_release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|country_name|alpha_3|country_code|   iso_3166_2|  region|      sub_region|intermediate_region|region_code|sub_region_code|intermediate_regi

['4HEOgBHRCExyYVeTyrXsnL',
 '35wSewUpyeXwde8wVlHW7L',
 '0y2sUHVDAWQbfuIQGEfhFO',
 '2V2K1hzCtgj9xAnga9WUTy',
 '7lyv2sysHCzFjypILxAynT',
 '6yxtsR3nc3aUL1wcbLn8A3',
 '3yrSvpt2l1xhsV9Em88Pul',
 '4nLJ61BP9owXw9sLoLtqIa',
 '2FPfeYlrbSBR8PwCU0zaqq',
 '5yNgdD8E6WruhULb4n2Con',
 '53rB05bAi7JdNbUfgz72I1',
 '0V2passWyAXnON67kfAj7y',
 '3GMeCx87zWbtg6jD4evZ58',
 '0wbcG4kKNOdZ9wTEra3aYI',
 '3vz3SKnCyJCzPuXWLoGfdG',
 '2h4b8QdmU4nxZrlpz7INIs',
 '7faDzZnZYqTyYThx2sbHVQ',
 '0kvD9ksvXyRHANPypIpkIh']

In [None]:
# Show the 10 rows that sort first by album_name (ascending).
spotify_df_with_locale.sort(asc('album_name')).show(10)

+-------+--------------------+--------------------+-------------+----------+--------------+---------------+-------------+----------+-----------+-----------+----------+------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-------------+-------+------------+-------------+--------+--------------------+-------------------+-----------+---------------+------------------------+
|country|          spotify_id|                name|      artists|daily_rank|daily_movement|weekly_movement|snapshot_date|popularity|is_explicit|duration_ms|album_name|album_release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature| country_name|alpha_3|country_code|   iso_3166_2|  region|          sub_region|intermediate_region|region_code|sub_region_code|intermediate_region_code|
+-------+--------------------+--------------------+-------------+----------+------

In [None]:
# Drop every row whose spotify_id is listed in registros_invalidos_ids.
# The exclusion is per id, so ALL rows sharing an id with a flagged row are removed,
# not only the flagged rows themselves.
# NOTE(review): rows with a NULL spotify_id presumably get dropped too, because
# NOT (NULL IN (...)) evaluates to NULL under SQL semantics — confirm this is intended.
spotify_df_limpo = spotify_df_with_locale.filter(~col('spotify_id').isin(registros_invalidos_ids))


In [None]:
# Compare row counts before and after the cleaning filter.
total_antes_da_limpeza = spotify_df_with_locale.count()
total_depois_da_limpeza = spotify_df_limpo.count()
print('Número de registros removidos: ', total_antes_da_limpeza - total_depois_da_limpeza)

Número de registros removidos:  921


In [None]:
from pyspark.sql import functions as F

# Questões de Análise

## Q1: A duração média das músicas ao longo do tempo está diminuindo?

```
# Isto está formatado como código
```

.

In [None]:
# Q1: per release year, the median and mean track duration and the number of rows.
colunas_desejadas = ["name", "duration_ms", "album_release_date"]

dataset_q1 = (
    spotify_df_limpo
    .select(colunas_desejadas)
    # F.year reads the year straight from the date column (as an integer), rather
    # than slicing the date's string form with a 0 start offset in a 1-based API.
    .withColumn("ano", F.year("album_release_date"))
    .groupBy("ano")
    .agg(
        F.median("duration_ms").alias("mediana_valor"),
        F.avg("duration_ms").alias("media_valor"),
        # count() skips NULL durations, so this is the number of rows with a duration.
        F.count("duration_ms").alias("total_musicas"),
    )
)

dataset_q1.sort(asc('ano')).show(100)

In [None]:
# Keep only the years whose row count reaches the first quartile of `total_musicas`,
# so that years with few rows do not distort the trend.
# Only the 0.25 quantile is requested (the other two were never used), and
# relativeError=0.0 makes it exact; dataset_q1 has one row per year, so this is cheap.
primeiro_quartil = dataset_q1.approxQuantile("total_musicas", [0.25], 0.0)[0]

# NOTE: this overwrites dataset_q1, so re-running the cell filters the already
# filtered frame again. Re-run the aggregation cell first if the cell must be repeated.
dataset_q1 = dataset_q1.filter(dataset_q1["total_musicas"] >= primeiro_quartil)
dataset_q1.sort(asc('ano')).show(100)

## Q2: Quais artistas levam músicas ao top 10 global de forma mais rápida?

In [None]:
# Q2: per artist, the average number of days a track takes from its first chart
# appearance to its first top-10 entry in the rows where `country` is NULL.
colunas_desejadas = ["artists", "name", "album_release_date","daily_rank","snapshot_date","country"]
dataset_q2_base = spotify_df_limpo.select(colunas_desejadas)

# A track is keyed by (name, artists). Keying by name alone merges different songs
# that share a title and credits the merged entry to an arbitrary artist.
chave_musica = ["name", "artists"]

# Release-date proxy: the earliest snapshot in which the track appears in any chart.
dataset_q2_data_lancamento = dataset_q2_base.groupBy(chave_musica).agg(
    F.min("snapshot_date").alias("data_lancamento_proxy")
)

# First day each track was ranked 10 or better in the rows with a NULL country
# (presumably the global chart — confirm against the dataset description).
dataset_q2_chegada_no_top_10 = (
    dataset_q2_base
    .filter((F.col("daily_rank") <= 10) & F.col("country").isNull())
    .groupBy(chave_musica)
    .agg(F.min("snapshot_date").alias("data_primeiro_aparecimento_top_10"))
)

# The inner join keeps only the tracks that reached the top 10.
dataset_q2 = (
    dataset_q2_chegada_no_top_10
    .join(dataset_q2_data_lancamento, on=chave_musica)
    .withColumn(
        "dias_ate_top_10",
        F.datediff(F.col("data_primeiro_aparecimento_top_10"), F.col("data_lancamento_proxy")),
    )
    .groupBy("artists")
    .agg(
        F.avg("dias_ate_top_10").alias("qnt_media_de_dias_ate_top_10"),
        F.count("name").alias("total_de_musicas"),
    )
)

# Fastest artists first; ties go to the artist with more top-10 tracks.
dataset_q2.sort(asc('qnt_media_de_dias_ate_top_10'),desc('total_de_musicas')).show(50)

## Q3: Quais são os países que mais consomem músicas com conteúdo explícito e como é essa evolução nos últimos 15 anos?

In [None]:
# Q3: per country and snapshot year, the share of chart rows flagged as explicit.
colunas_desejadas = ["snapshot_date", "is_explicit", "country_name"]

# Both totals come from a single groupBy instead of two aggregations joined back
# together on the same keys. Functions are referenced through `F` so the cell does
# not depend on star-imported `sum`/`count` shadowing the Python builtins.
dataset_q3 = (
    spotify_df_limpo
    .select(colunas_desejadas)
    # Rows without a resolved country name cannot be attributed to a country.
    .filter(F.col("country_name").isNotNull())
    # True/False -> 1/0, so that a sum counts the explicit rows.
    .withColumn("is_explicit_int", F.col("is_explicit").cast("int"))
    .withColumn("ano", F.year("snapshot_date"))
    .groupBy("country_name", "ano")
    .agg(
        F.count("country_name").alias("total_audicoes"),
        F.sum("is_explicit_int").alias("total_audicoes_explicitas"),
    )
    # Despite the "%" in its name, this column is a fraction between 0 and 1.
    .withColumn("%_audicoes_explicitas", F.col('total_audicoes_explicitas') / F.col('total_audicoes'))
)

# Kept so that any cell still using the former intermediate frames keeps working.
dataset_q3_total_audicoes = dataset_q3.select("country_name", "ano", "total_audicoes")
dataset_q3_total_audicoes_explicitas = dataset_q3.select("country_name", "ano", "total_audicoes_explicitas")

## Q4: ...

## Q5: ...

## Q6: ...