In [29]:
import pandas as pd

# Column layouts for the three header-less CSV files.
column_names_hired = ["id", "name", "col_datetime", "department_id", "job_id"]
column_names_departments = ["id", "department"]
column_names_jobs = ["id", "job"]

# Load the employees file. The nullable "Int64" dtype lets the ID columns hold
# missing values without being up-cast to float.
# The chain returns new frames instead of mutating in place:
#   * missing department/job IDs are replaced by the placeholder value 0;
#   * "name" is renamed to "full_name", the attribute read by the insert cell.
employees_df = (
    pd.read_csv(
        "hired_employees.csv", names=column_names_hired, header=None,
        dtype={"id": "Int64", "department_id": "Int64", "job_id": "Int64"},
    )
    .fillna({"department_id": 0, "job_id": 0})
    .rename(columns={"name": "full_name"})
)

# Normalise the timestamp column in a single statement:
#   1. parse the raw text; unparseable values become NaT (errors="coerce");
#   2. format as "YYYY-MM-DD HH:MM:SS", which drops anything the format does
#      not include (sub-second part, UTC offset);
#   3. parse that text again so the column ends up with a datetime dtype.
employees_df["col_datetime"] = pd.to_datetime(
    pd.to_datetime(employees_df["col_datetime"], errors="coerce")
    .dt.strftime("%Y-%m-%d %H:%M:%S")
)

# Lookup tables: departments and jobs.
departments_df = pd.read_csv(
    "departments.csv", names=column_names_departments, header=None,
    dtype={"id": "Int64"}
)

jobs_df = pd.read_csv(
    "jobs.csv", names=column_names_jobs, header=None,
    dtype={"id": "Int64"}
)



#### Conexión a BD

In [7]:
import pyodbc

server = "DESKTOP-8B57A0M"  # Replace with the correct name of your server
database = "Test"

# Assemble the ODBC connection string from its key=value parts.
# Trusted_Connection=yes: no user name or password is placed in the string.
conn_str = ";".join(
    [
        "DRIVER={SQL Server}",
        f"SERVER={server}",
        f"DATABASE={database}",
        "Trusted_Connection=yes",
    ]
)

# Open the connection and a cursor shared by the following cells
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
print("Conexión exitosa a SQL Server")


Conexión exitosa a SQL Server


#### Inserción en tablas

In [33]:

# Helper to check whether an ID is already present in a table
def id_exists(table, id_value):
    """Return True when ``dbo.<table>`` has at least one row with ``id = id_value``.

    Uses the module-level ``cursor``. ``table`` is interpolated directly into
    the SQL text, so it must be a trusted table name; ``id_value`` is passed
    as a bound parameter.

    NOTE(review): a later cell defines another ``id_exists`` that takes a
    single argument; once that cell has run, this two-argument version is
    shadowed.
    """
    query = f"SELECT COUNT(*) FROM dbo.{table} WHERE id = ?"
    cursor.execute(query, id_value)
    count_row = cursor.fetchone()
    return count_row[0] > 0

# Insert the "departments" rows whose id is not in the table yet
for _, row in departments_df.iterrows():
    if not id_exists("departments", row.id):
        cursor.execute("INSERT INTO dbo.departments (id, department) VALUES (?, ?)", row.id, row.department)

# Insert the "jobs" rows whose id is not in the table yet
for _, row in jobs_df.iterrows():
    if not id_exists("jobs", row.id):
        cursor.execute("INSERT INTO dbo.jobs (id, job) VALUES (?, ?)", row.id, row.job)

