# FTE requirements for NEXT 6 to 9 Months Forecasted
# Only creates FTE Projections for the days and Time provided in HOOPS


In [None]:
# 🚀 Key Features
# 📞 Call Volume and AHT Forecasting:

# Predicts both call volumes and AHT using historical trends, time-of-day effects, weekly cycles, and seasonality.

# Handles missing or incomplete data by referencing previous week’s data for the same time intervals.

# 📅 Time Series Feature Engineering:

# Extracts features like day-of-week, month, year, and holidays.

# Includes lag features: values from prior weeks and the same time last year to model seasonal patterns.

# 🗓️ Holiday/Hoops Awareness:

# Detects U.S. Federal Holidays and Easter to adjust forecasts for special dates.
# Only creates FTE Projections for the days and Time provided in HOOPS

# 🧠 ML-Based Forecasting:

# Trains regression models on historical features to predict future AHT and call volumes.

# Flexible design allows for incorporating any sklearn-compatible ML model.

# 📐 FTE Estimation with Erlang-C:

# Calculates the number of required agent positions using Erlang-C queuing theory.

# Considers SLA goals, shrinkage, occupancy caps, and ASA thresholds per client.

# 📈 Call Pattern Reconstruction:

# Dynamically builds future timeframes (test sets) based on client hoop schedules.

# Applies statistical smoothing (rolling average) to stabilize noisy staffing values.

# 🗄️ SQL Database Integration:

# Pulls historical and client configuration data from SQL.

# Inserts/upserts predictions back into the reporting DB.



In [None]:
# # Contact Center Forecasting Tool

# This Jupyter Notebook is a powerful workforce management tool for **predicts call volumes, average handle times (AHT), call arrival patterns (CAP), and staffing requirements (FTE)** for contact centers. Using **machine learning (RandomForestRegressor)**, **Erlang-C queuing theory**, and **SQL database integration**, it provides **9-month forecasts** for staffing, shift planning, and performance analysis. The code features:

# - **Call Arrival Pattern (CAP)**: Predicts call distribution across 30-minute intervals, handling holidays and missing data.
# - **Call Volume Forecasting**: Projects daily call volumes with cyclical features and outlier handling.
# - **AHT Forecasting**: Estimates handle times in 2-hour intervals.
# - **FTE Estimation**: Calculates required agents using Erlang-C, considering SLAs, shrinkage, and occupancy.
# - **Visualization**: Plots year-over-year call volume trends.
# - **Database Integration**: Pulls/stores data via SQL for scalability.

# Built with **pandas**, **sklearn**, **matplotlib**, and **pyodbc**, this tool supports **workforce management**, **analysts**, and **operations leadership** in optimizing contact center operations. Requires Avaya and reporting databases.

# **Note**: Some SQL queries need customization, and projections are best for up to 6 months.

# ## Setup
# 1. Install dependencies: `pip install pandas numpy sklearn matplotlib pyodbc holidays python-dateutil boto3 sendgrid jinja2`.
# 2. Configure database connections (`get_db_avaya_stats`, `get_db_dictionary`, `get_db_Client_data_import`).
# 3. Run the notebook with appropriate client codes, team names, and date ranges.

# ## Usage
# Modify `ds`, `de`, `Clientcode`, and `teams` to run forecasts. Use `plot_yoy_call_volume_with_2025` for visualizations. Outputs are stored in the database for reporting.

In [None]:
# Purpose
# The primary purpose of this notebook is to provide contact center analysts, workforce management teams, and operations leadership with data-driven insights for:

# Staffing Projections: Forecast required agent positions based on predicted call volumes, AHT, and service level agreements (SLAs).
# Shift Planning: Optimize schedules using call arrival patterns and hours of operation (HoOPS).
# Performance Analysis: Evaluate historical call data and simulate SLA scenarios.
# Strategic Decisions: Support hiring, outsourcing, and resource allocation decisions.


# This block of code Below gets the Call Arrival Pattern for future projections

In [None]:
import pandas as pd
import sys
import xlsxwriter
import os
import pyodbc 
import numpy as np
from datetime import date, timedelta, datetime
from dateutil.relativedelta import relativedelta
import holidays
import math
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter

import jinja2
from jinja2 import Environment, FileSystemLoader
import urllib.parse

from IPython.core.display import display, HTML
import sendgrid
from sendgrid.helpers.mail import *
import json
import urllib
import boto3
from boto.s3.key import Key
import base64

from sklearn.ensemble import RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.metrics import mean_absolute_error

In [None]:
def get_call_data(client, team=None):
    # Connect to the Avaya statistics database
    avaya_db = get_db_avaya_stats()
    
    # If a team is provided, fetch call data for the specific client and team
    if team:
        sql = """Query to pull data"""
        sql = sql % (client, team)
    # If no team is provided, fetch call data for the entire client
    else:
        sql = """Query to pull data"""
        sql = sql % (client)
    
    # Execute the SQL query and load the result into a pandas DataFrame
    data = pd.read_sql(sql, con=avaya_db)
    
    # Sort the data chronologically by the 'Date' column
    data = data.sort_values(by='Date', ascending=True)
    
    # Exclude data for today's date
    today_date = datetime.today().date()
    data = data[data['Date'] != today_date]

    # Return an empty list if no data was retrieved
    if len(data) == 0:
        return []

    # Convert the 'Time' column from timedelta to an integer format
    data = time_to_int(data)
    
    # Generate a complete time series from the first to last date,
    # covering every 30-minute interval in a day
    rng = get_date_range(data.Date.iloc[0], data.Date.iloc[-1])

    # Print data types for debugging purposes
    print(data.dtypes)
    
    # Merge the actual data with the complete date-time range to fill in missing time intervals
    df = pd.merge(rng, data, how="outer", on=["Date", "Time"])
    
    # Fill any missing values (NaNs) resulting from the merge with 0s
    df = df.fillna(0)
    
    # Ensure the 'Calls' column is of integer type
    df['Calls'] = df['Calls'].astype(int)
    
    # Perform feature engineering on the dataset to add model-specific features
    df = feature_engineering(df, team)
    
    # Return the final dataframe ready for use in a machine learning model
    return df


In [None]:
import pandas as pd

def fill_missing_calls(df, team):
    # Ensure the 'Date' column is in datetime format
    df['Date'] = pd.to_datetime(df['Date'])

    # Sort the DataFrame by 'Date' and reset index
    df.sort_values('Date', inplace=True)
    df.reset_index(drop=True, inplace=True)
    
    # Special handling for the "Technical Support" team
    if team == "Technical Support":
        # Consider only weekdays (Monday to Friday)
        weekday_calls = (df['day_of_week'] < 5)
        
        # Ensure 'Time' column is in integer format
        df['Time'] = df['Time'].astype(int)

        # Identify 30-min intervals between 6:00 PM and 8:00 PM with 0 calls
        evening_calls_zero = (df['Time'] >= 1800) & (df['Time'] <= 2000) & (df['Calls'] == 0)

        # Set 'Calls' to 1 for identified time slots during weekdays
        df.loc[weekday_calls & evening_calls_zero, 'Calls'] = 1

    # Loop through each row to fill 0-call slots based on same time interval from the previous week
    for index, row in df.iterrows():
        # Focus only on weekdays and where Calls == 0
        if row['day_of_week'] < 5 and row['Calls'] == 0:
            current_date = row['Date']
            current_time = row['Time']

            # Calculate the same day one week before
            prev_week_same_day = current_date - pd.DateOffset(weeks=1)

            # Fetch the corresponding row from the previous week for the same time
            prev_week_data = df[
                (df['Date'] == prev_week_same_day) &
                (df['Time'] == current_time)
            ]

            # If such a row exists, replace the current 0-call value with the previous week's call value
            if not prev_week_data.empty:
                df.at[index, 'Calls'] = prev_week_data.iloc[0]['Calls']

    # Convert 'Date' column back to just date format (dropping time component)
    df['Date'] = df['Date'].dt.date

    # Return the cleaned and updated DataFrame
    return df


