In [1]:
import pandas as pd
import numpy as np
import datetime
import boto3
import psycopg2
import configparser

### Inicializacion de variables

In [2]:
rdsIdentifier = 'comercio-db' #nombre de la instancia

#### Carga de archivos de configuraciones

In [3]:
config = configparser.ConfigParser()
config.read('escec.cfg')

['escec.cfg']

#### Crea instancia RDS

In [4]:
aws_conn = boto3.client('rds', aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
                    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
                    region_name='us-east-1')

#### Verificamos Instancias de RDS disponibles

In [5]:
rdsInstanceIds = []

response = aws_conn.describe_db_instances()
for resp in response['DBInstances']:
    rdsInstanceIds.append(resp['DBInstanceIdentifier'])
    db_instance_status = resp['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")

DBInstanceIds []


#### Creación de Servicio RDS

In [6]:
try:
    response = aws_conn.create_db_instance(
            AllocatedStorage=10,
            DBName=config.get('RDS', 'DB_NAME'),
            DBInstanceIdentifier=rdsIdentifier,
            DBInstanceClass="db.t3.micro",
            Engine="postgres",
            MasterUsername=config.get('RDS', 'DB_USER'),
            MasterUserPassword=config.get('RDS', 'DB_PASSWORD'),
            Port=int(config.get('RDS', 'DB_PORT')),
            VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
            PubliclyAccessible=True
        )
    print(response)
except aws_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La Instancia de Base de Datos ya Existe.")

{'DBInstance': {'DBInstanceIdentifier': 'comercio-db', 'DBInstanceClass': 'db.t3.micro', 'Engine': 'postgres', 'DBInstanceStatus': 'creating', 'MasterUsername': 'postgres', 'DBName': 'comercio', 'AllocatedStorage': 10, 'PreferredBackupWindow': '09:28-09:58', 'BackupRetentionPeriod': 1, 'DBSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0a991954ba54ddb2e', 'Status': 'active'}], 'DBParameterGroups': [{'DBParameterGroupName': 'default.postgres14', 'ParameterApplyStatus': 'in-sync'}], 'DBSubnetGroup': {'DBSubnetGroupName': 'default', 'DBSubnetGroupDescription': 'default', 'VpcId': 'vpc-018eccc2fe2d01a58', 'SubnetGroupStatus': 'Complete', 'Subnets': [{'SubnetIdentifier': 'subnet-0dba6b93abeb35181', 'SubnetAvailabilityZone': {'Name': 'us-east-1f'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'SubnetIdentifier': 'subnet-0bf89f2925eb7560f', 'SubnetAvailabilityZone': {'Name': 'us-east-1b'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'SubnetIdentifier': 'subnet-0

##### Obtencion URL del Host

In [7]:
try:
     instances = aws_conn.describe_db_instances(DBInstanceIdentifier=rdsIdentifier)
     RDS_HOST = instances.get('DBInstances')[0].get('Endpoint').get('Address')
     print(RDS_HOST)
except Exception as ex:
     print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
     print(ex)

comercio-db.cxfxom467xtx.us-east-1.rds.amazonaws.com


##### Conexión a Base de Datos desde Python

In [8]:
import sql_queries

try:
    db_conn = psycopg2.connect(
        database=config.get('RDS', 'DB_NAME'), 
        user=config.get('RDS', 'DB_USER'),
        password=config.get('RDS', 'DB_PASSWORD'), 
        host=RDS_HOST,
        port=config.get('RDS', 'DB_PORT')
    )

    cursor = db_conn.cursor()
    cursor.execute(sql_queries.DDL_QUERY)
    db_conn.commit()
    print("Base de Datos Creada Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex)

Base de Datos Creada Exitosamente


#### Insertar Data en tablas 

In [9]:
def insertDataToSQL(df_data, table_name):
     postgres_driver = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""    
     try:
          response = df_data.to_sql(table_name, postgres_driver, index=False, if_exists='append')
          print(f'Se han insertado {response} nuevos registros.' )
     except Exception as ex:
          print(ex)

In [21]:
#Alimenta base de datos RDS
customer=pd.read_csv('cleanData/customer.csv')
order=pd.read_csv('cleanData/order.csv')
product=pd.read_csv('cleanData/product.csv')
review=pd.read_csv('cleanData/review.csv')
seller=pd.read_csv('cleanData/seller.csv')
zipCode=pd.read_csv('cleanData/zipCode.csv')
encabezado=pd.read_csv('cleanData/encabezado.csv')
detalle=pd.read_csv('cleanData/detalle.csv')
#Archivos directos S3
paymentType=pd.read_csv('cleanData/paymentType.csv')
productCat=pd.read_csv('cleanData/productCat.csv')

In [None]:
insertDataToSQL(encabezado, 'encabezado')

In [None]:
insertDataToSQL(detalle, 'detalle')

In [16]:
insertDataToSQL(seller, 'seller')

Se han insertado 95 nuevos registros.


In [11]:
insertDataToSQL(review, 'review')

Se han insertado 410 nuevos registros.


In [14]:
insertDataToSQL(customer, 'customer')

Se han insertado 441 nuevos registros.


In [13]:
insertDataToSQL(order, 'orderhead')

Se han insertado 441 nuevos registros.


In [12]:
insertDataToSQL(product, 'product')

Se han insertado 951 nuevos registros.
