# GSS testing notebook

## How to preprocess the data

### Loading the GSS dataset

In [323]:
import pyreadstat
import pandas as pd

from typing import Any

def from_gss(file_path: str, n_sample: int | None = None, random_state: int = 42) -> tuple[pd.DataFrame, Any]:
    """Read an SPSS ``.sav`` file and optionally draw a random subsample of rows.

    Args:
        file_path: Path handed to ``pyreadstat.read_sav``.
        n_sample: Number of rows to draw; ``None`` keeps every row.
        random_state: Seed forwarded to ``DataFrame.sample``.

    Returns:
        The ``(frame, metadata)`` pair from ``pyreadstat.read_sav``. When a
        subsample is drawn, the frame's index is reset to ``0..n_sample-1``.
    """
    frame, metadata = pyreadstat.read_sav(file_path)
    if n_sample is None:
        return frame, metadata
    subset = frame.sample(n=n_sample, random_state=random_state)
    return subset.reset_index(drop=True), metadata

# Load the whole file (no subsampling). `data` and `meta` are read as globals
# by the inspection helpers defined in later cells.
data, meta = from_gss("./datasets/2024_spss/2024/GSS2024.sav")
print(f"data.shape = {data.shape}")
data.head()

data.shape = (3309, 813)


Unnamed: 0,fileversion,year,id,abany,abanyg,abdefect,abdefectg,abhlth,abhlthg,abnomore,...,wrkstat,wrkwayup,wrldgovt,wtssnrps,wtssps,xmarsex,xmovie,xmoviey,xnorcsiz,yousup
0,7224.2,2024.0,1.0,,,,,,,,...,1.0,4.0,,2.36949,1.889989,,1.0,,4.0,
1,7224.2,2024.0,2.0,,,,,,,,...,5.0,2.0,3.0,1.39116,1.152265,,,4.0,4.0,
2,7224.2,2024.0,3.0,1.0,1.0,,1.0,,1.0,,...,5.0,,2.0,1.171199,0.915609,1.0,2.0,,4.0,
3,7224.2,2024.0,4.0,2.0,,2.0,,2.0,,2.0,...,2.0,2.0,,2.73583,2.288064,1.0,,,4.0,
4,7224.2,2024.0,5.0,2.0,2.0,,2.0,,1.0,,...,5.0,,3.0,1.272915,1.005427,1.0,2.0,,4.0,


### Integer, floating, or nominal/ordinal?

In [324]:
from typing import Literal

def infer_resp_type(col: str, df: pd.DataFrame, meta: Any) -> Literal["int", "float", "nominal/ordinal", "unknown"]:
    """Guess whether a column holds numeric or labelled (categorical) answers.

    The heuristic compares the number of value labels with a non-negative key
    against the number of distinct non-missing values in the column: when the
    data contain more distinct values than there are such labels, the column
    is treated as numeric; otherwise as nominal/ordinal.

    Args:
        col: Column name in ``df``.
        df: Data frame holding the answers.
        meta: Object exposing ``variable_value_labels`` (column -> {value: text}).

    Returns:
        ``"int"`` if numeric and every observed value is integral, ``"float"``
        if numeric otherwise, ``"nominal/ordinal"`` if the labels cover the
        observed values, and ``"unknown"`` when there is neither a usable label
        nor a single observed value to decide from.
    """
    unique_vals = df[col].dropna().unique()
    val2txt = {k: v for k, v in meta.variable_value_labels.get(col, {}).items() if 0 <= k}
    # Nothing to go on: without this guard an empty, unlabelled column would be
    # reported as "nominal/ordinal" with zero answer options.
    if not val2txt and len(unique_vals) == 0:
        return "unknown"
    if len(val2txt) < len(unique_vals):
        if all(float(v).is_integer() for v in unique_vals):
            return "int"
        return "float"
    return "nominal/ordinal"

### Exploring the GSS dataset

In [325]:
from ipywidgets import interact, IntRangeSlider

