# **Objetivos de la clase:**
1. Aplicar listas, tuplas, diccionarios y sets en un mini sistema real.
2. Entender programación funcional en Python (funciones puras, inmutabilidad razonable, composición).
3. Dominar lambda junto a map, filter y reduce (y cuándo preferir comprensiones).
4. Diseñar pipelines de limpieza y validación de datos.
Separar lógica de negocio de presentación/IO.
5. Prepararse para trabajar con JSON y estructuras anidadas.

## **Programación Funcional**
1. La programación funcional es un paradigma de programación que trata de la computación como la evaluación de funciones matematicas y evita el cambio de estado y los datos mutables.
2. La programación funcional se enfoca en qué se debe hacer, utilizando expresiones en lugar de declaraciones

In [2]:
# Lista de numeros inicial
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Usando una compresion de lista (list comprehension)
# Combina filtrado y mapeo en una sola expresion declarativa.
numeros_pares_cuadrados = [
   num ** 2 for num in numbers if num % 2 == 0
]

 # ejemplo de como hace el proceso con ciclos
for num in numbers:
    print(f"imprimiendo el iterable de la lista: {num}")

for num in numbers:
    if num % 2 == 0:
        print(f"imprimiendo los numeros pares: {num}")
        print(f"imprimiendo el cuadrado de los numeros pares: {num ** 2}")

# Imprime los numeros pares cuadrados
print(numeros_pares_cuadrados)

imprimiendo el iterable de la lista: 1
imprimiendo el iterable de la lista: 2
imprimiendo el iterable de la lista: 3
imprimiendo el iterable de la lista: 4
imprimiendo el iterable de la lista: 5
imprimiendo el iterable de la lista: 6
imprimiendo el iterable de la lista: 7
imprimiendo el iterable de la lista: 8
imprimiendo el iterable de la lista: 9
imprimiendo el iterable de la lista: 10
imprimiendo los numeros pares: 2
imprimiendo el cuadrado de los numeros pares: 4
imprimiendo los numeros pares: 4
imprimiendo el cuadrado de los numeros pares: 16
imprimiendo los numeros pares: 6
imprimiendo el cuadrado de los numeros pares: 36
imprimiendo los numeros pares: 8
imprimiendo el cuadrado de los numeros pares: 64
imprimiendo los numeros pares: 10
imprimiendo el cuadrado de los numeros pares: 100
[4, 16, 36, 64, 100]


**Explicacion del ejemplo:**

* Paso 1: Lista de numeros del  1 al 10.
* paso 1.1: EL ciclo for itera sobre cada elemento de la lista, donde numbers es la lista original y num es el elemento actual.
* paso 1.2: El ciclo if filtra los numeros pares. Con la formula para numeros pares es:
`num % 2 == 0, donde 1 % 2 == 1`
* paso 2: `El ciclo **num ** 2 `mapea cada numero a su cuadrado.
* Paso 3: Combinar los resultados en una sola lista.

**Explicación paso a paso:**

1. Se define una única expresión `([... for ... if ...])` que describe el resultado deseado.
2. `if num % 2 == 0 e`s la parte que filtra los números pares.
3. `num ** 2 `es la parte que transforma (mapea) cada número par en su cuadrado.
4. La comprensión de lista crea una nueva lista directamente a partir de la original, sin necesidad de un bucle explícito o de modificar un estado intermedio.
5. El código se lee como una declaración de lo que es el resultado, sin detallar el "cómo" se construye.

**Comparación y Ventajas:**

7. **Legibilidad y Concisión:** El enfoque funcional es mucho más corto y, para muchos, más fácil de leer, ya que la intención del código es evidente de inmediato.

8. **Inmutabilidad:** La versión funcional no modifica la lista original ni utiliza una lista mutable intermedia, lo que reduce los efectos secundarios y hace que el código sea más seguro y predecible.

9. **Abstracción:** El código funcional eleva el nivel de abstracción, permitiéndote pensar en operaciones sobre colecciones `(filtrar, mapear)` en lugar de en bucles y mutaciones.

# **Como Se aplica en Python**

1. Funciones de primera clase y orden superior
2. Funciones puras
3. Inmutabilidad
4. Inmutabilidad razonable
4. Funciones lambda
5. Composición
6. Pipelines
* Legibilidad y Concisión
* Abstracción

* Colecciones + funciones de orden superior
* Evaluación perezosa

# **Funciones de primera clase y orden superior**

* Las funciones de primera clase se pueden pasar como argumentos a otras funciones.
* Las funciones de orden superior son las que aceptan otras funciones como argumentos o las devuelven.

In [3]:
# 1 Funciones de primera clase: Es solo una función regular.
def aplicar_opreacion(op,a, b):
    # 2 Función  de orden superior: toma una función -op- como argumento.
    return op(a, b)

# Una función simple que suma dos números.}
def suma(x, y):
    return x + y

# 3 La función -suma- se pasa como argumento a -aplicar_opreacion-
resultado = aplicar_opreacion(suma, 5, 3)

print(f"El resultado de la suma es: {resultado}")
# La salida es: El resultado de la suma es: 8

El resultado de la suma es: 8


En este ejemplo -suma- es una función de primera clase que se pasa a aplicar_operación, una función de orden superior.


# **Funciones puras**

Mismo input → mismo output, sin efectos colaterales.

In [4]:
def cuadrado(x: int) -> int:
    return x * x

print(cuadrado(4))  # 16
print(cuadrado(4))  # siempre 16

16
16


1. No depende de variables externas.
2. No modifica nada.
3. Siempre devuelve el mismo resultado.
4. No tiene efectos secundarios.

**Funciones puras**

* Una función pura siempre devuelve el mismo resultado para las mismas entradas y no tiene efectos secundarios.
* Esto significa que no modifica variables externas al ámbito de la función.

In [5]:
# Variable global que no debe ser modificada:
IVA_GLOBAL = 0.16

# Función pura: solo depende de sus argumentos.
def calcular_precio_final(precio_base):
    return precio_base * (1 + 0.16)

# Función impura: modifica una variable global (efecto secundario).
def agregar_iva_impura(precio_base):
    # Esto es una mala práctica en programación funcional
    global IVA_GLOBAL
    IVA_GLOBAL += 0.16
    return precio_base * (1 + IVA_GLOBAL)

precio = 100
precio_final = calcular_precio_final(precio)

print(f"El precio final es: {precio_final}")
# Salida: El precio final es:

El precio final es: 115.99999999999999


`calcular_precio_final` es pura porque no modifica uanda fuera de su ambito, a diferencia de `agregar_iva_impura` que si lo hace.

# **Inmutabilidad**

* La **inmutabilidad** se refiere a no modificar objetos después de su creación.
* En Python, _las tuplas_ y _las cadenas_ de texto son inmutables, mientras que las listas son mutables.

In [6]:
# Uso de tuplas (inmutables)
datos_inmutables = (1, 2, 3)

# Esto daria un Error (TypeError) porque las tuplas no se pueden modificar
datos_inmutables.append(4)  # Error

AttributeError: 'tuple' object has no attribute 'append'

In [7]:
# Para modificarlas se debe crear una nueva tupla a partir de la original.
nuevos_datos = datos_inmutables + (4,)

print(f"Tupla original: {datos_inmutables}")
print(f"Tupla modificada: {nuevos_datos}")

Tupla original: (1, 2, 3)
Tupla modificada: (1, 2, 3, 4)


In [9]:
# Uso de una lista (mutable)
datos_mutables  = [1, 2, 3]
datos_mutables.append(4) # Esto funciona bien
print(f"Lista mutada: {datos_mutables}")

Lista mutada: [1, 2, 3, 4]


Al trabajar con estructuras inmutabkes, se evita el cambio de estado, lo que facilita el seguimiento de datos y reduce errores.

# **Inmutabilidad razonable:**

Evitar mutaciones innecesarias, crear nuevas estructuras cuando sea necesario.


