In [5]:
import sqlite3
import pandas as pd
import paramiko
import os
import requests
import json

# IMPORTS

# Configuración del SFTP

In [6]:
import os
from dotenv import load_dotenv
import pandas as pd

# Load environment variables from .env file
load_dotenv()

# Get variables from environment
SFTP_HOST = os.getenv('SFTP_HOST')
SFTP_PORT = int(os.getenv('SFTP_PORT'))
SFTP_USER = os.getenv('SFTP_USER')
SFTP_PRIVATE_KEY = os.getenv('SFTP_PRIVATE_KEY')
RUTA = os.getenv('RUTA')

# Generate file name and paths
time = pd.Timestamp.now()
file = 'reporte_consolidado-' + time.strftime('%Y-%m-%d-%H_%M') + '.csv'

LOCAL_PATH = RUTA + file
print(LOCAL_PATH)

REMOTE_PATH = '/data/sftp/reporte_consolidado-' + file

C:/Users/alarc/Desktop/DataMining/ETL/reporte_consolidado-2024-11-27-16_59.csv


# Proceso ETL

In [7]:
print("== Paso 1: Extracción de datos ==")

# Conectar a SQLite y extraer datos
db_path = 'comercio_ventas.db'
conn = sqlite3.connect(db_path)

== Paso 1: Extracción de datos ==


# Extracción de datos de SQLite

In [8]:
comercio_df = pd.read_sql_query("SELECT * FROM Comercio", conn)
producto_df = pd.read_sql_query("SELECT * FROM Producto", conn)
transaccion_df = pd.read_sql_query("SELECT * FROM Transaccion", conn)
detalle_transaccion_df = pd.read_sql_query("SELECT * FROM Detalle_Transaccion", conn)

# Cerrar la conexión a la base de datos
conn.close()

# Imprimir los encabezados para verificar los datos extraídos

In [10]:
print("Encabezados de Comercio:")
print(comercio_df.head(), "\n")

print("Encabezados de Producto:")
print(producto_df.head(), "\n")

print("Encabezados de Transacción:")
print(transaccion_df.head(), "\n")

print("Encabezados de Detalle Transacción:")
print(detalle_transaccion_df.head(), "\n")

Encabezados de Comercio:
   id_comercio        RUC codigo_comercio nombre_comercio    direccion  \
0            1  205500006            C006      Comercio 6  Dirección Y   
1            2  205500007            C007      Comercio 7  Dirección Y   
2            3  205500008            C008      Comercio 8  Dirección Y   
3            4  205500009            C009      Comercio 9  Dirección Y   
4            5  205500010            C010     Comercio 10  Dirección Y   

    telefono                 email fecha_registro    ciudad    distrito  \
0  987654321   correo6@example.com     2024-11-27  Trujillo  San Isidro   
1  987654321   correo7@example.com     2024-11-27      Lima  San Isidro   
2  987654321   correo8@example.com     2024-11-27      Lima      Centro   
3  987654321   correo9@example.com     2024-11-27     Piura  Miraflores   
4  987654321  correo10@example.com     2024-11-27     Cusco  Miraflores   

     provincia    departamento tipo_comercio      sector  ingresos_anuales  \
0

# Extracción de datos externos (archivos CSV)

In [11]:
tarifas_bancarias = pd.read_csv('tarifas_bancarias.csv')  # Contiene banco, tipo_tarjeta y comisión
dias_festivos = pd.read_csv('dias_festivos.csv')  # Contiene fecha y descripción del día festivo

# Imprimir encabezados de archivos CSV
print("Encabezados de Tarifas Bancarias:")
print(tarifas_bancarias.head(), "\n")

print("Encabezados de Días Festivos:")
print(dias_festivos.head(), "\n")


Encabezados de Tarifas Bancarias:
        banco tipo_tarjeta  comision
0         BCP       Débito     0.015
1         BCP      Crédito     0.020
2   Interbank       Débito     0.012
3   Interbank      Crédito     0.018
4  Scotiabank       Débito     0.014 

Encabezados de Días Festivos:
        fecha              descripcion
