<header style="font-size:30pt;padding-top:20px;padding-bottom:20px;line-height:1em">Building models with EZ Starter notebooks</header>

## Usecase: ML Hackathon Team ML Fusion Distribution Area Revenue

<center><img width=500 src='attachment:b896b541-3d48-48e5-b8fe-dc19adb90db3.png'></center>

| Variable | Type | Description |
| - | - | - |
| `Billed_Rev_Pergreen` | Continuous | Mean Billed Revenue Per Living Unit for that Distributed Area (Revenue / # Living Units) |
| `hashed_cllida` | Discrete | Hashed Designated Area Common Location Indicator | 
| `DTV_OR_wrls_pnt` | Continuous | DTV or wireless penetration in a Distributed Area. Percentage by taking sum of address locations / # Living Units |
| `dtv_now_base_pckg_ind_P` | Continuous | DTV now base package penetration in the DA. Percentage by taking sum of address locations / # Living Units (`na` is set to `0.015576`) |
| `cable_email_Y` | Continuous | Percentage by taking sum of address locations with cable/previous cable owners that registered email through AT&T  / # Living Units |
| `attr4073` | Continuous | Mean percentage of trades ever derogatory (credit info). Value from zip code level averaged into Distributed Area level (Being late or deliquent on lease, credit cards, loans, etc) |
| `xb107_2` | Continuous | Percentage of consumers being extremely likely to be in market for new car/auto. The average at a zip code level (count/total) scaled to a Distributed Area level using # Living Units per Distributed Area |
| i_WLESS8 | Continuous | Average propensity of use WLS web (Wireless device to browse web, ie. mobile smartphone, sim card). The greater the number the more likelihood of use (`na` is set to `108.628`) |
| `prim_prd_cd_4` | Continuous | DTV Primary Product code of `PREMIER` (# of customer w/ premier pkg). Percentage by taking sum of address locations / # Living Units |
| `dtv_now_frnt_porch_ind_P` | Continuous | DTV now preview penetration in DA. Percentage by taking sum of address locations / # Living Units (`na` is set to `0.023574`) |
| `prim_prd_cd_6` | Continuous | DTV Primary Product code of 'SELECT ALL'. Percentage by taking sum of address locations / # Living Units |
| `Bachelor_s_Degree` | Continuous | Population of 25+ with Bachelors Degree. Total population of Bachelor degrees at the zip code level is averaged based on number of Distributed Area (not scaled by # Living Units) |
| `VZ_ATT_Speed_Ratio` | Continuous | Speed ratios of VZ and ATT to DMA level (Values obtained from third party that is now dismissed). |
| `TMO_ATT_Speed_Ratio` | Continuous | Speed ratios of TMO and ATT to DMA level (Values obtained from third party that is now dismissed). |
| `VZ_ATT_Outdoor_Signal_Ratio` | Continuous | Outdoor signal ratios of VZ to ATT from DMA level (Values obtained from third party that is now dismissed). |
| `TMO_ATT_Outdoor_Signal_Ratio` | Continuous | Outdoor signal ratios of TMO to ATT from DMA level (Values obtained from third party that is now dismissed). |
| `VZ_ATT_Indoor_Signal_Ratio` | Continuous | Indoor signal ratios of VZ to ATT from DMA level (Values obtained from third party that is now dismissed). |
| `TMO_ATT_Indoor_Signal_Ratio` | Continuous | Outdoor signal ratios of TMO to ATT from DMA level (Values obtained from third party that is now dismissed). |
| `ATT_Brand_Index` | Continuous | ATT Brand Index: How this DA is doing nationwide compared to Avg NPS. (Values obtained from third party that is now dismissed) |
| `TMO_Brand_Index` | Continuous | TMO Brand Index. (Values obtained from third party that is now dismissed) |
| `VZ_Brand_Index` | Continuous | VZ Brand Index. (Values obtained from third party that is now dismissed) |

<span style="color:blue;font-style:italic">Cells below are from our `EzLightGBM` notebook accessible to all Pinnacle users</span>

## Setup environment

In [None]:
import os, re, time, math, json, argparse, warnings
from datetime import datetime
from platform import python_version
import numpy as np
import mlflow
import mlflow.sklearn
import pandas as pd
import lightgbm as lgb     
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, \
    precision_score, recall_score, accuracy_score, f1_score, confusion_matrix,log_loss
from sklearn.model_selection import train_test_split

In [None]:
pd.set_option('mode.chained_assignment', None)              # Turn off the pandas warnings
regex = re.compile(r"\[|\]|<", re.IGNORECASE)               # Handling special characters in column names

PROPENSITY = "propensity"                                   # Propensity prediction model type
CLASSIFICATION = "classification"                           # Classification model type
MULTICLASSCLASSIFICATION = "multiclassClassification"       # MultiClass Classification model type
TIME_SERIES = "time_series"                                 # Time Series Regression model type
REGRESSION = "regression"                                   # Regression only model type

In [None]:
def eval_metrics_classification(actual, predicted,mlflow,Prediction_Data):
    # compute relevant metrics
    precision_value = precision_score(actual, predicted)
    recall_value = recall_score(actual, predicted)
    accuracy_value = accuracy_score(actual, predicted)
    f1_score_value = f1_score(actual, predicted)
    mlflow.log_metric(f"{Prediction_Data} precision", precision_value)
    mlflow.log_metric(f"{Prediction_Data} recall", recall_value)
    mlflow.log_metric(f"{Prediction_Data} accuracy", accuracy_value)
    mlflow.log_metric(f"{Prediction_Data} f1 score", f1_score_value)
    print(f"{Prediction_Data} precision", precision_value)
    print(f"{Prediction_Data} recall", recall_value)
    print(f"{Prediction_Data} accuracy", accuracy_value)
    print(f"{Prediction_Data} f1 score", f1_score_value)

def eval_metrics_propensity(actual, predicted,mlflow,Prediction_Data):
    log_loss_val = log_loss(actual, predicted)
    mlflow.log_metric(f"{Prediction_Data} Log-Loss", log_loss_val)
    print(f"{Prediction_Data} Log-Loss", log_loss_val)
    
def eval_metrics_regression(actual, predicted,mlflow,Prediction_Data):
    # compute relevant metrics
    mean_squared_error_value = mean_squared_error(actual, predicted)
    r2_score_value = r2_score(actual, predicted)
    mlflow.log_metric(f"{Prediction_Data} MSE", mean_squared_error_value)
    mlflow.log_metric(f"{Prediction_Data} R2", r2_score_value)
    print(f"{Prediction_Data} MSE", mean_squared_error_value)
    print(f"{Prediction_Data} R2", r2_score_value)
    
def eval_metrics_multiclass(actual, predicted,mlflow,Prediction_Data):
    precision_value = precision_score(actual, predicted, average="micro")
    recall_value = recall_score(actual, predicted,average="micro")
    accuracy_value = accuracy_score(actual, predicted)
    f1_score_value = f1_score(actual, predicted,average="micro")
    mlflow.log_metric(f"{Prediction_Data} precision", precision_value)
    mlflow.log_metric(f"{Prediction_Data} recall", recall_value)
    mlflow.log_metric(f"{Prediction_Data} accuracy", accuracy_value)
    mlflow.log_metric(f"{Prediction_Data} f1 score", f1_score_value)
    print(f"{Prediction_Data} precision", precision_value)
    print(f"{Prediction_Data} recall", recall_value)
    print(f"{Prediction_Data} accuracy", accuracy_value)
    print(f"{Prediction_Data} f1 score", f1_score_value)

## Instantiate EZ Starter `Model` class

EZ Starter LightGBM helps modelers to quickly spin up a new model for any Use Case. All basic coding have been done so you can concentrate on the Data Science. 

* ML Algorithm: 
    * LightGBM (Light Gradient Boosting Machine) - Microsoft
* Feature Engineering: 
    * One Hot Encoding 
    * Replace `(n/a, inf)` with `-9999`
    * Maintain Train and Test datasets features/columns in sync 
* Parameter Tuning: 
    * Grid Search to identify the best hyper-parameters 

## 

In [None]:
class Model(mlflow.pyfunc.PythonModel):
    def __init__(self, model_config):
        self.__model_config = model_config
        self.__index = model_config["row_id"]
        self.__target = model_config["target_label"]
        self.__model_type = model_config["model_type"]
        
        if self.__model_type == "regression" and model_config["date_field"] != "":
            self.__model_type = "time_series"

        # region Determine what fields to display
        self.__display_columns = [self.__index]
        if model_config["date_field"] != "":
            self.__display_columns.append(model_config["date_field"])
        if len(model_config["group_fields"]) > 0:
            self.__display_columns.extend(model_config["group_fields"])
        # endregion
        self.__model = None
        self.__columns = None

        # region Determine Preference
        self.__start_time = time.time()
        self.__prior_time = time.time()
        try:
            self.__ROW_DISPLAY = int(model_config["preference"]["ROW_DISPLAY"])
        except:
            self.__ROW_DISPLAY = 5
        try:
            self.__COL_DISPLAY = int(model_config["preference"]["COL_DISPLAY"])
        except:
            self.__COL_DISPLAY = 4
        try:
            self.__ROW_IMP_DISPLAY = int(model_config["preference"]["ROW_IMP_DISPLAY"])
        except:
            self.__ROW_IMP_DISPLAY = 15
        try:
            self.__SUB_NA_VAL = int(model_config["preference"]["SUB_NA_VAL"])
        except:
            self.__SUB_NA_VAL = -9999
        try:
            self.__PROP_COLUMN = int(model_config["preference"]["PROP_COLUMN"])
        except:
            self.__PROP_COLUMN = "propensitypositiveclass"
        try:
            self.__DEBUG = model_config["preference"]["DEBUG"]
        except:
            self.__DEBUG = False
        # endregion
    def __print_elapse_time(self):
        cur_time = time.time()
        second = cur_time - self.__prior_time
        total = cur_time - self.__start_time
        self.__prior_time = cur_time
        print("----- elapsed: {} min {:.3f} sec\t running: {} min {:.3f} sec".format(math.floor(second/60), second % 60,
                                                                      math.floor(total/60), total % 60))

    # *************************************************************************************** CUSTOM FEATURE ENGINEERING
    def perform_custom_feature_engineer(self, data):
        print("------------ perform_custom_feature_engineer() ------------")
        try:
            df = data.copy()

            # TODO CREATE YOUR FEATURE ENGINEERING HERE
            
            # bins = [0, 777, 2409, 4952, 26035]
            # labels = [1, 2, 3, 4]

            # df['Bachelor_s_Degree'] = pd.cut(np.array(df['Bachelor_s_Degree']), 
            #                          bins=bins,
            #                          labels=labels)
            
            # df['Bachelor_s_Degree'].cat.add_categories(self.__SUB_NA_VAL, inplace=True)
            
            return df
        except Exception as ex:
            print("WARNING", ex)
        return data

    # ******************************************* AUTO FEATURES ENGINEERING ********************************************
    def __afe_delete_column(self, data, params):
        print("------------ __afe_delete_column() ------------")
        col_list = params["col_list"]
        try:
            df = data.drop(columns=col_list)
            return df
        except Exception as ex:
            print("WARNING", ex)
            data_col_list = data.columns
            try:
                # delete each one with update-to-date list to prevent duplicate input list
                for col in col_list:
                    if col in data_col_list:
                        data.drop(col, axis=1, inplace=True)
            except:
                pass
        return data
    def afe_fill_default(self, data):
        print("***************************** afe_fill_default() ******************************************************")
        try:
            data = self.perform_custom_feature_engineer(data)
            data = self.__afe_delete_column(data, self.__model_config["afe_rules"][0]["parameters"])
        except Exception as ex:
            print("WARNING", ex)

        # Make sure no NaN nor inf
        data.replace([np.inf, -np.inf], self.__SUB_NA_VAL, inplace=True)
        data.fillna(self.__SUB_NA_VAL, inplace=True)
        if self.__DEBUG:
            print("------- DataFrame Info: ")
            # print(data.dtypes)
            data.info(verbose=True)
        return data
    def afe_encode_categorical(self, data):
        print("***************************** afe_encode_categorical() ************************************************")
        columns_to_encode = list(set(data.select_dtypes(exclude=['number', 'bool']).columns) - set([self.__index, self.__target]))
        if len(columns_to_encode) == 0:
            print("No categorical columns to encode.")
            return data
        else:
            print("------- Dummies-Encoding Columns: ")
            for col in columns_to_encode:
                print(" ==> {}".format(col))
        try:
            # encode all the non numerical columns
            df = pd.get_dummies(data, prefix_sep="__", columns=columns_to_encode)
            return df
        except Exception as ex:
            print("WARNING", ex)
        return data
    def gen_basic_time_series_afe(self, data):
        """
        This is a toy example for time-series afe generation so you can grasp the idea quicker. A more advance version
        can be found here: https://pinnacle.SERVICE_SITE/learn#Instructions-AutoFeatureengineering
        """
        print("***************************** gen_basic_time_series_afe() *********************************************")
        date_column = self.__model_config["date_field"]
        data[date_column] = pd.to_datetime(data[date_column])           # convert the date column to datetime format

        # Atomize the date
        data[date_column + '_DayOfWeek'] = data[date_column].dt.weekday
        data[date_column + '_DayOfMonth'] = data[date_column].dt.day
        data[date_column + '_DayOfYear'] = data[date_column].dt.dayofyear
        data[date_column + '_WeekOfYear'] = data[date_column].dt.weekofyear
        data[date_column + '_MonthOfYear'] = data[date_column].dt.month
        # data[date_column + '_Year'] = data[date_column].dt.year

        # data[date_column + '_QuarterOfYear'] = data[date_column].dt.quarter
        # data[date_column + '_MonthOfQuarter'] = data[date_column + '_MonthOfYear'] % 3
        return data.drop([date_column], axis=1)

    # ******************************************* TRAINING *************************************************************
    def __gen_afe_train(self, data_train):
        print("***************************** __gen_afe_train() *******************************************************")

        # Handling Time-Series Data
        if self.__model_type == TIME_SERIES:
            data_train = self.gen_basic_time_series_afe(data_train)

        # Fill nan and inf with default values
        data_train = self.afe_fill_default(data_train)
        columns = data_train.columns
        print("\nTrain Data - Filled Default: {} x {}\n=========================================\n{}\n"
              .format(len(data_train), len(columns),
                      data_train[columns[0:self.__COL_DISPLAY]].head(self.__ROW_DISPLAY)))

        # Encoding Categorical Features
        data_train = self.afe_encode_categorical(data_train)
        columns = data_train.columns
        print("\nTrain Data - Encoded: {} x {}\n=========================================\n{}\n"
              .format(len(data_train), len(columns),
                      data_train[columns[0:self.__COL_DISPLAY]].head(self.__ROW_DISPLAY)))

        # Change column name to replace special characters with underscore
        data_train.columns = [regex.sub("__", col) if any(x in str(col) for x in set(('[', ']', '<'))) else col for
                              col in data_train.columns]

        # Pickle the train columns list for future reference
        columns = list(set(data_train.columns) - set([self.__index, self.__target]))
        self.__columns=columns
        return data_train, columns
    def train(self,trainX,trainY):
        print("***************************** Model Train Phase *******************************************************")

        # Load the training frame
        data_train = trainX
        print("\nTrain Data - Raw: {} x {}\n=========================================\n{}\n"
              .format(len(data_train), len(data_train.columns), data_train[self.__display_columns].head(self.__ROW_DISPLAY)))
        self.__print_elapse_time()

        # Generate AFE for Training Dataset
        data_train, columns_list = self.__gen_afe_train(data_train)
        self.__print_elapse_time()

        # Format Training Data for Model Fitting
        trainX = data_train[columns_list]
        # TODO Create Model from predefined Parameters for faster execution
        print("\nCreate Model From the Parameter-Tuned best_estimator_")
        if self.__model_type in [TIME_SERIES, REGRESSION]:
            model = lgb.LGBMRegressor(random_state=27)
        else:
            model = lgb.LGBMClassifier(
                boosting_type='gbdt',
                random_state=27
            )
        print(model)
        model.fit(trainX, trainY)  # Fit model with training data
        self.__print_elapse_time()
        # endregion

        # region Print Features Importance
        print("\nFeatures Importance\n---------------------------------------")
        fea_important = list(zip(model.feature_importances_, columns_list))
        fea_important.sort(reverse=True)
        # pd.DataFrame(fea_important, index=[x for (_, x) in fea_important]).plot(kind='bar', rot=45)
        for x, y in fea_important[0:self.__ROW_IMP_DISPLAY]:
            print("{:12.6f}\t{}".format(x, y))
        # endregion
        self.__model = model
        self.__print_elapse_time()

    # ******************************************* PREDICTION ***********************************************************
    def __gen_afe_test(self, data_test):
        print("***************************** __gen_afe_test() ********************************************************")

        # Making sure target column is treated as int
        data_test[self.__target] = self.__SUB_NA_VAL

        # Handling Time-Series Data
        if self.__model_type == TIME_SERIES:
            data_test = self.gen_basic_time_series_afe(data_test)

        # Fill nan and inf with default values
        data_test = self.afe_fill_default(data_test)
        columns = data_test.columns
        print("\nTest Data - Filled Default: {} x {}\n=========================================\n{}\n"
              .format(len(data_test), len(columns), data_test[columns[0:self.__COL_DISPLAY]].head(self.__ROW_DISPLAY)))

        # Encoding Categorical Features
        data_test = self.afe_encode_categorical(data_test)
        columns = data_test.columns
        print("\nTest Data - Encoded: {} x {}\n=========================================\n{}\n"
              .format(len(data_test), len(columns), data_test[columns[0:self.__COL_DISPLAY]].head(self.__ROW_DISPLAY)))

        # Change column name to replace special characters with underscore
        data_test.columns = [regex.sub("__", col) if any(x in str(col) for x in set(('[', ']', '<'))) else col for col
                             in data_test.columns]

        # region Make sure test dataset has all columns as train dataset
        # Load the pickled columns list
        train_columns_list =self.__columns
        test_cols_list = list(set(data_test.columns) - set([self.__index, self.__target]))
        for col in train_columns_list:
            if col not in test_cols_list:
                print(" ==> Added: {}".format(col))
                data_test[col] = self.__SUB_NA_VAL
        print("------- Remove columns in Test if not existed in Train")
        for col in test_cols_list:
            if col not in train_columns_list:
                print(" ==> Removed: {}".format(col))
                data_test.drop(col, axis=1, inplace=True)
        print("\nTrain Columns: {}\t\tTest Columns: {}\n".format(len(train_columns_list), len(data_test.columns)))
        columns = data_test.columns
        print("\nTest Data - Synched Columns: {} x {}\n=========================================\n{}\n"
              .format(len(data_test), len(columns),
                      data_test[data_test.columns[0:self.__COL_DISPLAY]].head(self.__ROW_DISPLAY)))

        # endregion
        return data_test, train_columns_list
    def __scrub_prediction(self, data_test):
        print("***************************** __scrub_prediction() ****************************************************")
        if self.__model_config["model_type"] == PROPENSITY:
            data_prediction = data_test[self.__PROP_COLUMN]
        elif self.__model_config["model_type"] == MULTICLASSCLASSIFICATION:
            data_prediction = data_test[self.__target]
        else:
            if self.__model_type == CLASSIFICATION:
                data_test[self.__target] = data_test[self.__PROP_COLUMN].apply(lambda x: 1 if x > .5 else 0)
            else:
                data_test[self.__target] = data_test[self.__target].apply(lambda x: x if x > 0 else 0)  # No Negative
            data_prediction = data_test[self.__target]
        print("\nPrediction - Rows: {}\n=========================================\n{}\n"
              .format(len(data_prediction), data_prediction.head(self.__ROW_DISPLAY)))
        return data_prediction.values
    def predict(self,context,data_test):
        print("***************************** Model Predict Phase *****************************************************")

        # Load the testing data
        print("\nTest Data - Raw: {} x {}\n=========================================\n{}\n"
              .format(len(data_test), len(data_test.columns), data_test[self.__display_columns].head(self.__ROW_DISPLAY)))
        self.__print_elapse_time()

        # Generate AFE for Test Dataset
        data_test, train_columns_list = self.__gen_afe_test(data_test)
        self.__print_elapse_time()

        model = self.__model

        # Generate predictions (exclude index and target columns)
        if self.__model_type in [TIME_SERIES, REGRESSION]:
            data_test[self.__target] = model.predict(data_test[train_columns_list])
        elif self.__model_type == CLASSIFICATION or self.__model_type == PROPENSITY:
            data_test[self.__PROP_COLUMN] = model.predict_proba(data_test[train_columns_list])[:, 1]
        else:
            data_test[self.__target] = model.predict(data_test[train_columns_list])

        # Format Prediction Data
        data_prediction = self.__scrub_prediction(data_test)
        self.__print_elapse_time()
        return data_prediction

## Create a MLflow experiment

In [None]:
experiment_name = "SoftSymp ML Workshop"
mlflow.set_experiment(experiment_name=experiment_name)

mlflow_conda_env = {
 'name': 'mlflow-env',
 'channels': ['defaults'],
 'dependencies': [
     f'python={python_version()}', 
     {'pip': [
         'mlflow',
         'scikit-learn==0.23.2',
         'cloudpickle==1.5.0',
         "lightgbm"
     ]}
 ]
}

## Load data and train model

In [None]:
config_path = os.path.join(os.getcwd(), "model_config.json")

if os.path.exists(config_path):
        with open(config_path) as json_file:
            model_config = json.load(json_file)
else:
    raise ValueError('Error, config_path does not exist.')

print(json.dumps(model_config, indent=4, sort_keys=False))

In [None]:
train_df = pd.read_csv(model_config["training_path"], sep=model_config["file_delimiter"])
test_df = pd.read_csv(model_config["testing_path"], sep=model_config["file_delimiter"])

In [None]:
def train():
    with mlflow.start_run() as run:
        data_train = train_df.copy()
        train_Y = data_train[model_config["target_label"]]
        train_X = data_train.drop(columns=[model_config["target_label"]])
        
        trainX, testX, trainY, testY = train_test_split(train_X, train_Y, test_size=0.10, random_state=42)
        
        model = Model(model_config)
        model.train(trainX, trainY)
        predicted = model.predict(None,testX)
        print("***************************** Printing the Validation Scores ******************************************************")
        
        if model_config["model_type"] == CLASSIFICATION:
            eval_metrics_classification(testY, predicted,mlflow,"Train")
        elif model_config["model_type"] == REGRESSION or model_config["model_type"] == TIME_SERIES:
            eval_metrics_regression(testY, predicted,mlflow,"Train")
        elif model_config["model_type"] == PROPENSITY:
            eval_metrics_propensity(testY, predicted,mlflow,"Train")
        elif model_config["model_type"] == MULTICLASSCLASSIFICATION:
            eval_metrics_multiclass(testY, predicted,mlflow,"Train")
        mlflow.pyfunc.log_model("model", python_model=model,conda_env=mlflow_conda_env)
        return model, run.info.run_id
        mlflow.end_run()
        
model, run_id = train()

## Feature engineering in EZ Starter Models

<pre>
<span style="color:#008000;font-weight:bold">class</span> <span style="color:blue">Model</span>(mlflow.pyfunc.PythonModel):
    ...
    <span style="color:#008000;font-weight:bold">def</span> <span style="color:blue">perform_custom_feature_engineer</span>(self, data):
            <span style="color:#008000">print</span>(<span style="color:#BA2121">"perform_custom_feature_engineer()"</span>)
            <span style="color:#008000;font-weight:bold">try</span>:
                df = data.copy()

                <span style="color:#008000"># TODO CREATE YOUR FEATURE ENGINEERING HERE</span>
                <span style="color:#008000;font-weight:bold">return</span> df
            <span style="color:#008000;font-weight:bold">except</span> Exception <span style="color:#008000;font-weight:bold">as</span> ex:
                <span style="color:#008000">print</span>(<span style="color:#BA2121">"WARNING"</span>, ex)
            <span style="color:#008000;font-weight:bold">return</span> data
</pre>

In [None]:
import pandas as pd

df = pd.read_csv("train.csv")
df.head()

In [None]:
quantile_list = np.linspace(0,1,5)
quantiles = df['Bachelor_s_Degree'].quantile(quantile_list)
quantiles

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
df['Bachelor_s_Degree'].hist(bins=30, color='#A9C5D3', 
                             edgecolor='black', grid=False)
for q in quantiles:
    qvl = plt.axvline(q, color='r')
ax.legend([qvl], ['Quantiles'], fontsize=10)
ax.set_title('Bachelor\'s Degree Histogram with Quantiles', fontsize=12)
ax.set_xlabel('Bachelor\'s Degree', fontsize=12)
ax.set_ylabel('Frequency', fontsize=12)

In [None]:
quantiles

In [None]:
bins = [0, 777, 2409, 4952, 26035]
labels = [1, 2, 3, 4]

df['Bachelor_s_Degree'] = pd.cut(np.array(df['Bachelor_s_Degree']), 
                                    bins=bins,
                                    labels=labels)

df[['Bachelor_s_Degree']].iloc[0:10]

## Submit MLflow model to Pinnacle

In [None]:
from pinnacle.service.notebook_api import NotebookApi
api = NotebookApi()

model_directory = os.getcwd()
model_name = experiment_name
run_id = run_id

api.upload_mlflow_model(model_directory, model_name, run_id)

<header style="font-size:30pt;padding-top:20px;padding-bottom:20px;line-height:1em">Using MLflow Tracking</header>

Tracking URI: http://pinnacle-svc-azure.SERVICE_SITE:8091/

## Tune hyperparameters with grid search

In [None]:
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.metrics import make_scorer
import lightgbm as lgb
import numpy as np

scoring = make_scorer(mean_squared_error,greater_is_better=False)

np.random.seed(42)

train_X = train_df.drop(columns=model_config["target_label"])
train_Y = train_df[model_config["target_label"]]

cv = KFold(n_splits=5, shuffle=True, random_state=42)

param_grid = {
    'num_leaves': [31, 127],
    'reg_alpha': [0.1, 0.5],
    'min_child_samples': [30, 50, 100, 300, 400],
    'reg_lambda': [0, 1, 1.5],
    'learning_rate': [0.1, 0.05, 0.01]
    }

estimator = lgb.LGBMRegressor()

gridsearch = GridSearchCV(estimator=estimator, 
                          param_grid=param_grid, 
                          scoring=scoring, cv=cv)

model = gridsearch.fit(X=train_X, y=train_Y)

print(model.best_params_, model.best_score_)

## Log artifacts and metadata

In [None]:
def log_run(gridsearch: GridSearchCV, experiment_name: str, model_name: str, run_index: int, conda_env, tags={}):
    """Log cross validation results to mlflow tracking
    
    Args:
        experiment_name (str): Experiment name
        model_name (str): Name of the model
        run_index (int): Index of the run (in GridSearchCV)
        conda_env (str): A dictionary that describes the conda environment (MLFlow Format)
        tags (dict): Dictionary of extra data and tags (usually features)
    """
    
    cv_results = gridsearch.cv_results_
    with mlflow.start_run(run_name=str(run_index)) as run:
        run_id = run.info.run_uuid
        print("Run ID: %s" % run_id)
        mlflow.log_param("folds", gridsearch.cv.get_n_splits())

        print("Logging parameters")
        params = list(gridsearch.param_grid.keys())
        for param in params:
            mlflow.log_param(param, cv_results["param_%s" % param][run_index])

        print("Logging metrics")
        for score_name in [score for score in cv_results if "mean_test" in score]:
            mlflow.log_metric(score_name, -1 * cv_results[score_name][run_index])
            mlflow.log_metric(score_name.replace("mean","std"), cv_results[score_name.replace("mean","std")][run_index])

        print("Logging model")        
        mlflow.sklearn.log_model(gridsearch.best_estimator_, model_name, conda_env=conda_env)

        print("Logging CV results")
        tempdir = tempfile.TemporaryDirectory().name
        os.mkdir(tempdir)
        timestamp = datetime.now().isoformat().split(".")[0].replace(":", ".")
        filename = "%s-%s-cv_results.csv" % (model_name, timestamp)
        csv = os.path.join(tempdir, filename)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            pd.DataFrame(cv_results).to_csv(csv, index=False)
        
        mlflow.log_artifact(csv, "cv_results") 

        if tags:
            print("Logging extra data related to the experiment")
            mlflow.set_tags(tags) 
        
        mlflow.end_run()   

In [None]:
for i,_ in enumerate(gridsearch.cv_results_['params']):
    log_run(gridsearch, 
            experiment_name=experiment_name,
            model_name="model",
            run_index=i, conda_env=mlflow_conda_env)

## Analyze experiment runs

<span style="color:blue;font-style:italic">Cells below are from our `search_run` notebook accessible to all Pinnacle users</span>

The MLflow UI and API support searching runs within a single experiment or a group of experiments using a search filter API. This API is a simplified version of the SQL `WHERE` clause.

Use the [`mlflow.tracking.MlflowClient.search_runs()`](https://mlflow.org/docs/latest/python_api/mlflow.tracking.html#mlflow.tracking.MlflowClient.search_runs) or [`mlflow.search_runs()`](https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.search_runs) API to search programmatically. You can specify the list of columns to order by (for example, “metrics.rmse”) in the `order_by` column. The column can contain an optional `DESC` or `ASC` value; the default is `ASC`. The default ordering is to sort by `start_time DESC`, then `run_id`.

Identify the best run from experiment ID `130` by `mean_test_score`:

In [None]:
experiment = mlflow.get_experiment_by_name(experiment_name)
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id], order_by=["metrics.mean_test_score ASC"])
run_id = runs.loc[0, 'run_id']

In [None]:
runs.head()

Identify the best run from experiment ID `130` with `learning_rate = '0.05'` and `min_child_samples = '100'`:

In [None]:
query = "params.learning_rate = '0.05' and params.min_child_samples = '100'"
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id], filter_string=query, order_by=["metrics.mean_test_score ASC"])
run_id = runs.loc[0, 'run_id']

