# Assignment of labs to enumerators

In [1]:
# Set up
import pandas as pd
import numpy as np
import sys
from pathlib import Path
CODE_ROOT = Path.cwd().parents[1]
sys.path.append(str(CODE_ROOT))
import config
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill
import os

In [2]:
# Load datasets: the labs list (CSV) and the enumerators list (Excel)
labs_path = config.LABS_LIST / "LabsList_Randomized_Locations.csv"
labs = pd.read_csv(labs_path)
enumerators_path = config.ENUMERATORS / "enumerators_list.xlsx"
enumerators = pd.read_excel(enumerators_path)

In [3]:
# Create enumerator ids: one distinct two-digit number per enumerator.
# NumPy's global generator is seeded with config.SEED first, so the same ids
# are drawn on every run.
np.random.seed(config.SEED)
n_enums = len(enumerators)
id_pool = np.arange(10, 99)  # candidate ids 10..98 (upper bound is exclusive)
enumerators["id"] = np.random.choice(id_pool, size=n_enums, replace=False)

In [4]:
# Assign enumerators to lab groups (randomly, without replacement)
def assign_enumerators(labs_df, enum_df, n_treat = 3, n_control = 3, seed = 110):
    """Assign lab groups to enumerators at random, without replacement.

    Enumerators are processed in the row order of ``enum_df``; each one draws
    from the labs that earlier enumerators left over.

    - Each enumerator is targeted at ``n_treat`` treatment and ``n_control``
      control lab groups (``n_treat + n_control`` labs in total).
    - If one arm has too few eligible labs, all of that arm is assigned and the
      shortfall is drawn from the other arm, as far as labs are available there.
      If both arms are short, the enumerator receives every eligible lab.
    - Takes into account 2 location restrictions: when the enumerator's
      ``restriction_sch`` / ``restriction_bot`` equals 1, only labs whose
      ``Location SCH`` / ``Location BOT`` equals 1 are eligible.
    - No lab is assigned twice (assigned labs are removed by ``labgroupid``).

    Parameters:
    - labs_df: labs with at least the columns ``labgroupid`` and
      ``Treatment Status`` ("treatment" / "control"); the location columns are
      only read when an enumerator carries the matching restriction.
    - enum_df: enumerators with the columns ``lastname``, ``firstname``, ``id``,
      ``email_cleaned``, ``foldername`` and ``restriction``; the columns
      ``restriction_sch`` and ``restriction_bot`` are optional.
    - n_treat: target number of treatment labs per enumerator.
    - n_control: target number of control labs per enumerator.
    - seed: seed of the random generator used for all draws.

    Returns:
    - assignments: labs with assigned enumerators
    - leftover_treatment: treatment labs without enumerators
    - leftover_control: control labs without enumerators
    """
    rng = np.random.RandomState(seed)
    n_total = n_treat + n_control

    leftover_labs = labs_df.copy()
    # Collect the per-enumerator frames and concatenate once at the end
    assigned_parts = []

    for _, enum in enum_df.iterrows():
        possible_labs = leftover_labs

        # Apply location restrictions
        if enum.get("restriction_sch", 0) == 1:
            possible_labs = possible_labs[possible_labs["Location SCH"] == 1]
        if enum.get("restriction_bot", 0) == 1:
            possible_labs = possible_labs[possible_labs["Location BOT"] == 1]

        # Separate treatment and control labs
        treatment_labs = possible_labs[possible_labs["Treatment Status"] == "treatment"]
        control_labs = possible_labs[possible_labs["Treatment Status"] == "control"]

        # Assign labs, checking if enough labs are available
        n_leftover_treat = len(treatment_labs)
        n_leftover_control = len(control_labs)
        if n_leftover_treat < n_treat:
            if n_leftover_control < n_control: # Not enough T and C labs
                print(f"Warning: Only {n_leftover_treat} treatment and {n_leftover_control} control labs available for enumerator.")
                assigned_treat = treatment_labs
                assigned_control = control_labs
            else:  # Not enough T labs
                print(f"Warning: Only {n_leftover_treat} treatment labs available for enumerator.")
                assigned_treat = treatment_labs
                # Fill up with control labs, but never ask for more than exist
                n_draw = min(n_total - n_leftover_treat, n_leftover_control)
                assigned_control = control_labs.sample(n=n_draw, random_state=rng, replace=False)
        elif n_leftover_control < n_control: # Not enough C labs
            print(f"Warning: Only {n_leftover_control} control labs available for enumerator.")
            assigned_control = control_labs
            # Fill up with treatment labs, but never ask for more than exist
            n_draw = min(n_total - n_leftover_control, n_leftover_treat)
            assigned_treat = treatment_labs.sample(n=n_draw, random_state=rng, replace=False)
        else: # Enough T and C labs
            assigned_treat = treatment_labs.sample(n=n_treat, random_state=rng, replace=False)
            assigned_control = control_labs.sample(n=n_control, random_state=rng, replace=False)

        # pd.concat returns a new frame, so adding columns does not touch labs_df
        assigned = pd.concat([assigned_treat, assigned_control])

        # Assign enumerator info
        assigned["enum_lastname"] = enum["lastname"]
        assigned["enum_firstname"] = enum["firstname"]
        assigned["enum_id"] = enum["id"]
        assigned["enum_email"] = enum["email_cleaned"]
        assigned["enum_foldername"] = enum["foldername"]
        assigned["enum_restriction"] = enum["restriction"]

        assigned_parts.append(assigned)

        # Remove assigned labs from leftover_labs by labgroupid
        assigned_lab_ids = assigned["labgroupid"].tolist()
        leftover_labs = leftover_labs[~leftover_labs["labgroupid"].isin(assigned_lab_ids)]

    # pd.concat rejects an empty list, so fall back to an empty frame
    if assigned_parts:
        assignments = pd.concat(assigned_parts)
    else:
        assignments = pd.DataFrame()

    # Leftover labs
    leftover_treatment = leftover_labs[leftover_labs["Treatment Status"] == "treatment"]
    leftover_control = leftover_labs[leftover_labs["Treatment Status"] == "control"]

    return assignments, leftover_treatment, leftover_control


