In [6]:
#!pip install boto3
#!pip install sqlalchemy



In [None]:
import boto3
import pandas as pd
from sqlalchemy import create_engine
import os
from datetime import datetime

# üîß CONFIGURACI√ìN

AWS_ACCESS_KEY = 'TU_ACCESS_KEY'
AWS_SECRET_KEY = 'TU_SECRET_KEY'
BUCKET_NAME = 'tu-bucket'
S3_PREFIX = 'datos/'  # Ruta en el bucket donde est√°n los CSV
LOCAL_FOLDER = 'tmp_s3_files'

# Datos de conexi√≥n a MySQL
DB_USER = 'usuario_mysql'
DB_PASSWORD = 'clave_mysql'
DB_HOST = 'localhost'
DB_PORT = '3306'
DB_NAME = 'mi_base'
DB_TABLE = 'ventas'

# Crear carpeta local para guardar los archivos descargados
os.makedirs(LOCAL_FOLDER, exist_ok=True)

# Crear motor SQLAlchemy para MySQL
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# Conectar a S3
s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

# Listar archivos .csv en el bucket
response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=S3_PREFIX)

for obj in response.get('Contents', []):
    key = obj['Key']
    if key.endswith('.csv'):
        filename = key.split('/')[-1]
        local_path = os.path.join(LOCAL_FOLDER, filename)

        # Evitar re-descargar si ya est√°
        if not os.path.exists(local_path):
            print(f"üì• Descargando {filename}...")
            s3.download_file(BUCKET_NAME, key, local_path)

            try:
                # Leer CSV
                df = pd.read_csv(local_path)

                # Agregar columna de fecha de carga (opcional)
                df['fecha_carga'] = datetime.now()

                # Cargar a MySQL
                df.to_sql(DB_TABLE, engine, if_exists='append', index=False)
                print(f"‚úÖ {filename} cargado en MySQL.")
            except Exception as e:
                print(f"‚ùå Error procesando {filename}: {e}")

In [8]:
#En Linux para que se corra cada 5 

# */5 * * * * /usr/bin/python3 /ruta/script.py

In [None]:
import boto3
import pandas as pd
from sqlalchemy import create_engine
import pymysql
import os
from datetime import datetime

# CONFIGURACI√ìN
AWS_ACCESS_KEY = 'TU_ACCESS_KEY'
AWS_SECRET_KEY = 'TU_SECRET_KEY'
BUCKET_NAME = 'tu-bucket'
S3_PREFIX = 'datos/'
LOCAL_FOLDER = 'tmp_s3_files'
PROCESADOS_FILE = 'procesados.txt'

# Base de datos MySQL
DB_USER = 'usuario_mysql'
DB_PASSWORD = 'clave_mysql'
DB_HOST = 'localhost'
DB_PORT = '3306'
DB_NAME = 'mi_base'
DB_TABLE = 'ventas'
CAMPO_CLAVE = 'id'  # Cambia esto por el campo que identifique registros √∫nicos

# Crear carpeta y archivo de control
os.makedirs(LOCAL_FOLDER, exist_ok=True)
if not os.path.exists(PROCESADOS_FILE):
    open(PROCESADOS_FILE, 'w').close()

# Cargar lista de archivos ya procesados
with open(PROCESADOS_FILE, 'r') as f:
    archivos_procesados = set(line.strip() for line in f)

# Conexiones
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')
s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

# Descargar archivos nuevos
response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=S3_PREFIX)

for obj in response.get('Contents', []):
    key = obj['Key']
    if key.endswith('.csv'):
        filename = key.split('/')[-1]

        if filename in archivos_procesados:
            print(f"‚è© Ya procesado: {filename}")
            continue

        local_path = os.path.join(LOCAL_FOLDER, filename)
        print(f"üì• Descargando: {filename}")
        s3.download_file(BUCKET_NAME, key, local_path)

        try:
            # Leer CSV
            df = pd.read_csv(local_path)
            df['fecha_carga'] = datetime.now()

            # Verificar duplicados
            if CAMPO_CLAVE in df.columns:
                # Obtener claves ya presentes en BD
                with engine.connect() as conn:
                    existing = pd.read_sql(f"SELECT {CAMPO_CLAVE} FROM {DB_TABLE}", conn)
                claves_existentes = set(existing[CAMPO_CLAVE].astype(str))

                # Filtrar registros nuevos
                df = df[~df[CAMPO_CLAVE].astype(str).isin(claves_existentes)]

                if df.empty:
                    print(f"üö´ Todos los registros ya existen en {DB_TABLE}.")
                else:
                    df.to_sql(DB_TABLE, engine, if_exists='append', index=False)
                    print(f"‚úÖ {len(df)} registros insertados desde {filename}.")
            else:
                print(f"‚ö†Ô∏è El archivo {filename} no contiene la columna clave '{CAMPO_CLAVE}'.")

            # Guardar el nombre del archivo como procesado
            with open(PROCESADOS_FILE, 'a') as f:
                f.write(f"{filename}\n")

        except Exception as e:
            print(f"‚ùå Error procesando {filename}: {e}")

In [14]:
import pandas as pd
import sqlite3
import psycopg2
from sqlalchemy import create_engine

#engine = create_engine('mysql+pymysql://usuario:contrase√±a@host:puerto/nombre_basedatos')

# Ejemplo real
engine = create_engine('mysql+pymysql://root:PPOMBO@localhost:3306/empresadb')

query = "SELECT e.nombre AS Nombre, e.apellido AS Apellido, SUM(asi.horas_asignadas) AS 'Total Horas Trabajadas', RANK()OVER(PARTITION BY e.depto_id ORDER BY SUM(asi.horas_asignadas) DESC)AS 'Ranking Departamental' FROM Empleados AS e LEFT JOIN AsignacionesDeProyectos AS asi ON asi.empleado_id = e.empleado_id JOIN Departamentos AS d ON e.depto_id = d.depto_id GROUP BY e.empleado_id;"

df = pd.read_sql(query, con=engine)

df.to_sql('tabla_python', con=engine, if_exists='append', index=False)

9