Código Estructurado

In [None]:
def calcular_promedio(lista_numeros):
    """Calcula el promedio de una lista de números."""
    return sum(lista_numeros) / len(lista_numeros)

# Uso de la función
numeros = [10, 20, 30, 40, 50]
print("El promedio es:", calcular_promedio(numeros))

 Modularidad

In [None]:
# Ejemplo de módulos (archivos separados)
# modulo_limpieza.py
def limpiar_datos(datos):
    """Limpia datos inconsistentes."""
    return datos.dropna()

# modulo_visualizacion.py
def graficar_datos(datos):
    """Genera un gráfico básico."""
    import matplotlib.pyplot as plt
    plt.plot(datos)
    plt.show()

# Uso en otro archivo (main.py)
import modulo_limpieza as ml
import modulo_visualizacion as mv

datos_limpios = ml.limpiar_datos(datos_sucios)
mv.graficar_datos(datos_limpios)

 Buenas Prácticas en el Código

In [None]:
def sumar(a, b):
    """Devuelve la suma de dos números.
    
    Args:
        a (int/float): Primer número.
        b (int/float): Segundo número.
    
    Returns:
        int/float: Resultado de la suma.
    """
    return a + b

# Uso
print(sumar(5, 3))  # Output: 8

SCRUM en Ciencia de Datos

In [None]:
# Sprint 1: Recolección y limpieza de datos  
def limpiar_datos(datos):  
    return datos.dropna()  

# Sprint 2: Exploración y visualización  
import matplotlib.pyplot as plt  
def graficar(datos):  
    plt.hist(datos)  
    plt.show()  

# Sprint 3: Modelado (ejemplo con Scikit-learn)  
from sklearn.linear_model import LinearRegression  
modelo = LinearRegression()  
modelo.fit(X_train, y_train)  

# Sprint 4: Evaluación  
from sklearn.metrics import mean_squared_error  
mse = mean_squared_error(y_test, modelo.predict(X_test))  

Comandos Básicos de Git

In [None]:
# Inicializar repositorio  
git init  

# Agregar cambios  
git add .  

# Confirmar cambios  
git commit -m "Mensaje descriptivo"  

# Subir a GitHub  
git push origin main  

Ejemplo Pytest

In [None]:
# Prueba unitaria  
def test_sumar():  
    assert sumar(2, 3) == 5  

# Ejecutar pruebas (en terminal)  
# pytest nombre_archivo.py  

Validación de Modelos (Scikit-learn)

In [None]:
from sklearn.metrics import accuracy_score  
y_real = [1, 0, 1, 1, 0]  
y_pred = [1, 0, 1, 0, 0]  
print(f"Precisión: {accuracy_score(y_real, y_pred):.2f}")  

Principios de Código Limpio

In [None]:
# Mal
def f(x, y): return x + y

# Bien
def sumar_numeros(numero1, numero2):
    """Devuelve la suma de dos números."""
    return numero1 + numero2

Documentación en Código

In [None]:
def calcular_promedio(lista_numeros):
    """
    Calcula el promedio de una lista de números.
    
    Args:
        lista_numeros (list): Lista de valores numéricos.
    
    Returns:
        float: Promedio de los números.
    """
    return sum(lista_numeros) / len(lista_numeros)

Consumo de APIs REST

In [None]:
import requests

url = "https://api.openweathermap.org/data/2.5/weather"
params = {"q": "Guatemala", "appid": "TU_CLAVE_API"}  # Reemplazar con API key real

respuesta = requests.get(url, params=params)

if respuesta.status_code == 200:
    datos = respuesta.json()
    print(f"Clima en Guatemala: {datos['weather'][0]['description']}")
else:
    print("Error en la solicitud")

Creación de APIs con Flask

In [None]:
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/saludo', methods=['GET'])
def saludo():
    return jsonify({"mensaje": "¡Hola, esta es una API en Flask!"})

if __name__ == '__main__':
    app.run(debug=True)

API Keys

In [None]:
headers = {"Authorization": "Bearer TU_API_KEY"}

Usos de APIs

In [None]:
# Ejemplo: Predicción vía API
@app.route('/api/predecir', methods=['POST'])
def predecir():
    datos = request.json
    prediccion = modelo_ml.predict([datos["features"]])
    return jsonify({"prediccion": prediccion.tolist()})

Refactorización

In [None]:
# Refactorizar este código:
def calc(a, b): return a * b + 2

Nodo Individual

In [None]:
class Nodo:
    def __init__(self, dato):
        self.dato = dato
        self.siguiente = None

# Ejemplo de uso
nodo1 = Nodo(5)
nodo2 = Nodo(10)
nodo1.siguiente = nodo2  # Enlazar nodos

Lista Simplemente Enlazada

