In [None]:
import glob
import pandas as pd
from sqlalchemy import create_engine
from typing import Optional, Tuple
import re

def connect_db(user: str, password: str, host: str, port: str, db_name: str) -> Tuple:
    """Open a SQLAlchemy engine and a live connection to a MySQL database.

    The user name and password are percent-encoded before being placed in the
    connection URL, so credentials containing reserved characters such as
    ``@``, ``:`` or ``/`` do not corrupt the URL.

    Args:
        user (str): database user name
        password (str): password of that user
        host (str): IP address or hostname of the database server
        port (str): port of the database server
        db_name (str): name of the database to connect to

    Returns:
        Tuple: ``(sql_engine, db_connection)``. ``sql_engine`` is the object
            returned by ``create_engine`` and ``db_connection`` the object
            returned by its ``connect()``. If anything fails, the error is
            printed (not raised) and ``(None, None)`` is returned.
    """
    from urllib.parse import quote

    sql_engine = None
    db_connection = None
    try:
        # safe="" also encodes "/", which would otherwise end the credentials part.
        safe_user = quote(user, safe="")
        safe_password = quote(password, safe="")
        db_url = f"mysql+pymysql://{safe_user}:{safe_password}@{host}:{port}/{db_name}"
        sql_engine = create_engine(db_url)
        db_connection = sql_engine.connect()
        print(f"Connected to database {db_name} as user {user}")
    except Exception as e:
        # Release the engine's pool if it was created before the failure.
        if sql_engine is not None:
            sql_engine.dispose()
        sql_engine = None
        db_connection = None
        print(f"Failed to connect to database {db_name} as user {user}: {e}")
    return sql_engine, db_connection

def get_csv_files(path: str, pattern: str) -> list:
    """List the files under a directory that match a glob pattern.

    Args:
        path (str): directory to search in
        pattern (str): glob pattern appended to ``path`` after a ``/``

    Returns:
        list: matching paths, sorted in ascending order
    """
    search_expression = f"{path}/{pattern}"
    matches = glob.glob(search_expression)
    matches.sort()
    return matches



def get_name_files(path_csv_files):
    """Derive one database table name per CSV path from the file name.

    The table name is the file name without its directory and without its
    last extension (``../files/jobs.csv`` -> ``jobs``). Both ``/`` and ``\\``
    are treated as directory separators, so the result is the same whether
    the paths were produced on Windows or on a POSIX system.

    Args:
        path_csv_files (list): paths of the CSV files

    Returns:
        list: table names, one per input path and in the same order, so the
            result can be indexed in parallel with ``path_csv_files``
    """
    from pathlib import PureWindowsPath

    # PureWindowsPath does no file-system access and splits on both separators.
    # A literal backslash inside a POSIX file name is treated as a separator.
    return [PureWindowsPath(file).stem for file in path_csv_files]



def insert_csv_to_db(microbatch: int, path_csv_files: list, sql_engine: object, table_name: list) -> None:
    """Append the rows of each header-less CSV file to its database table.

    ``path_csv_files`` and ``table_name`` are parallel lists: file ``i`` is
    loaded into table ``i``. Each file is read in chunks of ``microbatch``
    rows and every chunk is appended with ``DataFrame.to_sql``.

    Column names are chosen by table name (``departments``,
    ``hired_employees``, ``jobs``), so the result does not depend on the order
    of the files. A table name outside that set falls back to the column list
    at the same position in that order.

    Args:
        microbatch (int): number of rows read and inserted per chunk
        path_csv_files (list): path of each CSV file
        sql_engine: connectable passed to ``DataFrame.to_sql`` as ``con``
        table_name (list): destination table name for each CSV file

    Returns:
        None

    Raises:
        ValueError: if there are fewer table names than CSV files.
        Exception: any error raised by ``to_sql`` is printed and re-raised.
    """
    columns_by_table = {
        "departments": ["id", "department"],
        "hired_employees": ["id", "name", "datetime", "department_id", "job_id"],
        "jobs": ["id", "job"],
    }
    # Positional fallback for unknown table names (dicts keep insertion order).
    positional_columns = list(columns_by_table.values())

    if len(table_name) < len(path_csv_files):
        raise ValueError(
            f"Got {len(path_csv_files)} CSV files but only {len(table_name)} table names"
        )

    for position, (csv_path, table) in enumerate(zip(path_csv_files, table_name)):
        columns = columns_by_table.get(table)
        if columns is None:
            columns = positional_columns[position]

        for chunk in pd.read_csv(csv_path, chunksize=microbatch, header=None, names=columns):
            try:
                chunk.to_sql(name=table, con=sql_engine, if_exists="append", index=False)
            except Exception as e:
                print(f"Failed to insert rows from {csv_path} into table {table}: {e}")
                # Bare raise keeps the original traceback.
                raise
             


