In [18]:
import os
import re
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display
from scipy.interpolate import griddata



In [19]:
anonymity_dict = {}
qids = 12
anonymity_results_path = f'../data/adults/results/anonymization/{qids}_qids/'

# Tomamos los archivos de la forma k_{k}-l_{l}-t_{t}-results.csv
pattern = re.compile(r'k_\d+-l_\d+-t_(\d+\.\d+|\d+)-results\.csv')
for f in [f for f in os.listdir(anonymity_results_path) if pattern.fullmatch(f)]:
    # Extraemos los valores de k, l y t (teniendo en cuenta que t es un float)
    k, l, t = map(float, re.findall(r'\d+\.\d+|\d+', f))
    # Creamos una clave para el diccionario
    key = (int(k), int(l), float(t))
    # Leemos el archivo y lo guardamos en el diccionario como dataframe (la primera fila es el nombre de las columnas y no hay índice)
    anonymity_dict[key] = pd.read_csv(os.path.join(anonymity_results_path, f))    
    

In [20]:
k_all_values = [1, 2, 4, 8, 16, 32, 64]
l_all_values = [1, 2]
t_all_values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
for k in k_all_values:
        for l in l_all_values:
            for t in t_all_values:
                key = (k, l, t)
                if key not in anonymity_dict:
                    raise Exception(f'No se ha encontrado el archivo para k={k}, l={l}, t={t}')

In [21]:
hier_path = '../data/adults/hierarchies/'
def load_hierarchies(hier_path) -> dict:
    """
    Carga las jerarquías desde archivos CSV en el directorio especificado.
    Cada archivo debe tener un nombre que comience con el atributo seguido de '_jerarquia.csv'.
    Devuelve un diccionario donde la clave es el atributo y el valor es un DataFrame de pandas.
    """
    hierarchies = {}
    for f in os.listdir(hier_path):
        if f.endswith('.csv'):
            attr = f.split('_')[0]  # Asumimos que el nombre del archivo es 'atributo_jerarquia.csv'
            hier_df = pd.read_csv(os.path.join(hier_path, f), index_col=0)
            # Convertimos el DataFrame a un diccionario donde el índice del DataFrame es la clave y la fila es el valor
            hierarchies[attr] = hier_df.to_dict(orient='dict')
    return hierarchies

def get_hierarchy_dict(levels: str) -> dict:
    """
    Obtiene un diccionario con la jerarquía de niveles proveniente de una cadena de la forma:
    \n**Ejemplo de entrada **: *"{'level1': '1/3', 'level2': '2/3', 'level3': '0/3'}"*.
    \nDevuelve un diccionario con la fracción del valor de cada nivel, excluyendo aquellos con valor 0.
    \n**Ejemplo de salida**: *{'level1': 1/3 , 'level2': 2/3}*.
    Args:
        levels (str): Cadena que representa los niveles y sus valores en formato 'nivel: valor/total'.
    Returns:
        dict: Diccionario con los niveles y sus valores, excluyendo aquellos con valor 0.
    """
    levels = re.findall(r"'[\w-]+': '\d/\d'", levels)  
    hier_dict =  {k.strip("'"): (l.strip("'").split("/")[0],l.strip("'").split("/")[1]) for k, l in (x.split(': ') for x in levels)}
    return {k: v for k, v in hier_dict.items()}

In [22]:
# Vamos a calcular la pérdida de información para cada uno de los dataframes
hierarchies = load_hierarchies(hier_path)

# Para una cadena de generalización de QIDs "{'age': '1/3', 'education': '2/3', 'occupation': '1/3'}"
# 1/num_total_qids * sum(generalization_level) -> 1/14 * (1/3 + 2/3 + 1/3) = 1/14 * 4/3 = 4/42 = 0.095238


def calculate_information_loss(generalization_str: str) -> float:
    generalization_dict = get_hierarchy_dict(generalization_str)
    v = []
    for k, (l, r) in generalization_dict.items():
        if l == '0':
            v.append(0)
            continue
        elif l == r:
            v.append(1)
            continue
        # Obtenemos un ejemplo entre los valores del nivel l de la jerarquía
        elem = list(hierarchies[k][l].values())[0]
        pattern = re.compile(r'\d+-\d+')
        # Si es nivel de generalización numérico
        if pattern.fullmatch(elem):
            # Extraemos el valor numérico del nivel de generalización
            num_values = [int(x) for x in elem.split('-')]
            # Calculamos la fracción de generalización
            generalization_fraction = (num_values[1]-num_values[0] + 1) / len(hierarchies[k]['1'])
        # Si es nivel de generalización categórico
        else:
            generalization_fraction = 1 - len(set(hierarchies[k][l].values())) / len(set(hierarchies[k]['1'].keys()))
        v.append(generalization_fraction)
    # Calculamos la pérdida de información
    num_total_qids = len(hierarchies)
    return (1 / num_total_qids) * sum(v)