In [None]:
class ListaEnlazada:
    def __init__(self):
        self.cabeza = None

    def agregar(self, dato):
        nuevo_nodo = Nodo(dato)
        if not self.cabeza:
            self.cabeza = nuevo_nodo
        else:
            actual = self.cabeza
            while actual.siguiente:
                actual = actual.siguiente
            actual.siguiente = nuevo_nodo

# Ejemplo
lista = ListaEnlazada()
lista.agregar(5)
lista.agregar(10)

Lista Doblemente Enlazada

In [None]:
class NodoDoble:
    def __init__(self, dato):
        self.dato = dato
        self.anterior = None
        self.siguiente = None

class ListaDobleEnlazada:
    def __init__(self):
        self.cabeza = None

    def agregar(self, dato):
        nuevo_nodo = NodoDoble(dato)
        if not self.cabeza:
            self.cabeza = nuevo_nodo
        else:
            actual = self.cabeza
            while actual.siguiente:
                actual = actual.siguiente
            actual.siguiente = nuevo_nodo
            nuevo_nodo.anterior = actual

# Ejemplo
lista_doble = ListaDobleEnlazada()
lista_doble.agregar(5)
lista_doble.agregar(10)

Búsqueda Binaria

In [None]:
def busqueda_binaria(lista, objetivo):
    izquierda, derecha = 0, len(lista) - 1
    while izquierda <= derecha:
        medio = (izquierda + derecha) // 2
        if lista[medio] == objetivo:
            return medio
        elif lista[medio] < objetivo:
            izquierda = medio + 1
        else:
            derecha = medio - 1
    return -1  # No encontrado

# Ejemplo
lista_ordenada = [2, 5, 8, 12, 16]
print(busqueda_binaria(lista_ordenada, 12))  # Output: 3

Merge Sort

In [None]:
def merge_sort(lista):
    if len(lista) > 1:
        medio = len(lista) // 2
        izquierda = lista[:medio]
        derecha = lista[medio:]
        merge_sort(izquierda)
        merge_sort(derecha)
        
        i = j = k = 0
        while i < len(izquierda) and j < len(derecha):
            if izquierda[i] < derecha[j]:
                lista[k] = izquierda[i]
                i += 1
            else:
                lista[k] = derecha[j]
                j += 1
            k += 1
        
        while i < len(izquierda):
            lista[k] = izquierda[i]
            i += 1
            k += 1
        
        while j < len(derecha):
            lista[k] = derecha[j]
            j += 1
            k += 1

# Ejemplo
lista = [8, 3, 2, 9, 7, 1]
merge_sort(lista)
print(lista)  # Output: [1, 2, 3, 7, 8, 9]

Bases de Datos Relacionales

In [None]:
import pandas as pd

# Tabla "Clientes"
clientes = pd.DataFrame({
    "ID_Cliente": [101, 102],
    "Nombre": ["Ana López", "Juan Pérez"],
    "Teléfono": ["123456", "789012"]
})
print(clientes)

Teoría Relacional de Codd

In [None]:
# Tabla "Pedidos"
pedidos = pd.DataFrame({
    "ID_Pedido": [1, 2],
    "ID_Cliente": [101, 102],  # Clave foránea
    "Producto": ["Laptop", "Teclado"]
})

Normalización

In [None]:
# Ejemplo de 1FN (evitar celdas con múltiples valores)
productos = pd.DataFrame({
    "ID_Cliente": [101, 101, 102],
    "Producto": ["Laptop", "Mouse", "Teclado"]  # Un producto por fila
})

# Tabla "Clientes" (solo información del cliente)
clientes_2fn = clientes[["ID_Cliente", "Nombre", "Teléfono"]]

# Tabla "Pedidos" (solo información de pedidos)
pedidos_2fn = pedidos[["ID_Pedido", "ID_Cliente", "Producto"]]

# Tabla "Productos" (información independiente)
productos_3fn = pd.DataFrame({
    "Producto": ["Laptop", "Mouse", "Teclado"],
    "Categoría": ["Electrónica", "Accesorios", "Accesorios"]
})

Álgebra Relacional en Python

In [None]:
# Filtrar clientes con ID > 101
clientes_filtrados = clientes[clientes["ID_Cliente"] > 101]

# Seleccionar solo nombres
nombres = clientes[["Nombre"]]

# Inner Join entre "Clientes" y "Pedidos"
join_clientes_pedidos = pd.merge(clientes, pedidos, on="ID_Cliente")

Bases de Datos NoSQL

In [None]:
# Ejemplo de documento JSON
cliente_nosql = {
    "_id": 101,
    "nombre": "Ana López",
    "pedidos": ["Laptop", "Mouse"]
}

Creación de Base de Datos

In [None]:
-- Crear base de datos y usuario
CREATE DATABASE tienda;
CREATE USER admin WITH PASSWORD 'admin123';
GRANT ALL PRIVILEGES ON DATABASE tienda TO admin;
# Esto seria en SQL pero aqui solo lo dejamos mencionado

