In [1]:
import os
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "ethereal-accord-397414-944cb605214b.json"

# Creción de Tablas

In [2]:
def crear_tabla(dataset_id, table_id, schema):
    """
    Funcion que crea una tabla en el dataset con el schema pasado.
    """
    client = bigquery.Client()
    # Preparar estructura de la tabla.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = bigquery.Table(table_ref, schema=schema)

    # Envía la solicitud para crear la tabla.
    client.create_table(table)

In [4]:
# Definir nombre del Dataset
dataset_id = "Migraciones"
project_id = "ethereal-accord-397414"

In [4]:
# Crear tabla 'pais'
table_id = "pais"

schema = [
    bigquery.SchemaField("id_pais", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("nombre", "STRING", mode="NULLABLE")
]

crear_tabla(dataset_id, table_id, schema)

DefaultCredentialsError: File ethereal-accord-397414-944cb605214b.json was not found.

In [45]:
# Crear tabla 'fac_social'
table_id = "fac_social"

schema = [
    bigquery.SchemaField("id_fac_soc", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("hdi", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("anio_prom_esc", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("esperanza_vida", "FLOAT", mode="NULLABLE")
]

crear_tabla(dataset_id, table_id, schema)

In [46]:
# Crear tabla 'inmigracion'
table_id = "inmigracion"

schema = [
    bigquery.SchemaField("id_inmigracion", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("hombres", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("mujeres", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("total", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Argentina", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Antigua_y_Barbuda", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Bahamas", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Belice", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Bolivia", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Brasil", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Canada", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Chile", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Colombia", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Costa_Rica", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Cuba", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Santa_Lucia", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Ecuador", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Granada", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Guatemala", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Guyana", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Honduras", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Haiti", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Jamaica", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Republica_Dominicana", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Mexico", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Nicaragua", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Panama", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Peru", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Paraguay", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("El_Salvador", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Trinidad_y_Tobago", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Uruguay", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Estados_Unidos", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Venezuela", "INTEGER", mode="NULLABLE")
]

crear_tabla(dataset_id, table_id, schema)

In [47]:
# Crear tabla 'fac_economico'
table_id = "fac_economico"

schema = [
    bigquery.SchemaField("id_fac_eco", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("pbi_per_capita", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("pbi_per_cap_aj", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("desempleo", "FLOAT", mode="NULLABLE"),
]

crear_tabla(dataset_id, table_id, schema)

In [5]:
# Crear tabla 'migracion'
table_id = "migracion"

schema = [
    bigquery.SchemaField("id_migracion", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("id_inmigracion", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("id_fac_soc", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("id_fac_eco", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("id_pais", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("migracion_neta", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("migracion_neta_pred", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("anio", "DATE", mode="NULLABLE"),
]

crear_tabla(dataset_id, table_id, schema)

############################################################################
# Generacion Data Frames preparados para BigQuery  y guardado CSV respaldo en el Cloud Storage
############################################################################

In [5]:
source_folder_name = "ArchivosFuente/"
bq_folder_name = "ParaBigQuery/"
bucket_name = 'bucket-pf-flujos_migratorios'

In [6]:
def lectura_csv_bucket(file_path, bucket_name):
    """ 
    Lectura de un csv en file_path desde el Bucket bucket_name.
    Retorna un DataFrame con el csv leido.
    """
    # Crea un cliente de almacenamiento de Google Cloud
    storage_client = storage.Client()

    # Obtiene el cubo de almacenamiento
    bucket = storage_client.get_bucket(bucket_name)

    # Obtiene el objeto de archivo CSV
    blob = bucket.blob(file_path)

    # Lee el contenido del archivo CSV en un DataFrame de Pandas
    with blob.open("r") as file:
        df = pd.read_csv(file)
    
    return df

def guardado_csv_bucket(df, file_path, bucket_name):
    """ 
    Guardado de un DataFrame en csv en file_path desde el Bucket bucket_name.
    """
    # Guarda el DataFrame en un archivo CSV
    csv_data = df.to_csv(index=False)

    # Crea un cliente de almacenamiento de Google Cloud
    storage_client = storage.Client()

    # Obtiene el cubo de almacenamiento
    bucket = storage_client.get_bucket(bucket_name)

    # Carga el archivo CSV en el bucket de Google Cloud Storage
    blob = bucket.blob(file_path)
    blob.upload_from_string(csv_data)

In [8]:
# Lectura de todos los csv fuentes desde el Bucket.
file_path_undp = source_folder_name + 'migracion_undp.csv'
df_undp = lectura_csv_bucket(file_path_undp, bucket_name)
file_path_bm = source_folder_name + 'migracion_desempleo_pbi_per_capita.csv'
df_bm = lectura_csv_bucket(file_path_bm, bucket_name)
file_path_inmigracion = source_folder_name + 'Inmigracion_por_pais.csv'
df_inmigracion = lectura_csv_bucket(file_path_inmigracion, bucket_name)

In [9]:
######################################
# Generacion DF y CSV para tabla 'pais'
######################################

# Generador de 'id_pais'
pais = []
j = 1
for i in (df_undp['Pais'].unique()):
    pais.append([j,i])
    j += 1

df_bq_pais = pd.DataFrame(pais, columns = ['id_pais','nombre'])

# Guardo CSV
file_path = bq_folder_name + 'pais.csv'
guardado_csv_bucket(df_bq_pais, file_path, bucket_name)

In [10]:
######################################
# Generacion DF y CSV para tabla 'fac_social'
######################################

# Como son todos del mismo Dataset no necesitamos hacer un merge.
df_bq_fac_soc = df_undp.copy()
df_bq_fac_soc['id_pais'] = df_undp['Pais'].replace(dict(zip(df_bq_pais["nombre"], df_bq_pais["id_pais"])))
df_bq_fac_soc['id_fac_soc'] = 10000 * df_bq_fac_soc['Anio'] + df_bq_fac_soc['id_pais']

df_bq_fac_soc.drop(columns=['Pais', 'id_pais', 'Anio', 'PBI_per_cap_aj'], inplace=True)

df_bq_fac_soc.head()

# Guardo CSV
file_path = bq_folder_name + 'fac_social.csv'
guardado_csv_bucket(df_bq_fac_soc, file_path, bucket_name)

In [11]:
######################################
# Generacion DF y CSV para tabla 'fac_economico'
######################################

df_bq_fac_eco = df_undp.merge(df_bm, on=['Pais', 'Anio'])

df_bq_fac_eco['id_pais'] = df_bq_fac_eco['Pais'].replace(dict(zip(df_bq_pais["nombre"], df_bq_pais["id_pais"])))
df_bq_fac_eco['id_fac_eco'] = 10000 * df_bq_fac_eco['Anio'] + df_bq_fac_eco['id_pais']

df_bq_fac_eco.drop(columns=['Pais', 'id_pais', 'Anio', 'hdi', 'Esperanza_vida', 'Anio_prom_esc', 'migracion_neta'], inplace=True)

df_bq_fac_eco.rename(columns={'PBI_per_cap_aj': 'pbi_per_cap_aj'}, inplace=True)

# Guardo CSV
file_path = bq_folder_name + 'fac_economico.csv'
guardado_csv_bucket(df_bq_fac_eco, file_path, bucket_name)

In [12]:
######################################
# Generacion DF y CSV para tabla 'inmigracion'
######################################

df_bq_inmigracion = df_inmigracion.copy()
df_bq_inmigracion['id_pais'] = df_bq_inmigracion['Pais'].replace(dict(zip(df_bq_pais["nombre"], df_bq_pais["id_pais"])))
df_bq_inmigracion['id_inmigracion'] = 10000 * df_bq_inmigracion['Anio'] + df_bq_inmigracion['id_pais']

df_bq_inmigracion.drop(columns=['Pais', 'id_pais', 'Anio'], inplace=True)
# Renombro asi coinciden
df_bq_inmigracion.rename(columns={'Inmigrantes_hombres': 'hombres', 'Inmigrantes_mujeres': 'mujeres', 'Inmigrantes': 'total'}, inplace=True)

# Paso a Int
df_bq_inmigracion[df_bq_inmigracion.columns] = df_bq_inmigracion[df_bq_inmigracion.columns].astype('Int64')

# Guardo CSV
file_path = bq_folder_name + 'inmigracion.csv'
guardado_csv_bucket(df_bq_inmigracion, file_path, bucket_name)

In [13]:
######################################
# Generacion DF y CSV para tabla 'migracion'
######################################

df_bq_migracion = df_bm.copy()
df_bq_migracion['id_pais'] = df_bq_migracion['Pais'].replace(dict(zip(df_bq_pais["nombre"], df_bq_pais["id_pais"])))
df_bq_migracion['id_migracion'] = 10000 * df_bq_migracion['Anio'] + df_bq_migracion['id_pais']

df_bq_migracion.drop(columns=['Pais', 'desempleo', 'pbi_per_capita'], inplace=True)
# Anio con minuscula
df_bq_migracion.rename(columns={'Anio': 'anio'}, inplace=True)

# crear ids de los otros copiando de id_migracion
df_bq_migracion['id_inmigracion'] = df_bq_migracion['id_migracion']
df_bq_migracion['id_fac_soc'] = df_bq_migracion['id_migracion']
df_bq_migracion['id_fac_eco'] = df_bq_migracion['id_migracion']

#Quitar id erroneos
df_bq_migracion.loc[~df_bq_migracion['id_migracion'].isin(df_bq_inmigracion['id_inmigracion']), 'id_inmigracion'] = None
df_bq_migracion.loc[~df_bq_migracion['id_migracion'].isin(df_bq_fac_soc['id_fac_soc']), 'id_fac_soc'] = None
df_bq_migracion.loc[~df_bq_migracion['id_migracion'].isin(df_bq_fac_eco['id_fac_eco']), 'id_fac_eco'] = None

# Vuelvo a convertir a Int
df_bq_migracion['id_inmigracion'] = df_bq_migracion['id_inmigracion'].astype('Int64')
df_bq_migracion['id_fac_soc'] = df_bq_migracion['id_fac_soc'].astype('Int64')
df_bq_migracion['id_fac_eco'] = df_bq_migracion['id_fac_eco'].astype('Int64')
df_bq_migracion['migracion_neta'] = df_bq_migracion['migracion_neta'].astype('Int64')

# Convertir anio a tipo DATE
df_bq_migracion['anio'] = df_bq_migracion['anio'].astype("string") + "-01-01"
df_bq_migracion['anio'] =  pd.to_datetime(df_bq_migracion['anio'], format="%Y-%m-%d")

# Guardo CSV
file_path = bq_folder_name + 'migracion.csv'
guardado_csv_bucket(df_bq_migracion, file_path, bucket_name)

######################################
# Pasaje a BigQuery
######################################

In [14]:
def carga_tabla_bq(df, dataset_id, table_id, bucket, project_id, primary_key):
    client = bigquery.Client()

    # Define el ID completo de la tabla en BigQuery
    table_ref = client.dataset(dataset_id).table(table_id)

    # Query existing data from the table
    query = f"SELECT {primary_key} FROM `{project_id}.{dataset_id}.{table_id}`"
    existing_primary_keys = set(row[primary_key] for row in client.query(query))

    # Identify unique rows based on primary key
    new_rows = df[~df[primary_key].isin(existing_primary_keys)]
    
    print(new_rows)
    
    # Load the new rows into BigQuery
    if not new_rows.empty:
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND", # Append al final de la tabla
            autodetect=True, # Detectar automáticamente el esquema de la tabla
        )

        job = client.load_table_from_dataframe(new_rows, table_ref, job_config=job_config)
        job.result()

In [15]:
carga_tabla_bq(df_bq_pais, dataset_id, 'pais', bucket_name, project_id, 'id_pais')
carga_tabla_bq(df_bq_fac_soc, dataset_id, 'fac_social', bucket_name, project_id, 'id_fac_soc')
carga_tabla_bq(df_bq_fac_eco, dataset_id, 'fac_economico', bucket_name, project_id, 'id_fac_eco')
carga_tabla_bq(df_bq_inmigracion, dataset_id, 'inmigracion', bucket_name, project_id, 'id_inmigracion')
carga_tabla_bq(df_bq_migracion, dataset_id, 'migracion', bucket_name, project_id, 'id_migracion')

Empty DataFrame
Columns: [id_pais, nombre]
Index: []
Empty DataFrame
Columns: [hdi, Esperanza_vida, Anio_prom_esc, id_fac_soc]
Index: []
Empty DataFrame
Columns: [pbi_per_cap_aj, desempleo, pbi_per_capita, id_fac_eco]
Index: []
Empty DataFrame
Columns: [hombres, mujeres, total, Argentina, Antigua_y_Barbuda, Bahamas, Belice, Bolivia, Brasil, Canada, Chile, Colombia, Costa_Rica, Cuba, Santa_Lucia, Ecuador, Granada, Guatemala, Guyana, Honduras, Haiti, Jamaica, Republica_Dominicana, Mexico, Nicaragua, Panama, Peru, Paraguay, El_Salvador, Trinidad_y_Tobago, Uruguay, Estados_Unidos, Venezuela, id_inmigracion]
Index: []

[0 rows x 34 columns]
Empty DataFrame
Columns: [anio, migracion_neta, id_pais, id_migracion, id_inmigracion, id_fac_soc, id_fac_eco]
Index: []
