# Project - Day 4 - MLFlow evaluate and fit

## Set parameters
The cell below has already been tagged as `parameters`, so use it to define any papermill parameter you may want to change at MLFlow runtime (e.g. the location of the models trained in the previous step).

In [None]:
# Papermill parameter (this cell is tagged `parameters`): location of the model
# artifact produced by the previous pipeline step, passed to
# mlflow.artifacts.download_artifacts in the evaluation cell.
# "dummy" is only a placeholder and is expected to be overridden at runtime.
model_run_uri = "dummy"

## Loading libraries, data and model

### Loading libraries and model from MLFlow

In [1]:
import warnings

warnings.filterwarnings('ignore')
## We will be using Numpy, Pyplot and Tensorflow as our scientific tool box
import numpy as np 
import matplotlib.pyplot as plt
import tensorflow as tf

## BytesIO for defining in-memory file-like objects
from io import BytesIO

## Dask and in particular dask array for defining OOM pipelines
import dask
import dask.array as da

## Progress bars
from tqdm import tqdm

import mlflow


2023-10-16 09:57:23.042088: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


### Reproduce the final result plot based on the new model trained from the pipeline

You should now be able to reproduce the steps of the Day-3 model deployment and adapt them to the MLFlow pipeline:

- load the model from the artifact location of the previous step
  - little help: `mlflow.artifacts.download_artifacts(artifact_uri=model_run_uri, dst_path="./models")`
- evaluate the model and fit the results, storing the plot as an MLFlow artifact


In [None]:
# Functions and initial things: MinIO credentials, S3 clients and the list of
# preprocessed CYGNO files to run inference on.
import hashlib
import os
import boto3

MINIO_ENDPOINT = "https://minio.131.154.99.220.myip.cloud.infn.it"

username = os.environ['JUPYTERHUB_USER']
# The MinIO secret key is the MD5 hex digest of the JupyterHub username.
hash_object = hashlib.md5(f'{username}'.encode())
password = hash_object.hexdigest()
# Do not echo the secret: executed notebooks (and their outputs) are kept
# as pipeline artifacts.
print(f"Username: {username}")

s3client = boto3.client(
    's3',
    aws_access_key_id=username,
    aws_secret_access_key=password,
    endpoint_url=MINIO_ENDPOINT,
    region_name='default',
)

# Resource-level API; used by the npz loaders defined below.
s3 = boto3.resource(
    's3',
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=username,
    aws_secret_access_key=password,
)

bucket_name = 'giorgiodho'

# A single list_objects call returns at most 1000 keys and has no 'Contents'
# entry for an empty bucket: paginate and treat missing 'Contents' as empty.
paginator = s3client.get_paginator('list_objects_v2')
object_names = [
    entry['Key']
    for page in paginator.paginate(Bucket=bucket_name)
    for entry in page.get('Contents', [])
    if 'cygno-prep' in entry['Key']
]

def load_npz_from_minio(s3, bucket_name, object_name):
    """Download an object from a MinIO/S3 bucket and open it with ``np.load``.

    Parameters
    ----------
    s3 : object exposing ``Bucket(name).Object(key).get()`` (a boto3 S3 resource).
    bucket_name : name of the bucket holding the object.
    object_name : key of the object inside the bucket.

    Returns
    -------
    Whatever ``np.load`` returns for the downloaded bytes (an ``NpzFile``
    mapping for ``.npz`` payloads). The whole object is read into memory.
    """
    response = s3.Bucket(bucket_name).Object(object_name).get()
    payload = response['Body'].read()
    in_memory_file = BytesIO(payload)
    return np.load(in_memory_file)

def inspect_np(npz_file):
    """Print a short summary of every array held by an npz-like mapping.

    The first printed line lists all keys; then one line per key reports the
    array's shape and dtype. Returns None.
    """
    names = npz_file.keys()
    header = ", ".join(names)
    print("Keys in file: ", header)
    for name in names:
        arr = npz_file[name]
        shape_txt = str(arr.shape)
        print(f" - {name:<15s}   shape: {shape_txt:<20s}   dtype: {arr.dtype}")

