# Sistema de Almacenamiento Distribuido

Este cuaderno explica el código de un sistema de almacenamiento distribuido, que incluye cifrado y replicación de datos. A continuación, se detalla cada bloque de código y su funcionalidad.

## Importar Librerías Necesarias

Primero, importamos las librerías necesarias para la aplicación.

In [None]:

from flask import Flask, request, send_file, jsonify
from Crypto.Cipher import AES
import os
import logging
import asyncio
import aiohttp
import threading
from queue import Queue



- **Flask:** Microframework para construir aplicaciones web, permitiendo manejar solicitudes HTTP.
- **Crypto.Cipher.AES:** Librería para el cifrado de datos utilizando el estándar AES (Advanced Encryption Standard), que es un algoritmo de cifrado simétrico ampliamente utilizado.
- **os:** Módulo para interactuar con el sistema operativo, permitiendo manipular archivos y directorios.
- **logging:** Módulo para generar registros de eventos, útil para el diagnóstico y solución de problemas.
- **asyncio y aiohttp:** Librerías para manejar operaciones asíncronas, permitiendo realizar múltiples tareas de forma concurrente sin bloquear el flujo del programa.
- **threading y Queue:** Módulos para manejar operaciones concurrentes utilizando hilos y colas, lo que permite ejecutar varias operaciones al mismo tiempo.

**Configuración Inicial**

Configuramos las constantes y creamos las carpetas necesarias.

In [None]:
UPLOAD_FOLDER = '/app/cargas'
key = b'This_is_a16b_key'
NODOS = ["http://storage-node-1:5000", "http://storage-node-2:5000"]

os.makedirs(UPLOAD_FOLDER, exist_ok=True)
logging.basicConfig(level=logging.DEBUG)


- **UPLOAD_FOLDER:** Especifica el directorio donde se almacenarán los archivos cargados.
- **key:** Clave de cifrado AES utilizada para cifrar y descifrar los datos.
- **NODOS:** Lista de URLs de los nodos donde se replicarán los archivos, asegurando redundancia y disponibilidad.
- **os.makedirs:** Crea el directorio de carga si no existe, asegurando que siempre haya un lugar para guardar los archivos.
- **logging.basicConfig:** Configura el nivel de registro a DEBUG para capturar información detallada durante la ejecución del programa.


**Función de Cifrado**

Función para cifrar datos utilizando AES.

In [None]:
def cifrar_archivo(data):
    cipher = AES.new(key, AES.MODE_EAX)
    ciphertext, tag = cipher.encrypt_and_digest(data)
    return cipher.nonce, tag, ciphertext


- **cifrar_archivo:** Cifra los datos utilizando la clave AES. El método `encrypt_and_digest` genera el texto cifrado (ciphertext) y un tag que asegura la integridad de los datos. El nonce es un valor único utilizado para este cifrado específico, asegurando que cada cifrado sea único incluso si los datos son los mismos.

**Función de Descifrado**

Función para descifrar datos utilizando AES.

In [None]:
def descifrar_archivo(nonce, tag, ciphertext):
    cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
    data = cipher.decrypt_and_verify(ciphertext, tag)
    return data


- **descifrar_archivo:** Descifra los datos utilizando la clave AES y el nonce. Verifica la integridad de los datos usando el tag. Si los datos han sido alterados, la verificación fallará y se lanzará una excepción.

**Función Asíncrona para Replicar Archivos**

Función para replicar archivos a otros nodos de manera asíncrona.

In [None]:
async def replicar_archivo_async(nodo, ruta_archivo, nonce, tag):
    try:
        form = aiohttp.FormData()
        with open(ruta_archivo, 'rb') as file_enc:
            data = file_enc.read()
        form.add_field('archivo', data, filename=os.path.basename(ruta_archivo))
        form.add_field('nonce', nonce.hex())
        form.add_field('tag', tag.hex())
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{nodo}/cargar", data=form) as response:
                if response.status != 200:
                    logging.error(f"Error replicando en {nodo}: {response.status}")
                else:
                    logging.debug(f"Replicación en {nodo} completada con estado {response.status}")
    except Exception as e:
        logging.error(f"Error replicando en {nodo}: {e}")


- **replicar_archivo_async:** Envía los datos cifrados a otro nodo de manera asíncrona utilizando `aiohttp`. Esta función se ejecuta de forma no bloqueante, permitiendo que el programa continúe ejecutándose mientras espera la respuesta del servidor remoto.

**Función para Replicar Archivos**

Función para iniciar la replicación de archivos.

In [None]:
def replicar_archivo(nodo, ruta_archivo, nonce, tag):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(replicar_archivo_async(nodo, ruta_archivo, nonce, tag))
    loop.close()


- **replicar_archivo:** Crea un nuevo bucle de eventos `asyncio` para ejecutar la replicación de archivos. Esta función se utiliza para iniciar la función asíncrona `replicar_archivo_async`.


**Función de Carga de Archivos**

