# **Map Reduce**

## **O que é ?**
O MapReduce é um modelo de programação distribuída que divide uma tarefa em pequenas sub-tarefas, distribuindo essas sub-tarefas por várias máquinas em um cluster. Cada máquina processa a sua sub-tarefa e retorna os resultados ao sistema, que os combina para formar o resultado final.

O modelo é composto por duas etapas: Map e Reduce.

## **Etapa de Map**

 Inicialmente, os dados de entrada são divididos em blocos menores. O framework Hadoop então determina quantos mappers serão usados, com base no tamanho dos dados a serem processados e na capacidade de memória disponível em cada servidor de mapeamento. Cada bloco é atribuído a um mapeador para processamento. Cada nó de trabalho (worker) aplica a função de mapeamento aos dados locais e grava a saída em um armazenamento temporário. O nó principal (master) garante que apenas uma cópia dos dados redundantes de entrada seja processada.

## **Etapa de Reduce**

Um reducer não pode iniciar enquanto um mapper ainda está em execução. Os nós de trabalho processam cada grupo de pares de dados <chave, valor> de saída, em paralelo, para gerar novos pares <chave, valor> como resultado. Todos os valores de saída do map que possuem a mesma chave são atribuídos a um único reducer, que então agrega os valores para essa chave. Diferente da função map, que é obrigatória para filtrar e organizar os dados iniciais, a função reduce é opcional.

## **Implementação Prática do MapReduce com Python**
Vamos implementar um exemplo de MapReduce em Python para contar a frequência de palavras em um texto. Neste exemplo, o Map irá separar as palavras e o Reduce irá somar as ocorrências de cada palavra.


## **Explicação do Código**
**Dados de entrada**: A lista data contém linhas de texto para processamento.
Função Map (map_function): Recebe cada linha, divide em palavras e retorna uma lista de pares (palavra, 1).


**Função Reduce (reduce_function)**: Recebe a lista de pares (palavra, 1) e usa um dicionário para contar quantas vezes cada palavra aparece.


**Pipeline MapReduce**: Primeiro, a função map_function é aplicada a cada linha. Em seguida, os pares de chave-valor resultantes são agregados pela função reduce_function.

In [3]:
from collections import defaultdict

# Dados de entrada: um texto simples
data = [
    "MapReduce é um modelo de programação",
    "MapReduce permite o processamento de grandes volumes de dados",
    "Python pode ser usado para implementar MapReduce",
    "MapReduce é eficiente para Big Data",
    "MapReduce é legal",
    "MapReduce é muito legal",
    "MapReduce é muito muito legal",
    "Esfiha é legal mas não é paralela"
]

# Função Map: Recebe uma linha e retorna pares (palavra, 1)
def map_function(line):
    words = line.split()
    return [(word.lower(), 1) for word in words]

# Função Reduce: Recebe uma lista de (chave, [valores]) e retorna (chave, soma dos valores)
def reduce_function(mapped_data):
    reduced_data = defaultdict(int)
    for word, count in mapped_data:
        reduced_data[word] += count
    return reduced_data

# Pipeline MapReduce

# Passo 1: Aplicar a função map a cada linha do texto
mapped_data = []
for line in data:
    mapped_data.extend(map_function(line))

# Passo 2: Aplicar a função reduce para agregar os valores por chave
word_counts = reduce_function(mapped_data)

# Exibir o resultado
print("Contagem de palavras:")
for word, count in word_counts.items():
    print(f"{word}: {count}")

Contagem de palavras:
mapreduce: 7
é: 7
um: 1
modelo: 1
de: 3
programação: 1
permite: 1
o: 1
processamento: 1
grandes: 1
volumes: 1
dados: 1
python: 1
pode: 1
ser: 1
usado: 1
para: 2
implementar: 1
eficiente: 1
big: 1
data: 1
legal: 4
muito: 3
esfiha: 1
mas: 1
não: 1
paralela: 1


# **Parquet**

O Parquet é um tipo de arquivo criado para armazenar informações de maneira eficaz e compacta, sendo particularmente vantajoso para lidar com grandes quantidades de dados. Desenvolvido inicialmente pela Apache, ele foi concebido para ambientes de Big Data e se diferencia pelo armazenamento em formato de colunas, facilitando a leitura rápida de colunas específicas. Esse formato é amplamente adotado em ferramentas de processamento de dados como Apache Spark, Hive e outras baseadas em Hadoop, além de ser compatível com o Pandas no Python.

In [4]:
#Instalando nossas dependências
!pip install pandas pyarrow pyspark



In [5]:
import kagglehub

path = kagglehub.dataset_download("dhruvildave/spotify-charts")

print("Path local:", path)

# Listar todos os arquivos dentro do repositório

import os

for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        print(os.path.join(dirname, filename))