In [23]:
calculate_information_loss("{'age': '4/5', 'sex': '1/1', 'education': '2/3'}")

0.16249999999999998

In [7]:
for key, df in anonymity_dict.items():
    # Añadimos una nueva columna 'information_loss' al dataframe con la pérdida de información
    df['information_loss'] = df['Generalization Levels'].apply(calculate_information_loss)
    anonymity_dict[key] = df

### Resultados de aplicar únicamente k-anonimización

La siguiente gráfica muestra la distribución de la métrica `risk_distance` para diferentes valores de k, manteniendo l=1 y t=0, es decir, aplicando solo k-anonimización sobre los datos.

In [8]:
# Filtrar los resultados para l=1 y t=0
filtered = {k_val: df 
            for (k_val, l_val, t_val), df in anonymity_dict.items() 
            if (k_val in k_all_values and
                l_val == 1 and t_val == 1.0)}

# Preparar los datos para el diagrama de cajas
data = []
for k_val, df in filtered.items():
    # Suponiendo que 'risk_distance' es una columna o índice en el DataFrame
    risk_values = df['risk_distance'].values.astype(float)
    for v in risk_values:
        data.append({'k': k_val, 'risk_distance': v})

box_df = pd.DataFrame(data)

# Calcular la mediana de risk_distance para cada valor de k
medians = box_df.groupby('k')['risk_distance'].median().reset_index()

# Graficar la línea de medianas
fig_line = px.line(medians, x='k', y='risk_distance', markers=True)
fig_line.update_xaxes(type='log', tickvals=box_df['k'].unique(), ticktext=[f"{int(val)}" for val in box_df['k'].unique()], title_text='K')
fig_line.update_yaxes(title_text='Distancia de riesgo')
fig_line.update_traces(line=dict(color='black', width=3), marker=dict(size=10, symbol='circle'))
fig_line.update_layout(
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
        type='log',
        tickvals=sorted(box_df['k'].unique()),
        ticktext=[str(int(val)) for val in sorted(box_df['k'].unique())]
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
        type='log',
        tickvals=[10**(-i) for i in range(0, 7)],
        ticktext=[str(10**(-i)) for i in range(0, 7)]
    ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
)
# Exportamos la figura a .png
fig_line.write_image("../Memoria/images/graphs/risk_distance-vs-k_anonymity.png")
fig_line.show()


In [9]:
# Vamos a graficar para cada k, la pérdida de información

# Preparar los datos para el diagrama de cajas
data = []
for k_val, df in filtered.items():
    # Suponiendo que 'information_loss' es una columna o índice en el DataFrame
    info_loss_values = df['information_loss'].values.astype(float)
    for v in info_loss_values:
        data.append({'k': k_val, 'information_loss': v})

box_df = pd.DataFrame(data)

# Calcular la mediana de information_loss para cada valor de k
medians = box_df.groupby('k')['information_loss'].median().reset_index()

# Graficar la línea de medianas
fig_line = px.line(medians, x='k', y='information_loss', markers=True)
fig_line.update_xaxes(type='log', tickvals=box_df['k'].unique(), ticktext=[f"{int(val)}" for val in box_df['k'].unique()], title_text='K')
fig_line.update_yaxes(title_text='Pérdida de información')
fig_line.update_traces(line=dict(color='green', width=3), marker=dict(size=10, symbol='circle'))
fig_line.update_layout(
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16)
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
        tickvals=[1, 0.8, 0.6, 0.4, 0.2, 0],
        ticktext=[str(val) for val in [1, 0.8, 0.6, 0.4, 0.2, 0]]
        ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
    )
fig_line.write_image("../Memoria/images/graphs/info_loss-vs-k_anonymity.png")
fig_line.show()

### Resultados de aplicar únicamente t-closeness

La siguiente gráfica muestra la distribución de la métrica `risk_distance` para diferentes valores de t, manteniendo k=1 y l=1, es decir, aplicando solo t-closeness sobre los datos.

In [10]:
fig = go.Figure()

for k_val in k_all_values[:6]:
    # Filtrar los dataframes para este k y l=1
    t_risk = []
    for t_val in t_all_values:
        df = anonymity_dict.get((k_val, 1, t_val))
        if df is not None:
            # Calcular la mediana de risk_distance para este k y t
            median_risk = df['risk_distance'].median()
            t_risk.append({'t': t_val, 'risk_distance': median_risk})
    if t_risk:
        df_k = pd.DataFrame(t_risk)
        fig.add_trace(go.Scatter(
            x=df_k['t'],
            y=df_k['risk_distance'],
            mode='lines+markers',
            name=f'k={k_val}',
            line=dict(width=3),
            marker=dict(size=10, symbol='circle' if k_val == 1 else 'diamond')
        ))

