# Code for Creating End To End Test JSON

Add this code to the end of the "NRC Dairy 2020 Model Fn" script in R (Line 3338)
to create a JSON file. Then run create_end_to_end_json.py, passing it the JSON
file written by R, to create a new JSON file formatted for running an end-to-end test.

 

library(jsonlite)  # provides toJSON
var_names <- ls()  # names of the objects in the current environment
var_values <- mget(var_names)  # named list: object name -> object value
json_data <- toJSON(var_values, pretty = TRUE)  # serialise the list as indented JSON text
write(json_data, file='R_values.json')  # file that load_R_json reads on the Python side


## Import Packages

In [282]:
import json
import pandas as pd
from nasem_dairy.ration_balancer.execute_model import execute_model
import nasem_dairy.ration_balancer.default_values_dictionaries as constants
import importlib_resources
import numpy as np


## 1. Format R JSON 

In [283]:
def load_R_json(R_json_path):
    """Read the JSON dump of the R environment and normalise its values.

    Args:
        R_json_path: Path of the JSON file written by the R script.

    Returns:
        A ``(dictionaries, corrected_data_types)`` tuple.  ``dictionaries``
        holds the entries named 'efficiency_input', 'f' and 'i' exactly as
        they were read.  ``corrected_data_types`` holds every other entry
        after unwrapping: a one-element list containing a number becomes a
        float, a one-element list containing a string becomes that string,
        and a non-list value is kept unchanged.  Any other list (empty,
        longer than one element, or wrapping some other type) is left out.
    """
    with open(R_json_path, 'r') as handle:
        raw = json.load(handle)

    passthrough_names = ['efficiency_input', 'f', 'i']
    dictionaries = {}
    corrected_data_types = {}

    for name, entry in raw.items():
        # Entries in passthrough_names are kept whole, without unwrapping.
        if name in passthrough_names:
            dictionaries[name] = entry
            continue

        # Anything that is not a list needs no unwrapping.
        if not isinstance(entry, list):
            corrected_data_types[name] = entry
            continue

        # Only one-element lists are unwrapped; other lists are dropped.
        if len(entry) != 1:
            continue

        scalar = entry[0]
        if isinstance(scalar, (int, float)):
            corrected_data_types[name] = float(scalar)
        elif isinstance(scalar, str):
            corrected_data_types[name] = scalar

    return dictionaries, corrected_data_types
    

def get_user_diet(dictionaries, corrected_data_types):
    """Build the user diet as a mapping of feed name to an amount fed.

    Args:
        dictionaries: Mapping whose 'f' entry is a list of feed records, each
            with 'Fd_Name' and 'Fd_DMInp' keys.
        corrected_data_types: Mapping that contains 'Trg_Dt_DMIn'.

    Returns:
        Dict mapping each 'Fd_Name' to ``Trg_Dt_DMIn * (Fd_DMInp / 100)``.
        If a name occurs more than once, the last record wins.
    """
    target_intake = corrected_data_types['Trg_Dt_DMIn']
    diet = {}
    for record in dictionaries['f']:
        # Fd_DMInp is divided by 100 before scaling the target intake.
        amount = target_intake * (record['Fd_DMInp'] / 100)
        diet[record['Fd_Name']] = amount
    return diet


def get_animal_input(corrected_data_types):
    """Move the animal input variables out of ``corrected_data_types``.

    Each listed variable is popped from ``corrected_data_types`` (which is
    therefore modified in place); a variable that is absent is recorded as
    ``None``.  'Trg_Dt_DMIn' is stored under the key 'DMI'; every other
    variable keeps its own name.

    Args:
        corrected_data_types: Mapping of variable name to value.

    Returns:
        ``(animal_input_in, corrected_data_types)`` - the extracted inputs
        and the same mapping object that was passed in, minus those keys.
    """
    output_names = {'Trg_Dt_DMIn': 'DMI'}
    source_names = (
        'An_Parity_rl', 'Trg_MilkProd', 'An_BW', 'An_BCS', 'An_LactDay',
        'Trg_MilkFatp', 'Trg_MilkTPp', 'Trg_MilkLacp', 'Trg_Dt_DMIn',
        'An_BW_mature', 'Trg_FrmGain', 'An_GestDay', 'An_GestLength',
        'Trg_RsrvGain', 'Fet_BWbrth', 'An_AgeDay', 'An_305RHA_MlkTP',
        'An_StatePhys', 'An_Breed', 'An_AgeDryFdStart', 'Env_TempCurr',
        'Env_DistParlor', 'Env_TripsParlor', 'Env_Topo',
    )
    animal_input_in = {
        output_names.get(name, name): corrected_data_types.pop(name, None)
        for name in source_names
    }
    return animal_input_in, corrected_data_types


