In [1]:
# Imports and Setup
import os
import random
import time
from datetime import datetime
from collections import defaultdict
import shutil
import subprocess
import sys
import logging
from google.colab import widgets, output
from IPython.display import display
import ipywidgets as ipw
from IPython.display import display, HTML
from importlib.metadata import version, PackageNotFoundError
from google.colab import drive  # Google Colab specific import

In [2]:
# -------------------------
# Global Variables
# -------------------------
# Global variable to manage if the program should exit
exit_program = False

json_file_name = 'grounded-jetty-365400-0c0e03ecb0d2.json'
spreadsheet_id = '1QDGvsJNOAo455G7nGx_Dd3SVvDG8d49odSGd4m7gcEQ'
excluded_workers = ["Winkie Zhang", "Hilbert Chu", "Peggy Chu", "Wellington Hui"]  # Excluded workers list
worker_constraints = {
    'On-Premise Sound Control': {
        'Issac Siu': [1],
        'Dany Chan': [2,5],
        #'Elton Yu': [3],
        'Kenneth Kong': [4]
    },
    'YouTube/On-Line Sound Control': {
        'Cyrus Tang': [1],
        'Elton Yu': [2]
    },
    'PowerPoint Control': {
        'Sam Chu': [1,3,5]
    },
    'Camera Person': {
        'Moon Tang': [1,5],
        'Le Chu': [2],
        'Cyrus Tang': [3],
        'Lucy Chu': [4]
    },
    'Mandarin A/V helper': {
        'Bill Chu': [3],
        'Dany Chan': [4]
    },
}

# if Ana Hui was assigned as Pianist, the Paul Hui is force to be the "Power Point Control" on that week
# if he is available
colab_constraints = {
    "Pianist": {
        "Ana Hui": {
            "PowerPoint Control": "Paul Hui"
        }
    },
    #"Worship Chairperson": {
    #    "Ron Yip": {
    #        "Power Point Preparation": "Ron Yip"
    #    }
    #},
    "Worship Chairperson": {
        "Sarah Hui": {
            "Power Point Preparation": "Wellington Hui"
        }
    }
}

# Global dictionary mapping tasks to dependent worker columns
# This is used to generate the dependent_weekly_assignments
# so that before worker was assigned as "Power Point Preparation", the program will check the "Worship Chairperson" column
# and put the worker that was already assigned as "Worship Chairperson", in the dependent_weekly_assignments array
# and that person can't be assigned as "Power Point Preparation"
dependend_task_column_mapping = {
  "On-Premise Sound Control": ["Song Leader", "Camera Person", "PowerPoint Control", "YouTube/On-Line Sound Control", "Power Point Preparation"],
  "YouTube/On-Line Sound Control": ["Camera Person", "PowerPoint Control", "On-Premise Sound Control", "Power Point Preparation"],
  "Camera Person": ["PowerPoint Control", "On-Premise Sound Control", "YouTube/On-Line Sound Control", "Power Point Preparation"],
  "PowerPoint Control": ["Camera Person", "On-Premise Sound Control", "YouTube/On-Line Sound Control", "Worship Chairperson", "Pianist", "Power Point Preparation"],
  "Power Point Preparation": ["Worship Chairperson", "Song Leader"],
  "Mandarin A/V helper": [],
  "Worship Chairperson": ["Song Leader"]
};

# Initialize all non av weekly assignments for each week
# this is used for for looking at the colab constraint to force worker for certain a/v task
# even though he might be working on some non av task for that week already,
all_non_av_task_names = ['Worship Chairperson', 'On-Stage Translator', 'Song Leader', 'Pianist'] # all tasks to work on

month_week_mapping = defaultdict(list)
cumulative_assignments = {}

# for clear up spreadsheet data only
all_task_names = ['On-Premise Sound Control', 'YouTube/On-Line Sound Control', 'PowerPoint Control', 'Camera Person', 'Power Point Preparation', 'Mandarin A/V helper'] # all tasks existed

# for drop down and grow to include "clear up" and "exit program" option
all_task_names_dropdown = ['On-Premise Sound Control', 'YouTube/On-Line Sound Control', 'PowerPoint Control', 'Camera Person', 'Power Point Preparation', 'Mandarin A/V helper'] # all tasks existed

In [3]:
# Helper Functions: Package Installation
def ensure_package_installed(package_name):
    """
    Ensures that the required package is installed.
    """
    try:
        pkg_version = version(package_name)
        logging.debug(f"Package '{package_name}' is already installed (version {pkg_version}).")
    except PackageNotFoundError:
        logging.debug(f"Package '{package_name}' not found. Installing...")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name])