if __name__ == "__main__":
    import os

    from dotenv import load_dotenv

    # Credentials are read from the environment (optionally via a .env file)
    # so the password is never stored in the notebook. Non-secret settings
    # fall back to the values this script used previously.
    load_dotenv()
    user = os.getenv("user", "admin")
    password = os.getenv("password")
    host = os.getenv("host", "mydb.cjt7teobtbru.us-east-1.rds.amazonaws.com")
    port = os.getenv("port", "3306")
    db = os.getenv("db", "Globant")
    if not password:
        raise RuntimeError("Set the 'password' environment variable (or .env entry) first")

    # Arguments for get_csv_files(); it inserts the "/" between path and
    # pattern itself, so the pattern carries no leading slash.
    pattern = "*.csv"
    path = "../files"

    # Number of rows inserted per iteration
    microbatch = 20

    sql_engine, db_connection = connect_db(user, password, host, port, db)
    if sql_engine is None:
        # connect_db only prints on failure; stop before inserting with con=None.
        raise RuntimeError(f"Could not connect to database {db}")

    try:
        path_csv_files = get_csv_files(path, pattern)
        table_name = get_name_files(path_csv_files)
        insert_csv_to_db(microbatch, path_csv_files, sql_engine, table_name)
    finally:
        # Release the connection and the engine's pool even if the load fails.
        db_connection.close()
        sql_engine.dispose()




In [1]:
# avro_utils.py
import boto3
import pymysql
import botocore

from typing import Optional, Union, Dict
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
    # Fail with a clear message instead of an AttributeError on the next line.
    raise RuntimeError("No AWS credentials were found for the default boto3 session")
aws_access_key_id = credentials.access_key
aws_secret_access_key = credentials.secret_key
aws_region_name = 'us-east-1'


# Read the settings from the environment
user = os.getenv("user")
password = os.getenv("password")
host = os.getenv("host")
port = os.getenv("port")
db = os.getenv("db")
# Comma-separated table names; an unset variable yields an empty list, and
# blanks around each name are dropped.
list_name_table = [
    name.strip()
    for name in os.getenv("list_name_table", "").split(",")
    if name.strip()
]
s3_bucket_name = os.getenv("s3_bucket_name")
s3_prefix = os.getenv("s3_prefix")
s3_prefix_backup = os.getenv("s3_prefix_backup")

In [2]:
# Display the table names read from the `list_name_table` environment variable.
list_name_table

'jobs,departments,hired_employees'

In [None]:

import pandas as pd
import io
import boto3
import pandas as pd
from sqlalchemy import create_engine
from typing import Optional, Tuple

from typing import Optional, Tuple



def connect_db(user: str, password: str, host: str, port: str, db_name: str) -> Tuple:
    """Open a SQLAlchemy engine and a live connection to a MySQL database.

    The user name and password are percent-encoded before being placed in the
    connection URL, so credentials containing reserved characters such as
    ``@``, ``:`` or ``/`` do not corrupt the URL.

    Args:
        user (str): database user name
        password (str): password of that user
        host (str): IP address or hostname of the database server
        port (str): port of the database server
        db_name (str): name of the database to connect to

    Returns:
        Tuple: ``(sql_engine, db_connection)``. ``sql_engine`` is the object
            returned by ``create_engine`` and ``db_connection`` the object
            returned by its ``connect()``. If anything fails, the error is
            printed (not raised) and ``(None, None)`` is returned.
    """
    from urllib.parse import quote

    sql_engine = None
    db_connection = None
    try:
        # safe="" also encodes "/", which would otherwise end the credentials part.
        safe_user = quote(user, safe="")
        safe_password = quote(password, safe="")
        db_url = f"mysql+pymysql://{safe_user}:{safe_password}@{host}:{port}/{db_name}"
        sql_engine = create_engine(db_url)
        db_connection = sql_engine.connect()
        print(f"Connected to database {db_name} as user {user}")
    except Exception as e:
        # Release the engine's pool if it was created before the failure.
        if sql_engine is not None:
            sql_engine.dispose()
        sql_engine = None
        db_connection = None
        print(f"Failed to connect to database {db_name} as user {user}: {e}")
    return sql_engine, db_connection

