In [1]:
import numpy as np
import numpy.polynomial.polynomial
import pandas as pd
import warnings
import scipy.stats as sc

In [None]:
def return_time_horizon_profiles_dict(random_state=None):
    """
    Build the catalogue of base time horizon profiles.

    Each entry maps a profile name to a dict holding 'y_values' (ten base
    weights) and 'order' (the polynomial degree used when the weights are
    fitted). The five fixed shapes use order 4. The 'random' profile draws
    its ten weights from a normal distribution (mean 1, stdev 0.3), clips
    them at zero and uses order 6; random_state is passed to that draw.
    """

    fixed_shapes = (
        ('flat', np.full(10, 1)),
        ('beginning_heavy',
         np.array([1.  , 1.15, 1.45, 1.15, 1.  , 0.95, 0.9 , 0.85, 0.8 , 0.75])),
        ('end_heavy',
         np.array([0.75, 0.8 , 0.85, 0.9 , 0.95, 1.  , 1.15, 1.45, 1.15, 1.  ])),
        ('beginning_and_end_heavy',
         np.array([1.05, 1.35, 1.05, 0.9 , 0.65, 0.65, 0.9 , 1.05, 1.35, 1.05])),
        ('middle_heavy',
         np.array([0.7 , 0.85, 1.  , 1.15, 1.3 , 1.3 , 1.15, 1.  , 0.85, 0.7 ])),
    )

    profiles = {name: {'y_values': weights, 'order': 4}
                for name, weights in fixed_shapes}

    # the only profile that consumes random numbers
    noisy_weights = sc.norm.rvs(1, 0.3, 10, random_state=random_state)
    profiles['random'] = {'y_values': noisy_weights.clip(min=0), 'order': 6}

    return profiles

def get_time_horizon_profiles_by_time_interval(time_horizon_profile, number_time_intervals,random_state=None):
    
    """
    Return one taskload weight per time interval for the named profile.

    The base weights of the profile (from return_time_horizon_profiles_dict)
    are spread evenly over [0, number_time_intervals], fitted with a polynomial
    of the profile's 'order', and the fit is evaluated at the integer points
    0 .. number_time_intervals-1. Weights are clipped to a minimum of 0.1.

    time_horizon_profile is a key of the profiles dictionary (an unknown key
    raises KeyError). random_state is forwarded to the profile dictionary so
    that the 'random' profile can be reproduced; the other profiles do not
    depend on it.
    """
    
    # Forward the seed: without it the 'random' profile is redrawn from an
    # unseeded stream on every call and results cannot be replicated.
    time_horizon_profiles_dict = return_time_horizon_profiles_dict(random_state=random_state)
    profile = time_horizon_profiles_dict[time_horizon_profile]
    
    y = profile['y_values']
    # one anchor point per base weight, rather than a hard-coded count
    x = np.linspace(0, number_time_intervals, len(y))
    ffit = np.polynomial.polynomial.Polynomial.fit(x, y, deg=profile['order'])
    
    time_intervals = np.arange(number_time_intervals)
    # the fit can undershoot between anchors, so keep every weight at least 0.1
    out_array = ffit(time_intervals).clip(min=0.1)
    
    return out_array