In [4]:
# Google Services Setup
def setup_google_services(json_file_path):
    """
    Sets up Google Sheets API and pygsheets client using the provided JSON credentials.
    """
    ensure_package_installed('pygsheets')
    ensure_package_installed('google-auth')
    ensure_package_installed('google-api-python-client')

    import pygsheets
    from google.oauth2.service_account import Credentials
    from googleapiclient.discovery import build

    SCOPES = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"]
    credentials = Credentials.from_service_account_file(json_file_path, scopes=SCOPES)

    client = pygsheets.authorize(custom_credentials=credentials)
    sheets_service = build('sheets', 'v4', credentials=credentials)
    return client, sheets_service


In [5]:
# Helper Functions: JSON File Management
def ensure_json_file(json_file_name):
    """
    Ensures that the JSON credentials file is available in the Colab environment.
    """
    json_file_path = f'/content/{json_file_name}'
    google_drive_path = f'/content/drive/MyDrive/cec/cantonese_worship/{json_file_name}'

    if not os.path.exists(json_file_path):
        logging.debug(f"JSON file '{json_file_name}' not found in /content/. Checking Google Drive...")
        drive.mount('/content/drive')

        if os.path.exists(google_drive_path):
            logging.debug(f"JSON file found in Google Drive. Copying to /content/...")
            shutil.copy(google_drive_path, json_file_path)
            logging.debug("File copied successfully.")
        else:
            logging.debug(f"JSON file '{json_file_name}' not found in Google Drive. Please upload the file.")
            return None
    else:
        logging.debug(f"JSON file '{json_file_name}' already exists in /content/.")
    return json_file_path


In [6]:
class IgnoreUnwantedFunctionNamesFilter(logging.Filter):
    def filter(self, record):
        try:
            # Test if the message can be formatted without errors
            _ = record.getMessage()
        except Exception as e:
            # Capture and display the offending log statement
            print(f"Skipping log record due to formatting error: {e}")
            print(f"Offending log record: level={record.levelname}, funcName={record.funcName}, msg={record.msg}, args={record.args}")
            return False  # Skip this log record

        # Skip logging for unwanted function names like 'method' and '__call__'
        if record.funcName in ['method', '__call__']:
            return False  # Skip this log record
        return True  # Log the record

class MainBlockFilter(logging.Filter):
    def filter(self, record):
        # Identify records from the main block (no function name or <cell line>)
        if record.funcName.startswith('<cell line') or record.funcName == '<module>':
            record.funcName = '__main__'  # Replace with '__main__'
        return True

def configure_logging():
    # Clear existing handlers to avoid duplicates
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # Set up the logging handler
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)

    # Set the formatter to include the function name
    formatter = logging.Formatter('[%(levelname)s] %(funcName)s: %(message)s')
    handler.setFormatter(formatter)

    # Add the filters to the handler
    handler.addFilter(IgnoreUnwantedFunctionNamesFilter())
    handler.addFilter(MainBlockFilter())

    # Add the handler to the root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(handler)

    # Initial debug log to confirm logging is configured
    logging.debug("Logging configuration set up successfully.")



In [7]:
def initialize_all_non_av_weekly_assignments(sheet, task_name, column_headers, all_non_av_weekly_assignments=None):
    """
    Accumulates weekly non-available assignments for multiple tasks.
    - Finds the task_name column dynamically in the header row.
    - Extracts worker assignments for each week and appends to 'all_non_av_weekly_assignments'.
    - Allows accumulation of multiple task-worker pairs across multiple calls.

    Args:
        sheet: The Google Sheets API object containing rows and columns.
        task_name: The name of the task to extract assignments for.
        column_headers: The list of column headers to locate the task.
        all_non_av_weekly_assignments: A dictionary to store non av task-worker dictionary assignments.

    Returns:
        Updated 'all_non_av_weekly_assignments' with weeks as keys and task-worker pairs as values.

    Example:
        Input 1: task_name="Power Point Preparation"
        Input 2: task_name="Worship Chairperson"
        Output: {1: {"Power Point Preparation": "Ana Hui", "Worship Chairperson": "Paul Hui"},
                 2: {"Power Point Preparation": "Paul Hui"}}
    """

    # Initialize the dictionary if not provided (for the first call)
    if all_non_av_weekly_assignments is None:
        all_non_av_weekly_assignments = {}

    # Ensure the task_name exists in the headers
    if task_name not in column_headers:
        logging.debug(f"[ERROR] Task '{task_name}' not found in column headers. Returning existing assignments.")
        return all_non_av_weekly_assignments

    # Locate the column index of the task_name
    task_col_index = column_headers.index(task_name) + 1  # Convert to 1-based index
    logging.debug(f" Task '{task_name}' found at column index {task_col_index}.")

    all_rows = sheet.get_all_values()  # Fetch all rows from the sheet

    # Iterate over the rows, skipping the header row
    for week, row_values in enumerate(all_rows[1:], start=1):
        # Validate the date in the first column
        date_str = row_values[0].strip() if row_values else None
        try:
            datetime.strptime(date_str, "%m/%d/%Y")
        except (ValueError, TypeError):
            logging.debug(f" Non-date value in row {week + 1}: '{date_str}'. Exiting.")
            break  # Stop processing on invalid date

        # Safely initialize the dictionary for the current week
        week_assignments = all_non_av_weekly_assignments.setdefault(week, {})

        # Extract the worker name for the current task column
        if task_col_index - 1 < len(row_values):
            worker = row_values[task_col_index - 1].strip()
            if worker:  # Only add if the cell is not empty
                week_assignments[task_name] = worker

    return all_non_av_weekly_assignments