In [10]:
# lista original
lista_numeros = [1, 2, 3, 4, 5]

# En vez de mutar la lista, devolvemos una nueva lista con transformaciones
def duplicar_lista(nums):
    return [n * 2 for n in nums]

nueva_lista_mutable = duplicar_lista(lista_numeros)

print("Original: ", lista_numeros)
print("Nueva: ", nueva_lista_mutable)

Original:  [1, 2, 3, 4, 5]
Nueva:  [2, 4, 6, 8, 10]


La lista origina se conserva.

# **Funciones Lambda**

* Una **función lambda** es una función anónima pequeña que se define en una sola línea.
* Son muy útiles para operaciones simples que se usan solo una vez, a menudo como argumentos para funciones de orden superior.
* `Anónima` significa que no requiere un nombre (aunque se puede asignar a una variable).
* Se usa cuando necesitas una función rápida y simple.
* Sintaxis: `lambda argumentos: expresión`
* Se diferencia de `def` porque:
* `lambda` **solo puede contener una expresión**, no bloques de código, no `return`, no `for` etc.
* Se evalúa en el momento y devuelve automáticamente el resultado.

In [11]:
# Función normal
def suma(a, b):
    return a + b

# Función lambda equivalente
suma_lambda = lambda a,b: a +b

print(suma(3, 5))
print(suma_lambda(3, 5))

8
8


Observa que la `lambda` no necesita `return`, siempre devuelve el valor de la expresión.

**Uso práctico de lambda**

Se usan sobre todo cuando necesitas pasar funciones como argumentos a otras funciones:
* map
* filter
* sorted

**En `map` (transformación de listas)**

In [12]:
datos = ["1", " 2 ", "003"]

# convertir strings a enteros quitando espacios.
datos_limpios = list(map(lambda s: int(s.strip()), datos))
print(datos_limpios)

[1, 2, 3]


**En `filter` (filtrado)**

In [13]:
# Números pares
pares = list(filter(lambda n: n % 2 == 0, range(10)))
print(pares)

[0, 2, 4, 6, 8]


**En `sorted `(ordenar con clave personalizada)**

In [14]:
names = ["Ana", "carlos", "beatriz"]

ordenados = sorted(names, key=lambda s: s.lower())

print(ordenados)

['Ana', 'beatriz', 'carlos']


**En `reduce` (acumulación)**

In [15]:
from functools import reduce

suma = reduce(lambda acc, n: acc + n, [1, 2, 3,], 0)
print(suma)

6


In [16]:
precios = [10, 20, 35, 50]

# Usamos map() con una función lambda para duplicar precio.
precio_duplicados = list(map(lambda price: price * 2, precios))

print(f"Precio duplicados: {precio_duplicados}")
# Salida: precios duplicados: [20, 40, 70, 100]

Precio duplicados: [20, 40, 70, 100]


Usando filter() con una función lambda para filtrar precios mayores a 30.

In [17]:
precios_altos = list(filter(lambda price: price >30, precios))

print(f"Precios mayores a 30: {precios_altos}")

Precios mayores a 30: [35, 50]


En estos ejemplos, la funcion lambda: `lambda price: price * 2` - Se usa para definir una transformación sin necesidad de crear una función `def` por separado.


# Comparación con funciones normales

In [18]:
def cuadrado(x):
    return x * x

# Usando equivalente lambda
cuadrado_lambda = lambda x: x * x

print(cuadrado(5))

25


Pero:
* Si la lógica crece (más de 1 línea, condiciones complejas), conviene usar def.

# Ventajas y desventajas

**Ventajas**:

1. Sintaxis corta, ideal para callbacks.
2.
2. Buena combinación con map, filter, reduce, sorted.

**Desventajas:**

1. Menos legible si la expresión es larga.

2. No puedes escribir varias instrucciones.
3.
3. No permite docstrings → menos descriptivas que def.

# Alternativas más "pitónicas"

En Python, muchas veces es más legible usar `comprensiones` en lugar de `map/filter` con` lambda`:

In [19]:
# Con map + lambda
lista_num = ["1", " 2 ", "003"]
num_limpios = list(map(lambda s: int(s.strip()), lista_num))

# Usando Compresión
num_limpios = [int(s.strip()) for s in lista_num]
print(num_limpios)

[1, 2, 3]


# Casos de uso más comunes en nivel senior

Ordenar objetos complejos:


In [20]:
persons = [
    {
    "nombre": "Ana",
    "edad": 30,
},
    {
    "nombre": "Carlos",
    "edad": 25,
}
]

ordenadas = sorted(persons, key=lambda p: p["edad"])

print(ordenadas)

[{'nombre': 'Carlos', 'edad': 25}, {'nombre': 'Ana', 'edad': 30}]


# Funciones inline en GUI/eventos:

In [21]:
# tkinter es una libreria para interfaces graficas
import tkinter as tk

ventana = tk.Tk()
boton = tk.Button(ventana, text="Saludar", command=lambda: print("Hola mundo"))

boton.pack()
ventana.mainloop()

# Expresiones cortas en estructuras de datos:

In [22]:
# Diccionario de operaciones
operaciones ={
    "suma": lambda a, b: a + b,
    "resta": lambda a, b: a - b,
}

print(operaciones["suma"](10, 5))

15


1. `lambda args: `expr → función corta y anónima.
2. Úsala en map, `filter`, reduce, sorted, callbacks.
3. Evita abusar → si el código crece, usa def.
4. Python favorece comprensiones `([x for x in iterable])` sobre `map/filter` + `lambda `por legibilidad.

# Composición de funciones

1. Son una técnica fundamental en programación funcional que consiste en combinar funciones simples para construir una función más compleja.

2. Se trata de pasar la salida de una función como la entrada de la siguiente, creando una cadena de transformaciones.

Encadenar transformaciones pequeñas.

In [23]:
def filtrar_pares(nums):
    return [n for n in nums if n % 2 == 0]

def elevar_al_cuadrado(nums):
    return [n**2 for n in nums]

num_list = [1, 2, 3, 4, 5, 6]

# Composición: filtrar pares y luego elevar al cuadrado
resultado = elevar_al_cuadrado(filtrar_pares(num_list))
print(resultado)

[4, 16, 36]


Cada función hace una sola cosa, y juntas crean un **Pipeline.**

**Nivel Junior**

 la composición se entiende como el encadenamiento de llamadas a funciones. Es la base de un código más modular y legible.

**Concepto**

Imagina una línea de ensamblaje: un producto (el dato) pasa por diferentes estaciones (funciones), donde cada una realiza una tarea específica.

La salida de la primera estación es la entrada de la segunda, y así sucesivamente.

**Métodos y Funciones**

El método más básico para un principiante es anidar llamadas a funciones.

In [24]:
def sumar_dos(x):
    return x + 2

def multiplicar_por_tres(x):
    return x * 3

# Composición manual: el resultado de 'sumar_dos(5)' es la entrada de 'multiplicar_por_tres'.
resultado = multiplicar_por_tres(sumar_dos(5))
print(resultado)  # Salida: 21

21


* Este enfoque, aunque funcional, puede volverse difícil de leer a medida que se anidan más funciones.

* La legibilidad de derecha a izquierda o de adentro hacia afuera puede ser confusa.

**Nivel Senior**

A nivel senior, la composición se ve como un concepto de abstracción. En lugar de anidar llamadas, se crea una nueva función compuesta que encapsula toda la lógica. Esto hace que el código sea más reutilizable, declarativo y fácil de probar.

**Concepto**

Se utilizan herramientas para crear un `"Pipeline`" o **"tubería"** de datos. En lugar de decir "hacer A, luego hacer B, luego C", se define una nueva función llamada "hacer_ABC" que ya sabe cómo combinar las tres.

**Herramientas y Métodos**

Los programadores experimentados no anidan llamadas; en su lugar, escriben una función que compone otras. En Python, esto se puede hacer de forma manual o con librerías.

