<a href="https://colab.research.google.com/github/hector3910/Laboratorio-3-Estructura-de-Datos-II/blob/main/Laboratorio_3_Estructura_de_Datos_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import socket
import threading
import time
import pickle
import logging
import os

# Logging setup: INFO level and above, printing only the bare message text.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger()

# Upper bound on the number of bytes requested per recv() call for a message body.
CHUNK_SIZE = 4096

# No shared in-process array or lock is used in the distributed setup; the
# array travels between machines inside the messages themselves.

# Network location of each participant.
# Replace these placeholder addresses with the real IP of each machine.
IP_CLIENTE = "192.168.1.3"   # client machine
IP_WORKER_0 = "192.168.1.3"  # worker 0 machine
IP_WORKER_1 = "192.168.1.3"  # worker 1 machine

PUERTO_WORKER_0 = 5000
PUERTO_WORKER_1 = 5001
PUERTO_CLIENTE = 6000

# For each worker id, the (ip, port) of the worker it hands unfinished work to.
# With two workers they simply take turns passing the state to each other.
NEXT_WORKER = {
    0: (IP_WORKER_1, PUERTO_WORKER_1),
    1: (IP_WORKER_0, PUERTO_WORKER_0),
}

# Clases para ordenamiento con estado
class MergesortState:
    """Resumable merge sort that advances one small step at a time.

    The usual recursion is replaced by an explicit stack of
    ``(start, end, phase)`` tasks, so the sort can be paused after any call
    to :meth:`step`, pickled, and resumed later. ``arr`` is sorted in place
    in ascending order.
    """

    def __init__(self, arr):
        """Prepare to sort *arr* (a mutable sequence) in place."""
        self.arr = arr
        self.stack = []        # pending (start, end, phase) tasks, LIFO
        self.current = None    # task being processed, or None between tasks
        self.left = None       # copy of the left run while a merge is in progress
        self.right = None      # copy of the right run while a merge is in progress
        self.i = self.j = self.k = 0  # cursors into left, right and arr
        self.init_sort()

    def init_sort(self):
        """Schedule the initial task covering the whole array."""
        self.stack.append((0, len(self.arr) - 1, 'split'))

    # 'arr' is part of the serialized state, so the data travels with the sort.
    def __getstate__(self):
        """Return the instance dictionary as the picklable state."""
        return self.__dict__

    def __setstate__(self, state):
        """Restore the attributes saved by :meth:`__getstate__`."""
        self.__dict__.update(state)

    def is_finished(self):
        """Return True once no task is pending or in progress."""
        return not self.stack and self.current is None

    def step(self):
        """Perform one unit of work: one split, one merge setup, or one element move."""
        if self.current is None and self.stack:
            self.current = self.stack.pop()

        if not self.current:
            return

        start, end, phase = self.current
        if phase == 'split':
            if start < end:
                mid = (start + end) // 2
                # Pushed in reverse order of execution: left half, then right
                # half, then the merge of the two.
                self.stack.append((start, end, 'merge'))
                self.stack.append((mid + 1, end, 'split'))
                self.stack.append((start, mid, 'split'))
            # Ranges of zero or one element are already sorted.
            self.current = None
        elif phase == 'merge':
            mid = (start + end) // 2
            # Copy both runs because the merge overwrites arr[start:end + 1].
            self.left = self.arr[start:mid + 1]
            self.right = self.arr[mid + 1:end + 1]
            self.i = self.j = 0
            self.k = start
            self.current = (start, end, 'merge_in_progress')
        elif phase == 'merge_in_progress':
            left_has_items = self.i < len(self.left)
            right_has_items = self.j < len(self.right)
            if left_has_items and right_has_items:
                # '<=' takes from the left run on ties.
                take_left = self.left[self.i] <= self.right[self.j]
            elif left_has_items or right_has_items:
                take_left = left_has_items
            else:
                # Both runs are exhausted: this merge is complete.
                self.current = None
                self.left = self.right = None
                self.i = self.j = self.k = 0
                return

            if take_left:
                self.arr[self.k] = self.left[self.i]
                self.i += 1
            else:
                self.arr[self.k] = self.right[self.j]
                self.j += 1
            self.k += 1