fig.update_xaxes(
    title_text='T',
    tickvals=t_all_values,
    ticktext=[f"{val:.2f}" for val in t_all_values],
    type='linear'
)
fig.update_yaxes(title_text='Distancia de riesgo')
fig.update_layout(
    legend=dict(
        orientation='h',
        yanchor='bottom',
        y=-0.25,
        xanchor='center',
        x=0.5,
        font=dict(size=16),
        title=dict(font=dict(size=18))

    ),
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16)
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16)
    ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
    )

# Cambiar los colores de las líneas a una escala de grises
gray_colors = ['#111111', '#444444', '#666666', '#888888', '#AAAAAA', '#CCCCCC']

for i, trace in enumerate(fig.data):
    trace.line.color = gray_colors[i % len(gray_colors)]

fig.write_image("../Memoria/images/graphs/risk_distance-vs-t_closeness.png")
fig.show()

In [11]:
# Veamos las pérdida de información para cada combinación en función de t
# Preparar los datos para el diagrama de cajas

# Filtrar los resultados para k=1 y l=1 (solo t-closeness)
filtered_t = {t_val: df 
              for (k_val, l_val, t_val), df in anonymity_dict.items() 
              if (t_val in t_all_values and
                k_val == 1 and l_val == 1
                )}
data_t = []
for t_val, df in filtered_t.items():
    info_loss_values = df['information_loss'].values.astype(float)
    for v in info_loss_values:
        data_t.append({'t': t_val, 'information_loss': v})

box_df_t = pd.DataFrame(data_t)

# Calcular la mediana de information_loss para cada valor de t
medians_t = box_df_t.groupby('t')['information_loss'].median().reset_index()
fig_t = go.Figure()
fig_t.add_scatter(
    x=medians_t['t'],
    y=medians_t['information_loss'],
    mode='lines+markers',
    name='Mediana',
    marker=dict(color='green', size=10, symbol='circle'),
    line=dict(color='green', width=3)
)
fig_t.update_xaxes(title_text='T', 
                   tickvals=box_df_t['t'].unique(),
                   ticktext=[f"{val:.2f}" for val in box_df_t['t'].unique()],
                   type='linear')
fig_t.update_yaxes(title_text='Pérdida de información')
fig_t.update_layout(
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16)
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16)
    ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
)
# fig_t.write_image("../Memoria/images/graphs/info_loss-vs-t_closeness.png")
fig_t.show()

### Resultados de aplicar únicamente k-anonimity y l-diversity
La gráfica muestra claramente cómo la métrica `risk_distance` disminuye a medida que aumenta el valor de k, tanto para l=1 (azul) como para l=2 (naranja). Sin embargo, la reducción es más pronunciada para l=2, lo que indica que aplicar l-diversity adicionalmente a k-anonimity logra una mayor protección (menor riesgo) en los datos. Esto resalta la importancia de combinar ambos enfoques para mejorar la privacidad.

In [16]:
# Filtrar los resultados para t=0 (solo k y l)
data_kl = []
for (k_val, l_val, t_val), df in anonymity_dict.items():
    if t_val == 1.0:
        for v in df['risk_distance'].values.astype(float):
            data_kl.append({'k': k_val, 'l': l_val, 'risk_distance': v})

df_kl = pd.DataFrame(data_kl)

fig_kl = go.Figure()

colors = {1: 'black', 2: 'grey'}

# Calcular la mediana de risk_distance para cada combinación de k y l
medians_kl = df_kl.groupby(['k', 'l'])['risk_distance'].median().reset_index()

for l_val in sorted(medians_kl['l'].unique()):
    fig_kl.add_trace(go.Scatter(
        x=medians_kl[medians_kl['l'] == l_val]['k'],
        y=medians_kl[medians_kl['l'] == l_val]['risk_distance'],
        mode='lines+markers',
        name=f"l={l_val}",
        marker=dict(color=colors[l_val], size=10),
        line=dict(color=colors[l_val], width=3),
        showlegend=True,
        legendgrouptitle_text=f"L-Diversity"
    ))
fig_kl.update_xaxes(type='log', tickvals=sorted(df_kl['k'].unique()), title_text='K')
fig_kl.update_yaxes(
    title_text='Distancia de riesgo',
)
fig_kl.update_layout(
    legend=dict(
        orientation='h',
        yanchor='bottom',
        y=-0.25,
        xanchor='center',
        x=0.5,
        font=dict(size=16),
        title=dict(font=dict(size=18))
    ),
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
        type='log',
        tickvals=[10**(-i) for i in range(0, 2)],
    ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
)
fig_kl.write_image("../Memoria/images/graphs/risk_distance-vs-k_anonymity-l_diversity.png")
fig_kl.show()