In [6]:
# Enumerators with restrictions are handled first: order by "restriction", descending
enumerators = (
    enumerators
    .sort_values(by=["restriction"], ascending=False)
    .reset_index(drop=True)
)

# Draw the lab assignments (3 treatment + 3 control labs per enumerator)
assignments, leftover_treatment, leftover_control = assign_enumerators(
    labs_df=labs, enum_df=enumerators, n_treat=3, n_control=3, seed=110
)

# Sanity check: every labgroupid may appear at most once among the assignments
is_duplicated = assignments.duplicated(subset="labgroupid", keep=False)
duplicates = assignments[is_duplicated]
if duplicates.empty:
    print("No duplicates of labgroupid found.")
else:
    print("Duplicate labgroupids found:")
    print(duplicates[["labgroupid", "Lab Group", "enum_firstname", "enum_lastname"]])

# Sort the assignments by enumerator id, then by labgroupid
assignments = (
    assignments
    .sort_values(by=["enum_id", "labgroupid"])
    .reset_index(drop=True)
)

# Column order of the saved assignments file
assignments_order = [
    "labgroupid",
    "Lab Group",
    "Faculty",
    "Institute",
    "Professor",
    "Email",
    "Source",
    "Treatment Status",
    "enum_id",
    "enum_firstname",
    "enum_lastname",
    "enum_email",
]

# Write the assignments file, keeping only the listed columns that actually exist
cols_to_save = [name for name in assignments_order if name in assignments.columns]
assignments.to_csv(
    config.ENUMERATORS / "assignedlabs.csv",
    columns=cols_to_save,
    index=False,
)

# Stack the leftover treatment and control labs into one table
unassigned_labs = pd.concat([leftover_treatment, leftover_control], ignore_index=True)

# Sanity check: a labgroupid must not be both assigned and unassigned
assigned_ids = set(assignments["labgroupid"])
unassigned_ids = set(unassigned_labs["labgroupid"])
overlap = assigned_ids.intersection(unassigned_ids)
if not overlap:
    print("All labgroupids are correctly assigned or unassigned.")
else:
    print("Error: Some labgroupids are in both assignments and unassigned_labs:")
    print(overlap)

# Write the unassigned labs file
unassigned_labs.to_csv(config.LABS_LIST / "LabsList_Unassigned.csv", index=False)

No duplicates of labgroupid found.
All labgroupids are correctly assigned or unassigned.


In [6]:
# Create assignments file for each enumerator:
# one formatted "lab_assignment.xlsx" inside the enumerator's "<foldername>_data" folder

# Rename columns for clarity
assignments = assignments.rename(
    columns={"Professor": "Lab Responsible Person", "Source": "Website"}
)

# Columns to include in the enumerator's file
cols_to_include = [
    "labgroupid", "Lab Group", "Faculty", "Institute", 
    "Lab Responsible Person", "Email", "Website", "Treatment Status"
]

# Color treatment yellow and control white
fill_colors = {
    "treatment": "FFFF00",  # Yellow
    "control": "FFFFFF"     # White (solid fill)
}

# Position of the status cell within each worksheet row; the sheet has exactly
# the columns of cols_to_include, in that order
status_idx = cols_to_include.index("Treatment Status")

for enum_id, enum_data in assignments.groupby("enum_id"):

    # Folder of this enumerator (the same for every row of the group)
    name = enum_data["enum_foldername"].iloc[0]
    folder_name = f"{name}_data"

    # Columns to include
    labs_for_enum = enum_data[cols_to_include].copy()

    # Create excel path
    filename = os.path.join(config.SWITCHDRIVE_ROOT, folder_name, "lab_assignment.xlsx")

    # Save first without formatting
    labs_for_enum.to_excel(filename, index=False)

    # Load workbook and select active sheet
    wb = load_workbook(filename)
    ws = wb.active
    ws.title = "Lab Assignments"

    # Bold header row
    for cell in ws[1]:
        cell.font = Font(bold=True)

    # Adjust column widths to the longest cell text plus some padding
    for col in ws.columns:
        max_length = max(
            (len(str(cell.value)) for cell in col if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[col[0].column_letter].width = max_length + 2

    # Fill colors based on treatment status
    for row in ws.iter_rows(min_row=2, max_row=ws.max_row, min_col=ws.min_column, max_col=ws.max_column):
        status_cell = row[status_idx]
        status = str(status_cell.value).lower()
        if status in fill_colors:
            color = fill_colors[status]
            status_cell.fill = PatternFill(start_color=color, end_color=color, fill_type="solid")

    # Save workbook
    wb.save(filename)
