In [1]:
import mysql
import pandas as pd
import mysql.connector as connection

from google.cloud import bigquery

In [2]:
import config

In [3]:
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.KEY

In [4]:
# Funcion para obtener tablas del db de origen
def get_tables(db, db_name):
    cursor = db.cursor()
    cursor.execute(f"USE {db_name}") # select the database
    cursor.execute("SHOW TABLES")
    tables = cursor.fetchall()
    tables = [table[0] for table in tables]
    return tables

In [5]:
# Funcion para obtener una consulta query de la db de origen y exportarlo a un dataframe de pandas
def get_df(db, db_name, table):
    sql = f'select * from {db_name}.{table}'
    df = pd.read_sql(sql, db) 
    return df

In [6]:
# Funcion para obtener la tabla id y la propiedad de la tabla
def get_table_property(client, table, idformat, zona):
    tabla_id = idformat + f"{table}_{zona}"
    tabla_property = client.get_table(tabla_id)
    return tabla_id, tabla_property

In [7]:
# Funcion para obtener la configuracion del trabajo
def get_job_configuration(tabla_property):
    job_configuration = bigquery.LoadJobConfig(
        schema=tabla_property.schema,
        write_disposition='WRITE_TRUNCATE'    
    )
    return job_configuration

In [8]:
# Funcion para cargar los dataframes a BigQuery
def load_table(client, df, tabla_id, job_configuration):
    client.load_table_from_dataframe(
        df,tabla_id,job_config=job_configuration
    )

In [11]:
# Funcion principal, donde se ejecutarán las otras funciones
def main():
    # Creaciń del cliente
    client = bigquery.Client()
    
    # Conexión a la base de datos de origen
    mydb = mysql.connector.connect(
        host=config.HOST,
        user=config.USER,
        passwd=config.PASS,
        database=config.DB
    )
    tables = get_tables(mydb, config.DB)
    
    # String de formato con el proyecto ID y conjunto de datos de destino
    id_format = f'{config.PROYECTID}.{config.DATOS}.{config.ORIGEN}_'
    
    errores = []
    # Loop para tablas que se desean cargar
    for table in tables: 
        try:
            df = get_df(mydb, config.DB, table)

            tabla_id, table_property = get_table_property(client, table, id_format, config.DLZONA)

            job_configuration = get_job_configuration(table_property)

            load_table(client, df, tabla_id, job_configuration)  
            print(f'Tabla {table} cargada exitosamente')
        except Exception as e:
            print(f'Error en Tabla {table}: {e}')
            errores.append((table, e))
            
    mydb.close()  
    if not errores:
        print("Carga de Datos Exitosa.")
    else:
        print('Errores en las siguientes tablas')
        for table, exception in errores:
            print(f'{table}: {exception}')

In [12]:
main()

  df = pd.read_sql(sql, db)


Tabla customers cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla employees cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla offices cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla orderdetails cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla orders cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla payments cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla productlines cargada exitosamente


  df = pd.read_sql(sql, db)


Tabla products cargada exitosamente
Carga de Datos Exitosa.