class QuicksortState:
    """Resumable quicksort that advances one small step at a time.

    Pending sub-ranges are kept on an explicit stack and the partition loop
    is unrolled into single comparisons, so the sort can be paused after any
    call to :meth:`step`, pickled, and resumed later. ``arr`` is sorted in
    place in ascending order using a median-of-three pivot.
    """

    def __init__(self, arr):
        """Prepare to sort *arr* (a mutable sequence) in place."""
        self.arr = arr
        self.stack = [(0, len(arr) - 1)]  # inclusive (start, end) ranges still to sort
        self.phase = 'partition'          # 'partition' -> 'partitioning' -> ... -> 'done'
        self.i = self.j = self.pivot = self.start = self.end = None

    def __getstate__(self):
        """Return the instance dictionary as the picklable state."""
        return self.__dict__

    def __setstate__(self, state):
        """Restore the attributes saved by :meth:`__getstate__`."""
        self.__dict__.update(state)

    def is_finished(self):
        """Return True once every range has been processed."""
        return not self.stack and self.phase == 'done'

    def step(self):
        """Perform one unit of work: start a partition, or do one scan step of it."""
        if self.phase == 'partition':
            if not self.stack:
                self.phase = 'done'
                return
            self.start, self.end = self.stack.pop()
            if self.start >= self.end:
                # Nothing to do for ranges of zero or one element.
                return
            # Median of first/middle/last as pivot, moved to the end of the range.
            mid = (self.start + self.end) // 2
            pivot_candidates = [
                (self.arr[self.start], self.start),
                (self.arr[mid], mid),
                (self.arr[self.end], self.end),
            ]
            pivot_candidates.sort(key=lambda candidate: candidate[0])
            _, pivot_index = pivot_candidates[1]
            self.arr[pivot_index], self.arr[self.end] = self.arr[self.end], self.arr[pivot_index]
            self.pivot = self.arr[self.end]
            # i is the last index of the "<= pivot" prefix, j is the scan cursor.
            self.i = self.start - 1
            self.j = self.start
            self.phase = 'partitioning'
        elif self.phase == 'partitioning':
            if self.j < self.end:
                if self.arr[self.j] <= self.pivot:
                    self.i += 1
                    self.arr[self.i], self.arr[self.j] = self.arr[self.j], self.arr[self.i]
                self.j += 1
                return
            # Scan finished: drop the pivot into its final position.
            self.i += 1
            self.arr[self.i], self.arr[self.end] = self.arr[self.end], self.arr[self.i]
            pivot_index = self.i
            # Only ranges with at least two elements need further sorting.
            if pivot_index - self.start > 1:
                self.stack.append((self.start, pivot_index - 1))
            if self.end - pivot_index > 1:
                self.stack.append((pivot_index + 1, self.end))
            self.phase = 'partition'
        # In phase 'done' there is nothing left to do.

class HeapsortState:
    """Resumable heapsort that advances one sift-down at a time.

    The sort runs in two phases: 'heapify' builds a max-heap bottom-up, then
    'sort' repeatedly moves the maximum to the end of the shrinking heap.
    The state can be paused after any call to :meth:`step`, pickled, and
    resumed later. ``arr`` is sorted in place in ascending order.
    """

    def __init__(self, arr):
        """Prepare to sort *arr* (a mutable sequence) in place."""
        self.arr = arr
        self.n = len(arr)
        self.i = self.n // 2 - 1  # next internal node to sift down while building the heap
        self.j = self.n - 1       # last index of the heap during the extraction phase
        self.phase = 'heapify'

    def __getstate__(self):
        """Return the instance dictionary as the picklable state."""
        return self.__dict__

    def __setstate__(self, state):
        """Restore the attributes saved by :meth:`__getstate__`."""
        self.__dict__.update(state)

    def is_finished(self):
        """Return True once the extraction phase has completed."""
        return self.phase == 'done'

    def step(self):
        """Perform one unit of work: one sift-down, or one phase transition."""
        if self.phase == 'heapify':
            if self.i >= 0:
                self.heapify(self.n, self.i)
                self.i -= 1
            else:
                self.phase = 'sort'
        elif self.phase == 'sort':
            if self.j > 0:
                # Move the current maximum behind the heap, then restore the heap.
                self.arr[0], self.arr[self.j] = self.arr[self.j], self.arr[0]
                self.heapify(self.j, 0)
                self.j -= 1
            else:
                self.phase = 'done'

    def heapify(self, n, i):
        """Sift the element at index *i* down within the heap ``arr[:n]``.

        Implemented as a loop rather than recursion; the swaps performed are
        the same as in the recursive formulation.
        """
        arr = self.arr
        while True:
            largest = i
            left = 2 * i + 1
            right = 2 * i + 2

            if left < n and arr[left] > arr[largest]:
                largest = left
            if right < n and arr[right] > arr[largest]:
                largest = right

            if largest == i:
                return
            arr[i], arr[largest] = arr[largest], arr[i]
            i = largest

