# Streams avanzados y Parallel Streams

## Introducción a Streams Avanzados en Java

### Streams y su importancia en el manejo de colecciones

Un **Stream** en Java es una abstracción que permite procesar colecciones de datos de forma funcional, declarativa y eficiente. En lugar de trabajar con bucles tradicionales, los Streams proporcionan una manera más concisa y expresiva de manejar datos.

#### Características clave
- **No almacenan datos**: Un Stream no es una estructura de datos en sí misma, sino que opera sobre la fuente (como una lista o un conjunto).
- **Inmutabilidad**: No modifican la colección original; generan nuevas instancias o resultados.
- **Encadenamiento**: Permiten encadenar operaciones, creando un flujo de procesamiento claro y lógico.


### Diferencia entre operaciones intermedias y terminales

- **Intermedias**: Transforman el Stream y devuelven otro Stream. Son "lazy" (evaluación perezosa), lo que significa que no se ejecutan hasta que se invoca una operación terminal.

Ejemplo: `filter()`, `map()`, `sorted()`.

- **Terminales**: Estas operaciones devuelven un resultado concreto, como un valor, una colección o ejecutan una acción final.

Ejemplo: `collect()`, `forEach()`, `reduce()`.


In [None]:
var nombres = Arrays.asList("Cesar", "Luis", "Maria");
var resultado = nombres.stream()
        .filter(nombre -> nombre.startsWith("C"))
        .map(String::toUpperCase)
        .sorted()
        .collect(Collectors.toList());

System.out.println(resultado);

### Lazy Evaluation

Una de las características más poderosas de los Streams es la **evaluación perezosa**. Esto significa que las operaciones intermedias no procesan los datos hasta que una operación terminal lo desencadena. Esto mejora el rendimiento, ya que solo se procesan los elementos necesarios.

In [None]:
var numeros = Arrays.asList(1, 2, 3, 4, 5, 6);
numeros.stream()
       .filter(n -> n > 2) // No se ejecuta aún.
       .map(n -> n * 2)   // Tampoco se ejecuta.
       .forEach(System.out::println); // Ahora todas las operaciones se ejecutan.

### Beneficios de usar Streams

- Código más legible y conciso.
- Eliminación de bucles anidados.
- Mejor aprovechamiento de la arquitectura multicore con **Parallel Streams**.

## Operaciones Avanzadas con Streams

### Uso de `flatMap` para transformar y aplanar estructuras de datos

El método `flatMap` es ideal para trabajar con estructuras de datos complejas, como listas de listas. Este operador "aplana" múltiples niveles de datos, transformándolos en un único Stream.

In [None]:
var listaDeListas = Arrays.asList(
    Arrays.asList("Java", "Streams"),
    Arrays.asList("FlatMap", "Avanzado")
);

var resultado = listaDeListas.stream()
        .flatMap(List::stream) // Aplana las listas.
        .collect(Collectors.toList());
System.out.println(resultado);
// Salida: [Java, Streams, FlatMap, Avanzado]

### Ejecución de operaciones personalizadas con `peek`


El método `peek` se utiliza para realizar acciones adicionales durante el procesamiento, como depuración o registro, sin afectar la transformación.

In [None]:
var nombres = Arrays.asList("Cesar", "Luis", "Ana");

nombres.stream()
       .peek(nombre -> System.out.println("Procesando: " + nombre)) // Depuración.
       .map(String::toUpperCase)
       .forEach(System.out::println);

**Nota**: Aunque `peek` puede ser útil, úsalo con cuidado, ya que no está diseñado para reemplazar operaciones terminales como `forEach`.

### Manejo de excepciones en Streams


Dado que muchas operaciones de Streams pueden lanzar excepciones, es esencial manejarlas adecuadamente. Para ello, puedes usar funciones personalizadas o encapsular operaciones en bloques `try-catch`.


In [None]:
List<String> numeros = Arrays.asList("10", "20", "abc", "30");

numeros.stream()
       .map(n -> {
           try {
               return Integer.parseInt(n);
           } catch (NumberFormatException e) {
               System.err.println("Error al convertir: " + n);
               return 0;
           }
       })
       .forEach(System.out::println);
// Salida: 10, 20, 0, 30

### Uso avanzado de `reduce` para agregaciones

El método `reduce` permite realizar operaciones de agregación, como sumar, encontrar el valor máximo/mínimo, concatenar cadenas, entre otros.

In [None]:
var numeros = Arrays.asList(1, 2, 3, 4);

var suma = numeros.stream()
        .reduce(0, Integer::sum); // Inicio en 0 y suma cada elemento.
System.out.println(suma); // Salida: 10

In [None]:
var maximo = numeros.stream()
        .reduce(Integer::max);
maximo.ifPresent(System.out::println); // Salida: 4

## Parallel Streams

Los **Parallel Streams** son una característica de Java que permite dividir las tareas de procesamiento en sub-tareas para ejecutarlas simultáneamente en múltiples núcleos del procesador. Esto puede mejorar considerablemente el rendimiento en ciertos casos.

