In [1]:
import socket
import os
import json
import time
import numpy as np
from sklearn.linear_model import SGDClassifier

CLIENT_HOST = 'localhost'
CLIENT_PORT_SEND = 5100  # Porta para enviar modelo global para clientes
CLIENT_PORT_RECV = 5101  # Porta para receber modelo local dos clientes

MASTER_HOST = 'localhost'
MASTER_PORT_SEND = 6000  # Envia modelo intermediário para a nuvem
MASTER_PORT_RECV = 6001  # Recebe modelo global da nuvem

SERVER_ID = 1   # Identificador do servidor (para simulação com múltiplos servidores)
NUM_CLIENTS = 3 # Quantidade de clientes esperados

# Função para receber os parâmetros do modelo local de um cliente via Json
def receive_client_parameters(conn):
    data = b""
    while True:
        packet = conn.recv(CLIENT_PORT_RECV)
        if not packet:
            break
        data += packet

    decoded = json.loads(data.decode('utf-8'))

    coef = np.array(decoded['coef'])
    intercept = np.array(decoded['intercept'])
    data_size = int(decoded['data_size'])

    return coef, intercept, data_size

# Receber modelo global inicial da nuvem
global_coef = None
global_intercept = None

print(f"\n[Servidor Hospital {SERVER_ID}] Inicializado...")

print(f"\n[Servidor Hospital {SERVER_ID}] Aguardando modelo global inicial da nuvem...")

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    while True:
        try:
            s.connect((MASTER_HOST, MASTER_PORT_RECV))
            break
        except (ConnectionRefusedError, socket.timeout):
            time.sleep(1)

    try:
        received_data = b""
        while True:
            chunk = s.recv(MASTER_PORT_RECV)
            if not chunk:
                break
            received_data += chunk

        decoded = json.loads(received_data.decode('utf-8'))
        global_coef = np.array(decoded['coef'])
        global_intercept = np.array(decoded['intercept'])
        global_params = decoded.get("params", {})
        NUM_ROUNDS = np.array(decoded['num_rounds'])   

        # Cria o modelo com os mesmos hiperparâmetros do master
        global_model = SGDClassifier(**global_params)

        # Atribui os pesos recebidos
        global_model.coef_ = global_coef
        global_model.intercept_ = global_intercept
        
        print(f"[Servidor Hospital {SERVER_ID}] Modelo global inicial recebido com sucesso!")
    except Exception as e:
        print(f"[Servidor Hospital {SERVER_ID}] Erro ao receber modelo inicial: {e}")
    finally:
        s.close()

# Enviar modelo global inicial aos clientes
print(f"\n[Servidor Hospital {SERVER_ID}] Conectando aos clientes...")    
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((CLIENT_HOST, CLIENT_PORT_SEND))
    s.listen(NUM_CLIENTS)

    for i in range(NUM_CLIENTS):
        conn, addr = s.accept()
        with conn:
            print(f"[Servidor Hospital {SERVER_ID}] Conectado ao cliente {i+1}!")
            print(f"[Servidor Hospital {SERVER_ID}] Enviando modelo global inicial para o cliente {i+1}...")
            model_data = {
                "coef": global_coef.tolist(),
                "intercept": global_intercept.tolist(),
                "num_rounds" : int(NUM_ROUNDS),
                "params": {
                    "loss": global_model.loss,
                    "penalty": global_model.penalty,
                    "l1_ratio": global_model.l1_ratio, 
                    "max_iter": global_model.max_iter,
                    "learning_rate": global_model.learning_rate,
                    "eta0": global_model.eta0,
                    "alpha": global_model.alpha,
                    "average": global_model.average,
                    "random_state": global_model.random_state
                }
            }
            data = json.dumps(model_data).encode('utf-8') 
            conn.sendall(data)
            print(f"[Servidor Hospital {SERVER_ID}] Modelo global inicial enviado para o cliente {i+1} com sucesso!")