Downloading from https://www.kaggle.com/api/v1/datasets/download/dhruvildave/spotify-charts?dataset_version_number=85...


100%|██████████| 945M/945M [00:43<00:00, 22.8MB/s]

Extracting files...





Path local: /root/.cache/kagglehub/datasets/dhruvildave/spotify-charts/versions/85
/root/.cache/kagglehub/datasets/dhruvildave/spotify-charts/versions/85/charts.csv


In [6]:
# lendo o arquivo charts.csv e convertendo em um arquivo parquet com o nome charts.parquet

import pandas as pd
# lendo o arquivo em csv
df = pd.read_csv(path + "/charts.csv")

# salvando o arquivo em csv
df.to_parquet(path + "/charts.parquet")

In [7]:

# Carregar arquivo Parquet
df = pd.read_parquet(path + "/charts.parquet")
print(df.head())



                         title  rank        date  \
0      Chantaje (feat. Maluma)     1  2017-01-01   
1  Vente Pa' Ca (feat. Maluma)     2  2017-01-01   
2   Reggaetón Lento (Bailemos)     3  2017-01-01   
3                       Safari     4  2017-01-01   
4                  Shaky Shaky     5  2017-01-01   

                                  artist  \
0                                Shakira   
1                           Ricky Martin   
2                                   CNCO   
3  J Balvin, Pharrell Williams, BIA, Sky   
4                           Daddy Yankee   

                                                 url     region   chart  \
0  https://open.spotify.com/track/6mICuAdrwEjh6Y6...  Argentina  top200   
1  https://open.spotify.com/track/7DM4BPaS7uofFul...  Argentina  top200   
2  https://open.spotify.com/track/3AEZUABDXNtecAO...  Argentina  top200   
3  https://open.spotify.com/track/6rQSrBHf7HlZjtc...  Argentina  top200   
4  https://open.spotify.com/track/58IL315gMSTD3

In [8]:
# Primeiras linhas do DataFrame
print(df.head(5))

# Informações sobre o DataFrame
print(df.info())

# Estatísticas descritivas
print(df.describe())


                         title  rank        date  \
0      Chantaje (feat. Maluma)     1  2017-01-01   
1  Vente Pa' Ca (feat. Maluma)     2  2017-01-01   
2   Reggaetón Lento (Bailemos)     3  2017-01-01   
3                       Safari     4  2017-01-01   
4                  Shaky Shaky     5  2017-01-01   

                                  artist  \
0                                Shakira   
1                           Ricky Martin   
2                                   CNCO   
3  J Balvin, Pharrell Williams, BIA, Sky   
4                           Daddy Yankee   

                                                 url     region   chart  \
0  https://open.spotify.com/track/6mICuAdrwEjh6Y6...  Argentina  top200   
1  https://open.spotify.com/track/7DM4BPaS7uofFul...  Argentina  top200   
2  https://open.spotify.com/track/3AEZUABDXNtecAO...  Argentina  top200   
3  https://open.spotify.com/track/6rQSrBHf7HlZjtc...  Argentina  top200   
4  https://open.spotify.com/track/58IL315gMSTD3

In [9]:
# Filtrar músicas com mais de 200000.0 streams na region Brazil
high_streams_brazil = df[(df['region'] == 'Brazil') & (df['streams'] > 200000.0)]
print(high_streams_brazil)

                                       title  rank        date  \
869                                 Deu Onda     1  2017-01-01   
870                              Hear Me Now     2  2017-01-01   
871                            10% - Ao Vivo     3  2017-01-01   
872       Eu Sei de Cor - Ao Vivo | Acústico     4  2017-01-01   
873             Meu Coração Deu PT - Ao Vivo     5  2017-01-01   
...                                      ...   ...         ...   
25197428                             Pesadão    23  2018-01-31   
25197429                       Troféu do Ano    24  2018-01-31   
25197430                          Ta Tum Tum    25  2018-01-31   
25197431                          Dona Maria    26  2018-01-31   
25197432                        Azul Piscina    27  2018-01-31   

                                        artist  \
869                                     MC G15   
870                 Alok, Bruno Martini, Zeeba   
871                           Maiara & Maraisa   
872    

In [10]:
# Filtrar músicas com tendência de subida
trending_up = df[df['trend'] == 'MOVE_UP']
print(trending_up)

                                                 title  rank        date  \
1                          Vente Pa' Ca (feat. Maluma)     2  2017-01-01   
4                                          Shaky Shaky     5  2017-01-01   
8                                         La Bicicleta     9  2017-01-01   
11                      Desde Esa Noche (feat. Maluma)    12  2017-01-01   
12                                      Borro Cassette    13  2017-01-01   
...                                                ...   ...         ...   
26173494                                         Lotus    31  2021-07-31   
26173497                           Here's Your Perfect    34  2021-07-31   
26173503           Know Me Too Well (with Danna Paola)    40  2021-07-31   
26173506  Can I Have the Day With You (feat. Michelle)    43  2021-07-31   
26173509                                           BYE    46  2021-07-31   

                              artist  \
