# AlayaCare


## Ingestion pipeline


In [1]:
from sys import path

path.append("src")
from warnings import filterwarnings
from datetime import datetime

filterwarnings("ignore")
from src.utility.environment import Environment
from src.data.ingestion_pipeline.ingestion_pipeline import IngestionPipeline
from src.data.ingestion_pipeline.stages.cleaning.client_data_cleaning_calculated_fields_stage import (
    build_client_data_cleaning_calculated_fields_stage,
)
from src.data.ingestion_pipeline.stages.cleaning.clock_data_cleaning_calculated_fields_stage import (
    build_clock_data_cleaning_calculated_fields_stage,
)
from src.data.ingestion_pipeline.stages.cleaning.employee_data_cleaning_calculated_fields_stage import (
    build_employee_data_cleaning_calculated_fields_stage,
)
from src.data.ingestion_pipeline.stages.cleaning.visit_data_cleaning_calculated_fields_stage import (
    build_visit_data_cleaning_calculated_fields_stage,
)
from src.data.ingestion_pipeline.stages.augmented_visit_stage import (
    build_augmented_visit_stage,
)
from src.data.ingestion_pipeline.stages.segmentation_stage import (
    build_segmentation_stage,
)
from src.data.ingestion_pipeline.stages.employee_history_aggregation_stage import (
    build_employee_history_aggregation_stage,
)
from src.data.ingestion_pipeline.stages.employee_history_fill_gaps_stage import (
    build_employee_history_fill_gaps_stage,
)
from src.data.ingestion_pipeline.stages.employee_history_calculated_fields_stage import (
    build_employee_history_calculated_fields_stage,
)
from src.data.ingestion_pipeline.stages.employee_rolling_features_stage import (
    build_employee_history_rolling_features_stage,
)
from src.data.ingestion_pipeline.stages.employee_history_fill_na_stage import (
    build_employee_history_fill_na_stage,
)
from src.data.ingestion_pipeline.stages.employee_history_anomaly_detection_stage import (
    build_employee_history_anomaly_detection_stage,
)
from src.data.ingestion_pipeline.stages.training_employee_history_stage import (
    build_training_employee_history_stage,
)
from src.data.ingestion_pipeline.stages.y_labels_generation_stage import (
    build_y_labels_generation_stage,
)
from src.utility.configs.configs import Configs

# Maximum wall-clock runtime allowed for a test run of the pipeline: 6 hours, in seconds.
MAX_RUNTIME = 6 * 60 * 60

env = Environment()
conf = Configs.EXPLAINABLE_BOOSTING_MACHINE_CONFIG

start = datetime.now()

# Read before the stage builders run, mirroring the argument evaluation order of
# the IngestionPipeline constructor call below.
dataframe_size_limit = conf.limit_dataframe_size

# Every stage builder receives the shared config and environment; the list order
# is the order in which the stages are handed to IngestionPipeline.
pipeline_stages = [
    build_client_data_cleaning_calculated_fields_stage(conf, env),
    build_clock_data_cleaning_calculated_fields_stage(conf, env),
    build_employee_data_cleaning_calculated_fields_stage(conf, env),
    build_visit_data_cleaning_calculated_fields_stage(conf, env),
    build_augmented_visit_stage(conf, env),
    build_segmentation_stage(conf, env),
    build_employee_history_aggregation_stage(conf, env),
    build_employee_history_fill_gaps_stage(conf, env),
    build_employee_history_calculated_fields_stage(conf, env),
    build_employee_history_rolling_features_stage(conf, env),
    build_employee_history_fill_na_stage(conf, env),
    build_employee_history_anomaly_detection_stage(conf, env),
    build_y_labels_generation_stage(conf, env),
    build_training_employee_history_stage(conf, env),
]

_pipeline_definition = IngestionPipeline(
    config=conf,
    environment=env,
    use_caching=True,
    limit_dataframe_size=dataframe_size_limit,
    stages=pipeline_stages,
)
# The notebook keeps whatever run_pipeline() returns; later cells use it.
ingestion_pipeline = _pipeline_definition.build_pipeline().run_pipeline()

elapsed_time = datetime.now() - start

# The runtime budget is only enforced for configurations flagged as test runs.
if conf.is_test_run:
    assert elapsed_time.total_seconds() < MAX_RUNTIME