1. Composición manual (usando una función que devuelve una función):

In [8]:
def componer(f, g):
    """
    Toma dos funciones f y g, y devuelve una nueva función
    que aplica g, y luego f sobre el resultado.
    """
    def funcion_compuesta(x):
        return f(g(x))
    return funcion_compuesta

# Creamos una funcion compuesta
sumar_dos = lambda x: x + 2
multiplicar_por_tres = lambda x: x * 3

# La funcion 'duplicar_y_sumar' primero suma 2 y luego multiplica por 3
duplicar_y_sumar = componer(sumar_dos, multiplicar_por_tres)

resultado =duplicar_y_sumar(5)
print(resultado)

# Salida: 17 (multiplica 5*3=15, luego suma 15+2=17)

17


Nota: El orden es importante. En este caso, f(g(x)) aplica g primero.

# Uso de librerías:

Las librerías funcionales de terceros como toolz o pipe ofrecen herramientas para una composición más limpia y potente, similar a las características de otros lenguajes puramente funcionales.

In [10]:
# pip install pipe
from pipe import select, where, Pipe

# Se crea un pipeline de operaciones
sumar_dos = lambda x: x + 2
multiplicar_por_tres = lambda x: x * 3

# Usando el operador de tubería '|'
# El dato inicial 5 "pasa" a través de cada función.
resultado = 5 | Pipe(multiplicar_por_tres) | Pipe(sumar_dos)
print(resultado) # Salida: 17

17


**Ventajas del enfoque senior:**

**Legibilidad de izquierda a derecha:** Las herramientas de tubería permiten leer el flujo de datos de manera intuitiva.

**Reutilización:** Se pueden crear funciones compuestas para ser usadas en múltiples lugares del código.

**Declarativo:** El código describe "qué" se hace (el flujo de datos) en lugar de "cómo" se hace (anidando llamadas).

**Facilidad de prueba:** Cada función de la composición puede ser probada de forma independiente, lo que simplifica la depuración.

 # Pipeline (Tubería de Datos):

* Es un concepto de diseño en programación.

* Consiste en una secuencia de pasos de procesamiento de datos, donde la salida de un paso se convierte en la entrada del siguiente.

* Imagina una línea de ensamblaje: cada función es una estación que realiza una tarea específica.

# Características principales:

1. **Modularidad:** Cada paso del pipeline es una función pequeña y especializada que realiza una única tarea (ej: filtrar, transformar, validar).
2. **Composición:** Se combinan estas funciones simples para construir un proceso complejo.
3. **Flujo de datos:** El dato "viaja" a través de la tubería, transformándose en cada paso.

**¿Por qué usar Pipelines?**

1. **Legibilidad:** El código se vuelve más fácil de leer porque describe una secuencia de transformaciones de izquierda a derecha, en lugar de `anidar funciones (g(f(h(x))))`.
2. **Reutilización:** Las funciones pequeñas y puras se pueden reutilizar en diferentes pipelines.
3. **Mantenimiento y Pruebas:** Es más fácil probar y depurar cada paso de forma aislada.

* En tu código, ya creaste un pipeline simple anidando funciones:
`resultado = elevar_al_cuadrado(filtrar_pares(num_list)).`

# ¿Cómo se implementan los Pipelines en Python?

Existen varias formas de crear pipelines en Python, desde las más básicas hasta las más elegantes con librerías.

**1. Anidando Funciones (El método básico)**
Es la forma más directa. Se pasa el resultado de una función como argumento a la siguiente.

In [20]:
def filtrar_pares(numeros):
    return [n for n in numeros if n % 2 == 0]

def elevar_al_cuadrado(numeros):
    return [n**2 for n in numeros]

numeros = [1, 2, 3, 4, 5, 6]

# La salida de filtrar_pares es la entrada de elevar_al_cuadrado
resultado = elevar_al_cuadrado(filtrar_pares(numeros))

print(resultado)
# Salida: [4, 16, 36]

[4, 16, 36]


Desventaja: Con muchas funciones, el código se vuelve difícil de leer `(funcion_c(funcion_b(funcion_a(datos)))).`

**2. Usando Variables Intermedias**
Para mejorar la legibilidad del anidamiento, se pueden usar variables temporales.

In [19]:
numeros = [1, 2, 3, 4, 5, 6]

numeros_pares = filtrar_pares(numeros)
numeros_cuadrados = elevar_al_cuadrado(numeros_pares)

print(numeros_cuadrados)
# Salida: [4, 16, 36]

[4, 16, 36]


Ventaja: Es más claro, pero más verboso.

**3. Usando la Librería pipe (El método funcional y elegante)**

Librerías como `pipe` introducen un operador de tubería `(|)` que permite encadenar funciones de una manera muy legible y declarativa.

Primero, instala la librería:

`pip install pipe`

# La librería pipe:

* Es una herramienta (una librería de Python) que te permite escribir pipelines de una manera más clara y legible, `usando el operador | (pipe).`

* En lugar de `anidar funciones (g(f(x)))`, escribes el flujo de datos de izquierda a derecha: `datos | f | g.`

*Esto hace que el código sea más declarativo, ya que describe el flujo de transformaciones en lugar de la mecánica de las llamadas a funciones.

In [21]:
from pipe import where, select

numeros = [1, 2, 3, 4, 5, 6]

# El flujo de datos se lee de izquierda a derecha
resultado_pipe = numeros | where(lambda n: n % 2 == 0) | select(lambda n: n**2)

print(list(resultado_pipe))
# Salida: [4, 16, 36]

[4, 16, 36]


Explicación:
* numeros | ...: El iterable numeros inicia el pipeline.
* where(lambda n: n % 2 == 0): Filtra los elementos del iterable, similar a filter(). Solo deja pasar los que cumplen la condición.
* select(lambda n: n**2): Transforma (mapea) cada elemento que pasó el filtro, similar a map().Resumen: Pipeline vs. Librería pipe

**Ejemplo práctico:**

En lugar de escribir esto (estilo anidado):

In [22]:
n = [1, 2, 3, 4, 5, 6]
# Filtra pares y luego eleva al cuadrado

resultado_anidado = elevar_al_cuadrado(filtrar_pares(n))
print(list(resultado_anidado))
# Salida: [4, 16, 36]

[4, 16, 36]


Con la librería pipe, escribes esto:

In [23]:
from pipe import where, select

n = [1, 2, 3, 4, 5, 6]

# El mismo flujo, pero escrito con el operador pipe '|'
resultado_pipe = n | where(lambda x: x % 2 == 0) | select(lambda x: x ** 2)

print(list(resultado_pipe))
# Salida: [4, 16, 36]

[4, 16, 36]


Un pipeline es la idea de encadenar operaciones, y la librería pipe te da una sintaxis elegante (|) para construir esos pipelines en tu código.

# Funciones principales de pipe

La librería viene con una serie de funciones predefinidas (llamadas "pipes") que cubren las operaciones más comunes de procesamiento de colecciones. Las más importantes son:

1. **where(predicado)**: Filtra los elementos de una colección. Es el equivalente a filter(). Solo deja pasar los elementos para los que el predicado (una función lambda o normal) devuelve True.
2. **select(transformacion):** Transforma cada elemento de la colección. Es el equivalente a map(). Aplica la función de transformacion a cada elemento.
3. **dedup():** Elimina elementos duplicados consecutivos de la colección. Para eliminar todos los duplicados (no solo los consecutivos), puedes usar | dedup(key=lambda x: x).
4. **sort():** Ordena los elementos. Puedes pasarle un key como en la función sorted() de Python.
6. **reverse():** Invierte el orden de los elementos de la colección.
7. **groupby(clave):** Agrupa los elementos según una clave (una función). Devuelve tuplas (clave, grupo).
8. **take(n):** Toma los primeros n elementos de la colección.

**Ejemplo práctico**

Retomemos el ejemplo de filtrar números pares y elevarlos al cuadrado.