def get_equation_selection(updated_corrected_data_types):
    """Move the equation selection variables out of the remaining R values.

    Every listed variable is popped from ``updated_corrected_data_types``,
    which is modified in place.

    Args:
        updated_corrected_data_types: Mapping of variable name to value.

    Returns:
        ``(equation_selection_in, updated_corrected_data_types)``.

    Raises:
        KeyError: If any of the listed variables is missing.
    """
    selection_names = (
        'Use_DNDF_IV', 'DMIn_eqn', 'mProd_eqn', 'MiN_eqn',
        'NonMilkCP_ClfLiq', 'Monensin_eqn', 'mPrt_eqn',
        'mFat_eqn', 'RumDevDisc_Clf',
    )
    # pop() without a default: a missing selection raises instead of being
    # recorded as None.
    equation_selection_in = {
        name: updated_corrected_data_types.pop(name)
        for name in selection_names
    }
    return equation_selection_in, updated_corrected_data_types


def create_AA_values(updated_corrected_data_types):
    """Collect the per-amino-acid R variables into one list per column.

    For each column template and each amino acid, the R variable name is
    formed by replacing 'AA' in the template with the amino acid code, and
    that variable is popped from ``updated_corrected_data_types`` (modified
    in place).  A variable that is absent contributes ``None``.

    Args:
        updated_corrected_data_types: Mapping of variable name to value.

    Returns:
        ``(AA_values, updated_corrected_data_types)`` where ``AA_values``
        maps each column template to a list of ten values, in the order
        Arg, His, Ile, Leu, Lys, Met, Phe, Thr, Trp, Val.
    """
    amino_acids = (
        'Arg', 'His', 'Ile', 'Leu', 'Lys', 'Met', 'Phe', 'Thr', 'Trp', 'Val',
    )
    column_templates = (
        'Du_AAMic', 'Du_IdAAMic', 'Du_AAEndP', 'Du_AA', 'DuAA_DtAA', 'Du_AA24h',
        'Abs_AA_g', 'mPrtmx_AA', 'mPrtmx_AA2', 'AA_mPrtmx', 'mPrt_AA_0.1',
        'mPrt_k_AA', 'IdAA_DtAA', 'Abs_AA_MPp', 'Abs_AA_p', 'Abs_AA_DEI',
        'Abs_AA_mol', 'Mlk_AA_g', 'MlkAA_AbsAA', 'MlkAA_DtAA', 'Gest_AA_g',
        'GestAA_AbsAA', 'Body_AAGain_g', 'BodyAA_AbsAA', 'An_AAUse_g',
        'AnAAUse_AbsAA', 'An_AABal_g', 'An_AAEff_EAAEff', 'Imb_AA',
        'Trg_Mlk_AA_g', 'Trg_AAUse_g', 'Trg_AbsAA_g', 'MlkNP_AbsAA',
        'AnNPxAA_AbsAA', 'AnNPxAAUser_AbsAA',
    )

    def variable_name(template, amino_acid):
        # Splitting on 'EAA' first keeps every 'EAA' in the template intact;
        # only the 'AA' occurrences outside it are substituted.
        pieces = template.split('EAA')
        return 'EAA'.join(piece.replace('AA', amino_acid) for piece in pieces)

    AA_values = {
        template: [
            updated_corrected_data_types.pop(
                variable_name(template, amino_acid), None
            )
            for amino_acid in amino_acids
        ]
        for template in column_templates
    }
    return AA_values, updated_corrected_data_types


