<a href="https://colab.research.google.com/github/datapreparation-javeriana/etl-tutorial/blob/master/mongodb-to-bigquery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**EXTRACT**

In [None]:
import pandas as pd

#Lectura de datasets
tracks_df = pd.read_csv("/content/tracks_mod.csv")
df_artist = pd.read_csv("/content/artists_mod.csv")

**TRANSFORM**

In [None]:
# ----------------------TRACKS-----------------------------------------------------------------

#Eliminar filas con nombres de canción vacíos
tracks_df = tracks_df.dropna(subset=['name']) 

#Busca las fechas que tienen formato "year xxxx"
pos = tracks_df[["release_date"]][tracks_df[["release_date"]].release_date.str.contains('Year')].index

#Reemplaza fechas con formato "year xxxx" con el año
for i in pos:
  tracks_df.loc[i,'release_date'] = str(tracks_df.loc[i,'release_date'][-4:])

#Unificar formato de fechas
from datetime import datetime
from dateutil.parser import parse
import datetime

fechas = tracks_df["release_date"]

formatos = ['%Y-%m-%d','%Y-%m', '%Y/%m/%d','%Y-%Y']

#Lista de formatos comunes
formato_comun = '%Y'

#Convertir todas las fechas a un formato común
fechas_unificadas = []
for fecha in fechas:
    try:
        fecha_datetime = datetime.datetime.strptime(fecha, formato_comun)
        fechas_unificadas.append(fecha_datetime.year)
    except ValueError:
        fecha_datetime = parse(fecha)
        fechas_unificadas.append(fecha_datetime.year)
tracks_df['release_date'] = fechas_unificadas

#Conversión de la duración de las canciones de milisegundos a minutos
tracks_df["duration_ms"] = tracks_df["duration_ms"]/60000

#Se cambia el nombre de la columna duration_ms a duration_min
tracks_df = tracks_df.rename(columns={'duration_ms': 'duration_min'})

#Se reemplazan los valores vacíos de ciertas variables por su media 
mean_popularity = tracks_df["popularity"].mean()         
tracks_df["popularity"] = tracks_df["popularity"].fillna(mean_popularity)
tracks_df["popularity"] = tracks_df["popularity"].astype(int)

mean_danc = tracks_df["danceability"].mean()         
tracks_df["danceability"] = tracks_df["danceability"].fillna(mean_danc)

mean_energy = tracks_df["energy"].mean()        
tracks_df["energy"] = tracks_df["energy"].fillna(mean_energy)

mean_key = tracks_df["key"].mean()        
tracks_df["key"] = tracks_df["key"].fillna(mean_key)

mean_loudness = tracks_df["loudness"].mean()        
tracks_df["loudness"] = tracks_df["loudness"].fillna(mean_loudness)

mean_speech = tracks_df["speechiness"].mean()        
tracks_df["speechiness"] = tracks_df["speechiness"].fillna(mean_speech)

mean_acous = tracks_df["acousticness"].mean()        
tracks_df["acousticness"] = tracks_df["acousticness"].fillna(mean_acous)

mean_instr = tracks_df["instrumentalness"].mean()        
tracks_df["instrumentalness"] = tracks_df["instrumentalness"].fillna(mean_instr)

mean_live = tracks_df["liveness"].mean()        
tracks_df["liveness"] = tracks_df["liveness"].fillna(mean_live)

mean_val = tracks_df["valence"].mean()        
tracks_df["valence"] = tracks_df["valence"].fillna(mean_val)

mean_tempo = tracks_df["tempo"].mean()        
tracks_df["tempo"] = tracks_df["tempo"].fillna(mean_tempo)

#Convirtiendo variable speechiness a categórica 1 a 0.66 (alto sp), 0.66 a 0.33 (medio sp) y 0.33 a 0 (bajo sp)
datos = tracks_df["speechiness"]
nuevos = []
for i in datos:
  if i >= 0.66:
    nuevos.append("Alto SP") 
  elif i >= 0.33:
    nuevos.append("Medio SP")
  else:
    nuevos.append("Bajo SP")

In [None]:
# ----------------------ARTISTS-----------------------------------------------------------------
## Own functions to work in artist spotify 
## Defined in the order of use

def formatting_str(column, df):
    " To clean genres"
    for index in df.index:
        if df.loc[index,column]=='[]':
            df.loc[index,column] = 'unknown'
        else:
            new_string = df.loc[index,column]
            df.loc[index,column] = re.sub("[\[\]']", "", new_string)
    return df