Creación de Tablas

In [None]:
-- Tabla 'clientes'
CREATE TABLE clientes (
    cliente_id SERIAL PRIMARY KEY,
    nombre VARCHAR(100),
    correo VARCHAR(100) UNIQUE
);

-- Tabla 'productos'
CREATE TABLE productos (
    producto_id SERIAL PRIMARY KEY,
    nombre VARCHAR(100),
    precio DECIMAL(10,2)
);

-- Tabla 'pedidos' (con clave foránea)
CREATE TABLE pedidos (
    pedido_id SERIAL PRIMARY KEY,
    cliente_id INT REFERENCES clientes(cliente_id),
    fecha TIMESTAMP DEFAULT now()
);

Consultas Básicas

In [None]:
-- Seleccionar todos los clientes
SELECT nombre, correo FROM clientes;

-- Filtrar productos caros
SELECT * FROM productos WHERE precio > 50;

-- Ordenar productos por precio (descendente)
SELECT * FROM productos ORDER BY precio DESC;

Uso de Índices y JOINs

In [None]:
-- Crear índice en el campo 'correo'
CREATE INDEX idx_cliente_correo ON clientes(correo);

-- Verificar uso del índice
EXPLAIN ANALYZE SELECT * FROM clientes WHERE correo = 'email@example.com';

-- JOIN entre clientes y pedidos
SELECT c.nombre, p.pedido_id, p.fecha
FROM clientes c
JOIN pedidos p ON c.cliente_id = p.cliente_id;

-- Optimización con EXPLAIN
EXPLAIN ANALYZE
SELECT c.nombre, p.pedido_id
FROM clientes c
LEFT JOIN pedidos p ON c.cliente_id = p.cliente_id;

Consultas Avanzadas

In [None]:
-- Calcular el total de ventas por cliente.
SELECT c.nombre, SUM(pr.precio) AS total_gastado
FROM clientes c
JOIN pedidos p ON c.cliente_id = p.cliente_id
JOIN productos pr ON p.producto_id = pr.producto_id
GROUP BY c.nombre;

Consulta SQL para Stock Bajo

In [None]:
SELECT p.nombre, i.cantidad
FROM Productos p
JOIN Inventario i ON p.id_producto = i.id_producto
WHERE i.cantidad < 10;

JOINs y Subconsultas

In [None]:
SELECT c.nombre, a.account_no
FROM Clientes c
JOIN Cuentas a ON c.id_cliente = a.id_cliente
WHERE a.tipo = 'corriente';

SELECT nombre, cantidad
FROM Inventario
WHERE cantidad > (SELECT AVG(cantidad) FROM Inventario);

Agregación con GROUP BY

In [None]:
SELECT c.nombre, SUM(t.monto) AS total_gastado
FROM Clientes c
JOIN Cuentas a ON c.id_cliente = a.id_cliente
JOIN Transacciones t ON a.account_no = t.cuenta_origen
GROUP BY c.nombre;

Índices

In [None]:
-- Índice para búsquedas por saldo
CREATE INDEX idx_saldo ON Cuentas(balance);

-- Índice para JOINs frecuentes
CREATE INDEX idx_proveedor ON Productos(id_proveedor);

EXPLAIN ANALYZE SELECT * FROM Cuentas WHERE balance > 1000;

Particionamiento de Tablas

In [None]:
-- Crear tabla particionada
CREATE TABLE Transacciones (
    id_transacción SERIAL,
    fecha DATE,
    monto DECIMAL(10,2)
) PARTITION BY RANGE (fecha);

-- Particiones específicas
CREATE TABLE Transacciones_2023 PARTITION OF Transacciones
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

Evaluación de Calidad

In [None]:
# Verificar valores nulos y duplicados
print(datos_csv.isnull().sum())
print(datos_csv.duplicated().sum())

Anonimización

In [None]:
datos_csv['nombre'] = datos_csv['nombre'].apply(lambda x: 'ANON_' + str(hash(x)))

Combinar datos de una API y un CSV

In [None]:
# Combinar datos de API y CSV
datos_combinados = pd.concat([datos_api, datos_csv], ignore_index=True)

Extracción desde Archivos CSV

In [None]:
import pandas as pd

# Cargar datos desde CSV
df = pd.read_csv("ventas.csv")
print("Primeras filas del dataset:")
print(df.head())

Extracción desde una API REST

In [None]:
import requests

url = "https://api.datos.gob.mx/v1/ventas"
response = requests.get(url)

if response.status_code == 200:
    datos_api = pd.DataFrame(response.json())
    print("Datos de la API:")
    print(datos_api.head())

Limpieza Básica

In [None]:
# Eliminar filas con valores nulos
df_limpio = df.dropna()

# Convertir texto a minúsculas
df_limpio["categoria"] = df_limpio["categoria"].str.lower()