def inspect_column(variable: str, from_to: tuple[int, int]) -> None:
    """Print a column's question text, inferred type, answer labels and raw rows.

    Reads the notebook-level ``data`` and ``meta`` globals.

    Args:
        variable: Column name in ``data``.
        from_to: ``(start, end)`` positional row range; ``end`` is exclusive.
    """
    column = data[variable]
    all_labels = meta.variable_value_labels.get(variable, {})
    ctype = infer_resp_type(variable, data, meta)
    print(f"question: {meta.column_names_to_labels.get(variable)} ({ctype})")
    if ctype == "nominal/ordinal":
        # Only labels with a non-negative key are listed as answer options.
        val2txt = {k: v for k, v in all_labels.items() if 0 <= k}
        # Joined outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        listing = "\n  ".join(f"{k}: {v}" for k, v in val2txt.items())
        print(f"response labels:\n  {listing}")
    start, end = from_to
    for offset, value in enumerate(column.iloc[start:end].tolist()):
        # Row values are looked up in the unfiltered labels, so any labelled
        # value (including negative keys) is still annotated.
        label = all_labels.get(value)
        suffix = f", i.e., {label}" if label is not None else ""
        print(f"{start + offset}: {value}{suffix}")

# Interactive browser: a dropdown over every column of `data` plus a range
# slider choosing which rows (by position) to print. The trailing `;`
# suppresses the cell's return value.
interact(
    inspect_column, 
    variable=data.columns, 
    from_to=IntRangeSlider(
        min=0, 
        max=len(data), 
        step=1, 
        value=[0, 10], 
        description='rows'
    )
);