def generate_taskload_array(taskload_parameters, random_state=None):
    
    """
    Function to generate a taskload array using specified input parameters.

    taskload_parameters is a dict providing 'number_time_intervals',
    'number_jobs', 'job_normal_dist_params_dict' (per job: 'mean' and 'stdev'),
    'job_time_horizon_profiles_dict' (per job: profile name) and
    'job_zero_taskload_probability_dict' (per job: chance of a zero cell).

    For every (job, time interval) cell a uniform draw decides whether the
    cell stays at zero; otherwise the cell is a normal draw, scaled by the
    job's time horizon profile, clipped at zero and rounded.

    Returns an integer array of shape (number_jobs, number_time_intervals).
    The result is reproducible for a given random_state; an empty problem
    (no jobs or no time intervals) returns an empty array.
    """
    
    number_time_intervals = taskload_parameters['number_time_intervals']
    number_jobs = taskload_parameters['number_jobs']
    job_normal_dist_params_dict = taskload_parameters['job_normal_dist_params_dict']
    job_time_horizon_profiles_dict = taskload_parameters['job_time_horizon_profiles_dict']
    job_zero_taskload_probability_dict = taskload_parameters['job_zero_taskload_probability_dict']
    
    taskload_array = np.zeros((number_jobs, number_time_intervals))
    
    # nothing to draw; also avoids asking scipy for random numbers of empty shape
    if taskload_array.size == 0:
        return taskload_array.astype(int)
    
    # One seed per cell so the array can be replicated from random_state.
    # Seeds come from a wide range: drawing them from
    # [0, number_jobs*number_time_intervals) makes many cells share a seed
    # and therefore share identical random draws.
    max_seed = 2**31 - 1
    cell_seeds = sc.randint.rvs(0, max_seed,
                                size=(number_jobs, number_time_intervals),
                                random_state=random_state)
    
    for row in range(number_jobs):
        # forward the seed so a 'random' time horizon profile is reproducible too
        time_horizon_profile = get_time_horizon_profiles_by_time_interval(job_time_horizon_profiles_dict[row], 
                                                                          number_time_intervals,
                                                                          random_state=random_state)
        zero_taskload_chance = job_zero_taskload_probability_dict[row]
        job_mean = job_normal_dist_params_dict[row]['mean']
        job_stdev = job_normal_dist_params_dict[row]['stdev']
        for column in range(number_time_intervals):
            mean = job_mean * time_horizon_profile[column]
            stdev = job_stdev * time_horizon_profile[column]
            # A single generator per cell: the uniform and the normal draw are
            # consecutive numbers of one stream. Seeding both draws with the
            # same integer would tie the magnitude to the zero/non-zero decision.
            cell_rng = np.random.RandomState(cell_seeds[row, column])
            if sc.uniform.rvs(0,1,random_state=cell_rng) >= zero_taskload_chance:
                taskload_array[row, column] = np.round(sc.norm.rvs(mean, stdev, size=1, 
                                                                   random_state=cell_rng).clip(min=0),0)[0]
    taskload_array = taskload_array.astype(int)
    
    return taskload_array

def _zero_taskload_probability(mean, zero_taskload_density):
    # Chance that a single cell of a job with the given mean taskload is forced to zero.
    if zero_taskload_density == 0:
        return 0
    if mean <= 0:
        # a job with no taskload is always empty; also avoids 1/0 -> inf
        return 1.0
    return 1/pow(mean, 1-zero_taskload_density)


