In [112]:
import pandas as pd
import numpy as np
from faker import Faker
import random
import datetime
import boto3
import psycopg2
import configparser
from sqlalchemy import create_engine
import pyodbc

In [113]:
config = configparser.ConfigParser()
config.read('escproy.cfg')

['escproy.cfg']

In [119]:
s3 = boto3.resource(
    service_name = 's3',
    region_name = 'us-east-1',
    aws_access_key_id = config.get('IAM', 'ACCESS_KEY'),
    aws_secret_access_key = config.get('IAM', 'SECRET_ACCESS_KEY')
)

In [123]:
for bucket in s3.buckets.all():
    S3_BUCKET_NAME = bucket.name
    print(bucket.name)

cdp-22007687
proyec2023


In [124]:
S3_BUCKET_NAME = 'proyec2023'

In [125]:
remoteFileList = []
for objt in s3.Bucket(S3_BUCKET_NAME).objects.all():
    remoteFileList.append(objt.key)

remoteFileList

['personas.csv', 'ventas.xlsx']

In [126]:
import io  

for remoteFile in remoteFileList:     
  try:         
     file = s3.Bucket(S3_BUCKET_NAME).Object(remoteFile).get()         
     if('.csv' in remoteFile):             
       print(remoteFile)            
       personas= pd.read_csv(file['Body'],sep=";")             
       personas.to_csv(remoteFile, index=False) 
     else:             
       data = file['Body'].read()             
       ventas= pd.read_excel(io.BytesIO(data), engine='openpyxl')             
       ventas.to_excel(remoteFile, index=False)
  except Exception as ex:         
     print("No es un archivo.")         
     print(ex)

personas.csv


Limpieza de datos

In [127]:
personas.describe()

Unnamed: 0,id gerente
count,4.0
mean,1002.5
std,1.290994
min,1001.0
25%,1001.75
50%,1002.5
75%,1003.25
max,1004.0


In [128]:
personas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 3 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   id gerente        4 non-null      int64 
 1   Región            4 non-null      object
 2   Gerente regional  4 non-null      object
dtypes: int64(1), object(2)
memory usage: 224.0+ bytes


In [129]:
personas.columns

Index(['id gerente', 'Región', 'Gerente regional'], dtype='object')

In [130]:
ventas.describe()

Unnamed: 0,Id. de la fila,Ventas,Cantidad,Descuento,Ganancia
count,10254.0,10254.0,10254.0,10254.0,10254.0
mean,5146.149308,2103.379037,3.740492,0.135709,214.737238
std,2970.876591,3491.494504,2.199333,0.195662,1189.617598
min,1.0,15.66,1.0,0.0,-18062.4
25%,2574.25,328.8,2.0,0.0,-7.74
50%,5146.5,803.96,3.0,0.0,80.0
75%,7718.75,2277.9,5.0,0.4,325.8
max,10288.0,34740.8,14.0,0.8,13132.8


In [131]:
ventas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10254 entries, 0 to 10253
Data columns (total 20 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   Id. de la fila       10254 non-null  int64         
 1   Id. del pedido       10254 non-null  object        
 2   Fecha del pedido     10254 non-null  datetime64[ns]
 3   Fecha de envío       10254 non-null  datetime64[ns]
 4   Forma de envío       10254 non-null  object        
 5   Id. del cliente      10254 non-null  object        
 6   Nombre del cliente   10254 non-null  object        
 7   Segmento             10254 non-null  object        
 8   Ciudad               10254 non-null  object        
 9   Estado               10254 non-null  object        
 10  País/Región          10254 non-null  object        
 11  Región               10254 non-null  object        
 12  Id. del producto     10254 non-null  object        
 13  Categoría            10254 non-

In [132]:
### Eliminar Nan
ventas.dropna(inplace=True)

In [133]:
### Cantidad de clientes

clientes=ventas[['Id. del cliente', 'Nombre del cliente']]
len(clientes['Id. del cliente'].unique())

794

In [134]:
## Total ventas por cliente
vtasc=ventas.groupby(['Nombre del cliente'])['Ventas'].sum()

In [135]:
## Ranking de clientes 

Rankclientes=vtasc.rank(numeric_only=True)
Rankclientes

Nombre del cliente
Aarón Navarrete     747.0
Abel Ángel          383.0
Abraham Cedillo     164.0
Abril Ferrer        636.0
Adela Blanco        206.0
                    ...  
Érica Casas         794.0
Íñigo Leal           66.0
Óscar Alba          235.0
Óscar Armendáriz    213.0
Úrsula Soto         276.0
Name: Ventas, Length: 794, dtype: float64

In [136]:
aws_conn = boto3.client('rds', aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
                    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
                    region_name='us-east-1')

In [137]:
rdsIdentifier = 'proyecto' #nombre de la instancia

In [138]:
rdsInstanceIds = []

response = aws_conn.describe_db_instances()
for resp in response['DBInstances']:
    rdsInstanceIds.append(resp['DBInstanceIdentifier'])
    db_instance_status = resp['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")

DBInstanceIds ['banco-db', 'dw-db', 'proyecto']


In [139]:
try:
    response = aws_conn.create_db_instance(
            AllocatedStorage=10,
            DBName=config.get('RDS', 'DB_NAME'),
            DBInstanceIdentifier=rdsIdentifier,
            DBInstanceClass="db.t3.micro",
            Engine="postgres",
            MasterUsername=config.get('RDS', 'DB_USER'),
            MasterUserPassword=config.get('RDS', 'DB_PASSWORD'),
            Port=int(config.get('RDS', 'DB_PORT')),
            VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
            PubliclyAccessible=True
        )
    print(response)
except aws_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La Instancia de Base de Datos ya Existe.")

La Instancia de Base de Datos ya Existe.


In [140]:
try:
     instances = aws_conn.describe_db_instances(DBInstanceIdentifier=rdsIdentifier)
     RDS_HOST = instances.get('DBInstances')[0].get('Endpoint').get('Address')
     print(RDS_HOST)
except Exception as ex:
     print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
     print(ex)

proyecto.cjsvaexfxmdn.us-east-1.rds.amazonaws.com


In [141]:
import sql_qproy1

try:
    db_conn = psycopg2.connect(
        database=config.get('RDS', 'DB_NAME'), 
        user=config.get('RDS', 'DB_USER'),
        password=config.get('RDS', 'DB_PASSWORD'), 
        host=RDS_HOST,
        port=config.get('RDS', 'DB_PORT')
    )

    cursor = db_conn.cursor()
    cursor.execute(sql_qproy1.DDL_QUERY)
    db_conn.commit()
    print("Base de Datos Creada Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex) 

Base de Datos Creada Exitosamente


In [142]:
postgres_driver = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""  

In [143]:
def insertDataToSQL(data_dict, table_name):
     postgres_driver = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""    
     df_data = pd.DataFrame.from_records(data_dict)
     try:
          response = df_data.to_sql(table_name, postgres_driver, index=False, if_exists='append')
          print(f'Se han insertado {response} nuevos registros.' )
     except Exception as ex:
          print(ex)

In [145]:
conn_string = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""    

personas.to_sql('personas', conn_string, if_exists='replace')

conn_string = psycopg2.connect(conn_string)

In [146]:
conn_string = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""    

ventas.to_sql('ventas', conn_string, if_exists='replace')

conn_string = psycopg2.connect(conn_string)