![Streams secuencia y paralelo](https://miro.medium.com/v2/resize:fit:720/format:webp/1*pRTe-Sg_BS_esta41QHBRw.png)



### Introducción al paralelismo

- Un **Stream secuencial** procesa los datos en un solo núcleo de forma ordenada.
- Un **Parallel Stream** divide los datos en fragmentos y los distribuye entre varios núcleos para su procesamiento paralelo.

In [None]:
var numeros = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

int sumaSecuencial = numeros.stream()
        .mapToInt(Integer::intValue)
        .sum();
System.out.println("Suma secuencial: " + sumaSecuencial);

In [None]:
var numeros = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

int sumaParalela = numeros.parallelStream()
        .mapToInt(Integer::intValue)
        .sum();
System.out.println("Suma paralela: " + sumaParalela);

### Cómo crear Parallel Streams

1. El método `parallelStream()` en colecciones

In [None]:
var datos = Arrays.asList("a", "b", "c");
datos.parallelStream()
        .forEach(System.out::println);

2. El método `Stream.parallel()`

In [None]:
Stream.of("x", "y", "z")
        .parallel()
        .forEach(System.out::println);


### Comparación entre Streams secuenciales y paralelos

| Aspecto | Stream Secuencial | Parallel Stream |
| --- | --- | --- |
| Procesamiento | Un solo núcleo | Varios núcleos |
| Orden | Respeta el orden original | Puede no garantizar el orden |
| Overhead| Mínimo | Mayor debido al coste de paralelismo |
| Escenarios ideales | Tareas simples o pequeñas | Tareas intensivas en cálculo (CPU-intensive) |

**Nota importante**: Usar **Parallel Streams** no siempre es más rápido. Hay un coste adicional asociado con dividir y combinar los datos.

### Beneficios y limitaciones de Parallel Streams

**Beneficios**:
- Aprovechan la arquitectura multicore de los procesadores modernos.
- Permiten dividir y conquistar tareas complejas de forma eficiente.

**Limitaciones**:
- No son ideales para operaciones dependientes del orden.
- Pueden generar **condiciones de carrera** si las operaciones no son thread-safe.
- Consumen más recursos para tareas pequeñas o simples.

In [None]:
var lista = Collections.synchronizedList(new ArrayList<>());

IntStream.range(1, 1000)
    .parallel()
    .forEach(lista::add);

System.out.println("Tamaño de la lista: " + lista.size());

El tamaño final de la lista puede ser incorrecto debido a condiciones de carrera.

### Escenarios recomendados para Parallel Streams

- Procesamiento de grandes volúmenes de datos (Big Data).
- Operaciones intensivas en cálculos matemáticos (por ejemplo, simulaciones o análisis científicos).
- Filtrados o transformaciones de datos donde el orden no es crítico.

## Casos de Uso y Buenas Prácticas

### Casos de Uso Comunes

1. **Procesamiento de grandes volúmenes de datos**: Los Streams son ideales para manejar colecciones o archivos extensos, ya que permiten operaciones rápidas y concisas como filtrados, transformaciones y reducciones.
    -  Ejemplo: Procesar archivos CSV o JSON masivos para extraer información.
1. **Agregaciones y cálculos**: Para operaciones como sumas, promedios, máximos y mínimos, el método `reduce` o las clases de utilidades como `Collectors` son muy útiles.
    - Ejemplo: Calcular estadísticas de ventas en una base de datos.
1. **Transformación de datos**: Convertir formatos o tipos de datos mediante el uso de `map` o `flatMap`.
    - Ejemplo: Transformar listas de objetos en mapas clave-valor.
1. **Filtrado y búsqueda**: Streams facilitan encontrar elementos específicos en colecciones de gran tamaño.
    - Ejemplo: Buscar el primer empleado con un salario mayor a un umbral.

### Buenas Prácticas con Streams

1. **Mantén el código legible**: Los Streams pueden volverse complejos rápidamente si no se estructuran adecuadamente. Usa líneas separadas para cada operación y comenta las secciones críticas si es necesario.

In [None]:
empleados.stream()
        .filter(emp -> emp.getEdad() > 30) // Filtrar empleados mayores de 30
        .map(Empresa::getDepartamento)    // Obtener departamentos
        .distinct()                       // Eliminar duplicados
        .forEach(System.out::println);    // Imprimir resultado


2. **Evita usar Parallel Streams sin un análisis previo**:
    - Úsalos solo para tareas intensivas en CPU o cuando el orden no sea crítico.
    - Evítalos si trabajas con recursos compartidos o I/O, ya que podrían provocar cuellos de botella.

3. **Cuidado con las operaciones mutables**: No modifiques colecciones externas dentro de un Stream. Usa métodos como collect para generar nuevas estructuras de datos.

In [None]:
// Ejemplo incorrecto

var resultado = new ArrayList<>();
lista.stream()
    .forEach(resultado::add); // Puede generar problemas en Streams paralelos

In [None]:
// Ejemplo correcto
var resultado = lista.stream()
        .filter(e -> e.length() > 3)
        .collect(Collectors.toList());

4. **Haz operaciones costosas al final**: Mueve operaciones como sorted al final del pipeline para evitar overhead innecesario.

5. **Usa Lazy Evaluation a tu favor**: Aprovecha la evaluación perezosa para optimizar el procesamiento de datos. Asegúrate de que las operaciones intermedias sean eficientes.


### Ejemplo Real: Procesamiento de Logs
Supongamos que tienes un archivo de logs y necesitas identificar entradas de error.

In [None]:
try (Stream<String> lineas = Files.lines(Paths.get("logs.txt"))) {
    var errores = lineas.filter(linea -> linea.contains("ERROR"))
            .map(String::trim)
            .collect(Collectors.toList());
    errores.forEach(System.out::println);
} catch (IOException e) {
    e.printStackTrace();
}

### Limitaciones a Tener en Cuenta

- **Sobrehead de Parallel Streams**: No uses Parallel Streams si la tarea es pequeña o si el tiempo de división/combina es mayor que el tiempo de ejecución de la tarea.
- **Orden de ejecución**: Si el orden es importante (como en operaciones financieras), usa Streams secuenciales.
- **Condiciones de carrera**: Asegúrate de que tus operaciones sean thread-safe.

## Ejercicios Prácticos

### Ejercicio 1: Procesamiento de archivos CSV con Streams

**Descripción**: Dado un archivo CSV con información de ventas, procesa los datos para calcular las ventas totales por región.

**Pasos**:
1. Lee el archivo línea por línea usando `Files.lines`.
1. Usa `filter` para eliminar líneas vacías o incorrectas.
1. Divide los datos con `split` para acceder a las columnas necesarias.
1. Usa `map` y `collect` para agrupar las ventas por región y calcular totales.

**Ejemplo de entrada (archivo CSV)**:

In [None]:
Región,Producto,Ventas
Norte,Café,100
Sur,Té,200
Norte,Té,150

**Tarea**: Calcula las ventas totales por región:

In [None]:
Norte: 250
Sur: 200

### Ejercicio 2: Contar palabras únicas en un texto

**Descripción**: Dado un texto, utiliza Streams para contar las palabras únicas y ordenar los resultados alfabéticamente.

**Pasos**:
1. Convierte el texto en una lista de palabras usando `split`.
1. Usa un Stream para eliminar caracteres especiales y normalizar las palabras (pasar a minúsculas).
1. Usa `distinct` para encontrar palabras únicas.
1. Colecciona las palabras en una lista ordenada.

**Ejemplo**: 
- **Texto**: "Java Streams son poderosos. Streams en Java son eficientes." 
- **Resultado**: [eficientes, en, java, poderosos, son, streams]

### Ejercicio 3: Procesamiento de grandes listas con Parallel Streams

**Descripción**: Dada una lista de números enteros (de 1 a 10,000,000), usa Parallel Streams para encontrar todos los números primos.

**Pasos**:
1. Genera la lista con `IntStream.range`.
1. Filtra los números primos usando un método auxiliar.
1. Usa `parallelStream` para paralelizar la operación.
1. Guarda los números primos en una lista.

**Desafío adicional**: Compara el tiempo de ejecución entre un Stream secuencial y uno paralelo.

### Ejercicio 4: Análisis de logs con Streams

**Descripción**: Dado un archivo de logs de un servidor, encuentra y agrupa los errores por tipo.

**Pasos**:
1. Lee el archivo de logs con `Files.lines`.
1. Filtra líneas que contengan la palabra "ERROR".
1. Usa `map` y `collect` para agrupar los errores por tipo.
1. Calcula cuántos errores hay de cada tipo.

**Ejemplo de entrada**:

In [None]:
INFO - Inicio del sistema
ERROR - Conexión fallida
ERROR - Usuario no autorizado
INFO - Proceso completado
ERROR - Conexión fallida

**Resultado esperado**:

In [None]:
Conexión fallida: 2
Usuario no autorizado: 1

### Ejercicio 5: Transformación y agrupación de datos

**Descripción**: Dado un catálogo de productos con sus categorías, utiliza Streams para agrupar los productos por categoría y calcular el precio promedio de cada grupo.

**Pasos**:
1. Crea una lista de objetos Producto con atributos como nombre, categoría y precio.
1. Usa `groupingBy` para agrupar los productos por categoría.
1. Calcula el precio promedio de cada categoría usando `averagingDouble`.

**Ejemplo**: Lista de productos:

In [None]:
List<Producto> productos = Arrays.asList(
    new Producto("Café", "Bebidas", 5.0),
    new Producto("Té", "Bebidas", 3.0),
    new Producto("Pan", "Alimentos", 2.0)
);

**Resultado esperado**:

In [None]:
Bebidas: 4.0
Alimentos: 2.0

### Ejercicio 6: Suma paralela de elementos en una lista grande

**Descripción**: Usa Parallel Streams para calcular la suma de una lista con millones de números.

**Pasos**:
1. Genera una lista con `IntStream.range`.
1. Usa `parallel()` para paralelizar la suma.
1. Mide el tiempo de ejecución y compáralo con un Stream secuencial.