@dask.delayed
def load_array_from_minio(minio_client, bucket_name, object_name, npz_key):
    """Load the array stored under ``npz_key`` in an npz file kept in MinIO.

    Decorated with ``dask.delayed``: calling it returns a Delayed object and
    the download happens only when the dask graph is computed.

    Parameters
    ----------
    minio_client : boto3 S3 resource, passed through to ``load_npz_from_minio``.
    bucket_name : bucket holding the npz file.
    object_name : key of the npz file inside the bucket.
    npz_key : name of the array to extract (e.g. ``'image'`` or ``'tstamp'``).

    Returns
    -------
    numpy.ndarray
        The requested array, fully read into memory.

    Raises
    ------
    KeyError
        If ``npz_key`` is not present in the file.
    """
    # Use the NpzFile as a context manager so its zip handle is closed once the
    # requested member has been materialised as an ndarray.
    with load_npz_from_minio(minio_client, bucket_name, object_name) as npz:
        return npz[npz_key]

def plot_histogram(predictions, pathtosave):
    """Plot and save a histogram of the CNN predictions for the CYGNO-SIM data.

    Parameters
    ----------
    predictions : array-like
        Classifier outputs. They are flattened before binning, so an ``(N, 1)``
        array (one score per event) is treated as N scalar responses. The
        50 bins span [0, 1]; values outside that range are not counted.
    pathtosave : str or path-like
        Destination passed to ``Figure.savefig``.

    Returns
    -------
    matplotlib.figure.Figure
        The newly created figure. It is left open so a notebook can still
        display it.
    """
    # A fresh figure per call: drawing on the implicit "current" axes would
    # overlay each new histogram on top of earlier ones.
    fig, ax = plt.subplots()
    ax.hist(np.ravel(predictions), bins=np.linspace(0, 1, 51), label="CYGNO-SIM")
    ax.set_yscale('log')
    ax.set_xlabel("Response of the CNN")
    ax.set_ylabel("Number of acquired events")
    ax.legend()
    fig.savefig(pathtosave)
    return fig

# Shape of the arrays stored in every preprocessed file (events per file and
# square image side). Dask needs them up front because loading is lazy.
EVENTS_PER_FILE = 200
IMAGE_SIDE = 180

if not object_names:
    raise RuntimeError(
        f"No 'cygno-prep' objects found in bucket {bucket_name!r}; nothing to evaluate."
    )

# Eagerly open the last listed file; inspect_np(npz_file) prints its content.
npz_file = load_npz_from_minio(s3, bucket_name, object_names[-1])


def _lazy_parts(npz_key, shape):
    """Return one lazily-loaded dask array per listed object for ``npz_key``."""
    # NOTE(review): the dtype is declared, not read from the files; this
    # assumes every stored array is float64. TODO confirm.
    return [
        da.from_delayed(
            # Read from the same bucket the object names were listed from.
            load_array_from_minio(s3, bucket_name, obj, npz_key),
            shape=shape,
            dtype=np.float64,
        )
        for obj in object_names
    ]


delayed_images = _lazy_parts('image', (EVENTS_PER_FILE, IMAGE_SIDE, IMAGE_SIDE))
delayed_tstamps = _lazy_parts('tstamp', (EVENTS_PER_FILE,))
print(f"Number of files: {len(delayed_images)}")

# One chunk per file along axis 0.
delayed_darray_images = da.concatenate(delayed_images)
delayed_darray_tstamps = da.concatenate(delayed_tstamps)

In [None]:
# Evaluate the model trained in the previous pipeline step on the CYGNO data
# and log the resulting histogram as an MLflow artifact.
with mlflow.start_run(tags={"mlflow.runName": "Test"}) as mlrun:
    # model_run_uri is the papermill parameter defined in the parameters cell.
    mlflow.log_param("model_run_uri", model_run_uri)
    local_model_path = mlflow.artifacts.download_artifacts(
        artifact_uri=model_run_uri, dst_path="./models"
    )
    classifier = tf.keras.models.load_model(local_model_path)

    batch_size = 40
    mlflow.log_param("batch_size", batch_size)

    list_pred = []
    # Iterate over the native dask chunks (one per file) so each file is
    # downloaded once. Rechunking to batch_size and computing every block
    # separately would re-run the whole-file load for each batch.
    n_chunks = delayed_darray_images.numblocks[0]
    for i in tqdm(range(n_chunks), desc="Predicting"):
        # Materialise the chunk so Keras receives a plain ndarray.
        images = delayed_darray_images.blocks[i].compute()
        for start in range(0, len(images), batch_size):
            list_pred.append(
                classifier.predict_on_batch(images[start:start + batch_size])
            )

    predictions = np.concatenate(list_pred)
    pathtosave = 'histdivision.png'
    plot_histogram(predictions, pathtosave)
    mlflow.log_artifact(pathtosave)