1. Sin pipe (anidando funciones):

In [24]:
def filtrar_pares(nums):
    return [n for n in nums if n % 2 == 0]

def elevar_al_cuadrado(nums):
    return [n**2 for n in nums]

numeros = [1, 2, 3, 4, 5, 6]
resultado_anidado = elevar_al_cuadrado(filtrar_pares(numeros))
print(resultado_anidado)  # Salida: [4, 16, 36]

[4, 16, 36]


2. Con la librería pipe:

In [26]:
from pipe import where, select

numero = [1, 2, 3, 4, 5, 6]

# El flujo de datos se lee de izquierda a derecha
resultado_pipe = numero | where(lambda n: n % 2 == 0) | select(lambda n: n**2)

# El resultado de un pipeline es un generador, por eso lo convertimos a lista
print(list(resultado_pipe))
# Salida: [4, 16, 36]

[4, 16, 36]


* Como puedes ver, la versión con` pipe` describe el proceso paso a paso de forma lineal y es más fácil de seguir.

* Creando tus propias funciones para pipe

* Puedes integrar tus propias funciones en un pipeline usando el decorador `@Pipe`.


In [28]:
from pipe import Pipe

@Pipe
def sumar_uno(iterable):
    for elemento in iterable:
        yield elemento + 1


numeros = [1, 2, 3]

resultado = numeros | sumar_uno | select(lambda x: x * 10)

print(list(resultado))

[20, 30, 40]


La función decorada con `@Pipe `debe aceptar un iterable como primer argumento y usar yield para devolver los elementos procesados, manteniendo así la evaluación perezosa (los datos se procesan uno a uno, sin cargar todo en memoria).

* Ventajas de usar `pipe`

1. **Legibilidad:** El código es mucho más fácil de leer y entender, ya que el flujo de datos es lineal.
2. **Composición:** Facilita la construcción de procesos complejos a partir de funciones simples y reutilizables.
3. **Evaluación Perezosa (Lazy Evaluation):** Las operaciones no se ejecutan hasta que se consume el resultado (por ejemplo, con `list()).` Esto es muy eficiente en memoria, especialmente con grandes volúmenes de datos.
4. **Código Declarativo:** Te centras en "qué" quieres hacer con los datos, no en "cómo" (bucles, variables temporales, etc.).

#  otros paquetes que implementan pipelines en Python:

**1. itertools (nativo de Python)**

* El módulo `itertools` permite crear pipelines con iteradores de manera eficiente.

In [29]:
import itertools

# Pipeline: Filtrar pares y luego elevar al cuadrado
numeros = [1, 2, 3, 4, 5, 6]
pares = filter(lambda x: x % 2 == 0, numeros)
cuadrados = map(lambda x: x**2, pares)

print(list(cuadrados))
# Salida: [4, 16, 36]

[4, 16, 36]


**2. pandas (para datos tabulares)**

* pandas permite construir pipelines con métodos encadenados.

In [32]:
import pandas as pd

# Pipeline: filter pares y elevar al cuadrado
df = pd.DataFrame({'numeros': [1, 2, 3, 4, 5, 6]})
resultado = df[df['numeros'] % 2 == 0].assign(cuadrado=lambda x: x['numeros']**2)

print(resultado)
# Salida:
#    numeros  cuadrado
# 1        2         4
# 3        4        16
# 5        6        36


   numeros  cuadrado
1        2         4
3        4        16
5        6        36


 # Mejores prácticas para implementar pipelines en Python:

 1. Modularidad
Divide cada paso del pipeline en funciones pequeñas y específicas que realicen una única tarea.

In [33]:
def filtrar_pares(nums):
    return [n for n in nums if n % 2 == 0]

def elevar_al_cuadrado(nums):
    return [n**2 for n in nums]

**2. Funciones puras**

Asegúrate de que las funciones no tengan efectos secundarios y siempre devuelvan el mismo resultado para las mismas entradas.

**3. Uso de herramientas declarativas**

Usa librerías como pipe, toolz o pandas para mejorar la legibilidad y evitar anidamientos complejos.

In [34]:
from pipe import where, select

numeros = [1, 2, 3, 4, 5, 6]
resultado = numeros | where(lambda n: n % 2 == 0) | select(lambda n: n**2)
print(list(resultado))

[4, 16, 36]



**4. Evita mutaciones**

Trabaja con estructuras inmutables o crea nuevas estructuras en lugar de modificar las existentes.

**5. Legibilidad**

Usa nombres descriptivos para las funciones y variables.
Prefiere pipelines declarativos sobre anidamientos complejos.

**6. Pruebas unitarias**

Prueba cada función individualmente para garantizar que cada paso del pipeline funcione correctamente.

**7. Evaluación perezosa**

Si trabajas con grandes volúmenes de datos, usa generadores o iteradores para evitar cargar todo en memoria.

**8. Documentación**

Documenta cada paso del pipeline para que otros desarrolladores puedan entenderlo fácilmente.

**9. Uso de excepciones**

Maneja errores en cada paso del pipeline para evitar que un fallo interrumpa todo el proceso.

**10. Reutilización**

Diseña funciones genéricas que puedan ser reutilizadas en diferentes pipelines.

# Aplicaciones en proyectos reales

**1. Procesamiento de datos (ETL)**

En proyectos de análisis de datos, los pipelines se usan para extraer, transformar y cargar datos (ETL).



In [1]:
import pandas as pd

# Pipeline de ETL
def extraer_datos():
    return pd.DataFrame({'nombre': ['Ana', 'Carlos'], 'edad': [30, 25]})

def transformar_datos(df):
    return df[df['edad'] > 26].assign(edad_cuadrada=lambda x: x['edad']**2)

def cargar_datos(df):
    print("Datos cargados:")
    print(df)

# Ejecución del pipeline
datos = extraer_datos()
datos_transformados = transformar_datos(datos)
cargar_datos(datos_transformados)

Datos cargados:
  nombre  edad  edad_cuadrada
0    Ana    30            900


# 1. Preprocesamiento para Machine Learning con `scikit-learn`

* En machine Learning, es crucial aplicar la misma secuencia de transformaciones a los datos de entrenamiento y de prueba.

*  `scikit-learn` tiene una clase Pipeline diseñada especidificamente para esto.

**Objetivo:** Preparar un conjunto de datos con valores numéricos y categóricos para entrenar un modelo de clasificación.

**Pasos del Pipeline:**
1. **Imputar valores faltantes:** Rellenar los NaN en columnas numéricas con la media.
2. **Escalar características numéricas:** Normalizar los valores numéricos (ej. StandardScaler).
3. **Codificar características categóricas:** Convertir texto a números (ej. OneHotEncoder).
4. **Entrenar un modelo:** Usar los datos preprocesados para entrenar un clasificador.


In [7]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier

# Datos de ejemplo con valores faltantes y diferentes tipos de datos. Donde data es una lista de diccionarios.
data = {
    'age': [25, 30, None, 45, 22],
    'salary': [50000, 60000,75000, None, 55000],
    'city': ['Barcelona', 'Madrid', 'Lisboa', 'Paris', 'Madrid'],
    'purchases': [0, 1, 1, 1, 0]
}

# Crear un dataframe de pandas con los datos de ejemplo.
df = pd.DataFrame(data)

# aqui puedes ver el dataframe
# df

# Separa características y objetivo y usamos los metodos drop y assign para eliminar la columna objetivo y crear una nueva columna con el nombre de la columna objetivo.
# purchase es la columna objetivo y drop es para eliminar la columna objetivo.
#  la variable y es la columna objetivo.
X = df.drop('purchases', axis=1)
y = df['purchases']

# Identificar columnas numéricas y categóricas
numeric_features = ['age', 'salary']
categorical_features = ['city']

# Crear un pipeline para preprocesamiento los datos numéricos.
# el metodo imputer es para rellenar los NaN con la media,
# el metodo means es para calcular la media de los datos.
# stratery es para calcular la media de los datos. SimpleImputer es para rellenar los NaN con la media. StandardScaler es para normalizar los datos.
# el metodo scaler es para normalizar los datos,
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='mean')),  # Imputar valores faltantes con la media
    ('scaler', StandardScaler())  # Escalar características, StandardScaler es para normalizar los datos.
])