# Crear nueva columna 'total'
df_limpio["total"] = df_limpio["precio_unitario"] * df_limpio["cantidad"]

Normalización de Datos

In [None]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
df_limpio[["total_norm"]] = scaler.fit_transform(df_limpio[["total"]])
print("Datos normalizados:")
print(df_limpio[["total", "total_norm"]].head())

Carga en PostgreSQL


In [None]:
from sqlalchemy import create_engine

# Conexión a la base de datos
engine = create_engine("postgresql://usuario:clave@localhost:5432/mi_db")

# Cargar DataFrame en PostgreSQL
df_limpio.to_sql("ventas_procesadas", engine, if_exists="replace", index=False)
print("¡Datos cargados en PostgreSQL!")

Exportar a CSV/Parquet

In [None]:
# Guardar como CSV
df_limpio.to_csv("ventas_procesadas.csv", index=False)

# Guardar como Parquet (formato eficiente para big data)
df_limpio.to_parquet("ventas_procesadas.parquet")

Análisis Descriptivo

In [None]:
import pandas as pd

# Datos de ejemplo
data = {'Ventas': [100, 150, 200, 250, 300]}
df = pd.DataFrame(data)

# Estadísticas descriptivas
print(df.describe())

 Análisis Exploratorio (EDA)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Heatmap de correlación
sns.heatmap(df.corr(), annot=True)
plt.title("Matriz de Correlación")
plt.show()

Análisis Predictivo

In [None]:
from sklearn.linear_model import LinearRegression

# Datos de entrenamiento
X = [[1], [2], [3], [4], [5]]  # Variable independiente
y = [100, 150, 200, 250, 300]   # Variable dependiente

# Modelo predictivo
modelo = LinearRegression()
modelo.fit(X, y)

# Predicción
print(f"Predicción para 6: {modelo.predict([[6]])[0]:.2f}")

Análisis Prescriptivo

In [None]:
# Ejemplo simplificado de optimización de inventario
def optimizar_inventario(demanda_prevista, stock_actual):
    return max(0, demanda_prevista - stock_actual)

print(f"Pedir {optimizar_inventario(350, 200)} unidades")

Visualización de Datos

In [None]:
import matplotlib.pyplot as plt

df["Ventas"].hist(bins=20, edgecolor="black")
plt.title("Distribución de Ventas")
plt.xlabel("Ventas")
plt.ylabel("Frecuencia")
plt.show()

Boxplots

In [None]:
import seaborn as sns

sns.boxplot(data=df, y="Ventas")
plt.title("Boxplot de Ventas")
plt.show()

Gráficos de Dispersión y Correlación

In [None]:
# Datos adicionales
df["Publicidad"] = [50, 60, 70, 80, 90, 100, 110]

# Scatter plot
plt.scatter(df["Publicidad"], df["Ventas"])
plt.title("Ventas vs. Publicidad")
plt.xlabel("Publicidad ($)")
plt.ylabel("Ventas ($)")
plt.show()

# Matriz de correlación
sns.heatmap(df.corr(), annot=True, cmap="coolwarm")
plt.title("Matriz de Correlación")
plt.show()

Método Z-Score

In [None]:
from scipy.stats import zscore

df["Z-Score"] = zscore(df["Ventas"])
outliers = df[df["Z-Score"].abs() > 3]
print("Outliers detectados:\n", outliers)

Correlacion

In [None]:
import pandas as pd

# Calcular correlación
correlacion = df.corr()
print(correlacion)

# Filtrar variables correlacionadas (> 0.8)
variables_redundantes = correlacion[correlacion > 0.8].stack().index.tolist()

Normalización vs. Estandarización

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Normalización
scaler_minmax = MinMaxScaler()
df_normalizado = scaler_minmax.fit_transform(df[['ingreso']])

# Estandarización
scaler_zscore = StandardScaler()
df_estandarizado = scaler_zscore.fit_transform(df[['edad']])

One-Hot Encoding

In [None]:
# Convertir columna 'color' a variables dummy
df_encoded = pd.get_dummies(df, columns=['color'])
print(df_encoded.head())

Reducción de Dimensionalidad con PCA (Análisis de Componentes Principales)

In [None]:
from sklearn.decomposition import PCA

pca = PCA(n_components=2)
componentes = pca.fit_transform(df[['x1', 'x2', 'x3']])
print("Varianza explicada:", pca.explained_variance_ratio_)

t-SNE (Visualización No Lineal)

In [None]:
from sklearn.manifold import TSNE

tsne = TSNE(n_components=2, perplexity=30)
resultado_tsne = tsne.fit_transform(df)
plt.scatter(resultado_tsne[:, 0], resultado_tsne[:, 1])
plt.title("Visualización t-SNE")
plt.show()

Bibliotecas Principales

In [None]:
import matplotlib.pyplot as plt  # Gráficos básicos
import seaborn as sns            # Gráficos estadísticos avanzados
import plotly.express as px      # Gráficos interactivos