def create_arrays(updated_corrected_data_types):
    """Collect the per-amino-acid R variables that form the expected arrays.

    For each array name and each amino acid, the R variable name is formed
    by replacing 'AA' in the array name with the amino acid code, and that
    variable is popped from ``updated_corrected_data_types`` (modified in
    place).  A variable that is absent contributes ``np.nan``.

    Args:
        updated_corrected_data_types: Mapping of variable name to value.

    Returns:
        ``(arrays, updated_corrected_data_types)`` where ``arrays`` maps each
        array name to a list of ten values, in the order Arg, His, Ile, Leu,
        Lys, Met, Phe, Thr, Trp, Val.
    """
    amino_acids = (
        'Arg', 'His', 'Ile', 'Leu', 'Lys', 'Met', 'Phe', 'Thr', 'Trp', 'Val',
    )
    array_templates = (
        'Fe_AAMet_AbsAA', 'Ur_AAEnd_AbsAA', 'ScrfAA_AbsAA', 'Fe_AAMet_g',
        'Ur_AAEnd_g', 'Scrf_AA_g', 'Trg_AbsAA_NPxprtAA', 'Trg_AAEff_EAAEff',
    )

    def variable_name(template, amino_acid):
        # Splitting on 'EAA' first keeps every 'EAA' in the template intact;
        # only the 'AA' occurrences outside it are substituted.
        pieces = template.split('EAA')
        return 'EAA'.join(piece.replace('AA', amino_acid) for piece in pieces)

    arrays = {
        template: [
            updated_corrected_data_types.pop(
                variable_name(template, amino_acid), np.nan
            )
            for amino_acid in amino_acids
        ]
        for template in array_templates
    }
    return arrays, updated_corrected_data_types


def remove_constants(updated_corrected_data_types):
    """Drop every entry whose name is a key of the package constants.

    Keys are taken from ``constants.coeff_dict``,
    ``constants.MP_NP_efficiency_dict`` and the first element of
    ``constants.mPrt_coeff_list``.  Names that are not present in
    ``updated_corrected_data_types`` are ignored.

    Args:
        updated_corrected_data_types: Mapping of variable name to value;
            modified in place.

    Returns:
        The same mapping object, with those entries removed.
    """
    discard = updated_corrected_data_types.pop

    for coefficient_name in constants.coeff_dict.keys():
        discard(coefficient_name, None)

    for efficiency_name in constants.MP_NP_efficiency_dict.keys():
        discard(efficiency_name, None)

    # Only the first entry of mPrt_coeff_list is consulted for key names.
    for parameter_name in constants.mPrt_coeff_list[0].keys():
        discard(parameter_name, None)

    return updated_corrected_data_types