def insert_db(df, sql_engine, name_table="department_prueba", name_colum=None):
    """Append a DataFrame to a database table after renaming its columns.

    The caller's DataFrame is left untouched: the columns are renamed on a
    copy, and that copy is written with ``DataFrame.to_sql``.

    Args:
        df (pandas.DataFrame): rows to insert
        sql_engine: connectable passed to ``DataFrame.to_sql`` as ``con``
        name_table (str): destination table; defaults to ``department_prueba``
        name_colum (list | None): column names to apply, in order; defaults to
            ``["id", "department"]``

    Returns:
        None

    Raises:
        Exception: any error while renaming or inserting is printed and
            re-raised.
    """
    if name_colum is None:
        name_colum = ["id", "department"]
    try:
        renamed = df.copy()
        renamed.columns = list(name_colum)
        renamed.to_sql(name=name_table, con=sql_engine, if_exists="append", index=False)
        print("Inserting")
    except Exception as e:
        # Report the row count, not the whole frame, to keep the message short.
        print(f"Failed to insert {len(df)} rows into table {name_table}: {e}")
        raise


def csv_to_df(csv):
    """Parse CSV text held in a string into a pandas DataFrame.

    Args:
        csv (str): CSV content; the first line is used as the header

    Returns:
        pandas.DataFrame: the parsed table
    """
    text_buffer = io.StringIO(csv)
    frame = pd.read_csv(text_buffer)
    return frame
   




import os

# The password is read from the environment so it is never stored in the
# notebook. Non-secret settings fall back to the values used previously.
user = os.getenv("user", "admin")
password = os.getenv("password")
host = os.getenv("host", "mydb.cjt7teobtbru.us-east-1.rds.amazonaws.com")
port = os.getenv("port", "3306")
db = os.getenv("db", "Globant")
if not password:
    raise RuntimeError("Set the 'password' environment variable (or .env entry) first")

csv = '''id,department
1,Marketing
2,Finance
3,Human Resources
4,IT
5,Operations'''

sql_engine, db_connection = connect_db(user, password, host, port, db)
if sql_engine is None:
    # connect_db only prints on failure; stop before inserting with con=None.
    raise RuntimeError(f"Could not connect to database {db}")

try:
    df = csv_to_df(csv)
    insert_db(df, sql_engine)
finally:
    # Release the connection and the engine's pool even if the insert fails.
    db_connection.close()
    sql_engine.dispose()


In [None]:
import json

import pymysql
import pandas as pd
import boto3
from typing import Optional, Tuple

import io


def connect_db(user, password, host, database, port=3306):
    """Open a PyMySQL connection to a MySQL database.

    Note: this redefines the ``connect_db`` declared earlier in the notebook
    with a different signature and return value.

    Args:
        user (str): database user name
        password (str): password of that user
        host (str): IP address or hostname of the database server
        database (str): name of the database to connect to
        port (int | str): server port; converted with ``int`` and defaulting
            to 3306

    Returns:
        The connection object returned by ``pymysql.connect``.

    Raises:
        Exception: any connection error is printed and re-raised.
    """
    try:
        cnx = pymysql.connect(
            user=user,
            password=password,
            host=host,
            database=database,
            port=int(port),
        )
        print("Connected to the database")
        return cnx
    except Exception as e:
        print(f"Failed to connect to the database: {e}")
        # Bare raise keeps the original traceback.
        raise



def insert_db(cnx, df, table_name):
    """Insert every row of a DataFrame into a MySQL table and close the connection.

    The table is created first if it does not exist, always with the fixed
    schema ``(id INT PRIMARY KEY, department TEXT)``. Rows are sent with a
    parameterized ``executemany``, so quotes or other special characters in
    the data cannot break or alter the statement. Missing values (NaN/None)
    are inserted as SQL NULL. An empty DataFrame inserts nothing.

    Args:
        cnx: open DB-API connection (as returned by ``pymysql.connect``); it is
            always closed before this function returns
        df (pandas.DataFrame): rows to insert; its column names are used as the
            column list, and each cell is expected to be a scalar
        table_name (str): destination table, optionally ``schema.table``

    Returns:
        None

    Raises:
        Exception: any database error is printed and re-raised.
    """
    def _quote_identifier(name):
        # MySQL identifiers are wrapped in backticks; embedded backticks are doubled.
        return "`" + str(name).replace("`", "``") + "`"

    # Bound before the try so the finally block never hits an unbound name.
    cursor = None
    try:
        cursor = cnx.cursor()

        # Quote each dot-separated part so "schema.table" keeps working.
        table = ".".join(_quote_identifier(part) for part in str(table_name).split("."))

        # Create the table if it does not exist yet
        cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY, department TEXT)"
        )

        rows = [
            tuple(None if pd.isna(value) else value for value in row)
            for row in df.values.tolist()
        ]
        if rows:
            cols = ", ".join(_quote_identifier(col) for col in df.columns.tolist())
            placeholders = ", ".join(["%s"] * len(df.columns))
            query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"
            # The driver escapes each value; nothing from df is spliced into the SQL.
            cursor.executemany(query, rows)

        # Commit to make the changes permanent
        cnx.commit()

        print("Inserting")

    except Exception as e:
        print(f"Failed to insert {len(df)} rows into table {table_name}: {e}")
        raise

    finally:
        # Close the cursor (if it was opened) and the connection
        if cursor is not None:
            cursor.close()
        cnx.close()


        
    