In [8]:
def initialize_dependent_weekly_assignments(sheet, task_name, column_headers):
    """
    Initializes weekly dependend assignments for a task based on its dependent worker columns.
    Updates the dependent_weekly_assignments dictionary to include both workers and their tasks.
    """
    # Initialize weekly assignments for the task
    # before worker was assigned as "Power Point Preparation",
    # the program will check the "Worship Chairperson" column in the dependend_task_column_mapping
    # and put the worker that was already assigned as "Worship Chairperson", in the dependent_weekly_assignment array
    # and that person can't be assigned as "Power Point Preparation"

    dependent_columns = dependend_task_column_mapping.get(task_name, [])
    if not dependent_columns:
        logging.debug(f" No dependent columns defined for task '{task_name}'. Return empty dependent_weekly_assignments.")
        return {}

    # Map dependent column headers to their respective indices
    column_indices = [column_headers.index(col) + 1 for col in dependent_columns if col in column_headers]
    if not column_indices:
        logging.debug(f" Dependent columns for task '{task_name}' not found in headers. Return empty dependent_weekly_assignments.")
        return {}

    logging.debug(f" Dependent columns for task '{task_name}': {dependent_columns} (indices: {column_indices})")

    dependent_weekly_assignments = {}
    all_rows = sheet.get_all_values()  # Get all rows as a 2D list
    for week, row_values in enumerate(all_rows[1:], start=1):  # Skip header row
        # Check if the first column contains a valid date
        date_str = row_values[0].strip() if row_values else None
        try:
            datetime.strptime(date_str, "%m/%d/%Y")  # Adjust format as needed
        except (ValueError, TypeError):
            logging.debug(f" Found non-date value in column 1 at row {week + 1}: {date_str}. Exiting.")
            break  # Exit loop on the first non-date value

        # Initialize the dictionary for the current week if it doesn't exist
        if week not in dependent_weekly_assignments:
            dependent_weekly_assignments[week] = {}

        # Add worker-task assignments for the current week
        for col_index, dependent_column in zip(column_indices, dependent_columns):
            if col_index - 1 < len(row_values):
                worker = row_values[col_index - 1].strip()
                if worker:  # Ensure the cell is not empty
                    dependent_weekly_assignments[week][dependent_column] = worker

        logging.debug(f" Dependent Weekly assignments for week {week}: {dependent_weekly_assignments[week]}")

    return dependent_weekly_assignments


In [9]:
def get_priority_worker_list(
    available_workers,
    task_worker_constraints,
    cumulative_assignments,
    last_assignment_week
):
    """
    Compute and return the priority worker list based on constraints and priorities.

    Args:
        available_workers (list): List of workers available for the task.
        task_worker_constraints (dict): Constraints specific to the task.
        cumulative_assignments (dict): Mapping of workers to their cumulative assignments.
        last_assignment_week (dict): Mapping of workers to their last assignment week.

    Returns:
        list: Sorted list of workers by priority.
    """
    return sorted(
        available_workers,
        key=lambda w: (
            0 if len(task_worker_constraints.get(w, [])) == 1 else  # Workers with 1 constraint get top priority
            1 if len(task_worker_constraints.get(w, [])) == 2 else  # Workers with 2 constraints next
            2 if len(task_worker_constraints.get(w, [])) == 3 else  # Workers with 3 constraints next else
            6,  # Workers with no constraints (not in the task_worker_constraints) come last
            cumulative_assignments[w],  # Secondary: Fewer cumulative assignments
            len(task_worker_constraints.get(w, [])),  # Tertiary: Number of constraints (2, 3, etc.)
            last_assignment_week[w],  # Quaternary: Assigned longer ago
            w  # Quinary: Alphabetical tie-breaker
        )
    )