0  2024-01-01                Año Nuevo
1  2024-04-18             Jueves Santo
2  2024-04-19            Viernes Santo
3  2024-05-01          Día del Trabajo
4  2024-07-28  Día de la Independencia 



# Paso 2: Transformación de datos

In [12]:
print("== Paso 2: Transformación de datos ==")

# 1. Unir tablas de SQLite
transaccion_completa = pd.merge(transaccion_df, detalle_transaccion_df, left_on='id_transaccion', right_on='id_transaccion')
transaccion_completa = pd.merge(transaccion_completa, comercio_df, left_on='id_comercio', right_on='id_comercio')
transaccion_completa = pd.merge(transaccion_completa, producto_df, left_on='id_producto', right_on='id_producto', suffixes=('_comercio', '_producto'))

# Imprimir encabezados después de la unión
print("Encabezados después de unir Transacción y Detalle:")
print(transaccion_completa.head(), "\n")

# 2. Agregar información de tarifas bancarias
transaccion_completa = pd.merge(
    transaccion_completa, 
    tarifas_bancarias, 
    how='left', 
    left_on=['banco_emisor', 'tipo_tarjeta'], 
    right_on=['banco', 'tipo_tarjeta']
)

# Imprimir encabezados después de agregar tarifas bancarias
print("Encabezados después de agregar tarifas bancarias:")
print(transaccion_completa.head(), "\n")

== Paso 2: Transformación de datos ==
Encabezados después de unir Transacción y Detalle:
   id_transaccion  id_comercio_comercio           fecha_transaccion  \
0             201                     1  2024-05-24 00:30:59.391416   
1             201                     1  2024-05-24 00:30:59.391416   
2             202                     1  2024-07-21 00:30:59.391416   
3             202                     1  2024-07-21 00:30:59.391416   
4             204                     1  2024-03-07 00:30:59.391416   

  tipo_operacion id_operacion     estado    monto moneda  id_producto  \
0          Venta          1_1  Rechazado   175.91  Soles          808   
1          Venta          1_1  Rechazado   175.91  Soles          808   
2          Venta          1_2  Rechazado  1268.25  Soles          227   
3          Venta          1_2  Rechazado  1268.25  Soles          227   
4          Venta          1_4  Rechazado   252.90  Soles          479   

  motivo_venta_observada  ...      sector ing

# 3. Agregar información de días festivos

In [13]:
transaccion_completa['fecha_transaccion'] = pd.to_datetime(transaccion_completa['fecha_transaccion'])
dias_festivos['fecha'] = pd.to_datetime(dias_festivos['fecha'])
transaccion_completa = pd.merge(
    transaccion_completa, 
    dias_festivos, 
    how='left', 
    left_on='fecha_transaccion', 
    right_on='fecha'
)
transaccion_completa['es_dia_festivo'] = ~transaccion_completa['descripcion'].isna()

# Imprimir encabezados después de agregar días festivos
print("Encabezados después de agregar días festivos:")
print(transaccion_completa.head(), "\n")

Encabezados después de agregar días festivos:
   id_transaccion  id_comercio_comercio          fecha_transaccion  \
0             201                     1 2024-05-24 00:30:59.391416   
1             201                     1 2024-05-24 00:30:59.391416   
2             202                     1 2024-07-21 00:30:59.391416   
3             202                     1 2024-07-21 00:30:59.391416   
4             204                     1 2024-03-07 00:30:59.391416   

  tipo_operacion id_operacion     estado    monto moneda  id_producto  \
0          Venta          1_1  Rechazado   175.91  Soles          808   
1          Venta          1_1  Rechazado   175.91  Soles          808   
2          Venta          1_2  Rechazado  1268.25  Soles          227   
3          Venta          1_2  Rechazado  1268.25  Soles          227   
4          Venta          1_4  Rechazado   252.90  Soles          479   

  motivo_venta_observada  ... nivel_lealtad id_comercio_producto  \
0      Sin observaciones  

# 4. Calcular comisiones bancarias aplicadas