ingestion_pipeline.data_integrity_test(threshold=0.3)

Running pipeline:   0%|          | 0/13 [00:00<?, ?it/s]

  0%|          | 0/12 [00:00<?, ?it/s]

Computing commute distance:   0%|          | 0/6902 [00:00<?, ?it/s]

Computing employee tenure:   0%|          | 0/6902 [00:00<?, ?it/s]

[38;5;2mSUCCESS: [0m Client_Data_Cleaning_Calculated_fields_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Clock_Data_Cleaning_Calculated_fields_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_Data_Cleaning_Calculated_fields_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Visit_Data_Cleaning_Calculated_fields_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Augmented_Visit_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Segmentation_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_History_Aggregation_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_History_Fill_Gaps_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_History_Calculated_Fields_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_History_Rolling_Features_Stage's Data integrity test has passed.
[38;5;2mSUCCESS: [0m Employee_History_Fill_Na_Stage's Data

Unnamed: 0,error_id,error,count
0,OUT_OF_RANGE,CLIENT_AGE is greater than 125,20
1,OUT_OF_RANGE,CLIENT_AGE is less than 0,135
0,OUT_OF_RANGE,DAY_HOURS is greater than 336,409
1,OUT_OF_RANGE,END_TIME is after 2023-09-01 00:00:00,393
2,OUT_OF_RANGE,NIGHT_HOURS is greater than 336,163
3,OUT_OF_RANGE,START_TIME is after 2023-09-01 00:00:00,1361
4,OUT_OF_RANGE,START_TIME is before 2019-01-01 00:00:00,2
5,OUT_OF_RANGE,WEEKDAY_HOURS is greater than 336,94
0,OUT_OF_RANGE,EMPLOYEE_AGE is greater than 90,528
1,OUT_OF_RANGE,EMPLOYEE_AGE is less than 16,1


## Segmentation

In [None]:
from src.utility.segmentation import DataSegmenter
from src.data.ingestion_pipeline.ingestion_pipeline_stages import IngestionPipelineStages
from src.data.schema.segmentation_schema import SegmentationSchema
import json

# In this notebook `path` is sys.path (the first cell does `from sys import path`),
# and a list has no `join`, so `path.join(...)` below would raise AttributeError.
# Bring in os.path, as the later cells do.
from os import path

# Behavioural features the employees are clustered on.
segmentation_columns = [
    SegmentationSchema.AVG_VISIT_DURATION,
    SegmentationSchema.AVG_WORK_HOURS_DEVIATION,
    SegmentationSchema.AVG_VISIT_HOURS_PER_WEEK,
    SegmentationSchema.AVG_VISITS_PER_WEEK,
    SegmentationSchema.AVG_DISTANCE_TO_PATIENT,
    SegmentationSchema.AVG_CODED_DIAGNOSIS_PER_PATIENT,
    SegmentationSchema.AVG_WAS_LATE_BY,
    SegmentationSchema.AVG_LATE_ARRIVALS_PER_WEEK,
    SegmentationSchema.UNIQUE_CLIENTS_RATIO,
]

# Descriptive columns handed to the segmenter alongside the clustering features.
demographic_columns = [
    SegmentationSchema.EMPLOYEE_AGE,
    SegmentationSchema.EMPLOYEE_GENDER,
    SegmentationSchema.EMPLOYEE_TENURE,
    SegmentationSchema.EMPLOYEE_JOB_TITLE,
    SegmentationSchema.AVG_HOURLY_PAY,
    SegmentationSchema.COMPLETED_ADL_RATIO,
]

categorical_columns = [
    SegmentationSchema.EMPLOYEE_GENDER,
    SegmentationSchema.EMPLOYEE_JOB_TITLE,
]

# Prepare the segmentation dataframes (column selection already yields new frames;
# .copy() makes them independent of archetype_dataframe explicitly).
archetype_dataframe = ingestion_pipeline.dataframes[IngestionPipelineStages.SEGMENTATION_STAGE].copy()
segmentation_dataframe = archetype_dataframe[segmentation_columns].copy()
demographic_dataframe = archetype_dataframe[demographic_columns].copy()

segmenter = DataSegmenter(
    env=env,
    pipeline_config=ingestion_pipeline.to_dict(),
    segmentation_dataframe=segmentation_dataframe,
    demographic_dataframe=demographic_dataframe,
    categorical_columns=categorical_columns,
)

unique_id = datetime.now().strftime("%Y%m%d%H%M%S")
segmenter.segmenter_id = f"Data Segmentation: {unique_id}"
segmenter.segment_sequence(scaling_method="standard", n_clusters=6)

# Copy every column produced by the segmenter (presumably including CLUSTER_ID,
# read below) onto the archetype dataframe, aligned on the index.
for feature in segmenter.final_dataframe.columns:
    archetype_dataframe[feature] = segmenter.final_dataframe[feature]

# JSON object keys are always strings; convert them to float so they can match the
# CLUSTER_ID values (assumed numeric/float — confirm against DataSegmenter).
# JSON is UTF-8 by specification, so do not rely on the platform default encoding.
with open('src/mappings/ARCHETYPE_mapping.json', 'r', encoding='utf-8') as file:
    archetype_mapping = {float(key): value for key, value in json.load(file).items()}

archetype_dataframe['ARCHETYPE'] = archetype_dataframe['CLUSTER_ID'].map(archetype_mapping)

archetype_dataframe.to_csv(path.join(env.visual_dir, "final_segmentation.csv"), index=False)

## Training

In [None]:
from src.machine_learning.trainer import train

# One training run without k-fold cross-validation and outside a W&B sweep.
# NOTE(review): `limit=0.0005` presumably restricts training to a small fraction
# of the data — confirm in src.machine_learning.trainer.train.
training_arguments = dict(
    ingestion_pipeline=ingestion_pipeline,
    config=conf,
    environment=env,
    limit=0.0005,
    use_k_fold=False,
    accelerator="gpu",
    use_sweep=False,
)
train(**training_arguments)


## Hyperparameters sweep

In [None]:
from wandb import sweep

# Search space: the classification cutoff from a fixed set, the learning rate
# drawn uniformly from [0.001, 0.1].
sweep_parameters = {
    "cutoff": {"values": [0.3, 0.4, 0.5]},
    "learning_rate": {"distribution": "uniform", "min": 0.001, "max": 0.1},
}

# Random search that maximises the metric logged as "test/auroc".
sweep_config = {
    "method": "random",
    "metric": {"name": "test/auroc", "goal": "maximize"},
    "parameters": sweep_parameters,
}

sweep_id = sweep(sweep_config, project="alayacare")


In [None]:
from src.machine_learning.sweep import run_sweep
from src.machine_learning.trainer import train
def _train_for_sweep():
    """Run one sweep trial: train on the pipeline output with sweep mode enabled."""
    return train(
        ingestion_pipeline=ingestion_pipeline,
        config=conf,
        environment=env,
        limit=0.1,
        use_k_fold=False,
        use_sweep=True,
    )


# Run the hyperparameter sweep; run_sweep calls the trial function for each trial.
run_sweep(sweep_id=sweep_id, train=_train_for_sweep)


## Explaining the model

In [None]:
from os import path
from pandas import DataFrame
from torch import load
from numpy import int64
from src.machine_learning.model.perceptron import Perceptron
from src.machine_learning.data.module.alayacare_data_module import AlayaCareDataModule
from src.analysis.shap.shap_explainer import SHAPExplainer
from src.utility.logger import Logger
from src.utility.configs.explanation_config import ExplanationConfig

import zlib

explanation_config = ExplanationConfig(
    nsamples=3,
    sample_size=250,
    # Deterministic 32-bit seed. The built-in hash() of a str is salted per
    # interpreter process (PYTHONHASHSEED), so `hash("alayacare")` changed between
    # kernel sessions. Also, `hash(...) % 2**32 - 1` parses as
    # `(hash(...) % 2**32) - 1` and could evaluate to -1. crc32 is stable and
    # always lies in [0, 2**32 - 1].
    seed=zlib.crc32(b"alayacare"),
)

logger = Logger(env=env)
logger.wandb.login(key=env.wandb_api_key)
run = logger.wandb.init(
    project="alayacare",
    group="explainability",
)
try:
    # Download the model checkpoint artifact referenced as "model.ckpt:explainability".
    artifact = run.use_artifact('alayacare/alayacare/model.ckpt:explainability', type='model', aliases=["model"], use_as="model")
    artifact_dir = artifact.download()

    datamodule = AlayaCareDataModule(
        ingestion_pipeline=ingestion_pipeline,
        config=conf,
    ).prepare_data().setup("setup_splits", fold_number=0)

    model = Perceptron(
        model_config=conf,
        in_features=datamodule.n_features,
        out_features=datamodule.n_classes,
        gender_embedding_size=datamodule.n_genders,
        state_embedding_size=datamodule.n_states,
        example_input_array=datamodule.dataset[0][0],
        environment=env,
    )
    # map_location="cpu" lets weights that were saved from GPU tensors load on a
    # CPU-only machine; load_state_dict copies the values into the model's own
    # parameters wherever those live.
    model.load_state_dict(load(path.join(str(artifact_dir), 'model_weights.pt'), map_location="cpu"))
    model.eval()

    # Explain the first batch of the test split.
    xs, ys = next(iter(datamodule.test_dataloader()))
    # NOTE(review): assumes the dataset dataframe's first two columns are not model
    # inputs and its last column is the label — confirm against AlayaCareDataModule.
    test_dataframe = DataFrame(xs.numpy(), columns=datamodule.dataset.dataframe.columns[2:-1])

    shap_explainer = SHAPExplainer(
        model=model,
        dataframe=test_dataframe,
        nsamples=explanation_config.nsamples,
        seed=explanation_config.seed,
    )
    explanation = shap_explainer.explain(test_dataframe[:explanation_config.sample_size])

    logger.log_plot("explanation", "beeswarm", explanation.bee_swarm_plot())
    logger.log_plot("explanation", "waterfall", explanation.waterfall_plot(prediction_index=0))
    for feature_name in explanation.feature_names:
        logger.log_plot("explanation_dependency", feature_name, explanation.dependency_scatter_plot(feature_name=feature_name))
    logger.log_plot("explanation", "decision", explanation.decision_plot())
    logger.log_plot(
        "explanation",
        "misclassified_decision",
        # Labels are truncated to the same rows that were explained above.
        explanation.decision_plot_misclassified(config=conf, y_true=ys[:explanation_config.sample_size].numpy().astype(int64))
    )
finally:
    # Close the W&B run even if downloading, loading or plotting fails, so a
    # half-finished run is not left open in the notebook session.
    logger.wandb.finish()


## Volume Test for Ingestion Pipeline

In [None]:
from sys import path
path.append("src")
from warnings import filterwarnings
filterwarnings("ignore")
from datetime import datetime
from pandas import Timestamp
from src.utility.environment import Environment
from src.utility.configs.config import Config
from src.data.ingestion_pipeline.ingestion_pipeline import IngestionPipeline
from src.data.ingestion_pipeline.ingestion_pipeline_stages import IngestionPipelineStages
from src.data.ingestion_pipeline.stages.cleaning.client_data_cleaning_calculated_fields_stage import build_client_data_cleaning_calculated_fields_stage
from src.data.ingestion_pipeline.stages.cleaning.clock_data_cleaning_calculated_fields_stage import build_clock_data_cleaning_calculated_fields_stage
from src.data.ingestion_pipeline.stages.cleaning.employee_data_cleaning_calculated_fields_stage import build_employee_data_cleaning_calculated_fields_stage
from src.data.ingestion_pipeline.stages.cleaning.visit_data_cleaning_calculated_fields_stage import build_visit_data_cleaning_calculated_fields_stage
from src.data.ingestion_pipeline.stages.augmented_visit_stage import build_augmented_visit_stage
from src.data.ingestion_pipeline.stages.employee_history_aggregation_stage import build_employee_history_aggregation_stage
from src.data.ingestion_pipeline.stages.employee_history_fill_gaps_stage import build_employee_history_fill_gaps_stage
from src.data.ingestion_pipeline.stages.employee_history_calculated_fields_stage import build_employee_history_calculated_fields_stage
from src.data.ingestion_pipeline.stages.employee_history_fill_na_stage import build_employee_history_fill_na_stage
from src.data.ingestion_pipeline.stages.employee_history_anomaly_detection_stage import build_employee_history_anomaly_detection_stage
from src.data.ingestion_pipeline.stages.segmentation_stage import build_segmentation_stage
from src.data.ingestion_pipeline.stages.training_employee_history_stage import build_training_employee_history_stage
from src.data.ingestion_pipeline.stages.y_labels_generation_stage import build_y_labels_generation_stage

# Upper bound on the pipeline's wall-clock runtime for this volume test: 6 hours, in seconds.
MAX_RUNTIME = 21600

start = datetime.now()

env = Environment()
conf = Config(
    load_id=datetime.now().strftime("%Y%m%d_%H%M%S"),
    n_splits=2,
    split_seed=5832391,
    log_every_n_steps=50,
    training_window_size=1,
    n_epochs=5,
    batch_size=16384,
    label_policy="90Days",
    period_duration="1D",
    period_start=Timestamp(year=2019, month=1, day=1),
    period_end=Timestamp(year=2023, month=12, day=31),
    cutoff=0.5,
    oversampler="SMOTE",
    oversampler_args={},
    model="ExplainableBoostingMachine",
    model_config={},
    is_test_run=True
)

# NOTE(review): this stage list differs from the first cell's pipeline. It has no
# rolling-features stage, and the segmentation stage comes after label generation.
# Confirm this is intentional for the volume test.
ingestion_pipeline = IngestionPipeline(
    config=conf,
    environment=env,
    use_caching=False,
    stages=[
        build_client_data_cleaning_calculated_fields_stage(conf, env),
        build_clock_data_cleaning_calculated_fields_stage(conf, env),
        build_employee_data_cleaning_calculated_fields_stage(conf, env),
        build_visit_data_cleaning_calculated_fields_stage(conf, env),
        build_augmented_visit_stage(conf, env),
        build_employee_history_aggregation_stage(conf, env),
        build_employee_history_fill_gaps_stage(conf, env),
        build_employee_history_calculated_fields_stage(conf, env),
        build_employee_history_fill_na_stage(conf, env),
        build_employee_history_anomaly_detection_stage(conf, env),
        build_y_labels_generation_stage(conf, env),
        build_segmentation_stage(conf, env),
        build_training_employee_history_stage(conf, env),
    ],
).build_pipeline().run_pipeline()

end = datetime.now()
elapsed_time = end - start

# elapsed_time is a timedelta; comparing it directly with the int MAX_RUNTIME
# raises TypeError, so compare its duration in seconds (as the first cell does).
assert elapsed_time.total_seconds() < MAX_RUNTIME, (
    f"Ingestion pipeline took {elapsed_time}, exceeding the {MAX_RUNTIME} s limit"
)


## Inference & Visualization Dataset

### Preparing Inference Dataset

In [None]:
from src.data.ingestion_pipeline.ingestion_pipeline_stages import IngestionPipelineStages
from src.data.schema.employee_history_schema import EmployeeHistorySchema
from src.data.schema.employee_schema import EmployeeSchema
from pandas import to_timedelta

DAYS_IN_YEAR = 365

# Retrieve the employee dataframe
employee_df = ingestion_pipeline.dataframes[IngestionPipelineStages.EMPLOYEE_DATA_CLEANING_CALCULATED_FIELDS_STAGE]

# NOTE: the EMPLOYEE_STATUS filter is currently disabled, so every employee is kept.
# To re-enable it (status 1 = active, 3 = suspended per the original filter — confirm
# the code mapping), use:
#   employee_df[employee_df[EmployeeSchema.EMPLOYEE_STATUS].isin([1, 3])]
filtered_employee_df = employee_df

# Retrieve the employee history dataframe
employee_history_df = ingestion_pipeline.dataframes[IngestionPipelineStages.EMPLOYEE_HISTORY_AGGREGATION_STAGE]

# IDs of employees that have at least one history row without a TERMINATION_DATE
non_terminated_employees_list = employee_history_df.loc[
    employee_history_df[EmployeeHistorySchema.EMPLOYEE_TERMINATION_DATE].isnull(),
    EmployeeHistorySchema.EMPLOYEE_ID,
].unique()

# Employees that pass the (currently disabled) status filter and have no TERMINATION_DATE
current_employees_df = filtered_employee_df[filtered_employee_df[EmployeeSchema.EMPLOYEE_ID].isin(non_terminated_employees_list)]

# Retrieve the training dataframe
training_df = ingestion_pipeline.dataframes[IngestionPipelineStages.TRAINING_EMPLOYEE_HISTORY_STAGE]

# Filter the training dataframe to only keep current employees
current_training_df = training_df[training_df[EmployeeHistorySchema.EMPLOYEE_ID].isin(current_employees_df[EmployeeSchema.EMPLOYEE_ID])]

# Keep only the latest period of each current employee and renumber rows 0..n-1.
# drop=True discards the old row labels. A plain reset_index() would insert them as
# an extra "index" column, which would then be fed to the model and exported with
# the visual dataset (the yoy frame below has no such column).
current_training_latest_period_df = current_training_df.loc[
    current_training_df.groupby(EmployeeHistorySchema.EMPLOYEE_ID)[EmployeeHistorySchema.PERIOD_START].idxmax()
].reset_index(drop=True)

# Employees that started before the last year AND had not quit before the last year
# (no termination date, or terminated within the final year of the period).
one_year_before_period_end = conf.period_end - to_timedelta(DAYS_IN_YEAR, unit='D')
termination_dates = employee_history_df[EmployeeHistorySchema.EMPLOYEE_TERMINATION_DATE]
yoy_employees_list = employee_history_df.loc[
    (termination_dates.isnull() | (termination_dates > one_year_before_period_end))
    & (employee_history_df[EmployeeHistorySchema.EMPLOYEE_START_ON] < one_year_before_period_end),
    EmployeeHistorySchema.EMPLOYEE_ID,
].unique()

# Filter the training df to keep YOY data
yoy_training_df = training_df[training_df[EmployeeHistorySchema.EMPLOYEE_ID].isin(yoy_employees_list)]

# Further filter the yoy_training_df to only keep the latest period for each yoy employee
yoy_training_latest_period_df = yoy_training_df.loc[yoy_training_df.groupby(EmployeeHistorySchema.EMPLOYEE_ID)[EmployeeHistorySchema.PERIOD_START].idxmax()]

### Inference with an sk-learn model

In [None]:
from os import path
import pickle
from interpret import show

from pandas import DataFrame
from numpy import ndarray, select

# Pickled model used for inference; point this at another file to switch models.
MODEL_FILE_PATH = path.join(env.model_dir, 'ExplainableBoostingMachine', 'model_00000.pkl')

# Churn-probability cut-offs for the risk buckets; adjust per the SMEs' recommendations.
HIGH_RISK_THRESHOLD = 0.75
MEDIUM_RISK_THRESHOLD = 0.5

# NOTE: unpickling can execute arbitrary code — only load model files from a trusted location.
with open(MODEL_FILE_PATH, 'rb') as model_file:
    model = pickle.load(model_file)

def run_predictions(df: DataFrame, model):
    """Score ``df`` with ``model`` and return a copy carrying a CHURN_PROBABILITY column.

    Args:
        df (DataFrame): rows to score; passed unchanged to ``model.predict_proba``.
        model: a fitted classifier exposing a scikit-learn style ``predict_proba``.

    Returns:
        DataFrame: a copy of ``df`` with a CHURN_PROBABILITY column. When
        ``predict_proba`` returns a 2-D array with two columns, the second column
        (the positive class for 0/1 labels) is stored; any other output is stored
        as returned.
    """
    predictions = model.predict_proba(df)

    # Binary classifiers return shape (n_rows, 2). Check ndim first so that a 1-D
    # array (already one probability per row) does not raise IndexError on shape[1].
    if isinstance(predictions, ndarray) and predictions.ndim == 2 and predictions.shape[1] == 2:
        predictions = predictions[:, 1]

    # Work on a copy so the caller's frame (often a .loc slice) is not mutated.
    scored_df = df.copy()
    scored_df['CHURN_PROBABILITY'] = predictions

    return scored_df

def create_churn_risk_column(df, medium_threshold=None, high_threshold=None):
    """Bin CHURN_PROBABILITY into a textual CHURN_RISK column.

    Buckets: ``p <= medium_threshold`` -> 'Low Risk';
    ``medium_threshold < p <= high_threshold`` -> 'Medium Risk';
    ``p > high_threshold`` -> 'High Risk'; missing (NaN) probabilities -> 'Unknown'.

    Args:
        df (DataFrame): the input DataFrame; must have a CHURN_PROBABILITY column.
        medium_threshold (float, optional): inclusive upper bound of 'Low Risk'.
            Defaults to the module-level MEDIUM_RISK_THRESHOLD.
        high_threshold (float, optional): inclusive upper bound of 'Medium Risk'.
            Defaults to the module-level HIGH_RISK_THRESHOLD.

    Returns:
        DataFrame: a copy of ``df`` with a CHURN_RISK column.
    """
    # Resolved at call time so the notebook-level thresholds remain the defaults.
    if medium_threshold is None:
        medium_threshold = MEDIUM_RISK_THRESHOLD
    if high_threshold is None:
        high_threshold = HIGH_RISK_THRESHOLD

    probability = df['CHURN_PROBABILITY']
    conditions = [
        probability <= medium_threshold,
        (probability > medium_threshold) & (probability <= high_threshold),
        probability > high_threshold,
    ]
    values = ['Low Risk', 'Medium Risk', 'High Risk']

    result = df.copy()
    # NaN satisfies none of the conditions; label it explicitly instead of falling
    # back to numpy.select's default of 0 (an int inside a string column).
    result['CHURN_RISK'] = select(conditions, values, default='Unknown')

    return result

# Score each cohort with the loaded model, then bucket the probabilities into risk labels.
current_visual_dataset = create_churn_risk_column(
    run_predictions(current_training_latest_period_df, model)
)
yoy_visual_dataset = create_churn_risk_column(
    run_predictions(yoy_training_latest_period_df, model)
)

### Generate the Feature Importance Dataset

In [None]:
from pandas import DataFrame, melt

# Per-row (local) explanations for each current employee's latest period.
local_explanations = model.explain_local(current_training_latest_period_df[model.feature_names].values)

explanation_columns = local_explanations.data(0)['names']

# Build the score matrix in a single constructor call; growing a DataFrame row by
# row through .loc[len(df)] re-allocates on every append (quadratic).
explanations_df = DataFrame(
    [local_explanations.data(i)['scores'] for i in range(len(current_training_latest_period_df))],
    columns=explanation_columns,
)

# Assign positionally: row i of explanations_df explains row i of
# current_training_latest_period_df, whatever that frame's index labels are.
explanations_df['EMPLOYEE_ID'] = current_training_latest_period_df['EMPLOYEE_ID'].to_numpy()

# Long format: one row per (employee, feature) with that feature's local score.
explanations_df = melt(explanations_df, id_vars=['EMPLOYEE_ID'], var_name='FEATURE_NAME', value_name='FEATURE_IMPORTANCE')

explanations_df.to_csv(path.join(env.visual_dir, "explanations_dataset.csv"), index=False)

### Preparing the dataset for visualization

In [None]:
from pandas import merge

from src.utility.mapping_loader import load_json_mapping_to_dict

# JSON files that translate coded employee attributes into display text.
EMPLOYEE_GENDER_MAPPING_PATH = "src/mappings/EMPLOYEE_GENDER_mapping.json"
EMPLOYEE_STATE_MAPPING_PATH = "src/mappings/EMPLOYEE_STATE_mapping.json"
EMPLOYEE_STATUS_MAPPING_PATH = "src/mappings/employee_status_mapping.json"
JOB_TITLE_MAPPING_PATH = "src/mappings/EMPLOYEE_JOB_TITLE_mapping.json"

def prepare_visual_dataset(df):
    """Join a scored dataset with employee attributes and decode coded columns.

    Every column of ``df`` that also exists in the employee-cleaning stage output
    is dropped, except ``EMPLOYEE_ID``, so those attributes come from the employee
    stage after the (inner) merge on EMPLOYEE_ID. Gender, state, status and job
    title codes are then replaced by the text from their JSON mappings; codes
    without a mapping become "Unknown".

    Args:
        df (DataFrame): scored dataset with an EMPLOYEE_ID column.

    Returns:
        DataFrame: the merged and decoded dataset.
    """
    employee_frame = ingestion_pipeline.dataframes[IngestionPipelineStages.EMPLOYEE_DATA_CLEANING_CALCULATED_FIELDS_STAGE]

    own_columns = [column for column in df.columns if column not in employee_frame.columns]
    merged = merge(df[['EMPLOYEE_ID'] + own_columns], employee_frame, on=EmployeeHistorySchema.EMPLOYEE_ID)

    # (column to decode, mapping file) pairs, applied in this order.
    decoded_columns = (
        (EmployeeSchema.EMPLOYEE_GENDER, EMPLOYEE_GENDER_MAPPING_PATH),
        (EmployeeSchema.EMPLOYEE_STATE, EMPLOYEE_STATE_MAPPING_PATH),
        (EmployeeSchema.EMPLOYEE_STATUS, EMPLOYEE_STATUS_MAPPING_PATH),
        (EmployeeSchema.EMPLOYEE_JOB_TITLE, JOB_TITLE_MAPPING_PATH),
    )
    for column, mapping_path in decoded_columns:
        code_to_text = load_json_mapping_to_dict(mapping_path)
        merged[column] = merged[column].map(code_to_text).fillna("Unknown")

    return merged

def _export_visual_dataset(frame, file_name):
    """Write *frame* as CSV (without the index) to env.visual_dir/file_name."""
    frame.to_csv(path.join(env.visual_dir, file_name), index=False)


# Enrich each scored dataset with employee attributes, then export it.
current_visual_dataset = prepare_visual_dataset(current_visual_dataset)
_export_visual_dataset(current_visual_dataset, "current_visual_dataset.csv")

yoy_visual_dataset = prepare_visual_dataset(yoy_visual_dataset)
_export_visual_dataset(yoy_visual_dataset, "yoy_visual_dataset.csv")

### Preparing the dataset for the map

In [None]:
from datetime import timedelta
from pandas import DataFrame
from src.data.schema.augmented_visit_schema import AugmentedVisitSchema

from pandas import concat

# NOTE(review): MAP_DATASET_DAY_SPAN is defined but not used below. Presumably the
# visits were meant to be limited to the last N days — confirm and apply or remove.
MAP_DATASET_DAY_SPAN = 365

# Added to employee latitudes to avoid overlap of employee and client bubbles on the map.
EMPLOYEE_LATITUDE_OFFSET = 0.005

# Retrieve the relevant columns for the map dataset from AUGMENT_VISIT_STAGE
map_columns = [AugmentedVisitSchema.EMPLOYEE_ID, AugmentedVisitSchema.VISIT_START_AT, AugmentedVisitSchema.CLIENT_LATITUDE, AugmentedVisitSchema.CLIENT_LONGITUDE]
map_df = ingestion_pipeline.dataframes[IngestionPipelineStages.AUGMENT_VISIT_STAGE][map_columns]

# Filter out employees that are not in the visual_dataset
map_df = map_df[map_df[AugmentedVisitSchema.EMPLOYEE_ID].isin(current_visual_dataset[AugmentedVisitSchema.EMPLOYEE_ID])]

# Rename columns
map_df = map_df.rename(columns={
    AugmentedVisitSchema.VISIT_START_AT: 'DATE',
    AugmentedVisitSchema.CLIENT_LATITUDE: 'LATITUDE',
    AugmentedVisitSchema.CLIENT_LONGITUDE: 'LONGITUDE',
})

# Set COORDINATE_TYPE to client for all rows
map_df["COORDINATE_TYPE"] = "CLIENT"

# First employee_df row per EMPLOYEE_ID (the row the previous per-employee
# `employee_data.iloc[0]` lookup used), indexed for O(1) lookups.
employee_lookup = (
    employee_df.dropna(subset=[EmployeeSchema.EMPLOYEE_ID])
    .drop_duplicates(subset=EmployeeSchema.EMPLOYEE_ID, keep="first")
    .set_index(EmployeeSchema.EMPLOYEE_ID)
)

# One EMPLOYEE coordinate row per employee present on the map, dated at period end.
employee_rows = []
for employee_id in map_df[AugmentedVisitSchema.EMPLOYEE_ID].unique():
    if employee_id not in employee_lookup.index:
        continue
    employee_record = employee_lookup.loc[employee_id]
    employee_rows.append({
        AugmentedVisitSchema.EMPLOYEE_ID: employee_id,
        'DATE': conf.period_end,
        'LATITUDE': employee_record[EmployeeSchema.EMPLOYEE_LATITUDE] + EMPLOYEE_LATITUDE_OFFSET,
        'LONGITUDE': employee_record[EmployeeSchema.EMPLOYEE_LONGITUDE],
        'COORDINATE_TYPE': 'EMPLOYEE',
    })

# Concatenate once. DataFrame.append was removed in pandas 2.0, and appending
# inside the loop copied the whole frame on every iteration.
if employee_rows:
    map_df = concat([map_df, DataFrame(employee_rows)], ignore_index=True)

# Save map dataset to csv
map_df.to_csv(path.join(env.visual_dir, "map_dataset.csv"), index=False)