# Crear el pipeline de procesamiento para datos categóricos
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')) # paso 3: Codificar
])

# Combinar los Pipeline para procesamiento usando ColumnTransformer
preprocessor = ColumnTransformer(
    transformers = [
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# Crear el Pipeline final que incluye el preprocesamiento y el modelo
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier()) # Paso 4: Modelo
])

# Entrenar todo el Pipeline con una sola llamada
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
ml_pipeline.fit(X_train, y_train)

# El Pipiline se encarga de transpormar los datos y predecir
print(f"Precisión del modelo: {ml_pipeline.score(X_test, y_test)}")

Precisión del modelo: 1.0


# 2. Procesamiento de Lenguaje Natural (NLP) con spaCy

* La librería `spaCy` está construida alrededor del concepto de un pipeline de procesamiento de texto. Cuando procesas un texto, este pasa a través de varias etapas como tokenización, etiquetado gramatical, etc.

**Objetivo:** Limpiar y extraer información clave de una serie de comentarios de usuarios.

**Pasos del Pipeline:**

1. **Tokenización:** Dividir el texto en palabras y signos de puntuación.
2. **Lematización:** Reducir cada palabra a su forma base (ej. "corriendo" -> "correr").
3. **Eliminación de Stop Words:** Quitar palabras comunes que no aportan significado (como "el", "y", "un").
4. **Extracción de Entidades:** Identificar nombres de personas, organizaciones, lugares, etc.

In [8]:
import spacy

# Cargar un modelo de spaCy (la primera vez puede requerir: python -m spacy download es_core_news_sm)
nlp = spacy.load("es_core_news_sm")

# El pipeline por defecto de este modelo es:
# ['tok2vec', 'morphologizer', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']
print("Componentes del pipeline de spaCy:", nlp.pipe_names)

comments = [
    "Ana de Tejas Verdes compró acciones de Apple Inc. ayer en Madrid.",
    "El CEO de Google, Sundar Pichai, dará una conferencia la próxima semana."
]

# Procesar los textos a través del pipeline de spaCy
for doc in nlp.pipe(comments):
    print(f"\n--- Comentario: '{doc.text}' ---")

    # Extraer lemas limpios (sin stop words ni puntuación)
    lemmas_limpios = [
        token.lemma_ for token in doc if not token.is_stop and not token.is_punct
    ]
    print("Lemas limpios:", lemmas_limpios)

    # Extraer entidades nombradas (NER)
    print("Entidades encontradas:")
    for ent in doc.ents:
        print(f"  - {ent.text} ({ent.label_})")