def remove_untested_values(updated_corrected_data_types):
    """Drop R values that the end to end test does not compare, then rename.

    Two groups of names are removed from ``updated_corrected_data_types``
    (modified in place); names that are not present are ignored.  Afterwards
    the key 'Dt_DigrOMa.Dt' is renamed to 'Dt_DigrOMa_Dt' when it is present.

    Args:
        updated_corrected_data_types: Mapping of variable name to value.

    Returns:
        The same mapping object after the removals and the rename.
    """
    # Removed due to differences in Python implementation. R code calculates
    # everything then picks a value. In our code we select the equation and
    # only calculate required values. Also includes variables that have no
    # use, such as equation selections
    values_to_remove = [
        'Abs_EAA2_HILKM_g', 'Abs_EAA2_HILKMT_g', 'Abs_EAA2_RHILKM_g',
        'An_AgeConcept1st', 'An_DIMConcept', 'An_GasEOut_Clf', 'An_GasEOut_Dry',
        'An_GasEOut_Heif', 'An_GasEOut_Lact', 'An_SWlact', 'An_WaIn_Dry',
        'An_WaIn_Lact', 'DietID', 'Dt_DMIn', 'Dt_DMIn_BW_LateGest_i',
        'Dt_DMIn_Heif_LateGestInd', 'Dt_DMIn_BW_LateGest_p', 'Dt_DMIn_Calf1',
        'Dt_DMIn_DryCow_AdjGest', 'Dt_DMIn_DryCow1', 'Dt_DMIn_DryCow1_Close',
        'Dt_DMIn_DryCow1_FarOff', 'Dt_DMIn_DryCow2', 'Dt_DMIn_Heif_H1',
        'Dt_DMIn_Heif_H1i', 'Dt_DMIn_Heif_H1p', 'Dt_DMIn_Heif_H2',
        'Dt_DMIn_Heif_H2i', 'Dt_DMIn_Heif_H2p', 'Dt_DMIn_Heif_HJ1',
        'Dt_DMIn_Heif_HJ1i', 'Dt_DMIn_Heif_HJ1p', 'Dt_DMIn_Heif_HJ2',
        'Dt_DMIn_Heif_HJ2i', 'Dt_DMIn_Heif_HJ2p', 'Dt_DMIn_Heif_LateGestPen',
        'Dt_DMIn_Heif_NRCa', 'Dt_DMIn_Heif_NRCad', 'Dt_DMIn_Heif_NRCadi',
        'Dt_DMIn_Heif_NRCadp', 'Dt_DMIn_Heif_NRCai', 'Dt_DMIn_Heif_NRCap',
        'Dt_DMIn_Lact1', 'Dt_DMIn_Lact2', 'Dt_GasEOut_Clf', 'Dt_GasEOut_Dry',
        'Dt_GasEOut_Lact', 'Dt_GasEOut_Heif', 'Dt_NDF_drylim', 'Dt_NDFdev_DMI',
        'Du_MiN_NRC2021_g', 'Du_MiN_VTln_g', 'Du_MiN_VTnln_g', 'Fd_DMInp_Sum',
        'FrmGain_eqn', 'RsrvGain_eqn', 'K_DE_ME_ClfDry', 'K_FeCPend_ClfLiq',
        'Ky_ME_NE', 'mLac_eqn', 'mPrt_parmset', 'numFdProp', 'numIngrs',
        'RUP_eqn',  'SIDigArgRUPf', 'SIDigHisRUPf', 'SIDigIleRUPf',
        'SIDigLeuRUPf', 'SIDigLysRUPf', 'SIDigMetRUPf', 'SIDigPheRUPf',
        'SIDigThrRUPf', 'SIDigTrpRUPf', 'SIDigValRUPf', 'Trg_MP_NPxprt',
        'Trg_NEmilkOut'
    ]
    # Variables are missing or have issues with naming/feed library (DNDF48)
    values_not_calculated = [
        'An_Grazing', 'An_ME_ClfDry', 'An_MEIn_ClfDry', 'An_NE_ClfDry',
        'Dt_DE_ClfLiq', 'Dt_ME_ClfLiq',
        'Dt_ForDNDF48', 'Dt_ForDNDF48_ForNDF', # Need to fix Fd_DNDF48 in feed library as affecting calculation
        'Dt_RUPIn.dt', 'En_OM', 'Kf_ME_RE_Clf', 'Kf_ME_RE_ClfDry',
        'Kf_ME_RE_ClfLiq', 'Km_ME_NE_Clf', 'Km_ME_NE_Cow', 'Km_ME_NE_Heif',
        'Rsrv_AshGain', 'Trg_Mlk_NP', 'VolSlds2_Milk'
        ]
    for name in values_to_remove:
        updated_corrected_data_types.pop(name, None)
    for name in values_not_calculated:
        updated_corrected_data_types.pop(name, None)

    # Rename key. Guarded so that an input without 'Dt_DigrOMa.Dt' is handled
    # like every other absent name above instead of raising KeyError.
    if 'Dt_DigrOMa.Dt' in updated_corrected_data_types:
        updated_corrected_data_types['Dt_DigrOMa_Dt'] = updated_corrected_data_types.pop('Dt_DigrOMa.Dt')
    return updated_corrected_data_types