In [14]:
transaccion_completa['comision_aplicada'] = transaccion_completa['monto'] * transaccion_completa['comision']


# 5. Agregar columnas adicionales

In [15]:

transaccion_completa['DayOfMonth'] = transaccion_completa['fecha_transaccion'].dt.day

# Imprimir encabezados después de agregar cálculos adicionales
print("Encabezados después de cálculos adicionales:")
print(transaccion_completa.head(), "\n")

# Paso 3: Carga de datos
print("== Paso 3: Carga de datos ==")

# Guardar el resultado consolidado
transaccion_completa.to_csv(LOCAL_PATH, index=False)
print(f"Archivo consolidado guardado localmente como {LOCAL_PATH}.\n")

# Subir el archivo al servidor SFTP
try:
    # Verificar si el archivo local existe
    if not os.path.exists(LOCAL_PATH):
        print(f"Error: El archivo '{LOCAL_PATH}' no existe.")
        exit()

    # Crear conexión SFTP
    private_key = paramiko.RSAKey.from_private_key_file(SFTP_PRIVATE_KEY)
    transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
    transport.connect(username=SFTP_USER, pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(transport)

    # Subir el archivo
    print(f"Subiendo {LOCAL_PATH} a {REMOTE_PATH}...")
    sftp.put(LOCAL_PATH, REMOTE_PATH)
    print("Archivo subido con éxito.")

    # Cerrar la conexión
    sftp.close()
    transport.close()
except FileNotFoundError as e:
    print(f"Error: Archivo no encontrado. {e}")
except paramiko.SSHException as e:
    print(f"Error SSH: {e}")
except Exception as e:
    print(f"Error al subir el archivo al servidor SFTP: {e}")

Encabezados después de cálculos adicionales:
   id_transaccion  id_comercio_comercio          fecha_transaccion  \
0             201                     1 2024-05-24 00:30:59.391416   
1             201                     1 2024-05-24 00:30:59.391416   
2             202                     1 2024-07-21 00:30:59.391416   
3             202                     1 2024-07-21 00:30:59.391416   
4             204                     1 2024-03-07 00:30:59.391416   

  tipo_operacion id_operacion     estado    monto moneda  id_producto  \
0          Venta          1_1  Rechazado   175.91  Soles          808   
1          Venta          1_1  Rechazado   175.91  Soles          808   
2          Venta          1_2  Rechazado  1268.25  Soles          227   
3          Venta          1_2  Rechazado  1268.25  Soles          227   
4          Venta          1_4  Rechazado   252.90  Soles          479   

  motivo_venta_observada  ... nombre_producto categoria precio_unitario  \
0      Sin observaci

# REPORT DEL AREA DE ATENCION A LOS ERRORES

In [18]:
import paramiko
import os
import requests
import pandas as pd
import json
from datetime import datetime

def etl_process(endpoint_url):
    # Generar timestamp para los nombres de los archivos
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    ONLINE_TICKETS = f"tickets_online_{timestamp}.csv"
    OFFLINE_TICKETS = f"tickets_offline_{timestamp}.csv"
    REMOTE_ONLINE_TICKETS = f"/data/sftp/reportes-tickets/online/{ONLINE_TICKETS}"
    REMOTE_OFFLINE_TICKETS = f"/data/sftp/reportes-tickets/offline/{OFFLINE_TICKETS}"

    print("Iniciando la extracción de datos...")
    response = requests.get(endpoint_url)
    if response.status_code == 200:
        data = response.json()
        print("Datos extraídos exitosamente.")
    else:
        print(f"Error al extraer los datos: {response.status_code}")
        return

    print("Transformando los datos...")
    tickets = data["tickets"]
    df_tickets = pd.DataFrame(tickets)

    # Filtrar datos para productos online y offline
    online_tickets = df_tickets[df_tickets["product"].str.lower().isin(["Online"])]
    offline_tickets = df_tickets[df_tickets["product"].str.lower().isin(["Offline"])]

    # Guardar los DataFrames en archivos CSV separados
    online_tickets.to_csv(ONLINE_TICKETS, index=False)
    offline_tickets.to_csv(OFFLINE_TICKETS, index=False)
    print(f"Archivos generados: {ONLINE_TICKETS} y {OFFLINE_TICKETS}")

    print("Cargando los datos...")
    try:
        private_key = paramiko.RSAKey.from_private_key_file(SFTP_PRIVATE_KEY)
        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
        transport.connect(username=SFTP_USER, pkey=private_key)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # Subir el archivo de tickets online
        print(f"Subiendo {ONLINE_TICKETS} a {REMOTE_ONLINE_TICKETS}...")
        sftp.put(ONLINE_TICKETS, REMOTE_ONLINE_TICKETS)
        print("Archivo de tickets online subido con éxito.")

        # Subir el archivo de tickets offline
        print(f"Subiendo {OFFLINE_TICKETS} a {REMOTE_OFFLINE_TICKETS}...")
        sftp.put(OFFLINE_TICKETS, REMOTE_OFFLINE_TICKETS)
        print("Archivo de tickets offline subido con éxito.")

        # Cerrar la conexión
        sftp.close()
        transport.close()
    except Exception as e:
        print(f"Error al subir los archivos al servidor SFTP: {e}")
        raise

# Ejecutar el ETL
etl_process("https://run.mocky.io/v3/d1bb9814-eb15-4af6-9a81-917ac7293686")


Iniciando la extracción de datos...
Datos extraídos exitosamente.
Transformando los datos...
Archivos generados: tickets_online_2024-11-27_17-02-51.csv y tickets_offline_2024-11-27_17-02-51.csv
Cargando los datos...
Subiendo tickets_online_2024-11-27_17-02-51.csv a /data/sftp/reportes-tickets/online/tickets_online_2024-11-27_17-02-51.csv...
Archivo de tickets online subido con éxito.
Subiendo tickets_offline_2024-11-27_17-02-51.csv a /data/sftp/reportes-tickets/offline/tickets_offline_2024-11-27_17-02-51.csv...
Archivo de tickets offline subido con éxito.


# Descargar archivo del servidor SFTP

In [None]:
import paramiko
import os
from datetime import datetime


LOCAL_DIR = os.getenv('RUTA')
REMOTE_DIR = '/data/sftp/'

try:
    # Crear conexión SFTP
    private_key = paramiko.RSAKey.from_private_key_file(SFTP_PRIVATE_KEY)
    transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
    transport.connect(username=SFTP_USER, pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(transport)

    # Listar archivos en el directorio remoto
    print(f"Conectando al servidor SFTP en {REMOTE_DIR}...")
    files = sftp.listdir_attr(REMOTE_DIR)
    
    # Buscar el archivo más reciente
    latest_file = None
    latest_time = None
    for file in files:
        if file.filename.endswith('.csv'):  # Solo considerar archivos .csv
            file_time = file.st_mtime  # Tiempo de modificación
            if latest_time is None or file_time > latest_time:
                latest_time = file_time
                latest_file = file

    if latest_file:
        # Descargar el archivo más reciente
        remote_file_path = REMOTE_DIR + latest_file.filename
        local_file_path = os.path.join(LOCAL_DIR, latest_file.filename)
        print(f"Descargando el archivo más reciente: {latest_file.filename}")
        sftp.get(remote_file_path, local_file_path)
        print(f"Archivo descargado con éxito: {local_file_path}")
    else:
        print("No se encontraron archivos .csv en el directorio remoto.")

    # Cerrar la conexión
    sftp.close()
    transport.close()
except FileNotFoundError as e:
    print(f"Error: Archivo no encontrado. {e}")
except paramiko.SSHException as e:
    print(f"Error SSH: {e}")
except Exception as e:
    print(f"Error: {e}")


Iniciando la extracción de datos...
Datos extraídos exitosamente.
Transformando los datos...
Cargando los datos...
Archivos en el directorio remoto:
[]
Subiendo tickets_transformed_2024-11-27_10-57-25.csv a /data/sftp/reportes-tickets/tickets_transformed_2024-11-27_10-57-25.csv...
Archivo subido con éxito.