Componentes del pipeline de spaCy: ['tok2vec', 'morphologizer', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']

--- Comentario: 'Ana de Tejas Verdes compró acciones de Apple Inc. ayer en Madrid.' ---
Lemas limpios: ['Ana', 'Tejas', 'Verdes', 'comprar', 'acción', 'Apple', 'Inc', 'ayer', 'Madrid']
Entidades encontradas:
  - Ana de Tejas Verdes (PER)
  - Apple Inc (MISC)
  - Madrid (LOC)

--- Comentario: 'El CEO de Google, Sundar Pichai, dará una conferencia la próxima semana.' ---
Lemas limpios: ['CEO', 'Google', 'Sundar', 'Pichai', 'dar', 'conferencia', 'próximo', 'semana']
Entidades encontradas:
  - Google (ORG)
  - Sundar Pichai (PER)


# 3. Procesamiento de Imágenes con Pillow

* Un pipeline también puede ser una secuencia de funciones para procesar archivos, como imágenes.

* **Objetivo**: Crear miniaturas en escala de grises para un lote de imágenes.

**Pasos del Pipeline:**

1. **Leer archivo de imagen:** Abrir la imagen desde el disco.
2. **Convertir a escala de grises:** Eliminar la información de color.
3. **Redimensionar:** Cambiar el tamaño a un máximo de 128x128 píxeles.
4. **Guardar resultado:** Escribir la nueva imagen en una carpeta de destino.



In [9]:
from PIL import Image
import glob
import os

# Función para el pipeline de una sola imagen
def process_image_pipeline(image_path, output_dir, size=(128, 128)):
    try:
        # Paso 1: Leer
        img = Image.open(image_path)

        # Paso 2: Convertir a escala de grises
        img_gray = img.convert('L')

        # Paso 3: Redimensionar (crear miniatura)
        img_gray.thumbnail(size)

        # Paso 4: Guardar
        base_name = os.path.basename(image_path)
        output_path = os.path.join(output_dir, base_name)
        img_gray.save(output_path)
        print(f"Procesada: {base_name}")

    except Exception as e:
        print(f"Error procesando {image_path}: {e}")

# --- Ejecución del pipeline para un lote de imágenes ---
# (Asegúrate de tener una carpeta 'source_images' con algunas imágenes .jpg o .png
# y una carpeta vacía 'thumbnails')

SOURCE_DIR = 'source_images'
OUTPUT_DIR = 'thumbnails'

if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# Obtener todas las imágenes de la carpeta de origen
image_files = glob.glob(f'{SOURCE_DIR}/*[.jpg|.png]')

# Aplicar el pipeline a cada imagen
for file_path in image_files:
    process_image_pipeline(file_path, OUTPUT_DIR)

Procesada: tech.jpg


* Se crea un pipeline (una secuencia de pasos) para procesar automáticamente un lote de imágenes.

* El objetivo es tomar todas las imágenes de una carpeta, convertirlas a escala de grises, crear una versión en miniatura `(thumbnail)` y guardarlas en una nueva carpeta.

**Paso 1: Importar las librerías necesarias**

`from PIL import Image`

`import glob`

`import os`

* `from PIL import Image:` Se importa el módulo Image de la librería `Pillow (PIL Fork).` Esta es la herramienta principal para abrir, manipular y guardar imágenes.

*`import glob`: Se importa el módulo glob, que sirve para encontrar archivos que coinciden con un patrón específico (por ejemplo, todos los archivos .jpg en una carpeta).

* `import os:` Se importa el módulo os `(Operating System),` que proporciona funciones para interactuar con el sistema operativo, como crear carpetas `(os.makedirs) `o manipular rutas de archivos` (os.path.basename, os.path.join).`

**Paso 2: Definir la función del pipeline (process_image_pipeline)**
* Se define una función llamada process_image_pipeline que contiene toda la lógica para procesar una sola imagen.

**Argumentos:**

**image_path:** La ruta al archivo de la imagen que se va a procesar.

**output_dir:** La carpeta donde se guardará la imagen procesada.

**size=(128, 128):** El tamaño máximo para la miniatura. Es un argumento opcional con un valor por defecto de 128x128 píxeles.

**try...except**: Se usa un bloque try...except para manejar posibles errores. Si algo falla al procesar una imagen (por ejemplo, el archivo está corrupto), el programa imprimirá un error y continuará con la siguiente imagen en lugar de detenerse por completo.

**Paso 3: Lógica de procesamiento dentro de la función**

* Leer: Image.open(image_path) abre el archivo de imagen desde la ruta especificada y lo carga en un objeto Image.

* Convertir a escala de grises: img.convert('L') convierte la imagen a modo 'L' (Luminancia), que es la representación en escala de grises.

* Redimensionar: img_gray.thumbnail(size) redimensiona la imagen. Es importante notar que .thumbnail() modifica la imagen in-place y mantiene la proporción original, asegurando que no se estire. La imagen resultante cabe dentro del cuadro delimitador definido por size (ej. 128x128).

**Guardar:**

* os.path.basename(image_path) extrae solo el nombre del archivo de la ruta completa (ej. de 'source_images/foto1.jpg' obtiene 'foto1.jpg').

* os.path.join(output_dir, base_name) construye la ruta de destino completa para guardar la nueva imagen (ej. 'thumbnails/foto1.jpg').

* img_gray.save(output_path) guarda la imagen procesada (en escala de grises y redimensionada) en la ruta de destino.

* print(...) muestra un mensaje en la consola para confirmar que la imagen fue procesada.

**Paso 4: Ejecución del pipeline para un lote de imágenes**

`Esta es la parte del script que orquesta todo el proceso.`

**Configuración:** Se definen las variables SOURCE_DIR y OUTPUT_DIR para especificar las carpetas de origen y destino. Esto hace que el código sea más fácil de modificar.

**Crear Directorio de Salida:** if not os.path.exists(OUTPUT_DIR):

* `os.makedirs(OUTPUT_DIR) `comprueba si la carpeta de destino (thumbnails) existe. Si no existe, la crea. Esto evita que el script falle si la carpeta no ha sido creada manualmente.

* `Encontrar las Imágenes:` glob.glob(f'{SOURCE_DIR}/*[.jpg|.png]') busca en la carpeta source_images todos los archivos que terminen en .jpg o .png y devuelve una lista con sus rutas.

* `Aplicar el Pipeline: `El bucle for itera sobre la lista de rutas de archivos de imagen.
* En cada iteración, llama a la función process_image_pipeline pasándole la ruta del archivo actual y la carpeta de destino.
* Esto aplica la secuencia de procesamiento a cada imagen encontrada.


# Colecciones + funciones de orden superior

Pasar funciones como argumentos (map, filter, reduce).

In [10]:
from functools import reduce

numeros = [1, 2, 3, 4, 5]

# map → aplicar función a cada elemento
cuadrados = list(map(lambda x: x**2, numeros))
print("Cuadrados:", cuadrados)  # [1, 4, 9, 16, 25]

# filter → filtrar según condición
pares = list(filter(lambda x: x % 2 == 0, numeros))
print("Pares:", pares)  # [2, 4]

# reduce → acumular un valor
suma_total = reduce(lambda acc, x: acc + x, numeros, 0)
print("Suma:", suma_total)  # 15


Cuadrados: [1, 4, 9, 16, 25]
Pares: [2, 4]
Suma: 15


# Evaluación perezosa

Generadores que procesan datos sin cargar todo en memoria.

In [11]:
def numeros_grandes(n):
    for i in range(n):
        yield i * 2  # se calcula solo cuando se pide

gen = numeros_grandes(5)

print(next(gen))  # 0
print(next(gen))  # 2
print(list(gen))  # [4, 6, 8]

0
2
[4, 6, 8]


`Útil para datasets grandes: no guardamos todo en memoria.
`

# TÉCNICAS SENIOR

1. Functional Core, Imperative Shell (Núcleo Funcional, Caparazón Imperativo)
Esta es una arquitectura de software que separa la lógica de negocio pura de las interacciones con el mundo exterior (efectos secundarios).


`Núcleo Funcional (Functional Core):`
* Contiene toda la lógica de negocio compleja.
* Se compone de funciones puras que no realizan I/O (lectura/escritura de archivos, llamadas a APIs, logging).
* Solo reciben datos y devuelven un resultado.
* Esto hace que el núcleo sea predecible, fácil de probar unitariamente y reutilizable.

`Caparazón Imperativo (Imperative Shell):` Es la capa exterior que interactúa con el mundo. Se encarga de las tareas "sucias":
* leer archivos, obtener datos de una base de datos, imprimir en la consola, registrar logs, etc.
* El caparazón llama a las funciones del núcleo, les proporciona los datos que necesitan y luego toma los resultados para realizar más acciones con efectos secundarios.

**Ejemplo**

1. Imagina que necesitamos procesar un archivo de ventas, calcular un resumen y guardar un informe.

In [13]:
# --- Núcleo Funcional (Lógica Pura y Comprobable) ---
# Esta función no sabe de archivos, solo procesa datos.
def generar_resumen_ventas(ventas: list[dict]) -> dict:
    """
    Toma una lista de ventas y calcula el total y el número de transacciones.
    Es una función pura.
    """
    if not ventas:
        return {"total_ventas": 0, "num_transacciones": 0}

    total = sum(item['monto'] for item in ventas)
    transacciones = len(ventas)

    return {"total_ventas": total, "num_transacciones": transacciones}


# --- Caparazón Imperativo (Maneja I/O y Efectos Secundarios) ---
def procesar_informe_ventas(archivo_entrada: str, archivo_salida: str):
    """
    Orquesta el proceso: lee datos, llama al núcleo y guarda el resultado.
    """
    try:
        # 1. Leer datos (efecto secundario: I/O)
        print(f"Leyendo datos de '{archivo_entrada}'...")
        # En un caso real, aquí leerías un JSON o CSV
        datos_crudos = [
            {"id": 1, "monto": 100},
            {"id": 2, "monto": 150},
            {"id": 3, "monto": 75}
        ]

        # 2. Llamar al núcleo funcional con los datos limpios
        resumen = generar_resumen_ventas(datos_crudos)

        # 3. Escribir el resultado (efecto secundario: I/O)
        print(f"Guardando resumen en '{archivo_salida}'...")
        with open(archivo_salida, 'w') as f:
            f.write(f"Total de Ventas: {resumen['total_ventas']}\n")
            f.write(f"Número de Transacciones: {resumen['num_transacciones']}\n")

        print("Proceso completado.")

    except Exception as e:
        # 4. Manejar errores (efecto secundario: logging)
        print(f"Error durante el proceso: {e}")

# --- Ejecución ---
procesar_informe_ventas("ventas.json", "resumen.txt")

Leyendo datos de 'ventas.json'...
Guardando resumen en 'resumen.txt'...
Proceso completado.


# 2. Pipelines con Generadores para Datasets Grandes

* Esta técnica combina los pipelines con generadores (yield) para procesar grandes volúmenes de datos con un uso de memoria mínimo.
* Los yield son una forma de devolver datos de forma incrementa.
* En lugar de cargar todo el archivo en memoria, los datos fluyen a través del pipeline elemento por elemento. Esto se conoce como evaluación perezosa (lazy evaluation).

**Ejemplo**

1. Vamos a procesar un archivo de logs de gran tamaño para encontrar y extraer información de las líneas de error, sin cargar nunca el archivo completo en la memoria.

In [14]:
def leer_lineas_log(nombre_archivo: str):
    """Generador que lee un archivo línea por línea (perezosamente)."""
    print("-> Abriendo archivo...")
    with open(nombre_archivo, 'r') as f:
        for linea in f:
            yield linea.strip()

def filtrar_lineas_error(lineas):
    """Generador que solo deja pasar las líneas que contienen 'ERROR'."""
    for linea in lineas:
        if 'ERROR' in linea:
            yield linea

def extraer_info_error(lineas_error):
    """Generador que parsea una línea de error y devuelve un diccionario."""
    for linea in lineas_error:
        partes = linea.split(':', 2)
        if len(partes) == 3:
            yield {
                "timestamp": partes[0],
                "tipo": partes[1].strip(),
                "mensaje": partes[2].strip()
            }

# --- Creación y uso del Pipeline ---

# 1. Crear un archivo de log de ejemplo
log_data = """
2023-10-27T10:00:00: INFO: Servicio iniciado
2023-10-27T10:01:15: ERROR: Fallo en la conexión a la base de datos
2023-10-27T10:02:00: INFO: Usuario 'admin' ha iniciado sesión
2023-10-27T10:03:30: WARNING: Uso de CPU al 85%
2023-10-27T10:04:00: ERROR: Timeout en la petición a la API externa
"""
with open("app.log", "w") as f:
    f.write(log_data.strip())

# 2. Construir el pipeline de generadores
# Ninguna operación se ejecuta todavía, solo se conectan los generadores.
pipeline = extraer_info_error(
    filtrar_lineas_error(
        leer_lineas_log("app.log")
    )
)

# 3. Consumir el pipeline
# Ahora es cuando el archivo se lee y los datos fluyen a través del pipeline.
print("Consumiendo el pipeline de errores:")
for error_info in pipeline:
    print(error_info)

# Salida:
# Consumiendo el pipeline de errores:
# -> Abriendo archivo...
# {'timestamp': '2023-10-27T10:01:15', 'tipo': 'ERROR', 'mensaje': 'Fallo en la conexión a la base de datos'}
# {'timestamp': '2023-10-27T10:04:00', 'tipo': 'ERROR', 'mensaje': 'Timeout en la petición a la API externa'}

Consumiendo el pipeline de errores:
-> Abriendo archivo...
{'timestamp': '2023-10-27T10', 'tipo': '01', 'mensaje': '15: ERROR: Fallo en la conexión a la base de datos'}
{'timestamp': '2023-10-27T10', 'tipo': '04', 'mensaje': '00: ERROR: Timeout en la petición a la API externa'}


# 3. Idempotencia de Transformaciones de Limpieza

* Una operación es idempotente si aplicarla varias veces produce el mismo resultado que aplicarla una sola vez.
* En la limpieza de datos, esto es crucial para asegurar que si un pipeline se ejecuta accidentalmente más de una vez sobre los mismos datos, el resultado final no se corrompa.


**Ejemplo**

1. Imagina que tenemos una función para limpiar nombres de productos.

In [15]:
# --- Transformación NO Idempotente (Incorrecta) ---
def agregar_prefijo_producto_malo(nombre: str) -> str:
    """Agrega un prefijo. Si se ejecuta dos veces, el prefijo se duplica."""
    return f"PROD-{nombre}"

# --- Transformación Idempotente (Correcta) ---
def limpiar_nombre_producto(nombre: str) -> str:
    """
    Limpia un nombre de producto de forma idempotente:
    1. Convierte a mayúsculas.
    2. Elimina espacios en blanco al inicio y al final.
    3. Elimina el prefijo 'PROD-' si ya existe antes de añadirlo.
    """
    # 1. Normalizar (mayúsculas y espacios)
    nombre_limpio = nombre.upper().strip()

    # 2. Asegurar que el prefijo no se duplique
    if nombre_limpio.startswith("PROD-"):
        # Si ya lo tiene, no hacemos nada para evitar 'PROD-PROD-...'
        return nombre_limpio
    else:
        # Si no lo tiene, lo agregamos
        return f"PROD-{nombre_limpio}"


# --- Demostración ---

# 1. Con la función mala (NO idempotente)
nombre1 = "  Laptop  "
print(f"Original: '{nombre1}'")
nombre1 = agregar_prefijo_producto_malo(nombre1)
print(f"Después de 1ª llamada: '{nombre1}'") # 'PROD-  Laptop  '
nombre1 = agregar_prefijo_producto_malo(nombre1)
print(f"Después de 2ª llamada: '{nombre1}'") # 'PROD-PROD-  Laptop  ' (¡Incorrecto!)
print("-" * 20)

# 2. Con la función buena (Idempotente)
nombre2 = "  Laptop  "
print(f"Original: '{nombre2}'")
nombre2 = limpiar_nombre_producto(nombre2)
print(f"Después de 1ª llamada: '{nombre2}'") # 'PROD-LAPTOP'
nombre2 = limpiar_nombre_producto(nombre2)
print(f"Después de 2ª llamada: '{nombre2}'") # 'PROD-LAPTOP' (Correcto, el resultado no cambia)

Original: '  Laptop  '
Después de 1ª llamada: 'PROD-  Laptop  '
Después de 2ª llamada: 'PROD-PROD-  Laptop  '
--------------------
Original: '  Laptop  '
Después de 1ª llamada: 'PROD-LAPTOP'
Después de 2ª llamada: 'PROD-LAPTOP'


# 4 Memoización y Caché
La memoización es una técnica para almacenar los resultados de funciones costosas y reutilizarlos cuando se llaman con los mismos argumentos.

In [16]:
from functools import lru_cache

@lru_cache(maxsize=128)  # Almacena hasta 128 resultados
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(50))  # Calcula rápidamente gracias al caché