def generate_random_taskload_parameters(number_time_intervals, number_jobs, 
                                       job_taskload_parameter_generator_parameters_dict, 
                                       zero_taskload_density=0.25, time_horizon_profile=None, 
                                       random_state=None):

    """
    Function to generate a parameter set, randomised to a certain level given the specified inputs.

    job_taskload_parameter_generator_parameters_dict provides
    'taskload_dist_means_mean' and 'taskload_dist_means_stdev' (the normal
    distribution the per-job means are drawn from) and
    'taskload_dist_stdev_ratio' (per-job stdev = ratio * per-job mean).
    Means and stdevs are clipped at zero and rounded.

    If time_horizon_profile is None every job gets a randomly chosen profile,
    otherwise the given profile name is used for all jobs.

    zero_taskload_density controls the per-job probability of an empty cell:
    0 disables empty cells, otherwise the probability is
    1 / mean ** (1 - zero_taskload_density), and 1.0 for a job whose mean is 0.

    Returns a dict with the keys 'number_time_intervals', 'number_jobs',
    'job_normal_dist_params_dict', 'job_time_horizon_profiles_dict',
    'job_zero_taskload_probability_dict' and 'time_horizon_profiles_dict'.
    """
    
    # generate taskload parameters for each job
    generator_parameters = job_taskload_parameter_generator_parameters_dict
    job_taskload_parameter_generator_mean = generator_parameters['taskload_dist_means_mean']
    job_taskload_parameter_generator_stdev = generator_parameters['taskload_dist_means_stdev']
    job_taskload_parameter_generator_stdev_ratio = generator_parameters['taskload_dist_stdev_ratio']
    job_normal_dist_means = np.round(sc.norm.rvs(job_taskload_parameter_generator_mean, 
                                                  job_taskload_parameter_generator_stdev, 
                                                  size=number_jobs, random_state=random_state).clip(min=0),0)
    job_normal_dist_stdevs = np.round(np.multiply(job_normal_dist_means, 
                                                  job_taskload_parameter_generator_stdev_ratio).clip(min=0),0)
    job_normal_dist_params_dict = {i: {'mean': job_normal_dist_means[i], 'stdev': job_normal_dist_stdevs[i]}
                                   for i in range(number_jobs)}
    
    # if a time_horizon_profile is not provided, just use random ones for each job
    # if it is provided, use that for all jobs
    time_horizon_profiles_dict = return_time_horizon_profiles_dict(random_state=random_state)
    if time_horizon_profile is None:
        # sorted so the index -> name mapping does not depend on dict ordering
        time_horizon_profiles_keys = sorted(time_horizon_profiles_dict)
        random_indexes = sc.randint.rvs(0,len(time_horizon_profiles_keys),size=number_jobs,random_state=random_state)
        job_time_horizon_profiles_dict = {i: time_horizon_profiles_keys[random_indexes[i]] for i in range(number_jobs)}
    else:
        job_time_horizon_profiles_dict = {i: time_horizon_profile for i in range(number_jobs)}
        
    # calculate zero taskload probability using the zero_taskload_density
    # parameter as a knob to control density
    job_zero_taskload_probability_dict = {job: _zero_taskload_probability(params['mean'], zero_taskload_density)
                                          for job, params in job_normal_dist_params_dict.items()}
    
    out_parameters = {
        'number_time_intervals' : number_time_intervals,
        'number_jobs' : number_jobs,
        'job_normal_dist_params_dict' : job_normal_dist_params_dict,
        'job_time_horizon_profiles_dict' : job_time_horizon_profiles_dict,
        'job_zero_taskload_probability_dict' : job_zero_taskload_probability_dict,
        'time_horizon_profiles_dict' : time_horizon_profiles_dict,
    }
    
    return out_parameters
        
def generate_agent_availability_array(number_agents, number_time_intervals, agent_availability_time_intervals_dict):
    """
    Build the agent-by-time-interval availability matrix.

    The result has one row per agent and one column per time interval. For
    every agent in agent_availability_time_intervals_dict, the columns from
    the first to the last interval of its (first, last) entry (both
    inclusive) are set to 1; everything else stays 0. Values are floats.
    """

    availability = np.zeros(shape=(number_agents, number_time_intervals))

    for agent_row, interval in agent_availability_time_intervals_dict.items():
        first_interval = interval[0]
        last_interval = interval[1]
        # inclusive on both ends, hence the +1
        width = last_interval - first_interval + 1
        availability[agent_row, first_interval:last_interval + 1] = np.ones(width)

    return availability

def return_agent_availabilities(taskload_parameters, number_agents, agent_availability_profile, 
                                shift_length=0, random_state=None):
    
    """
    Function to return an agent availabilities dictionary. Currently the two options are:
        1) full - here each agent can work for the whole time horizon
        2) random - here agent starts are randomly distributed between the first
                    time interval and the last one which would allow them to complete
                    a full shift (both inclusive)
    More may be added at a later date.

    The number of time intervals is read from taskload_parameters['number_time_intervals'].
    The result maps each agent index to a (first_interval, last_interval) tuple,
    both inclusive. An unsupported profile issues a warning and returns an empty
    dictionary. For the random profile a shift_length longer than the time
    horizon raises ValueError.
    """
    
    number_time_intervals = taskload_parameters['number_time_intervals']
    
    if agent_availability_profile == 'full':
        
        return {i: (0, number_time_intervals-1) for i in range(number_agents)}
        
    if agent_availability_profile == 'random':
        
        if shift_length > number_time_intervals:
            raise ValueError('shift_length must not exceed the number of time intervals.')
        
        # Last start that still fits a whole shift inside the horizon.
        latest_start = number_time_intervals - shift_length
        
        # The upper bound of randint is exclusive, so add 1 to make
        # latest_start itself reachable.
        start_times = sc.randint.rvs(0, latest_start+1, size=number_agents, 
                                     random_state=random_state)
        
        return {i: (start_times[i], start_times[i]+shift_length-1) 
                for i in range(number_agents)}
        
    warnings.warn('Please use supported agent availability profile.')
    
    return {}