# Ciclo principal de treinamento federado
for round_num in range(1, NUM_ROUNDS + 1):
    print(f"\n[Servidor Hospital {SERVER_ID}] Iniciando rodada {round_num} de aprendizado federado")

    client_weights = []
    client_sizes = []

    # Receber modelo local dos clientes
    print(f"[Servidor Hospital {SERVER_ID}] Aguardando conexão dos clientes...")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((CLIENT_HOST, CLIENT_PORT_RECV))
        s.listen(NUM_CLIENTS)
        for i in range(NUM_CLIENTS):
            conn, addr = s.accept()
            with conn:
                print(f"[Servidor Hospital {SERVER_ID}] Cliente {i+1} conectado: {addr}!")
                client_coef, client_intercept, data_size = receive_client_parameters(conn)

                if not (isinstance(client_coef, np.ndarray) and isinstance(client_intercept, np.ndarray) and isinstance(data_size, int)):
                    raise ValueError("Dados recebidos do cliente estão em formato incorreto.")

                client_weights.append((client_coef, client_intercept))
                client_sizes.append(data_size)

                print(f"[Servidor Hospital {SERVER_ID}] Parâmetros recebidos do cliente {i+1} com sucesso!")

    # Inicializa os acumuladores se for a primeira rodada
    if global_coef is None:
        global_coef = np.zeros_like(client_weights[0][0])
        global_intercept = np.zeros_like(client_weights[0][1])
    else:
        global_coef.fill(0)
        global_intercept.fill(0)

    total_data_size = sum(client_sizes)
    if total_data_size == 0:
        raise ValueError("Tamanho total dos dados dos clientes é zero.")

    # Realização do FedAvg(Agregação por média)
    # Coef e Intercept são os parâmetros utilizados pela regressão logística
    # - Coef(Coeficiente): Representa os pesos atribuídos a cada atributo (feature) do modelo. É um vetor ou matriz (dependendo da tarefa) que determina quanto cada variável influencia a predição.
    # - Intercept(Intercepto): É o termo de bias, o valor que o modelo usa quando todas as entradas são zero. Serve para ajustar o ponto de corte (threshold) da função de decisão.
    print(f"[Servidor Hospital {SERVER_ID}] Realizando FedAvg do modelo intermediário...")
    for i in range(NUM_CLIENTS):
        weight = client_sizes[i] / total_data_size         # Calcular o peso (importância) do cliente na agregação
        global_coef += client_weights[i][0] * weight       # Coeficientes ponderados
        global_intercept += client_weights[i][1] * weight  # Intercepto ponderado

    # Atualizar modelo global
    global_model.coef_ = global_coef
    global_model.intercept_ = global_intercept
    print(f"[Servidor Hospital {SERVER_ID}] Modelo intermediário atualizado na rodada {round_num}!")

    # Enviar modelo intermediário para a nuvem
    hospital_model = {
        "coef": global_coef.tolist(),
        "intercept": global_intercept.tolist(),
        "data_size": total_data_size
    }

    print(f"[Servidor Hospital {SERVER_ID}] Enviando modelo intermediário para a nuvem...")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        while True:
            try:
                s.connect((MASTER_HOST, MASTER_PORT_SEND))
                break
            except (ConnectionRefusedError, socket.timeout):
                time.sleep(1)
        s.sendall(json.dumps(hospital_model).encode('utf-8'))
    print(f"[Servidor Hospital {SERVER_ID}] Modelo intermediário enviado para a nuvem com sucesso!")

    # Receber modelo global da nuvem
    print(f"[Servidor Hospital {SERVER_ID}] Aguardando modelo global da nuvem...")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        while True:
            try:
                s.connect((MASTER_HOST, MASTER_PORT_RECV))
                break
            except (ConnectionRefusedError, socket.timeout):
                time.sleep(1)

        received_data = b""
        while True:
            chunk = s.recv(MASTER_PORT_RECV)
            if not chunk:
                break
            received_data += chunk

        decoded = json.loads(received_data.decode('utf-8'))
        global_coef = np.array(decoded['coef'])
        global_intercept = np.array(decoded['intercept'])
    print(f"[Servidor Hospital {SERVER_ID}] Modelo global recebido da nuvem com sucesso!")

    print(f"[Servidor Hospital {SERVER_ID}] Enviando modelo global aos clientes...")    
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((CLIENT_HOST, CLIENT_PORT_SEND))
        s.listen(NUM_CLIENTS)

        for i in range(NUM_CLIENTS):
            conn, addr = s.accept()
            with conn:
                print(f"[Servidor Hospital {SERVER_ID}] Cliente {i+1} conectado: {addr}!")
                model_data = {
                    "coef": global_coef.tolist(),
                    "intercept": global_intercept.tolist()
                }
                data = json.dumps(model_data).encode('utf-8') 
                print(f"[Servidor Hospital {SERVER_ID}] Enviando modelo global para o cliente {i+1}...")
                conn.sendall(data)
                print(f"[Servidor Hospital {SERVER_ID}] Modelo global enviado para o cliente {i+1} com sucesso!")

print(f"\n[Servidor Hospital {SERVER_ID}] Todas as rodadas de treinamento federado foram concluídas!")