interactive(children=(Dropdown(description='variable', options=('fileversion', 'year', 'id', 'abany', 'abanyg'…

## How to make predictions

### Which race is this persona?

In [326]:
from collections.abc import Mapping

def race_from_gss(row: pd.Series, df: pd.DataFrame, meta: Any) -> str | None:
    relevant = [
        'raceacs1',
        'raceacs2', 
        'raceacs3',
        'raceacs4',
        'raceacs5',
        'raceacs6',
        'raceacs7',
        'raceacs8',
        'raceacs9',
        'raceacs10',
        'raceacs14',
        'raceacs16'
    ]
    races = df[relevant].columns.tolist()

    stack: list[str] = []
    for code in races:
        label = meta.column_names_to_labels.get(code)
        flag = meta.variable_value_labels.get(code, {}).get(row[code])
        if flag == "yes":
            stack.append(label)
    
    if not stack:
        return None
    elif len(stack) == 1:
        return stack[0]
    else:
        return ", ".join(stack[:-1]) + f", and {stack[-1]}"

### Making persona descriptions

In [327]:

def persona_desc(row: pd.Series, df: pd.DataFrame, meta: Any) -> Mapping[str, str]:
    """Describe one respondent as a mapping of trait name to text.

    ``age`` is rendered as an integer string; ``race`` comes from
    ``race_from_gss``; every other trait is the value label of the matching
    column. Traits that resolve to ``None`` are left out.
    """
    def label_of(column: str):
        # Text attached to this respondent's value, or None if there is none.
        return meta.variable_value_labels.get(column, {}).get(row[column])

    age = row["age"]
    traits = {"age": None if pd.isna(age) else str(int(age))}
    traits["sex"] = label_of("sex")
    traits["race"] = race_from_gss(row, df, meta)
    labelled_traits = (
        ("religion", "relig"),
        ("marital_status", "marital"),
        ("employment_status", "wrkstat"),
        ("political_views", "polviews"),
        ("born_in_usa", "born"),
        ("education_level", "educ"),
    )
    for trait, column in labelled_traits:
        traits[trait] = label_of(column)
    return {trait: text for trait, text in traits.items() if text is not None}

### Exploring persona descriptions

In [328]:
def inspect_persona(idx: int) -> None:
    """Print the persona description of the respondent at position ``idx`` in ``data``."""
    persona = persona_desc(data.iloc[idx], data, meta)
    lines = [f"persona #{idx}:"]
    lines.extend(f"  {trait}: {text}" for trait, text in persona.items())
    print("\n".join(lines))

# Slider over every row position of `data` (0 .. len(data)-1, step 1).
interact(inspect_persona, idx=(0, len(data)-1, 1));

interactive(children=(IntSlider(value=1654, description='idx', max=3308), Output()), _dom_classes=('widget-int…

### Making predictions

In [329]:
from genagents.genagents import GenerativeAgent

def respond(col: str, idx: int, df: pd.DataFrame, meta: Any) -> float | None:
    """Ask a persona-conditioned agent to answer one survey question.

    A fresh ``GenerativeAgent`` is seeded with the persona of the respondent
    at position ``idx`` and asked the question attached to ``col``. Numeric
    columns are asked for a number within the observed range of the column;
    nominal/ordinal columns are asked to pick one of the non-negative value
    labels.

    Args:
        col: Column to predict.
        idx: Positional row index of the respondent in ``df``.
        df: Data frame with the survey answers.
        meta: Object exposing ``column_names_to_labels`` and
            ``variable_value_labels``.

    Returns:
        The numeric answer, or the value code whose label equals the agent's
        categorical answer; ``None`` when no single usable answer came back.

    Raises:
        Whatever ``float()`` raises if the numeric answer is not convertible.
    """
    row = df.iloc[idx]
    agent = GenerativeAgent()
    agent.update_scratch(persona_desc(row, df, meta))
    # Fall back to the column name so the question key is never None.
    question = meta.column_names_to_labels.get(col) or col
    resp_type = infer_resp_type(col, df, meta)
    labels = {k: v for k, v in meta.variable_value_labels.get(col, {}).items() if 0 <= k}
    if resp_type in ("int", "float"):
        observed = df[col].dropna()
        # Named lower/upper so the builtins min() and max() are not shadowed.
        lower = float(observed.min())
        upper = float(observed.max())
        resp_and_rationale = agent.numerical_resp({question: [lower, upper]}, float_resp=(resp_type == "float"))
        if resp_and_rationale is not None and "responses" in resp_and_rationale:
            resp = resp_and_rationale["responses"]
            if resp and len(resp) == 1:
                return float(resp[0])
    elif resp_type == "nominal/ordinal" and labels:
        # Pass a concrete list rather than a dict view.
        # NOTE(review): assumes categorical_resp takes {question: [options]} — confirm.
        resp_and_rationale = agent.categorical_resp({question: list(labels.values())})
        if resp_and_rationale is not None and "responses" in resp_and_rationale:
            resp = resp_and_rationale["responses"]
            if resp and len(resp) == 1:
                # Map the chosen label text back to its value code (exact match).
                for k, v in labels.items():
                    if v == resp[0]:
                        return k
    return None

### Exploring predictions

In [330]:
from ipywidgets import interact_manual

def inspect_response(col: str, idx: int) -> None:
    """Print the agent's answer next to the recorded answer for one cell.

    Reads the notebook-level ``data`` and ``meta`` globals and calls
    ``respond`` for the respondent at position ``idx``.

    Args:
        col: Column (question) to predict.
        idx: Positional row index of the respondent in ``data``.
    """
    column = data[col]
    ctype = infer_resp_type(col, data, meta)
    print(f"question: {meta.column_names_to_labels.get(col)} ({ctype})")
    # Always bound; stays empty for numeric columns so no value gets a label suffix.
    val2txt = {}
    if ctype == "nominal/ordinal":
        val2txt = {k: v for k, v in meta.variable_value_labels.get(col, {}).items() if 0 <= k}
        # Joined outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        listing = "\n  ".join(f"{k}: {v}" for k, v in val2txt.items())
        print(f"response labels:\n  {listing}")

    def with_label(value) -> str:
        # Append the label text when the value is one of the listed options.
        if value in val2txt:
            return f"{value}, i.e., {val2txt.get(value)}"
        return f"{value}"

    resp = respond(col, idx, data, meta)
    print(f"response: {with_label(resp)}")
    print(f"target: {with_label(column.iloc[idx])}")

# Manual-run widget: `inspect_response` calls `respond`, which queries an
# agent, so it only runs when the button is pressed.
interact_manual(inspect_response, col=list(data.columns), idx=(0, len(data)-1, 1));

interactive(children=(Dropdown(description='col', options=('fileversion', 'year', 'id', 'abany', 'abanyg', 'ab…

### Running simulations

In [331]:
# Columns (survey questions) the simulation is asked to predict. This list is
# passed to respond_all / respond_all_random and drives the score widgets.
# Names that are commented out are not part of the list; the notebook does not
# record why they were excluded.
cols_to_predict = [
#    'natspacy',
#    'natenviy',
#    'nathealy',
#    'natcityy',
#    'natdrugy',
#    'nateducy',
#    'natracey',
#    'natarmsy',
#    'nataidy',
#    'natfarey',
#    'natroad',
#    'natsoc',
#    'natspac',
#    'natenvir',
#    'natheal',
#    'natcity',
#   'natdrug',
#    'nateduc',
#    'natrace',
#    'natarms',
#    'nataid',
#    'natfare',
#    'natchld',
#    'natsci',
#    'natenrgy',
    'prayer',
#    'courts',
    'discaffw',
    'discaffm',
    'fehire',
    'fechld',
    'fepresch',
    'fefam',
    'fepol',
#    'reg16',
    'mobile16',
    'famdif16',
    'incom16',
    'dwelown16',
    'paeduc',
    'padeg',
    'maeduc',
    'madeg',
    'mawrkgrw',
    'marital',
    'widowed',
    'divorce',
    'martype',
    'posslqy',
    'wrkstat',
    'evwork',
    'wrkgovt1',
    'wrkgovt2',
    'partfull',
    'wksub1',
    'wksup1',
    'conarmy',
    'conbus',
    'conclerg',
    'coneduc',
    'confed',
    'confinan',
    'conjudge',
    'conlabor',
    'conlegis',
    'conmedic',
    'conpress',
    'consci',
    'contv',
    'vetyears',
    'joblose',
#    'jobfind',
    'happy',
    'hapmar',
    'satjob',
    'speduc',
    'spdeg',
    'spwrksta',
    'spfund',
    'unemp',
    'union1',
    'spkathy',
    'libathy',
    'colath',
    'spkracy',
    'libracy',
    'spkcomy',
    'libcomy',
    'colcomy',
    'colrac',
    'spkmslmy',
    'libmslmy',
    'cappun',
    'polhitoky',
    'polabusey',
    'polattaky',
    'grass',
    'gunlaw',
    'owngun',
    'hunt1',
    'class',
    'satfin',
    'finalter',
    'finrela',
    'race',
#    'racdif1',
#    'racdif2',
#    'racdif3',
#    'racdif4',
#    'wlthwhts',
#    'wlthblks',
#    'wlthhsps',
    'racwork',
    'letin1a',
    'getahead',
    'parsol',
    'kidssol',
    'spanking',
    'divlaw',
    'sexeduc',
    'pillok',
    'xmarsex',
    'homosex',
    'discaff',
#    'abdefect',
#    'abnomore',
#    'abhlth',
#    'abpoor',
#    'abrape',
#    'absingle',
#    'abany',
    'letdie1',
#    'suicide1',
#    'suicide2',
#    'suicide4',
    'pornlaw',
    'fair',
    'helpful',
    'trust',
    'tax',
    'vote16',
    'pres16',
    'if16who',
    'polviews',
    'partyid',
    'news',
    'relig',
    'relig16',
    'attend',
    'pray',
    'postlife',
    'bible',
    'reborn',
    'relpersn',
    'sprtprsn',
    'born',
    'granborn',
#    'uscitzn',
    'educ',
    'degree',
    'income',
    'visitors',
    'dwelown',
    'othlang',
    'sex',
    'hispanic',
    'health',
    'compuse',
    'webmob',
    'xmovie',
    'life',
    'richwork'
]

# Sanity check. The second count is taken over a set of the names found among
# the columns of `data`, so it only equals the first count when every listed
# name exists in `data` and no name is listed twice.
print(f"# of columns to predict: {len(cols_to_predict)}")
print(f"# of columns also in GSS: {len({k for k in cols_to_predict if k in data})}")

# of columns to predict: 126
# of columns also in GSS: 126


In [332]:
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections.abc import Iterable

def respond_all(cols: Iterable[str], df: pd.DataFrame, meta: Any, num_workers: int = 4, skip_nan: bool = False) -> pd.DataFrame:
    """Predict every requested (respondent, question) cell with ``respond``.

    Calls are fanned out over a thread pool. A call that raises is reported on
    stdout and skipped, so one bad cell does not abort the run.

    Args:
        cols: Columns to predict; names missing from ``df`` are ignored.
        df: Ground-truth answers, one row per respondent.
        meta: Metadata object forwarded to ``respond``.
        num_workers: Thread-pool size.
        skip_nan: When true, cells whose true answer is missing are not asked.

    Returns:
        A copy of ``df`` in which the requested columns hold the predictions.
        Cells of those columns with no prediction (skipped, failed, or
        ``respond`` returned ``None``) are NaN. Columns that were not
        requested keep their original values.
    """
    # `cols` may be a one-shot iterable, yet it is traversed once per row and
    # measured with len() below.
    cols = list(cols)
    # Rows are addressed by position throughout (as `respond` does with iloc),
    # so the result does not depend on df having a 0..n-1 index.
    # Assumes column names are unique, so get_loc returns a single int.
    col_pos = {col: df.columns.get_loc(col) for col in cols if col in df.columns}
    pred = df.copy()
    if col_pos:
        # Blank the target columns first. Otherwise a failed prediction keeps
        # the copied true answer and is later scored as a perfect hit.
        pred.loc[:, list(col_pos)] = float("nan")
    tasks = [
        (col, idx)
        for idx in range(len(df))
        for col in cols
        if col in col_pos and (not skip_nan or not pd.isna(df.iat[idx, col_pos[col]]))
    ]
    with ThreadPoolExecutor(max_workers=num_workers) as exe:
        futures = {exe.submit(respond, col, idx, df, meta): (col, idx) for col, idx in tasks}
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"{len(df)} respondents x {len(cols)} questions"):
            col, idx = futures[future]
            try:
                result = future.result()
                if result is not None:
                    pred.iat[idx, col_pos[col]] = result
            except Exception as err:
                # Deliberately best-effort: report and carry on.
                print(f"[ERROR] in col {col}, idx {idx}: {err}")
    return pred

#### Random baseline

In [333]:
import numpy as np

def respond_random(col: str, idx: int, df: pd.DataFrame, meta: Any, rng: np.random.Generator | None = None) -> float | None:
    """Draw a uniformly random answer for one question (baseline predictor).

    Numeric columns get a value drawn from the observed ``[min, max]`` range
    of the column (inclusive on both ends for integer columns);
    nominal/ordinal columns get one of the non-negative value codes.

    Args:
        col: Column to answer.
        idx: Respondent position; accepted for signature parity with
            ``respond`` but not used, since the draw ignores the respondent.
        df: Data frame with the survey answers.
        meta: Object exposing ``variable_value_labels``.
        rng: Optional generator to draw from; pass a seeded one for a
            reproducible baseline. Defaults to a fresh unseeded generator
            per call.

    Returns:
        The random answer, or ``None`` when there is nothing to draw from.
    """
    if rng is None:
        rng = np.random.default_rng()
    resp_type = infer_resp_type(col, df, meta)
    if resp_type in ("int", "float"):
        vals = df[col].dropna()
        if len(vals) == 0:
            return None
        if resp_type == "int":
            # `integers` excludes the upper bound, hence the +1.
            return int(rng.integers(int(vals.min()), int(vals.max()) + 1))
        return float(rng.uniform(vals.min(), vals.max()))
    if resp_type == "nominal/ordinal":
        labels = {k: v for k, v in meta.variable_value_labels.get(col, {}).items() if 0 <= k}
        if labels:
            return rng.choice(list(labels.keys()))
    return None

def respond_all_random(cols: Iterable[str], df: pd.DataFrame, meta: Any, num_workers: int = 4, skip_nan: bool = False) -> pd.DataFrame:
    """Fill every requested (respondent, question) cell with a random answer.

    Mirrors ``respond_all`` but uses ``respond_random``, giving a chance-level
    baseline over exactly the same cells.

    Args:
        cols: Columns to answer; names missing from ``df`` are ignored.
        df: Ground-truth answers, one row per respondent.
        meta: Metadata object forwarded to ``respond_random``.
        num_workers: Thread-pool size.
        skip_nan: When true, cells whose true answer is missing are skipped.

    Returns:
        A copy of ``df`` in which the requested columns hold the random
        answers. Cells of those columns with no answer are NaN. Columns that
        were not requested keep their original values.
    """
    # `cols` may be a one-shot iterable, yet it is traversed once per row and
    # measured with len() below.
    cols = list(cols)
    # Rows are addressed by position, so the result does not depend on df
    # having a 0..n-1 index.
    # Assumes column names are unique, so get_loc returns a single int.
    col_pos = {col: df.columns.get_loc(col) for col in cols if col in df.columns}
    pred = df.copy()
    if col_pos:
        # Blank the target columns first. Otherwise a cell that receives no
        # answer keeps the copied true value and is later scored as a hit.
        pred.loc[:, list(col_pos)] = float("nan")
    tasks = [
        (col, idx)
        for idx in range(len(df))
        for col in cols
        if col in col_pos and (not skip_nan or not pd.isna(df.iat[idx, col_pos[col]]))
    ]
    with ThreadPoolExecutor(max_workers=num_workers) as exe:
        futures = {exe.submit(respond_random, col, idx, df, meta): (col, idx) for col, idx in tasks}
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"{len(df)} respondents x {len(cols)} questions (baseline)"):
            col, idx = futures[future]
            try:
                result = future.result()
                if result is not None:
                    pred.iat[idx, col_pos[col]] = result
            except Exception as err:
                # Deliberately best-effort: report and carry on.
                print(f"[ERROR] in col {col}, idx {idx}: {err}")
    return pred

In [334]:
import os

from ipywidgets import IntSlider

# Notebook-level result slots. They are assigned through `global` statements
# in run_response_simulation and load_results.
true: pd.DataFrame | None = None  # sampled ground-truth answers
pred: pd.DataFrame | None = None  # output of respond_all
base: pd.DataFrame | None = None  # output of respond_all_random

def run_response_simulation(num_people: int, num_workers: int, skip_nan: bool) -> None:
    """Sample respondents, predict their answers, and pickle the results.

    Fills the notebook-level ``true``, ``pred`` and ``base`` frames and writes
    each to ``./saved_tests/gss{num_people}_{true,pred,base}.pkl``.

    Args:
        num_people: Number of respondents to sample from the GSS file.
        num_workers: Thread-pool size for both prediction passes.
        skip_nan: Skip cells whose true answer is missing.
    """
    global pred, true, base
    out_dir = "./saved_tests"
    # Create the target directory before the expensive part, so a missing
    # folder cannot throw away a finished simulation at save time.
    os.makedirs(out_dir, exist_ok=True)
    # The metadata returned by this sampling load is discarded; the
    # notebook-level `meta` is used for the prediction calls below.
    true, _ = from_gss("./datasets/2024_spss/2024/GSS2024.sav", n_sample=num_people)
    # Each frame is saved as soon as it exists, so a failure in a later step
    # does not lose the earlier results.
    true.to_pickle(f"{out_dir}/gss{num_people}_true.pkl")
    pred = respond_all(cols_to_predict, true, meta, num_workers=num_workers, skip_nan=skip_nan)
    pred.to_pickle(f"{out_dir}/gss{num_people}_pred.pkl")
    base = respond_all_random(cols_to_predict, true, meta, num_workers=num_workers, skip_nan=skip_nan)
    base.to_pickle(f"{out_dir}/gss{num_people}_base.pkl")

# Manual-run widget for the simulation. Defaults: 5 respondents, 101 worker
# threads, skip_nan on. The worker slider tops out at
# 7 * max(32, os.cpu_count() or 1).
interact_manual(run_response_simulation, num_people=IntSlider(min=1, max=len(data), step=1, value=5), num_workers=IntSlider(min=1, max=7 * max(32, os.cpu_count() or 1), step=1, value=101), skip_nan=True);

interactive(children=(IntSlider(value=5, description='num_people', max=3309, min=1), IntSlider(value=101, desc…

## Performance analysis

### Score and loss functions

In [335]:
from sklearn.metrics import f1_score as f1_fn

def acc_score(pred: pd.DataFrame, true: pd.DataFrame) -> pd.Series:
    """Per-column share of non-missing true answers that were predicted exactly.

    Cells whose true answer is NaN count neither as hits nor in the
    denominator.
    """
    answered = true.notna()
    hits = (pred == true) & answered
    n_hits = hits.sum()
    n_answered = answered.sum()
    return n_hits.div(n_answered)

def mae_score(pred: pd.DataFrame, true: pd.DataFrame) -> pd.Series:
    """Per-column mean absolute error over cells whose true answer is present."""
    answered = true.notna()
    abs_err = (pred - true).abs()
    kept = abs_err.where(answered)

    def column_mean(errors):
        # Series.mean skips NaN, so masked-out cells do not contribute.
        return errors.mean()

    return kept.apply(column_mean)

def f1_score(pred: pd.DataFrame, true: pd.DataFrame) -> pd.Series:
    """Per-column macro-averaged F1 over cells whose true answer is present.

    Returns NaN for a column when fewer than two distinct true classes are
    present or when scikit-learn rejects the input.

    A missing (NaN) prediction is scored as a miss for its true class. Without
    this, scikit-learn rejects NaN input and a single unanswered cell turns
    the whole column's score into NaN.
    """
    mask = ~true.isna()

    def f1_col(col):
        keep = mask[col.name]
        y_true = true[col.name][keep]
        y_pred = pred[col.name][keep]
        # Covers the empty case too: nunique() of an empty series is 0.
        if y_true.nunique() < 2:
            return float('nan')
        try:
            labels = None  # None keeps scikit-learn's default: all labels seen
            unanswered = y_pred.isna()
            if unanswered.any():
                # Score over the classes actually observed, and replace each
                # missing prediction with a value outside that set, so it is a
                # miss for the true class without adding a class of its own.
                labels = sorted(set(y_true.unique()) | set(y_pred[~unanswered].unique()))
                y_pred = y_pred.where(~unanswered, labels[0] - 1)
            return f1_fn(y_true, y_pred, average='macro', labels=labels)
        except (ValueError, TypeError):
            return float('nan')

    return pred.apply(f1_col)

def corr_score(pred: pd.DataFrame, true: pd.DataFrame) -> pd.Series:
    """Per-column correlation between predictions and present true answers.

    A column scores NaN when fewer than two cells have a true answer, when
    either side has zero standard deviation, or when ``Series.corr`` raises.
    """
    answered = ~true.isna()

    def column_corr(col):
        rows = answered[col.name]
        guesses = pred[col.name][rows]
        targets = true[col.name][rows]
        too_short = len(guesses) < 2
        # Only test the spread when there are enough points to have one.
        if too_short or guesses.std() == 0 or targets.std() == 0:
            return float('nan')
        try:
            value = guesses.corr(targets)
        except Exception:
            value = float('nan')
        return value

    return pred.apply(column_corr)

### Load data to analyze

In [344]:
import warnings

from collections import OrderedDict

# Silence one specific warning by its message text. NOTE(review): presumably
# raised by scikit-learn when f1_score runs on columns with many distinct
# values — confirm the source.
warnings.filterwarnings("ignore", message="The number of unique classes is greater than 50% of the number of samples. `y` could represent a regression problem, not a classification problem.")

# Metric name -> scoring function. Each function takes (pred, true) frames and
# returns one score per column. The key order is what the metric dropdown shows.
score_fns = OrderedDict([
    ("acc", acc_score),
    ("f1", f1_score),
    ("mae", mae_score),
    ("corr", corr_score)
])
# Filled by load_results: metric name -> per-column scores, for the agent
# predictions (`scores`) and for the random baseline (`baselines`).
scores: Mapping[str, pd.Series] | None = None
baselines: Mapping[str, pd.Series] | None = None

def load_results(num_people: int) -> None:
    """Load a saved simulation and score it against the ground truth.

    Reads ``./saved_tests/gss{num_people}_{true,pred,base}.pkl`` into the
    notebook-level ``true``, ``pred`` and ``base`` frames, then fills
    ``scores`` and ``baselines`` with one series per metric in ``score_fns``.
    If any of the three files is missing, a message is printed and nothing is
    changed.

    Args:
        num_people: Sample size used when the results were saved.
    """
    global true, pred, base, scores, baselines
    paths = {kind: f"./saved_tests/gss{num_people}_{kind}.pkl" for kind in ("true", "pred", "base")}
    # All three files are required; the baseline is read below as well, so it
    # must be part of the existence check.
    if not all(os.path.exists(path) for path in paths.values()):
        print(f"No saved results with {num_people}.")
        return
    # NOTE: unpickling executes code from the file; only load pickles that
    # this notebook wrote itself.
    true = pd.read_pickle(paths["true"])
    pred = pd.read_pickle(paths["pred"])
    base = pd.read_pickle(paths["base"])
    scores = {k: fn(pred, true) for k, fn in score_fns.items()}
    baselines = {k: fn(base, true) for k, fn in score_fns.items()}

# Manual-run widget: choose the sample size of a previously saved simulation.
interact_manual(load_results, num_people=IntSlider(min=1, max=len(data), step=1, value=5));

interactive(children=(IntSlider(value=5, description='num_people', max=3309, min=1), Button(description='Run I…

### Evaluating results

In [337]:
def inspect_column_scores(col: str) -> None:
    """Print the scores of one column and the averages over predicted columns.

    Numeric columns report ``mae`` and ``corr``; nominal/ordinal columns
    report ``acc`` and ``f1``. Each figure is shown next to the random
    baseline. Reads the notebook-level ``scores``, ``baselines``, ``data``,
    ``meta`` and ``cols_to_predict``.

    Args:
        col: Column whose scores to show.
    """
    if scores is None or baselines is None:
        print("No test data loaded.")
        return
    print(f"for {col}:")
    resp_type = infer_resp_type(col, data, meta)
    if resp_type in ("int", "float"):
        metrics = ("mae", "corr")
    elif resp_type == "nominal/ordinal":
        metrics = ("acc", "f1")
    else:
        metrics = ()
    for metric in metrics:
        print(f"  {metric}: {scores[metric][col]} (baseline: {baselines[metric][col]})")
    print(f"overall:")
    for metric, score in scores.items():
        # Average only over the columns that were actually predicted. The
        # score series cover every column of the data set, and non-predicted
        # columns are plain copies of the truth that would inflate the mean.
        overall_score = score.reindex(cols_to_predict).mean()
        overall_base = baselines[metric].reindex(cols_to_predict).mean()
        print(f"  {metric}: {overall_score} (baseline: {overall_base})")

# Dropdown over the predicted columns only.
interact(inspect_column_scores, col=cols_to_predict);

interactive(children=(Dropdown(description='col', options=('prayer', 'discaffw', 'discaffm', 'fehire', 'fechld…

In [348]:
from colorama import Fore, Style

# (metric, (low, high), colour) bands used by pick_color. Bounds are inclusive
# and the first matching entry wins, so a value sitting on a shared boundary
# (e.g. 0.8 for "f1") gets the colour of the band listed first.
color_maps = (
    ("f1", (0.8, 1.0), Fore.GREEN),
    ("f1", (0.6, 0.8), Fore.YELLOW),
    ("f1", (0.0, 0.6), Fore.RED),
    ("acc", (0.8, 1.0), Fore.GREEN),
    ("acc", (0.6, 0.8), Fore.YELLOW),
    ("acc", (0.0, 0.6), Fore.RED),
    ("mae", (0.0, 1.0), Fore.GREEN),
    ("mae", (1.0, 2.0), Fore.YELLOW),
    ("mae", (2.0, float('inf')), Fore.RED),
    ("corr", (0.7, 1.0), Fore.GREEN),
    ("corr", (0.3, 0.7), Fore.YELLOW),
    ("corr", (-1.0, 0.3), Fore.RED)
)

def pick_color(metric: str, value: float) -> str:
    """Return the colour code of the first ``color_maps`` band containing ``value``.

    Bounds are inclusive. An empty string is returned when no band of
    ``metric`` contains the value (which includes NaN).
    """
    matching = (
        color
        for name, (low, high), color in color_maps
        if name == metric and low <= value <= high
    )
    return next(matching, "")

def print_ranked(metric: str, from_to: tuple[int, int], minimize: bool, of_type: str) -> None:
    """Rank predicted columns of one response type by their margin over the baseline.

    Prints a colour legend (share of the ranked columns in each band of
    ``color_maps``) followed by the requested slice of the ranking, with each
    column's score and baseline score.

    Args:
        metric: Key of ``score_fns`` / ``scores`` to rank by.
        from_to: ``(start, end)`` slice of the ranking to print; ``end`` is
            exclusive.
        minimize: Sort ascending by (score - baseline) when true, descending
            otherwise.
        of_type: Response type (as returned by ``infer_resp_type``) the
            columns must have.
    """
    if scores is None or baselines is None:
        print("No test data loaded.")
        return
    filtered_cols = [col for col in cols_to_predict if infer_resp_type(col, data, meta) == of_type]
    if not filtered_cols:
        print(f"No columns of type {of_type}.")
        return
    pred_scores = scores[metric][filtered_cols]
    base_scores = baselines[metric][filtered_cols]
    ranked = (pred_scores - base_scores).sort_values(ascending=minimize)
    start, end = from_to

    # The legend covers the same columns as the ranking below. Columns of other
    # response types are left out so the shares describe what is listed.
    col_colors = [pick_color(metric, pred_scores[col]) for col in filtered_cols]
    legend = [
        f"{color}■{Style.RESET_ALL} {col_colors.count(color) / len(filtered_cols):.1%}"
        for name, _, color in color_maps
        if name == metric
    ]
    print(" ".join(legend), end="\n\n")

    for idx, (col, _) in enumerate(ranked.iloc[start:end].items(), start=start):
        print(f"{idx + 1}. {col}: {pick_color(metric, pred_scores[col])}{pred_scores[col]}{Style.RESET_ALL} (baseline: {pick_color(metric, base_scores[col])}{base_scores[col]}{Style.RESET_ALL})")

# Ranking browser: choose the metric, the slice of the ranking to show, the
# response type to restrict to, and the sort direction (`minimize`).
interact(
    print_ranked, 
    metric=score_fns.keys(), 
    from_to=IntRangeSlider(
        min=0,
        max=len(cols_to_predict),
        step=1,
        value=(0, 5)
    ),
    of_type=["nominal/ordinal", "int", "float"],
    minimize=False
);

interactive(children=(Dropdown(description='metric', options=('acc', 'f1', 'mae', 'corr'), value='acc'), IntRa…

## Notes

1. So few respondents selected "yes" under "widowed" that a sample of only 50 respondents had everyone answering "no". Hence, the accuracy and F1 scores fail for that column: they need at least two active classes.