In [10]:
def is_worker_allowed(worker, week, task_worker_constraints):

    month, week_in_month = get_month_week(week)
    if worker not in task_worker_constraints:
      return True
    return week_in_month in task_worker_constraints[worker]


In [11]:
def get_month_week(week):

    for month, weeks in month_week_mapping.items():
        for global_week, week_in_month in weeks:
            if global_week == week:
                return month, week_in_month
    return None

In [12]:
# Worker Extraction and Dropdown Fetch
def fetch_workers_from_contact_sheet(client, task_name):
    """
    Fetches the list of workers from the 'Contact' sheet within the same spreadsheet
    based on the task_name, handling multi-value cells in the Worship Subject Matter Expert column.

    Args:
        client (pygsheets.Client): The authorized pygsheets client.
        spreadsheet_id (str): The ID of the Google spreadsheet.
        task_name (str): The task name to match in the 'Worship Subject Matter Expert' column.

    Returns:
        list: A list of worker names.
    """
    # global excluded_workers, spreadsheet_id  # Reference the global excluded_names variable

    # Open the spreadsheet and get the 'Contact' sheet
    spreadsheet = client.open_by_key(spreadsheet_id)
    contact_sheet = spreadsheet.worksheet_by_title("contact")  # Access the 'Contact' sheet by name

    # Get all values from the Contact sheet
    all_values = contact_sheet.get_all_values()

    if not all_values or len(all_values) < 2:
        logging.debug("[ERROR] Contact sheet is empty or does not have sufficient rows.")
        return []

    # Identify relevant column indices
    header_row = all_values[0]
    try:
        name_col_index = header_row.index("Name")  # Assuming column A is labeled "Name"
        task_col_index = header_row.index("Worship Subject Matter Expert")
    except ValueError as e:
        logging.debug(f"[ERROR] Required column not found in the Contact sheet: {e}")
        return []

    # Extract workers matching the task_name
    workers = []
    for row in all_values[1:]:  # Skip the header
        try:
            worker_name = row[name_col_index].strip()
            if worker_name in excluded_workers:
                continue  # Skip excluded names

            task_field = row[task_col_index].strip()  # Content of the "Worship Subject Matter Expert" column
            if task_field:  # Check if the cell is not empty
                tasks = [task.strip() for task in task_field.split(',')]  # Split and strip
                if task_name in tasks:  # Check if task_name is in the list
                    workers.append(worker_name)  # Add worker's name
        except IndexError:
            continue

    logging.debug(f" Workers fetched for task '{task_name}' (after exclusions): {workers}")
    return workers


In [13]:
def clear_cells_for_rows_with_dates(sheet, header_name):
    """
    Clears the cells in the column corresponding to the specified header for rows where column A contains valid dates,
    ensuring the header row remains intact.

    Args:
        sheet (pygsheets.Worksheet): The Google Sheet worksheet object.
        header_name (str): The name of the header to identify the column to clear.

    Returns:
        None
    """
    logging.debug(f"Clearing cells in the column '{header_name}' for rows with dates in column A...")

    # Fetch all rows of data
    all_values = sheet.get_all_values()

    if not all_values or len(all_values) < 1:
        logging.debug("Sheet is empty or has no header row.")
        return

    # Identify the target column index from the header name
    header_row = all_values[0]
    if header_name not in header_row:
        logging.debug(f"Header '{header_name}' not found in the sheet.")
        return

    target_col_index = header_row.index(header_name)  # 0-based column index

    # Identify rows with valid dates in column A
    rows_to_clear = []
    for i, row in enumerate(all_values[1:], start=2):  # Start from row 2 (1-based index)
        try:
            if row[0].strip():  # Check if column A has data
                datetime.strptime(row[0].strip(), "%m/%d/%Y")  # Parse date to validate
                rows_to_clear.append(i)  # Append row index (1-based)
        except (ValueError, IndexError):
            continue

    if rows_to_clear:
        # Prepare the column values for update
        column_data = []  # Start with empty list for rows to update

        # Fill in updated values (empty for rows to clear, retain others)
        for i, row in enumerate(all_values[1:], start=2):
            if i in rows_to_clear:
                column_data.append([""])  # Clear this cell
            else:
                column_data.append([row[target_col_index]])  # Retain current value

        # Update the column range (starting from row 2)
        col_letter = column_index_to_letter(target_col_index + 1)
        sheet.update_values(
            f"{col_letter}2:{col_letter}{len(all_values)}", column_data
        )

        logging.debug(f"Cleared data in column '{header_name}' for rows: {rows_to_clear}")
    else:
        logging.debug("No rows with valid dates in column A found to clear.")


