# Desarrollo de la Ejercitación

In [83]:
import pandas as pd
import mysql.connector as connection
from google.cloud import bigquery
from dotenv import load_dotenv
import os
import json

In [84]:
load_dotenv()

True

1. **Función de conexión a la base de datos SQL:** Esta función aceptará como entrada los parámetros de la base de datos (nombre de la base de datos, host, nombre de usuario, contraseña) para establecer una conexión segura utilizando una biblioteca de conexión a SQL. Si la conexión es exitosa devuelve un objeto de conexión; en caso de error, proporciona mensajes de error detallados. También puede gestionar la persistencia de la conexión según sea necesario.

In [85]:
mydb = None  # Variable global para almacenar el objeto de conexión

def connectionDB(user=None, host=None, password=None, database=None):
    """
    Esta función nos permite conectarnos a una base de datos MySQL y desconectarnos cuando sea necesario.

    Argumentos:
        user (str): Usuario de la base de datos.
        host (str): Dirección IP o nombre del host de la base de datos.
        password (str): Clave de acceso para el usuario especificado.
        database (str): Nombre de la base de datos a la que deseamos conectarnos.

    Retorna:
        str: Si se proporcionan los argumentos de conexión, la función intentará establecer una conexión a la base de datos.
             - Si la conexión es exitosa, devuelve un mensaje de confirmación.
             - Si hay un error al conectar, devuelve un mensaje de error con información detallada.
        Si no se proporcionan argumentos de conexión, la función verifica si hay una conexión activa para cerrarla.
             - Si la conexión está activa, se cierra y devuelve un mensaje de confirmación.
             - Si no hay una conexión activa, devuelve un mensaje informando que no se realizó ninguna conexión.

    Nota:
        Asegurarse de haber instalado la librería 'mysql-connector-python'. Se puede instalar usando el comando 'pip install mysql-connector-python'.
    """
    global mydb  # Acceder a la variable global mydb dentro de la función

    if user is not None and host is not None and password is not None and database is not None:
        try:
            mydb = connection.connect(
                host=host,
                user=user,
                passwd=password,
                database=database
            )
            return f"Conexión exitosa a la base de datos: {database}"

        except connection.Error as e:
            return f"Error al conectar a la base de datos: {database}, {e}"

    elif mydb is not None and mydb.is_connected():
        mydb.close()
        mydb = None  # Reiniciar el objeto de conexión
        return "Conexión cerrada"

    elif mydb is None:
        return "No se proporcionaron los argumentos de conexión. No se realizó la conexión"


In [86]:
connectionDB(host= os.getenv('DB_HOST'), 
        user= os.getenv('DB_USER'), 
        password= os.getenv('DB_PASS'), 
        database=os.getenv('DB_DB'))

'Conexión exitosa a la base de datos: retail_db'

In [87]:
connectionDB()

'Conexión cerrada'

In [88]:
connectionDB()

'No se proporcionaron los argumentos de conexión. No se realizó la conexión'

Probamos el mensaje cuando le ingresamos un parámetro erroneo:

In [89]:
connectionDB(host= "1.1.1.1", 
        user= os.getenv('DB_USER'), 
        password= os.getenv('DB_PASS'), 
        database=os.getenv('DB_DB'))

"Error al conectar a la base de datos: retail_db, 2003 (HY000): Can't connect to MySQL server on '1.1.1.1:3306' (10060)"

In [90]:
def listTables(user=None, host=None, password=None, database=None):
    """
    Obtiene la lista de tablas en una base de datos MySQL y la devuelve en formato JSON.

    Parámetros:
    user (str): Usuario de la base de datos MySQL. (Opcional si ya se han configurado las variables de entorno)
    host (str): Dirección del host de la base de datos MySQL. (Opcional si ya se han configurado las variables de entorno)
    password (str): Contraseña del usuario de la base de datos MySQL. (Opcional si ya se han configurado las variables de entorno)
    database (str): Nombre de la base de datos MySQL. (Opcional si ya se han configurado las variables de entorno)

    Retorna:
    str: Una lista en formato JSON que contiene los nombres de todas las tablas de la base de datos.

    Requiere:
    - La librería 'mysql-connector-python' debe estar instalada. Puedes instalarla usando el comando 'pip install mysql-connector-python'.
    - Se deben configurar las variables de entorno 'DB_HOST', 'DB_USER', 'DB_PASS' y 'DB_DB' con los valores de conexión a la base de datos MySQL 
    en caso que no las indiquemos en los parámetros.

    Notas:
    - Asegúrarse de haber configurado correctamente las variables de entorno 'DB_HOST', 'DB_USER', 'DB_PASS' y 'DB_DB' con los valores adecuados.
    - La función se conectará a la base de datos, ejecutará una consulta para obtener la lista de tablas y luego cerrará la conexión.
    - Si se proporcionan los parámetros user, host, password y database, se utilizarán para establecer la conexión. Si no se proporcionan,
      se utilizarán las variables de entorno configuradas previamente.
    """
    if user is not None and host is not None and password is not None and database is not None:
        try:
            mydb = connection.connect(
                host=host,
                user=user,
                passwd=password,
                database=database
            )

            cursor = mydb.cursor()
            sql = "SHOW TABLES"
            cursor.execute(sql)

            table_list = [table[0] for table in cursor.fetchall()]

            cursor.close()
            mydb.close()

            return json.dumps(table_list)    
        
        except connection.Error as e:
            return "No se pudo establecer conexión."
    
    else:
        try:
            mydb = connection.connect(
                host=os.getenv('DB_HOST'),
                user=os.getenv('DB_USER'),
                passwd=os.getenv('DB_PASS'),
                database=os.getenv('DB_DB')
            )

            cursor = mydb.cursor()
            sql = "SHOW TABLES"
            cursor.execute(sql)

            table_list = [table[0] for table in cursor.fetchall()]

            cursor.close()
            mydb.close()

            return json.dumps(table_list)    
        
        except connection.Error as e:
            return "No se pudo establecer conexión."

    