def recv_all(conn):
    """Receive one length-prefixed pickled message from *conn*.

    The wire format is an 8-byte big-endian payload length followed by the
    pickled payload.

    Returns:
        The unpickled object, or None if the peer closed the connection
        before a complete length header arrived.

    Raises:
        EOFError: if the connection closes after the header but before the
            whole payload has been received.
    """
    # bytearray grows in place; rebuilding an immutable bytes object on every
    # chunk would copy everything received so far each time.
    header = bytearray()
    while len(header) < 8:
        packet = conn.recv(8 - len(header))
        if not packet:
            return None
        header += packet
    total_size = int.from_bytes(header, 'big')

    payload = bytearray()
    while len(payload) < total_size:
        chunk = conn.recv(min(CHUNK_SIZE, total_size - len(payload)))
        if not chunk:
            raise EOFError("Conexión cerrada antes de recibir todos los datos.")
        payload += chunk

    # SECURITY: pickle.loads can execute arbitrary code while unpickling.
    # Only use this on a trusted network with trusted peers.
    return pickle.loads(payload)

def send_all(conn, data):
    """Pickle *data* and send it over *conn* with a length prefix.

    The payload is preceded by its size encoded as an 8-byte big-endian
    integer, so the receiver knows how many bytes to read.
    """
    payload = pickle.dumps(data)
    header = len(payload).to_bytes(8, 'big')
    for part in (header, payload):
        conn.sendall(part)

def is_sorted(arr):
    """Return True if every element of *arr* is <= the element that follows it."""
    for position in range(1, len(arr)):
        # Written as 'not (a <= b)' so values for which '<=' is false both
        # ways (e.g. NaN) count as out of order.
        if not (arr[position - 1] <= arr[position]):
            return False
    return True

def worker(worker_id, worker_ip, worker_port):
    """Run a worker server forever, handling one connection at a time.

    Args:
        worker_id: key into NEXT_WORKER that selects the peer to which
            unfinished work is forwarded.
        worker_ip: local address to bind the listening socket to.
        worker_port: local port to bind the listening socket to.

    Raises:
        KeyError: if *worker_id* has no entry in NEXT_WORKER.
        OSError: if the listening socket cannot be bound.
    """
    function_map = {
        "mergesort": MergesortState,
        "quicksort": QuicksortState,
        "heapsort": HeapsortState,
    }

    # Peer that receives the sort state when this worker's time slice ends.
    next_worker_ip, next_worker_port = NEXT_WORKER[worker_id]

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as receive_socket:
        receive_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        receive_socket.bind((worker_ip, worker_port))
        receive_socket.listen(5)
        logger.info(f"Worker {worker_id} listening on {worker_ip}:{worker_port}")

        while True:
            conn, _addr = receive_socket.accept()
            try:
                handle_connection(worker_id, conn, next_worker_ip, next_worker_port, function_map)
            except Exception:
                # Server-loop boundary: one malformed or interrupted request
                # must not take the whole worker down. Log it and keep serving.
                logger.exception(
                    f"Worker {worker_id} failed while handling a connection; waiting for the next one."
                )

def _send_with_retry(address, payload, retry_delay=0.1):
    """Connect to *address*, retrying while the peer refuses, then send *payload*.

    A fresh socket is created for every attempt: after a failed connect()
    the state of a socket is unspecified, so reusing it is not portable.
    """
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as out_socket:
            try:
                out_socket.connect(address)
            except ConnectionRefusedError:
                pass  # peer is not listening yet; retry after a short pause
            else:
                send_all(out_socket, payload)
                return
        time.sleep(retry_delay)


