# **Target Trial Emulation**

### **Submitted by:**
- **Ladrera**, Raiken
- **Tibon**, Hestia

## **Instructions**

Assignment 1 for Clustering: Novel methods in machine learning are often created either by borrowing formulas and concepts from other scientific fields and redefining them under new sets of assumptions, or by adding an extra step to an existing methodological framework.

In this exercise (Assignment 1 of the Clustering topic), we try to develop a novel Target Trial Emulation method by integrating clustering concepts into the existing framework. Target Trial Emulation is a recent methodological framework in epidemiology that tries to account for the biases of older, traditional study designs.

These are the instructions:
- Look at this website: https://rpubs.com/alanyang0924/TTE
- Extract the dummy data from the package and save it as "data_censored.csv".
- Convert the R code into Python code (use Jupyter Notebook) and replicate the results with your Python code.
- Create another copy of your Python code and name it TTE-v2 (use Jupyter Notebook).
- Using TTE-v2, think of a creative way to integrate a clustering mechanism: understand each step carefully and decide at which step a clustering method can be implemented. Generate insights from your results.
- Work in pairs, preferably with your thesis partner.
- Push to your GitHub repository.
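
One possible integration point, sketched here under stated assumptions, is to cluster patients on their baseline (first-period) covariates before any weight model is fitted, so that later steps can be examined per cluster. The helper names `kmeans_labels` and `add_baseline_clusters` are hypothetical, the toy data are made up for illustration, and the small NumPy k-means with a deterministic farthest-first start stands in for a library implementation.

```python
import numpy as np
import pandas as pd


def kmeans_labels(features, k, n_iter=50):
    """Cluster the rows of `features` with Lloyd's k-means; returns one label per row."""
    # Deterministic farthest-first initialisation (an illustrative choice):
    # start at row 0, then repeatedly add the row farthest from the chosen centers.
    centers = [features[0]]
    for _ in range(1, k):
        d = np.min([((features - c) ** 2).sum(axis=1) for c in centers], axis=0)
        centers.append(features[d.argmax()])
    centers = np.array(centers, dtype=float)

    for _ in range(n_iter):
        # Squared Euclidean distance from every row to every center
        dists = ((features[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = dists.argmin(axis=1)
        # Move each center to the mean of its rows (keep it if the cluster is empty)
        new_centers = np.array([
            features[labels == j].mean(axis=0) if np.any(labels == j) else centers[j]
            for j in range(k)
        ])
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return labels


def add_baseline_clusters(data, covariates, k):
    """Attach a per-patient `cluster` column based on each patient's first period."""
    baseline = data.sort_values(["id", "period"]).groupby("id")[covariates].first()
    # Standardise so that no covariate dominates the distance
    # (assumes every covariate varies across patients)
    z = (baseline - baseline.mean()) / baseline.std(ddof=0)
    clusters = pd.Series(kmeans_labels(z.to_numpy(dtype=float), k),
                         index=baseline.index, name="cluster")
    return data.merge(clusters, left_on="id", right_index=True)


# Toy person-period data (illustrative values only, not from data_censored.csv)
toy = pd.DataFrame({
    "id":     [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6],
    "period": [0, 1] * 6,
    "age":    [30, 31, 31, 32, 32, 33, 70, 71, 71, 72, 72, 73],
    "x2":     [-1.0, -0.8, -0.9, -1.2, -1.1, -0.7, 1.0, 0.8, 0.9, 1.2, 1.1, 0.7],
})
clustered = add_baseline_clusters(toy, covariates=["age", "x2"], k=2)
print(clustered.groupby("id")["cluster"].first())
```

Clustering on baseline rows only keeps each patient in a single cluster across all periods, which is why the label is merged back by `id` rather than computed per row.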


## **1. Setup**

In [39]:
import os
import pandas as pd

estimand_pp = "PP"  # Per-protocol
estimand_itt = "ITT"  # Intention-to-treat

# Directories for saving outputs
trial_pp_dir = os.path.join(os.getcwd(), "trial_pp")
trial_itt_dir = os.path.join(os.getcwd(), "trial_itt")

os.makedirs(trial_pp_dir, exist_ok=True)
os.makedirs(trial_itt_dir, exist_ok=True)

print(f"Directories created:\n{trial_pp_dir}\n{trial_itt_dir}")

Directories created:
C:\Users\meizi\Documents\GitHub\TTE-v2\trial_pp
C:\Users\meizi\Documents\GitHub\TTE-v2\trial_itt


## **2. Data Preparation**

In [40]:
data_path = "data_censored.csv" 
data_censored = pd.read_csv(data_path)
print(data_censored.head())

columns_needed = ["id", "period", "treatment", "outcome", "eligible", "age", "x1", "x2", "x3"]
trial_pp = data_censored[columns_needed].copy()
trial_itt = data_censored[columns_needed].copy()

print(trial_pp.head())
print(trial_itt.head())


   id  period  treatment  x1        x2  x3        x4  age     age_s  outcome  \
0   1       0          1   1  1.146148   0  0.734203   36  0.083333        0   
1   1       1          1   1  0.002200   0  0.734203   37  0.166667        0   
2   1       2          1   0 -0.481762   0  0.734203   38  0.250000        0   
3   1       3          1   0  0.007872   0  0.734203   39  0.333333        0   
4   1       4          1   1  0.216054   0  0.734203   40  0.416667        0   

   censored  eligible  
0         0         1  
1         0         0  
2         0         0  
3         0         0  
4         0         0  
   id  period  treatment  outcome  eligible  age  x1        x2  x3
0   1       0          1        0         1   36   1  1.146148   0
1   1       1          1        0         0   37   1  0.002200   0
2   1       2          1        0         0   38   0 -0.481762   0
3   1       3          1        0         0   39   0  0.007872   0
4   1       4          1        0       

## **3. Weight Models and Censoring**

### **3.1. Treatment and Switching Weight Models**

In [41]:
import os
import pandas as pd
import statsmodels.api as sm

class Trial:
    def __init__(self, data):
        """Initialize the trial object with dataset."""
        self.data = data.copy()
        self.switch_model = None
        self.switch_weights = None
        self.censor_model = None
        self.censor_weights = None
        self.numerator = None
        self.denominator = None
        self.censor_event = None
        self.pool_models = None
        self.model_fitted = False  

    def set_switch_weight_model(self, numerator, denominator):
        """Set up the numerator and denominator formulas for switch weight modeling."""
        self.numerator = numerator
        self.denominator = denominator
        self.model_fitted = False 
        
        print(f"Switch Model specifications set:\n"
              f"- Numerator: {self.numerator}\n"
              f"- Denominator: {self.denominator}\n"
              f"- Model not fitted yet. Use `calculate_weights()`.")

    def set_censor_weight_model(self, censor_event, numerator, denominator, pool_models="none"):
        """Set up the numerator and denominator formulas for censor weight modeling."""
        self.censor_event = censor_event
        self.numerator = numerator
        self.denominator = denominator
        self.pool_models = pool_models
        self.model_fitted = False  # Reset model status

        print(f"Censor Model specifications set:\n"
              f"- Numerator formula: 1 - {self.censor_event} ~ {self.numerator}\n"
              f"- Denominator formula: 1 - {self.censor_event} ~ {self.denominator}\n"
              f"- Pooling: {self.pool_models}\n"
              f"- Model not fitted yet. Use `calculate_weights()`.")

    def calculate_weights(self, save_path, model_type="logit", weight_type="switch"):
        """Fit logistic regression model and compute weights."""
        if weight_type == "switch":
            # Ensure formulas exist
            if self.numerator is None or self.denominator is None:
                raise ValueError("Switch weight model formulas not set. Call `set_switch_weight_model()` first.")

            # Ensure output directory exists
            os.makedirs(save_path, exist_ok=True)

            # Compute denominator variable. `DataFrame.eval` evaluates the expression
            # arithmetically, so "a + b" becomes one summed predictor, not two covariates.
            self.data["denom"] = self.data.eval(self.denominator)

            # Prepare independent (X) and dependent (y) variables.
            # The switch model predicts the received treatment, not a dichotomized age.
            X = sm.add_constant(self.data["denom"])
            y = self.data["treatment"]

            # Choose model type
            if model_type == "logit":
                model = sm.Logit(y, X).fit(disp=0)
            else:
                raise ValueError("Unsupported model type. Only 'logit' is available for now.")

            # Save model
            model.save(os.path.join(save_path, "switch_model.pickle"))

            # Store results
            self.switch_model = model
            self.switch_weights = model.predict(X)
            self.model_fitted = True

            print(f"Switch weights calculated and saved in {save_path}.")

        elif weight_type == "censor":
            # Ensure formulas exist
            if self.censor_event is None or self.numerator is None or self.denominator is None:
                raise ValueError("Censor weight model formulas not set. Call `set_censor_weight_model()` first.")

            os.makedirs(save_path, exist_ok=True)

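            # Compute denominator variable. `DataFrame.eval` evaluates the expression
            # arithmetically, so "x2 + x1" becomes one summed predictor, not two covariates.
            self.data["denom"] = self.data.eval(self.denominator)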

            # Define censor outcome: 1 - censor_event
            self.data["censor_binary"] = 1 - self.data[self.censor_event]

            # Prepare independent (X) and dependent (y) variables
            X = sm.add_constant(self.data["denom"])
            y = self.data["censor_binary"]

            # Choose model type
            if model_type == "logit":
                model = sm.Logit(y, X).fit(disp=0)
            else:
                raise ValueError("Unsupported model type. Only 'logit' is available for now.")

            # Save model
            model.save(os.path.join(save_path, "censor_model.pickle"))

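            # Store results. The stored values are the predicted probabilities of
            # remaining uncensored from the denominator model; the numerator formula
            # is not used here, so no stabilized ratio is formed.
            self.censor_model = model
            self.censor_weights = model.predict(X)
            self.model_fitted = True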

            print(f"Censor weights calculated and saved in {save_path}.")

    @property
    def get_censor_weights(self):
        """Return the censor weights, fitting the censor model first if needed."""
        # Check the weights themselves: `model_fitted` is shared with the switch model
        if not self.model_fitted or self.censor_weights is None:
            print("Censor model not fitted yet. Automatically running `calculate_weights()`...")
            self.calculate_weights("trial_default", weight_type="censor")  # Default save path
        return self.censor_weights

# Example Usage
trial_pp = Trial(data_censored)  # Initialize with dataset

# Define censor weight model
trial_pp.set_censor_weight_model(
    censor_event="censored",
    numerator="x2",
    denominator="x2 + x1",
    pool_models="none"
)

# Check message output (mimics R)
print(trial_pp.get_censor_weights.head())  # Automatically runs `calculate_weights()` if needed

# For ITT model (with pooling in numerator)
trial_itt = Trial(data_censored)
trial_itt.set_censor_weight_model(
    censor_event="censored",
    numerator="x2",
    denominator="x2 + x1",
    pool_models="numerator"
)

print(trial_itt.get_censor_weights.head())


Censor Model specifications set:
- Numerator formula: 1 - censored ~ x2
- Denominator formula: 1 - censored ~ x2 + x1
- Pooling: none
- Model not fitted yet. Use `calculate_weights()`.
Censor model not fitted yet. Automatically running `calculate_weights()`...
Censor weights calculated and saved in trial_default.
0    0.882943
1    0.908124
2    0.933494
3    0.925940
4    0.903820
dtype: float64
Censor Model specifications set:
- Numerator formula: 1 - censored ~ x2
- Denominator formula: 1 - censored ~ x2 + x1
- Pooling: numerator
- Model not fitted yet. Use `calculate_weights()`.
Censor model not fitted yet. Automatically running `calculate_weights()`...
Censor weights calculated and saved in trial_default.
0    0.882943
1    0.908124
2    0.933494
3    0.925940
4    0.903820
dtype: float64
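
The values printed above are predicted probabilities from a single fitted model. For comparison, the sketch below shows how stabilized inverse probability of censoring weights are conventionally assembled from a numerator and a denominator model. It assumes the two sets of predicted probabilities of remaining uncensored already exist as columns; the column names `p_num` and `p_den`, the function `stabilized_censor_weights`, and the toy numbers are hypothetical and not part of the original notebook.

```python
import pandas as pd


def stabilized_censor_weights(data, p_num="p_num", p_den="p_den"):
    """Return cumulative stabilized censoring weights, one per person-period row."""
    ordered = data.sort_values(["id", "period"])
    # Per-period ratio:
    # P(uncensored | numerator terms) / P(uncensored | denominator terms)
    ratio = ordered[p_num] / ordered[p_den]
    # A patient's weight at period t is the product of the ratios up to and including t
    return ratio.groupby(ordered["id"]).cumprod().reindex(data.index)


# Toy probabilities (illustrative values only)
toy = pd.DataFrame({
    "id":     [1, 1, 1, 2, 2],
    "period": [0, 1, 2, 0, 1],
    "p_num":  [0.90, 0.90, 0.90, 0.90, 0.90],
    "p_den":  [0.90, 0.75, 0.90, 0.60, 0.90],
})
toy["weight"] = stabilized_censor_weights(toy)
print(toy)
```

A row whose denominator probability is lower than its numerator probability gets a ratio above 1, so patients whose covariates make censoring more likely are up-weighted for the rest of their follow-up.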


### **3.2. Other Informative Censoring**