Visualización Interactiva con Plotly

In [None]:
import plotly.express as px

fig = px.scatter(df, x="precio", y="ventas", color="categoria", 
                 title="Ventas vs. Precio por Categoría")
fig.show()

Dashboards con Seaborn y Matplotlib

In [None]:
# Ejemplo de subgráficos
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
sns.histplot(df["precio"], ax=axes[0], kde=True)
sns.boxplot(data=df, x="categoria", y="ventas", ax=axes[1])
plt.tight_layout()
plt.show()

Clasificación

In [None]:
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier

# Cargar datos
iris = load_iris()
X, y = iris.data, iris.target

# Entrenar modelo
modelo = DecisionTreeClassifier()
modelo.fit(X, y)

Evaluación de Modelos

In [None]:
from sklearn.metrics import mean_squared_error

pred = modelo.predict(X_test)
mse = mean_squared_error(y_test, pred)
print(f"Error Cuadrático Medio: {mse:.2f}")

Clasificación con Iris Dataset

In [None]:
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

# Cargar datos
iris = load_iris()
X, y = iris.data, iris.target

# Entrenar y evaluar
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
modelo = DecisionTreeClassifier()
modelo.fit(X_train, y_train)

# Precisión
pred = modelo.predict(X_test)
print(f"Precisión: {accuracy_score(y_test, pred):.2f}")

Selección de Modelo

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

# Elección según el problema
modelo_lineal = LogisticRegression()
modelo_no_lineal = RandomForestClassifier()

Clasificación con Regresión Logística

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Dividir datos
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.3, random_state=42
)

# Entrenar modelo
modelo = LogisticRegression(max_iter=200)
modelo.fit(X_train, y_train)

# Evaluar
y_pred = modelo.predict(X_test)
print(classification_report(y_test, y_pred, target_names=iris.target_names))

Matriz de Confusión

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", 
            xticklabels=iris.target_names, 
            yticklabels=iris.target_names)
plt.title("Matriz de Confusión")
plt.show()

K-Nearest Neighbors (KNN)

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Crear y entrenar modelo
knn = KNeighborsClassifier(n_neighbors=3)
knn.fit(X_train, y_train)

# Predecir y evaluar
y_pred = knn.predict(X_test)
print(f"Precisión KNN: {accuracy_score(y_test, y_pred):.2f}")

Árbol de Decisión

In [None]:
from sklearn.tree import DecisionTreeClassifier

# Crear y entrenar modelo
dt = DecisionTreeClassifier(max_depth=3)
dt.fit(X_train, y_train)

# Predecir y evaluar
y_pred = dt.predict(X_test)
print(f"Precisión Árbol de Decisión: {accuracy_score(y_test, y_pred):.2f}")

# Visualizar árbol (requiere graphviz)
from sklearn.tree import plot_tree
import matplotlib.pyplot as plt
plt.figure(figsize=(12,8))
plot_tree(dt, filled=True, feature_names=iris.feature_names, class_names=iris.target_names)
plt.show()

Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Crear y entrenar modelo
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)

# Predecir y evaluar
y_pred = rf.predict(X_test)
print(f"Precisión Random Forest: {accuracy_score(y_test, y_pred):.2f}")

# Importancia de características
importances = rf.feature_importances_
for name, importance in zip(iris.feature_names, importances):
    print(f"{name}: {importance:.3f}")

Support Vector Machine

In [None]:
from sklearn.svm import SVC

# Crear y entrenar modelo
svm = SVC(kernel='linear', C=1.0)
svm.fit(X_train, y_train)

# Predecir y evaluar
y_pred = svm.predict(X_test)
print(f"Precisión SVM: {accuracy_score(y_test, y_pred):.2f}")

Comparativa de Modelos

In [None]:
models = {
    "Regresión Logística": LogisticRegression(max_iter=200),
    "KNN": KNeighborsClassifier(n_neighbors=3),
    "Árbol de Decisión": DecisionTreeClassifier(max_depth=3),
    "Random Forest": RandomForestClassifier(n_estimators=100),
    "SVM": SVC(kernel='linear')
}

for name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print(f"{name}: {accuracy_score(y_test, y_pred):.4f}")

Matriz de Confusión para Todos los Modelos

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

