## NOTE
The purpose of my repos is to learn. In this repo, I learn how to use TensorFlow Keras, Hyperopt, and MLflow to develop a deep learning model.

It includes the following steps:
- STEP 1: DATA PREP
    - Load and preprocess data
- STEP 2: Neural Network Model
    - Part 1. Create a neural network model with TensorFlow Keras and view training with inline TensorBoard
    - Part 2. Perform automated hyperparameter tuning with Hyperopt and MLflow and use autologging to save results
    - Part 3. Use the best set of hyperparameters to build a final model
    - Part 4. Register the model in MLflow and use the model to make predictions

This repo follows the instruction notebooks provided on the Databricks website.

In [1]:
!python --version

Python 3.10.9


In [2]:
# TensorFlow/Keras and MLflow libraries
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
import mlflow
import mlflow.keras
import mlflow.tensorflow

## DATA PREP
We use the California Housing dataset from scikit-learn.

### Load and train-test-split dataset

In [3]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

df_cal_housing = fetch_california_housing()

#split train-test
X_train, X_test, y_train, y_test = train_test_split(df_cal_housing.data, df_cal_housing.target, test_size=0.2)
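To make the split concrete, here is a minimal pure-Python sketch of a shuffled train/test split. It is not scikit-learn's implementation; `simple_train_test_split` is a hypothetical helper. It shuffles row indices, holds out `ceil(test_size * n)` rows, and keeps features and targets aligned. For the 20,640-row California Housing data this holds out 4,128 rows and keeps 16,512 for training, which should match the sizes `train_test_split(..., test_size=0.2)` produces. Because the call above passes no `random_state`, the real split (and therefore every downstream score) changes from run to run.

```python
import math
import random


def simple_train_test_split(X, y, test_size=0.2, seed=None):
    """Hypothetical helper: shuffle row indices, hold out ceil(test_size * n) rows as the test set."""
    n = len(X)
    n_test = math.ceil(test_size * n)  # round the test count up
    indices = list(range(n))
    random.Random(seed).shuffle(indices)  # a fixed seed makes the split reproducible
    test_idx, train_idx = indices[:n_test], indices[n_test:]
    # Index X and y with the same positions so each feature row keeps its target
    X_train = [X[i] for i in train_idx]
    X_test = [X[i] for i in test_idx]
    y_train = [y[i] for i in train_idx]
    y_test = [y[i] for i in test_idx]
    return X_train, X_test, y_train, y_test
```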

In [4]:
X_train

array([[   7.9887    ,   15.        ,    6.83713355, ...,    2.84364821,
          33.69      , -117.8       ],
       [   4.3164    ,    6.        ,    3.35346756, ...,    3.92393736,
          33.99      , -118.22      ],
       [   2.3409    ,   38.        ,    4.71863118, ...,    4.01140684,
          34.08      , -118.19      ],
       ...,
       [   5.0853    ,   26.        ,    5.91649695, ...,    2.95723014,
          33.68      , -117.89      ],
       [   4.8405    ,   17.        ,    6.03267974, ...,    2.7486174 ,
          37.43      , -122.43      ],
       [   2.1458    ,   40.        ,    5.5621118 , ...,    2.10869565,
          37.64      , -120.98      ]])

### Scale features
Feature scaling is important when working with neural networks, so we use StandardScaler to standardize each feature to zero mean and unit variance.

In [5]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test) # the scaler is already fitted on X_train, so only transform X_test
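The fit/transform split matters because the test set must be scaled with statistics learned from the training set only; otherwise test information leaks into training. The minimal sketch below mimics what StandardScaler does, z = (x - mean) / std per column, using the population standard deviation and a scale of 1 for constant columns. `fit_standard_scaler` and `transform` are hypothetical names, not scikit-learn's API.

```python
import statistics


def fit_standard_scaler(rows):
    """Learn per-column mean and population standard deviation from training rows only."""
    columns = list(zip(*rows))
    means = [statistics.fmean(col) for col in columns]
    stds = [statistics.pstdev(col) for col in columns]
    # Constant columns would divide by zero, so give them a scale of 1
    stds = [s if s > 0 else 1.0 for s in stds]
    return means, stds


def transform(rows, means, stds):
    """Apply z = (x - mean) / std column by column with previously fitted statistics."""
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]


train = [[1.0, 10.0], [3.0, 30.0]]
test = [[5.0, 20.0]]
means, stds = fit_standard_scaler(train)       # statistics come from the training rows only
train_scaled = transform(train, means, stds)   # [[-1.0, -1.0], [1.0, 1.0]]
test_scaled = transform(test, means, stds)     # [[3.0, 0.0]]
```

Note that a scaled test value can fall outside the training range (3.0 here); that is expected and is not a sign of a bug.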

## Neural Network Model

### Part 1. Create model and view TensorBoard in notebook

In [1]:
#define model
def create_model():
    model = Sequential()
    # relu (rectified linear unit) outputs max(0, x). It looks and acts like a linear function for
    # positive inputs, but it is nonlinear, which lets the network learn complex relationships in the data.
    # Dense is a fully connected layer: every neuron in the layer is connected
    # to every neuron in the preceding layer.
    model.add(Dense(20, input_dim=8, activation='relu'))
    model.add(Dense(20, activation='relu'))
    model.add(Dense(1, activation='linear'))
    return model
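To show what one Dense layer with ReLU computes, here is a small pure-Python forward pass: each output unit takes a weighted sum of all inputs plus a bias, then applies the activation. The weights are made-up illustration values, and the `weights[j][i]` layout (output unit j, input i) is a choice made for this sketch; Keras stores its kernel with shape (inputs, units) instead. The parameter count helper (inputs × units weights plus one bias per unit) gives 621 trainable parameters for `create_model()`, which should match the total `model.summary()` reports.

```python
def relu(z):
    """Rectified linear unit: pass positive values through, clip negatives to 0."""
    return max(0.0, z)


def dense(inputs, weights, biases, activation=None):
    """Fully connected layer: every output unit sees every input.

    weights[j][i] is the weight from input i to output unit j (layout chosen for this sketch).
    """
    outputs = []
    for w_row, b in zip(weights, biases):
        z = sum(w * x for w, x in zip(w_row, inputs)) + b
        outputs.append(activation(z) if activation is not None else z)
    return outputs


def dense_param_count(n_inputs, n_units):
    """One weight per input-unit pair plus one bias per unit."""
    return n_inputs * n_units + n_units


x = [1.0, -2.0]
hidden = dense(x, [[1.0, 1.0], [-1.0, -1.0]], [0.0, 0.0], activation=relu)  # [0.0, 1.0]
output = dense(hidden, [[2.0, 3.0]], [0.5])  # linear output unit: [3.5]

# Layers of create_model(): 8 -> 20 -> 20 -> 1
total_params = dense_param_count(8, 20) + dense_param_count(20, 20) + dense_param_count(20, 1)  # 621
```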

In [8]:
#compile the model
model = create_model()
#----
model.compile(loss='mse',
             optimizer='Adam',
             metrics=['mse'])

Metal device set to: Apple M1


2023-03-04 23:50:36.813812: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-03-04 23:50:36.814246: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


#### Callbacks
Callbacks are utilities that Keras calls at specific stages of training, for example at the start or end of each epoch or batch. They can help you prevent overfitting, visualize training progress, debug your code, save checkpoints, generate logs, write TensorBoard logs, and more. TensorFlow ships many built-in callbacks, and you can pass several of them to fit() at once.
https://blog.paperspace.com/tensorflow-callbacks/
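The next cell uses EarlyStopping(monitor='loss', mode='min', patience=3), which watches the training loss, and ModelCheckpoint(save_best_only=True), which monitors 'val_loss' by default; that is why the training log reports val_loss improvements. The sketch below is a simplified, dependency-free model of both rules, not Keras's source: an epoch counts as an improvement only if its loss is strictly below the best so far, and training stops after `patience` consecutive epochs without improvement. `patience_stop` is a hypothetical helper.

```python
def patience_stop(losses, patience=3):
    """Simplified patience rule.

    Returns (stop_epoch, best_epoch), both 1-based. stop_epoch is None if training
    would run through every epoch; best_epoch is the epoch a save_best_only
    checkpoint would have kept.
    """
    best_loss = float("inf")
    best_epoch = None
    wait = 0  # consecutive epochs without improvement
    for epoch, loss in enumerate(losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    return None, best_epoch


# Loss stalls after epoch 2, so training stops at epoch 5 and epoch 2 is the best checkpoint
print(patience_stop([1.0, 0.8, 0.9, 0.85, 0.82, 0.7]))
```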

In [8]:
#create callbacks
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

#directory
experiment_log_dir = './DB/tb'
checkpoint_path = './DB/keras_checkpoint_weights.ckpt'

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=experiment_log_dir)
model_checkpoint = ModelCheckpoint(filepath=checkpoint_path, verbose=1, save_best_only=True)
early_stopping = EarlyStopping(monitor='loss', mode='min', patience=3)

# epochs=50 matches the "Epoch x/50" training log below
history = model.fit(X_train, y_train, validation_split=.2, epochs=50,
                    callbacks=[tensorboard_callback, model_checkpoint, early_stopping])

Epoch 1/50


2023-03-03 00:33:13.241558: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2023-03-03 00:33:13.440479: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2023-03-03 00:33:16.566886: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.



Epoch 1: val_loss improved from inf to 0.56320, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 2/50
Epoch 2: val_loss improved from 0.56320 to 0.42794, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 3/50
Epoch 3: val_loss improved from 0.42794 to 0.37594, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 4/50
Epoch 4: val_loss improved from 0.37594 to 0.35646, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 5/50
Epoch 5: val_loss improved from 0.35646 to 0.34409, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 6/50
Epoch 6: val_loss improved from 0.34409 to 0.338

Epoch 24/50
Epoch 24: val_loss improved from 0.29557 to 0.29487, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 25/50
Epoch 25: val_loss improved from 0.29487 to 0.29340, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 26/50
Epoch 26: val_loss did not improve from 0.29340
Epoch 27/50
Epoch 27: val_loss improved from 0.29340 to 0.28998, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 28/50
Epoch 28: val_loss did not improve from 0.28998
Epoch 29/50
Epoch 29: val_loss improved from 0.28998 to 0.28648, saving model to ./DB/keras_checkpoint_weights.ckpt
INFO:tensorflow:Assets written to: ./DB/keras_checkpoint_weights.ckpt/assets
Epoch 30/50
Epoch 30: val_loss improved from 0.28648 to 0.28631, saving model to ./DB/keras_checkpoint_weigh

Epoch 50/50
Epoch 50: val_loss did not improve from 0.27261


#### TensorBoard commands

In [9]:
%load_ext tensorboard
%tensorboard --logdir $experiment_log_dir

#### Evaluate model on test dataset

In [10]:
model.evaluate(X_test, y_test)



[0.2741457223892212, 0.2741457223892212]
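model.evaluate returns [loss, metric], and both numbers are identical here because the loss and the only metric are both MSE. MSE is in squared target units; the California Housing target is the median house value in hundreds of thousands of dollars, so taking the square root gives an RMSE of roughly 0.52, i.e. a typical error of about $52,000. A minimal sketch of both computations:

```python
import math


def mean_squared_error(y_true, y_pred):
    """Average of squared differences between targets and predictions."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Test MSE reported by model.evaluate(X_test, y_test) above
test_mse = 0.2741457223892212
# RMSE is back in the target's units (hundreds of thousands of dollars)
test_rmse = math.sqrt(test_mse)
print(f"RMSE = {test_rmse:.4f} -> about ${test_rmse * 100_000:,.0f}")
```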

### Part 2. Perform automated hyperparameter tuning with Hyperopt
Hyperopt's job is to find the best value of a scalar-valued, possibly-stochastic function over a set of possible arguments to that function. Whereas many optimization packages will assume that these inputs are drawn from a vector space, Hyperopt is different in that it encourages you to describe your search space in more detail. By providing more information about where your function is defined, and where you think the best values are, you allow algorithms in hyperopt to search more efficiently.
https://github.com/hyperopt/hyperopt/wiki/FMin
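fmin() repeatedly proposes parameters from the search space, calls the objective, and keeps the lowest "loss" among results whose "status" is STATUS_OK (the string "ok" in Hyperopt). The dependency-free sketch below shows that loop using plain random search, which corresponds to Hyperopt's rand.suggest; the notebook uses tpe.suggest, which instead uses past results to choose the next point, so treat this as a simplified stand-in. `toy_objective`, `sample_toy_space`, and `random_search` are hypothetical names for illustration.

```python
import math
import random

STATUS_OK = "ok"  # value of hyperopt.STATUS_OK, redefined so the sketch has no dependencies


def toy_objective(params):
    # Hypothetical objective with its minimum at x = 3; runNN plays this role in the notebook
    return {"loss": (params["x"] - 3.0) ** 2, "status": STATUS_OK}


def sample_toy_space(rng):
    # Hypothetical one-dimensional space, similar in spirit to hp.uniform("x", -10, 10)
    return {"x": rng.uniform(-10.0, 10.0)}


def random_search(fn, sample, max_evals, seed=0):
    """Evaluate fn on max_evals random points and keep the one with the lowest loss."""
    rng = random.Random(seed)
    best_params, best_loss = None, math.inf
    for _ in range(max_evals):
        params = sample(rng)
        result = fn(params)
        if result["status"] == STATUS_OK and result["loss"] < best_loss:
            best_params, best_loss = params, result["loss"]
    return best_params, best_loss


best_params, best_loss = random_search(toy_objective, sample_toy_space, max_evals=200)
print(best_params, best_loss)
```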

#### Create neural network model using variables for number of nodes in hidden layers

In [3]:
def create_model(n):
    model = Sequential()
    model.add(Dense(int(n["dense_l1"]), input_dim=8, activation="relu"))
    model.add(Dense(int(n["dense_l2"]), activation="relu"))
    model.add(Dense(1, activation="linear"))
    return model

#### Create Hyperopt objective function

In [4]:
from hyperopt import fmin, hp, tpe, STATUS_OK, SparkTrials

def runNN(n):
    # Import tensorflow inside the function, since SparkTrials runs it on Spark workers
    import tensorflow as tf

    # Log run information with mlflow.tensorflow.autolog()
    mlflow.tensorflow.autolog()

    model = create_model(n)

    # Select the optimizer class by name (e.g. "Adam") and set its learning rate
    optimizer_call = getattr(tf.keras.optimizers, n["optimizer"])
    optimizer = optimizer_call(learning_rate=n["learning_rate"])

    # Compile model
    model.compile(loss="mse",
                  optimizer=optimizer,
                  metrics=["mse"])

    history = model.fit(X_train, y_train, validation_split=.2,
                        epochs=35, verbose=2)

    # Evaluate the model; score[0] is the test MSE, which Hyperopt minimizes
    score = model.evaluate(X_test, y_test, verbose=0)
    obj_metric = score[0]
    return {"loss": obj_metric, "status": STATUS_OK}

#### Define Hyperopt search space

In [8]:
space = {
  "dense_l1": hp.quniform("dense_l1", 10, 30, 1),
  "dense_l2": hp.quniform("dense_l2", 10, 30, 1),
  "learning_rate": hp.loguniform("learning_rate", -5, 0),
  "optimizer": hp.choice("optimizer", ["Adadelta", "Adam"])
 }
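Hyperopt documents hp.quniform(label, low, high, q) as round(uniform(low, high) / q) * q and hp.loguniform(label, low, high) as exp(uniform(low, high)), so this space draws layer sizes from 10 to 30 and learning rates between exp(-5) ≈ 0.0067 and 1. The pure-Python sketch below reproduces those definitions to show what a single trial's parameters look like; it is not Hyperopt's sampler, and the function names are hypothetical. quniform yields floats such as 17.0, which is why create_model wraps the layer sizes in int().

```python
import math
import random


def sample_quniform(rng, low, high, q):
    # round(uniform(low, high) / q) * q, returned as a float like Hyperopt's quniform
    return float(round(rng.uniform(low, high) / q) * q)


def sample_loguniform(rng, low, high):
    # exp(uniform(low, high)): the log of the value is uniform on [low, high]
    return math.exp(rng.uniform(low, high))


def sample_space(rng):
    """Draw one trial's parameters from a space shaped like the one above."""
    return {
        "dense_l1": sample_quniform(rng, 10, 30, 1),
        "dense_l2": sample_quniform(rng, 10, 30, 1),
        "learning_rate": sample_loguniform(rng, -5, 0),
        "optimizer": rng.choice(["Adadelta", "Adam"]),
    }


rng = random.Random(0)
for _ in range(3):
    print(sample_space(rng))
```

Sampling the learning rate on a log scale spends as many trials between 0.0067 and 0.08 as between 0.08 and 1, which suits a parameter whose useful values span orders of magnitude.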

#### Create the SparkTrials object
The SparkTrials object tells fmin() to distribute the tuning job across a Spark cluster. When creating the SparkTrials object, you can use the parallelism argument to set the maximum number of trials to evaluate concurrently. The default setting is the number of Spark executors available.

A higher number lets you scale out testing of more hyperparameter settings. Because Hyperopt proposes new trials based on past results, there is a trade-off between parallelism and adaptivity. For a fixed max_evals, greater parallelism speeds up the computation, but lower parallelism may lead to better results, since each iteration has access to more past results.
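The trade-off can be put in numbers under a simplifying assumption: trials run in synchronous waves of `parallelism`, and each new trial only sees results from earlier waves. (SparkTrials actually starts a new trial whenever a slot frees up, so this is an idealized model.) With max_evals=30, parallelism=1 takes 30 sequential rounds and a trial sees 14.5 past results on average; parallelism=10 takes 3 rounds but a trial sees only 10 on average. `batch_schedule` is a hypothetical helper.

```python
import math


def batch_schedule(max_evals, parallelism):
    """Idealized synchronous model of parallel tuning.

    Returns (rounds, avg_history): the number of sequential waves, and the average
    number of completed trials a new trial can learn from when it is proposed.
    """
    rounds = math.ceil(max_evals / parallelism)
    # Trial k (0-based) is proposed once all earlier waves have finished
    history_seen = [(k // parallelism) * parallelism for k in range(max_evals)]
    avg_history = sum(history_seen) / max_evals
    return rounds, avg_history


for p in (1, 10, 30):
    print(p, batch_schedule(30, p))
```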

In [9]:
from pyspark import SparkContext, SparkConf
#Create a spark Context class, with custom config
conf = SparkConf()
#conf.set('spark.sql.debug.maxToStringFields', 100)
conf.set('spark.python.worker.reuse', 'true')
conf.set('spark.python.worker.memory', '3g')
conf.set('spark.executor.memory', '35g')
conf.set('spark.dynamicAllocation.enabled', 'true')
conf.set('spark.dynamicAllocation.maxExecutors', 25)
conf.set('spark.executor.memoryOverhead', '20')
conf.set('spark.driver.memoryOverhead', '30g')
conf.set('spark.driver.memoryOverheadFactor', 0.7)
conf.set('spark.executor.cores', 8)
conf.set('spark.default.parallelism', 700)
conf.set('spark.sql.shuffle.partitions', 700)
conf.set('spark.driver.memory', '30g')
conf.set('spark.driver.cores', 8)
#conf.set("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
#conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
#conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

sc = SparkContext.getOrCreate(conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/04 23:53:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
#import pyspark
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

In [12]:
## Create spark session
spark = SparkSession.builder.master('local[*]').\
                config('spark.sql.debug.maxToStringFields', '100').\
                appName("Python Spark Dataframes Financial Fruad").getOrCreate()

In [10]:
sc

In [12]:
sc.stop()

In [11]:
# parallelism=1 evaluates one trial at a time
spark_trials = SparkTrials(parallelism=1)

#### Perform hyperparameter tuning

In [None]:
#Put the fmin() call inside an MLflow run to save results to MLflow.
#MLflow tracks the parameters and performance of each run

with mlflow.start_run():
    best_hyperparam = fmin(fn=runNN,
                           space=space,
                           algo=tpe.suggest,
                           max_evals=30,
                           trials=spark_trials)

  0%|                                                                  | 0/30 [00:00<?, ?trial/s, best loss=?]
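Before reusing best_hyperparam in Part 3, note that fmin returns the index of an hp.choice option rather than its value, and quniform values come back as floats. Hyperopt's own helper for mapping the result back is hyperopt.space_eval(space, best_hyperparam). The dependency-free sketch below does the same decoding by hand for this particular space; `decode_best` and `example_best` are hypothetical, and the numbers in `example_best` are illustration values, not results from this notebook.

```python
OPTIMIZERS = ["Adadelta", "Adam"]  # same order as hp.choice("optimizer", [...]) in the space


def decode_best(best):
    """Turn an fmin-style result into values create_model and runNN can use directly."""
    return {
        "dense_l1": int(best["dense_l1"]),          # quniform returns floats such as 24.0
        "dense_l2": int(best["dense_l2"]),
        "learning_rate": float(best["learning_rate"]),
        "optimizer": OPTIMIZERS[best["optimizer"]],  # hp.choice is reported as an index
    }


# Hypothetical fmin output, for illustration only
example_best = {"dense_l1": 24.0, "dense_l2": 13.0, "learning_rate": 0.05, "optimizer": 1}
print(decode_best(example_best))
```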