def handle_connection(worker_id, conn, next_worker_ip, next_worker_port, function_map):
    """Process one incoming request: run a time slice of the sort and hand off.

    The request is either a ``'start'`` command carrying the array, the
    algorithm name and the time slice ``t``, or a continuation carrying a
    pickled sort state. The sort is advanced for at most ``t`` seconds. If it
    finishes, the sorted array is sent to the client; otherwise the state is
    forwarded to the next worker.

    Args:
        worker_id: id of this worker, used in log messages and in the result.
        conn: accepted socket; it is closed when this function returns.
        next_worker_ip: address of the worker that receives unfinished state.
        next_worker_port: port of the worker that receives unfinished state.
        function_map: mapping from algorithm name to sort-state class.
    """
    with conn:
        logger.info(f"Worker {worker_id} accepted connection from {conn.getpeername()}")
        data = recv_all(conn)
        if not data:
            return

        if data.get('command') == 'start':
            t = data['t']
            algorithm_name = data['algorithm']
            arr = data['array']  # initial, unsorted array
            AlgorithmClass = function_map.get(algorithm_name)
            if not AlgorithmClass:
                logger.error(f"Worker {worker_id} received unknown algorithm {algorithm_name!r}.")
                send_all(conn, {"status": "FAILED"})
                return
            sort_state = AlgorithmClass(arr)
            # Wall-clock start, carried along between workers so that the
            # total elapsed time can be reported at the end.
            start_time = time.time()
        else:
            sort_state = data['sort_state']
            t = data['t']
            start_time = data['start_time']

        # Run for t seconds or until the sort completes. The local budget uses
        # the monotonic clock so system clock adjustments cannot stretch or
        # shrink the slice.
        deadline = time.monotonic() + t
        while time.monotonic() < deadline and not sort_state.is_finished():
            sort_state.step()

        if sort_state.is_finished():
            # Deliver the result to the client.
            _send_with_retry((IP_CLIENTE, PUERTO_CLIENTE), {
                "status": "COMPLETED",
                "worker_id": worker_id,
                "time": time.time() - start_time,
                "sorted_array": sort_state.arr
            })
            logger.info(f"Worker {worker_id} completed sorting and sent data to client.")
        else:
            # Out of time: pass the state on to the next worker.
            _send_with_retry((next_worker_ip, next_worker_port), {
                "command": "continue",
                "sort_state": sort_state,
                "t": t,
                "start_time": start_time
            })
            logger.info(f"Worker {worker_id} sent data to next worker.")