# Receber f1 dos clientes
f1_all_clients = {}  # dicionário para armazenar F1 por cliente
print(f"[Servidor Hospital {SERVER_ID}] Aguardando conexão dos clientes...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((CLIENT_HOST, CLIENT_PORT_RECV))
    s.listen(NUM_CLIENTS)
    
    for i in range(NUM_CLIENTS):
        conn, addr = s.accept()
        with conn:
            print(f"[Servidor Hospital {SERVER_ID}] Cliente {i+1} conectado: {addr}!")

            # Receber os dados em partes até o fim da conexão
            received_data = b""
            while True:
                chunk = conn.recv(CLIENT_PORT_RECV)
                if not chunk:
                    break
                received_data += chunk

            try:
                received = json.loads(received_data.decode('utf-8'))
                client_id = received["id"]
                f1_all_clients[client_id] = received["f1"]

                print(f"[Servidor Hospital {SERVER_ID}] Parâmetros recebidos do cliente {i+1} com sucesso!")

            except json.JSONDecodeError as e:
                print(f"[Servidor Hospital {SERVER_ID}] Erro ao decodificar JSON do cliente {i+1}: {e}")
                print("Conteúdo bruto recebido:", received_data)

# Enviar f1 dos clientes para o master
print(f"[Servidor Hospital {SERVER_ID}] Enviando f1 dos clientes para a nuvem...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    while True:
        try:
            s.connect((MASTER_HOST, MASTER_PORT_SEND))
            break
        except (ConnectionRefusedError, socket.timeout):
            time.sleep(1)
    data_to_send = {
        "id": SERVER_ID,
        "f1": f1_all_clients
    }
    s.sendall(json.dumps(data_to_send).encode('utf-8'))
print(f"[Servidor Hospital {SERVER_ID}] Parâmetros enviados para a nuvem com sucesso!")

# Recebendo métricas de desempenho dos modelos locais finais dos clientes
aggregated_metrics = {
    'accuracy': [],
    'precision': [],
    'recall': [],
    'f1_score': []
}

# Receber métricas de desempenho do modelo local dos clientes
print(f"[Servidor Hospital {SERVER_ID}] Aguardando conexão dos clientes...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((CLIENT_HOST, CLIENT_PORT_RECV))
    s.listen(NUM_CLIENTS)
    
    for i in range(NUM_CLIENTS):
        conn, addr = s.accept()
        with conn:
            print(f"[Servidor Hospital {SERVER_ID}] Cliente {i+1} conectado: {addr}!")

            # Receber os dados em partes até o fim da conexão
            received_data = b""
            while True:
                chunk = conn.recv(CLIENT_PORT_RECV)
                if not chunk:
                    break
                received_data += chunk

            try:
                metrics = json.loads(received_data.decode('utf-8'))
                aggregated_metrics['accuracy'].append(metrics['accuracy'])
                aggregated_metrics['precision'].append(metrics['precision'])
                aggregated_metrics['recall'].append(metrics['recall'])
                aggregated_metrics['f1_score'].append(metrics['f1_score'])

                print(f"[Servidor Hospital {SERVER_ID}] Parâmetros recebidos do cliente {i+1} com sucesso!")

            except json.JSONDecodeError as e:
                print(f"[Servidor Hospital {SERVER_ID}] Erro ao decodificar JSON do cliente {i+1}: {e}")
                print("Conteúdo bruto recebido:", received_data)

# Média intermediária
avg_metrics = {k: np.mean(v) for k, v in aggregated_metrics.items()}

# Enviar métricas de desempenho ao servidor da nuvem
print(f"[Servidor Hospital {SERVER_ID}] Enviando parâmetros de desempenho do modelo intermediário para a nuvem...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    while True:
        try:
            s.connect((MASTER_HOST, MASTER_PORT_SEND))
            break
        except (ConnectionRefusedError, socket.timeout):
            time.sleep(1)
    s.sendall(json.dumps(avg_metrics).encode('utf-8'))
print(f"[Servidor Hospital {SERVER_ID}] Parâmetros enviados para a nuvem com sucesso!")


[Servidor Hospital 1] Inicializado...

[Servidor Hospital 1] Aguardando modelo global inicial da nuvem...
[Servidor Hospital 1] Modelo global inicial recebido com sucesso!

[Servidor Hospital 1] Conectando aos clientes...
[Servidor Hospital 1] Conectado ao cliente 1!
[Servidor Hospital 1] Enviando modelo global inicial para o cliente 1...
[Servidor Hospital 1] Modelo global inicial enviado para o cliente 1 com sucesso!
[Servidor Hospital 1] Conectado ao cliente 2!
[Servidor Hospital 1] Enviando modelo global inicial para o cliente 2...
[Servidor Hospital 1] Modelo global inicial enviado para o cliente 2 com sucesso!
[Servidor Hospital 1] Conectado ao cliente 3!
[Servidor Hospital 1] Enviando modelo global inicial para o cliente 3...
[Servidor Hospital 1] Modelo global inicial enviado para o cliente 3 com sucesso!

[Servidor Hospital 1] Iniciando rodada 1 de aprendizado federado
[Servidor Hospital 1] Aguardando conexão dos clientes...
[Servidor Hospital 1] Cliente 1 conectado: ('127.0.