In [48]:
# Filtrar los resultados para t=0 (solo k y l)
data_kl = []
for (k_val, l_val, t_val), df in anonymity_dict.items():
    if t_val == 1.0:
        for v in df['information_loss'].values.astype(float):
            data_kl.append({'k': k_val, 'l': l_val, 'information_loss': v})

df_kl = pd.DataFrame(data_kl)

fig_kl = go.Figure()

colors = {1: 'green', 2: 'mediumturquoise'}

# Calcular la mediana de information_loss para cada combinación de k y l
medians_kl = df_kl.groupby(['k', 'l'])['information_loss'].median().reset_index()

for l_val in sorted(medians_kl['l'].unique()):
    fig_kl.add_trace(go.Scatter(
        x=medians_kl[medians_kl['l'] == l_val]['k'],
        y=medians_kl[medians_kl['l'] == l_val]['information_loss'],
        mode='lines+markers',
        name=f"l={l_val}",
        marker=dict(color=colors[l_val], size=10),
        line=dict(color=colors[l_val], width=3),
        showlegend=True,
        legendgrouptitle_text=f"L-Diversity"
    ))
fig_kl.update_xaxes(type='log', tickvals=sorted(df_kl['k'].unique()), title_text='K')
fig_kl.update_yaxes(
    title_text='Pérdida de información'
)
fig_kl.update_layout(
    legend=dict(
        orientation='h',
        yanchor='bottom',
        y=-0.25,
        xanchor='center',
        x=0.5,
        font=dict(size=16),
        title=dict(font=dict(size=18))
    ),
    xaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
    ),
    yaxis=dict(
        title_font=dict(size=22),
        tickfont=dict(size=16),
        tickvals=[1, 0.8, 0.6, 0.4, 0.2, 0],
        ticktext=[str(val) for val in [1, 0.8, 0.6, 0.4, 0.2, 0]]
    ),
    margin=dict(l=0, r=0, t=0, b=0),
    height=400,
    width=800
)
fig_kl.write_image("../Memoria/images/graphs/info_loss-vs-k_anonymity-l_diversity.png")
fig_kl.show()

In [None]:
# Generador de figuras
def plot_surface(selected_l: int) -> go.Figure:
    data = []
    for (k_val, l_val, t_val), df in anonymity_dict.items():
        if l_val == selected_l and t_val >= 0.1:
            data.append({'k': k_val, 't': t_val, 'risk_distance': df['risk_distance'].median()})

    df = pd.DataFrame(data)
    df_grouped = df.groupby(['k', 't'], as_index=False).median()

    z = df_grouped.pivot(index='k', columns='t', values='risk_distance').values
    x = df_grouped['k'].unique()
    y = df_grouped['t'].unique()

    # Interpolación para suavizar la superficie

    # Crear una malla regular para k y t
    k_vals = df_grouped['k'].unique()
    t_vals = df_grouped['t'].unique()
    k_grid, t_grid = np.meshgrid(
        np.logspace(np.log10(k_vals.min()), np.log10(k_vals.max()), 50),
        np.linspace(t_vals.min(), t_vals.max(), 50)
    )

    # Interpolar los valores de la métrica
    points = df_grouped[['k', 't']].values
    values = df_grouped['risk_distance'].values
    z_grid = griddata(points, values, (k_grid, t_grid), method='cubic')

    fig = go.Figure(data=go.Surface(
        z=z_grid,
        x=k_grid,
        y=t_grid,
        colorscale='Viridis',
        showscale=False  # Oculta la leyenda de la escala de colores
    ))
    fig.update_scenes(
        xaxis=dict(
            type='log',
            tickvals=k_vals,
            ticktext=[str(int(val)) for val in k_vals],
        )
    )
    fig.update_layout(
        # title=f'Superficie de risk_distance en función de k y t (l={selected_l})',
        scene=dict(
            xaxis_title='K',
            yaxis_title='T',
            zaxis_title='Distancia de riesgo',
        ),
        margin=dict(l=0, r=0, t=0, b=0),
        height=600,
        width=600,
    )
    return fig

l_values = [1, 2]

precalculated_figs = {
    l: plot_surface(l)
     for l in l_values
}

# Exportar las figuras precalculadas a archivos PNG
# for l, fig in precalculated_figs.items():
#     fig.write_image(f"../Memoria/images/graphs/risk_distance-l_{l}-surface.png")

# Widgets para seleccionar el valor de l

l_selector = widgets.SelectionSlider(
    options=l_values,
    value=1,
    description='l:',
    continuous_update=False
)

output = widgets.Output()

def update_plot(change=None):
    with output:
        output.clear_output(wait=True)
        fig = precalculated_figs[l_selector.value]
        fig.show()

l_selector.observe(update_plot, names='value')

display(l_selector, output)
update_plot()  


SelectionSlider(continuous_update=False, description='l:', options=(1, 2), value=1)

Output()