Función para cargar archivos en el sistema, cifrarlos y replicarlos a otros nodos.

In [None]:
@app.route('/cargar', methods=['POST'])
def cargar_archivo():
    archivo = request.files['archivo']
    ruta_archivo = os.path.join(UPLOAD_FOLDER, archivo.filename)

    try:
        # Cifrar el archivo
        nonce, tag, ciphertext = cifrar_archivo(archivo.read())

        # Guardar el archivo cifrado
        with open(ruta_archivo, 'wb') as file_enc:
            file_enc.write(nonce)
            file_enc.write(tag)
            file_enc.write(ciphertext)

        # Replicar el archivo en otros nodos
        for nodo in NODOS:
            queue.put((nodo, ruta_archivo, nonce, tag))

        logging.debug(f"Archivo {archivo.filename} cargado y cifrado exitosamente")
        return 'Archivo cargado y cifrado exitosamente', 200
    except Exception as e:
        logging.error(f"Error al cargar y cifrar el archivo: {e}")
        return jsonify({'error': f'Error al cargar y cifrar el archivo: {e}'}), 500


- **cargar_archivo:** Cifra el archivo recibido utilizando AES y lo guarda en el sistema. Luego, pone la tarea de replicar el archivo en una cola para que los hilos worker la procesen. La respuesta de éxito o error se devuelve al cliente.

**Función de Descarga de Archivos**

Función para descargar y descifrar archivos del sistema.

In [None]:
@app.route('/descargar/<nombre_archivo>', methods=['GET'])
def descargar_archivo(nombre_archivo):
    try:
        ruta_archivo = os.path.join(UPLOAD_FOLDER, nombre_archivo)
        with open(ruta_archivo, 'rb') as file_enc:
            nonce = file_enc.read(16)
            tag = file_enc.read(16)
            ciphertext = file_enc.read()

        data = descifrar_archivo(nonce, tag, ciphertext)
        temp_file_path = os.path.join(UPLOAD_FOLDER, f"temp_{nombre_archivo}")
        with open(temp_file_path, 'wb') as temp_file:
            temp_file.write(data)

        return send_file(temp_file_path, as_attachment=True, download_name=nombre_archivo)
    except Exception as e:
        logging.error(f"Error al descargar y descifrar el archivo: {e}")
        return jsonify({'error': f'Error al descargar y descifrar el archivo: {e}'}), 500


- **descargar_archivo:** Lee y descifra el archivo solicitado, luego lo guarda temporalmente en el servidor y lo envía al cliente. Si ocurre un error durante el proceso, se registra y se envía una respuesta de error al cliente.


**Función de Eliminación de Archivos**

Función para eliminar archivos del sistema.

In [None]:
@app.route('/eliminar/<nombre_archivo>', methods=['DELETE'])
def eliminar_archivo(nombre_archivo):
    ruta_archivo = os.path.join(UPLOAD_FOLDER, nombre_archivo)
    try:
        os.remove(ruta_archivo)
        logging.info(f"Archivo {nombre_archivo} eliminado exitosamente")
        return 'Archivo eliminado exitosamente', 200
    except Exception as e:
        logging.error(f"Error al eliminar el archivo: {e}")
        return jsonify({'error': 'Error al eliminar el archivo'}), 500


- **eliminar_archivo:** Elimina el archivo especificado del sistema. Si el archivo se elimina correctamente, se devuelve un mensaje de éxito; de lo contrario, se registra y se devuelve un error.

**Función Worker**

Función worker para manejar la replicación de archivos en segundo plano.

In [None]:
def worker():
    while True:
        nodo, ruta_archivo, nonce, tag = queue.get()
        if nodo is None:
            break
        replicar_archivo(nodo, ruta_archivo, nonce, tag)
        queue.task_done()


- **worker:** Toma las tareas de replicación de la cola y las procesa. Si recibe una tarea nula, termina su ejecución. Esta función se ejecuta en hilos separados para permitir la replicación concurrente.


**Configuración de la Cola y los Hilos**

Configuración para iniciar los hilos worker.

In [None]:
num_worker_threads = 5
queue = Queue()

threads = []
for _ in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)


- **num_worker_threads:** Especifica el número de hilos worker.
- **queue:** Cola para manejar las tareas de replicación.
- **threads:** Lista de hilos worker que se inician y ejecutan la función `worker`.

**[nicio de la Aplicación](https://)**

Código para iniciar la aplicación Flask.

In [None]:
if __name__ == "__main__":
    try:
        app.run(host="0.0.0.0", port=5000, debug=True)
    finally:
        for _ in range(num_worker_threads):
            queue.put((None, None, None, None))
        for t in threads:
            t.join()



- **app.run:** Inicia la aplicación Flask en el puerto 5000.
- **queue.put:** Agrega una tarea nula para cada hilo worker para que puedan terminar su ejecución de manera ordenada.
- **threads.join:** Espera a que todos los hilos terminen antes de cerrar la aplicación.