def generate_agent_job_endorsement_array(number_agents, number_jobs, agent_job_endorsements_dict):
    """
    Build the agent-by-job endorsement matrix.

    The result has one row per agent and one column per job. For every agent
    in agent_job_endorsements_dict the columns listed for it are set to 1;
    everything else stays 0. Values are floats.
    """

    endorsement_matrix = np.zeros(shape=(number_agents, number_jobs))

    for agent_row, endorsed_jobs in agent_job_endorsements_dict.items():
        # all endorsed job columns of this agent are marked in one assignment
        endorsement_matrix[agent_row, endorsed_jobs] = 1

    return endorsement_matrix

def return_agent_endorsements(number_agents, number_jobs, endorsement_density_distribution_dict, random_state=None):
    
    """
    Function to return an agent endorsement dictionary. The ratio of jobs that agents can do is 
    sampled from a uniform distribution for each agent, with the upper and lower bounds given in 
    endorsement_density_distribution_dict ('lower_bound' and 'upper_bound').

    Each (agent, job) pair is then endorsed when an independent uniform draw on [0, 1] does
    not exceed the agent's ratio. The result maps each agent index to the list of job indexes
    that agent is endorsed for. The result is reproducible for a given random_state.
    """
    
    lower_bound = endorsement_density_distribution_dict['lower_bound']
    upper_bound = endorsement_density_distribution_dict['upper_bound']
    
    # An integer seed restarts the random stream on every rvs call, so the
    # per-pair draws would begin with the very numbers that produced the
    # densities. Turn the seed into one generator shared by both draws.
    if isinstance(random_state, (int, np.integer)):
        random_state = np.random.RandomState(random_state)
    
    endorsement_densities = sc.uniform.rvs(scale=upper_bound-lower_bound,loc=lower_bound
                                          ,size=number_agents,random_state=random_state)
    
    uniform_array = sc.uniform.rvs(size=(number_agents, number_jobs), random_state=random_state)
    
    agent_job_endorsements_dict = {i: [j for j in range(number_jobs) 
                                       if uniform_array[i,j] <= endorsement_densities[i]
                                      ]
                                   for i in range(number_agents)
                                  }
    
    return agent_job_endorsements_dict

def generate_random_agent_parameters(number_agents, taskload_parameters, endorsement_density_distribution_dict,
                                     agent_availability_profile, shift_length=0, random_state=None):
    
    """
    Function to generate a random set of agent parameters.

    Job endorsements come from return_agent_endorsements and availabilities from
    return_agent_availabilities; taskload_parameters supplies 'number_jobs' and is
    passed on to the availability generator. Both parts are drawn from random_state,
    so the whole parameter set is reproducible for a given seed.

    Returns a dict with the keys 'number_agents',
    'agent_availability_time_intervals_dict' and 'agent_job_endorsements_dict'.
    """

    number_jobs = taskload_parameters['number_jobs']
    
    # Share one generator between both draws when an integer seed is given,
    # so the availabilities continue the stream instead of replaying the
    # numbers already used for the endorsements.
    if isinstance(random_state, (int, np.integer)):
        random_state = np.random.RandomState(random_state)
    
    agent_job_endorsements_dict = return_agent_endorsements(number_agents, number_jobs, 
                                                            endorsement_density_distribution_dict, 
                                                            random_state=random_state)
    
    # random_state was previously hard-coded to None here, which made the
    # 'random' availability profile impossible to replicate.
    agent_availability_time_intervals_dict = return_agent_availabilities(taskload_parameters, number_agents, 
                                                                         agent_availability_profile, 
                                                                         shift_length=shift_length, 
                                                                         random_state=random_state)
    
    agent_parameters = {
        'number_agents' : number_agents,
        'agent_availability_time_intervals_dict' : agent_availability_time_intervals_dict,
        'agent_job_endorsements_dict' : agent_job_endorsements_dict
    }
    
    return agent_parameters