In [91]:
listTables(host= os.getenv('DB_HOST'), 
        user= os.getenv('DB_USER'), 
        password= os.getenv('DB_PASS'), 
        database=os.getenv('DB_DB'))

'["categories", "customers", "departments", "order_items", "orders", "products"]'

3. **Función para copiar tablas desde SQL a BigQuery:** Esta función permitirá la copia de todas las tablas desde la base de datos SQL a BigQuery. Sin embargo, también ofrecerá la flexibilidad de seleccionar y copiar una tabla específica si el usuario así lo desea. 

In [92]:
def copyBigQuery(*tables):
    """
    Copia tablas desde una base de datos MySQL a BigQuery utilizando la API de streaming.

    Parámetros:
    *tables: Argumentos opcionales. Si no se proporcionan argumentos, la función copiará todas las tablas disponibles.
             Si se proporcionan nombres de tablas como argumentos, solo copiará esas tablas específicas.

    Retorna:
    Si no se proporcionan argumentos (*tables), la función copiará todas las tablas disponibles y devolverá un mensaje
    indicando que la carga se completó para todas las tablas.
    Si se proporcionan nombres de tablas como argumentos, la función copiará solo esas tablas y devolverá un mensaje
    indicando las tablas que se cargaron correctamente y las que no se pudieron cargar.

    Requiere:
    - Que el usuario haya configurado correctamente las variables de entorno para la conexión a la base de datos MySQL.
      (DB_HOST, DB_USER, DB_PASS, DB_DB)
    - Un archivo de credenciales de servicio de Google Cloud (service.json) que tenga acceso a BigQuery.

    Notas:
    - La función carga los datos utilizando la API de streaming de BigQuery, lo que puede ser más eficiente y escalable
      para grandes conjuntos de datos que cargar un DataFrame.
    - La función asume que las tablas en MySQL tienen la misma estructura que las tablas de destino en BigQuery.

    Ejemplos de uso:
    # Copiar todas las tablas disponibles desde MySQL a BigQuery
    copyBigQuery()

    # Copiar tablas específicas desde MySQL a BigQuery
    copyBigQuery("table1", "table2", "table3")
    """
    tablesList = json.loads(listTables())
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "service.json"
    client = bigquery.Client.from_service_account_json("service.json")

    mydb = connection.connect(
    host= os.getenv('DB_HOST'), 
    user= os.getenv('DB_USER'),
    passwd= os.getenv('DB_PASS'), 
    database=os.getenv('DB_DB') 
    )
    
    database = os.getenv('DB_DB')

    loaded = []
    not_loaded = []

    if len(tables) == 0:
        for table in tablesList:
            sql = f"SELECT * FROM retail_db.{table}"
            df = pd.read_sql(sql, mydb)
                                    
            tabla_id = f"acquired-script-391200.ar_dp_dtkl_raw.{table}"
            tabla_property = client.get_table(tabla_id)

            job_configuration = bigquery.LoadJobConfig(
            schema = tabla_property.schema,
            write_disposition = "WRITE_TRUNCATE"
            )

            job = client.load_table_from_dataframe(
            df, tabla_id, job_config=job_configuration
            )

            job.result() #esperar a que se complete la carga
        
        return f"Carga completa de todas las tablas en: {database}"
    
    else:
        for tb in tables:
            if tb in tablesList:
                sql = f"SELECT * FROM retail_db.{tb}"
                df = pd.read_sql(sql, mydb)
                                    
                tabla_id = f"acquired-script-391200.ar_dp_dtkl_raw.{tb}"
                tabla_property = client.get_table(tabla_id)

                job_configuration = bigquery.LoadJobConfig(
                schema = tabla_property.schema,
                write_disposition = "WRITE_TRUNCATE"
                )

                job = client.load_table_from_dataframe(
                df, tabla_id, job_config=job_configuration
                )

                job.result()

                loaded.append(tb)
            
            else:
                not_loaded.append(tb)
    return f"Se cargaron las siguientes tablas: {loaded}. Hubo inconvenientes al cargar: {not_loaded}"

    mydb.close()

In [93]:
copyBigQuery("customers", "categories", "clientes_nuevos")

  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)


"Se cargaron las siguientes tablas: ['customers', 'categories']. Hubo inconvenientes al cargar: ['clientes_nuevos']"

In [94]:
copyBigQuery()

  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)
  df = pd.read_sql(sql, mydb)


'Carga completa de todas las tablas en: retail_db'