# BTC Fake - Training Completion Simulator

This notebook produces a set of files that look exactly like the files the BTC vendor sends to convey
the Sephora Course Catalog, along with the transactions of managers assigning training and employees
completing it. It simulates managers who assign Daily Dose trainings to employees, and employees
who complete those manager-assigned trainings, plus training recommended by AI.

## How it works:
1. **Preprocessing**: Downloads the two most recent files BTC sent to the SFTP server. Together they
describe the training Course Catalog; they are metadata, not transactional data.
2. **Manager Assigns Training**: 
   - Queries content_assignments AND content_completion tables from Databricks
   - Calculates open assignments (assignments - completions) for each employee
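   - Selects up to 3 Daily Dose contents and assigns them to every employee without a Daily Dose conflict this week, plus 1 random non-Daily Dose content to all employees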
3. **Employee Completes Training**: 
   - Loads manager assignments and AI recommendations for each employee
   - Completes training based on employee type (A, B, or F)
4. **Output Generation**:
   - NonCompletedAssignments CSV file (open assignments from Databricks + new manager assignments)
   - ContentUserCompletion CSV file (completed training with source tracking)
   - UserCompletion CSV file (a required placeholder; its contents are not used)
5. **Update NonCompletedAssignments**: Removes completed assignments from the file
6. **Summary**: Prints completion details showing which training came from manager vs AI
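
As a minimal sketch of the "assignments - completions" logic in step 2 (and the same idea behind step 5), the pandas anti-join below keeps only the assignments that have no matching completion on (`ba_id`, `content_id`). It mirrors the `LEFT JOIN ... WHERE c.ba_id IS NULL` query used later in the notebook. The helper name `open_assignments` and the sample IDs are hypothetical; matching on those two columns alone is an assumption taken from that query.

```python
import pandas as pd

def open_assignments(assignments: pd.DataFrame, completions: pd.DataFrame) -> pd.DataFrame:
    """Return assignments with no matching (ba_id, content_id) completion (hypothetical helper)."""
    keys = ["ba_id", "content_id"]
    # Keep only the key columns of the completions, one row per key
    done = completions[keys].drop_duplicates()
    # indicator=True adds a "_merge" column: "both" or "left_only"
    merged = assignments.merge(done, on=keys, how="left", indicator=True)
    # "left_only" rows had no completion: these are the open assignments
    return merged[merged["_merge"] == "left_only"].drop(columns="_merge").reset_index(drop=True)

# Sample data (made up for illustration)
assignments = pd.DataFrame({"ba_id": [101, 101, 102], "content_id": [1915085, 1915086, 1915085]})
completions = pd.DataFrame({"ba_id": [101], "content_id": [1915085]})
print(open_assignments(assignments, completions))
```

Employee 101 completed content 1915085, so only (101, 1915086) and (102, 1915085) remain open.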


In [None]:
import pandas as pd
import requests
from datetime import datetime, timedelta
import random
import string
from typing import List, Dict
import urllib3
import pytz
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()

# Disable SSL warnings when ignoring certificate verification
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Define Pacific timezone globally for all timestamp calculations
# Timestamps are calculated in PT for proper local time logic,
# then converted to UTC (offset +00:00) for CSV output files
PT = pytz.timezone('America/Los_Angeles')
UTC = pytz.UTC

# ML Training Recommender API Configuration
API_BASE_URL = os.getenv("API_BASE_URL", "https://dataiku-api-devqa.lower.internal.sephora.com")
API_ENDPOINT = os.getenv("API_ENDPOINT", "/public/api/v1/mltr/v3/run")
API_TIMEOUT = int(os.getenv("API_TIMEOUT", "30"))

# File Path Configuration
EMPLOYEES_FILE = os.getenv("EMPLOYEES_FILE", "input/employees.csv")
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "generated_files")
SFTP_LOCAL_DIR = os.getenv("SFTP_LOCAL_DIR", "downloaded_files")
USER_COMPLETION_TEMPLATE_FILE = os.getenv("USER_COMPLETION_TEMPLATE_FILE", "docs/sample_files/UserCompletion_v2_YYYY_m_d_1_000001.csv")

# Databricks Configuration
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST", "")
DATABRICKS_HTTP_PATH = os.getenv("DATABRICKS_HTTP_PATH", "")
DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN", "")
DATABRICKS_CATALOG = os.getenv("DATABRICKS_CATALOG", "retail_systems_dev")
DATABRICKS_SCHEMA = os.getenv("DATABRICKS_SCHEMA", "store_enablement")

# SFTP Outbound Server Configuration (Publishing)
SFTP_OUTBOUND_HOST = os.getenv("SFTP_OUTBOUND_HOST", "internal-sftp.sephoraus.com")
SFTP_OUTBOUND_USER = os.getenv("SFTP_OUTBOUND_USER", "SephoraRDIInternal")
SFTP_OUTBOUND_PASSWORD = os.getenv("SFTP_OUTBOUND_PASSWORD", "")
SFTP_OUTBOUND_REMOTE_PATH = os.getenv("SFTP_OUTBOUND_REMOTE_PATH", "/inbound/BTC/retailData/prod/vendor/mySephoraLearningV2")
SFTP_PUBLISH_ENABLED = os.getenv("SFTP_PUBLISH_ENABLED", "true").lower() in ['true', '1', 'yes']

# Preprocessing - Download Files from SFTP

This section prepares for a fresh run:

## Cleanup
1. Removes all files (except `.gitkeep`) from the `downloaded_files/` directory
2. Removes all files (except `.gitkeep`) from the `generated_files/` directory
3. Ensures each run starts with a clean slate (subdirectories are left in place)

## Generate UserCompletion File
1. Copies the UserCompletion template from `docs/sample_files/`
2. Renames it with the current date and time in PT (`UserCompletion_v2_YYYY_m_d_1_HHMMSS.csv`)
3. Places it in `generated_files/` directory
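
The filename is built from the current PT date and time. Below is a portable sketch, assuming the caller passes in `datetime.now(PT)`: month and day come from the integer fields because `%-m` and `%-d` are platform-specific `strftime` extensions that Windows rejects. `user_completion_filename` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime

def user_completion_filename(now: datetime) -> str:
    """Build UserCompletion_v2_YYYY_m_d_1_HHMMSS.csv (hypothetical helper)."""
    # Integer month/day give "no leading zero" on every platform
    time_suffix = now.strftime("%H%M%S")  # 6-digit HHMMSS suffix
    return f"UserCompletion_v2_{now.year}_{now.month}_{now.day}_1_{time_suffix}.csv"

print(user_completion_filename(datetime(2025, 3, 7, 9, 5, 2)))
# → UserCompletion_v2_2025_3_7_1_090502.csv
```

Passing `now` in, rather than calling `datetime.now()` inside, keeps the helper deterministic and easy to test.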

## Download Files from SFTP Server
1. **CourseCatalog** - Training curriculum elements like Courses and components
2. **StandAloneContent** - All training content (videos, PDFs, documents)

These are the most recent files BTC sent to Prod. The process uses them to learn about the
available training, for example which contents are Daily Dose.

## Requirements:
1. Copy `.env.example` to `.env` and set your SFTP password (`SFTP_INBOUND_PASSWORD`)
2. Files will be downloaded to `downloaded_files/` directory
3. The system finds the most recent file based on the date in the filename

## File Formats:
- CourseCatalog: `CourseCatalog_V2_YYYY_M_DD_1_random.csv`
- StandAloneContent: `StandAloneContent_v2_YYYY_M_DD_1_random.csv`
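
Picking "the most recent file" has to compare parsed dates, not raw filenames: as plain strings, `2025_9_30` sorts after `2025_10_1`. A minimal sketch of the selection, reusing the CourseCatalog pattern from the notebook but with `re.fullmatch` (an assumption: this rejects names with trailing text such as `.csv.bak`, which `re.match` would accept). `most_recent_catalog` and the sample filenames are hypothetical.

```python
import re
from datetime import datetime
from typing import List, Optional

CATALOG_RE = re.compile(
    r"CourseCatalog_V2_(\d{4})_(\d{1,2})_(\d{1,2})_\d+_[a-z0-9]+\.csv", re.IGNORECASE
)

def most_recent_catalog(filenames: List[str]) -> Optional[str]:
    """Return the CourseCatalog file with the latest date in its name (hypothetical helper)."""
    dated = []
    for name in filenames:
        m = CATALOG_RE.fullmatch(name)
        if not m:
            continue  # not a CourseCatalog file
        try:
            dated.append((datetime(*map(int, m.groups())), name))
        except ValueError:
            continue  # impossible date in the filename, e.g. month 13
    # max() on (date, name) picks the newest date; ties fall back to the filename
    return max(dated)[1] if dated else None

files = [
    "CourseCatalog_V2_2025_9_30_1_ab12.csv",
    "CourseCatalog_V2_2025_10_1_1_cd34.csv",
    "StandAloneContent_v2_2025_10_2_1_ef56.csv",
    "notes.txt",
]
print(most_recent_catalog(files))  # → CourseCatalog_V2_2025_10_1_1_cd34.csv
```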

In [None]:
# Cleanup: Remove old files from previous runs
import os
import glob

def cleanup_directory(directory: str) -> int:
    """
    Remove all files in a directory (keeps the directory itself and .gitkeep files).
    
    Args:
        directory: Path to directory to clean
    
    Returns:
        Number of files removed
    """
    if not os.path.exists(directory):
        print(f"  Directory does not exist: {directory}")
        return 0
    
    files_removed = 0
    pattern = os.path.join(directory, "*")
    
    for file_path in glob.glob(pattern):
        # Skip .gitkeep files
        if os.path.basename(file_path) == ".gitkeep":
            continue
        
        # Only remove files, not subdirectories
        if os.path.isfile(file_path):
            try:
                os.remove(file_path)
                files_removed += 1
            except Exception as e:
                print(f"  Error removing {file_path}: {e}")
    
    return files_removed

print("=" * 80)
print("PREPROCESSING - Cleanup")
print("=" * 80)
print()

print("Cleaning up directories from previous runs...")
print()

# Clean generated_files directory
print(f"Cleaning {OUTPUT_DIR}/...")
removed = cleanup_directory(OUTPUT_DIR)
print(f"  Removed {removed} file(s)")
print()

# Clean downloaded_files directory
print(f"Cleaning {SFTP_LOCAL_DIR}/...")
removed = cleanup_directory(SFTP_LOCAL_DIR)
print(f"  Removed {removed} file(s)")
print()

print("=" * 80)
print()

In [None]:
# Generate UserCompletion file from template
import shutil

def generate_user_completion_file() -> str:
    """
    Copy the UserCompletion template file to generated_files with current date and time in PT.
    
    Returns:
        Path to the generated file, or None if generation fails
    """
    # Source template file (configurable via USER_COMPLETION_TEMPLATE_FILE)
    source_file = USER_COMPLETION_TEMPLATE_FILE
    
    if not os.path.exists(source_file):
        print(f"  Template file not found: {source_file}")
        return None
    
    # Generate new filename with current date and time in PT
    now = datetime.now(PT)
    year = now.strftime("%Y")
    month = str(now.month)  # No leading zero (portable; "%-m" is not supported on Windows)
    day = str(now.day)      # No leading zero
    
    # Generate 6-digit time suffix: HHMMSS
    time_suffix = now.strftime("%H%M%S")
    
    new_filename = f"UserCompletion_v2_{year}_{month}_{day}_1_{time_suffix}.csv"
    destination_file = os.path.join(OUTPUT_DIR, new_filename)
    
    # Copy the file (create the output directory if it does not exist yet)
    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        shutil.copy2(source_file, destination_file)
        return destination_file
    except Exception as e:
        print(f"  Error copying file: {e}")
        return None

print("=" * 80)
print("PREPROCESSING - Generate UserCompletion File")
print("=" * 80)
print()

print("Generating a dummy UserCompletion file from sample file...")
user_completion_path = generate_user_completion_file()

if user_completion_path:
    print(f"✓ UserCompletion file generated successfully")
    print(f"  File: {user_completion_path}")
else:
    print("✗ Failed to generate UserCompletion file")

print()
print("=" * 80)
print()

In [None]:
# Import SFTP libraries and load environment
import os
import re
from dotenv import load_dotenv
import paramiko
from datetime import datetime

# Load environment variables
load_dotenv()

# SFTP Inbound Server Configuration
SFTP_INBOUND_HOST = os.getenv("SFTP_INBOUND_HOST", "sftp.sephora.com")
SFTP_INBOUND_USER = os.getenv("SFTP_INBOUND_USER", "SephoraMSL")
SFTP_INBOUND_PASSWORD = os.getenv("SFTP_INBOUND_PASSWORD", "your_SFTP_INBOUND_PASSWORD_placeholder")
SFTP_INBOUND_REMOTE_PATH = os.getenv("SFTP_INBOUND_REMOTE_PATH", "/inbound/BTC/retailData/prod/vendor/mySephoraLearning-archive")

def parse_course_catalog_filename(filename: str) -> tuple:
    """
    Parse course catalog filename to extract date components.
    Format: CourseCatalog_V2_YYYY_M_DD_1_random.csv
    
    Args:
        filename: The course catalog filename
    
    Returns:
        Tuple of (year, month, day, datetime_obj) or None if parsing fails
    """
    pattern = r'CourseCatalog_V2_(\d{4})_(\d{1,2})_(\d{1,2})_\d+_[a-z0-9]+\.csv'
    match = re.match(pattern, filename, re.IGNORECASE)
    
    if match:
        year = int(match.group(1))
        month = int(match.group(2))
        day = int(match.group(3))
        
        try:
            date_obj = datetime(year, month, day)
            return (year, month, day, date_obj)
        except ValueError:
            return None
    return None

def parse_standalone_content_filename(filename: str) -> tuple:
    """
    Parse standalone content filename to extract date components.
    Format: StandAloneContent_v2_YYYY_M_DD_1_random.csv
    
    Args:
        filename: The standalone content filename
    
    Returns:
        Tuple of (year, month, day, datetime_obj) or None if parsing fails
    """
    pattern = r'StandAloneContent_v2_(\d{4})_(\d{1,2})_(\d{1,2})_\d+_[a-z0-9]+\.csv'
    match = re.match(pattern, filename, re.IGNORECASE)
    
    if match:
        year = int(match.group(1))
        month = int(match.group(2))
        day = int(match.group(3))
        
        try:
            date_obj = datetime(year, month, day)
            return (year, month, day, date_obj)
        except ValueError:
            return None
    return None

def download_most_recent_course_catalog() -> str:
    """
    Connect to SFTP inbound server and download the most recent CourseCatalog file.
    
    Returns:
        Path to the downloaded file, or None if download fails
    """
    try:
        # Create SFTP connection
        transport = paramiko.Transport((SFTP_INBOUND_HOST, 22))
        transport.connect(username=SFTP_INBOUND_USER, password=SFTP_INBOUND_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        print(f"Connected to SFTP inbound server: {SFTP_INBOUND_HOST}")
        
        # Change to remote directory
        sftp.chdir(SFTP_INBOUND_REMOTE_PATH)
        print(f"Changed to directory: {SFTP_INBOUND_REMOTE_PATH}")
        
        # List all files in the directory
        files = sftp.listdir()
        print(f"Found {len(files)} files in directory")
        
        # Filter for course catalog files and parse dates
        catalog_files = []
        for filename in files:
            parsed = parse_course_catalog_filename(filename)
            if parsed:
                catalog_files.append((filename, parsed[3]))  # (filename, datetime_obj)
        
        if not catalog_files:
            print("No valid CourseCatalog files found")
            sftp.close()
            transport.close()
            return None
        
        # Sort by date (most recent first)
        catalog_files.sort(key=lambda x: x[1], reverse=True)
        most_recent_file = catalog_files[0][0]
        most_recent_date = catalog_files[0][1]
        
        print(f"Most recent file: {most_recent_file} (date: {most_recent_date.strftime('%Y-%m-%d')})")
        
        # Download the file (create the local directory if needed)
        os.makedirs(SFTP_LOCAL_DIR, exist_ok=True)
        local_path = os.path.join(SFTP_LOCAL_DIR, most_recent_file)
        sftp.get(most_recent_file, local_path)
        print(f"Downloaded to: {local_path}")
        
        # Close connections
        sftp.close()
        transport.close()
        
        return local_path
        
    except Exception as e:
        print(f"Error downloading course catalog: {e}")
        print(f"  SFTP Inbound Host: {SFTP_INBOUND_HOST}")
        print(f"  SFTP Inbound Path: {SFTP_INBOUND_REMOTE_PATH}")
        print(f"  SFTP Inbound User: {SFTP_INBOUND_USER}")
        return None

def download_most_recent_standalone_content() -> str:
    """
    Connect to SFTP inbound server and download the most recent StandAloneContent file.
    
    Returns:
        Path to the downloaded file, or None if download fails
    """
    try:
        # Create SFTP connection
        transport = paramiko.Transport((SFTP_INBOUND_HOST, 22))
        transport.connect(username=SFTP_INBOUND_USER, password=SFTP_INBOUND_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        print(f"Connected to SFTP inbound server: {SFTP_INBOUND_HOST}")
        
        # Change to remote directory
        sftp.chdir(SFTP_INBOUND_REMOTE_PATH)
        print(f"Changed to directory: {SFTP_INBOUND_REMOTE_PATH}")
        
        # List all files in the directory
        files = sftp.listdir()
        print(f"Found {len(files)} files in directory")
        
        # Filter for standalone content files and parse dates
        content_files = []
        for filename in files:
            parsed = parse_standalone_content_filename(filename)
            if parsed:
                content_files.append((filename, parsed[3]))  # (filename, datetime_obj)
        
        if not content_files:
            print("No valid StandAloneContent files found")
            sftp.close()
            transport.close()
            return None
        
        # Sort by date (most recent first)
        content_files.sort(key=lambda x: x[1], reverse=True)
        most_recent_file = content_files[0][0]
        most_recent_date = content_files[0][1]
        
        print(f"Most recent file: {most_recent_file} (date: {most_recent_date.strftime('%Y-%m-%d')})")
        
        # Download the file (create the local directory if needed)
        os.makedirs(SFTP_LOCAL_DIR, exist_ok=True)
        local_path = os.path.join(SFTP_LOCAL_DIR, most_recent_file)
        sftp.get(most_recent_file, local_path)
        print(f"Downloaded to: {local_path}")
        
        # Close connections
        sftp.close()
        transport.close()
        
        return local_path
        
    except Exception as e:
        print(f"Error downloading standalone content: {e}")
        print(f"  SFTP Inbound Host: {SFTP_INBOUND_HOST}")
        print(f"  SFTP Inbound Path: {SFTP_INBOUND_REMOTE_PATH}")
        print(f"  SFTP Inbound User: {SFTP_INBOUND_USER}")
        return None

In [None]:
def publish_files_to_sftp_outbound(files_to_publish: List[str]) -> bool:
    """
    Publish generated files to SFTP outbound server.
    
    Args:
        files_to_publish: List of local file paths to upload
    
    Returns:
        True if all files published successfully, False otherwise
    """
    # Check if publishing is enabled
    if not SFTP_PUBLISH_ENABLED:
        print("SFTP publishing is disabled (SFTP_PUBLISH_ENABLED=false)")
        return False
    
    # Check if password is configured
    if not SFTP_OUTBOUND_PASSWORD:
        print("ERROR: SFTP outbound password not configured")
        print("  Set SFTP_OUTBOUND_PASSWORD in .env file")
        return False
    
    # Check if we have files to publish
    if not files_to_publish:
        print("No files to publish")
        return False
    
    try:
        # Create SFTP connection
        print(f"Connecting to SFTP outbound server: {SFTP_OUTBOUND_HOST}")
        transport = paramiko.Transport((SFTP_OUTBOUND_HOST, 22))
        transport.connect(username=SFTP_OUTBOUND_USER, password=SFTP_OUTBOUND_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        print(f"✓ Connected successfully")
        
        # Change to remote directory
        try:
            sftp.chdir(SFTP_OUTBOUND_REMOTE_PATH)
            print(f"✓ Changed to directory: {SFTP_OUTBOUND_REMOTE_PATH}")
        except IOError:
            print(f"ERROR: Remote directory does not exist: {SFTP_OUTBOUND_REMOTE_PATH}")
            sftp.close()
            transport.close()
            return False
        
        # Upload each file
        published_count = 0
        failed_count = 0
        
        for local_file_path in files_to_publish:
            if not os.path.exists(local_file_path):
                print(f"  ⚠ File not found (skipping): {local_file_path}")
                failed_count += 1
                continue
            
            # Get just the filename (not the full path)
            filename = os.path.basename(local_file_path)
            
            try:
                # Upload the file
                sftp.put(local_file_path, filename)
                print(f"  ✓ Uploaded: {filename}")
                published_count += 1
            except Exception as e:
                print(f"  ✗ Failed to upload {filename}: {e}")
                failed_count += 1
        
        # Close connections
        sftp.close()
        transport.close()
        
        # Summary
        print()
        print(f"Publishing summary:")
        print(f"  Successfully published: {published_count} file(s)")
        if failed_count > 0:
            print(f"  Failed: {failed_count} file(s)")
        
        return failed_count == 0
        
    except Exception as e:
        print(f"ERROR: Failed to publish files to SFTP outbound server")
        print(f"  Error: {e}")
        print(f"  Host: {SFTP_OUTBOUND_HOST}")
        print(f"  User: {SFTP_OUTBOUND_USER}")
        print(f"  Remote Path: {SFTP_OUTBOUND_REMOTE_PATH}")
        return False

In [None]:
# Execute: Download Course Catalog and Standalone Content from SFTP
print("=" * 80)
print("PREPROCESSING - Download Files from SFTP Inbound Server")
print("=" * 80)
print()

# Download Course Catalog
print("Downloading Course Catalog...")
print("-" * 80)
course_catalog_path = download_most_recent_course_catalog()

if course_catalog_path:
    print()
    print(f"✓ Course catalog downloaded successfully")
    print(f"  File: {course_catalog_path}")
    
    # Optionally load and preview the file
    try:
        catalog_df = pd.read_csv(course_catalog_path)
        print(f"  Rows: {len(catalog_df)}")
        print(f"  Columns: {list(catalog_df.columns)}")
    except Exception as e:
        print(f"  Note: Could not preview file: {e}")
else:
    print()
    print("✗ Failed to download course catalog")
    print("  Please check:")
    print("    1. .env file contains valid SFTP_INBOUND_PASSWORD")
    print("    2. SFTP inbound server is accessible")
    print("    3. Remote path exists and contains CourseCatalog files")

print()
print("-" * 80)

# Download Standalone Content
print("Downloading Standalone Content...")
print("-" * 80)
standalone_content_path = download_most_recent_standalone_content()

if standalone_content_path:
    print()
    print(f"✓ Standalone content downloaded successfully")
    print(f"  File: {standalone_content_path}")
    
    # Optionally load and preview the file
    try:
        content_df = pd.read_csv(standalone_content_path)
        print(f"  Rows: {len(content_df)}")
        print(f"  Columns: {list(content_df.columns)}")
    except Exception as e:
        print(f"  Note: Could not preview file: {e}")
else:
    print()
    print("✗ Failed to download standalone content")
    print("  Please check:")
    print("    1. .env file contains valid SFTP_INBOUND_PASSWORD")
    print("    2. SFTP inbound server is accessible")
    print("    3. Remote path exists and contains StandAloneContent files")

print()
print("=" * 80)

In [None]:
# Load employees (used by both Manager and Employee Training sections)
print(f"Loading employees from {EMPLOYEES_FILE}...")
employees_df = pd.read_csv(EMPLOYEES_FILE)

# Filter out comment rows (rows where employee_id starts with '#')
initial_count = len(employees_df)
employees_df['employee_id'] = employees_df['employee_id'].astype(str)
employees_df = employees_df[~employees_df['employee_id'].str.startswith('#')].copy()

# Convert employee_id back to int after filtering comments
employees_df['employee_id'] = employees_df['employee_id'].astype(int)

filtered_count = initial_count - len(employees_df)

if filtered_count > 0:
    print(f"Filtered out {filtered_count} comment row(s)")

print(f"Loaded {len(employees_df)} employees")
print()

# Helper function for formatting content IDs (used by both sections)
def format_content_id(content_id: int) -> str:
    """
    Format content ID with commas for human readability.
    Example: 1915085 -> "1,915,085"
    
    Args:
        content_id: The numeric content ID
    
    Returns:
        Formatted string with commas
    """
    return f"{content_id:,}"

# Manager - Assign Training to Employees

This section implements the manager functionality:
1. Queries Databricks for employee state using BOTH content_assignments and content_completion tables
2. Calculates open assignments (assignments - completions) for each employee
3. Loads the standalone content file from preprocessing
4. Filters for content where Daily_Dose_BA is TRUE
5. Sorts by CreateDate (most recent first)
6. Selects up to 3 Daily Dose contents to assign
7. Checks for Daily Dose conflicts (employees to skip):
   - **Conflict Check 1**: Queries content_completion table for employees who completed ANY Daily Dose this week
   - **Conflict Check 2**: Checks open assignments for Daily Dose with due dates this week
8. Skips employees who have Daily Dose conflicts
9. Assigns the 3 Daily Dose contents to all eligible employees
10. Assigns 1 random non-Daily Dose content to ALL employees (no skipping)
11. Generates a NonCompletedAssignments CSV file with:
    - Open assignments from Databricks (written FIRST)
    - New manager assignments (written SECOND)

**Note**: The manager will NOT assign Daily Dose to an employee who has already completed ANY Daily Dose in the current week (Sunday to Sunday), or who already has an open Daily Dose assignment due this week.
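
A date-only sketch of the weekly conflict rule, assuming the week runs from the most recent Sunday (today, if today is Sunday) through the following Sunday, inclusive at both ends so that open assignments due on that next Sunday count. The notebook's own helpers work with PT datetimes at fixed times of day; `week_bounds` and `has_daily_dose_conflict` are hypothetical helpers, and the inclusive window is an assumption.

```python
from datetime import date, timedelta
from typing import Iterable, Tuple

def week_bounds(today: date) -> Tuple[date, date]:
    """Return (current Sunday, next Sunday) for the week containing `today`."""
    # Python weekday(): Monday=0 ... Sunday=6, so this is 0 when today is Sunday
    days_since_sunday = (today.weekday() + 1) % 7
    start = today - timedelta(days=days_since_sunday)
    return start, start + timedelta(days=7)

def has_daily_dose_conflict(
    today: date,
    dd_completion_dates: Iterable[date],
    dd_open_due_dates: Iterable[date],
) -> bool:
    """True if a Daily Dose was completed, or an open one falls due, this week."""
    start, end = week_bounds(today)

    def in_week(d: date) -> bool:
        return start <= d <= end  # assumption: both Sundays included

    return any(in_week(d) for d in dd_completion_dates) or any(
        in_week(d) for d in dd_open_due_dates
    )

print(week_bounds(date(2025, 1, 15)))  # Wednesday → (2025-01-12, 2025-01-19)
```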


In [None]:
import pytz

# Date/time helper functions - all use the PT timezone defined in the setup cell

def get_sunday_of_current_week() -> datetime:
    """
    Get Sunday of the current week at 23:59:59 PT.
    
    If today is Sunday, returns today at 23:59:59.
    If today is Monday-Saturday, returns the most recent past Sunday at 23:59:59.
    
    Returns:
        datetime object for Sunday of current week at 23:59:59 PT
    """
    now = datetime.now(PT)
    # Sunday is 6 in Python's weekday() (Monday=0, Sunday=6)
    current_weekday = now.weekday()
    
    if current_weekday == 6:
        # Today is Sunday
        sunday = now
    else:
        # Go back to the most recent Sunday
        # Monday=0, so we need to go back (current_weekday + 1) days to reach Sunday
        days_since_sunday = current_weekday + 1
        sunday = now - timedelta(days=days_since_sunday)
    
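    # Set time to 23:59:59 PT; re-localize so the UTC offset matches that date
    # (pytz keeps the original offset through timedelta arithmetic across DST changes)
    return PT.localize(sunday.replace(tzinfo=None, hour=23, minute=59, second=59, microsecond=0))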

def get_next_future_sunday() -> datetime:
    """
    Get the next future Sunday at 22:49:49 PT.
    
    If today is Sunday, returns next Sunday (7 days from now).
    If today is Monday-Saturday, returns the upcoming Sunday.
    
    Returns:
        datetime object for the next future Sunday at 22:49:49 PT
    """
    now = datetime.now(PT)
    # Sunday is 6 in Python's weekday() (Monday=0, Sunday=6)
    current_weekday = now.weekday()
    
    if current_weekday == 6:
        # Today is Sunday, next Sunday is 7 days away
        days_until_sunday = 7
    else:
        # Calculate days until next Sunday
        # Monday=0, Tuesday=1, ..., Saturday=5
        # Days to Sunday: 6 - current_weekday
        days_until_sunday = 6 - current_weekday
    
    next_sunday = now + timedelta(days=days_until_sunday)
    
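    # Set time to 22:49:49 PT; re-localize so the UTC offset matches that date
    # (pytz keeps the original offset through timedelta arithmetic across DST changes)
    return PT.localize(next_sunday.replace(tzinfo=None, hour=22, minute=49, second=49, microsecond=0))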

def generate_request_id() -> str:
    """
    Generate RequestId in format: fake:DD
    Example: fake:14 (for the 14th day of the month)
    Uses PT timezone for date component.
    
    Returns:
        RequestId string
    """
    now = datetime.now(PT)
    day = now.strftime("%d")  # 2-digit day with leading zero
    
    return f"fake:{day}"

def generate_non_completed_assignments_filename() -> str:
    """
    Generate NonCompletedAssignments filename with timestamp.
    Format: Non_Completed_Assignments_V2_YYYY_M_D_1_HHMMSS.csv
    (month and day without leading zeros)
    Uses PT timezone for date and time components.
    
    Returns:
        Generated filename
    """
    now = datetime.now(PT)
    year = now.strftime("%Y")
    month = str(now.month)  # No leading zero (portable; "%-m" is not supported on Windows)
    day = str(now.day)      # No leading zero
    
    # Generate 6-digit time suffix: HHMMSS
    time_suffix = now.strftime("%H%M%S")
    
    return f"Non_Completed_Assignments_V2_{year}_{month}_{day}_1_{time_suffix}.csv"

In [None]:
def get_open_assignments_from_databricks(employee_ids: list) -> pd.DataFrame:
    """
    Query Databricks to get open (non-completed) assignments for specific employees.
    
    Open assignments = content_assignments - content_completion
    
    Args:
        employee_ids: List of employee IDs (ba_id) to query assignments for
    
    Returns:
        DataFrame with columns: ba_id, content_id, assignment_date, assignment_begin_date, 
                               assignment_due_date, content_type
        Returns empty DataFrame if Databricks is not configured (allows process to continue).
        Raises exception if Databricks IS configured but query fails (stops process).
    """
    # Check if Databricks is configured
    if not all([DATABRICKS_HOST, DATABRICKS_HTTP_PATH, DATABRICKS_TOKEN]):
        print("Databricks configuration not found. Skipping assignments query.")
        print("  Set DATABRICKS_HOST, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN in .env file")
        print("  Process will continue with only new manager assignments.")
        return pd.DataFrame()
    
    # Check if we have employee IDs to query
    if not employee_ids:
        print("No employee IDs provided. Skipping assignments query.")
        return pd.DataFrame()
    
    # Databricks IS configured - any error should stop the process
    try:
        from databricks import sql
    except ImportError as e:
        print("=" * 80)
        print("ERROR: databricks-sql-connector not installed")
        print("=" * 80)
        print()
        print("The databricks-sql-connector library is required but not installed.")
        print()
        print("To fix this issue:")
        print("  1. Activate your virtual environment:")
        print("     source .venv/bin/activate  # macOS/Linux")
        print()
        print("  2. Install the required library:")
        print("     pip install databricks-sql-connector")
        print()
        print("  OR install all requirements:")
        print("     pip install -r requirements.txt")
        print()
        print("=" * 80)
        raise ImportError("databricks-sql-connector not installed") from e
    
    try:
        # Connect to Databricks
        print(f"Connecting to Databricks...")
        print(f"  Host: {DATABRICKS_HOST}")
        print(f"  HTTP Path: {DATABRICKS_HTTP_PATH}")
        
        connection = sql.connect(
            server_hostname=DATABRICKS_HOST,
            http_path=DATABRICKS_HTTP_PATH,
            access_token=DATABRICKS_TOKEN
        )
        
        print(f"✓ Connected successfully")
        
        cursor = connection.cursor()
        
        # Table names
        assignments_table = f"{DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_assignments"
        completion_table = f"{DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_completion"
        
        print(f"Querying tables:")
        print(f"  Assignments: {assignments_table}")
        print(f"  Completions: {completion_table}")
        print(f"  For {len(employee_ids)} employee(s): {sorted(employee_ids)}")
        
        # Build IN clause for employee IDs
        employee_ids_str = ", ".join([str(emp_id) for emp_id in employee_ids])
        
        # Query: Get all assignments, then LEFT JOIN with completions to find open ones
        # Open assignments are those where completion.ba_id IS NULL (no matching completion)
        query = f"""
        SELECT 
            a.ba_id,
            a.content_id,
            a.assignment_date,
            a.assignment_begin_date,
            a.assignment_due_date,
            a.content_type
        FROM {assignments_table} a
        LEFT JOIN {completion_table} c
            ON a.ba_id = c.ba_id 
            AND a.content_id = c.content_id
        WHERE a.ba_id IN ({employee_ids_str})
            AND c.ba_id IS NULL
        ORDER BY a.ba_id, a.assignment_due_date
        """
        
        cursor.execute(query)
        
        # Fetch results
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
        
        # Close connection
        cursor.close()
        connection.close()
        
        # Create DataFrame
        df = pd.DataFrame(rows, columns=columns)
        
        print(f"✓ Retrieved {len(df)} open assignment(s) from Databricks")
        print(f"  (Open = assignments NOT in completions)")
        if len(df) > 0:
            unique_employees = df['ba_id'].nunique()
            print(f"  Open assignments for {unique_employees} employee(s)")
        
        return df
        
    except Exception as e:
        print()
        print("=" * 80)
        print("ERROR: Failed to query Databricks")
        print("=" * 80)
        print()
        print(f"Error type: {type(e).__name__}")
        print(f"Error message: {str(e)}")
        print()
        print("Connection details:")
        print(f"  Host: {DATABRICKS_HOST}")
        print(f"  HTTP Path: {DATABRICKS_HTTP_PATH}")
        print(f"  Catalog: {DATABRICKS_CATALOG}")
        print(f"  Schema: {DATABRICKS_SCHEMA}")
        print(f"  Assignments table: {DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_assignments")
        print(f"  Completion table: {DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_completion")
        print()
        print("Common issues:")
        print("  1. Invalid or expired access token")
        print("     → Generate new token in Databricks: User Settings → Access Tokens")
        print()
        print("  2. Incorrect hostname or HTTP path")
        print("     → Verify DATABRICKS_HOST and DATABRICKS_HTTP_PATH in .env")
        print()
        print("  3. Table does not exist")
        print("     → Verify both tables exist:")
        print(f"       {DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_assignments")
        print(f"       {DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_completion")
        print()
        print("  4. Network/firewall issues")
        print("     → Ensure you can reach the Databricks workspace")
        print()
        print("  5. Insufficient permissions")
        print("     → Verify your account has SELECT permission on both tables")
        print()
        print("=" * 80)
        raise RuntimeError(f"Databricks query failed: {str(e)}") from e


In [None]:
# Manager - Query Databricks and select training content to assign to employees
print("=" * 80)
print("MANAGER - Assigning Training to Employees")
print("=" * 80)
print()

# Step 1: Get list of employee IDs from input file and query Databricks
employee_ids_list = employees_df['employee_id'].tolist()
print(f"Querying Databricks for assignments for {len(employee_ids_list)} employees from input file...")
print("-" * 80)
open_assignments_df = get_open_assignments_from_databricks(employee_ids_list)
print()

# Convert Databricks assignments to the NonCompletedAssignments format
def to_iso_text(value):
    """Return ISO-8601 text for date/datetime values, otherwise the value's string form."""
    return value.isoformat() if hasattr(value, 'isoformat') else str(value)

databricks_assignments = []
if not open_assignments_df.empty:
    for _, row in open_assignments_df.iterrows():
        databricks_assignments.append({
            "UserID": int(row['ba_id']),
            "CreateDate_text": to_iso_text(row['assignment_date']),
            "RequestId": generate_request_id(),
            "TrainingElementId": format_content_id(int(row['content_id'])),
            "Start_Date_text": to_iso_text(row['assignment_begin_date']),
            "DueDate_text": to_iso_text(row['assignment_due_date']),
            "ContentType": row['content_type'] if 'content_type' in row else "Media"
        })
    print(f"Converted {len(databricks_assignments)} Databricks assignment(s) to output format")
    print()

# Step 2: Load the standalone content file
new_manager_assignments = []
employee_assigned_daily_dose = {}  # Track Daily Dose assignments
employee_assigned_random = {}  # Track random non-Daily Dose assignments

if standalone_content_path and os.path.exists(standalone_content_path):
    print(f"Loading standalone content from: {standalone_content_path}")
    standalone_df = pd.read_csv(standalone_content_path)
    print(f"Loaded {len(standalone_df)} content items")
    print()
    
    # Calculate dates for NEW assignments in PT, then convert to UTC for CSV output
    created_date = datetime.now(PT).astimezone(UTC).isoformat()
    start_date = get_sunday_of_current_week().astimezone(UTC).isoformat()
    due_date = get_next_future_sunday().astimezone(UTC).isoformat()
    
    # PART A: DAILY DOSE ASSIGNMENTS
    print("=" * 80)
    print("PART A: DAILY DOSE ASSIGNMENTS")
    print("=" * 80)
    print()
    
    # Filter for content where Daily_Dose_BA is TRUE
    print("Filtering for Daily Dose training (Daily_Dose_BA = TRUE)...")
    daily_dose_content = standalone_df[
        (standalone_df['Daily_Dose_BA'] == 'TRUE') | 
        (standalone_df['Daily_Dose_BA'] == True)
    ].copy()
    
    print(f"Found {len(daily_dose_content)} Daily Dose content items")
    print()
    
    if len(daily_dose_content) > 0:
        # Convert CreateDate to datetime for sorting
        daily_dose_content['CreateDate_dt'] = pd.to_datetime(daily_dose_content['CreateDate'])
        
        # Sort by CreateDate (most recent first)
        daily_dose_content = daily_dose_content.sort_values('CreateDate_dt', ascending=False)
        
        # Select up to 3 most recent contents
        contents_to_assign = daily_dose_content.head(3)
        
        print(f"Selected {len(contents_to_assign)} Daily Dose content(s) to assign:")
        for idx, content in contents_to_assign.iterrows():
            content_id = content['ContentId']
            content_name = content['ContentName']
            create_date = content['CreateDate']
            print(f"  {format_content_id(int(str(content_id).replace(',', '')))} - {content_name} (Created: {create_date})")
        print()
        
        # Check which employees to skip for Daily Dose
        print("-" * 80)
        print("Checking for Daily Dose conflicts...")
        print()
        
        # Calculate current week boundaries (Sunday to Sunday)
        now_pt = datetime.now(PT)
        sunday_of_current_week = get_sunday_of_current_week()
        next_sunday = get_next_future_sunday()
        
        # For completion date comparison, use date objects
        week_start_date = sunday_of_current_week.date()
        week_end_date = next_sunday.date()
        
        print(f"Current week (for completion check): {week_start_date} to {week_end_date}")
        print()
        
        # Build set of Daily Dose content IDs for quick lookup
        daily_dose_content_ids = set()
        for _, dd_content in standalone_df[
            (standalone_df['Daily_Dose_BA'] == 'TRUE') | 
            (standalone_df['Daily_Dose_BA'] == True)
        ].iterrows():
            content_id_str = dd_content['ContentId']
            if isinstance(content_id_str, str):
                content_id_numeric = int(content_id_str.replace(',', ''))
            else:
                content_id_numeric = int(content_id_str)
            daily_dose_content_ids.add(content_id_numeric)
        
        # Check employees for Daily Dose conflicts
        employees_to_skip_dd = {}  # Map employee_id -> reason for skipping Daily Dose
        
        # CONFLICT CHECK 1: Query content_completion for Daily Dose completions this week
        if all([DATABRICKS_HOST, DATABRICKS_HTTP_PATH, DATABRICKS_TOKEN]):
            try:
                from databricks import sql
                
                # Connect to Databricks
                connection = sql.connect(
                    server_hostname=DATABRICKS_HOST,
                    http_path=DATABRICKS_HTTP_PATH,
                    access_token=DATABRICKS_TOKEN
                )
                
                cursor = connection.cursor()
                
                # Table name
                completion_table = f"{DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_completion"
                
                print(f"Querying {completion_table} for Daily Dose completions this week...")
                
                # Build IN clause for employee IDs and Daily Dose content IDs
                employee_ids_str = ", ".join([str(emp_id) for emp_id in employee_ids_list])
                dd_content_ids_str = ", ".join([str(cid) for cid in daily_dose_content_ids])
                
                # Query: Find employees who completed ANY Daily Dose content this week
                query = f"""
                SELECT 
                    ba_id,
                    content_id,
                    completion_date
                FROM {completion_table}
                WHERE ba_id IN ({employee_ids_str})
                    AND content_id IN ({dd_content_ids_str})
                    AND completion_date >= '{week_start_date}'
                    AND completion_date <= '{week_end_date}'
                ORDER BY ba_id, completion_date DESC
                """
                
                cursor.execute(query)
                
                # Fetch results
                completion_rows = cursor.fetchall()
                
                # Close connection
                cursor.close()
                connection.close()
                
                if completion_rows:
                    print(f"Found {len(completion_rows)} Daily Dose completion(s) this week:")
                    for row in completion_rows:
                        emp_id = row[0]
                        content_id = row[1]
                        comp_date = row[2]
                        
                        # Mark this employee to skip
                        if emp_id not in employees_to_skip_dd:
                            employees_to_skip_dd[emp_id] = {
                                'reason': 'completed_this_week',
                                'content_id': content_id,
                                'completion_date': comp_date
                            }
                        
                        print(f"  Employee {emp_id}: Completed Daily Dose {content_id} on {comp_date}")
                    print()
                else:
                    print("✓ No Daily Dose completions found this week")
                    print()
                    
            except Exception as e:
                print(f"⚠ Could not query content_completion: {e}")
                print("  Continuing without completion check...")
                print()
        
        # CONFLICT CHECK 2: Check open assignments for Daily Dose with due dates this week
        today_date = now_pt.date()
        next_sunday_date = next_sunday.date()
        
        if not open_assignments_df.empty:
            for _, assignment in open_assignments_df.iterrows():
                employee_id = int(assignment['ba_id'])
                content_id = int(assignment['content_id'])
                assignment_due_date = assignment['assignment_due_date']
                
                # Check if this is a Daily Dose assignment
                if content_id in daily_dose_content_ids:
                    # Convert due date to date for comparison
                    if hasattr(assignment_due_date, 'date'):
                        due_date_check = assignment_due_date.date()
                    else:
                        from dateutil import parser
                        due_date_check = parser.parse(str(assignment_due_date)).date()
                    
                    # Check if due date is today or next future Sunday
                    if due_date_check == today_date or due_date_check == next_sunday_date:
                        if employee_id not in employees_to_skip_dd:
                            employees_to_skip_dd[employee_id] = {
                                'reason': 'open_assignment_this_week',
                                'content_id': content_id,
                                'due_date': due_date_check
                            }
        
        # Log employees who will be skipped for Daily Dose
        if employees_to_skip_dd:
            print(f"❌ SKIPPING {len(employees_to_skip_dd)} employee(s) - Daily Dose conflict:")
            print()
            for emp_id in sorted(employees_to_skip_dd.keys()):
                skip_info = employees_to_skip_dd[emp_id]
                print(f"  Employee {emp_id}:")
                
                if skip_info['reason'] == 'completed_this_week':
                    print(f"    Reason: Already completed Daily Dose this week")
                    print(f"    Completed Content: {skip_info['content_id']}")
                    print(f"    Completion Date: {skip_info['completion_date']}")
                else:
                    print(f"    Reason: Has open Daily Dose assignment for this week")
                    print(f"    Assigned Content: {skip_info['content_id']}")
                    print(f"    Due Date: {skip_info['due_date']}")
                print()
        else:
            print(f"✓ No Daily Dose conflicts found")
            print()
        
        print("-" * 80)
        print()
        
        # Create Daily Dose assignments for eligible employees
        eligible_employees_dd = [emp for _, emp in employees_df.iterrows() 
                                 if emp['employee_id'] not in employees_to_skip_dd]
        
        if len(eligible_employees_dd) > 0:
            print(f"✓ ASSIGNING DAILY DOSE TO {len(eligible_employees_dd)} ELIGIBLE EMPLOYEE(S)")
            print()
            
            for employee in eligible_employees_dd:
                employee_id = employee['employee_id']
                
                print(f"Employee {employee_id}:")
                
                # Assign each selected Daily Dose content to this employee
                for _, content in contents_to_assign.iterrows():
                    content_id = content['ContentId']
                    content_name = content['ContentName']
                    
                    print(f"  ✓ Daily Dose: {content_id} - {content_name}")
                    
                    # Store what was assigned
                    if employee_id not in employee_assigned_daily_dose:
                        employee_assigned_daily_dose[employee_id] = []
                    employee_assigned_daily_dose[employee_id].append({
                        'content_id': content_id,
                        'content_name': content_name
                    })
                    
                    assignment = {
                        "UserID": employee_id,
                        "CreateDate_text": created_date,
                        "RequestId": generate_request_id(),
                        "TrainingElementId": content_id,
                        "Start_Date_text": start_date,
                        "DueDate_text": due_date,
                        "ContentType": "Media"
                    }
                    
                    new_manager_assignments.append(assignment)
                
                print()
            
            dd_count = len(new_manager_assignments)  # only Daily Dose assignments exist at this point
            print(f"Created {dd_count} Daily Dose assignments")
            print()
        else:
            print("⚠ NO ELIGIBLE EMPLOYEES for Daily Dose")
            print("All employees have Daily Dose conflicts (completed or assigned this week).")
            print()
    else:
        print("No Daily Dose content found.")
        print()
    
    # PART B: RANDOM NON-DAILY DOSE ASSIGNMENTS
    print("=" * 80)
    print("PART B: RANDOM NON-DAILY DOSE ASSIGNMENTS")
    print("=" * 80)
    print()
    
    # Filter for content where Daily_Dose_BA is NOT TRUE
    print("Filtering for NON-Daily Dose training (Daily_Dose_BA != TRUE)...")
    non_daily_dose_content = standalone_df[
        ~((standalone_df['Daily_Dose_BA'] == 'TRUE') | 
          (standalone_df['Daily_Dose_BA'] == True))
    ].copy()
    
    print(f"Found {len(non_daily_dose_content)} non-Daily Dose content items")
    print()
    
    if len(non_daily_dose_content) > 0:
        # Collect assignment data for table display
        random_assignment_rows = []
        
        for _, employee in employees_df.iterrows():
            employee_id = employee['employee_id']
            
            # Randomly select one content from non-Daily Dose content
            selected_content = non_daily_dose_content.sample(n=1).iloc[0]
            content_id = selected_content['ContentId']
            content_name = selected_content['ContentName']
            
            # Store for table display
            random_assignment_rows.append((employee_id, content_id, content_name))
            
            # Store what was assigned
            employee_assigned_random[employee_id] = {
                'content_id': content_id,
                'content_name': content_name
            }
            
            assignment = {
                "UserID": employee_id,
                "CreateDate_text": created_date,
                "RequestId": generate_request_id(),
                "TrainingElementId": content_id,
                "Start_Date_text": start_date,
                "DueDate_text": due_date,
                "ContentType": "Media"
            }
            
            new_manager_assignments.append(assignment)
        
        # Print header
        print(f"Random non-Daily Dose training assigned to {len(employees_df)} employee(s):")
        print()
        print(f"{'Employee ID':<15} | {'Content ID':<15} | {'Content Name'}")
        print(f"{'-' * 15} | {'-' * 15} | {'-' * 50}")
        
        # Print each assignment
        for emp_id, content_id, content_name in sorted(random_assignment_rows):
            print(f"{emp_id:<15} | {content_id:<15} | {content_name}")
        
        print()
        random_count = len(employee_assigned_random)
        print(f"Created {random_count} random non-Daily Dose assignments")
        print()
    else:
        print("No non-Daily Dose content found.")
        print()
else:
    print("Standalone content file not found.")
    print("Please run the preprocessing section first.")

# Step 3: Combine Databricks assignments with new manager assignments
print("=" * 80)
print("COMBINING ASSIGNMENTS")
print("=" * 80)
print()
print(f"  Assignments from Databricks Table: {len(databricks_assignments)}")
print(f"  New Daily Dose assignments: {len(employee_assigned_daily_dose) * len(contents_to_assign) if employee_assigned_daily_dose else 0}")
print(f"  New random Non-Daily Dose assignments: {len(employee_assigned_random)}")
print(f"  Total new assignments: {len(new_manager_assignments)}")

# Databricks assignments go FIRST (as per manager.md)
all_assignments = databricks_assignments + new_manager_assignments
print(f"  Total assignments for output: {len(all_assignments)}")
print()

# Step 4: Generate output file
if all_assignments:
    assignments_filename = generate_non_completed_assignments_filename()
    assignments_path = f"{OUTPUT_DIR}/{assignments_filename}"
    
    # Create DataFrame
    assignments_df = pd.DataFrame(all_assignments)
    
    # Write to CSV, quoting every field (csv.QUOTE_ALL, whose value is 1)
    import csv
    assignments_df.to_csv(assignments_path, index=False, quoting=csv.QUOTE_ALL)
    
    print(f"Generated NonCompletedAssignments file: {assignments_filename}")
    print(f"  Databricks Table assignments: {len(databricks_assignments)}")
    print(f"  New manager assignments: {len(new_manager_assignments)}")
    print(f"  Total assignments in file: {len(all_assignments)}")
    print()
    
    # Print summary for NEW assignments
    if new_manager_assignments:
        print("New Assignment Summary:")
        if employee_assigned_daily_dose:
            print(f"  Daily Dose assignments: {len(employee_assigned_daily_dose)} employee(s) × {len(contents_to_assign)} contents = {len(employee_assigned_daily_dose) * len(contents_to_assign)}")
            if employees_to_skip_dd:
                print(f"  Skipped (Daily Dose conflicts): {len(employees_to_skip_dd)} employee(s)")
        if employee_assigned_random:
            print(f"  Random assignments: {len(employee_assigned_random)} employee(s) × 1 content = {len(employee_assigned_random)}")
        print(f"  Total new assignments: {len(new_manager_assignments)}")
        print(f"  CreateDate: {created_date}")
        print(f"  Start Date: {start_date}")
        print(f"  Due Date: {due_date}")
else:
    print("No assignments created (neither from Databricks nor new manager assignments).")

print()
print("=" * 80)
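The two Daily Dose conflict checks in the manager cell can be expressed as one pure function, which makes the rule testable without a Databricks connection. This is a minimal sketch: the function name `find_daily_dose_conflicts` and the `(employee_id, content_id, date)` tuple layout are hypothetical stand-ins for the query results and the `open_assignments_df` rows. As in the manager cell, a completion this week takes precedence over an open assignment.

```python
from datetime import date
from typing import Dict, Iterable, Set, Tuple


def find_daily_dose_conflicts(
    completions: Iterable[Tuple[int, int, date]],
    open_assignments: Iterable[Tuple[int, int, date]],
    daily_dose_ids: Set[int],
    week_start: date,
    week_end: date,
    today: date,
) -> Dict[int, str]:
    """Map employee_id -> reason for skipping new Daily Dose assignments.

    Check 1: a Daily Dose completion dated within [week_start, week_end].
    Check 2: an open Daily Dose assignment due today or on week_end (next Sunday).
    setdefault keeps the first reason found, so check 1 wins over check 2.
    """
    skip: Dict[int, str] = {}
    for emp_id, content_id, completed_on in completions:
        if content_id in daily_dose_ids and week_start <= completed_on <= week_end:
            skip.setdefault(emp_id, "completed_this_week")
    for emp_id, content_id, due_on in open_assignments:
        if content_id in daily_dose_ids and due_on in (today, week_end):
            skip.setdefault(emp_id, "open_assignment_this_week")
    return skip


# Week runs from Sunday 2024-06-30 to Sunday 2024-07-07; "today" is 2024-07-03
conflicts = find_daily_dose_conflicts(
    completions=[(101, 5001, date(2024, 7, 1)), (102, 9999, date(2024, 7, 2)),
                 (103, 5002, date(2024, 6, 20))],
    open_assignments=[(101, 5002, date(2024, 7, 7)), (104, 5001, date(2024, 7, 7)),
                      (105, 5001, date(2024, 7, 14))],
    daily_dose_ids={5001, 5002},
    week_start=date(2024, 6, 30),
    week_end=date(2024, 7, 7),
    today=date(2024, 7, 3),
)
print(conflicts)
# {101: 'completed_this_week', 104: 'open_assignment_this_week'}
```

Employee 102 completed non-Daily Dose content, 103's completion falls before the week, and 105's assignment is due after this week, so none of them is skipped.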

# Employee Training Simulation

This section simulates employees completing training based on manager assignments and AI recommendations:

## Workflow:
1. **Get Recommendations** (next cell): Calls ML Training Recommender API for each employee
2. **Get Manager Assignments**: Loads assignments from NonCompletedAssignments file created by manager
3. **Combine Training**: Merges manager assignments with AI recommendations
4. **Check Recent Completions**: Queries content_completion table for training completed in last 13 days
   - **ONLY applies to AI recommendations** - manager assignments are ALWAYS included
   - Employees skip AI-recommended training they completed recently (today + prior 12 days)
   - Filtered AI training is removed from available training list
   - Logs which AI training was skipped and why
5. **Helper Functions** (following cell): Generates training timestamps
6. **Process Employee**: Determines completions based on employee type:
   - Type A: Completes all training (manager + filtered AI)
   - Type B: Completes one training (the first item in the combined list; manager assignments come first)
   - Type F: Completes no training
7. **Filename Generator**: Creates unique output filename with timestamp
8. **Main Loop**: Processes all employees and collects completion records
9. **Generate Output**: Writes ContentUserCompletion CSV file
10. **Update NonCompletedAssignments**: Removes completed training from NonCompletedAssignments file
11. **Print Summary**: Displays completion summary with source (manager or AI) for each employee
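The selection rule in step 6 can be sketched as a small function. The name `select_courses` is hypothetical; the list-of-dicts shape follows the `recommended_content_id` and `source` keys used in `process_employee`. Because manager assignments are placed ahead of AI recommendations, a Type B employee with any manager assignment completes a manager assignment.

```python
from typing import Dict, List


def select_courses(employee_type: str, manager: List[Dict], ai: List[Dict]) -> List[Dict]:
    """Pick the training an employee completes, based on employee type."""
    all_training = manager + ai  # manager assignments first, then AI
    kind = employee_type.lower().strip()
    if kind == "a":
        return all_training  # Type A: complete everything
    if kind == "b":
        return all_training[:1]  # Type B: complete only the first item
    return []  # Type F (or any other value): complete nothing


manager = [{"recommended_content_id": 5001, "source": "manager"}]
ai = [{"recommended_content_id": 7001, "source": "ai"},
      {"recommended_content_id": 7002, "source": "ai"}]
for t in ("A", "b", "F"):
    print(t, [c["recommended_content_id"] for c in select_courses(t, manager, ai)])
# A [5001, 7001, 7002]
# b [5001]
# F []
```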

**Important Notes**: 
- Employees will skip AI-recommended training they already completed in the last 13 days (current day + prior 12 days)
- Manager-assigned training is NEVER skipped - employees always see and complete manager assignments regardless of recent completion history
- This prevents duplicate AI recommendations while ensuring manager assignments are always honored
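The 13-day window and the AI-only filter can be illustrated as follows. `lookback_window` reproduces the date arithmetic in `get_employee_recent_completions` (start = today minus `lookback_days - 1`, so today is included), and `filter_ai` drops recently completed AI items while manager items bypass it entirely. Both function names are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, List, Set, Tuple


def lookback_window(today: date, lookback_days: int = 13) -> Tuple[date, date]:
    """Inclusive (start, end) dates: today plus the prior lookback_days - 1 days."""
    return today - timedelta(days=lookback_days - 1), today


def filter_ai(ai: List[Dict], recent_ids: Set[int]) -> List[Dict]:
    """Drop AI recommendations whose content was completed in the window."""
    return [r for r in ai if r["recommended_content_id"] not in recent_ids]


start, end = lookback_window(date(2024, 7, 13))
print(start, end)  # 2024-07-01 2024-07-13

manager = [{"recommended_content_id": 5001, "source": "manager"}]
ai = [{"recommended_content_id": 5001, "source": "ai"},
      {"recommended_content_id": 7001, "source": "ai"}]
recent = {5001}
# Manager items are never filtered; only AI items go through filter_ai
combined = manager + filter_ai(ai, recent)
print([(r["recommended_content_id"], r["source"]) for r in combined])
# [(5001, 'manager'), (7001, 'ai')]
```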

In [None]:
def get_training_recommendations(employee_id: int) -> List[Dict]:
    """
    Call the training recommender API for a given employee.
    
    Args:
        employee_id: The employee's ID (ba_id)
    
    Returns:
        List of recommended training courses
    """
    url = f"{API_BASE_URL}{API_ENDPOINT}"
    payload = {"data": {"ba_id": employee_id}}
    
    try:
        # Disable SSL certificate verification for internal APIs
        response = requests.post(url, json=payload, timeout=API_TIMEOUT, verify=False)
        response.raise_for_status()
        data = response.json()
        
        # Response structure: {"response": {"ml_recommendations": [...], "coaching_note": {...}}, "timing": {...}, "apiContext": {...}}
        if isinstance(data, dict):
            response_data = data.get("response", {})
            if isinstance(response_data, dict):
                # Get ml_recommendations from nested response
                recommendations = response_data.get("ml_recommendations", [])
            else:
                # Response is directly a list
                recommendations = response_data if isinstance(response_data, list) else []
        else:
            print(f"  Unexpected response type: {type(data)}")
            return []
        
        # Print selected fields from API response
        if isinstance(recommendations, list) and recommendations:
            print(f"  ML Recommendation API Response for employee {employee_id}:")
            for rec in recommendations:
                content_id = rec.get("recommended_content_id", "N/A")
                recommended_content = rec.get("recommended_content", "N/A")
                print(f"  {content_id} | {recommended_content}")
            print()
        
        # Ensure we have a list
        if isinstance(recommendations, list):
            return recommendations
        else:
            print(f"  Recommendations is not a list: {type(recommendations)}")
            return []
            
    except Exception as e:
        print(f"  Error fetching recommendations for employee {employee_id}: {e}")
        return []
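Separating response parsing from the HTTP call would let the shape handling in `get_training_recommendations` be tested without reaching the API. Here is a sketch, using the hypothetical name `extract_recommendations`, that accepts the same two shapes the function checks for: `{"response": {"ml_recommendations": [...]}}` and `{"response": [...]}`.

```python
from typing import Any, Dict, List


def extract_recommendations(data: Any) -> List[Dict]:
    """Return the recommendations list from a recommender API response body.

    Any other shape (non-dict body, missing key, non-list value) yields [].
    """
    if not isinstance(data, dict):
        return []
    response = data.get("response", {})
    if isinstance(response, dict):
        recs = response.get("ml_recommendations", [])
    else:
        recs = response  # the response may already be the list itself
    return recs if isinstance(recs, list) else []


body = {"response": {"ml_recommendations": [
    {"recommended_content_id": 7001, "recommended_content": "Example course"}
]}}
print(extract_recommendations(body))
```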

In [None]:
def generate_training_times(num_courses: int) -> List[tuple]:
    """
    Generate start and completion times for training courses.
    Calculates fixed times in PT on the current day (13:15 start, 13:19 completion),
    then converts them to UTC for output. Every course receives the same pair.
    
    All timestamps are returned in ISO-8601 format with UTC timezone offset (+00:00).
    
    Args:
        num_courses: Number of courses to generate times for
    
    Returns:
        List of (start_time, end_time) tuples in ISO-8601 format with UTC timezone
    """
    times = []
    now = datetime.now(PT)  # Use PT timezone for calculation
    
    # Set to current day at 13:15 PT for start time
    start_time_pt = now.replace(hour=13, minute=15, second=0, microsecond=0)
    
    # Set to current day at 13:19 PT for completion time
    end_time_pt = now.replace(hour=13, minute=19, second=0, microsecond=0)
    
    # Convert to UTC before formatting
    start_time_utc = start_time_pt.astimezone(UTC)
    end_time_utc = end_time_pt.astimezone(UTC)
    
    for _ in range(num_courses):
        # Return ISO-8601 format with UTC offset
        times.append((
            start_time_utc.isoformat(),
            end_time_utc.isoformat()
        ))
    
    return times
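To see the PT-to-UTC conversion concretely, here is a variant of `generate_training_times` that takes `now` as a parameter instead of reading the clock. It uses the standard library's `datetime.timezone` rather than pytz, and the example passes a fixed UTC-7 offset standing in for Pacific Daylight Time. In real use, a DST-aware zone such as `zoneinfo.ZoneInfo("America/Los_Angeles")` would be the usual choice. The name `training_times` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def training_times(now: datetime, num_courses: int) -> List[Tuple[str, str]]:
    """Fixed 13:15/13:19 local times on now's date, as UTC ISO-8601 strings.

    `now` must be timezone-aware; its tzinfo is treated as the local (PT) zone.
    """
    start_local = now.replace(hour=13, minute=15, second=0, microsecond=0)
    end_local = now.replace(hour=13, minute=19, second=0, microsecond=0)
    start_utc = start_local.astimezone(timezone.utc).isoformat()
    end_utc = end_local.astimezone(timezone.utc).isoformat()
    # Every course gets the same pair, as in the notebook
    return [(start_utc, end_utc)] * num_courses


# Fixed UTC-7 offset stands in for Pacific Daylight Time in this example
pdt = timezone(timedelta(hours=-7))
print(training_times(datetime(2024, 7, 1, 9, 30, tzinfo=pdt), 2))
```

13:15 at UTC-7 is 20:15 UTC, so both courses get `('2024-07-01T20:15:00+00:00', '2024-07-01T20:19:00+00:00')`.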

In [None]:
def get_employee_recent_completions(employee_id: int, lookback_days: int = 13) -> set:
    """
    Query content_completion table to get training completed by employee in the last N days.
    
    Args:
        employee_id: The employee's ID (ba_id)
        lookback_days: Number of days to look back (default: 13 = today + prior 12 days)
    
    Returns:
        Set of content IDs completed in the lookback period
    """
    # Check if Databricks is configured
    if not all([DATABRICKS_HOST, DATABRICKS_HTTP_PATH, DATABRICKS_TOKEN]):
        # Databricks not configured - return empty set (no skip logic)
        return set()
    
    try:
        from databricks import sql
        
        # Connect to Databricks
        connection = sql.connect(
            server_hostname=DATABRICKS_HOST,
            http_path=DATABRICKS_HTTP_PATH,
            access_token=DATABRICKS_TOKEN
        )
        
        cursor = connection.cursor()
        
        # Table name
        completion_table = f"{DATABRICKS_CATALOG}.{DATABRICKS_SCHEMA}.content_completion"
        
        # Calculate date range in PT timezone
        now_pt = datetime.now(PT)
        start_date = (now_pt - timedelta(days=lookback_days - 1)).date()  # -1 because today is included
        end_date = now_pt.date()
        
        # Query: Find all content completed by this employee in the last N days
        query = f"""
        SELECT DISTINCT content_id
        FROM {completion_table}
        WHERE ba_id = {employee_id}
            AND completion_date >= '{start_date}'
            AND completion_date <= '{end_date}'
        """
        
        cursor.execute(query)
        
        # Fetch results
        rows = cursor.fetchall()
        
        # Close connection
        cursor.close()
        connection.close()
        
        # Build set of content IDs
        recent_content_ids = set()
        for row in rows:
            content_id = int(row[0])
            recent_content_ids.add(content_id)
        
        return recent_content_ids
        
    except Exception as e:
        print(f"  ⚠ Could not query recent completions for employee {employee_id}: {e}")
        print(f"  Continuing without skip logic...")
        return set()


def process_employee(employee_id: int, employee_type: str, manager_assignments_path: str, standalone_df: pd.DataFrame, ai_recommendations: List[Dict] = None) -> List[Dict]:
    """
    Process a single employee: get AI recommendations and manager assignments, then simulate completions.

    Args:
        employee_id: The employee's ID
        employee_type: The employee's type (a, b, or f)
        manager_assignments_path: Path to the NonCompletedAssignments CSV file
        standalone_df: DataFrame containing standalone content for lookups
        ai_recommendations: Optional pre-fetched AI recommendations (to avoid duplicate API calls)

    Returns:
        List of completed training records with UTC ISO-8601 timestamps
    """
    employee_type = employee_type.lower().strip()

    # Get AI recommendations (use provided ones or fetch new)
    if ai_recommendations is None:
        ai_recommendations = get_training_recommendations(employee_id)

    # Get manager assignments
    manager_assignments = []
    if os.path.exists(manager_assignments_path):
        assignments_df = pd.read_csv(manager_assignments_path)
        
        # Normalize UserID to int so the comparison with the integer employee_id
        # below is type-safe even if the column was read as strings
        assignments_df['UserID'] = assignments_df['UserID'].astype(int)
        
        # Filter for this employee
        employee_assignments = assignments_df[assignments_df['UserID'] == employee_id]

        for _, assignment in employee_assignments.iterrows():
            # Get the TrainingElementId and look up the content name
            content_id = assignment['TrainingElementId']

            # Remove commas from content_id if present (it might be formatted)
            if isinstance(content_id, str):
                content_id_numeric = int(content_id.replace(',', ''))
            else:
                content_id_numeric = int(content_id)

            # Look up content name in standalone_df
            # Handle both numeric and string ContentId in standalone_df
            content_row = standalone_df[
                (standalone_df['ContentId'] == content_id) |
                (standalone_df['ContentId'] == str(content_id_numeric))
            ]
            if not content_row.empty:
                content_name = content_row.iloc[0]['ContentName']
            else:
                content_name = "Unknown Manager Assignment"

            manager_assignments.append({
                "recommended_content_id": content_id_numeric,
                "recommended_content": content_name,
                "source": "manager"
            })

    # Tag AI recommendations with source
    for rec in ai_recommendations:
        rec["source"] = "ai"

    # Check for recently completed training (last 13 days)
    # This ONLY applies to AI recommendations, NOT to manager assignments
    recent_completions = get_employee_recent_completions(employee_id, lookback_days=13)
    
    filtered_ai_recommendations = []
    
    if ai_recommendations:
        if recent_completions:
            print(f"  Recent completions (last 13 days): {len(recent_completions)} content(s)")
            print(f"  Filtering AI recommendations for recent completions...")
            
            skipped_ai_training = []
            
            for rec in ai_recommendations:
                content_id = rec["recommended_content_id"]
                
                if content_id in recent_completions:
                    # Skip this AI recommendation - completed recently
                    skipped_ai_training.append(rec)
                    print(f"    ⊘ IGNORING (ML): {format_content_id(content_id)} - {rec['recommended_content'][:50]}")
                    print(f"      Reason: Employee Completed in last 13 days")
                else:
                    # Keep this AI recommendation - not completed recently
                    filtered_ai_recommendations.append(rec)
            
            if skipped_ai_training:
                print(f"  Skipped {len(skipped_ai_training)} AI recommendation(s) due to recent completion")
                print(f"  AI recommendations after skip filter: {len(filtered_ai_recommendations)}")
        else:
            # No recent completions - keep all AI recommendations
            filtered_ai_recommendations = ai_recommendations
    
    # Combine manager assignments (unfiltered) with filtered AI recommendations
    # Manager assignments are ALWAYS included - no skip logic for manager assignments
    all_training = manager_assignments + filtered_ai_recommendations

    if not all_training:
        print(f"  No training available for employee {employee_id}")
        return []

    print(f"  Total training available: {len(all_training)} ({len(manager_assignments)} manager + {len(filtered_ai_recommendations)} AI)")

    # Determine how many courses to complete based on employee type
    if employee_type == 'a':
        # Type A: complete all training (manager + AI)
        courses_to_complete = all_training
    elif employee_type == 'b':
        # Type B: complete one training (from combined list)
        courses_to_complete = all_training[:1]
    else:
        # Type F: complete no training
        courses_to_complete = []

    # Generate completion records with UTC timestamps (computed from PT times)
    completions = []
    times = generate_training_times(len(courses_to_complete))

    for i, course in enumerate(courses_to_complete):
        try:
            # Validate course is a dict
            if not isinstance(course, dict):
                print(f"  WARNING: Course is not a dict, it's {type(course)}: {course}")
                continue

            start_time, end_time = times[i]
            source = course.get("source", "unknown")
            completions.append({
                "UserId": employee_id,
                "ContentId": format_content_id(course["recommended_content_id"]),
                "DateStarted": start_time,  # ISO-8601 in UTC (+00:00)
                "DateCompleted": end_time,  # ISO-8601 in UTC (+00:00)
                "CourseName": course.get("recommended_content", "Unknown"),
                "Source": source
            })
        except KeyError as e:
            print(f"  WARNING: Missing key {e} in course data: {course}")
            continue
        except Exception as e:
            print(f"  WARNING: Error processing course: {e}")
            continue

    return completions

def generate_output_filename() -> str:
    """
    Generate output filename with PT timestamp.
    Format: ContentUserCompletion_V2_YYYY_MM_DD_1_HHMMSS.csv
    Uses PT timezone for date and time components for consistency.
    
    Returns:
        Generated filename
    """
    now = datetime.now(PT)  # Use PT timezone for consistency
    year = now.strftime("%Y")
    month = now.strftime("%m")
    day = now.strftime("%d")
    
    # Generate 6-digit time suffix: HHMMSS
    time_suffix = now.strftime("%H%M%S")
    
    return f"ContentUserCompletion_V2_{year}_{month}_{day}_1_{time_suffix}.csv"
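The employee-type branching in `process_employee` and the filename builder above are both small, table-like rules. The sketch below is a minimal version that assumes employee types are the lowercase codes `'a'`, `'b'` and `'f'` that `process_employee` compares against. It expresses the branch as a lookup table and the filename as a single `strftime` pattern. `COMPLETION_LIMITS`, `select_courses` and `output_filename_for` are hypothetical names, not part of the notebook. Passing `now` in explicitly, where the notebook would use `datetime.now(PT)`, keeps the function deterministic and easy to check.

```python
from datetime import datetime
from typing import Dict, List, Optional

# Hypothetical lookup table: None = no limit (type A), 1 = first course only (type B),
# 0 = nothing (type F). Keys are lowercase, as process_employee compares them.
COMPLETION_LIMITS: Dict[str, Optional[int]] = {"a": None, "b": 1, "f": 0}


def select_courses(all_training: List[dict], employee_type: str) -> List[dict]:
    """Return the part of the combined training list this employee type completes."""
    # Unknown types fall back to 0, like the final `else` branch in process_employee.
    limit = COMPLETION_LIMITS.get(employee_type, 0)
    return list(all_training) if limit is None else all_training[:limit]


def output_filename_for(now: datetime) -> str:
    """Build ContentUserCompletion_V2_YYYY_MM_DD_1_HHMMSS.csv with one strftime pattern."""
    return now.strftime("ContentUserCompletion_V2_%Y_%m_%d_1_%H%M%S.csv")


training = [{"recommended_content_id": 1001}, {"recommended_content_id": 2002}]
print([len(select_courses(training, t)) for t in ("a", "b", "f")])  # [2, 1, 0]
print(output_filename_for(datetime(2024, 5, 7, 9, 3, 5)))
# ContentUserCompletion_V2_2024_05_07_1_090305.csv
```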

In [None]:
# Main execution - Process employees and simulate training completions
print("=" * 80)
print("EMPLOYEE TRAINING SIMULATION")
print("=" * 80)
print()

# Check if manager assignments were created
if 'assignments_path' not in locals() or not os.path.exists(assignments_path):
    print("WARNING: Manager assignments file not found. Employees will only complete AI recommendations.")
    print()
    assignments_path = ""

# Process each employee
all_completions = []
employee_summaries = []
employee_ml_recommendations = []  # Store ML recommendations for summary

for _, employee in employees_df.iterrows():
    employee_id = employee['employee_id']
    employee_type = employee['employee_edu_type']
    
    print(f"Processing Employee {employee_id} (Type {employee_type.upper()})...")
    
    # Get AI recommendations
    ai_recommendations = get_training_recommendations(employee_id)
    
    # Store ML recommendations for this employee
    if ai_recommendations:
        ml_recs = []
        for rec in ai_recommendations:
            ml_recs.append({
                "content_id": rec.get("recommended_content_id"),
                "content_name": rec.get("recommended_content", "Unknown")
            })
        employee_ml_recommendations.append((employee_id, ml_recs))
    
    # Process employee with pre-fetched AI recommendations
    completions = process_employee(employee_id, employee_type, assignments_path, standalone_df, ai_recommendations)
    
    if completions:
        all_completions.extend(completions)
        # Store ContentId, CourseName, and Source for summary
        course_details = [(c['ContentId'], c['CourseName'], c['Source']) for c in completions]
        employee_summaries.append((employee_id, course_details))
        print(f"  Completed {len(completions)} training(s)")
    else:
        print("  No training completed")
    print()

print("=" * 80)
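Every record appended to `all_completions` carries a `Source` tag. The summary cells below filter on `"manager"` and `"ai"`. A per-source tally can serve as a quick sanity check after the loop. This is a sketch with a hypothetical helper name and made-up sample records. In the notebook you would pass `all_completions` instead of `sample`.

```python
from collections import Counter
from typing import Dict, List


def count_by_source(completions: List[dict]) -> Dict[str, int]:
    """Count completion records per Source value ("manager", "ai", ...)."""
    # Records without a Source key are counted as "unknown", matching process_employee's default.
    return dict(Counter(c.get("Source", "unknown") for c in completions))


sample = [
    {"UserId": 101, "ContentId": "1,001", "Source": "manager"},
    {"UserId": 101, "ContentId": "2,002", "Source": "ai"},
    {"UserId": 102, "ContentId": "1,001", "Source": "manager"},
]
print(count_by_source(sample))  # {'manager': 2, 'ai': 1}
```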

In [None]:
# Generate output file
import csv

if all_completions:
    output_filename = generate_output_filename()
    output_path = f"{OUTPUT_DIR}/{output_filename}"
    
    # Keep only the columns the vendor file format requires (CourseName and Source are for the summary only)
    output_df = pd.DataFrame(all_completions)
    output_df = output_df[['UserId', 'ContentId', 'DateStarted', 'DateCompleted']]
    
    # Quote every field (csv.QUOTE_ALL); ContentId values contain commas
    output_df.to_csv(output_path, index=False, quoting=csv.QUOTE_ALL)
    
    print(f"Generated output file: {output_filename}")
    print(f"Total completions: {len(all_completions)}")
    print()
else:
    print("No training completions to write.")
    print()
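`csv.QUOTE_ALL`, the constant whose value is 1, makes `to_csv` wrap every field in double quotes, including the header row and numeric columns. The sketch below writes one made-up completion row to an in-memory buffer so the effect is visible. The sample values are illustrative only. The timestamp format is an assumption based on the "ISO-8601 with PT timezone" comments in `process_employee`.

```python
import csv
import io

import pandas as pd

# One illustrative completion row (values are made up)
df = pd.DataFrame([{
    "UserId": 101,
    "ContentId": "1,001",
    "DateStarted": "2024-05-07T09:00:00-07:00",
    "DateCompleted": "2024-05-07T09:15:00-07:00",
}])

buf = io.StringIO()
# QUOTE_ALL quotes every field, so the comma inside "1,001" is not read as a delimiter
df.to_csv(buf, index=False, quoting=csv.QUOTE_ALL)
print(buf.getvalue())
```

Quoting everything, not only the fields that need it, also keeps the output consistent with the `NonCompletedAssignments` file, which this notebook writes with the same setting.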

In [None]:
# Update NonCompletedAssignments file to remove completed training
if all_completions and 'assignments_path' in locals() and os.path.exists(assignments_path):
    print("=" * 80)
    print("UPDATING NON-COMPLETED ASSIGNMENTS FILE")
    print("=" * 80)
    print()
    
    # Read current assignments file
    print(f"Reading assignments from: {assignments_path}")
    assignments_df = pd.read_csv(assignments_path)
    initial_count = len(assignments_df)
    print(f"  Initial assignments: {initial_count}")
    print()
    
    # Build the set of completed (UserID, ContentID) pairs.
    # ContentId values in the completions are comma-formatted strings, and TrainingElementId
    # values in the assignments file may be as well, so both IDs are normalized to plain
    # integers before comparing (UserID is also cast to int to match is_not_completed below).
    completed_set = set()
    
    for completion in all_completions:
        user_id = int(completion['UserId'])
        content_id = completion['ContentId']
        # Normalize content_id by removing commas for comparison
        if isinstance(content_id, str):
            content_id_numeric = int(content_id.replace(',', ''))
        else:
            content_id_numeric = int(content_id)
        completed_set.add((user_id, content_id_numeric))
    
    print(f"Completed training count: {len(completed_set)}")
    print()
    
    # Filter out completed assignments
    removed_assignments = []  # Track what we remove for summary
    
    def is_not_completed(row):
        """Check if an assignment was NOT completed"""
        user_id = int(row['UserID'])
        training_id = row['TrainingElementId']
        
        # Normalize training_id by removing commas
        if isinstance(training_id, str):
            training_id_numeric = int(training_id.replace(',', ''))
        else:
            training_id_numeric = int(training_id)
        
        # Return True if NOT in completed set (keep assignment)
        is_completed = (user_id, training_id_numeric) in completed_set
        if is_completed:
            removed_assignments.append((user_id, training_id_numeric, training_id))
        return not is_completed
    
    # Apply filter
    remaining_assignments_df = assignments_df[assignments_df.apply(is_not_completed, axis=1)].copy()
    
    removed_count = initial_count - len(remaining_assignments_df)
    
    print("Assignments breakdown:")
    print(f"  Initial assignments: {initial_count}")
    print(f"  Completed assignments (removed): {removed_count}")
    print(f"  Remaining assignments: {len(remaining_assignments_df)}")
    print()
    
    # Show sample of removed assignments
    if removed_assignments:
        print("Sample of removed assignments (first 5):")
        for i, (uid, cid, original_id) in enumerate(removed_assignments[:5]):
            print(f"  {i+1}. Employee {uid}, Content {cid} (original: '{original_id}')")
        if len(removed_assignments) > 5:
            print(f"  ... and {len(removed_assignments) - 5} more")
        print()
    
    # Overwrite the file with the remaining assignments. If every assignment was
    # completed, this still writes the header row, so the file stays valid.
    remaining_assignments_df.to_csv(assignments_path, index=False, quoting=1)  # 1 == csv.QUOTE_ALL
    if len(remaining_assignments_df) > 0:
        print("✓ Updated NonCompletedAssignments file")
        print(f"  File: {assignments_path}")
        print(f"  Removed {removed_count} completed assignment(s)")
        print(f"  {len(remaining_assignments_df)} assignment(s) remain")
    else:
        print("✓ All assignments completed!")
        print("  File updated with headers only (no remaining assignments)")
    
    print()
    print("=" * 80)
    print()
elif all_completions:
    print("⚠ NonCompletedAssignments file not found - skipping update")
    print()
else:
    print("⚠ No completions to process - skipping NonCompletedAssignments update")
    print()
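The cell above builds its keep-mask with `DataFrame.apply` and records removed rows as a side effect. The sketch below is an alternative that computes the mask in a single pass over the two ID columns. It normalizes both the user ID and the content ID on each side. It assumes IDs are either integers or comma-formatted strings such as `"1,001"`. `to_int_id` and `drop_completed` are hypothetical helper names. Wrapping the mask in a boolean `Series` aligned to the frame's index keeps the empty-file case working. An empty plain list would be treated as a column selector instead.

```python
import pandas as pd


def to_int_id(value) -> int:
    """Strip thousands separators ("1,234" -> 1234) and return a plain int."""
    return int(str(value).replace(",", ""))


def drop_completed(assignments: pd.DataFrame, completions: list) -> pd.DataFrame:
    """Return the assignments whose (UserID, TrainingElementId) pair was not completed."""
    # Normalized (user, content) keys for everything completed in this run
    done = {(to_int_id(c["UserId"]), to_int_id(c["ContentId"])) for c in completions}
    # True = keep the row (its key was not completed)
    keep = [
        (to_int_id(user), to_int_id(content)) not in done
        for user, content in zip(assignments["UserID"], assignments["TrainingElementId"])
    ]
    return assignments[pd.Series(keep, index=assignments.index, dtype=bool)].copy()
```

The number of removed rows is then `len(assignments) - len(result)`, as in the original cell.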


In [None]:
# Print summary
print("-" * 80)
print("EXECUTION SUMMARY")
print("-" * 80)
print()

print("=" * 80)
print("NEW MANAGER ASSIGNMENTS")
print("=" * 80)
print()

# PART A: Daily Dose Assignments
if 'employee_assigned_daily_dose' in locals() and len(employee_assigned_daily_dose) > 0:
    print("PART A: DAILY DOSE ASSIGNMENTS")
    print("-" * 80)
    print()
    print(f"Daily Dose training assigned to {len(employee_assigned_daily_dose)} employee(s):")
    print()
    
    # Get the contents from the first employee (they all have the same Daily Dose)
    first_employee_contents = list(employee_assigned_daily_dose.values())[0]
    
    print("Daily Dose Contents:")
    for content_info in first_employee_contents:
        content_id = content_info['content_id']
        content_name = content_info['content_name']
        print(f"  {content_id} - {content_name}")
    
    print()
    
    # Print list of all employees who received these assignments
    employee_ids_str = ", ".join([str(emp_id) for emp_id in sorted(employee_assigned_daily_dose.keys())])
    print(f"Employees: {employee_ids_str}")
    print()
    
    if 'employees_to_skip_dd' in locals() and employees_to_skip_dd:
        skipped_ids_str = ", ".join([str(emp_id) for emp_id in sorted(employees_to_skip_dd.keys())])
        print(f"Skipped (already have Daily Dose): {skipped_ids_str}")
        print()
else:
    print("PART A: DAILY DOSE ASSIGNMENTS")
    print("-" * 80)
    print()
    print("No new Daily Dose assignments were created in this run.")
    if 'employees_to_skip_dd' in locals() and employees_to_skip_dd:
        print(f"All {len(employees_to_skip_dd)} employee(s) already have Daily Dose for current week.")
    print()

# PART B: Random Non-Daily Dose Assignments
if 'employee_assigned_random' in locals() and len(employee_assigned_random) > 0:
    print("PART B: RANDOM NON-DAILY DOSE ASSIGNMENTS")
    print("-" * 80)
    print()
    print(f"Random non-Daily Dose training assigned to {len(employee_assigned_random)} employee(s):")
    print()
    
    # Print header
    print(f"{'Employee ID':<15} | {'Content ID':<15} | {'Course Name'}")
    print(f"{'-' * 15} | {'-' * 15} | {'-' * 50}")
    
    # Print each employee's random assignment
    for emp_id in sorted(employee_assigned_random.keys()):
        content_info = employee_assigned_random[emp_id]
        content_id = content_info['content_id']
        content_name = content_info['content_name']
        print(f"{emp_id:<15} | {content_id:<15} | {content_name}")
    
    print()
else:
    print("PART B: RANDOM NON-DAILY DOSE ASSIGNMENTS")
    print("-" * 80)
    print()
    print("No random non-Daily Dose assignments were created in this run.")
    print()

print("=" * 80)
print("RECOMMENDATIONS GIVEN BY ML API")
print("=" * 80)
print()

# Display all ML recommendations given to employees in pipe-separated format
if employee_ml_recommendations:
    # Collect all recommendation rows
    recommendation_rows = []
    
    for employee_id, ml_recs in employee_ml_recommendations:
        for rec in ml_recs:
            content_id = rec["content_id"]
            content_name = rec["content_name"]
            recommendation_rows.append((employee_id, content_id, content_name))
    
    # Sort by employee ID, then content ID
    recommendation_rows.sort(key=lambda x: (x[0], str(x[1])))
    
    # Print header
    print(f"{'Employee ID':<15} | {'Content ID':<15} | {'Content Name'}")
    print(f"{'-' * 15} | {'-' * 15} | {'-' * 50}")
    
    # Print each recommendation as a separate row
    for employee_id, content_id, content_name in recommendation_rows:
        # str() guards against a missing content ID (None), which cannot take a width format spec
        print(f"{employee_id:<15} | {str(content_id):<15} | {content_name}")
    
    print()
else:
    print("No ML recommendations were given to any employee.")
    print()

print("=" * 80)
print("MANAGER-ASSIGNED TRAINING COMPLETED BY EMPLOYEES")
print("=" * 80)
print()

# Track if any manager assignments were completed
manager_completions_found = False

# Collect all manager completions for table display
manager_completion_rows = []

for employee_id, course_details in employee_summaries:
    # Filter for manager-assigned training only
    manager_courses = [(content_id, course_name) for content_id, course_name, source in course_details if source == "manager"]
    
    if manager_courses:
        manager_completions_found = True
        for content_id, course_name in manager_courses:
            manager_completion_rows.append((employee_id, content_id, course_name))

if manager_completions_found:
    # Sort by employee ID, then content ID
    manager_completion_rows.sort(key=lambda x: (x[0], str(x[1])))
    
    # Print header
    print(f"{'Employee ID':<15} | {'Content ID':<15} | {'Course Name'}")
    print(f"{'-' * 15} | {'-' * 15} | {'-' * 50}")
    
    # Print each completion on a separate row
    for employee_id, content_id, course_name in manager_completion_rows:
        print(f"{employee_id:<15} | {content_id:<15} | {course_name}")
else:
    print("No manager-assigned training was completed by any employee.")

print()
print("=" * 80)
print("ML-RECOMMENDED TRAINING COMPLETED BY EMPLOYEES")
print("=" * 80)
print()

# Track if any ML recommendations were completed
ml_completions_found = False

# Collect all ML completions for table display
ml_completion_rows = []

for employee_id, course_details in employee_summaries:
    # Filter for ML-recommended training only
    ml_courses = [(content_id, course_name) for content_id, course_name, source in course_details if source == "ai"]
    
    if ml_courses:
        ml_completions_found = True
        for content_id, course_name in ml_courses:
            ml_completion_rows.append((employee_id, content_id, course_name))

if ml_completions_found:
    # Sort by employee ID, then content ID
    ml_completion_rows.sort(key=lambda x: (x[0], str(x[1])))
    
    # Print header
    print(f"{'Employee ID':<15} | {'Content ID':<15} | {'Course Name'}")
    print(f"{'-' * 15} | {'-' * 15} | {'-' * 50}")
    
    # Print each completion on a separate row
    for employee_id, content_id, course_name in ml_completion_rows:
        print(f"{employee_id:<15} | {content_id:<15} | {course_name}")
else:
    print("No ML-recommended training was completed by any employee.")

print()
print("=" * 80)
print("EXECUTION COMPLETE")
print("=" * 80)
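The manager and ML completion tables above repeat the same filter, sort and print logic, which differs only in the `source` value. The sketch below factors that logic into two hypothetical helpers, `rows_for_source` and `format_table`. They take `employee_summaries`-shaped input: `(employee_id, [(content_id, course_name, source), ...])`. They produce the same pipe-separated layout. IDs are passed through `str()` so that a `None` ID does not break the width formatting.

```python
from typing import Iterable, List, Tuple

Row = Tuple[object, object, str]


def rows_for_source(summaries, source: str) -> List[Row]:
    """Flatten (employee_id, [(content_id, name, source), ...]) pairs for one source."""
    rows = [(emp, cid, name)
            for emp, details in summaries
            for cid, name, src in details if src == source]
    # Same ordering as the summary cell: employee ID, then content ID compared as text
    return sorted(rows, key=lambda r: (r[0], str(r[1])))


def format_table(rows: Iterable[Row], name_header: str = "Course Name") -> str:
    """Render rows in the pipe-separated layout the summary cell prints."""
    lines = [f"{'Employee ID':<15} | {'Content ID':<15} | {name_header}",
             f"{'-' * 15} | {'-' * 15} | {'-' * 50}"]
    lines += [f"{str(emp):<15} | {str(cid):<15} | {name}" for emp, cid, name in rows]
    return "\n".join(lines)


summaries = [(102, [("1,001", "Intro", "manager")]),
             (101, [("2,002", "Skincare", "ai"), ("1,001", "Intro", "manager")])]
print(format_table(rows_for_source(summaries, "manager")))
```

With these helpers, each summary section reduces to a call such as `format_table(rows_for_source(employee_summaries, "ai"))`, plus the "no completions" message when the row list is empty.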

In [None]:
# Postprocessing - Publish generated files to SFTP outbound server
print()
print("=" * 80)
print("POSTPROCESSING - Publish Files to SFTP Outbound Server")
print("=" * 80)
print()

# Check if publishing is enabled
if not SFTP_PUBLISH_ENABLED:
    print("⊘ SFTP publishing is DISABLED")
    print("  To enable publishing, set SFTP_PUBLISH_ENABLED=true in .env file")
    print()
else:
    print("✓ SFTP publishing is ENABLED")
    print()
    
    # Collect all generated files to publish
    files_to_publish = []
    
    # Add ContentUserCompletion file
    if 'output_path' in locals() and os.path.exists(output_path):
        files_to_publish.append(output_path)
    
    # Add NonCompletedAssignments file
    if 'assignments_path' in locals() and os.path.exists(assignments_path):
        files_to_publish.append(assignments_path)
    
    # Add UserCompletion file
    if 'user_completion_path' in locals() and os.path.exists(user_completion_path):
        files_to_publish.append(user_completion_path)
    
    if files_to_publish:
        print(f"Files to publish ({len(files_to_publish)}):")
        for file_path in files_to_publish:
            filename = os.path.basename(file_path)
            print(f"  - {filename}")
        print()
        
        # Publish files
        print("Publishing files...")
        print("-" * 80)
        success = publish_files_to_sftp_outbound(files_to_publish)
        print("-" * 80)
        print()
        
        if success:
            print("✓ All files published successfully")
        else:
            print("⚠ Some files failed to publish")
    else:
        print("⚠ No files found to publish")
        print("  Generated files may not exist. Please run the notebook cells in order.")

print()
print("=" * 80)
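This section does not show how `SFTP_PUBLISH_ENABLED` is derived from the `.env` file. The sketch below is one way to do it, under that assumption. It treats common truthy strings as enabled and everything else as disabled. It also shows the "collect only files that exist" step as a single filter. The filter skips empty paths such as the `assignments_path = ""` fallback set earlier. `env_flag` and `existing_files` are hypothetical helper names, not the notebook's actual code.

```python
import os
from typing import Iterable, List, Optional


def env_flag(name: str, default: bool = False) -> bool:
    """Read a true/false switch (e.g. SFTP_PUBLISH_ENABLED) from the environment."""
    raw = os.getenv(name)
    if raw is None:
        return default
    # Accept the usual truthy spellings; anything else counts as disabled
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def existing_files(paths: Iterable[Optional[str]]) -> List[str]:
    """Keep only non-empty paths that exist on disk, preserving their order."""
    return [p for p in paths if p and os.path.exists(p)]
```

Because `load_dotenv()` copies `.env` entries into `os.environ`, a flag read this way picks up values from either source.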