for name, model in models.items():
    y_pred = model.predict(X_test)
    cm = confusion_matrix(y_test, y_pred)
    
    plt.figure(figsize=(6,4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=iris.target_names, 
                yticklabels=iris.target_names)
    plt.title(f"Matriz de Confusión - {name}")
    plt.ylabel('Verdadero')
    plt.xlabel('Predicho')
    plt.show()

Árboles de Decisión

In [None]:
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Cargar datos
iris = load_iris()
X, y = iris.data, iris.target

# Dividir datos
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Crear y entrenar modelo
arbol = DecisionTreeClassifier(max_depth=3, random_state=42)
arbol.fit(X_train, y_train)

# Visualizar árbol
plt.figure(figsize=(15,10))
plot_tree(arbol, 
          filled=True, 
          feature_names=iris.feature_names, 
          class_names=iris.target_names)
plt.title("Árbol de Decisión - Iris Dataset")
plt.show()


from sklearn.metrics import classification_report

# Predecir y evaluar
y_pred = arbol.predict(X_test)
print("Reporte de Clasificación:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))

Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Crear y entrenar modelo
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)

# Evaluar modelo
y_pred_rf = rf.predict(X_test)
print("Reporte de Clasificación (Random Forest):")
print(classification_report(y_test, y_pred_rf, target_names=iris.target_names))

import pandas as pd
import seaborn as sns

# Obtener importancia de características
importancias = rf.feature_importances_
df_importancia = pd.DataFrame({
    'Característica': iris.feature_names,
    'Importancia': importancias
}).sort_values('Importancia', ascending=False)

# Visualizar
plt.figure(figsize=(10,6))
sns.barplot(x='Importancia', y='Característica', data=df_importancia, palette='viridis')
plt.title('Importancia de Características - Random Forest')
plt.show()

Comparativa y Optimización

In [None]:
from sklearn.metrics import accuracy_score

# Calcular precisiones
acc_arbol = accuracy_score(y_test, y_pred)
acc_rf = accuracy_score(y_test, y_pred_rf)

print(f"Precisión Árbol de Decisión: {acc_arbol:.4f}")
print(f"Precisión Random Forest: {acc_rf:.4f}")
print(f"Mejora: {(acc_rf - acc_arbol)*100:.2f}%")

from sklearn.model_selection import GridSearchCV

# Definir parámetros a probar
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 5, 10],
    'min_samples_split': [2, 5, 10]
}

# Búsqueda de mejores parámetros
grid_search = GridSearchCV(RandomForestClassifier(random_state=42), 
                          param_grid, 
                          cv=5,
                          scoring='accuracy')
grid_search.fit(X_train, y_train)

# Mostrar mejores parámetros
print("Mejores parámetros encontrados:")
print(grid_search.best_params_)

Overfitting vs Underfitting

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Generar datos sintéticos
np.random.seed(42)
X = np.linspace(0, 1, 30)
y = np.sin(2 * np.pi * X) + np.random.normal(0, 0.1, 30)

# Configurar subplots
plt.figure(figsize=(18, 4))

# Underfitting (grado 1)
plt.subplot(1, 3, 1)
model = Pipeline([('poly', PolynomialFeatures(degree=1)), 
                  ('linear', LinearRegression())])
model.fit(X[:, np.newaxis], y)
plt.scatter(X, y, s=20, label='Datos')
plt.plot(X, model.predict(X[:, np.newaxis]), color='r', label='Modelo')
plt.title('Underfitting (Grado 1)')
plt.legend()

# Buen ajuste (grado 3)
plt.subplot(1, 3, 2)
model = Pipeline([('poly', PolynomialFeatures(degree=3)), 
                  ('linear', LinearRegression())])
model.fit(X[:, np.newaxis], y)
plt.scatter(X, y, s=20)
plt.plot(X, model.predict(X[:, np.newaxis]), color='r')
plt.title('Buen Ajuste (Grado 3)')

# Overfitting (grado 15)
plt.subplot(1, 3, 3)
model = Pipeline([('poly', PolynomialFeatures(degree=15)), 
                  ('linear', LinearRegression())])
model.fit(X[:, np.newaxis], y)
plt.scatter(X, y, s=20)
plt.plot(X, model.predict(X[:, np.newaxis]), color='r')
plt.title('Overfitting (Grado 15)')

plt.tight_layout()
plt.show()

from sklearn.model_selection import train_test_split

# Dividir datos
X_train, X_test, y_train, y_test = train_test_split(X[:, np.newaxis], y, test_size=0.3)

# Evaluar modelos
degrees = [1, 3, 15]
results = []

for degree in degrees:
    model = Pipeline([('poly', PolynomialFeatures(degree=degree)), 
                     ('linear', LinearRegression())])
    model.fit(X_train, y_train)
    
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    results.append((degree, train_score, test_score))

# Mostrar resultados
import pandas as pd
df_results = pd.DataFrame(results, columns=['Grado', 'Train R²', 'Test R²'])
print(df_results)

Validación Cruzada

In [None]:
from sklearn.model_selection import cross_val_score, KFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# Cargar datos
iris = load_iris()
X, y = iris.data, iris.target

# Configurar validación cruzada
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
model = RandomForestClassifier(n_estimators=100)

# Evaluar modelo
scores = cross_val_score(model, X, y, cv=kfold, scoring='accuracy')

print("Accuracy por fold:", scores)
print("Accuracy promedio: {:.2f}%".format(scores.mean() * 100))
print("Desviación estándar: {:.4f}".format(scores.std()))

