## Ejercicios

Presenta tus respuestas en tu repositorio personal. Estos ejercicios proporcionan un reto significativo que pone a prueba tus habilidades en programación funcional, así como tu capacidad para integrar y optimizar la computación paralela y concurrente en aplicaciones reales y complejas.

#### Ejercicio 1: Sistema de procesamiento de imágenes en tiempo real
Desarrollar un sistema que procese imágenes en tiempo real para detectar y clasificar objetos, utilizando programación funcional para el manejo de las transformaciones de imágenes y concurrent.futures para procesar múltiples flujos de imágenes simultáneamente.

**Descripción:**

- Implementa funciones puras para cada paso del procesamiento de imágenes, como filtrado, detección de bordes, y clasificación de objetos.
- Utiliza concurrent.futures.ThreadPoolExecutor para paralelizar el procesamiento de imágenes provenientes de múltiples fuentes en tiempo real.
- Emplea asyncio para manejar asincrónicamente la entrada/salida de imágenes y la integración con sistemas de almacenamiento o bases de datos.
Asegurar que todas las operaciones sobre los datos de las imágenes sean inmutables para evitar efectos secundarios no deseados.

Adicional:

- Integra el sistema con una interfaz web en tiempo real donde los usuarios puedan cargar imágenes y recibir resultados inmediatamente.




**OPCION 1**

In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from PIL import Image, ImageFilter
import os
from io import BytesIO
from functools import wraps
import time

# Funciones puras para el procesamiento de imágenes
def convert_to_grayscale(image):
    """Convierte la imagen a escala de grises."""
    return image.convert('L')

def apply_edge_detection(image):
    """Aplica detección de bordes a la imagen."""
    return image.filter(ImageFilter.FIND_EDGES)

def classify_image(image):
    """Clasifica la imagen, simulación de clasificación."""
    return "antelope"  # Simulación de resultado de clasificación

# Decorador para medir el tiempo de ejecución
def time_it(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f} seconds to run.")
        return result
    return wrapper

# Función para procesar una imagen individualmente, sincronizada para usar en ThreadPoolExecutor
def process_single_image(image_path):
    """Procesa una imagen leyéndola, convirtiéndola a escala de grises, aplicando detección de bordes y clasificándola."""
    with open(image_path, 'rb') as file:
        image_data = file.read()
    img = Image.open(BytesIO(image_data))
    gray_img = convert_to_grayscale(img)
    processed_img = apply_edge_detection(gray_img)
    class_label = classify_image(processed_img)
    return processed_img, class_label

