In [1]:
import kagglehub
import pandas as pd
import numpy as np
 # Download latest version
path = kagglehub.dataset_download("wenruliu/adult-income-dataset")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/wenruliu/adult-income-dataset?dataset_version_number=2...


100%|██████████| 652k/652k [00:00<00:00, 1.05MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/wenruliu/adult-income-dataset/versions/2





In [2]:
df = pd.read_csv(fr'{path}\adult.csv')

def prepocessing(df):

    df.drop(columns=['fnlwgt', 'education', 'capital-gain', 'capital-loss', 'hours-per-week'], inplace = True)

    fill_rows = ~(df == '?').any(axis = 1)
    df = df[fill_rows]

    for col in df.columns:
        if df[col].dtype == 'object' or pd.api.types.is_categorical_dtype(df[col]):
          df[col], _ = pd.factorize(df[col])

    return df

def split_train_test(df, test_size=0.3):
    # 1. Embaralha o DataFrame inteiro aleatoriamente
    # Usa sample(frac=1) para selecionar 100% dos dados de forma aleatória
    # random_state é usado para reprodutibilidade (sempre a mesma ordem se o número for o mesmo)
    shuffled_df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    # 2. Calcula o ponto de corte
    len_df = len(shuffled_df)
    cut_point = len_df - round(len_df * test_size)

    # 3. Divide o DataFrame embaralhado
    train = shuffled_df[:cut_point]
    test = shuffled_df[cut_point:]

    print(f'Tamanho total: {len(df)}')
    print(f'Tamanho do treino: {len(train)}')
    print(f'Tamanho do teste: {len(test)}')

    return train, test

df = prepocessing(df)
X_train, X_test = split_train_test(df)

FileNotFoundError: [Errno 2] No such file or directory: '/root/.cache/kagglehub/datasets/wenruliu/adult-income-dataset/versions/2\\adult.csv'

In [None]:
import numpy as np
import pandas as pd

class KNN_DP():
    def __init__(self, k):
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        # Armazena os dados de treino
        self.X_train = np.array(X)
        self.y_train = np.array(y)

    def euclidian_dist(self, row1, row2):
        # Cálculo simples e robusto da distância euclidiana
        # Raiz da soma das diferenças ao quadrado
        diff = row1 - row2
        return np.sqrt(np.sum(diff**2))

    def get_vizinhos(self, test_row):
        distancias = []
        # Itera sobre todo o treino (pode ser lento, mas obedece "sem bibliotecas de ML")
        for i in range(len(self.X_train)):
            dist = self.euclidian_dist(test_row, self.X_train[i])
            distancias.append((self.y_train[i], dist))

        # Ordena pela distância (menor para maior)
        distancias.sort(key=lambda x: x[1])

        # Pega os k primeiros
        vizinhos = distancias[:self.k]
        return vizinhos

    def predict_instance(self, test_row):
        # KNN Tradicional
        vizinhos = self.get_vizinhos(test_row)
        classes_vizinhas = [v[0] for v in vizinhos]

        # Votação majoritária simples
        valores, contagens = np.unique(classes_vizinhas, return_counts=True)
        indice_vencedor = np.argmax(contagens)
        return valores[indice_vencedor]

    def predict_instance_private(self, test_row, epsilon):
        # KNN com Privacidade Diferencial (Mecanismo de Laplace na Votação)
        vizinhos = self.get_vizinhos(test_row)
        classes_vizinhas = [v[0] for v in vizinhos]

        # Obtém as classes possíveis e suas contagens reais
        valores_unicos = np.unique(self.y_train) # Todas as classes possíveis do problema

        # Dicionário de contagem inicializado com 0
        contagens = {val: 0 for val in valores_unicos}

        # Contagem real dos vizinhos
        for cls in classes_vizinhas:
            contagens[cls] += 1

        # Aplicação do Ruído de Laplace
        # Sensibilidade (Delta f) para votação é 1 (mudar 1 vizinho muda o voto em 1)
        # Scale (b) = Delta f / epsilon = 1 / epsilon
        scale = 1.0 / epsilon

        melhor_classe = None
        maior_voto_ruidoso = -float('inf')

        for cls in contagens:
            # Gera ruído de Laplace
            ruido = np.random.laplace(loc=0.0, scale=scale)
            voto_ruidoso = contagens[cls] + ruido

            if voto_ruidoso > maior_voto_ruidoso:
                maior_voto_ruidoso = voto_ruidoso
                melhor_classe = cls

        return melhor_classe

    def predict(self, X_test):
        # Predição padrão
        X_test = np.array(X_test)
        predicoes = []
        for row in X_test:
            pred = self.predict_instance(row)
            predicoes.append(pred)
        return np.array(predicoes)

    def predict_private(self, X_test, epsilon):
        # Predição com DP
        X_test = np.array(X_test)
        predicoes = []
        for row in X_test:
            pred = self.predict_instance_private(row, epsilon)
            predicoes.append(pred)
        return np.array(predicoes)

# ---------------------------------------------------------
# EXECUTANDO O ALGORITMO (Simulação do fluxo pedido)
# ---------------------------------------------------------

# Supondo que você já tenha X_train, X_test, y_train, y_test separados
# Ajuste as variáveis abaixo com seus dados reais
# K = 10 conforme a instrução

y_train= X_train['age'].values
x_treino = X_train.drop(columns = ['age']).values
y_test= X_train['age'].values
x_teste = X_train.drop(columns = ['age']).values
k = 10
model = KNN_DP(k=k)

# Treinamento
print("Treinando modelo...")
model.fit(X_train, y_train)

# 1. Execução KNN Tradicional
print("Executando KNN Tradicional...")
y_pred_padrao = model.predict(X_test)

# Salvando resultado tradicional
df_resultado = pd.DataFrame({
    'y_true': y_test,
    'y_pred': y_pred_padrao
})
df_resultado.to_csv('resultado_knn_tradicional.csv', index=False)
print("Resultado Tradicional salvo.")

# 2. Execução KNN Privado para diferentes epsilons
epsilons = [0.5, 1, 5, 10]

for eps in epsilons:
    print(f"Executando KNN Privado (epsilon={eps})...")
    y_pred_privado = model.predict_private(X_test, epsilon=eps)

    # Salvando resultado privado
    nome_arquivo = f'resultado_knn_laplace_eps_{eps}.csv'
    df_resultado_priv = pd.DataFrame({
        'y_true': y_test,
        'y_pred': y_pred_privado
    })
    df_resultado_priv.to_csv(nome_arquivo, index=False)
    print(f"Arquivo {nome_arquivo} salvo.")

print("Processo finalizado.")

In [None]:
def acuracia(pred, y_target):
    return  sum(pred == y_target)/len(y_target)


In [4]:


class MecanismoLaplace:

    def __init__(self, epsilon):
        # Guarda o orçamento total de privacidade definido pelo usuário.
        self.epsilon = epsilon
    "Função que pega um valor real (contagem de vizinhos) e adiciona um valor aleatório (ruído)."
    def adicionar_ruido(self, valor_real, sensibilidade, num_classes):

        # Divisão do orçamento
        epsilon_fracionado = self.epsilon / num_classes
        #Calculo da escala
        escala = sensibilidade / epsilon_fracionado

        #Gera um ruido aleatorio, com média do ruído zero e definindo a dispersão do ruído como a escala
        ruido = np.random.laplace(loc=0.0, scale=escala)

        # Retorna a soma do valor original com o ruído gerado.
        return valor_real + ruido

#Implementa o algoritmo de vizinhos no raio
class ClassificadorRaioPrivado:

    def __init__(self, raio=6.0):
        # Define a distância máxima para considerar um ponto como vizinho.
        self.raio = raio

        # Variáveis para guardar os dados de treino.
        self.X_treino = None
        self.y_treino = None
        self.classes_unicas = None

    def treinar(self, X, y):
        # Converte a lista de características de x e y para um array NumPy.
        self.X_treino = np.array(X)
        self.y_treino = np.array(y)

        #Pega apenas a lista de classes distintas
        self.classes_unicas = np.unique(self.y_treino)

    def distancia_euclidiana(self, linha1, linha2):

        #Calcular a distancia entre duas coordenadas
        diferenca = linha1 - linha2

        #Calcula da distancia usando Pitágoras
        distancia = np.sqrt(np.sum(diferenca**2))

        # Retorna a distância.
        return distancia

    #Busca os vizinhos proximos
    def obter_vizinhos_no_raio(self, linha_teste):

        # Lista para guardar as classes dos vizinhos encontrados.
        vizinhos_encontrados = []

        # Loop que passa por cada linha da base de treinamento.
        for i in range(len(self.X_treino)):

            # Calcula a distância entre o ponto de teste atual e o ponto de treino i.
            dist = self.distancia_euclidiana(linha_teste, self.X_treino[i])

            # Verifica se a distância é menor ou igual ao raio 6.
            if dist <= self.raio:
                # Se estiver dentro do raio, adicionamos a classe desse vizinho na lista.
                vizinhos_encontrados.append(self.y_treino[i])

        # Retorna a lista com as classes de todos os vizinhos próximos.
        return vizinhos_encontrados

    #Faz a classificação
    def prever_instancia_privada(self, linha_teste, epsilon):

        # Chama a função interna para pegar as classes de quem está perto.
        lista_classes_vizinhas = self.obter_vizinhos_no_raio(linha_teste)

        # Cria um dicionário com todas as classes possíveis iniciando com 0 votos.
        contagens_reais = {classe: 0 for classe in self.classes_unicas}

        # Conta quantos vizinhos existem para cada classe.
        for classe_vizinha in lista_classes_vizinhas:
            # Incrementa o contador daquela classe.
            contagens_reais[classe_vizinha] += 1

        # Instancia a nossa classe de ruído com o epsilon fornecido.
        mecanismo = MecanismoLaplace(epsilon)

        # Verifica quantas classes existem no total para dividir o epsilon.
        total_classes = len(self.classes_unicas)

        # Dicionário para guardar os votos com ruído.
        contagens_ruidosas = {}

        # Loop para aplicar ruído em TODAS as classes possíveis, mesmo se a classe teve voto 0
        for classe in self.classes_unicas:
            # Pega a contagem verdadeira.
            valor_verdadeiro = contagens_reais[classe]

            # Chama a função que adiciona o ruído de Laplace, definindo a sensibilidade como 1.
            valor_com_ruido = mecanismo.adicionar_ruido(
                valor_real=valor_verdadeiro,
                sensibilidade=1.0,
                num_classes=total_classes
            )

            # Guarda o valor ruidoso no dicionário.
            contagens_ruidosas[classe] = valor_com_ruido

        #Definir a melhor classe nos valores com ruído
        melhor_classe = None
        maior_valor_encontrado = -float('inf') # Inicializar com o menor valor possivel.

        # Varre o dicionário de contagens ruidosas para achar o maior valor.
        for classe, valor in contagens_ruidosas.items():
            # Se o valor atual for maior que o recorde anterior.
            if valor > maior_valor_encontrado:
                # Atualiza o recorde.
                maior_valor_encontrado = valor
                # Define essa classe como a vencedora.
                melhor_classe = classe

        # Retorna a classe que teve a maior contagem ruidosa.
        return melhor_classe
    #Recebe  os dados de teste e aplica a predição com base nos valores ruidosos
    def prever_privado(self, X_teste, epsilon):

        # Garante que X_teste seja um array numpy.
        X_teste = np.array(X_teste)

        # Lista para guardar todas as previsões.
        lista_predicoes = []

        # Loop que passa linha por linha da base de teste.
        for linha in X_teste:
            # Chama a função, que retorna a predição com base nos valores com ruído.
            predicao = self.prever_instancia_privada(linha, epsilon)

            # Adiciona o resultado na lista de predições.
            lista_predicoes.append(predicao)

        # Transforma a lista final em um array numpy e retorna.
        return np.array(lista_predicoes)



########Simulação###############
# ... (Mantenha as classes MecanismoLaplace e ClassificadorRaioPrivado aqui em cima) ...

######## Simulação com Mais Dados ###############
if __name__ == "__main__":

    # Define uma "semente" para os números aleatórios.
    # Isso garante que sempre que você rodar, os dados gerados serão os mesmos.
    np.random.seed(42)

    print("--- Gerando Dados Sintéticos ---")

    # 1. Gerando a Classe 0 (50 pontos ao redor de 2,2)
    # np.random.randn cria números numa curva normal. Somamos +2 para mover o centro.
    X_classe0 = np.random.randn(50, 2) + 2
    y_classe0 = [0] * 50 # Lista com 50 zeros

    # 2. Gerando a Classe 1 (50 pontos ao redor de 8,8)
    # Somamos +8 para mover o centro para longe da classe 0.
    X_classe1 = np.random.randn(50, 2) + 8
    y_classe1 = [1] * 50 # Lista com 50 uns

    # 3. Juntando tudo no conjunto de Treino
    # np.concatenate cola os arrays um embaixo do outro.
    X_treino = np.concatenate([X_classe0, X_classe1])
    y_treino = np.concatenate([y_classe0, y_classe1])

    print(f"Total de dados de treino: {len(X_treino)} linhas")

    # 4. Criando pontos de Teste Estratégicos
    dados_teste = [
        [2, 2],   # Ponto bem no meio da Classe 0 (Deveria ser 0)
        [8, 8],   # Ponto bem no meio da Classe 1 (Deveria ser 1)
        [5, 5],   # Ponto no "limbo" entre os dois (Dúvida cruel)
        [10, 10], # Ponto extremo da Classe 1 (Deveria ser 1)
        [0, 0]    # Ponto extremo da Classe 0 (Deveria ser 0)
    ]

    print(f"Dados de teste: {dados_teste}")
    print("-" * 30)

    # 5. Instancia e Treina o Modelo
    # Raio 6.0 é grande o suficiente para pegar bastante gente nesse cenário
    modelo = ClassificadorRaioPrivado(raio=6.0)
    modelo.treinar(X_treino, y_treino)

    # 6. Testando com diferentes Epsilons (Orçamentos de Privacidade)
    lista_epsilons = [0.1, 1.0, 10.0]

    for eps in lista_epsilons:
        print(f"\n>>> Testando com Epsilon = {eps}")

        if eps == 0.1:
            print("(Muita Privacidade / Muito Ruído -> Esperamos erros)")
        elif eps == 10.0:
            print("(Pouca Privacidade / Pouco Ruído -> Esperamos precisão)")

        # Faz a predição
        predicoes = modelo.prever_privado(dados_teste, epsilon=eps)

        # Mostra o resultado lado a lado
        for i, ponto in enumerate(dados_teste):
            print(f"Ponto {ponto} -> Classificado como: {predicoes[i]}")

--- Gerando Dados Sintéticos ---
Total de dados de treino: 100 linhas
Dados de teste: [[2, 2], [8, 8], [5, 5], [10, 10], [0, 0]]
------------------------------

>>> Testando com Epsilon = 0.1
(Muita Privacidade / Muito Ruído -> Esperamos erros)
Ponto [2, 2] -> Classificado como: 0
Ponto [8, 8] -> Classificado como: 1
Ponto [5, 5] -> Classificado como: 0
Ponto [10, 10] -> Classificado como: 1
Ponto [0, 0] -> Classificado como: 0

>>> Testando com Epsilon = 1.0
Ponto [2, 2] -> Classificado como: 0
Ponto [8, 8] -> Classificado como: 1
Ponto [5, 5] -> Classificado como: 0
Ponto [10, 10] -> Classificado como: 1
Ponto [0, 0] -> Classificado como: 0

>>> Testando com Epsilon = 10.0
(Pouca Privacidade / Pouco Ruído -> Esperamos precisão)
Ponto [2, 2] -> Classificado como: 0
Ponto [8, 8] -> Classificado como: 1
Ponto [5, 5] -> Classificado como: 0
Ponto [10, 10] -> Classificado como: 1
Ponto [0, 0] -> Classificado como: 0