In [None]:
# Creates a dataframe containing every time interval for everyday in the date range first to last.
# What is returned is a complete time series containing zero gaps.
# Params 'first' and 'last' are both datetime.date objects

def get_date_range(first, last):
    # Initialize an empty DataFrame with the expected columns
    df = pd.DataFrame(columns=["Date", "Time"])
    
    # Generate a list of dates between 'first' and 'last' (inclusive)
    rng = pd.date_range(start=first, end=last).date
    rng = pd.DataFrame(rng, columns=['Date'])

    # Iterate over each date in the range
    for i in range(len(rng)):
        d = rng.Date[i]
        arr = []

        # Generate 30-minute intervals: 00:00, 00:30, ..., 23:30 represented as integers (e.g., 0, 30, ..., 2330)
        for i in range(24):
            arr.append(i*100)       # e.g., 0000, 0100, ..., 2300
            arr.append(i*100 + 30)  # e.g., 0030, 0130, ..., 2330

        # Create a DataFrame with current date and its corresponding time intervals
        temp = pd.DataFrame(arr, columns=["Time"])
        temp["Date"] = d

        # Append the new rows to the main DataFrame
        df = pd.concat([df, temp])
    
    # Return the complete date-time range DataFrame
    return df


In [None]:
# This function converts the Time column to an integer. 
def time_to_int(df):
    df["Time"] = df["Time"].astype(str)
    df["Time"] = df.apply(lambda row : row["Time"][-8:-3], axis=1)
    df["Time"] = df.apply(lambda row : row["Time"].replace(":", ""), axis=1)
    df["Time"] = df["Time"].astype(int)
    return df

In [None]:
# Helper function for get_call_data(). Sets the holiday flag to true for all Easter Sundays and Good Fridays in df.
def set_easter_holiday(df):
    easters = []
    year = df["Date"].iloc[0].year
    while year <= datetime.now().year:
        d = date(year,1,1)
        e = d + pd.offsets.Easter()
        e = e.date()
        gf = e - timedelta(2)
        df.loc[df["Date"] == e, "is_holiday"] = 1
        df.loc[df["Date"] == gf, "is_holiday"] = 1
        year = year + 1
    return df

In [None]:
# Adds the column "Percent" to the dataframe, where in each row, "Percent" is the percentage of total call volume 
# for that date. 
def addPercent(df):
    sums = df.groupby(["Date"])[["Calls"]].sum()
    def addPercentHelper(row, df):
        p = row.Calls
        q = sums[sums.index == row.Date].Calls.item()
        if q == 0:
            return 0
        else:
            return p/q
    
    df["Percent"] = df.apply(lambda row : addPercentHelper(row, sums), axis=1)
    return df

In [None]:
def normalize(df):
    sums = df.groupby(["Date"])[["Predictions"]].sum()
    def normalizeHelper(row, df):
        p = row.Predictions
        q = sums[sums.index == row.Date].Predictions.item()
        if q == 0:
            return 0
        else:
            return p/q
    
    df["CAP"] = df.apply(lambda row : normalizeHelper(row, sums), axis=1)
    return df

In [None]:
# Retrieves the number of calls from the (depth) most recent week.
# For future dates, which haven't yet occurred, this function grabs call data from the most current 
# weeks of data.
def get_last_week(depth, day, time, old_data):
#     print (day,"Day")
    old_data['Date'] = pd.to_datetime(old_data['Date'])

    old_data['Date'] = old_data['Date'].dt.date

#     print(old_data.Date.iloc[-1],"last date")
#     print(time)
    while day > old_data.Date.iloc[-1]:
        day = day - timedelta(7)
    
    d = day-timedelta( 7 * (depth-1) )
    return old_data[(old_data.Date == d) & (old_data.Time == time)].Percent.item()

In [None]:
def get_hoops_for_test_frame(team,start_date, end_date):
    from datetime import date, timedelta, datetime
    import datetime as dt

    todays_date = datetime.today().date()
    sql = """My query to get Hours of Operations from the database Table"""
    sql = sql % (team,start_date,end_date,team,start_date,end_date)

    Hoops = pd.read_sql(sql, con=reporting_db)
    Hoops['Hoop_Start']= pd.to_datetime(Hoops['Hoop_Start'])
    Hoops['Hoop_Start'] = pd.to_datetime(Hoops['Hoop_Start']).dt.time

    Hoops['Hoop_End']= pd.to_datetime(Hoops['Hoop_End'])
    Hoops['Hoop_End'] = pd.to_datetime(Hoops['Hoop_End']).dt.time

    total_rows = Hoops.shape[0]

    # total_rows = 15

# Initialize a list to accumulate the rows
    rows = []

    for x in range(total_rows):
        PIT_Date = Hoops['PIT_Date'][x]
        Hoop_Start = str(Hoops['Hoop_Start'][x])
        Hoop_End = str(Hoops['Hoop_End'][x])

        # Convert Hoop_Start and Hoop_End from strings to datetime objects
        start = dt.datetime.strptime(Hoop_Start, '%H:%M:%S')
        end = dt.datetime.strptime(Hoop_End, '%H:%M:%S')
        delta = dt.timedelta(minutes=30)

        # Loop through the time range
        t = start
        while t < end:
            Daterec = t.strftime('%H:%M:%S')
            t += delta
            # Append each row to the list as a dictionary
            rows.append({'Date_PIT': PIT_Date, 'Timestamp': Daterec})

    # Create the Final DataFrame from the list of dictionaries
    Finaldataframe = pd.DataFrame(rows)
    
    df1 = Finaldataframe
    df1["Timestamp"] = df1["Timestamp"].astype(str)
    df1["Timestamp"] = df1.apply(lambda row : row["Timestamp"][:-3], axis=1)
    df1["Timestamp"] = df1.apply(lambda row : row["Timestamp"].replace(":", ""), axis=1)
    df1["Timestamp"] = df1["Timestamp"].astype(int)
    df1 = df1.rename(columns={'Timestamp': 'Time'})
    df1 = df1.rename(columns={'Date_PIT': 'Date'})

    return df1