Download best run's logged artifacts and model:

In [None]:
from mlflow.tracking.client import MlflowClient

client = MlflowClient()

client.download_artifacts(run_id=run_id, path="cv_results", dst_path=os.getcwd())

In [None]:
model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

<header style="font-size:30pt;padding-top:20px;padding-bottom:20px;line-height:1em">Big Data for Training Models</header>

Connect to various data sources (i.e., DBFS, Delta Lake, ADLS Gen2, DEEP, Snowflake) from within Pinnacle via `Databricks Connect`.

<center><img width=1000 src='attachment:a9d97f76-478d-40ad-ae11-6bbb83e5d05d.png'></center>

<span style="color:blue;font-style:italic">Cells below are from our `Databricks Connect Quickstart` notebook accessible to all Pinnacle users</span>

## Initialize a Spark session

In [None]:
from pyspark.sql import SparkSession
user_ns = get_ipython().user_ns
user_ns["spark"] = SparkSession.builder.getOrCreate()
spark = user_ns["spark"]

## Specify a specific Databricks workspace

An Azure Databricks workspace is an environment for accessing all of your Azure Databricks assets. The workspace organizes objects (notebooks, libraries, and experiments) into folders, and provides access to data and computational resources such as clusters and jobs.

Collect the following configuration properties:
* Azure Databricks workspace [URL](https://adb-2754134964726624.4.azuredatabricks.net)
* The unique organization ID for your workspace. See [Get workspace, cluster, notebook, model, and job identifiers](https://docs.microsoft.com/en-us/azure/databricks/workspace/workspace-details).
* The port that Databricks Connect connects to. The default port is `15001`. If your cluster is configured to use a different port, such as `8787`, use the configured port number.

In [None]:
# Workspace information: Cerebro-PROD
spark.conf.set("spark.databricks.service.address", "https://adb-2754134964726624.4.azuredatabricks.net")
spark.conf.set("spark.databricks.service.orgId", "2754134964726624")
spark.conf.set("spark.databricks.service.port", "15001")

## Specify your Databricks access token

To connect to a Databricks cluster, you must specify an API token used to confirm your identity to Databricks.  For example, `dapi1234567890ab1cde2f3ab456c7d89efa-2`.
  - Learn more about API tokens here: https://docs.databricks.com/api/latest/authentication.html#generate-a-token
  - Get current value: `spark.conf.get("spark.databricks.service.token")`
  - Set via conf: `spark.conf.set("spark.databricks.service.token", <your API token>)`
  - Set via environment variable: `export DATABRICKS_API_TOKEN=<your API token>`

Accessing a secret from Databricks Connect requires a privileged secrets token. To obtain such a token, you can run the following in a Databricks workspace notebook:

`token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()`

Then, run `dbutils.secrets.setToken(token)` locally to save the token. Note that these tokens expire after **48 hours**.

In [None]:
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

spark.conf.set("spark.databricks.service.token", dbutils.secrets.get(scope="pinnacle",key="access_token"))

## Specify a specific Databricks cluster

Set the ID of the cluster you created. You can obtain the cluster ID from the URL. Here the cluster ID is `0507-154802-trim417`.

![image.png](attachment:image.png)

In [None]:
spark.conf.set("spark.databricks.service.clusterId", "0816-233721-pucks577")

In [None]:
spark.range(1).count()

## Access Azure Data Lake Storage Gen2

In [None]:
storage_account_name = "pinnacleadlstest"
container_name = "test"
directory_id = "e741d71c-c6b6-47b0-803c-0f3b32b07556"
application_id = "c2c05890-2126-4c7f-84eb-980d5e8417f2"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.STORAGE", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.STORAGE", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.STORAGE", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.STORAGE", dbutils.secrets.get(scope="pinnacle",key="client_secret"))
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.STORAGE", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [None]:
df = spark.read.csv(f"abfss://{container_name}@{storage_account_name}.STORAGE/157fce53ead84745bf7f999fb9cbd9ad/whole.csv", header=True)
df.limit(1).toPandas()

## Access Data Engineering Enterprise Platform (DEEP)

DEEP is a new cloud-based business intelligence, data engineering, and data mining platform within AT&T. It is a suite of various products and tools, in particular a graphical analytics workflow type tool, with data lineage and data quality controls. It offers data mining pipeline capability, exploration, and integration with languages like R, Python, PySpark, SQL.

In [None]:
DEEP_TOKEN = dbutils.secrets.get(scope="pinnacle",key="deep_token")
FOUNDRY_FQDN = "paloma.palantirfoundry.com"

### Read from DEEP

Access to datasets require permissions via roles assigned in [UPSTART](https://upstart.SERVICE_SITE:4433/UPSTART/UPSTART.cgi), typically the `viewer` role associated with the project. UPSTART roles are (usually) named after the projects. Projects are indicated in the location detail of datasets. For example, here the dataset `outlier_breach_report_ontology` belongs to project `AFO Analytics`.

![image.png](attachment:image.png)

In [None]:
# DEEP URL: https://paloma.palantirfoundry.com/workspace/data-integration/dataset/preview/ri.foundry.main.dataset.9a9774cd-99db-49d3-bae8-3ca00a3617ce/master
# Location: /Innovation Lab/[Source] External Data/data/clean/County Stats
DEEP_URI = f"foundry://{DEEP_TOKEN}@{FOUNDRY_FQDN}/datasets/ri.foundry.main.dataset.9a9774cd-99db-49d3-bae8-3ca00a3617ce/views/master/files/*"

df = spark.read.parquet(DEEP_URI)
df.limit(1).toPandas()

# Access Snowflake

[Snowflake](https://www.snowflake.com) is a cloud-based SQL data warehouse. The following options configure the behavior of the connector. They can be specified using `.option(<key>, <value>)` or `.options(<map>)` for a [Spark DataframeReader class](https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrameReader.html). In particular, see [Setting Configuration Options for the Connector](https://docs.snowflake.com/en/user-guide/spark-connector-use.html#setting-configuration-options-for-the-connector) for all configuration options.

In [None]:
Hostname = "https://cdopoc.east-us-2.privatelink.snowflakecomputing.com"
Username = "" # AT&T ID
Password = "" # UPSTART password

In [44]:
# Snowflake connection options
options = {
  "sfUrl": Hostname,
  "sfUser": Username,
  "sfPassword": Password,
  "sfrole" : "DCPOC_SCHEMA_ADMIN",
  "sfWarehouse": "DSWH",
  "sfDatabase": "DSPOC",
  "sfSchema": "DSSCHEMA"
}

SNOWFLAKE_SOURCE_NAME = 'net.snowflake.spark.snowflake'
 
# Read the data
df = spark.read \
  .format(SNOWFLAKE_SOURCE_NAME) \
  .options(**options) \
  .option("query",  "Select * From TEST limit 5") \
  .load()

df.show()

+---+------+--------------------+
| ID|  NAME|           STARTDATE|
+---+------+--------------------+
|  1| Chris|                null|
|  2|Chris2|                null|
|  3|   Jim|                null|
|  4|   Joe|                null|
|  5|  Joe2|2021-01-02 05:30:...|
+---+------+--------------------+