@time_it
async def process_images(image_paths):
    """Procesa imágenes en paralelo utilizando ThreadPoolExecutor."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        loop = asyncio.get_running_loop()
        tasks = [loop.run_in_executor(executor, process_single_image, path) for path in image_paths]
        results = await asyncio.gather(*tasks)
    return results

async def main():
    """Función principal para ejecutar el procesamiento asincrónico de imágenes."""
    images_dir = 'D:/Tareas de cursos/Evaluacion5_C8286/Antelope'
    image_paths = [os.path.join(images_dir, f) for f in os.listdir(images_dir) if f.endswith('.jpg')]
    
    processed_images = await process_images(image_paths)
    
    output_dir = 'D:/Tareas de cursos/Evaluacion5_C8286/processed_antelope_images'
    os.makedirs(output_dir, exist_ok=True)
    
    for i, (img, label) in enumerate(processed_images):
        img.save(os.path.join(output_dir, f'processed_{i}_{label}.jpg'))
        print(f"Saved {os.path.join(output_dir, f'processed_{i}_{label}.jpg')}")

if __name__ == "__main__":
    asyncio.run(main())


**OPCION 2**

In [None]:
from PIL import Image, ImageFilter
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
import asyncio
import os

# Función para convertir imágenes a escala de grises
def convert_to_grayscale(image):
    return image.convert('L')

# Función para aplicar la detección de bordes
def apply_edge_detection(image):
    return image.filter(ImageFilter.FIND_EDGES)

# Decorador para medir el tiempo de ejecución
def time_it(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f} seconds to run.")
        return result
    return wrapper

# Decorador para paralelizar el procesamiento de imágenes
def parallelize_image_processing(function):
    @wraps(function)
    def wrapper(image_paths):
        with ThreadPoolExecutor(max_workers=5) as executor:
            results = list(executor.map(function, image_paths))
        return results
    return wrapper

@time_it
@parallelize_image_processing
def process_image(image_path):
    # Abre la imagen y la procesa aplicando las funciones de procesamiento
    img = Image.open(image_path)
    gray_img = convert_to_grayscale(img)
    processed_img = apply_edge_detection(gray_img)
    return processed_img

async def main():
    # Directorio donde están las imágenes
    images_dir = r'D:\Tareas de cursos\Evaluacion5_C8286\Antelope'
    image_paths = [os.path.join(images_dir, f) for f in os.listdir(images_dir) if f.endswith('.jpg')]

    # Procesar imágenes
    processed_images = process_image(image_paths)

    # Guardar las imágenes procesadas en una nueva carpeta
    output_dir = r'D:\Tareas de cursos\Evaluacion5_C8286\processed_antelope_images'
    os.makedirs(output_dir, exist_ok=True)
    for i, img in enumerate(processed_images):
        img.save(os.path.join(output_dir, f'processed_{i}.jpg'))

if __name__ == "__main__":
    asyncio.run(main())


**texto en negrita**#### Ejercicio 2: Simulación de sistema de reservas con alta concurrencia

Crea una simulación de un sistema de reservas (como para hoteles o vuelos) que pueda manejar un alto volumen de peticiones concurrentes sin conflictos de datos.

**Descripción:**

- Diseña funciones puras para manejar la lógica de reservas, cancelaciones, y modificaciones de reservas.
- Utiliza asyncio para manejar múltiples solicitudes de clientes de manera asincrónica.
- Emplea multiprocessing para distribuir la carga de trabajo a través de múltiples procesos y aprovechar múltiples núcleos de CPU, especialmente para tareas que requieren intensos cálculos o procesamiento de datos.
- Implementar mecanismos de sincronización y control de concurrencia para asegurar la consistencia y la integridad de los datos en un entorno de múltiples usuarios.

Adicional:

- Simula un escenario de "Black Friday" donde se espera un pico de demanda y evaluar el rendimiento y la escalabilidad del sistema.

In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import random
import time

# Funciones puras para manipular el estado de las reservas
def add_reservation(reservations, reservation):
    """Agrega una nueva reserva a la lista de reservas de manera inmutable."""
    return reservations + [reservation]

def cancel_reservation(reservations, reservation_id):
    """Cancela una reserva por ID, inmutablemente."""
    return [res for res in reservations if res['id'] != reservation_id]

def update_reservation(reservations, reservation_id, new_details):
    """Actualiza una reserva por ID, inmutablemente."""
    return [res if res['id'] != reservation_id else {**res, **new_details} for res in reservations]

# Simulación del procesamiento de solicitudes usando ThreadPoolExecutor
def handle_request(request):
    """Maneja una solicitud individual simulando cierta lógica y tiempo de procesamiento."""
    time.sleep(random.uniform(0.1, 0.5))  # Simular tiempo de procesamiento
    return f"Processed request {request['id']} with status: {request['status']}"

# Integración de multiprocessing para manejar la carga en múltiples procesos
def process_booking_requests(requests):
    """Procesa una lista de solicitudes de reserva concurrentemente usando multiprocessing."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(handle_request, requests))
    return results

# Manejo asincrónico de reservas
async def manage_reservations(requests):
    """Gestiona reservas asincrónicamente usando asyncio y concurrent.futures."""
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, process_booking_requests, requests)
    result = await future
    print(result)

# Simulación de un escenario de alta demanda
async def simulate_high_demand_scenario():
    """Simula la llegada de múltiples solicitudes durante un evento de alta demanda."""
    requests = [{'id': i, 'status': 'new'} for i in range(100)]  # Simular 100 solicitudes para "Black Friday"
    await manage_reservations(requests)

if __name__ == "__main__":
    asyncio.run(simulate_high_demand_scenario())


#### Ejercicio 3: Análisis y visualización de datos de tráfico en tiempo real

Construye una aplicación que recoja, procese y visualice datos de tráfico en tiempo real para una ciudad, utilizando programación funcional para el análisis de datos y asyncio junto con concurrent.futures para el procesamiento concurrente.

**Descripción:**

- Recoge datos de tráfico de múltiples fuentes, como cámaras de tráfico y sensores en carreteras.
- Implementa funciones puras para calcular métricas de tráfico, como velocidad media, densidad de tráfico, y tiempos de viaje estimados.
- Usa asyncio para procesar datos de tráfico de manera asincrónica.
- Aplica concurrent.futures.ProcessPoolExecutor para realizar análisis de datos pesados en paralelo.
- Desarrollar una interfaz de usuario que muestre en tiempo real los datos de tráfico y las métricas en un mapa interactivo.

Adicional:

- Predice patrones de tráfico y congestiones utilizando modelos de machine learning sobre los datos procesados.


In [None]:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import random
import time

# Funciones puras para el cálculo de métricas de tráfico
def calculate_average_speed(data):
    """Calcula la velocidad promedio a partir de datos de velocidad recogidos."""
    total_speed = sum(data['speed'])
    count = len(data['speed'])
    return total_speed / count if count else 0