In [14]:
def column_index_to_letter(col_index):
    """
    Converts a column index to its Excel-style column letter (e.g., 1 -> 'A', 27 -> 'AA').
    """
    col_letter = ""
    while col_index > 0:
        col_index -= 1
        col_letter = chr(col_index % 26 + 65) + col_letter
        col_index //= 26
    return col_letter

In [15]:
def extract_dropdown_values(sheets_service, header_value):
    """
    Extracts dropdown values from a specified column header in the first sheet of the Google Sheet.

    Args:
        sheets_service: Google Sheets API service instance.
        header_value: The column header to identify dropdown values.

    Returns:
        A list of available dropdown values.
    """
    # global spreadsheet_id  # Explicitly use global variables

    # Get spreadsheet details to find the first sheet's name
    response = sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id, includeGridData=True).execute()
    sheets_data = response.get('sheets', [])

    if not sheets_data:
        logging.debug("No sheets found in the spreadsheet.")
        return []

    # Dynamically fetch the first sheet's title
    first_sheet_name = sheets_data[0]['properties']['title']
    logging.debug(f"Using the first sheet: {first_sheet_name}")

    available_worker_list = []

    # Process the first sheet's grid data
    first_sheet = sheets_data[0]
    if 'data' not in first_sheet or not first_sheet['data']:
        logging.debug(f"No grid data found in the first sheet '{first_sheet_name}'.")
        return []

    rows = first_sheet['data'][0].get('rowData', [])
    if not rows:
        logging.debug(f"No rows found in sheet '{first_sheet_name}'.")
        return []

    # Find column index for the given header
    header_row = rows[0]
    column_index = None
    for idx, cell in enumerate(header_row.get('values', [])):
        if cell.get('userEnteredValue', {}).get('stringValue', '').strip().lower() == header_value.lower():
            column_index = idx
            break

    if column_index is None:
        logging.debug(f"Header '{header_value}' not found in the sheet.")
        return []

    logging.debug(f"Header '{header_value}' found in column index {column_index}.")

    # Extract dropdown values from the column
    for row in rows[1:]:
        if 'values' in row and len(row['values']) > column_index:
            cell = row['values'][column_index]
            if 'dataValidation' in cell:
                workers = [v['userEnteredValue'] for v in cell['dataValidation']['condition']['values']]
                available_worker_list.append(workers)

    logging.debug(f"Extracted available_worker_list for '{header_value}': {available_worker_list}")
    return available_worker_list

In [16]:
def update_values_to_named_column(sheet, data_to_write, header_name):
    """
    Updates values in the Google Sheet column corresponding to the header_name.
    """
    all_values = sheet.get_all_values()
    if not all_values:
        raise ValueError("Sheet is empty or has no header row.")

    header_row = all_values[0]
    if header_name not in header_row:
        raise ValueError(f"Header '{header_name}' not found in the sheet.")

    col_index = header_row.index(header_name) + 1
    col_letter = column_index_to_letter(col_index)
    # start_cell = f"{col_letter}2"

    sanitized_data = [[str(item)] for item in data_to_write]
    range_end = f"{col_letter}2:{col_letter}{len(sanitized_data) + 1}"
    sheet.update_values(range_end, sanitized_data)