12586269025


5 # Decoradores Personalizados
Los decoradores permiten modificar el comportamiento de funciones o métodos de manera reutilizable.

In [18]:
def logger(func):
    def wrapper(*args, **kwargs):
        print(f"Llamando a {func.__name__} con {args} y {kwargs}")
        result = func(*args, **kwargs)
        print(f"{func.__name__} devolvió {result}")
        return result
    return wrapper

@logger
def suma(a, b):
    return a + b

suma(3, 5)

Llamando a suma con (3, 5) y {}
suma devolvió 8


8

# 6 Programación Orientada a Aspectos (AOP)
Separa las preocupaciones transversales (como logging o validación) del código principal.

In [19]:
class Aspecto:
    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        print(f"Antes de ejecutar {self.func.__name__}")
        resultado = self.func(*args, **kwargs)
        print(f"Después de ejecutar {self.func.__name__}")
        return resultado

@Aspecto
def procesar_datos(data):
    print(f"Procesando {data}")

procesar_datos("datos importantes")

Antes de ejecutar procesar_datos
Procesando datos importantes
Después de ejecutar procesar_datos


# 7 Inyección de Dependencias
Permite desacoplar componentes al pasar dependencias como argumentos.

In [21]:
class Servicio:
    def ejecutar(self):
        print("Servicio ejecutado")

class Cliente:
    def __init__(self, servicio):
        self.servicio = servicio

    def realizar_tarea(self):
        self.servicio.ejecutar()

servicio = Servicio()
cliente = Cliente(servicio)
cliente.realizar_tarea()

Servicio ejecutado


# 8 Metaprogramación con type
Crea clases dinámicamente en tiempo de ejecución.

In [22]:
def crear_clase(nombre, atributos):
    return type(nombre, (object,), atributos)

Persona = crear_clase("Persona", {"nombre": "Carlos", "edad": 30})
p = Persona()
print(p.nombre, p.edad)

Carlos 30


# 9 Procesamiento Paralelo

Usa múltiples núcleos para acelerar tareas.

In [None]:
from multiprocessing import Pool

def cuadrado(n):
    return n * n

with Pool(4) as p:
    resultados = p.map(cuadrado, [1, 2, 3, 4, 5])
    print(resultados)

# 10 Programación Reactiva
Maneja flujos de datos asíncronos.

In [None]:
from rx import from_iterable

numeros = from_iterable([1, 2, 3, 4, 5])
numeros.subscribe(lambda x: print(f"Recibido: {x}"))

# Limpieza y validación de datos

**Tareas típicas**

1. Normalizar texto: strip, `casefold, lower, Unicode NFKC (unicodedata.normalize).`

2. P`arsear números/fechas: int, float, datetime.strptime.`

3. Manejar valores `vacíos/None`, eliminar duplicados (set, dict.fromkeys).

4. Validar esquemas: tipos, rangos, obligatoriedad.

**Técnicas senior**

* Pipelines inmutables (no mutar entrada):

* Validación declarativa (p. ej., dataclasses + validadores o pydantic si está permitido).

* Estrategia EAFP (Easier to Ask Forgiveness than Permission): intenta parsear, captura ValueError.

In [None]:
from typing import Iterable

def pipeline(datos: Iterable[str]):
    step1 = map(lambda s: s.strip(), datos)
    step2 = filter(None, step1) # quita vacíos
    step3 = map(lambda s: s.casefold(), step2)
    return step3 # generador; consumir más tarde

# Separación de capas (arquitectura mínima limpia)

1. Dominio (negocio): funciones puras (reglas de stock, duplicados, etc.).

2. Infraestructura: lectura/escritura (JSON, archivos), logging.

3. Presentación: CLI/UI que imprime o muestra datos.