def calculate_traffic_volume(data):
    """Calcula el volumen de tráfico a partir de datos de conteo de vehículos."""
    return sum(data['vehicles'])

# Función para procesar datos de una ubicación usando multiprocessing
def process_single_location(data):
    """Procesa los datos de tráfico de una única ubicación."""
    average_speed = calculate_average_speed(data)
    traffic_volume = calculate_traffic_volume(data)
    return {'location': data['location'], 'average_speed': average_speed, 'traffic_volume': traffic_volume}

# Función para procesar datos de múltiples ubicaciones en paralelo
def process_traffic_data(locations_data):
    """Procesa datos de múltiples ubicaciones utilizando procesamiento en paralelo."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_single_location, locations_data))
    return results

# Función asincrónica para actualizar los datos de tráfico
async def update_traffic_data():
    """Actualiza periódicamente los datos de tráfico y la visualización."""
    while True:
        locations_data = fetch_traffic_data()  # Recolecta datos de tráfico
        processed_data = await asyncio.get_event_loop().run_in_executor(None, process_traffic_data, locations_data)
        update_visualization(processed_data)  # Actualiza la visualización
        await asyncio.sleep(10)  # Actualiza cada 10 segundos

# Simulación de recolección de datos de tráfico
def fetch_traffic_data():
    """Simula la recolección de datos de tráfico de múltiples sensores."""
    return [
        {'location': 'Location A', 'speed': [random.randint(30, 70) for _ in range(10)], 'vehicles': [random.randint(80, 120) for _ in range(10)]},
        {'location': 'Location B', 'speed': [random.randint(20, 50) for _ in range(10)], 'vehicles': [random.randint(60, 90) for _ in range(10)]}
    ]

# Simulación de actualización de la visualización
def update_visualization(data):
    """Actualiza una interfaz de usuario o un dashboard con los últimos datos procesados."""
    print("Updated visualization with:", data)

if __name__ == "__main__":
    asyncio.run(update_traffic_data())


#### Ejercicio 4: Sistema de análisis de sentimiento en tiempo real para redes sociales

Desarrolla un sistema que monitoree y analice en tiempo real los sentimientos de los usuarios en redes sociales, utilizando principios de programación funcional para procesar los textos y asyncio para manejar múltiples flujos de datos.

**Descripción:**

- Crea funciones puras para limpiar y pre-procesar los textos recolectados de las redes sociales.
- Implementa análisis de sentimiento utilizando una librería de procesamiento de lenguaje natural, asegurando que las funciones sean inmutables y sin efectos secundarios.
- Utiliza asyncio para recoger datos simultáneamente de varias plataformas de redes sociales.
- Emplea concurrent.futures para procesar y analizar los datos de sentimiento en paralelo, mejorando la capacidad de respuesta del sistema.


In [None]:
import re
import asyncio
from concurrent.futures import ThreadPoolExecutor
from textblob import TextBlob
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Definición de stopwords
stop_words = set(stopwords.words('english'))

# Funciones puras para el pre-procesamiento de textos
def clean_text(text):
    """Limpia el texto eliminando caracteres especiales y convirtiéndolo a minúsculas."""
    text = re.sub(r'\W', ' ', text)
    text = text.lower()
    return text

def remove_stopwords(text):
    """Elimina las stopwords de un texto."""
    words = word_tokenize(text)
    filtered_words = [word for word in words if word not in stop_words]
    return ' '.join(filtered_words)

def preprocess_text(text):
    """Combina todas las operaciones de preprocesamiento de texto."""
    text = clean_text(text)
    text = remove_stopwords(text)
    return text

# Función pura para análisis de sentimiento
def analyze_sentiment(text):
    """Analiza el sentimiento de un texto dado y devuelve el resultado."""
    analysis = TextBlob(text)
    return {'polarity': analysis.sentiment.polarity, 'subjectivity': analysis.sentiment.subjectivity}

# Función para preprocesar y analizar textos de manera concurrente
def preprocess_and_analyze(text):
    """Preprocesa y analiza el sentimiento de un texto."""
    preprocessed_text = preprocess_text(text)
    sentiment = analyze_sentiment(preprocessed_text)
    return sentiment

# Uso de concurrent.futures para paralelizar el análisis
def analyze_texts_concurrently(texts):
    """Analiza una lista de textos concurrentemente."""
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(preprocess_and_analyze, texts))
    return results

# Integración con asyncio para manejo asincrónico
async def collect_and_process_data(stream_data):
    """Asíncronamente recolecta y procesa datos de un flujo."""
    processed_data = await asyncio.get_event_loop().run_in_executor(None, analyze_texts_concurrently, stream_data)
    print("Sentiment Analysis Results:", processed_data)

async def simulate_streaming_data():
    """Simula la llegada de datos de texto de un flujo en tiempo real."""
    sample_data = [
        "I love this product!",
        "This is the worst service ever.",
        "I'm not sure if I like this.",
        "Absolutely fantastic!"
    ]
    await collect_and_process_data(sample_data)

if __name__ == "__main__":
    asyncio.run(simulate_streaming_data())


#### Ejercicio 5: Plataforma de análisis de datos genómicos distribuidos

Construye una plataforma para analizar grandes volúmenes de datos genómicos de manera eficiente, utilizando programación funcional para las transformaciones y cálculos, y multiprocessing para el procesamiento paralelo.

**Descripción:**

- Implementa funciones puras para diversas operaciones genómicas, como el alineamiento de secuencias, la identificación de variantes, y el cálculo de frecuencias genéticas.
- Utiliza multiprocessing.Pool para distribuir el procesamiento de datos a múltiples núcleos de procesador, permitiendo que grandes conjuntos de datos se analicen rápidamente.
- Integra asyncio para manejar eficientemente las entradas/salidas, especialmente para cargar y guardar grandes conjuntos de datos genómicos.
- Implementa un sistema de CI/CD que automatice las pruebas, valide la integridad del código, y orqueste la implementación de la infraestructura necesaria en un entorno de computación en la nube.


In [None]:
import asyncio
from multiprocessing import Pool
import json
import aiofiles  # Importar aiofiles para manejo asincrónico de archivos

# Funciones puras para el procesamiento genómico
def filter_variants(variants, min_depth=10, min_quality=20):
    return [variant for variant in variants if variant['depth'] >= min_depth and variant['quality'] >= min_quality]

def calculate_allele_frequencies(variants):
    allele_counts = {}
    for variant in variants:
        alleles = variant['alleles']
        for allele in alleles:
            allele_counts[allele] = allele_counts.get(allele, 0) + 1
    total_alleles = sum(allele_counts.values())
    return {allele: count / total_alleles for allele, count in allele_counts.items()}

def process_sample(sample):
    filtered_variants = filter_variants(sample['variants'])
    allele_frequencies = calculate_allele_frequencies(filtered_variants)
    return {'sample_id': sample['id'], 'allele_frequencies': allele_frequencies}

def process_genomic_data(data):
    with Pool(processes=4) as pool:
        results = pool.map(process_sample, data)
    return results

async def load_genomic_data(file_path):
    """Carga datos genómicos de forma asíncrona desde un archivo JSON usando aiofiles."""
    async with aiofiles.open(file_path, 'r') as file:
        data = await file.read()
    return json.loads(data)

async def analyze_genomic_data(file_path):
    data = await load_genomic_data(file_path)
    results = await asyncio.get_event_loop().run_in_executor(None, process_genomic_data, data)
    for result in results:
        print(result)

if __name__ == "__main__":
    asyncio.run(analyze_genomic_data('path_to_genomic_data.json'))


#### Ejercicio 6: Simulador de mercados financieros en tiempo real

Desarrolla un simulador de mercados financieros que pueda procesar y analizar datos bursátiles en tiempo real, usando programación funcional para las operaciones de cálculo y asyncio para manejar datos de múltiples fuentes.

**Descripción:**

- Diseña funciones puras para calcular indicadores financieros, como medias móviles, RSI, y bandas de Bollinger.
- Emplea asyncio para recibir datos de mercado en tiempo real de múltiples intercambios y fuentes de información.
- Aplica concurrent.futures.ThreadPoolExecutor para realizar análisis técnico en paralelo y generar señales de trading.


In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def calculate_moving_average(prices, window_size=20):
    if len(prices) < window_size:
        return None
    return np.mean(prices[-window_size:])

def calculate_rsi(prices, periods=14):
    if len(prices) < periods:
        return None

    gains = [max(0, prices[i] - prices[i - 1]) for i in range(1, len(prices))]
    losses = [max(0, prices[i - 1] - prices[i]) for i in range(1, len(prices))]

    average_gain = np.mean(gains[-periods:])
    average_loss = np.mean(losses[-periods:])

    if average_loss == 0:
        return 100

    rs = average_gain / average_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

async def stream_stock_data():
    example_data = [
        {'stock': 'AAPL', 'prices': [150 + i for i in range(30)]},  # Genera 30 datos para asegurar cálculos
        {'stock': 'GOOGL', 'prices': [1200 + i*2 for i in range(30)]}  # Genera 30 datos para asegurar cálculos
    ]
    while True:
        await asyncio.sleep(1)
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(analyze_stock, example_data))
        print("Processed Data:", results)

def analyze_stock(data):
    moving_average = calculate_moving_average(data['prices'])
    rsi = calculate_rsi(data['prices'])
    return {'stock': data['stock'], 'moving_average': moving_average, 'RSI': rsi}

if __name__ == "__main__":
    asyncio.run(stream_stock_data())