In [17]:
def on_button_click(b):
    global cumulative_assignments, exit_program  # Access the global variable

    selected_tasks = list(task_select_multiple.value)

    if not selected_tasks:
        logging.debug("No tasks selected. Please select at least one task to proceed.")
        print("No tasks selected. Please select at least one task to proceed.")
        return

    # Check if the user selected "Exit Program"
    if "Exit Program" in selected_tasks:
        logging.debug("User selected 'Exit Program'. Exiting the application.")
        print("Exiting the application...")
        exit_program = True  # Set the flag to True to exit
        return

    # Check if the user selected "Clear Up All Task Columns"
    if "Clear Up All Task Columns" in selected_tasks:
        logging.debug("User selected 'Clear Up All Task Columns'.")
        clear_up_task_columns(all_task_names)
        # Re-display the dropdown and button
        if not exit_program:
            display_dropdown_and_button()
        return  # Exit after handling this action

    # clear up all the selected_tasks columns before start"
    clear_up_task_columns(selected_tasks)

    # clear up all cumulative assignment for all workers before start"
    cumulative_assignments.clear()  # This resets the global dictionary without reinitializing it

    # Process each selected task
    for task_name in selected_tasks:

        logging.debug(f"--- Starting assignment for task: {task_name} ---")

        # Initialize dependent weekly assignments
        dependent_weekly_assignments = initialize_dependent_weekly_assignments(sheet, task_name, column_headers)
        logging.debug(f"The initial dependent weekly assignment: {dependent_weekly_assignments} for task: {task_name}")

        # Assign workers
        assign_workers(workers, task_name, dependent_weekly_assignments, all_non_av_weekly_assignments, sheet, client, sheets_service)
        logging.debug(f"--- Finished assignment for task: {task_name} ---\n")

    logging.debug(f"Tasks processed....")
    #print("Tasks processed. Please select additional tasks or more tasks( <ctl> click ) from the dropdown or 'Exit Program'.")

    # Re-display the dropdown and button for the next selection
    if not exit_program:
        display_dropdown_and_button()


In [18]:
def clear_up_task_columns(task_names):
    # Process each selected task
    for task_name in task_names:
        logging.debug(f"Dynamically clearing cells on column: {task_name} for rows with dates in column A...")

        # Clear cells dynamically for the selected task
        clear_cells_for_rows_with_dates(sheet, header_name=task_name)
        logging.debug(f"Done clearing cells on column: {task_name} for rows with dates in column A...")

In [19]:
def display_dropdown_and_button():
    """Function to display the dropdown and start button."""
    global task_select_multiple

    logging.debug(f"Please select task or additional tasks( Ctrl + click ) from the dropdown or 'Clear Up All Task Columns' or 'Exit Program'.")

    task_select_multiple = ipw.SelectMultiple(
        options=all_task_names_dropdown,
        description='Select Tasks:',
        layout=ipw.Layout(height='140px')  # Adjust height for more options
    )
    display(task_select_multiple)

    start_button = ipw.Button(description="Start Assignment")
    start_button.on_click(on_button_click)
    display(start_button)


In [20]:
def get_assigned_worker(
    week,
    task_name,
    available_worker_list,
    dependent_weekly_assignments,
    all_non_av_weekly_assignments,
    task_worker_constraints,
    cumulative_assignments,
    last_assignment_week,
    last_worker,
    dates,
):
    """
    Determines the assigned worker for a given task and week.
    Applies filters, constraints, and priorities to assign a worker.
    """
    # Filter available workers based on task constraints
    available_workers = [
        worker for worker in available_worker_list[week - 1]
        if is_worker_allowed(worker, week, task_worker_constraints)
    ]

    # Filter out workers already assigned this week in other tasks
    available_workers = [
        worker for worker in available_workers
        if worker not in dependent_weekly_assignments.get(week, {}).values()
    ]

    # Get the date for the current week
    week_date = dates[week - 1]
    month, week_in_month = get_month_week(week)  # Get the actual month-week info
    week_label = f"Week {week_in_month} of month {month} ({week_date.strftime('%Y-%m-%d')})"
    logging.debug(f"--- {week_label} Assignments ---")
    logging.debug(f"Task Worker constraints: {task_worker_constraints}")
    logging.debug(f"Drop Down values for Available workers of this week (before constraints): {available_worker_list[week - 1]}")
    logging.debug(f"Available workers after applying task worker constraints: {available_workers}")

    # If no workers are available, return None
    if not available_workers:
        logging.debug("*** No Assigned Worker ***")
        return None

    # Get the priority list of workers
    priority_worker_list = get_priority_worker_list(
        available_workers,
        task_worker_constraints,
        cumulative_assignments,
        last_assignment_week,
    )

    logging.debug(f"Priority workers list (sorted by priority): {priority_worker_list}")

    if not priority_worker_list:
        logging.debug("*** No eligible workers available after priority reordering ***")
        return None

    # Avoid back-to-back assignments
    if len(priority_worker_list) > 1 and priority_worker_list[0] == last_worker:
        assigned_worker = priority_worker_list[1]  # Pick the second worker
    else:
        assigned_worker = priority_worker_list[0]  # Pick the top-priority worker

    # Apply collaboration constraints
    for source_task, rules in colab_constraints.items():
        for source_worker, dependent_tasks in rules.items():
            if task_name in dependent_tasks:
                dependent_worker = dependent_tasks[task_name]
                # Ensure the week and source_task exist in all_non_av_weekly_assignments
                if week in all_non_av_weekly_assignments and source_task in all_non_av_weekly_assignments[week]:
                    assigned_source_worker = all_non_av_weekly_assignments[week][source_task]
                    logging.debug(f"Colab constraint: Trying to find {source_worker} with the task of {source_task} "
                          f"in this week's non av weekly assignment, but find {assigned_source_worker} of the same task.")
                    if (
                        assigned_source_worker == source_worker and
                        dependent_worker in priority_worker_list
                    ):
                        logging.debug(f"Colab constraint: Assigning {dependent_worker} to {task_name} due to {source_worker} "
                              f"is assigned as {source_task}.")
                        assigned_worker = dependent_worker
                        break

    return assigned_worker