def client():
    """Interactive client: pick an algorithm and a data file, then run a distributed sort.

    For every round the user chooses an algorithm, a text file with one
    integer per line, and the per-worker time slice. The array is sent to
    Worker 0 and the client then waits on (IP_CLIENTE, PUERTO_CLIENTE) for a
    ``COMPLETED`` message from whichever worker finishes the sort. The loop
    ends when the user selects option 4.
    """
    algorithms = {"1": "mergesort", "2": "quicksort", "3": "heapsort"}

    while True:
        logger.info("\nSeleccione el algoritmo de ordenamiento:")
        logger.info("1. Mergesort")
        logger.info("2. Quicksort")
        logger.info("3. Heapsort")
        logger.info("4. Salir")
        choice = input("Ingrese su elección: ")
        if choice == "4":
            logger.info("Saliendo del programa...")
            break
        algorithm = algorithms.get(choice)
        if algorithm is None:
            logger.warning("Elección inválida. Por favor, seleccione una opción válida.")
            continue

        # Ask for the data file and normalise the path: forward slashes,
        # '~' and environment variables expanded, made absolute.
        filename = input("Ingrese el nombre del archivo que contiene los datos: ")
        filename = filename.replace('\\', '/')
        filename = os.path.expanduser(filename)
        filename = os.path.expandvars(filename)
        filename = os.path.abspath(filename)

        # Debugging aid for path problems.
        logger.info(f"Directorio actual de trabajo: {os.getcwd()}")
        logger.info(f"Intentando abrir el archivo: {filename}")

        if not os.path.isfile(filename):
            logger.error(f"El archivo '{filename}' no fue encontrado.")
            logger.info(f"Archivos en el directorio actual: {os.listdir(os.getcwd())}")
            continue

        # Read one integer per line; blank lines are skipped and invalid
        # lines are reported and ignored.
        values = []
        try:
            with open(filename, 'r') as file:
                for line_number, line in enumerate(file, start=1):
                    number = line.strip()
                    if not number:
                        continue
                    try:
                        values.append(int(number))
                    except ValueError:
                        logger.error(f"Datos no válidos en línea {line_number}: '{number}' no es un entero.")
        except (OSError, UnicodeError) as e:
            logger.error(f"Ocurrió un error al leer el archivo: {e}")
            continue
        if not values:
            logger.error("El archivo está vacío o no contiene datos válidos.")
            continue

        # The time slice must be a positive number: with t <= 0 (or NaN) no
        # worker would ever advance the sort and the state would bounce
        # between workers forever.
        while True:
            try:
                t = float(input("Ingrese el tiempo límite para cada worker (en segundos): "))
            except ValueError:
                logger.error("Tiempo inválido: debe ingresar un número.")
                continue
            if t > 0:
                break
            logger.error("Tiempo inválido: debe ser mayor que cero.")

        logger.info(f"Datos desordenados: {values[:10]}... (mostrando los primeros 10 valores)")

        # Open the result listener BEFORE dispatching the job so a fast worker
        # never finds the port closed. SO_REUSEADDR lets consecutive rounds
        # re-bind the same port, and the context manager closes the socket
        # even if an error occurs.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((IP_CLIENTE, PUERTO_CLIENTE))
            server_socket.listen(5)

            # Send the initial data to the first worker. A new socket is used
            # for each attempt because a socket's state is unspecified after
            # a failed connect().
            while True:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    try:
                        s.connect((IP_WORKER_0, PUERTO_WORKER_0))
                    except ConnectionRefusedError:
                        pass  # worker not up yet; retry below
                    else:
                        send_all(s, {
                            "command": "start",
                            "algorithm": algorithm,
                            "t": t,
                            "array": values
                        })
                        break
                time.sleep(1)
            logger.info("Cliente envió datos iniciales al Worker 0.")
            logger.info(f"Cliente esperando resultado en {IP_CLIENTE}:{PUERTO_CLIENTE}")

            # Wait for the final result; anything that is not a COMPLETED
            # message (including an empty connection) is ignored.
            while True:
                conn, _addr = server_socket.accept()
                with conn:
                    reply = recv_all(conn)
                if not reply or reply.get("status") != "COMPLETED":
                    continue
                worker_id = reply["worker_id"]
                time_taken = reply["time"]
                sorted_array = reply.get("sorted_array") or []
                logger.info(f"\nEl Worker {worker_id} completó el ordenamiento.")
                logger.info(f"Datos ordenados: {sorted_array[:10]}... (mostrando los primeros 10 valores)")
                logger.info(f"Tiempo total transcurrido: {time_taken:.2f} segundos")
                logger.info(f"Worker {worker_id} devolvió el vector ordenado al cliente.")
                break

def parse_worker_args(argv):
    """Extract ``(worker_id, worker_ip, worker_port)`` from a worker command line.

    Args:
        argv: full argument list in the form
            ``[script, 'worker', worker_id, worker_ip, worker_port]``.

    Returns:
        A tuple ``(worker_id: int, worker_ip: str, worker_port: int)``.

    Raises:
        SystemExit: with a usage message if arguments are missing or the id
            or port is not an integer.
    """
    usage = "Usage: <script> worker <worker_id> <worker_ip> <worker_port>"
    if len(argv) < 5:
        raise SystemExit(usage)
    try:
        return int(argv[2]), argv[3], int(argv[4])
    except ValueError:
        raise SystemExit(usage) from None


if __name__ == "__main__":
    # Run as a worker when the first argument is 'worker'; otherwise run the
    # interactive client.
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == 'worker':
        worker_id, worker_ip, worker_port = parse_worker_args(sys.argv)
        worker(worker_id, worker_ip, worker_port)
    else:
        client()