# Insert the "hired_employees" rows.
# Like the two lookup tables above, rows whose id is already stored are
# skipped, so re-running this cell does not raise one error per existing row.
# Rows that still fail are reported individually and counted, so the final
# message reflects what actually happened.
failed_rows = 0
for _, row in employees_df.iterrows():
    try:
        employee_id = int(row.id)  # raises for a missing id; handled below
        if id_exists("hired_employees", employee_id):
            continue
        cursor.execute(
            "INSERT INTO dbo.hired_employees (id, full_name, col_datetime, department_id, job_id) VALUES (?, ?, ?, ?, ?)",
            employee_id,
            # Missing names are stored with a placeholder value
            str(row.full_name).strip() if pd.notna(row.full_name) else "Desconocido",
            # NaT / missing values are sent as SQL NULL
            row.col_datetime if pd.notna(row.col_datetime) else None,
            int(row.department_id) if pd.notna(row.department_id) else None,
            int(row.job_id) if pd.notna(row.job_id) else None
        )
    except Exception as e:
        failed_rows += 1
        print(f"Error al insertar ID {row.id}: {e}")


conn.commit()
if failed_rows:
    print(f"Carga finalizada: {failed_rows} fila(s) de hired_employees no se pudieron insertar.")
else:
    print("Datos insertados exitosamente en SQL Server.")

Datos insertados exitosamente en SQL Server.


In [111]:
# Close the shared cursor first, then the connection it belongs to
cursor.close()
conn.close()

#### Creación de la API REST

In [16]:
import uvicorn
import nest_asyncio
import threading
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import pyodbc
import pandas as pd


# Apply nest_asyncio so the API can run inside Jupyter
nest_asyncio.apply()

# SQL Server connection settings
server = "DESKTOP-8B57A0M"  # Replace with the name of your server
database = "Test"
# Trusted_Connection=yes: no user name or password is placed in the string.
conn_str = ";".join(
    [
        "DRIVER={SQL Server}",
        f"SERVER={server}",
        f"DATABASE={database}",
        "Trusted_Connection=yes",
    ]
)

# Create the API application
app = FastAPI()

# Data model used for request validation
class Employee(BaseModel):
    """One employee record as received in the body of POST /add_employees/."""
    # Required fields
    id: int
    name: str
    # Sent as text; add_employees parses it with pd.to_datetime before inserting
    col_datetime: str
    # Optional fields; both default to None when omitted
    department_id: Optional[int] = None
    job_id: Optional[int] = None

# Helper to check whether an employee ID already exists in the database
def id_exists(employee_id: int) -> bool:
    """Return True when dbo.hired_employees has a row with ``id = employee_id``.

    Opens a dedicated connection from ``conn_str`` for the lookup and always
    closes it before returning or propagating an error. A pyodbc connection
    used in a ``with`` block is committed on exit but not closed, so the
    connection is closed explicitly here instead.

    NOTE(review): this definition shadows the earlier two-argument
    ``id_exists(table, id_value)`` from the bulk-load cell; re-running that
    cell after this one will fail.

    Raises:
        Whatever ``pyodbc`` raises when connecting or executing the query.
    """
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM dbo.hired_employees WHERE id = ?", employee_id)
        return cursor.fetchone()[0] > 0
    finally:
        conn.close()

