<a href="https://colab.research.google.com/github/felipecampelo/ETL-BucketGCP-MongoDB/blob/main/ETL_BucketGCP_MongoDB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ❗ ETL: Bucket GCP + MongoDB ❗

`Bucket <-> ETL <-> MongoDB`

⏩**Objective**: Extract the dataset from a GCP bucket, clean it, and load it into a MongoDB database

⚡ Installing the required libraries ⚡

In [None]:
!pip install pandera
!pip install pymongo[srv]

⚡ Importing the libraries ⚡

In [2]:
import pandas as pd 
import numpy as np
import pandera as pa

import pymongo
from pymongo import MongoClient

⚡ Reading the data from the bucket and preliminary analysis ⚡

In [3]:
df = pd.read_csv('https://storage.googleapis.com/spotify-bucket-etl/spotify.csv', index_col=0) # Read the dataset from the bucket

# Keep a copy of the original data
dforiginal = df.copy()

df.sort_values('song_popularity', ascending=False, inplace=True) # Sort the songs by popularity, highest first
df.head() # Show the first 5 rows of the dataframe

| Unnamed: 0 | song_name | song_popularity | song_duration_ms | acousticness | danceability | energy | instrumentalness | key | liveness | loudness | audio_mode | speechiness | tempo | time_signature | audio_valence |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1757 | Party In The U.S.A. | nao_sei | 0.8220000000000001kg | 0.519mol/L | 0.36 | 0.0 | 10.0 | 0.177 | -8.575 | 0.0 | 0.105 | 97.42 | 4.0 | 0.7 | |
| 7574 | I Love It (& Lil Pump) | 99 | 127946 | 0.0114kg | 0.901mol/L | 0.522 | 0.0 | 2.0 | 0.259 | -8.304 | 1.0 | 0.33 | 104.053 | 4.0 | 0.329 |
| 11777 | I Love It (& Lil Pump) | 99 | 127946 | 0.0114kg | 0.901mol/L | 0.522 | 0.0 | 2.0 | 0.259 | -8.304 | 1.0 | 0.33 | 104.053 | 4.0 | 0.329 |
| 4301 | I Love It (& Lil Pump) | 99 | 127946 | 0.0114kg | 0.901mol/L | 0.522 | 0.0 | 2.0 | 0.259 | -8.304 | 1.0 | 0.33 | 104.053 | 4.0 | 0.329 |
| 14444 | I Love It (& Lil Pump) | 99 | 127946 | 0.0114kg | 0.901mol/L | 0.522 | 0.0 | 2.0 | 0.259 | -8.304 | 1.0 | 0.33 | 104.053 | 4.0 | 0.329 |


In [11]:
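# Some features have the wrong dtypes
# (the output below seems to come from a re-run after the cleaning cell, so it already lists the converted dtypes)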
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14932 entries, 0 to 14931
Data columns (total 16 columns):
 #   Column            Non-Null Count  Dtype   
---  ------            --------------  -----   
 0   index             14932 non-null  int64   
 1   song_name         14932 non-null  object  
 2   song_popularity   14931 non-null  category
 3   song_duration_ms  14932 non-null  float64 
 4   acousticness      14932 non-null  float64 
 5   danceability      14932 non-null  float64 
 6   energy            14931 non-null  float64 
 7   instrumentalness  14930 non-null  float64 
 8   key               14931 non-null  category
 9   liveness          14928 non-null  float64 
 10  loudness          14931 non-null  float64 
 11  audio_mode        14930 non-null  category
 12  speechiness       14931 non-null  float64 
 13  tempo             14931 non-null  float64 
 14  time_signature    14929 non-null  category
 15  audio_valence     14931 non-null  float64 
dtypes: category(4), float6
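
One way to pinpoint which values keep a column from being numeric is to coerce it with `pd.to_numeric` and look at what fails to parse. The following is a minimal sketch on a toy column; the sample values are assumptions modeled on the preview above, not taken from the full dataset.

```python
import pandas as pd

# Toy column mimicking the dirty values seen in the preview (assumed sample)
raw = pd.Series(['0.0114kg', '0.822', 'nao_sei', None, '0.33'], name='acousticness')

# errors='coerce' turns anything that cannot be parsed into NaN instead of raising
parsed = pd.to_numeric(raw, errors='coerce')

# Values that were present but failed to parse are the inconsistencies
bad_values = raw[parsed.isna() & raw.notna()]
print(bad_values.tolist())  # ['0.0114kg', 'nao_sei']
```

Running the same check on each column that should be numeric gives a list of the unit suffixes and placeholder strings that need handling before `astype('float')` can succeed.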

⚡ Data cleaning ⚡

In [5]:
# Remove duplicate rows
df.drop_duplicates(keep='first', inplace=True) # Keep the first occurrence so no data is wasted

# Remove units of measurement
def remove_units(frame, columns, unit):
    for col in columns:
        # regex=False removes the literal unit text;
        # str.strip would treat the argument as a set of characters
        frame[col] = frame[col].str.replace(unit, '', regex=False)
 
remove_units(df, ['acousticness', 'danceability'], 'mol/L')
remove_units(df, ['song_duration_ms', 'acousticness'], 'kg')

# Replace inconsistent values with np.nan
df = df.replace(['nao_sei'], np.nan)
df['key'] = df['key'].replace([0.177], np.nan)
df['audio_mode'] = df['audio_mode'].replace(['0.105'], np.nan)
df['speechiness'] = df['speechiness'].replace(['0.nao_sei'], np.nan)
df['time_signature'] = df['time_signature'].replace(['0.7', '2800000000'], np.nan)

# Fix the column dtypes
numerical_cols = ['song_duration_ms', 'acousticness', 'danceability',
                  'energy', 'instrumentalness', 'liveness', 'loudness',
                  'speechiness', 'tempo', 'audio_valence']
 
categorical_cols = ['song_popularity', 'key', 'audio_mode', 'time_signature']
 
def to_type(frame, columns, dtype): # `dtype` avoids shadowing the built-in `type`
    for col in columns:
        frame[col] = frame[col].astype(dtype)
 
to_type(df, numerical_cols, 'float')
to_type(df, categorical_cols, 'category')
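
A note on removing unit suffixes: Python's `str.strip` (and pandas' `Series.str.strip`) interprets its argument as a set of characters to trim from both ends, not as a literal substring. For values that begin and end with digits the result is the same, but the difference shows on other text. The sketch below illustrates this with plain strings; `remove_suffix_unit` is a hypothetical helper and the `'low 0.901mol/L'` value is made up for the demonstration.

```python
def remove_suffix_unit(value: str, unit: str) -> str:
    """Drop `unit` only when it is the literal suffix of `value` (Python 3.9+)."""
    return value.removesuffix(unit)

# strip ignores the order of the characters: both calls trim the same set
print('0.901mol/L'.strip('mol/L'))  # 0.901
print('0.901mol/L'.strip('L/lom'))  # 0.901

# Made-up value that starts with characters from the set
print('low 0.901mol/L'.strip('mol/L'))                # w 0.901
print(remove_suffix_unit('low 0.901mol/L', 'mol/L'))  # low 0.901
```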

In [12]:
# The dtypes are now correct
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14932 entries, 0 to 14931
Data columns (total 16 columns):
 #   Column            Non-Null Count  Dtype   
---  ------            --------------  -----   
 0   index             14932 non-null  int64   
 1   song_name         14932 non-null  object  
 2   song_popularity   14931 non-null  category
 3   song_duration_ms  14932 non-null  float64 
 4   acousticness      14932 non-null  float64 
 5   danceability      14932 non-null  float64 
 6   energy            14931 non-null  float64 
 7   instrumentalness  14930 non-null  float64 
 8   key               14931 non-null  category
 9   liveness          14928 non-null  float64 
 10  loudness          14931 non-null  float64 
 11  audio_mode        14930 non-null  category
 12  speechiness       14931 non-null  float64 
 13  tempo             14931 non-null  float64 
 14  time_signature    14929 non-null  category
 15  audio_valence     14931 non-null  float64 
dtypes: category(4), float6

⚡ Validating the data with pandera ⚡

In [None]:
# Create the data schema (validation)
schema = pa.DataFrameSchema(
      columns = {
          'song_duration_ms': pa.Column(pa.Float, nullable = True),
          'acousticness': pa.Column(pa.Float, nullable = True),
          'danceability': pa.Column(pa.Float, nullable = True),
          'energy': pa.Column(pa.Float, nullable = True),
          'instrumentalness': pa.Column(pa.Float, nullable = True),
          'liveness': pa.Column(pa.Float, nullable = True),
          'loudness': pa.Column(pa.Float, nullable = True),
          'speechiness': pa.Column(pa.Float, nullable = True),
          'tempo': pa.Column(pa.Float, nullable = True),
          'audio_valence': pa.Column(pa.Float, nullable = True),
          'song_popularity': pa.Column(pa.Category, nullable = True),
          'key': pa.Column(pa.Category, nullable = True),
          'audio_mode': pa.Column(pa.Category, nullable = True),
          'time_signature': pa.Column(pa.Category, nullable = True)
      })

# Validate the dataframe against the schema
schema.validate(df)

⚡ Connecting to MongoDB ⚡

In [7]:
# Connect to the server
mongo_uri = 'mongodb+srv://felipecampelo:senha@spotify.e0u26fn.mongodb.net/?retryWrites=true&w=majority'
client = pymongo.MongoClient(mongo_uri)
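
Hard-coding credentials in a shared notebook is risky, and passwords containing characters such as `@` or `/` must be percent-encoded before they go into a MongoDB URI. A minimal sketch of one way to handle both, assuming the credentials are stored in environment variables; the names `MONGO_USER` and `MONGO_PASSWORD`, the helper `build_mongo_uri`, and the fallback values are hypothetical.

```python
import os
from urllib.parse import quote_plus

def build_mongo_uri(user: str, password: str, host: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials."""
    return (f'mongodb+srv://{quote_plus(user)}:{quote_plus(password)}'
            f'@{host}/?retryWrites=true&w=majority')

# Hypothetical variable names; the fallbacks are placeholders for the demo only
user = os.environ.get('MONGO_USER', 'example_user')
password = os.environ.get('MONGO_PASSWORD', 'p@ss/word')

uri = build_mongo_uri(user, password, 'spotify.e0u26fn.mongodb.net')
# client = pymongo.MongoClient(uri)  # same call as in the cell above
```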

⚡ Inserting the original dataframe ⚡

In [9]:
# Select the database and collection
db = client['Spotify'] # Database -> Spotify
colecao = db.original # Collection -> original

# Convert the data to a list of dictionaries
dforiginal.reset_index(inplace = True) # Move the index into a regular column
dforiginal_dict = dforiginal.to_dict("records")

# Send to the original collection of the Spotify database
colecao.insert_many(dforiginal_dict)

<pymongo.results.InsertManyResult at 0x7fbc5ec8d710>
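
Missing values in a dataframe are float `NaN`, and pymongo stores them as the numeric value NaN rather than as `null`. If `null` is preferred in the collection, the records can be converted before insertion. A minimal sketch, where `nan_to_none` is a hypothetical helper and the sample records are made up from the preview rows:

```python
import math

def nan_to_none(records):
    """Return copies of the records with float NaN values replaced by None."""
    cleaned = []
    for record in records:
        cleaned.append({
            key: None if isinstance(value, float) and math.isnan(value) else value
            for key, value in record.items()
        })
    return cleaned

# Made-up sample in the shape produced by DataFrame.to_dict("records")
sample_records = [
    {'song_name': 'Party In The U.S.A.', 'audio_valence': float('nan')},
    {'song_name': 'I Love It (& Lil Pump)', 'audio_valence': 0.329},
]
cleaned_records = nan_to_none(sample_records)
print(cleaned_records)
```

pymongo encodes Python `None` as BSON `null`, so the cleaned records could be passed to `insert_many` in place of the raw ones.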

⚡ Inserting the cleaned dataframe ⚡

In [10]:
# Select the database and collection
db = client['Spotify'] # Database -> Spotify
colecaotrat = db.tratado # Collection -> tratado

# Convert the data to a list of dictionaries
df.reset_index(inplace = True) # Move the index into a regular column
df_dict = df.to_dict("records")

# Send to the tratado collection of the Spotify database
colecaotrat.insert_many(df_dict)

<pymongo.results.InsertManyResult at 0x7fbc5e8f7d50>
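
Because `insert_many` only appends, loading the same data again would typically leave a second copy of every document in the collection. One possible way to make the load repeatable is to clear the collection first. This is a minimal sketch, not part of the original pipeline; `reload_collection` is a hypothetical helper that relies only on pymongo's `delete_many`, `insert_many` and `inserted_ids`.

```python
def reload_collection(collection, records):
    """Replace the collection's contents with `records`; return the inserted count."""
    collection.delete_many({})  # an empty filter matches every document
    if not records:             # insert_many rejects an empty list
        return 0
    result = collection.insert_many(records)
    return len(result.inserted_ids)

# Possible usage with the objects defined above:
# reload_collection(db.tratado, df.to_dict("records"))
```

Deleting everything before inserting is only appropriate when the collection holds nothing but this dataset.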