In [284]:
# Format the R dump step by step. Each helper after get_user_diet pops the
# keys it consumes and returns the same, shrinking mapping, so whatever is
# left at the end is what remove_untested_values returns as output_data.
dictionaries, corrected_data_types = load_R_json('./R_values.json')
# get_user_diet reads 'Trg_Dt_DMIn', so it must run before get_animal_input,
# which pops that key.
user_diet_in = get_user_diet(dictionaries, corrected_data_types)
animal_input_in, updated_corrected_data_types = get_animal_input(corrected_data_types)
equation_selection_in, updated_corrected_data_types = get_equation_selection(updated_corrected_data_types)
AA_values, updated_corrected_data_types = create_AA_values(updated_corrected_data_types)
arrays, updated_corrected_data_types = create_arrays(updated_corrected_data_types)
updated_corrected_data_types = remove_constants(updated_corrected_data_types)
output_data = remove_untested_values(updated_corrected_data_types)


## 2. Create JSON Test File

In [285]:
def create_test_json(user_diet_in, animal_input_in, equation_selection_in, output_data, AA_values, arrays, file_path):
    """Write the model inputs and expected outputs to one JSON test file.

    The file holds a top-level object with two members: "input" (the user
    diet, animal input and equation selection) and "output" (the expected
    output data, amino acid values and arrays).  It is written with an
    indent of 4.

    Args:
        user_diet_in: Value stored under input / user_diet_in.
        animal_input_in: Value stored under input / animal_input_in.
        equation_selection_in: Value stored under input /
            equation_selection_in.
        output_data: Value stored under output / output_data.
        AA_values: Value stored under output / AA_values.
        arrays: Value stored under output / arrays.
        file_path: Path of the file to create or overwrite.
    """
    model_inputs = {
        "user_diet_in": user_diet_in,
        "animal_input_in": animal_input_in,
        "equation_selection_in": equation_selection_in,
    }
    expected_outputs = {
        'output_data': output_data,
        'AA_values': AA_values,
        'arrays': arrays,
    }
    payload = {"input": model_inputs, "output": expected_outputs}

    with open(file_path, 'w') as destination:
        json.dump(payload, destination, indent=4)


def read_test_json(file_path):
    """Load a JSON test file and convert its parts for the end to end test.

    Args:
        file_path: Path of a JSON file with "input" and "output" members, as
            written by ``create_test_json``.

    Returns:
        A 6-tuple ``(user_diet_in, animal_input_in, equation_selection_in,
        output_data, AA_values, arrays)`` where

        * ``user_diet_in`` is a DataFrame with columns 'Feedstuff' and
          'kg_user', one row per entry of the stored diet;
        * ``animal_input_in``, ``equation_selection_in`` and ``output_data``
          are returned as loaded;
        * ``AA_values`` is a DataFrame indexed by the ten amino acid codes,
          with the column 'mPrt_AA_0.1' renamed to 'mPrt_AA_01';
        * ``arrays`` maps each stored array name to a numpy array.
    """
    with open(file_path, 'r') as source:
        contents = json.load(source)

    amino_acids = [
        'Arg', 'His', 'Ile', 'Leu', 'Lys', 'Met', 'Phe', 'Thr', 'Trp', 'Val'
    ]

    inputs = contents['input']
    expected = contents['output']

    # One (feed name, amount) pair per row.
    user_diet_in = pd.DataFrame(
        inputs['user_diet_in'].items(),
        columns=['Feedstuff', 'kg_user'],
    )
    animal_input_in = inputs['animal_input_in']
    equation_selection_in = inputs['equation_selection_in']
    output_data = expected['output_data']

    AA_values = pd.DataFrame(expected['AA_values'], index=amino_acids).rename(
        columns={"mPrt_AA_0.1": "mPrt_AA_01"}
    )

    arrays = {
        name: np.array(values) for name, values in expected['arrays'].items()
    }

    return (
        user_diet_in,
        animal_input_in,
        equation_selection_in,
        output_data,
        AA_values,
        arrays,
    )


In [286]:
# Write the formatted inputs and expected outputs to the demo test file.
create_test_json(user_diet_in, animal_input_in, equation_selection_in, output_data, AA_values, arrays, './demo_end_to_end_test.json')


# Run End to End Test

## 3. Pass Inputs to execute_model