In [21]:
def assign_workers(workers, task_name, dependent_weekly_assignments, all_non_av_weekly_assignments, sheet, client, sheets_service):
    """
    Main function to assign workers to tasks based on constraints and priorities.
    Includes check for global weekly assignments.
    """
    # global month_week_mapping, cumulative_assignments, worker_constraints  # Explicitly use global variables

    available_worker_list = extract_dropdown_values(sheets_service, task_name)
    task_worker_constraints = worker_constraints.get(task_name, {})

    if not workers:
        logging.debug(f"[ERROR] No workers found for task '{task_name}'. Exiting.")
        return

    logging.debug(f" Using workers: {workers}")

    # Counting the date row in column A
    dates = []
    for d in sheet.get_col(1, include_tailing_empty=False)[1:]:  # Skip header
        d = d.strip()  # Remove leading/trailing whitespace
        if d:  # Check if the cell is not empty
            try:
                dates.append(datetime.strptime(d, "%m/%d/%Y"))
            except ValueError:
                logging.debug(f"Stopping count at invalid date: {d}")
                break  # Stop processing when an invalid date is encountered
        else:
            logging.debug("Stopping count at empty cell in column A.")
            break  # Stop processing when an empty cell is encountered

    weeks = len(dates)
    logging.debug(f"Total valid weeks identified: {weeks}")

    # Initialize cumulative_assignments for new workers
    for worker in workers:
        if worker not in cumulative_assignments:
            cumulative_assignments[worker] = 0

    last_assignment_week = {worker: -1 for worker in workers}

    # Map weeks to months
    for i, date in enumerate(dates):
        week_in_month = (date.day - 1) // 7 + 1
        month_week_mapping[date.month].append((i + 1, week_in_month))

    # Track the last worker assigned to this task
    last_worker = None

    # Assign workers
    data_to_write = []
    for week in range(1, weeks + 1):
        assigned_worker = get_assigned_worker(
            week=week,
            task_name=task_name,
            available_worker_list=available_worker_list,
            dependent_weekly_assignments=dependent_weekly_assignments,
            all_non_av_weekly_assignments=all_non_av_weekly_assignments,
            task_worker_constraints=task_worker_constraints,
            cumulative_assignments=cumulative_assignments,
            last_assignment_week=last_assignment_week,
            last_worker=last_worker,
            dates=dates,
          )

        if not assigned_worker:
            data_to_write.append("")
            continue

        # Update the last_worker for this task
        last_worker = assigned_worker

        cumulative_assignments[assigned_worker] += 1
        last_assignment_week[assigned_worker] = week

        # Initialize the dictionary for the current week if it doesn't exist
        # THE FIX: always initialize the week key, even if there's no dependent columns
        dependent_weekly_assignments.setdefault(week, {})

        # Track assignment in the global weekly assignments
        dependent_weekly_assignments[week][task_name] = assigned_worker

        logging.debug(f"{assigned_worker} was just assigned (Total assignments so far: {cumulative_assignments[assigned_worker]})")
        logging.debug(f"The last assignment week for each worker: {last_assignment_week}")
        cumulative_summary = " | ".join([f"{worker}: {cumulative_assignments[worker]}" for worker in workers])
        logging.debug(f"Cumulative assignments for all workers: {cumulative_summary}")
        logging.debug(f"Updated dependent weekly assigment for this week : {dependent_weekly_assignments[week]}")
        #logging.debug("\n" + "-" * 30)
        logging.debug(f"{'-' * 30}")

        data_to_write.append(assigned_worker)

    logging.debug(f"data_to_write: {data_to_write}")
    logging.debug("Writing assignments to the Google Sheet...")
    update_values_to_named_column(sheet, data_to_write, task_name)


In [22]:
# -------------------------
# Main Logic with Multi-Select Dropdown
# Restartable Main
# -------------------------