In [None]:
# Creates the dataframe for the future projection, containing all feature engineering columns needed 
# for the machine learning model.
def create_test_frame(team,start_date, end_date, old_data):
    print("\tCreating the test frame...")
    # Start out with a dataframe containing all days and time intervals from start_date to end_date
    rng = get_hoops_for_test_frame(team,start_date, end_date)

    
    # The day of the week for each row is important information
    rng["day_of_week"] = rng.apply(lambda row : row["Date"].weekday(), axis=1)
    
    # Extract information about the date, because a date can not be fed to machine learning model. Only numbers.
    rng["month"] = rng.apply(lambda row: row["Date"].month, axis = 1)
    rng["day_of_month"] = rng.apply(lambda row: row["Date"].day, axis = 1)
    rng["year"] = rng.apply(lambda row: row["Date"].year, axis = 1)
    
    # Is date a US Holiday?
    us_holidays = holidays.US()
    rng["is_holiday"] = rng.apply(lambda row : row["Date"] in us_holidays, axis=1 )
    rng["is_holiday"] = rng["is_holiday"].astype(int)
    rng = set_easter_holiday(rng)
 
    # Get the 3 most recent weeks of data that are not in the future
    rng["last_week"] = rng.apply(lambda row : get_last_week(1, row["Date"], row["Time"], old_data), axis=1)
    rng["last_week_2"] = rng.apply(lambda row : get_last_week(2, row["Date"], row["Time"], old_data), axis=1)
    rng["last_week_3"] = rng.apply(lambda row : get_last_week(3, row["Date"], row["Time"], old_data), axis=1)
    
    # "Train" dataframes containing small amounts of historic data should not be given last_year columns, because 
    # the data doesn't extend that far into the past. Instead, they should be fitted to the ML Model without
    # the last_year information columns, and only with previous week data, in order to maximize the amount of rows
    # fed to the ML model. Train and Test frames must contain all the same columns, thus, we check if the train frame
    # (old_data) has old_year columns. If it does not, then it is a "short" frame, containing only past weeks data.
    if "last_year" in old_data.columns:
        # Get the 4th-9th most recent weeks of data
        rng["last_week_4"] = rng.apply(lambda row : get_last_week(4, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_5"] = rng.apply(lambda row : get_last_week(5, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_6"] = rng.apply(lambda row : get_last_week(6, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_7"] = rng.apply(lambda row : get_last_week(7, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_8"] = rng.apply(lambda row : get_last_week(8, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_9"] = rng.apply(lambda row : get_last_week(9, row["Date"], row["Time"], old_data), axis=1)
        
        # Get data from this day and time last year, plus and minus 1 week
        rng["last_year"] = rng.apply(lambda row : old_data[(old_data.Date == row.Date - timedelta(364)) & (old_data.Time == row.Time)].Percent.item(), axis=1)
        rng["last_year-1w"] = rng.apply(lambda row : old_data[(old_data.Date == row["Date"] - timedelta(371)) & (old_data.Time == row.Time)].Percent.item(), axis=1)
        rng["last_year+1w"] = rng.apply(lambda row : old_data[(old_data.Date == row["Date"] - timedelta(357)) & (old_data.Time == row.Time)].Percent.item(), axis=1)
        
        rng = rng[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", "last_week_2", "last_week_3", "last_week_4", "last_week_5", "last_week_6", "last_week_7", "last_week_8", "last_week_9", "last_year", "last_year-1w", "last_year+1w"]]
    # This is for "short" dataframes
    else:
        # The full test frame has the following columns:
        rng = rng[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", "last_week_2", "last_week_3"]]
    return rng

In [None]:
# Given start_date and end_date, generates a dataframe of predictions for each day
# in the date range.
# Limitation: Projections work best for up to 6 months in the future, and should only be used for future dates.
def future_projection(start_date, end_date, client, team, model):
    print (start_date)
    
    if isinstance(start_date, str):
        # Convert string to datetime
        start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
        
    elif isinstance(start_date, date):
        # No change needed
        pass
    else:
        raise ValueError("start_date must be a string or a date object")


    # Collect all historic data, with feature engineered columns
    df = get_call_data(client, team)

    if len(df) == 0: 
        return []
    if df.iloc[-1].Date < date.today() - timedelta(21):
        return []
    print("\tFitting model with historic data...")
    
    # Check if the historic data uses last_year metrics as a predictor. This means there was a sufficient
    # amount of historic data to use the prior year's data as a predictor for the the future. Otherwise it 
    # is a "short" dataframe and will only use prior weeks of data as a predictor.
    if "last_year" in df.columns and df.iloc[0].Date <= start_date - timedelta(371):
        # Fit the model to the training frame by selecting all the columns except for "Percent", which is the 
        # metric the model is attempting to predict
        model.fit(df[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", 
                         "last_week_2", "last_week_3", "last_week_4", "last_week_5", "last_week_6", 
                         "last_week_7", "last_week_8", "last_week_9", "last_year", "last_year-1w", 
                         "last_year+1w"]], df["Percent"])

    # Enter the else block if df is a "short" frame which does not use prior year's data
    else:
        # Fit the model to the training frame by taking all columns except the one we are trying to predict. 
        # Notice the "short" training frame does not have last_year metrics
        model.fit(df[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", 
                         "last_week_2", "last_week_3"]], df["Percent"])
   
    # Create the test frame, including all feature engineered columns used to fit the model
    test = create_test_frame(team,start_date, end_date, df)
    test.fillna(0, inplace=True)
#     print(test)
    # Create the prediction series
    print("\tGenerating predictions...")
    pred = model.predict(test)
    pred = pd.DataFrame(pred, columns=["Predictions"])
    
    # We want to return a dataframe with the date, time, and the predicted call volume for that date, 
    # but right now we only have a series of numbers and no dates.
    # Create a dataframe containing all dates and time intervals from start_date to end_date
    rng = get_hoops_for_test_frame(team,start_date, end_date)
    rng["day_of_week"] = rng.apply(lambda row : row.Date.weekday(), axis=1)
    # Then concatenate the predictions with the date range dataframe, and voila, this is what we return
    pred = pd.concat([pred.reset_index(drop=True), rng.reset_index(drop=True)], axis=1)
    # Ensure that weekends are filled with 0 percents, since there is no call volume on weekends
    # Secondly, normalize the CAPs so that the sum(Predictions) for each day is equal to 1.0
    pred["Predictions"].loc[pred["day_of_week"] > 4] = 0
    pred = normalize(pred)
    # Return the predictions for the team for the given date range
    return pred

In [None]:
# SQL Update statement exists to set the Latest_Version to 0 for the team that is being predicted
# The Insert statement inserts the latest new latest prediction for the given team 
insert_sql = """insert statement """
D = "ON DUPLICATE KEY UPDATE CAP_Percent1 = Values(CAP_Percent1),CAP_Percent2=Values(CAP_Percent2),Version_timestmp=Values(Version_timestmp)\n"

# For each team in the teams array, get the prediction dataframe, and perform the update, and insert queries
for team in teams:
    print("Generating CAP Predictions for",team, "...")
    # Get the prediction dataframe for team, then execute update_sql for team
    pred = future_projection(ds, de, client, team, model)
    if len(pred) == 0:
        print("\tDue to lack of data, " + team + " is being skipped")
        continue
#     sql = update_sql % (team)
#     run_query(sql)
    print("\tInserting predictions into the database")
    
    # Add rows of the prediction dataframe to the insert query. Every 500 rows, execute the query to prevent
    # it from becoming too long.
    sql = insert_sql
    i = 0  # keeps track of multiples of 500
    ct = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    
    for index, row in pred.iterrows():
        if pred.iloc[index, 0] == 0:
            continue
        pkey = str(client)+ "-" + str(team)+ "-" + str(pred.iloc[index, 1])+ "-" + str(pred.iloc[index, 2]*100) + "-" + str(VersionType)
        s = "('%s','%s', '%s', 'phone','%s','%s', '%s', %s, %s, 0, 0),\n"
        s = s % (pkey,client, team,VersionType, ct, str(pred.iloc[index, 1]), pred.iloc[index, 2]*100, pred.iloc[index, 4])
        sql += s
        i += 1
        # If we reach a multiple of 500, it is time to insert the 500 rows we have added to the query, then start anew
        if i == 500:
            sql = sql[:-2]
            sql += "\n" + D
            run_query(sql)
            i = 0
            sql = insert_sql
    
    # Unless the length of the dataframe is a multiple of 500, then there will be rows of the dataframe that
    # have not been inserted into the DB. Here we insert the remaining rows, if they exist.
    if sql != insert_sql:
        sql = sql[:-2]
        sql += "\n" + D
        run_query(sql)
    print("\tFinished.")

## This block of code below gets the Call volume future projections

In [None]:
# Given client code and optional team name, returns a dataframe
# containing call volume data per day, fully prepped for the ML model
def get_call_data(clientCode, team):
    avaya_db = get_db_avaya_stats()
    # Query for getting a team's call data
    if team:
        sql = """select DATE(row_date) as Date, 
        sum(i_arrived) as 'Calls'from avaya_stats.hsplit h
        join reporting.avaya_skills_dict s on s.split = h.split
        where client = '%s' and Forecasting_Skillname = "%s" and row_date >= "2022-01-01" 
        group by row_date"""
        sql = sql % (clientCode, team)
    # Query for getting a client's call data
    else:
        sql = """Select DATE(row_date) as 'Date',
        sum(A.i_arrived) as 'Calls' from avaya_stats.hvdn A
        join reporting.avaya_vdn_desc D on D.vdn = A.vdn
        where D.clientCode = '%s' and D.VDN_Type = 'Incoming ANI' and row_date >= "2022-01-01"
        group by row_date"""
        sql = sql % (clientCode)
        
    df = pd.read_sql(sql, con=avaya_db)
    df = df.sort_values(by='Date', ascending=True)
    today_date = datetime.today().date()
    today_date = today_date-timedelta(1)
    df = df[df['Date'] < today_date]
    df = df[df["Calls"]>10]    
    
    if len(df) == 0:
        return []
    
    # Team and skill data needs to be changed to fit model
    if team:
        df = pad_call_data(df)
    
    #adds all feature engineering columns that are useful to the ML model
    df = add_feature_columns(df)
    
    return df
import pandas as pd
# Importing the pandas library
def fill_missing_calls(df):

    df['Date'] = pd.to_datetime(df['Date'])
    # Sort the DataFrame by 'Date'
    df.sort_values('Date', inplace=True)
    df.reset_index(drop=True, inplace=True)

    for index, row in df.iterrows():
        if row['day_of_week'] < 5 and row['Calls'] == 0:
            current_date = row['Date']
            week_start = current_date - pd.DateOffset(days=current_date.weekday())

            # weeks end on Friday (weekday 4)
            week_end = week_start + pd.DateOffset(days=4)  
            # Filter data for the current week and calculate average 'Calls'
            current_week_data = df[(df['Date'] >= week_start) & (df['Date'] <= week_end)]['Calls']
            recent_week_data = df[(df['Date'] >= week_start - pd.DateOffset(weeks=1)) & (df['Date'] <= week_end - pd.DateOffset(weeks=1))]['Calls']

            # If data for the current week is available, calculate its average, else use the average of the most recent week
            if not current_week_data.empty:
                avg_calls = current_week_data[current_week_data != 0].mean()
            else:
                avg_calls = recent_week_data[recent_week_data != 0].mean()

            # Fill missing 'Calls' value with the calculated average
            df.at[index, 'Calls'] = int(avg_calls) if not pd.isnull(avg_calls) else 0

    return df
def add_feature_columns(df):
    short = False
    if df.iloc[0].Date > df.iloc[-1].Date - timedelta(371*2): short = True
    df["Date"] = pd.to_datetime(df["Date"])
    df["day_of_week"] = df["Date"].dt.weekday
    df.loc[df["day_of_week"] > 4, "Calls"] = 0
    df["month"] = df["Date"].dt.month
    df["day_of_month"] = df["Date"].dt.day
    df["year"] = df["Date"].dt.year
    us_holidays = holidays.US()
    df["is_holiday"] = df["Date"].apply(lambda x: int(x in us_holidays))
    df["is_holiday"] = df["is_holiday"].astype(int)
    df = set_easter_holiday(df)
    df = fill_missing_calls(df)
    
    df['day_of_year'] = df['Date'].map(lambda x: x.timetuple().tm_yday)
    df['year_sin'] = np.sin(2 * np.pi * df['day_of_year'] / 365.25)
    df['year_cos'] = np.cos(2 * np.pi * df['day_of_year'] / 365.25)
    df["last_week"] = df["Calls"].shift(7)
    df["last_week_2"] = df["Calls"].shift(14)
    df["last_week_3"] = df["Calls"].shift(21)
    
    df['rolling_avg_28d'] = df['Calls'].rolling(window=28, min_periods=1).mean().shift(1)
    
    if not short:
        df["last_week_4"] = df["Calls"].shift(28)
        df.loc[:, "current_avg"] = (df["last_week"] + df["last_week_2"] + df["last_week_3"] + df["last_week_4"])/4
        df.loc[:, "last_year"] = df["Calls"].shift(364)
        df.loc[:, "last_year-1w"] = df["Calls"].shift(371)
        df.loc[:, "last_year+1w"] = df["Calls"].shift(357)
        df.loc[:, "historic_avg"] = (df["last_year"] + df["last_year-1w"] + df["last_year+1w"])/3
        df.loc[:, "corr"] = (df["current_avg"] - df["historic_avg"])/df["historic_avg"]
        df.replace([np.inf, -np.inf], 0, inplace=True)
        df_new = df[371:].copy()
        df_new = df_new.fillna(0)
        df = df_new
    else:
        df_new = df.dropna().copy()
        df = df_new
    
    return df

# Helper function for get_call_data(). Sets the holiday flag to true for all Easter Sundays and Good Fridays in df.
def set_easter_holiday(df):
    easters = []
    year = df["Date"].iloc[0].year
    while year <= datetime.now().year:
        d = date(year,1,1)
        e = d + pd.offsets.Easter()
        e = e.date()
        gf = e - timedelta(2)
        df.loc[df["Date"] == e, "is_holiday"] = 1
        df.loc[df["Date"] == gf, "is_holiday"] = 1
        year = year + 1
    return df
def create_test_frame(start_date, end_date, old_data, team):
    rng = get_hoops_for_test_frame(start_date, end_date, team)
    rng["Date"] = pd.to_datetime(rng["Date"])
    rng["day_of_week"] = rng["Date"].dt.weekday
    rng["month"] = rng["Date"].dt.month
    rng["day_of_month"] = rng["Date"].dt.day
    rng["year"] = rng["Date"].dt.year
    us_holidays = holidays.US()
    rng["is_holiday"] = rng["Date"].apply(lambda x: int(x in us_holidays))
    rng["is_holiday"] = rng["is_holiday"].astype(int)
    rng = set_easter_holiday(rng)
    
    rng['day_of_year'] = rng['Date'].map(lambda x: x.timetuple().tm_yday)
    rng['year_sin'] = np.sin(2 * np.pi * rng['day_of_year'] / 365.25)
    rng['year_cos'] = np.cos(2 * np.pi * rng['day_of_year'] / 365.25)
    rng['rolling_avg_28d'] = old_data['rolling_avg_28d'].iloc[-28:].mean()
    
    rng["last_week"] = rng.apply(lambda row: get_last_week(1, row["Date"], old_data), axis=1)
    rng["last_week_2"] = rng.apply(lambda row: get_last_week(2, row["Date"], old_data), axis=1)
    rng["last_week_3"] = rng.apply(lambda row: get_last_week(3, row["Date"], old_data), axis=1)
    
    if "last_year" in old_data.columns:
        rng["last_week_4"] = rng.apply(lambda row: get_last_week(4, row["Date"], old_data), axis=1)
        rng["last_year"] = rng.apply(lambda row: old_data[old_data.Date == row["Date"] - timedelta(364)].Calls.item(), axis=1)
        rng["last_year-1w"] = rng.apply(lambda row: old_data[old_data.Date == row["Date"] - timedelta(371)].Calls.item(), axis=1)
        rng["last_year+1w"] = rng.apply(lambda row: old_data[old_data.Date == row["Date"] - timedelta(357)].Calls.item(), axis=1)
        rng["current_avg"] = (rng["last_week"] + rng["last_week_2"] + rng["last_week_3"] + rng["last_week_4"])/4
        rng["historic_avg"] = (rng["last_year"] + rng["last_year-1w"] + rng["last_year+1w"])/3
        rng["corr"] = (rng["current_avg"] - rng["historic_avg"])/rng["historic_avg"]
        rng.replace([np.inf, -np.inf], 0, inplace=True)
        rng = rng.fillna(0)
        
        train_3 = old_data[["last_year", "last_year-1w", "last_year+1w"]]
        test_3 = rng[["last_year", "last_year-1w", "last_year+1w"]]
        pca = PCA(n_components=2)
        pca.fit(train_3)
        test_pca = pca.transform(test_3)
        
        test_full = np.concatenate([test_pca, rng[["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                                                    "last_week", "last_week_2", "last_week_3", "last_week_4", 
                                            "corr", "year_sin", "year_cos", "rolling_avg_28d"]]], axis=1)
        print(test_full) 

        return test_full, ["PCA_0", "PCA_1"] + ["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                                                "last_week", "last_week_2", "last_week_3", "last_week_4", 
                                                "corr", "year_sin", "year_cos", "rolling_avg_28d"]
    else:
        test_full = rng[["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                         "last_week", "last_week_2", "last_week_3", "rolling_avg_7d", "year_sin", "year_cos", "rolling_avg_28d"]]
        print(test_full) 
        return test_full, ["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                           "last_week", "last_week_2", "last_week_3", "year_sin", "year_cos", "rolling_avg_28d"]
    
    
def prep_for_ML(df):
    train = df
    train_3 = train[["last_year", "last_year-1w", "last_year+1w"]]
    pca = PCA(n_components=2)
    pca.fit(train_3)
    train_pca = pca.transform(train_3)
    feature_cols = ["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                    "last_week", "last_week_2", "last_week_3", "last_week_4", 
                    "corr", "year_sin", "year_cos", "rolling_avg_28d"]
    train_full = np.concatenate([train_pca, train[feature_cols]], axis=1)
    return train_full


def pad_call_data(data):
    start = data.Date.iloc[0]
    end = data.Date.iloc[-1]
    rng = pd.date_range(start=start, end=end).date
    df = pd.DataFrame(rng, columns=['Date'])
    df = pd.merge(df, data, how="outer", on="Date")
    df.fillna(0, inplace=True)

    return df
def run_query(sql):
    reporting_client_db = get_db_Client_data_import()
    cursor_insert = reporting_client_db.cursor()
    cursor_insert.execute(sql)
    reporting_client_db.commit()
    cursor_insert.close()
    
def get_hoops_for_test_frame(start_date, end_date,team):
    from datetime import date, timedelta, datetime
    reporting_db = get_db_dictionary()

    todays_date = datetime.today().date()
    sql = """Query to pull hoops"""
    sql = sql % (team,start_date,end_date,team,start_date,end_date)
    Hoops = pd.read_sql(sql, con=reporting_db)

    rng = pd.DataFrame(Hoops, columns=['Date'])
    return rng
# # Retrieves the number of calls from the (depth) most recent week.
# # For future dates, which haven't yet occurred, this function grabs call data from the most current 
# # weeks of data.
def get_last_week(depth, day, old_data):
    old_data['Date'] = pd.to_datetime(old_data['Date'])

#     old_data['Date'] = old_data['Date'].dt.date
    i = 0
    while day > old_data.Date.iloc[-1]:
        day = day - timedelta(7)
    
    d = day-timedelta( 7 * (depth-1) )
    
    return old_data[old_data.Date == d].Calls.item()

def future_projection(start_date, end_date, client, team, model=None, percentage_increase=0.0):
    if model is None:
        model = RandomForestRegressor(random_state=63)
    
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
    elif isinstance(start_date, date):
        pass
    else:
        raise ValueError("start_date must be a string or a date object")
    
    df = get_call_data(client, team)
    print(df)

    df['Date'] = pd.to_datetime(df['Date']).dt.date
    
    q_low, q_high = df['Calls'].quantile([0.01, 0.99])
    df['Calls'] = df['Calls'].clip(lower=q_low, upper=q_high)
    
    if "last_year" in df.columns and df.iloc[0].Date <= start_date - timedelta(371):
        X = prep_for_ML(df)
        y = df["Calls"]
        feature_cols = ["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                        "last_week", "last_week_2", "last_week_3", "last_week_4", "corr", 
                        "year_sin", "year_cos", "rolling_avg_28d"]
    else:
        feature_cols = ["day_of_week", "month", "day_of_month", "year", "is_holiday", 
                        "last_week", "last_week_2", "last_week_3",
                        "year_sin", "year_cos", "rolling_avg_28d"]
        X = df[feature_cols]
        y = df["Calls"]

    param_grid = {
        'n_estimators': [50,75,100,125],
        'max_depth': [3, 9, 12, 25],
        'min_samples_split': [4, 8, 12],
        'min_samples_leaf': [2, 6, 12]
    }
    tscv = TimeSeriesSplit(n_splits=3)
    grid_search = GridSearchCV(model, param_grid, cv=tscv, scoring='neg_mean_absolute_error', n_jobs=-1)
    grid_search.fit(X, y)
    print(f"Best Parameters: {grid_search.best_params_}")
    best_model = grid_search.best_estimator_
    
    mae_scores = cross_val_score(best_model, X, y, cv=tscv, scoring='neg_mean_absolute_error')
    rmse_scores = cross_val_score(best_model, X, y, cv=tscv, scoring='neg_root_mean_squared_error')
    avg_mae = -mae_scores.mean()
    avg_rmse = -rmse_scores.mean()
    print(f"Cross-Validation MAE: {avg_mae:.2f}, RMSE: {avg_rmse:.2f}")
    
    best_model.fit(X, y)
    
    test, test_feature_cols = create_test_frame(start_date, end_date, df, team)
    rng = get_hoops_for_test_frame(start_date, end_date, team)
    print(rng)

    test_df = pd.DataFrame(test, columns=test_feature_cols)
    test_df['Date'] = rng['Date'].values
    pred = best_model.predict(test_df.drop(columns=['Date'], errors='ignore'))
    print(pred)

    # Apply percentage increase and clip negative values
    adjusted_pred = pred * (1 + percentage_increase / 100)
    adjusted_pred = np.maximum(adjusted_pred, 0)  # Ensure no negative predictions
    
    pred_df = pd.DataFrame(adjusted_pred, columns=["Predictions"])
    pred_df = pd.concat([pred_df.reset_index(drop=True), rng.reset_index(drop=True)], axis=1)
    
    return pred_df



In [None]:
def plot_yoy_call_volume_with_2025(preds, client_code, team=None, start_date="2025-01-01", end_date="2025-12-31"):
    df_historical = get_call_data(client_code, team)
#     df_historical = df_historical[df_historical['Date']<"2025-02-01"]
    df_historical_max = df_historical['Date'].max() if not df_historical.empty else None
    print(f"Historical data: {len(df_historical)} rows, max date: {df_historical_max}")
    
    if len(df_historical) == 0:
        print("No historical data available to plot.")
        return
    
    df_pred = preds
    print(f"Predictions input: {len(df_pred)} rows")
    
    if len(df_pred) > 0:
        # Convert dates for consistency
        df_historical['Date'] = pd.to_datetime(df_historical['Date']).dt.date
        df_pred['Date'] = pd.to_datetime(df_pred['Date']).dt.date
        
        # Filter historical data to exclude dates in predictions
        pred_dates = set(df_pred['Date'])
        df_historical = df_historical[~df_historical['Date'].isin(pred_dates)]
        print(f"Filtered historical data: {len(df_historical)} rows after excluding prediction dates")
        
        # Filter predictions to post-historical dates (optional, for safety)
        df_historical_max = df_historical['Date'].max() if not df_historical.empty else None
        if df_historical_max:
            df_historical_max = pd.to_datetime(df_historical_max).date()
            df_pred = df_pred[df_pred['Date'] > df_historical_max]
        print(f"Predictions after filter: {len(df_pred)} rows, min date: {df_pred['Date'].min()}")
        
        df_pred = df_pred.rename(columns={'Predictions': 'Calls'})
        df_combined = pd.concat([df_historical[['Date', 'Calls']], df_pred[['Date', 'Calls']]], ignore_index=True)
    else:
        print("No prediction data available. Plotting historical data only.")
        df_combined = df_historical[['Date', 'Calls']]
    
    print(f"Combined data: {len(df_combined)} rows")
    df_combined['Date'] = pd.to_datetime(df_combined['Date'])
    df_combined['Year'] = df_combined['Date'].dt.year
    df_combined['Month'] = df_combined['Date'].dt.month
    print(f"Years in combined data: {df_combined['Year'].unique()}")
    
    monthly_data = df_combined.groupby(['Year', 'Month'])['Calls'].sum().reset_index()
    pivot_data = monthly_data.pivot(index='Month', columns='Year', values='Calls').fillna(0)
    print(f"Pivot columns (years): {pivot_data.columns.tolist()}")
    
    plt.figure(figsize=(14, 8))
    for year in pivot_data.columns:
        plt.plot(pivot_data.index, pivot_data[year], marker='o', linestyle='-', label=str(year))
        for month in pivot_data.index:
            value = pivot_data.loc[month, year]
            if value > 0:
                plt.text(month, value, f'{int(value)}', ha='center', va='bottom', fontsize=8)
    
    plt.title(f'Year-over-Year Call Volume by Month (Client: {client_code}, Team: {team or "All"})')
    plt.xlabel('Month')
    plt.ylabel('Call Volume')
    plt.xticks(range(1, 13), ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
                              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
    plt.legend(title='Year', bbox_to_anchor=(1.05, 1), loc='upper left')  # Fixed typo 'Yeazr' to 'Year'
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

In [None]:
preds = future_projection(ds, de, Clientcode, "Technical Support",percentage_increase=0)
preds

In [None]:
ss = plot_yoy_call_volume_with_2025(preds,Clientcode, team="Technical Support", start_date=ds, end_date=de)
ss

### Avg Handle Time Projections

In [None]:
def get_call_data(client, team=None, skill=None):
    avaya_db = get_db_avaya_stats()
    
    # Query for collecting a team's skill group call data
    if  team:
        sql = """Query to pull Handle time for AHT predictions"""
        sql = sql % (client, team)
  
    # Execute query, put results in dataframe
    df = pd.read_sql(sql, con=avaya_db)
    
    df = df.sort_values(by='Date', ascending=True)
    today_date = datetime.today().date()
    df = df[df['Date'] != today_date]
    if len(df) == 0:
        return []
    
    # Turn the timedelta 'Time' column into integer representation of the time
    df = time_to_int(df)
    # This function makes a dataframe containing every time interval from 00:00 to 23:30 of every day, starting from the 
    # first day of collected data, to the last day of collected data. A complete time series with no gaps.
    rng = get_date_range(df.Date.iloc[0], df.Date.iloc[-1])
    # Merge the collected data, which contains gaps, with the gapless rng dataframe, to create one complete time series
    df = pd.merge(rng, df, how="outer", on=["Date","Time"])

    # The places in which the collected data had gaps will leave a NaN after the merge, so we should fill these with
    # a 0. 
    df = df.fillna(0)
    df['Calls'] = df['Calls'].astype(int)

    # This function adds all the columns used by the machine learning model
    df = feature_engineering(df,team)
    
    return df

In [None]:
def get_date_range(first, last):
    df = pd.DataFrame(columns=["Date","Time"])
    rng = pd.date_range(start=first, end=last).date
    rng = pd.DataFrame(rng, columns=['Date'])
    for i in range(len(rng)):
        d = rng.Date[i]
        arr = []
        for i in range(12):
            arr.append(i*2*100)
        temp = pd.DataFrame(arr, columns=["Time"])
        temp["Date"] = d
        df = pd.concat([df, temp])
    df["Time"]=df["Time"].astype(int)
#     print(df.dtypes)
    
    return df

In [None]:
import pandas as pd

def fill_missing_AHT(df,team):
    # Convert 'Date' column to datetime if not already in that format
    Contractual_threshold = """Select AHT_Contractual_threshold from reporting.client_report_conditions where clientCode = "xyz"  """
    Contractual_threshold = pd.read_sql(Contractual_threshold, con=avaya_db)
    
    Contractual_threshold = Contractual_threshold['AHT_Contractual_threshold'].loc[0]
    Contractual_threshold
    
    df['Date'] = pd.to_datetime(df['Date'])

# Sort the DataFrame by 'Date'
    df.sort_values('Date', inplace=True)
    df.reset_index(drop=True, inplace=True)
    
    if team == "Technical Support":
        # Identify rows where 'day_of_week' < 5 and 'Calls' is 0
        weekday_calls = (df['day_of_week'] < 5)
        df['Time'] = df['Time'].astype(int)

        # Identify the time intervals from 6 pm to 8 pm with 'Calls' = 0
        evening_calls_zero = (df['Time']== 2000) & (df['Calls'] == 0)

        # Update identified intervals in the original DataFrame to 'Calls' = 1
        df.loc[weekday_calls & evening_calls_zero, 'Calls'] = 1
        df.loc[weekday_calls & evening_calls_zero, 'AvgHandleTime'] = Contractual_threshold

    for index, row in df.iterrows():
        if row['day_of_week'] < 5 and row['Calls'] == 0:
            current_date = row['Date']
            current_time = row['Time']

            # Find the previous week's date for the same day (Wednesday in this case)
            prev_week_same_day = current_date - pd.DateOffset(weeks=1)

            # Filter data for the same day and time interval in the previous week
            prev_week_data = df[
                (df['Date'] == prev_week_same_day) &
                (df['Time'] == current_time)
            ]

            # Check if there's data for the same time interval in the previous week
            if not prev_week_data.empty:
                # Replace missing 'Calls' values for the specific interval from the previous week's data
                df.at[index, 'Calls'] = prev_week_data.iloc[0]['Calls']  # Using iloc[0] to get the first value if multiple found
                df.at[index, 'AvgHandleTime'] = prev_week_data.iloc[0]['AvgHandleTime']  # Use the 'AHT' value from the previous week

    df['Date'] = df['Date'].dt.date

    return df

In [None]:

def feature_engineering(df,team):

    short = False
    if df.iloc[0].Date > df.iloc[-1].Date - timedelta(371*2): short = True
    # The day of the week for each row is important information
    df["day_of_week"] = df.apply(lambda row : row["Date"].weekday(), axis=1)
    df["AvgHandleTime"].loc[df["day_of_week"] > 4] = 0
    
    # Extract information about the date, because a date can not be fed to machine learning model. Only numbers.
    df["month"] = df.apply(lambda row: row["Date"].month, axis = 1)
    df["day_of_month"] = df.apply(lambda row: row["Date"].day, axis = 1)
    df["year"] = df.apply(lambda row: row["Date"].year, axis = 1)
    
    #is date a US Holiday?
    us_holidays = holidays.US()
    df["is_holiday"] = df.apply(lambda row : int(row["Date"] in us_holidays), axis=1 )
    df = set_easter_holiday(df)
    df = fill_missing_AHT(df,team)

    # call volume a week ago
    df["last_week"] = df["AvgHandleTime"].shift(12*7)
    # 2 weeks ago
    df["last_week_2"] = df["AvgHandleTime"].shift(12*7*2)
    # 3 weeks ago
    df["last_week_3"] = df["AvgHandleTime"].shift(12*7*3)
    
    if not short:
        # 4 weeks ago
        df["last_week_4"] = df["AvgHandleTime"].shift(12*7*4)
        # 5 weeks ago
        df["last_week_5"] = df["AvgHandleTime"].shift(12*7*5)
        # 6 weeks ago
        df["last_week_6"] = df["AvgHandleTime"].shift(12*7*6)
        # 7 weeks ago
        df["last_week_7"] = df["AvgHandleTime"].shift(12*7*7)
        # 8 weeks ago
        df["last_week_8"] = df["AvgHandleTime"].shift(12*7*8)
        # 9 weeks ago
        df["last_week_9"] = df["AvgHandleTime"].shift(12*7*9)
        # 364 days ago, because we want to land on the same day of the week, not same date
        df["last_year"] = df["AvgHandleTime"].shift(12*364)
        # same weekday from last year minus another week
        df["last_year-1w"] = df["AvgHandleTime"].shift(12*371)
        # same weekday from last year plus another week
        df["last_year+1w"] = df["AvgHandleTime"].shift(12*357)
    # Several rows of data will have null values for last_year and/or last_week columns so we drop them
    df = df.dropna()
#     df["AvgHandleTime"].loc[df.Calls < 5] = 0
    return df.reset_index(drop=True)

In [None]:

def time_to_int(df):
    df["Time"] = df["Time"].astype(str)
    df["Time"] = df.apply(lambda row : row["Time"][-8:-3], axis=1)
    df["Time"] = df.apply(lambda row : int(row["Time"].replace(":", "")), axis=1)
    return df

In [None]:

def get_last_week(depth, day, time, old_data):
    i = 0
    while day > old_data.Date.iloc[-1]:
        day = day - timedelta(7)
    
    d = day-timedelta( 7 * (depth-1) )
    print (d,time)
    print(old_data[(old_data.Date == d) & (old_data.Time == time)])
    return old_data[(old_data.Date == d) & (old_data.Time == time)].AvgHandleTime.item()

In [None]:
def get_hoops_for_test_frame(start_date, end_date,team):
    from datetime import date, timedelta, datetime
    import datetime as dt

    reporting_db = get_db_dictionary()
    
    todays_date = datetime.today().date()
    sql = """Get HOOPS"""
    sql = sql % (team,start_date,end_date,team,start_date,end_date)

    Hoops = pd.read_sql(sql, con=reporting_db)
    Hoops['Hoop_Start']= pd.to_datetime(Hoops['Hoop_Start'])
    Hoops['Hoop_Start'] = pd.to_datetime(Hoops['Hoop_Start']).dt.time

    Hoops['Hoop_End']= pd.to_datetime(Hoops['Hoop_End'])
    Hoops['Hoop_End'] = pd.to_datetime(Hoops['Hoop_End']).dt.time

    total_rows = Hoops.shape[0]

    # total_rows = 15

    rows = []

    for x in range(total_rows):
        PIT_Date = Hoops['PIT_Date'][x]
        Hoop_Start = str(Hoops['Hoop_Start'][x])
        Hoop_End = str(Hoops['Hoop_End'][x])

        # Convert Hoop_Start and Hoop_End from strings to datetime objects
        start = dt.datetime.strptime(Hoop_Start, '%H:%M:%S')
        end = dt.datetime.strptime(Hoop_End, '%H:%M:%S')
        delta = dt.timedelta(hours=2)

        # Loop through the time range
        t = start
        while t < end:
            Daterec = t.strftime('%H:%M:%S')
            t += delta
            # Append each row to the list as a dictionary
            rows.append({'Date_PIT': PIT_Date, 'Timestamp': Daterec})

    # Create the Final DataFrame from the list of dictionaries
    Finaldataframe = pd.DataFrame(rows)

    df1 = Finaldataframe
    df1["Timestamp"] = df1["Timestamp"].astype(str)
    df1["Timestamp"] = df1.apply(lambda row : row["Timestamp"][:-3], axis=1)
    df1["Timestamp"] = df1.apply(lambda row : row["Timestamp"].replace(":", ""), axis=1)
    df1["Timestamp"] = df1["Timestamp"].astype(int)
    df1 = df1.rename(columns={'Timestamp': 'Time'})
    df1 = df1.rename(columns={'Date_PIT': 'Date'})

    return df1
    

In [None]:

def create_test_frame(start_date, end_date, old_data,team):
    rng = get_hoops_for_test_frame(start_date, end_date,team)
    
    rng["day_of_week"] = rng.apply(lambda row : row["Date"].weekday(), axis=1)
    rng["month"] = rng.apply(lambda row: row["Date"].month, axis = 1)
    rng["day_of_month"] = rng.apply(lambda row: row["Date"].day, axis = 1)
    rng["year"] = rng.apply(lambda row: row["Date"].year, axis = 1)
    
    us_holidays = holidays.US()
    rng["is_holiday"] = rng.apply(lambda row : row["Date"] in us_holidays, axis=1 )
    rng["is_holiday"] = rng["is_holiday"].astype(int)
    rng = set_easter_holiday(rng)
    
    rng["last_week"] = rng.apply(lambda row : get_last_week(1, row["Date"], row["Time"], old_data), axis=1)
    rng["last_week_2"] = rng.apply(lambda row : get_last_week(2, row["Date"], row["Time"], old_data), axis=1)
    rng["last_week_3"] = rng.apply(lambda row : get_last_week(3, row["Date"], row["Time"], old_data), axis=1)

    if "last_year" in old_data.columns:
        rng["last_week_4"] = rng.apply(lambda row : get_last_week(4, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_5"] = rng.apply(lambda row : get_last_week(5, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_6"] = rng.apply(lambda row : get_last_week(6, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_7"] = rng.apply(lambda row : get_last_week(7, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_8"] = rng.apply(lambda row : get_last_week(8, row["Date"], row["Time"], old_data), axis=1)
        rng["last_week_9"] = rng.apply(lambda row : get_last_week(9, row["Date"], row["Time"], old_data), axis=1)
        
        rng["last_year"] = rng.apply(lambda row : old_data[(old_data.Date == row.Date - timedelta(364)) & (old_data.Time == row.Time)].AvgHandleTime.item(), axis=1)
        rng["last_year-1w"] = rng.apply(lambda row : old_data[(old_data.Date == row["Date"] - timedelta(371)) & (old_data.Time == row.Time)].AvgHandleTime.item(), axis=1)
        rng["last_year+1w"] = rng.apply(lambda row : old_data[(old_data.Date == row["Date"] - timedelta(357)) & (old_data.Time == row.Time)].AvgHandleTime.item(), axis=1)
        
        rng = rng[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", "last_week_2", "last_week_3", "last_week_4", "last_week_5", "last_week_6", "last_week_7", "last_week_8", "last_week_9", "last_year", "last_year-1w", "last_year+1w"]]
    else:
        rng = rng[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", "last_week_2", "last_week_3"]]
    return rng

In [None]:

def future_projection(start_date, end_date, client, team, model):
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
    elif isinstance(start_date, date):
        pass
    else:
        raise ValueError("start_date must be a string or a date object")
        
    df = get_call_data(client, team)
    if len(df) == 0: 
        return []
    if df.iloc[-1].Date < date.today() - timedelta(21):
        return []
    print("\tFitting model with historic data...")
  
    if "last_year" in df.columns and df.iloc[0].Date <= start_date - timedelta(371):
       
        model.fit(df[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", 
                         "last_week_2", "last_week_3", "last_week_4", "last_week_5", "last_week_6", 
                         "last_week_7", "last_week_8", "last_week_9", "last_year", "last_year-1w", 
                         "last_year+1w"]], df["AvgHandleTime"])

    else:
     
        model.fit(df[["Time", "day_of_week", "month", "day_of_month", "year", "is_holiday", "last_week", 
                         "last_week_2", "last_week_3"]], df["AvgHandleTime"])

    test = create_test_frame(start_date, end_date, df,team)
    
    print("\tGenerating predictions...")
    pred = model.predict(test)
    pred = pd.DataFrame(pred, columns=["Predictions"])
    
    rng = get_hoops_for_test_frame(start_date, end_date,team)
    rng["day_of_week"] = rng.apply(lambda row : row.Date.weekday(), axis=1)
    pred = pd.concat([pred.reset_index(drop=True), rng.reset_index(drop=True)], axis=1)
    pred["Predictions"].loc[pred["day_of_week"] > 4] = 0
    
    return pred

In [None]:
# SQL Update statement exists to set the AHT column of an existing row in the DB to the projected value for that row 
update_sql = """UPDATE query"""

# For each team in the teams array, get the prediction dataframe, and perform the update, and insert queries
for team in teams:
    print("Generating AHT Predictions for",team, "...")
    # Get the prediction dataframe for team, then execute update_sql for team
    pred = future_projection(ds, de, client, team, model)
    if len(pred) == 0:
        print("\tDue to lack of data, " + team + " is being skipped")
        continue
    print("\tInserting predictions into the database")
    
    for index, row in pred.iterrows():
        if pred.iloc[index, 0] == 0:
            continue
        sql = update_sql
        sql = sql % (pred.iloc[index, 0],client, team,VersionType, str(pred.iloc[index, 1]), pred.iloc[index, 2] * 100, pred.iloc[index, 2]*100 + 13000)
        run_query(sql)
    
    print("\tFinished.")

#### Erlang Script to Use Call arrival Pattern, Call Volume Projections, Avg Handle time Projections to Find FTE Required

In [None]:
def time_to_int(df):
    df["Time"] = df["Time"].astype(str)
    df["Time"] = df.apply(lambda row : row["Time"][-8:-3], axis=1)
    df["Time"] = df.apply(lambda row : int(row["Time"].replace(":", "")), axis=1)
    return df

def run_query(sql):
    reporting_client_db = get_db_Client_data_import()
    cursor_insert = reporting_client_db.cursor()
    cursor_insert.execute(sql)
    reporting_client_db.commit()
    cursor_insert.close()
    
# Inserts the given fte into the database. Sets the team column to PARAM(team)
def insert(client,fte,team):
    insert_sql = """INSERT INTO a.b(Pkey,Team,ContactType,Version_date,VersionType,Cap_Date,Cap_Time,input_Calls,clientCode,input_asa_seconds,input_aht_seconds,input_shrinkage_pct,input_min_service_level,input_max_occupancy,positions_fte_raw,positions_fte_shrink,serv_level_projection,occupancy_projection,waiting_probability,Smoothed_FTE1)
    VALUES """
    D = """ON DUPLICATE KEY UPDATE Version_date = Values(Version_date),input_aht_seconds=Values(input_aht_seconds),positions_fte_raw=Values(positions_fte_raw),\n
        input_Calls= Values(input_Calls),positions_fte_shrink=Values(positions_fte_shrink) ,serv_level_projection=Values(serv_level_projection),\n
        occupancy_projection=Values(occupancy_projection),waiting_probability=Values(waiting_probability),Smoothed_FTE1=Values(Smoothed_FTE1)"""
#     team="Combined"
    i = 0
    sql = insert_sql
    print(sql)
    version_date = str(datetime.now())
    
    for i, r in fte.iterrows():
        
        pkey = str(client)+ "-" + str(team)+ "-" + str(r.Date)+ "-" + str(r.Time*100) +"-"+ str(VersionType)
        print(pkey)
        s = "('%s','%s', 'phone', '%s','%s', '%s', %s, %s, '%s', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s),\n"
        s = s % (pkey,team, version_date,VersionType, str(r.Date), r.Time*100, r.Calls,client, 30, r.aht, 0.35, 0.8, 0.85, r.raw_positions, r.positions, r.service_level, r.occupancy, r.waiting_prob, r.smooth)
        sql += s
        i += 1
        if i == 500:
            sql = sql[:-2]
            sql += "\n" + D
            run_query(sql)
            i = 0
            sql = insert_sql
    
    # Unless the length of the dataframe is a multiple of 500, then there will be rows of the dataframe that
    # have not been inserted into the DB. Here we insert the remaining rows, if they exist.
    if sql != insert_sql:
        sql = sql[:-2]
        sql += "\n" + D
        run_query(sql)
        
        
def roll(fte):
    # Sort the dataframe by Date and Time
    fte_sorted = fte.sort_values(by=['Date', 'Time']).reset_index(drop=True)
    
    # Initialize an empty list to store smoothed values
    smooth_values = []
    
    # Loop over each unique day in the Date column
    for d in fte_sorted['Date'].unique():
        day_fte = fte_sorted[fte_sorted['Date'] == d]
        
        # Perform rolling mean on the positions column
        smooth_day = day_fte['positions'].rolling(4, center=True, min_periods=1).mean().astype(int)
        
        # Append smoothed values to the list
        smooth_values.extend(smooth_day.tolist())
    
    # Add the smoothed values to a new column in the original dataframe
    fte_sorted['smooth'] = smooth_values
    
    return fte_sorted

In [None]:
# if client_projection_calls == 'Y' :
Client_CVP = """
    SELECT Projection_Date AS Date, Calls
    FROM w.f
    WHERE Type = 'P' AND Client = '%s' AND Team ='%s'
      AND Version_Date = (
          SELECT MAX(Version_Date) 
          FROM w.f
          WHERE Type = 'P' AND Client = '%s' AND Team = '%s'
      )
      AND Projection_Date >= '%s' AND Projection_Date <=' %s'
      AND Pkey != ''
"""
# else:
    

sql_cvp = """
SELECT Projection_Date AS Date, Calls, callsByClient
FROM w.CVP
WHERE Version_Date = (
    SELECT MAX(Version_Date)
    FROM w.CVP
    WHERE Client = '%s' AND Team = '%s' AND VersionType = '%s' AND Projection_Date >= '%s' AND Projection_Date <= '%s' AND Pkey != ''
) AND Client = '%s' AND Team = '%s' AND VersionType = '%s' AND Projection_Date >= '%s' AND Projection_Date <= '%s' AND Pkey != ''
"""

# This query selects CAP data for a team, for all dates between ds and de, inclusive.
sql_cap = """
SELECT Cap_Date AS Date, Cap_Time AS Time, CAP_Percent1 AS Percent, AvgHandleTime
FROM workforce.CAP
WHERE Version_timestmp = (
    SELECT MAX(Version_timestmp)
    FROM workforce.CAP
    WHERE clientCode = '%s' AND Team = '%s' AND VersionType = '%s' AND Cap_Date >= '%s' AND Cap_Date <= '%s' AND Pkey != ''
) AND clientCode = '%s' AND Team = '%s' AND VersionType = '%s' AND Cap_Date >= '%s' AND Cap_Date <= '%s' AND Pkey != ''
"""

if client == "xyz":
    teams = ['a' , 'b']
    
reporting_db = get_db_dictionary()
Client_condition = """Select clientCode,ASA_Contractual_threshold from a.client_report_conditions
                        where clientCode = '%s'"""
Client_condition = Client_condition % (client)
Client_condition = pd.read_sql(Client_condition,con= reporting_db)

ASA_value = Client_condition['ASA_Contractual_threshold'].iloc[0]
ASA_value = ASA_value / 60
ASA_value

In [None]:
# VersionType = 'P'
reporting_client_db = get_db_Client_data_import()

# This will contain non-ASA FTE projections. It will be used to create a combined FTE projection for SOP, SQW, and Operator
for team in teams:
    print(team)
    
    # Initialize a list to accumulate rows for the fte projections
    fte_data = []
    
    # Fill in the SQL query with specific values
    sql = sql_cvp % (client, team, VersionType, str(ds), str(de), client, team, VersionType, str(ds), str(de))
    cvp = pd.read_sql(sql, con=reporting_client_db)
    
    Client_CVP_PD = Client_CVP % (client, team, client,team,str(ds), str(de))
    Client_CVP_PD = pd.read_sql(Client_CVP_PD, con=reporting_client_db)
    print("Client provided calls:" )
    print(Client_CVP_PD)
    
    # Create a mapping Series from df2
    calls_map = Client_CVP_PD.set_index('Date')['Calls']

    # Update 'Calls' in df1 using map from the 'Date' column
    cvp['Calls'] = cvp['Date'].map(calls_map).fillna(cvp['Calls'])
    print(cvp)
    
    # Fill in the SQL query for cap
    sql = sql_cap % (client, team, VersionType, str(ds), str(de), client, team, VersionType, str(ds), str(de))
    cap = pd.read_sql(sql, con=reporting_client_db)
    
    # Iterate over cap dataframe
    for i, row in cap.iterrows():
        print(f"Processing row {i} for team {team}")
        
        if row.AvgHandleTime == 0:
            continue  # Skip intervals with AvgHandleTime of 0
        
        # Calculate target transactions
        target = row.Percent * cvp[cvp.Date == row.Date].Calls.item()
        print(f"Target transactions: {target}")
        
        # Create ErlangC object
        erlang = ErlangC(transactions=target, asa=ASA_value, aht=row.AvgHandleTime / 60, interval=30, shrinkage=0.35)
        req = erlang.required_positions(service_level=0.8, max_occupancy=0.85)
        
        # Adjust shrinkage if occupancy is too low
        if req['occupancy'] < 0.6:
            erlang = ErlangC(transactions=target, asa=ASA_value, aht=row.AvgHandleTime / 60, interval=30, shrinkage=0.25)
            req = erlang.required_positions(service_level=0.8, max_occupancy=0.85)
        
        # Create a new row of data
        r = {
            'Date': row.Date,
            'Time': row.Time,
            'positions': req['positions'],
            'raw_positions': req['raw_positions'],
            'Calls': target,
            'aht': row.AvgHandleTime,
            'service_level': req['service_level'],
            'occupancy': req['occupancy'],
            'waiting_prob': req['waiting_probability']
        }
        
        fte_data.append(r)
    
    # Convert the list of rows to a DataFrame
    fte = pd.DataFrame(fte_data)
    
    # Apply time_to_int function
    fte = time_to_int(fte)
    
    # Perform rolling mean smoothing
    fte = roll(fte)
    
    # Insert the fte data into the database
    print(f"Final FTE data for {team}:")
    print(fte)
    
    insert(client, fte, team)

In [None]:
# Limitations
# Projections are most accurate for up to 9 months into the future.
# Requires sufficient historical data (at least 371 days for full feature set including last-year metrics).