In [287]:
# Read the test file back. This rebinds the names used above: user_diet_in and
# AA_values become DataFrames and the entries of arrays become numpy arrays.
user_diet_in, animal_input_in, equation_selection_in, output_data, AA_values, arrays = read_test_json('./demo_end_to_end_test.json')
    

In [288]:
# Load NASEM_feed_library.csv from the nasem_dairy.data package resources.
path_to_package_data = importlib_resources.files("nasem_dairy.data")
feed_library_in = pd.read_csv(path_to_package_data.joinpath("NASEM_feed_library.csv"))

# Run the model on the inputs read back from the test file; `output` is
# exported to a dict in a later cell.
output = execute_model(
    user_diet=user_diet_in,
    animal_input=animal_input_in,
    equation_selection=equation_selection_in,
    feed_library_df=feed_library_in
)


  Dt_acCo = Abs_CoIn / Dt_CoIn  # Line 3230


## 4. Compare to expected_values

In [289]:
# end_to_end_test looks values up in model_output by variable name.
# NOTE(review): presumably export_to_dict returns a flat dict keyed by
# variable name - confirm against the model output class.
model_output = output.export_to_dict()


DataFrame keys: ['user_diet', 'diet_info', 'AA_values']
Series keys: ['Fe_AAMet_AbsAA', 'Ur_AAEnd_AbsAA', 'ScrfAA_AbsAA', 'f_Imb']
Numpy array keys: ['Kg_MP_NP_Trg', 'Fe_CP', 'Fe_AAMet_g', 'Ur_AAEnd_g', 'Scrf_AA_g', 'Inf_Rum', 'Inf_SI', 'Inf_Art', 'Dt_RUPIn', 'Dt_rOMIn', 'TT_dcNDF', 'TT_dcSt', 'Dt_dcCP_ClfDry', 'Dt_DMIn_ClfStrt', 'Dt_acMg', 'Dt_DEIn', 'An_DEIn', 'An_MEIn', 'FatGain_FrmGain', 'Trg_AbsAA_NPxprtAA', 'Trg_AAEff_EAAEff']
Dict keys: []
List keys: []


### Run Test

In [290]:
import math

def end_to_end_test(expected_output, expected_AA_values, expected_arrays, model_output, tolerance=1e-3):
    """Assert that the model output matches the expected values.

    Three groups are compared, in this order:

    1. Every entry of ``expected_output`` against ``model_output[key]`` with
       ``math.isclose``.  An entry is skipped when either value is a string.
    2. ``expected_AA_values`` against ``model_output['AA_values']`` with
       ``pd.testing.assert_frame_equal``.
    3. Every entry of ``expected_arrays`` against ``model_output[key]`` with
       ``np.allclose``.

    Args:
        expected_output: Mapping of variable name to expected value.
        expected_AA_values: Expected amino acid DataFrame.
        expected_arrays: Mapping of variable name to expected array.
        model_output: Mapping of variable name to the value the model
            produced; must also contain 'AA_values'.
        tolerance: Used as both the relative and the absolute tolerance in
            all three comparisons.

    Raises:
        AssertionError: On the first mismatch found.
        KeyError: If an expected key is absent from ``model_output``.
    """
    for key, expected_value in expected_output.items():
        model_value = model_output[key]
        # Values where either side is a string are not compared.
        if isinstance(model_value, str) or isinstance(expected_value, str):
            continue
        values_match = math.isclose(
            model_value, expected_value, rel_tol=tolerance, abs_tol=tolerance
        )
        assert values_match, f"Mismatch for {key}: {model_value} != {expected_value}"

    pd.testing.assert_frame_equal(
        expected_AA_values,
        model_output['AA_values'],
        rtol=tolerance,
        atol=tolerance,
    )

    for key, expected_array in expected_arrays.items():
        model_array = model_output[key]
        arrays_match = np.allclose(
            expected_array, model_array, rtol=tolerance, atol=tolerance
        )
        assert arrays_match, f"Mismatch for {key}: {model_array} != {expected_array}"
 


In [291]:
# Compare the model output with the expected values read from the test file;
# the first mismatch raises AssertionError.
end_to_end_test(output_data, AA_values, arrays, model_output)