if __name__ == '__main__':
    configure_logging()
    logging.debug("Starting the co-worker auto task assignment application.")

    # Ensure JSON file is found
    json_file_path = ensure_json_file(json_file_name)
    if not json_file_path:
        logging.debug("Exiting script: JSON file not found.")
        sys.exit(1)

    # Setup Google Sheets API and pygsheets
    client, sheets_service = setup_google_services(json_file_path)

    # Dynamically get the first sheet name
    sheet = client.open_by_key(spreadsheet_id).worksheets()[0]
    column_headers = sheet.get_row(1, include_tailing_empty=False)
    sheet_name = sheet.title
    logging.debug(f"Using the first worksheet: {sheet_name}")
    logging.debug(f"The column header row for this first worksheet: {column_headers}")

    # Initialize all non-av weekly assignments
    all_non_av_weekly_assignments = {}
    for task_name in all_non_av_task_names:
        all_non_av_weekly_assignments = initialize_all_non_av_weekly_assignments(
            sheet,
            task_name,
            column_headers,
            all_non_av_weekly_assignments
        )

    logging.debug(f"The non-av task weekly assignment: {all_non_av_weekly_assignments}")

    # Fetch all unique workers for the tasks
    all_workers = set()
    for task_name in all_task_names:
        task_specific_workers = fetch_workers_from_contact_sheet(client, task_name)
        if task_specific_workers:
            all_workers.update(task_specific_workers)

    workers = list(all_workers)
    logging.debug(f"Unique workers for all tasks: {workers}")

    # Add "Clear Up All Task Columns" and "Exit Program" to the dropdown options
    all_task_names_dropdown.append("Clear Up All Task Columns")
    all_task_names_dropdown.append("Exit Program")

    # Start the interface
    display_dropdown_and_button()

[DEBUG] configure_logging: Logging configuration set up successfully.
[DEBUG] __main__: Starting the co-worker auto task assignment application.
[DEBUG] ensure_json_file: JSON file 'grounded-jetty-365400-0c0e03ecb0d2.json' not found in /content/. Checking Google Drive...


Mounted at /content/drive


[DEBUG] ensure_json_file: JSON file found in Google Drive. Copying to /content/...
[DEBUG] ensure_json_file: File copied successfully.
[DEBUG] ensure_package_installed: Package 'pygsheets' not found. Installing...
[DEBUG] ensure_package_installed: Package 'google-auth' is already installed (version 2.27.0).
[DEBUG] ensure_package_installed: Package 'google-api-python-client' is already installed (version 2.155.0).
[INFO] _init_num_threads: NumExpr defaulting to 2 threads.
[DEBUG] __main__: Using the first worksheet: 2025 Q1 worship schedule
[DEBUG] __main__: The column header row for this first worksheet: ['Date', 'Type of Service', 'Place of Service', 'Service Start Time', 'Worship Chairperson', 'Speaker', 'On-Stage Translator', "Translator's In-Ear Headset and Flat Screen Monitor Setup", "Translator's In-Ear Headset and Flat Screen Monitor Setup (Trainee)", 'Song Leader', 'Pianist', 'Usher/Welcome1', 'Usher/Welcome2', 'Usher/Welcome3', 'Power Point Preparation', 'Camera Person', 'On-

SelectMultiple(description='Select Tasks:', layout=Layout(height='140px'), options=('On-Premise Sound Control'…

Button(description='Start Assignment', style=ButtonStyle())

[DEBUG] clear_up_task_columns: Dynamically clearing cells on column: On-Premise Sound Control for rows with dates in column A...
[DEBUG] clear_cells_for_rows_with_dates: Clearing cells in the column 'On-Premise Sound Control' for rows with dates in column A...
[DEBUG] clear_cells_for_rows_with_dates: Cleared data in column 'On-Premise Sound Control' for rows: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
[DEBUG] clear_up_task_columns: Done clearing cells on column: On-Premise Sound Control for rows with dates in column A...
[DEBUG] clear_up_task_columns: Dynamically clearing cells on column: YouTube/On-Line Sound Control for rows with dates in column A...
[DEBUG] clear_cells_for_rows_with_dates: Clearing cells in the column 'YouTube/On-Line Sound Control' for rows with dates in column A...
[DEBUG] clear_cells_for_rows_with_dates: Cleared data in column 'YouTube/On-Line Sound Control' for rows: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
[DEBUG] clear_up_task_columns: Done clearing cell

SelectMultiple(description='Select Tasks:', layout=Layout(height='140px'), options=('On-Premise Sound Control'…

Button(description='Start Assignment', style=ButtonStyle())