def csv_to_df(csv):
    """Parse CSV text held in a string into a pandas DataFrame.

    Args:
        csv (str): CSV content; the first line is used as the header

    Returns:
        pandas.DataFrame: the parsed table
    """
    text_buffer = io.StringIO(csv)
    frame = pd.read_csv(text_buffer)
    return frame



import os

# The password is read from the environment so it is never stored in the
# notebook. Non-secret settings fall back to the values used previously.
user = os.getenv("user", "admin")
password = os.getenv("password")
host = os.getenv("host", "mydb.cjt7teobtbru.us-east-1.rds.amazonaws.com")
port = os.getenv("port", "3306")
db = os.getenv("db", "Globant")
if not password:
    raise RuntimeError("Set the 'password' environment variable (or .env entry) first")

csv = '''id,department
1,Marketing
2,Finance
3,Human Resources
4,IT
5,Operations'''

table_name = "departments"
cnx = connect_db(user, password, host, db)
df = csv_to_df(csv)
# insert_db closes cnx itself when it finishes.
insert_db(cnx, df, table_name)


In [None]:
import os

# The password is read from the environment so it is never stored in the
# notebook. Non-secret settings fall back to the values used previously.
user = os.getenv("user", "admin")
password = os.getenv("password")
host = os.getenv("host", "mydb.cjt7teobtbru.us-east-1.rds.amazonaws.com")
port = os.getenv("port", "3306")
db = os.getenv("db", "Globant")
if not password:
    raise RuntimeError("Set the 'password' environment variable (or .env entry) first")

csv = '''id,department
1,Marketing
2,Finance
3,Human Resources
4,IT
5,Operations'''

table_name = "depart_test"
cnx = connect_db(user, password, host, db)
df = csv_to_df(csv)
# insert_db closes cnx itself when it finishes.
insert_db(cnx, df, table_name)

In [None]:
import pandas as pd
import json
# JSON text with one list per column. It is kept under its own name so `data`
# only ever holds the parsed dictionary.
data_json = '''{
  "id": [111, 211, 311, 411, 511],
  "name": ["Marketing", "Finance", "Human Resources", "IT", "Operations"],
  "datetime": ["2021-11-07T02:48:42Z", "2021-11-07T02:48:42Z", "2021-11-07T02:48:42Z", "2021-11-07T02:48:42Z", "2021-11-07T02:48:42Z"],
  "department_id": [11, 22, 33, 41, 52],
  "job_id": [1, 2, 3, 4, 5]
}'''

# Parse the JSON into a dict of lists, then build one DataFrame column per key.
data = json.loads(data_json)
gg = pd.DataFrame(data)
gg

In [None]:
# Round-trip a dict of column lists through JSON text and back, then show it.
data = json.loads(
    json.dumps(
        {
            "id": [111, 211, 311, 411, 511],
            "department": ["Marketing", "Finance", "Human Resources", "IT", "Operations"],
            "datetime": [
                "2021-11-07T02:48:42Z",
                "2021-11-07T02:48:42Z",
                "2021-11-07T02:48:42Z",
                "2021-11-07T02:48:42Z",
                "2021-11-07T02:48:42Z",
            ],
            "department_id": [11, 22, 33, 41, 52],
            "job_id": [1, 2, 3, 4, 5],
        }
    )
)
data

In [None]:
# The insert_db defined most recently in this notebook takes
# (cnx, df, table_name) and closes the connection when done, so a fresh
# connection is opened for this call instead of passing the credentials.
cnx = connect_db(user, password, host, "Globant")
insert_db(cnx, df, "Prueba_departmen")

In [None]:
# Display df with its index reset; the result is not assigned back to df.
df.reset_index(drop=True)

In [None]:
# Display the current contents of df.
df