from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
stratified_scores = cross_val_score(model, X, y, cv=skf)

print("Accuracy estratificado:", stratified_scores)
print("Mejora respecto a KFold normal: {:.2f}%".format(
    (stratified_scores.mean() - scores.mean()) * 100))

Regularización

In [None]:
from sklearn.linear_model import Ridge, Lasso
from sklearn.datasets import make_regression
from sklearn.preprocessing import StandardScaler

# Generar datos
X, y = make_regression(n_samples=100, n_features=10, noise=0.5, random_state=42)
X = StandardScaler().fit_transform(X)

# Entrenar modelos
ridge = Ridge(alpha=1.0)
lasso = Lasso(alpha=0.1)

ridge.fit(X, y)
lasso.fit(X, y)

# Comparar coeficientes
plt.figure(figsize=(10, 5))
plt.bar(range(10), ridge.coef_, alpha=0.7, label='Ridge')
plt.bar(range(10), lasso.coef_, alpha=0.7, label='Lasso')
plt.xticks(range(10), [f'X{i}' for i in range(1, 11)])
plt.title('Comparación de Coeficientes: Ridge vs Lasso')
plt.legend()
plt.show()



Implementación con Regularización y Validación Cruzada

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler

# Crear pipeline
pipe = make_pipeline(
    PolynomialFeatures(degree=3),
    StandardScaler(),
    Ridge(alpha=1.0)
)

# Evaluar con validación cruzada
scores = cross_val_score(pipe, X[:, np.newaxis], y, cv=5,
                        scoring='neg_mean_squared_error')

print("MSE promedio: {:.4f}".format(-scores.mean()))

 Big Data

In [None]:
from functools import reduce
import random
from collections import defaultdict

# Generar datos de ejemplo (1 millón de registros)
datos_ventas = [random.randint(1, 100) for _ in range(1000000)]

# Función Map
def mapper(valor):
    return (valor, 1)

# Función Reduce
def reducer(acumulado, elemento):
    clave, valor = elemento
    acumulado[clave] += valor
    return acumulado

# Procesamiento en lotes (simulando procesamiento distribuido)
resultados = reduce(reducer, map(mapper, datos_ventas[:10000]), defaultdict(int))

# Mostrar resultados
print("Conteo de ventas por producto (top 10):")
for producto, cantidad in sorted(resultados.items(), key=lambda x: x[1], reverse=True)[:10]:
    print(f"Producto {producto}: {cantidad} ventas")

Ciclo de Vida de los Datos

In [None]:
import pandas as pd
from datetime import datetime
import hashlib
import matplotlib.pyplot as plt

class DataLifecycle:
    def __init__(self):
        self.raw_data = None
        self.processed_data = None
        self.analysis_results = None
    
    # Fase 1: Generación/Adquisición
    def acquire_data(self, source):
        """Simula adquisición de datos desde diferentes fuentes"""
        print(f"Adquiriendo datos desde: {source}")
        if source == 'ejemplo_csv':
            self.raw_data = pd.DataFrame({
                'id': [1, 2, 3, 4],
                'nombre': ['Ana', 'Juan', 'María', 'Pedro'],
                'edad': [25, 32, 28, 41],
                'fecha_registro': [datetime.now() for _ in range(4)]
            })
        return self.raw_data
    
    # Fase 2: Almacenamiento
    def store_data(self, data, destination):
        """Simula almacenamiento seguro con hash de verificación"""
        data_hash = hashlib.sha256(str(data.values).encode()).hexdigest()
        print(f"Datos almacenados en {destination} | Hash: {data_hash[:10]}...")
        return data_hash
    
    # Fase 3: Procesamiento
    def process_data(self):
        """Ejemplo de limpieza y transformación"""
        if self.raw_data is not None:
            self.processed_data = self.raw_data.copy()
            self.processed_data['edad_categoria'] = pd.cut(
                self.processed_data['edad'],
                bins=[0, 30, 40, 100],
                labels=['Joven', 'Adulto', 'Mayor']
            )
            return self.processed_data
    
    # Fase 4: Análisis
    def analyze_data(self):
        """Ejemplo de análisis básico"""
        if self.processed_data is not None:
            self.analysis_results = {
                'conteo_edades': self.processed_data['edad_categoria'].value_counts(),
                'edad_promedio': self.processed_data['edad'].mean()
            }
            return self.analysis_results
    
    # Fase 5: Visualización
    def visualize_results(self):
        """Generación de visualizaciones"""
        if self.analysis_results:
            plt.figure(figsize=(8, 4))
            self.analysis_results['conteo_edades'].plot(kind='bar')
            plt.title('Distribución por categoría de edad')
            plt.ylabel('Cantidad')
            plt.show()
    
    # Fase 6: Retención
    def apply_retention_policy(self, days=30):
        """Simula política de retención"""
        print(f"Aplicando política de retención: {days} días")
    
    # Fase 7: Archivado/Eliminación
    def archive_or_delete(self, action='archive'):
        """Simula archivado o eliminación segura"""
        print(f"Proceso completado: {action.upper()} ejecutado")