def gen_database():
    genres = []
    for index in df_artist.index:
        new_string = df_artist.loc[index,'genres']
        new_string = new_string.split(",")
        for genre in new_string:
            if genre not in genres:
                genres.append(genre) 

    genres = [genre.strip() for genre in genres]# droping initial and final blank characters
    genres = list(set(genres))          
    database = {}
    alpha =60
    mainly = ['unknown','disco','metal', 'blues', 'jazz', 'rock', 'rap', 'hip hop', 'reggae',
                     'pop', 'indie', 'ballad', 'folclor', 'folk', 'gospel',
                     'ska', 'punk', 'country', 'electronic','soul', 'opera',
                     'cumbia', 'techno', 'alternative', 'bolero', 'trap','vallenato',
                     'grunge','corrido','flamenco', 'trio', 'motivation', 'classical', 'instrumental',
                     'funk', 'hardcore', 'bachata','merengue', 'salsa', 'ranchera', 'orchestra', 
                      'tango', 'opera', 'son cubano', 'banda','percusion','samba','mambo']
    
    genres = [x.replace(" ","") for x in genres]

    for main in mainly:
        database[main] = []

    for main in mainly:
        for genre in genres:
            if main in genre:
                database[main].append(genre)

    for main in mainly:
        for genre in genres:
            if fuzz.ratio(main, genre)>alpha:
                database[main].append(genre)

    # All not categorized going to other
    categorized = []
    for key in database:
        for item in database[key]:
            if item not in categorized:
                categorized.append(item)
    database['other'] = list(set(genres) - set(categorized))
    
    return database 


def return_key(genre, database):
    """This function will return us the general genre
        We can improve this function when appear in more
        of a key uses fuzzy to math more adecuately actually is greedy
    """
    paired = []
    for key in database:
        for element in database[key]:
            paired.append((element,key))
    index = 0
    for pair in paired:
        if genre == pair[0]:
            break
        index +=1
    result = paired[index][1]
    return result


def standard_row(row,database):
    """
    This function allow me put all genres of a column in a standard way
    """
    replaced = row.split(",")
    replaced = [x.replace(" ","") for x in replaced]
    replaced = [return_key(x,database) for x in replaced]  # Applyng the function of homogenize a single word
    replaced = ','.join(replaced)
    return  replaced

mainly = ['unknown','disco','metal', 'blues', 'jazz', 'rock', 'rap', 'hip hop', 'reggae',
                     'pop', 'indie', 'ballad', 'folclor', 'folk', 'gospel',
                     'ska', 'punk', 'country', 'electronic','soul', 'opera',
                     'cumbia', 'techno', 'alternative', 'bolero', 'trap','vallenato',
                     'grunge','corrido','flamenco', 'trio', 'motivation', 'classical', 'instrumental',
                     'funk', 'hardcore', 'bachata','merengue', 'salsa', 'ranchera', 'orchestra', 
                      'tango', 'opera', 'son cubano', 'banda','percusion','samba','mambo']
mainly.append('other')

scores = {}
for main in mainly:
    if (main=='other'):
        scores[main]=0
    elif (main=='unknown'):
        scores[main]=-1
    else:
        scores[main]=1


def main_genre(row,mainly,scores):
    genres = row.split(',')
    genres_score = [scores[genre] for genre in genres]
    index = genres_score.index(max(genres_score))
    main_genre = genres[index]
    return main_genre



In [None]:
!pip install thefuzz
!pip install tableOne

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tableOne
  Downloading tableone-0.7.12-py3-none-any.whl (32 kB)
Installing collected packages: tableOne
Successfully installed tableOne-0.7.12


In [None]:
import re
import numpy as np
import pandas as pd
from thefuzz import fuzz, process

"""
symbols = []
for index in df_artist.index:
    print(df_artist.loc[index,'genres'],index)
    for symb in df_artist.loc[index,'genres']:
        if symb not in symbols:
            symbols.append(symb)
"""

"""
for genre in genres[0:20]:
    df_artist[genre]=''
    for index in df_artist.index[560:600]:
        if bool(re.fullmatch(genre, df_artist.loc[index,'genres'])):
            df_artist.loc[index,genre] = 1
"""

"""
df_artist = pd.read_csv("artists_mod.csv")
print(df_artist['genres'].isnull().sum())
df_artist = formatting_str('genres',df_artist)
print(df_artist['genres'].isnull().sum())
database = gen_database()
print(df_artist['genres'].isnull().sum())

df_artist['genres'] = df_artist['genres'].apply(lambda x: standard_row(x,database))

df_artist.to_csv("remaster.csv")
"""

# Second part 
df = pd.read_csv("/content/remaster.csv")

df['main_genre'] = df['genres'].apply(lambda x : main_genre(x,mainly,scores))
df.drop(columns=['Unnamed: 0', 'id', 'genres', 'name'], inplace=True)
df.to_csv("lastbase.csv")

from tableone  import TableOne, load_dataset
from scipy import stats
categorial = []
nonormal = []
normal = []
for t in df.columns:
    print(df[t].dtypes, t)
    if df[t].dtypes=="object" or df[t].dtypes.name=='category':
        categorial.append(t)
    if df[t].dtypes=="int64" or df[t].dtypes=="float64":
            n,p = stats.shapiro(df[t])
            if p<0.05:
                nonormal.append(t)
            else: 
                normal.append(t)
                