**Técnicas senior**

Interfaces simples (protocolos): funciones reciben/retornan estructuras planas (dict, list) o modelos tipados.

Inyección de dependencias: pasar lectores/escritores como argumentos para testear sin IO real.

Logs y errores: logging con niveles; excepciones específicas (p. ej., StockInsuficiente).

# Preparación para JSON

1. Serializar: json.dump/loads, ensure_ascii=False para acentos.

2. Mantener contratos de datos (versionar esquema, claves estables).

3. Convertir set ↔ list al serializar.

In [None]:
import json

payload = {"productos": ["manzana", "pera"], "tags": list({"fruta", "fresco"})}

with open("inv.json", "w", encoding="utf-8") as f:
    json.dump(payload, f, ensure_ascii=False, indent=2)

# Ejemplo básico (paso a paso)

Problema: Consolidar productos de dos listas, limpiar duplicados, validar stock mínimo y preparar reporte.

**1. Datos**

In [None]:
s1 = [" manzana", "Pera", "manzana ", "Uva"]
s2 = ["Kiwi", "Pera", "Mango "]
stock = {"manzana": 5, "pera": 2, "uva": 7, "kiwi": 0, "mango": 1}
minimos = {"manzana": 3, "pera": 3, "uva": 2, "kiwi": 2, "mango": 2}

**1.2 Limpieza con map/filter**

In [None]:
norm = lambda s: " ".join(s.strip().split()).lower()
cat_1 = list(map(norm, s1))
cat_2 = list(map(norm, s2))

**1.3 Consolidación y duplicados**

In [None]:
consolidado = sorted(set(cat_1) | set(cat_2))

**1.4 Validación de stock con map y filter**

In [None]:
faltantes = list(filter(lambda p: stock.get(p, 0) == 0, consolidado))
stock_bajo = list(filter(lambda p: stock.get(p, 0) < minimos.get(p, 0), consolidado))

**1.5 Reducción: total de unidades disponibles**

In [None]:
from functools import reduce


total_unidades = reduce(lambda acc, p: acc + stock.get(p, 0), consolidado, 0)
reporte = {"catalogo": consolidado, "faltantes": faltantes, "stock_bajo": stock_bajo, "total": total_unidades}

**Conceptos aplicados:**

* normalización, set para unicidad, map/filter/reduce, composición simple.

# Ejemplo senior (arquitectura limpia + pipeline funcional)

**Objetivo:** Mini sistema de inventario con reglas de negocio puras, validación y reporte; separación de capas y técnicas pro.

**1. Modelos de dominio (tipados) y validadores**

In [4]:
from dataclasses import dataclass
from typing import Iterable, Callable, Dict, List, Tuple
from functools import reduce
from collections import defaultdict
import unicodedata

# --- Utilidades de normalización ---
def nfkc(s: str) -> str:
    return unicodedata.normalize("NFKC", s)

norm_text: Callable[[str], str] = lambda s: " ".join(nfkc(s).strip().split()).casefold()

# --- Modelos ---
@dataclass(frozen=True)
class Producto:
    sku: str
    nombre: str

@dataclass
class Stock:
    unidades: int
    minimo: int

# --- Tipos ---
Catalogo = Dict[str, Producto] # sku -> Producto
Existencias = Dict[str, Stock] # sku -> Stock
NombreIndex = Dict[str, List[str]] # nombre_norm -> [skus]

**2. Capa de negocio: reglas puras**

In [5]:
# Construir catálogo consolidado desde múltiples fuentes de (sku, nombre)

def construir_catalogo(*fuentes: Iterable[Tuple[str, str]]) -> Catalogo:
    def step(fuente: Iterable[Tuple[str, str]]) -> Catalogo:
        return {sku: Producto(sku=sku, nombre=norm_text(nombre)) for sku, nombre in fuente}
# Reducimos catálogos parciales en uno solo (prioridad última fuente)
    return reduce(lambda acc, cat: (acc | cat), map(step, fuentes), {})


# Índice por nombre normalizado → skus (muchos a uno)
def indexar_por_nombre(cat: Catalogo) -> NombreIndex:
    idx: NombreIndex = defaultdict(list)
    for sku, prod in cat.items():
        idx[prod.nombre].append(sku)
        return idx

# Detectar duplicados por nombre (múltiples SKUs con el mismo nombre)
def duplicados_por_nombre(idx: NombreIndex) -> Dict[str, List[str]]:
    return {nombre: skus for nombre, skus in idx.items() if len(skus) > 1}

# Productos con stock bajo o faltante
def evaluar_stock(cat: Catalogo, ex: Existencias) -> Tuple[List[str], List[str]]:
    skus = cat.keys()
    faltantes = [sku for sku in skus if ex.get(sku, Stock(0, 0)).unidades == 0]
    bajos = [sku for sku in skus if ex.get(sku, Stock(0, 0)).unidades < ex.get(sku, Stock(0, 0)).minimo]
    return faltantes, bajos

# Plan de reposición mínimo (llevar a mínimo + buffer)
def plan_reposicion(ex: Existencias, buffer: int = 2) -> Dict[str, int]:
    def falta(sku_stock: Tuple[str, Stock]) -> Tuple[str, int]:
        sku, st = sku_stock
        objetivo = max(st.minimo + buffer, st.minimo)
        return (sku, max(0, objetivo - st.unidades))

        pares = map(falta, ex.items())
        return {sku: qty for sku, qty in pares if qty > 0}

**3. Capa de infraestructura (IO) — simulada**

In [6]:
# Estas funciones podrían leer/escribir JSON en producción.
def cargar_fuentes() -> Tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    fuente_a = [("A1", "Manzana "), ("B2", "Pera"), ("C3", "Uva ")]
    fuente_b = [("B2", " Pera"), ("D4", "Kiwi"), ("E5", "Mango")]
    return fuente_a, fuente_b


def cargar_existencias() -> Existencias:
    return {
        "A1": Stock(5, 3),
        "B2": Stock(2, 3),
        "C3": Stock(7, 2),
        "D4": Stock(0, 2),
        "E5": Stock(1, 2),
    }

**4. Orquestación (presentación)**

In [7]:
if __name__ == "__main__":
    fuente_a, fuente_b = cargar_fuentes()
    cat = construir_catalogo(fuente_a, fuente_b)
    idx = indexar_por_nombre(cat)
    ex = cargar_existencias()

dups = duplicados_por_nombre(idx)
faltantes, bajos = evaluar_stock(cat, ex)
plan = plan_reposicion(ex, buffer=2)

print("=== RESUMEN INVENTARIO ===")
print("SKUs:", sorted(cat.keys()))
print("Duplicados por nombre:", dups)
print("Faltantes:", faltantes)
print("Stock bajo:", bajos)
print("Plan reposición:", plan)

=== RESUMEN INVENTARIO ===
SKUs: ['A1', 'B2', 'C3', 'D4', 'E5']
Duplicados por nombre: {}
Faltantes: ['D4']
Stock bajo: ['B2', 'D4', 'E5']
Plan reposición: None


Qué aprendemos aquí

Pipelines con map/reduce sin mutación de entrada.

Índices para consultas O(1) y detección de duplicados por nombre.

Reglas de negocio puras (fáciles de testear).

Separación clara entre datos (dominio), infraestructura e interfaz.

# Técnicas senior (checklist)

Type hints + dataclasses → APIs claras y predecibles.

Funciones pequeñas y pure-first: aislar IO.

operator.itemgetter/attrgetter para ordenar/extraer campos.

itertools: groupby, chain, islice para flujos grandes.

functools.partial para fijar parámetros en pipelines.

lru_cache para memoizar cálculos costosos (si son puros).

Errores explícitos: crea excepciones de dominio (p. ej., class StockInsuficiente(Exception): ...).

Logs con logging y niveles (info, warning, error).

Complejidad: prefiere O(n) con una sola pasada y generadores.

Testing: tests del core sin tocar disco/red.