In [1]:
# setup
import pandas as pd
import numpy as np
import random
import statsmodels.api as sm
import sys
from ppi_py import ppi_ols_ci, classical_ols_ci, ppi_ols_pointestimate

# Load the gzipped survey CSV from the sibling Data directory (path is relative
# to the notebook's working directory).
df = pd.read_csv("../Data/5_SurveySampleLLM.csv.gz")

# Column names grouped under `Covs` — presumably the scenario covariates of the
# survey; NOTE(review): `Covs` is not referenced by any cell visible here.
Covs = ['PedPed', 'Barrier', 'CrossingSignal', 'NumberOfCharacters',
        'DiffNumberOFCharacters', 'LeftHand', 'Man', 'Woman', 'Pregnant',
        'Stroller', 'OldMan', 'OldWoman', 'Boy', 'Girl', 'Homeless',
        'LargeWoman', 'LargeMan', 'Criminal', 'MaleExecutive',
        'FemaleExecutive', 'FemaleAthlete', 'MaleAthlete', 'FemaleDoctor',
        'MaleDoctor', 'Dog', 'Cat', 
        'Intervention'
        ]

# Bare expression: displays the interpreter version string as the cell output.
sys.version

'3.11.2 (tags/v3.11.2:878ead1, Feb  7 2023, 16:38:35) [MSC v.1934 64 bit (AMD64)]'

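Helper function taken from the `ppi_py` (prediction-powered inference) package: it computes the per-observation gradients and the inverse Hessian needed for OLS-based PPI.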

In [18]:
def _ols_get_stats(
    pointest,
    X,
    Y,
    Yhat,
    X_unlabeled,
    Yhat_unlabeled,
    w=None,
    w_unlabeled=None,
    use_unlabeled=True,
):
    """Computes the statistics needed for the OLS-based prediction-powered inference.

    Args:
        pointest (ndarray): A point estimate of the coefficients.
        X (ndarray): Covariates for the labeled data set.
        Y (ndarray): Labels for the labeled data set.
        Yhat (ndarray): Predictions for the labeled data set.
        X_unlabeled (ndarray): Covariates for the unlabeled data set.
        Yhat_unlabeled (ndarray): Predictions for the unlabeled data set.
        w (ndarray, optional): Sample weights for the labeled data set.
        w_unlabeled (ndarray, optional): Sample weights for the unlabeled data set.
        use_unlabeled (bool, optional): Whether to use the unlabeled data set.

    Returns:
        grads (ndarray): Gradient of the loss function with respect to the coefficients.
        grads_hat (ndarray): Gradient of the loss function with respect to the coefficients, evaluated using the labeled predictions.
        grads_hat_unlabeled (ndarray): Gradient of the loss function with respect to the coefficients, evaluated using the unlabeled predictions.
        inv_hessian (ndarray): Inverse Hessian of the loss function with respect to the coefficients.
    """
    n = Y.shape[0]
    N = Yhat_unlabeled.shape[0]
    d = X.shape[1]
    w = np.ones(n) if w is None else w / np.sum(w) * n
    w_unlabeled = (
        np.ones(N)
        if w_unlabeled is None
        else w_unlabeled / np.sum(w_unlabeled) * N
    )

    hessian = np.zeros((d, d))
    grads_hat_unlabeled = np.zeros(X_unlabeled.shape)
    if use_unlabeled:
        for i in range(N):
            hessian += (
                w_unlabeled[i]
                / (N + n)
                * np.outer(X_unlabeled[i], X_unlabeled[i])
            )
            grads_hat_unlabeled[i, :] = (
                w_unlabeled[i]
                * X_unlabeled[i, :]
                * (np.dot(X_unlabeled[i, :], pointest) - Yhat_unlabeled[i])
            )

    grads = np.zeros(X.shape)
    grads_hat = np.zeros(X.shape)
    for i in range(n):
        hessian += (
            w[i] / (N + n) * np.outer(X[i], X[i])
            if use_unlabeled
            else w[i] / n * np.outer(X[i], X[i])
        )
        grads[i, :] = w[i] * X[i, :] * (np.dot(X[i, :], pointest) - Y[i])
        grads_hat[i, :] = (
            w[i] * X[i, :] * (np.dot(X[i, :], pointest) - Yhat[i])
        )

    inv_hessian = np.linalg.inv(hessian).reshape(d, d)
    return grads, grads_hat, grads_hat_unlabeled, inv_hessian

Functions to compute $\tilde{\rho}^2$ and $\sigma^2$ for each coefficient, and to turn them into PPI and classical standard errors.

In [23]:
def _power_analysis_stats(grads, grads_hat, inv_hessian):
    grads_ = grads - grads.mean(axis=0)
    grads_hat_ = grads_hat - grads_hat.mean(axis=0)
    cov = inv_hessian @ (grads_[:,None,:] * grads_hat_[:,:,None]).mean(axis=0) @ inv_hessian
    var = inv_hessian @ (grads_[:,None,:]*grads_[:,:,None]).mean(axis=0) @ inv_hessian
    var_hat = inv_hessian @ (grads_hat_[:,None,:]*grads_hat_[:,:,None]).mean(axis=0) @ inv_hessian
    rhos_sq = np.diag(cov)**2/(np.diag(var)*np.diag(var_hat))
    sigmas_sq = np.diag(var)
    return rhos_sq, sigmas_sq

def _estimate_ppi_SE(n, N, rho_sq, var_Y):
    if N == np.inf:
        return np.sqrt(var_Y*(1-rho_sq)/n)
    if N == 0:
        return np.sqrt(var_Y/n)
    var_ppi = var_Y*(1-rho_sq*N/(n+N))/n
    return np.sqrt(var_ppi)

def _estimate_classical_SE(n, var_Y):
    return np.sqrt(var_Y/n)


In [20]:
# functions to calculate weights for conjoint experiment
def CalcTheoreticalInt(r):
    """Return the hard-coded probability assigned to one profile row.

    The value depends on the row's "Intervention", "Barrier", "PedPed" and
    "CrossingSignal" entries. Rows whose "Barrier" is not 0 get a single
    constant; all other rows get a base value multiplied by a factor selected
    by "CrossingSignal" (0, 1, or anything else).

    NOTE(review): presumably these are the design probabilities of the conjoint
    experiment — confirm against the experiment specification.

    Args:
        r: A mapping-like row (e.g. a DataFrame row) supporting ``r[key]``.

    Returns:
        float: The probability for this row.
    """
    no_intervention = r["Intervention"] == 0

    # Barrier rows: one constant, no crossing-signal factor.
    if not (r["Barrier"] == 0):
        return 0.2 if no_intervention else 0.32

    ped_vs_ped = r["PedPed"] == 1

    # Pick the base value and the three crossing-signal factors, ordered as
    # (signal == 0, signal == 1, any other value).
    if no_intervention:
        base = 0.48 if ped_vs_ped else 0.32
        signal_factors = (0.48, 0.2, 0.32)
    elif ped_vs_ped:
        base = 0.48
        signal_factors = (0.48, 0.32, 0.2)
    else:
        base = 0.2
        signal_factors = (0.48, 0.2, 0.32)

    signal = r["CrossingSignal"]
    if signal == 0:
        factor = signal_factors[0]
    elif signal == 1:
        factor = signal_factors[1]
    else:
        factor = signal_factors[2]

    return base * factor
        
def calcWeightsTheoretical(profiles):
    """Return inverse-probability weights, one per row of ``profiles``.

    Each row is passed to ``CalcTheoreticalInt`` and the weight is the
    reciprocal of the probability it returns.

    Args:
        profiles: DataFrame with the columns ``CalcTheoreticalInt`` reads.

    Returns:
        Series of ``1 / p`` values aligned with the rows of ``profiles``.
    """
    row_probabilities = profiles.apply(CalcTheoreticalInt, axis=1)
    return 1 / row_probabilities

            

In [21]:
# Store the inverse-probability weight of every row in a "weights" column;
# the rho-squared cell below reads this column as the regression weights.
df.loc[:,"weights"] = calcWeightsTheoretical(df)

Calculate $\tilde{\rho}^2$ for the coefficient on covariate `x`, using the predictions stored in column `y` alongside the human responses in `Saved`.

In [44]:
x = "CrossingSignal"          # covariate whose coefficient is analysed
y = "gpt4turbo_wp_Saved"      # column holding the model's predictions

# Design matrix: an intercept column of ones followed by the covariate.
X = np.column_stack((np.ones(df[x].shape), df[x]))

# Outcomes and weights as (n, 1) column vectors.
Y_silicone = df[y].to_numpy().reshape(-1, 1)
Y_human = df["Saved"].to_numpy().reshape(-1, 1)
W = df["weights"].to_numpy().reshape(-1, 1)

# Weighted least-squares fit of the human responses gives the point estimate.
beta = sm.WLS(Y_human, X, weights=W).fit().params

# Only the labeled-data statistics are needed, so the "unlabeled" arguments
# simply reuse the labeled arrays and use_unlabeled is switched off.
grads, grads_hat, grads_hat_unlabeled, inv_hessian = _ols_get_stats(
    beta,
    X,
    Y_human,
    Y_silicone,
    X_unlabeled=X,
    Yhat_unlabeled=Y_silicone,
    w=W,
    use_unlabeled=False,
)

In [46]:
# Per-coefficient squared correlation and variance from the gradients above.
rho_sq, var_Y = _power_analysis_stats(grads, grads_hat, inv_hessian)
# Index 1 is the coefficient on `x`; index 0 is the intercept column of X.
print("Rho squared", rho_sq[1])
print("Rho", np.sqrt(rho_sq[1]))

Rho squared 0.09016284460630423
Rho 0.3002712850179055