1                       Ricky Martin   
4      

In [11]:
# Ordenar por streams em ordem decrescente
df_sorted = df.sort_values(by='streams', ascending=False)
print(df_sorted)

                                    title  rank        date  \
22999369                       Easy On Me     1  2021-10-15   
17849245  All I Want for Christmas Is You     1  2020-12-24   
17849246                   Last Christmas     2  2020-12-24   
16687378                  drivers license     1  2021-01-15   
16564161                  drivers license     1  2021-01-14   
...                                   ...   ...         ...   
26173509                              BYE    46  2021-07-31   
26173510                          Pillars    47  2021-07-31   
26173511                     Gái Độc Thân    48  2021-07-31   
26173512    Renegade (feat. Taylor Swift)    49  2021-07-31   
26173513                  Letter to Jarad    50  2021-07-31   

                             artist  \
22999369                      Adele   
17849245               Mariah Carey   
17849246                      Wham!   
16687378             Olivia Rodrigo   
16564161             Olivia Rodrigo   
...      

In [12]:
# Média de streams por região
mean_streams = df.groupby('region')['streams'].mean()
print(mean_streams)

region
Andorra                           NaN
Argentina                53607.620874
Australia                55573.770435
Austria                   7427.878846
Belgium                  10834.963612
                            ...      
United Arab Emirates      2147.129325
United Kingdom          101194.585867
United States           390275.482734
Uruguay                   3768.295815
Vietnam                   5695.388968
Name: streams, Length: 70, dtype: float64


In [13]:
# Converter para datetime
df['date'] = pd.to_datetime(df['date'])

# Filtrar por data específica
specific_date = df[df['date'] == '2021-12-31']
print(specific_date)

                                            title  rank       date  \
22682083  Tiago PZK: Bzrp Music Sessions, Vol. 48     1 2021-12-31   
22682084                                      Bar     2 2021-12-31   
22682085                                    TITAN     3 2021-12-31   
22682086                          Salimo de Noche     4 2021-12-31   
22682087                               DANCE CRIP     5 2021-12-31   
...                                           ...   ...        ...   
25237519                                      Luv    46 2021-12-31   
25237520                         Surface Pressure    47 2021-12-31   
25237521                           Đâu Ai Dám Hứa    48 2021-12-31   
25237522                            I Love You So    49 2021-12-31   
25237523               Hạnh Phúc Mới - Heki Lo-Fi    50 2021-12-31   

                       artist  \
22682083  Bizarrap, Tiago PZK   
22682084        TINI, L-Gante   
22682085          Salastkbron   
22682086    Tiago PZK, True

In [14]:
# Categorizar o sucesso da música
df['success'] = df['streams'].apply(lambda x: 'High' if x > 90000 else 'Moderate')
print(df[['title', 'streams', 'success']])

                                  title   streams   success
0               Chantaje (feat. Maluma)  253019.0      High
1           Vente Pa' Ca (feat. Maluma)  223988.0      High
2            Reggaetón Lento (Bailemos)  210943.0      High
3                                Safari  173865.0      High
4                           Shaky Shaky  153956.0      High
...                                 ...       ...       ...
26173509                            BYE       NaN  Moderate
26173510                        Pillars       NaN  Moderate
26173511                   Gái Độc Thân       NaN  Moderate
26173512  Renegade (feat. Taylor Swift)       NaN  Moderate
26173513                Letter to Jarad       NaN  Moderate

[26173514 rows x 3 columns]


In [15]:
# Total de streams por artista
total_streams_artist = df.groupby('artist')['streams'].sum()
print(total_streams_artist)


artist
!!!                           0.0
!!!, Lea Lea                  0.0
"Elena Of Avalor" Cast        0.0
"Weird Al" Yankovic           0.0
"聲夢傳奇"學員                      0.0
                           ...   
이시강                           0.0
이아람                       12951.0
지선                            0.0
하숙자                        5885.0
Ｍ, 兩千 2ØØØ, RedcolorG         0.0
Name: streams, Length: 96156, dtype: float64


In [16]:
del df
del df_sorted

## Exemplos com pyspark


In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, avg, expr

# Iniciar a sessão do Spark
spark = SparkSession.builder.appName("SparkDataFrameExample").getOrCreate()

# Carregar arquivo Parquet
df = spark.read.parquet(path + "/charts.parquet")

# Exibir DataFrame
df.show()