# Endpoint to insert employees
@app.post("/add_employees/")
def add_employees(employees: List[Employee]):
    """Insert a batch of employees into dbo.hired_employees.

    Each employee is handled independently: a record with an unparseable
    ``col_datetime``, an ``id`` that already exists, or a failing INSERT is
    skipped and described in ``errors``; the remaining records are inserted
    and committed together at the end.

    The existence check runs on the same cursor as the inserts. A separate
    connection would not see rows inserted earlier in this (still uncommitted)
    batch and, depending on the server's isolation settings, may block on
    them; using one cursor also means duplicate ids inside one request are
    reported as already existing.

    Returns:
        dict with ``message`` (count of inserted rows) and ``errors`` (a list
        of messages, or the string "Sin errores." when there were none).

    NOTE(review): the bulk-load cell earlier in this notebook writes to a
    ``full_name`` column of dbo.hired_employees, while the INSERT below uses
    ``name`` — confirm which column the table actually has.
    """
    inserted_rows = 0
    errors = []

    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()

        for emp in employees:
            # Validate and normalise the timestamp before touching the database
            try:
                formatted_date = pd.to_datetime(emp.col_datetime).strftime("%Y-%m-%d %H:%M:%S")
            except Exception:
                errors.append(f"ID {emp.id}: Fecha inválida '{emp.col_datetime}'")
                continue

            # Existence check on the same transaction as the inserts
            cursor.execute("SELECT COUNT(*) FROM dbo.hired_employees WHERE id = ?", emp.id)
            if cursor.fetchone()[0] > 0:
                errors.append(f"ID {emp.id}: Ya existe en la base de datos")
                continue

            try:
                cursor.execute(
                    "INSERT INTO dbo.hired_employees (id, name, col_datetime, department_id, job_id) VALUES (?, ?, ?, ?, ?)",
                    emp.id, emp.name, formatted_date, emp.department_id, emp.job_id
                )
                inserted_rows += 1
            except Exception as e:
                errors.append(f"ID {emp.id}: Error al insertar ({e})")

        conn.commit()
    finally:
        # pyodbc's context manager only commits; close explicitly instead
        conn.close()

    return {"message": f"Se insertaron {inserted_rows} empleados.", "errors": errors if errors else "Sin errores."}

# Endpoint to fetch all employees
@app.get("/employees/")
def get_employees():
    """Return every row of dbo.hired_employees as ``{"employees": [...]}``.

    Each list item is a dict with the keys ``id``, ``name``, ``col_datetime``,
    ``department_id`` and ``job_id``. The connection is closed explicitly once
    the rows have been read, because a pyodbc connection used in a ``with``
    block is committed on exit but not closed.

    NOTE(review): the bulk-load cell earlier in this notebook writes to a
    ``full_name`` column of dbo.hired_employees, while the SELECT below reads
    ``name`` — confirm which column the table actually has.
    """
    columns = ("id", "name", "col_datetime", "department_id", "job_id")
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name, col_datetime, department_id, job_id FROM dbo.hired_employees")
        # The SELECT list has the same order as `columns`
        employees = [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        conn.close()
    return {"employees": employees}

# Start the Uvicorn server from inside the Jupyter notebook.
# The direct call below is kept for reference but left disabled:
#uvicorn.run(app, host="127.0.0.1", port=8000)

# Function that runs Uvicorn, meant to be used as a thread target
def run_api():
    """Serve `app` with Uvicorn on 127.0.0.1:8000."""
    uvicorn.run(app, host="127.0.0.1", port=8000)

# Start the server in a separate daemon thread
api_thread = threading.Thread(target=run_api, daemon=True)
api_thread.start()

# Printed right after the thread is started; it does not confirm that the
# server is already accepting requests.
print("API corriendo en http://127.0.0.1:8000 🚀")

API corriendo en http://127.0.0.1:8000 🚀


### Probar API

In [21]:
import requests

# Sample payload: two employees to insert through the API
data = [
    {
        "id": 5001,
        "name": "Kevin Serrano",
        "col_datetime": "2025-03-04 10:30:00",
        "department_id": 2,
        "job_id": 3
    },
    {
        "id": 5002,
        "name": "Evelin Barriga",
        "col_datetime": "2025-03-04 11:00:00",
        "department_id": 1,
        "job_id": 4
    }
]

# requests waits indefinitely by default; the timeout keeps this cell from
# hanging forever if the local server accepts the connection but never answers.
response = requests.post("http://127.0.0.1:8000/add_employees/", json=data, timeout=30)
print(response.json())


INFO:     127.0.0.1:56365 - "POST /add_employees/ HTTP/1.1" 200 OK
{'message': 'Se insertaron 2 empleados.', 'errors': 'Sin errores.'}


In [None]:
# Fetch all employees; the timeout keeps the cell from hanging forever if the
# local server never answers (requests has no default timeout).
response = requests.get("http://127.0.0.1:8000/employees/", timeout=30)
print(response.json())