print(len(df.columns)) 
print(len(normal) + len(nonormal) + len(categorial))
mytable = TableOne(df,categorical=categorial, nonnormal=nonormal)
mytable
mytable.to_latex('tableOne.tex')

artist_df = pd.read_csv("/content/lastbase.csv")

float64 followers
int64 popularity
object main_genre
3
3


  mytable.to_latex('tableOne.tex')


**LOAD**

In [None]:
import random
import string

import numpy as np
import pandas as pd

from google.cloud import bigquery
from google.oauth2 import service_account

In [None]:
credentials = service_account.Credentials.from_service_account_file("/content/gestion-bases-datos-5cb2829adf4e.json", scopes=["https://www.googleapis.com/auth/cloud-platform"])

In [None]:
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

In [None]:
# Creating the job config
job_config = bigquery.LoadJobConfig(
    schema=[
        # Supported datatypes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
        bigquery.SchemaField("id", bigquery.enums.SqlTypeNames.STRING), #revisar string
        bigquery.SchemaField("name", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("popularity", bigquery.enums.SqlTypeNames.INT64),
        bigquery.SchemaField("duration_min", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("explicit", bigquery.enums.SqlTypeNames.INT64),
        bigquery.SchemaField("artists", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("id_artists", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("release_date", bigquery.enums.SqlTypeNames.INT64),
        bigquery.SchemaField("danceability", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("energy", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("key", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("loudness", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("mode", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("speechiness", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("acousticness", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("instrumentalness", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("liveness", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("valence", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("tempo", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("time_signature", bigquery.enums.SqlTypeNames.FLOAT64)
    ],
    # Drod and re-create table, if exist
    write_disposition="WRITE_TRUNCATE"
)

In [None]:
def get_random_string(length):
    # choose from all lowercase letter
    letters = string.ascii_lowercase
    return "".join(random.choice(letters) for i in range(length))

In [None]:
BQ_TABLE_NAME = f"Gestion_bases_datos.tracks_{get_random_string(4)}"
print(BQ_TABLE_NAME)

Gestion_bases_datos.tracks_xqnx


In [None]:
# Sending the job to BigQuery
job = client.load_table_from_dataframe(tracks_df, BQ_TABLE_NAME, job_config=job_config)
job.result()

LoadJob<project=gestion-bases-datos, location=US, id=04b2b746-f3e2-47f5-ab36-d4f8e2b42fc2>

In [None]:
# Creating the job config
job_config = bigquery.LoadJobConfig(
    schema=[
        # Supported datatypes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
        bigquery.SchemaField("followers", bigquery.enums.SqlTypeNames.FLOAT64), #revisar string
        bigquery.SchemaField("popularity", bigquery.enums.SqlTypeNames.INT64),
        bigquery.SchemaField("main_genre", bigquery.enums.SqlTypeNames.STRING)
    ],
    # Drod and re-create table, if exist
    write_disposition="WRITE_TRUNCATE"
)

In [None]:
BQ_TABLE_NAME_A = f"Gestion_bases_datos.artists_{get_random_string(4)}"
print(BQ_TABLE_NAME_A)

Gestion_bases_datos.artists_plmr


In [None]:
# Sending the job to BigQuery
job = client.load_table_from_dataframe(artist_df, BQ_TABLE_NAME_A, job_config=job_config)
job.result()

LoadJob<project=gestion-bases-datos, location=US, id=1d42c600-a81b-4050-b088-6ff4d9c229c0>

In [None]:
# Verifying if table was successfully created or updated
table = client.get_table(BQ_TABLE_NAME)
print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), BQ_TABLE_NAME))

Loaded 586601 rows and 20 columns to Gestion_bases_datos.tracks_xqnx


In [None]:
# Verifying if table was successfully created or updated
table = client.get_table(BQ_TABLE_NAME_A)
print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), BQ_TABLE_NAME_A))

Loaded 1162095 rows and 4 columns to Gestion_bases_datos.artists_plmr


In [None]:
query = f"""SELECT * FROM `gestion-bases-datos.{BQ_TABLE_NAME}`"""
pd.read_gbq(query, credentials=credentials)

KeyboardInterrupt: ignored

In [None]:
query = f"""SELECT * FROM `gestion-bases-datos.{BQ_TABLE_NAME_A}`"""
pd.read_gbq(query, credentials=credentials)

Unnamed: 0.1,Unnamed: 0,followers,popularity,main_genre
0,,53636.0,53,pop
1,,72684.0,51,pop
2,,248568.0,52,folk
3,,5644.0,52,unknown
4,,786.0,56,other
...,...,...,...,...
1162090,,1.0,35,unknown
1162091,,1.0,35,unknown
1162092,,1.0,35,other
1162093,,1.0,35,unknown