# Ejemplo de uso completo
dlc = DataLifecycle()
dlc.acquire_data('ejemplo_csv')
dlc.store_data(dlc.raw_data, 'almacenamiento_primario')
dlc.process_data()
dlc.analyze_data()
dlc.visualize_results()
dlc.apply_retention_policy(60)
dlc.archive_or_delete('archive')

Arquitectura e Ingeniería de Datos

In [None]:
from abc import ABC, abstractmethod
import time
from functools import wraps

# Patrón de diseño para componentes de arquitectura
class DataComponent(ABC):
    @abstractmethod
    def process(self, data):
        pass

class DataIngester(DataComponent):
    def process(self, source):
        print(f"Ingestando datos desde: {source}")
        return pd.DataFrame({'valor': [10, 20, 30]})

class DataTransformer(DataComponent):
    def process(self, data):
        print("Transformando datos...")
        return data * 2

class DataLoader(DataComponent):
    def process(self, data):
        print(f"Cargando {len(data)} registros al warehouse")
        return True

# Decorador para monitoreo de performance
def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Tiempo de ejecución: {end_time - start_time:.2f} segundos")
        return result
    return wrapper

# Pipeline modular con manejo de errores
class DataPipeline:
    def __init__(self):
        self.components = []
    
    def add_component(self, component):
        self.components.append(component)
    
    @monitor_performance
    def execute(self):
        data = None
        for component in self.components:
            try:
                data = component.process(data if data is not None else None)
            except Exception as e:
                print(f"Error en {component.__class__.__name__}: {str(e)}")
                raise
        return data

# Configuración del pipeline
pipeline = DataPipeline()
pipeline.add_component(DataIngester())
pipeline.add_component(DataTransformer())
pipeline.add_component(DataLoader())

# Ejecución
pipeline.execute()

Seguridad y Privacidad de Datos

In [None]:
import cryptography
from cryptography.fernet import Fernet
import numpy as np

# Generación de clave de encriptación
key = Fernet.generate_key()
cipher_suite = Fernet(key)

# Encriptación de datos sensibles
def encrypt_data(data):
    if isinstance(data, pd.DataFrame):
        return data.applymap(lambda x: cipher_suite.encrypt(str(x).encode()).decode())
    return cipher_suite.encrypt(str(data).encode()).decode()

# Desencriptación
def decrypt_data(data):
    if isinstance(data, pd.DataFrame):
        return data.applymap(lambda x: cipher_suite.decrypt(x.encode()).decode())
    return cipher_suite.decrypt(data.encode()).decode()

# Anonimización de datos
def anonymize_data(df, columns):
    anonymized = df.copy()
    for col in columns:
        if col in anonymized.columns:
            anonymized[col] = anonymized[col].apply(
                lambda x: hashlib.sha256(str(x).encode()).hexdigest()[:8]
            )
    return anonymized

# Ejemplo de uso
sensitive_data = pd.DataFrame({
    'nombre': ['Ana Pérez', 'Juan García', 'María López'],
    'email': ['ana@example.com', 'juan@example.com', 'maria@example.com'],
    'telefono': ['12345678', '87654321', '55555555']
})

print("Datos originales:")
display(sensitive_data.head())

# Aplicar seguridad
encrypted_data = encrypt_data(sensitive_data)
print("\nDatos encriptados:")
display(encrypted_data.head())

anonymized_data = anonymize_data(sensitive_data, ['nombre', 'email'])
print("\nDatos anonimizados:")
display(anonymized_data.head())

Estándares Internacionales

In [None]:
class DataStandardChecker:
    @staticmethod
    def check_gdpr_compliance(df):
        """Verifica requisitos básicos GDPR"""
        checks = {
            'has_pii': any(col in df.columns for col in ['nombre', 'email', 'telefono']),
            'has_consent_column': 'consentimiento' in df.columns,
            'has_retention_policy': False  # Simulado
        }
        return checks
    
    @staticmethod
    def check_fair_principles(df):
        """Verifica principios FAIR"""
        checks = {
            'findable': 'id' in df.columns,
            'accessible': not df.empty,
            'interoperable': isinstance(df, pd.DataFrame),
            'reusable': len(df.columns) > 1
        }
        return checks

# Ejemplo de uso
sample_data = pd.DataFrame({
    'id': [1, 2, 3],
    'nombre': ['A', 'B', 'C'],
    'consentimiento': [True, True, False]
})

print("Verificación GDPR:")
display(DataStandardChecker.check_gdpr_compliance(sample_data))

print("\nVerificación FAIR:")
display(DataStandardChecker.check_fair_principles(sample_data))