+--------------------+----+----------+--------------------+--------------------+---------+------+-------------+--------+
|               title|rank|      date|              artist|                 url|   region| chart|        trend| streams|
+--------------------+----+----------+--------------------+--------------------+---------+------+-------------+--------+
|Chantaje (feat. M...|   1|2017-01-01|             Shakira|https://open.spot...|Argentina|top200|SAME_POSITION|253019.0|
|Vente Pa' Ca (fea...|   2|2017-01-01|        Ricky Martin|https://open.spot...|Argentina|top200|      MOVE_UP|223988.0|
|Reggaetón Lento (...|   3|2017-01-01|                CNCO|https://open.spot...|Argentina|top200|    MOVE_DOWN|210943.0|
|              Safari|   4|2017-01-01|J Balvin, Pharrel...|https://open.spot...|Argentina|top200|SAME_POSITION|173865.0|
|         Shaky Shaky|   5|2017-01-01|        Daddy Yankee|https://open.spot...|Argentina|top200|      MOVE_UP|153956.0|
|         Traicionera|   6|2017-

In [18]:
# Visualizar as primeiras linhas
df.show(5)

+--------------------+----+----------+--------------------+--------------------+---------+------+-------------+--------+
|               title|rank|      date|              artist|                 url|   region| chart|        trend| streams|
+--------------------+----+----------+--------------------+--------------------+---------+------+-------------+--------+
|Chantaje (feat. M...|   1|2017-01-01|             Shakira|https://open.spot...|Argentina|top200|SAME_POSITION|253019.0|
|Vente Pa' Ca (fea...|   2|2017-01-01|        Ricky Martin|https://open.spot...|Argentina|top200|      MOVE_UP|223988.0|
|Reggaetón Lento (...|   3|2017-01-01|                CNCO|https://open.spot...|Argentina|top200|    MOVE_DOWN|210943.0|
|              Safari|   4|2017-01-01|J Balvin, Pharrel...|https://open.spot...|Argentina|top200|SAME_POSITION|173865.0|
|         Shaky Shaky|   5|2017-01-01|        Daddy Yankee|https://open.spot...|Argentina|top200|      MOVE_UP|153956.0|
+--------------------+----+-----

In [19]:
# Verificar informações básicas (schema)
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- rank: long (nullable = true)
 |-- date: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- chart: string (nullable = true)
 |-- trend: string (nullable = true)
 |-- streams: double (nullable = true)



In [20]:
# Filtrar músicas com mais de 300.000 streams
filtro_streams = df.filter(col("streams") > 300000.0)
filtro_streams.show()

+--------------------+----+----------+--------------------+--------------------+------+------+-------------+---------+
|               title|rank|      date|              artist|                 url|region| chart|        trend|  streams|
+--------------------+----+----------+--------------------+--------------------+------+------+-------------+---------+
|            Deu Onda|   1|2017-01-01|              MC G15|https://open.spot...|Brazil|top200|SAME_POSITION| 612271.0|
|             Starboy|   1|2017-01-01|The Weeknd, Daft ...|https://open.spot...|Global|top200|SAME_POSITION|3135625.0|
|              Closer|   2|2017-01-01|The Chainsmokers,...|https://open.spot...|Global|top200|SAME_POSITION|3015525.0|
|     Let Me Love You|   3|2017-01-01|DJ Snake, Justin ...|https://open.spot...|Global|top200|      MOVE_UP|2545384.0|
|Rockabye (feat. S...|   4|2017-01-01|        Clean Bandit|https://open.spot...|Global|top200|    MOVE_DOWN|2356604.0|
|           One Dance|   5|2017-01-01| Drake, Wi

In [21]:
# selecionando colunas
selecionadas = df.select("title", "artist", "streams")
selecionadas.show()


+--------------------+--------------------+--------+
|               title|              artist| streams|
+--------------------+--------------------+--------+
|Chantaje (feat. M...|             Shakira|253019.0|
|Vente Pa' Ca (fea...|        Ricky Martin|223988.0|
|Reggaetón Lento (...|                CNCO|210943.0|
|              Safari|J Balvin, Pharrel...|173865.0|
|         Shaky Shaky|        Daddy Yankee|153956.0|
|         Traicionera|     Sebastian Yatra|151140.0|
|Cuando Se Pone a ...|              Rombai|148369.0|
|Otra vez (feat. J...|       Zion & Lennox|143004.0|
|        La Bicicleta|Carlos Vives, Sha...|126389.0|
|Dile Que Tu Me Qu...|               Ozuna|112012.0|
|  Andas En Mi Cabeza|Chino & Nacho, Da...|110395.0|
|Desde Esa Noche (...|              Thalia|104592.0|
|      Borro Cassette|              Maluma|101535.0|
|Gyal You A Party ...|Charly Black, Dad...| 99722.0|
|Me llamas (feat. ...|             Piso 21| 95010.0|
|La Bicicleta (fea...|Carlos Vives, Sha...| 92

In [22]:
# Agrupar por Região e Calcular a Média de Streams
media_streams_regiao = df.groupBy("region").agg(avg("streams").alias("avg_streams"))
media_streams_regiao.show()


+-------------+------------------+
|       region|       avg_streams|
+-------------+------------------+
|     Paraguay| 6286.671493617753|
|       Sweden| 41185.30777730343|
|  Philippines| 52417.66259382237|
|     Malaysia|11142.606986802088|
|    Singapore|11038.974893132583|
|       Turkey|  35253.5100812418|
|      Germany|111789.55613780832|
|       France| 66603.04668378024|
|       Greece| 4483.041096571792|
|       Taiwan|  9191.75591799814|
|     Slovakia|2421.8039452345542|
|    Argentina| 53607.62087352663|
|      Belgium|10834.963612265967|
|      Ecuador|  7489.96118409304|
|      Finland|14077.958767923348|
|    Nicaragua| 2065.814527933088|
|         Peru|22018.889293711018|
|       Global|1212937.9812387142|
|United States| 390275.4827340026|
|        India| 47458.82676758647|
+-------------+------------------+
only showing top 20 rows



In [23]:
# Classificar o DataFrame por rank
df_sorted = df.orderBy("rank")
df_sorted.show()


+--------------------------------+----+----------+--------------------+--------------------+--------------------+-------+-------------+---------+
|                           title|rank|      date|              artist|                 url|              region|  chart|        trend|  streams|
+--------------------------------+----+----------+--------------------+--------------------+--------------------+-------+-------------+---------+
|                    Dance Monkey|   1|2020-01-17|         Tones And I|https://open.spot...|        Saudi Arabia| top200|SAME_POSITION|  24362.0|
|                 drivers license|   1|2021-01-29|      Olivia Rodrigo|https://open.spot...|           Singapore| top200|SAME_POSITION|  70914.0|
|                       Your Song|   1|2017-06-01|            Rita Ora|https://open.spot...|           Lithuania|viral50|SAME_POSITION|     NULL|
|                    Blaues Licht|   1|2021-07-28|RAF Camora, Bonez MC|https://open.spot...|             Germany| top200|SAM

In [24]:
# Adicionar uma Coluna Calculada para Taxa de Crescimento de Streams
# Para calcular a taxa de crescimento entre as linhas, usaremos uma função de janela (Window) para acessar o valor dos streams da linha anterior.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag


# Definir uma janela ordenada por rank
window_spec = Window.orderBy("rank")

# Usar `lag` para obter o valor dos streams da linha anterior
df = df.withColumn("previous_streams", lag("streams").over(window_spec))

# Calcular a taxa de crescimento
df = df.withColumn("growth_rate", expr("((streams - previous_streams) / previous_streams) * 100"))
df = df.fillna({"growth_rate": 0})  # Substituir valores nulos por 0 para crescimento inicial

# Exibir o DataFrame atualizado com a taxa de crescimento
df.select("title", "streams", "previous_streams", "growth_rate").show()

+--------------------+--------+----------------+-----------+
|               title| streams|previous_streams|growth_rate|
+--------------------+--------+----------------+-----------+
|Chantaje (feat. M...|253019.0|            NULL|        0.0|
|               Pepas|    NULL|        253019.0|        0.0|
|             Starboy|107350.0|            NULL|        0.0|
|            Di Di Di|    NULL|        107350.0|        0.0|
|               Alone| 12990.0|            NULL|        0.0|
|        Life Goes On|    NULL|         12990.0|        0.0|
|            Hey Baby| 23277.0|            NULL|        0.0|
|              Pierre|    NULL|         23277.0|        0.0|
|Reggaetón Lento (...|  6784.0|            NULL|        0.0|
|         Hadal Ahbek|    NULL|          6784.0|        0.0|
|            Deu Onda|612271.0|            NULL|        0.0|
|         Hadal Ahbek|    NULL|        612271.0|        0.0|
|             Starboy|  1628.0|            NULL|        0.0|
|          うっせぇわ|    NUL

In [25]:
# Converter a Coluna trend em Valores Numéricos
# Podemos converter os valores da coluna trend (up, down, stable) em valores numéricos para facilitar a análise (ex.: up = 1, down = -1, stable = 0).

# Usar `when` para mapear valores de texto para números
df = df.withColumn(
    "trend_num",
    when(col("trend") == "up", 1)
    .when(col("trend") == "down", -1)
    .when(col("trend") == "stable", 0)
    .otherwise(None)
)

# Exibir o DataFrame com a nova coluna `trend_num`
df.select("title", "trend", "trend_num").show()



+--------------------+-------------+---------+
|               title|        trend|trend_num|
+--------------------+-------------+---------+
|Chantaje (feat. M...|SAME_POSITION|     NULL|
|Vente Pa' Ca (fea...|      MOVE_UP|     NULL|
|Reggaetón Lento (...|    MOVE_DOWN|     NULL|
|              Safari|SAME_POSITION|     NULL|
|         Shaky Shaky|      MOVE_UP|     NULL|
|         Traicionera|    MOVE_DOWN|     NULL|
|Cuando Se Pone a ...|    MOVE_DOWN|     NULL|
|Otra vez (feat. J...|    MOVE_DOWN|     NULL|
|        La Bicicleta|      MOVE_UP|     NULL|
|Dile Que Tu Me Qu...|    MOVE_DOWN|     NULL|
|  Andas En Mi Cabeza|SAME_POSITION|     NULL|
|Desde Esa Noche (...|      MOVE_UP|     NULL|
|      Borro Cassette|      MOVE_UP|     NULL|
|Gyal You A Party ...|    MOVE_DOWN|     NULL|
|Me llamas (feat. ...|    MOVE_DOWN|     NULL|
|La Bicicleta (fea...|      MOVE_UP|     NULL|
|DUELE EL CORAZON ...|      MOVE_UP|     NULL|
|     Let Me Love You|    MOVE_DOWN|     NULL|
|La Noche No 

## Operações com Spark SQL

In [26]:
# Registrar a tabela temporária
df.createOrReplaceTempView("music_data")


In [27]:
#Consulta SQL para calcular a média de streams por região

media_streams_regiao_sql = spark.sql("""
    SELECT region, AVG(streams) as avg_streams
    FROM music_data
    GROUP BY region
""")
media_streams_regiao_sql.show()

+-------------+------------------+
|       region|       avg_streams|
+-------------+------------------+
|     Paraguay| 6286.671493617753|
|       Sweden| 41185.30777730343|
|  Philippines| 52417.66259382237|
|     Malaysia|11142.606986802088|
|    Singapore|11038.974893132583|
|       Turkey|  35253.5100812418|
|      Germany|111789.55613780832|
|       France| 66603.04668378024|
|       Greece| 4483.041096571792|
|       Taiwan|  9191.75591799814|
|     Slovakia|2421.8039452345542|
|    Argentina| 53607.62087352663|
|      Belgium|10834.963612265967|
|      Ecuador|  7489.96118409304|
|      Finland|14077.958767923348|
|    Nicaragua| 2065.814527933088|
|         Peru|22018.889293711018|
|       Global|1212937.9812387142|
|United States| 390275.4827340026|
|        India| 47458.82676758647|
+-------------+------------------+
only showing top 20 rows



In [28]:
# Filtrar músicas com streams acima de 300.000 usando SQL

filtro_streams_sql = spark.sql("""
    SELECT title, artist, streams
    FROM music_data
    WHERE streams > 300000
""")
filtro_streams_sql.show()


+--------------------+--------------------+---------+
|               title|              artist|  streams|
+--------------------+--------------------+---------+
|            Deu Onda|              MC G15| 612271.0|
|             Starboy|The Weeknd, Daft ...|3135625.0|
|              Closer|The Chainsmokers,...|3015525.0|
|     Let Me Love You|DJ Snake, Justin ...|2545384.0|
|Rockabye (feat. S...|        Clean Bandit|2356604.0|
|           One Dance| Drake, WizKid, Kyla|2259887.0|
|           Fake Love|               Drake|2137437.0|
|           24K Magic|          Bruno Mars|2111599.0|
|    Don't Wanna Know|Maroon 5, Kendric...|2107137.0|
|       Black Beatles|Rae Sremmurd, Guc...|2075732.0|
|    I Feel It Coming|The Weeknd, Daft ...|1949575.0|
|Chantaje (feat. M...|             Shakira|1893214.0|
|I Don’t Wanna Liv...|  ZAYN, Taylor Swift|1857252.0|
|Say You Won't Let Go|        James Arthur|1812335.0|
|Bad and Boujee (f...|               Migos|1801707.0|
|Cold Water (feat....|      

In [29]:
df.write.mode("overwrite").parquet(path+"/dados_musica_exercicio.parquet")


## Exercício 1: Filtragem Básica
Questão: Carregue o DataFrame e filtre as músicas que têm mais de 200.000 streams e foram lançadas na região dos Estados Unidos (US). Exiba apenas as colunas title, streams, region e date.

In [30]:
df.filter((df['streams'] > 200000) & (df['region'] == 'United States')).select("title", "streams", "region", "date").show()


+--------------------+---------+-------------+----------+
|               title|  streams|       region|      date|
+--------------------+---------+-------------+----------+
|Bad and Boujee (f...|1371493.0|United States|2017-01-01|
|           Fake Love|1180074.0|United States|2017-01-01|
|             Starboy|1064351.0|United States|2017-01-01|
|              Closer|1010492.0|United States|2017-01-01|
|       Black Beatles| 874289.0|United States|2017-01-01|
|Broccoli (feat. L...| 763259.0|United States|2017-01-01|
|           One Dance| 753150.0|United States|2017-01-01|
|            Caroline| 714839.0|United States|2017-01-01|
|     Let Me Love You| 690483.0|United States|2017-01-01|
|         Bounce Back| 682688.0|United States|2017-01-01|
|    I Feel It Coming| 651807.0|United States|2017-01-01|
|           24K Magic| 574974.0|United States|2017-01-01|
|Bad Things (with ...| 567789.0|United States|2017-01-01|
|    X (feat. Future)| 544620.0|United States|2017-01-01|
|I Don’t Wanna

## Exercício 2: Cálculo de Média de Streams
Questão: Use uma consulta SQL para calcular a média de streams para cada chart e exiba o resultado.



In [None]:
df.createOrReplaceTempView("music_data")

media_streams_chart_sql = spark.sql("""
    SELECT chart, AVG(streams) as avg_streams
    FROM music_data
    GROUP BY chart
""")
media_streams_chart_sql.show()


+-------+------------------+
|  chart|       avg_streams|
+-------+------------------+
| top200|55261.314376595816|
|viral50|              NULL|
+-------+------------------+



## Exercício 3: Adicionar uma Coluna de Taxa de Crescimento

Questão: Adicione uma nova coluna chamada growth_rate, que calcula a taxa de crescimento dos streams em relação ao valor do stream da linha anterior, ordenando pela coluna rank. Exiba as colunas title, streams, previous_streams e growth_rate.

In [31]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, expr

window_spec = Window.orderBy("rank")
df = df.withColumn("previous_streams", lag("streams").over(window_spec))
df = df.withColumn("growth_rate", expr("((streams - previous_streams) / previous_streams) * 100")).fillna({"growth_rate": 0})
df.select("title", "streams", "previous_streams", "growth_rate").show()


+--------------------+--------+----------------+-----------+
|               title| streams|previous_streams|growth_rate|
+--------------------+--------+----------------+-----------+
|Chantaje (feat. M...|253019.0|            NULL|        0.0|
|               Pepas|    NULL|        253019.0|        0.0|
|             Starboy|107350.0|            NULL|        0.0|
|            Di Di Di|    NULL|        107350.0|        0.0|
|               Alone| 12990.0|            NULL|        0.0|
|        Life Goes On|    NULL|         12990.0|        0.0|
|            Hey Baby| 23277.0|            NULL|        0.0|
|              Pierre|    NULL|         23277.0|        0.0|
|Reggaetón Lento (...|  6784.0|            NULL|        0.0|
|         Hadal Ahbek|    NULL|          6784.0|        0.0|
|            Deu Onda|612271.0|            NULL|        0.0|
|         Hadal Ahbek|    NULL|        612271.0|        0.0|
|             Starboy|  1628.0|            NULL|        0.0|
|          うっせぇわ|    NUL

## Exercício 4: Converter Coluna trend para Valores Numéricos

Questão: Crie uma nova coluna chamada trend_num que mapeie os valores da coluna trend para números da seguinte forma: up = 1, down = -1, stable = 0. Exiba as colunas title, trend e trend_num.

In [37]:
df = df.withColumn(
    "trend_num",
    when(col("trend") == "MOVE_UP", 1)
    .when(col("trend") == "MOVE_DOWN", -1)
    .when(col("trend") == "SAME_POSITION", 0)
    .otherwise(None)
)

df.select("title", "trend", "trend_num").show()


+--------------------+-------------+---------+
|               title|        trend|trend_num|
+--------------------+-------------+---------+
|Chantaje (feat. M...|SAME_POSITION|        0|
|Vente Pa' Ca (fea...|      MOVE_UP|        1|
|Reggaetón Lento (...|    MOVE_DOWN|       -1|
|              Safari|SAME_POSITION|        0|
|         Shaky Shaky|      MOVE_UP|        1|
|         Traicionera|    MOVE_DOWN|       -1|
|Cuando Se Pone a ...|    MOVE_DOWN|       -1|
|Otra vez (feat. J...|    MOVE_DOWN|       -1|
|        La Bicicleta|      MOVE_UP|        1|
|Dile Que Tu Me Qu...|    MOVE_DOWN|       -1|
|  Andas En Mi Cabeza|SAME_POSITION|        0|
|Desde Esa Noche (...|      MOVE_UP|        1|
|      Borro Cassette|      MOVE_UP|        1|
|Gyal You A Party ...|    MOVE_DOWN|       -1|
|Me llamas (feat. ...|    MOVE_DOWN|       -1|
|La Bicicleta (fea...|      MOVE_UP|        1|
|DUELE EL CORAZON ...|      MOVE_UP|        1|
|     Let Me Love You|    MOVE_DOWN|       -1|
|La Noche No 

## Exercício 5: Análise Temporal
Questão: Filtre todas as músicas que foram lançadas em junho de 2021 e exiba as colunas title, date, e region.

In [33]:
from pyspark.sql.functions import month, year

df.filter((month(col("date")) == 6) & (year(col("date")) == 2021)).select("title", "date", "region").show()


+--------------------+----------+---------+
|               title|      date|   region|
+--------------------+----------+---------+
|          Todo De Ti|2021-06-01|Argentina|
|            Miénteme|2021-06-01|Argentina|
|       Qué Más Pues?|2021-06-01|Argentina|
|      Pareja Del Año|2021-06-01|Argentina|
|YO SE QUE TU (fea...|2021-06-01|Argentina|
|         Te Necesito|2021-06-01|Argentina|
|                  AM|2021-06-01|Argentina|
|           2/Catorce|2021-06-01|Argentina|
|                Fiel|2021-06-01|Argentina|
|                 512|2021-06-01|Argentina|
|Además de Mí - Remix|2021-06-01|Argentina|
|          Acaramelao|2021-06-01|Argentina|
|            good 4 u|2021-06-01|Argentina|
|              Malbec|2021-06-01|Argentina|
|L-Gante: Bzrp Mus...|2021-06-01|Argentina|
|          EL MAKINON|2021-06-01|Argentina|
|               WACHA|2021-06-01|Argentina|
|Snow Tha Product:...|2021-06-01|Argentina|
|Baila Conmigo (wi...|2021-06-01|Argentina|
|             Poblado|2021-06-01

## Exercício 6: Calcular a Média de Streams por Região usando Spark SQL

Questão: Calcule a média de streams para cada region usando Spark SQL. Exiba o nome da região e a média de streams.

In [38]:
df.createOrReplaceTempView("music_data")

media_streams_chart_sql = spark.sql("""
    SELECT region, AVG(streams) as avg_streams
    FROM music_data
    GROUP BY region
""")
media_streams_chart_sql.show()

+-------------+------------------+
|       region|       avg_streams|
+-------------+------------------+
|     Paraguay| 6286.671493617753|
|       Sweden| 41185.30777730343|
|  Philippines| 52417.66259382237|
|     Malaysia|11142.606986802088|
|    Singapore|11038.974893132583|
|       Turkey|  35253.5100812418|
|      Germany|111789.55613780832|
|       France| 66603.04668378024|
|       Greece| 4483.041096571792|
|       Taiwan|  9191.75591799814|
|     Slovakia|2421.8039452345542|
|    Argentina| 53607.62087352663|
|      Belgium|10834.963612265967|
|      Ecuador|  7489.96118409304|
|      Finland|14077.958767923348|
|    Nicaragua| 2065.814527933088|
|         Peru|22018.889293711018|
|       Global|1212937.9812387142|
|United States| 390275.4827340026|
|        India| 47458.82676758647|
+-------------+------------------+
only showing top 20 rows



## Exercício 7: Classificação por Rank

Questão: Ordene o DataFrame pelo rank em ordem crescente e exiba as colunas rank, title, e streams.

In [39]:

df = df.orderBy(col("rank").asc())

df.select("rank", "title", "streams").show()


+----+--------------------------------+---------+
|rank|                           title|  streams|
+----+--------------------------------+---------+
|   1|                    Dance Monkey|  24362.0|
|   1|                 drivers license|  70914.0|
|   1|                       Your Song|     NULL|
|   1|                    Blaues Licht| 788689.0|
|   1|                          Closer|   6406.0|
|   1|                 drivers license|  10587.0|
|   1|            STAY (with Justin...|1824958.0|
|   1|                  Last Christmas| 286787.0|
|   1|                    Dance Monkey|  30646.0|
|   1|            Моя голова винтом...|  12821.0|
|   1|                 Strip That Down|     NULL|
|   1|                 drivers license|  39310.0|
|   1|                      God's Plan|   8654.0|
|   1|                 drivers license|  11342.0|
|   1|            Met Gala (feat. O...|     NULL|
|   1|            Thezz (feat. Smallx)| 103577.0|
|   1|                   Unforgettable|1223919.0|


## Exercício 8: Contagem de Músicas por Tendência

Questão: Use uma consulta SQL para contar quantas músicas há em cada tipo de trend (up, down, stable).

In [40]:
df.createOrReplaceTempView("music_data")

media_streams_chart_sql = spark.sql("""
    SELECT trend, COUNT (*) as qtd
    FROM music_data
    GROUP BY trend
""")
media_streams_chart_sql.show()

+-------------+--------+
|        trend|     qtd|
+-------------+--------+
|    NEW_ENTRY| 1853640|
|    MOVE_DOWN|11220434|
|      MOVE_UP| 9801048|
|SAME_POSITION| 3298392|
+-------------+--------+

