In [26]:
import io
import pickle
import requests
import json

import numpy as np
import pandas as pd

from minio import Minio
from mlflow import MlflowClient



## Preprocessing

In [None]:
source_data_df = pd.read_csv('data/Fraud_Detection.csv')

In [None]:
def formatting(
    source_df: any
) -> any:
    print('Formatting data')
    formated_df = source_df.copy()
    
    irrelevant_columns = [
        'oldbalanceOrg',
        'newbalanceOrig',
        'oldbalanceDest',
        'newbalanceDest'
    ]
    formated_df.drop(
        columns = irrelevant_columns, 
        inplace = True
    )
    print('Columns dropped')
    formated_df = pd.get_dummies(
        data = formated_df, 
        columns = ['type']
    )
    
    for column in formated_df.columns:
        if 'type' in column:
            formated_df[column] = formated_df[column].astype(int)
    print('One hot coded type')

    unique_values_orig = formated_df['nameOrig'].unique()
    unique_values_dest = formated_df['nameDest'].unique()
    
    unique_value_list_orig = unique_values_orig.tolist()
    unique_value_list_dest = unique_values_dest.tolist()

    print('Orig amount:', len(unique_value_list_orig))
    print('Dest amount:', len(unique_value_list_dest))
    
    set_orig_ids = set(unique_value_list_orig)
    set_dest_ids = set(unique_value_list_dest)
    intersection = set_dest_ids.intersection(set_orig_ids)

    print('Orig and Dest duplicates', len(intersection))
    
    set_dest_ids.difference_update(intersection)
    fixed_unique_value_list_dest = list(set_dest_ids)
    print('Fixed Dest amount:',len(fixed_unique_value_list_dest))
    
    orig_encoding_dict = {}
    index = 1
    for string in unique_value_list_orig:
        if not string in orig_encoding_dict:
            orig_encoding_dict[string] = index
            index = index + 1

    dest_encoding_dict = {}
    cont_index = len(orig_encoding_dict) + 1
    for string in fixed_unique_value_list_dest:
        if not string in dest_encoding_dict:
            dest_encoding_dict[string] = cont_index
            cont_index = cont_index + 1
    print('Orig dict amount:', len(orig_encoding_dict))
    print('Dest dict amount:', len(dest_encoding_dict))
    
    print('Orig and dest string-integer encodings created')

    string_orig_values = formated_df['nameOrig'].tolist()
    string_dest_values = formated_df['nameDest'].tolist()

    orig_encoded_values = []
    for string in string_orig_values:
        orig_encoded_values.append(orig_encoding_dict[string])

    dest_encoded_values = []
    for string in string_dest_values:
        if not string in dest_encoding_dict:
            dest_encoded_values.append(orig_encoding_dict[string])
            continue
        dest_encoded_values.append(dest_encoding_dict[string])

    formated_df['nameOrig'] = orig_encoded_values
    formated_df['nameDest'] = dest_encoded_values

    print('Orig encoded values amount:', len(orig_encoded_values))
    print('Dest encoded values amount:', len(dest_encoded_values))
    
    print('Orig and dest encodings set')

    formated_df['amount'] = formated_df['amount'].round(0).astype(int)
    print('Amount rounded')

    column_order = [
        'step',
        'amount',
        'nameOrig',
        'nameDest',
        'type_CASH_IN',
        'type_CASH_OUT',
        'type_DEBIT',
        'type_PAYMENT',
        'type_TRANSFER',
        'isFraud',
        'isFlaggedFraud'
    ]
    formated_df = formated_df[column_order]
    print('Columns reordered')
    print('Dataframe shape:', formated_df.shape)
    print('Formatting done')
    return formated_df

In [None]:
formated_data_df = formatting(
    source_df = source_data_df
)

In [None]:
formated_data_df.to_csv('data/Formated_Fraud_Detection_Data.csv', index = False)

## Training Context

In [6]:
formated_data_df = pd.read_csv('../data/Formated_Fraud_Detection_Data.csv')

In [39]:
experiment = {
    'name': 'federated-learning-test',
    'tags': {}
}

In [40]:
parameters = {
    'model':{
        'seed': 42,
        'used-columns': [
            'amount',
            'type_CASH_IN',
            'type_CASH_OUT',
            'type_DEBIT',
            'type_PAYMENT',
            'type_TRANSFER',
            'isFraud'
        ],
        'input-size': 6,
        'target-column': 'isFraud',
        'scaled-columns': [
            'amount'
        ],
        'learning-rate': 0.05,
        'sample-rate': 0.10,
        'optimizer':'SGD',
        'epochs': 10
    },
    'central':{
        'sample-pool': 50000,
        'data-augmentation': {
            'active': True,
            'sample-pool': 100000,
            '1-0-ratio': 0.4
        },
        'eval-ratio': 0.5,
        'train-ratio': 0.8,
        'min-update-amount': 4,
        'max-cycles':3,
        'min-metric-success': 8,
        'metric-thresholds': {
            'true-positives': 50,
            'false-positives': 100,
            'true-negatives': 1000, 
            'false-negatives': 100,
            'recall': 0.40,
            'selectivity': 0.99,
            'precision': 0.80,
            'miss-rate': 0.05,
            'fall-out': 0.05,
            'balanced-accuracy': 0.85,
            'accuracy': 0.99
        },
        'metric-conditions': {
            'true-positives': '>=',
            'false-positives': '<=',
            'true-negatives': '>=', 
            'false-negatives': '<=',
            'recall': '>=',
            'selectivity': '>=',
            'precision': '>=',
            'miss-rate': '<=',
            'fall-out': '<=',
            'balanced-accuracy': '>=',
            'accuracy': '>='
        }
    },
    'worker':{
        'sample-pool': 50000,
        'data-augmentation': {
            'active': True,
            'sample-pool': 100000,
            '1-0-ratio': 0.4
        },
        'eval-ratio': 0.5,
        'train-ratio': 0.8
    }
}

In [41]:
data = formated_data_df.iloc[:100000].values.tolist()
columns = formated_data_df.columns.tolist()

## Starting Training

In [42]:
context = {
    'experiment': experiment,
    'parameters': parameters,
    'data': data,
    'columns': columns
}

payload = json.dumps(context)

In [44]:
response = requests.post(
    url = 'http://127.0.0.1:7500/start',
    json = payload
)

print(response.status_code)

200


## Inference

In [11]:
def central_worker_inference(
    address: str,
    experiment_name: str,
    experiment: str,
    cycle: str,
    data_df: any,
    relevant_columns: list,
    rows: int
):
    sample_df = data_df.iloc[:rows,:]
    relevant_df = sample_df[relevant_columns]
    input_df = relevant_df.iloc[:rows,:-2]
    mean = input_df['amount'].mean()
    std_dev = input_df['amount'].std()
    input_df['amount'] = (input_df['amount'] - mean)/std_dev

    payload = {
        'experiment-name': experiment_name,
        'experiment': experiment,
        'cycle': cycle,
        'input': input_df.values.tolist()
    }
    payload = json.dumps(payload)
    central_address = address + '/predict' 
    response = requests.post(
        url = central_address,
        json = payload
    )

    text_output = json.loads(response.text)
    sample_df['pred'] = np.array(text_output['predictions']).astype(int)
    return sample_df

In [25]:
inference_df = central_worker_inference(
    address = 'http://127.0.0.1:7501',
    experiment_name = 'central-federated-learning-test',
    experiment = '2',
    cycle = '4',
    data_df = formated_data_df,
    relevant_columns = [
        'amount',
        'type_CASH_IN',
        'type_CASH_OUT',
        'type_DEBIT',
        'type_PAYMENT',
        'type_TRANSFER',
        'isFraud',
        'isFlaggedFraud'
    ],
    rows = 50
)
inference_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sample_df['pred'] = np.array(text_output['predictions']).astype(int)


Unnamed: 0,step,amount,nameOrig,nameDest,type_CASH_IN,type_CASH_OUT,type_DEBIT,type_PAYMENT,type_TRANSFER,isFraud,isFlaggedFraud,pred
0,1,9840,1,7233461,0,0,0,1,0,0,0,0
1,1,1864,2,7735206,0,0,0,1,0,0,0,0
2,1,181,3,8598945,0,0,0,0,1,1,0,1
3,1,181,4,7880837,0,1,0,0,0,1,0,0
4,1,11668,5,7670940,0,0,0,1,0,0,0,0
5,1,7818,6,6477257,0,0,0,1,0,0,0,0
6,1,7108,7,8194799,0,0,0,1,0,0,0,0
7,1,7862,8,8738506,0,0,0,1,0,0,0,0
8,1,4024,9,6735336,0,0,0,1,0,0,0,0
9,1,5338,10,6427877,0,0,1,0,0,0,0,0


## MinIO Interaction

In [32]:
minio_client = Minio(
    endpoint = "127.0.0.1:9000", 
    access_key = 'minio', 
    secret_key = 'minio123',
    secure = False
)

In [33]:
def create_bucket(
    minio_client: any,
    bucket_name: str
) -> bool:
    MINIO_CLIENT = minio_client 
    try:
        MINIO_CLIENT.make_bucket(
            bucket_name = bucket_name
        )
        return True
    except Exception as e:
        print(e)
        return False
    
def check_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    MINIO_CLIENT = minio_client
    try:
        status = MINIO_CLIENT.bucket_exists(bucket_name = bucket_name)
        return status
    except Exception as e:
        print(e)
        return False 
       
def delete_bucket(
    minio_client: any,
    bucket_name:str
) -> bool:
    MINIO_CLIENT = minio_client
    try:
        MINIO_CLIENT.remove_bucket(
            bucket_name = bucket_name
        )
        return True
    except Exception as e:
        print(e)
        return False
# Works
def create_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool: 
    # Be aware that MinIO objects have a size limit of 1GB, 
    # which might result to large header error
    MINIO_CLIENT = minio_client
    
    pickled_data = pickle.dumps(data)
    length = len(pickled_data)
    buffer = io.BytesIO()
    buffer.write(pickled_data)
    buffer.seek(0)
    try:
        MINIO_CLIENT.put_object(
            bucket_name = bucket_name,
            object_name = object_path + '.pkl',
            data = buffer,
            length = length,
            metadata = metadata
        )
        return True
    except Exception as e:
        print(e)
        return False
# Works
def check_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    MINIO_CLIENT = minio_client
    try:
        object_info = MINIO_CLIENT.stat_object(
            bucket_name = bucket_name,
            object_name = object_path + '.pkl'
        )      
        return True
    except Exception as e:
        return False 
# Works
def delete_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> bool: 
    MINIO_CLIENT = minio_client
    try:
        MINIO_CLIENT.remove_object(
            bucket_name = bucket_name, 
            object_name = object_path + '.pkl'
        )
        return True
    except Exception as e:
        print(e)
        return False
# Works
def update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any,
    metadata: dict
) -> bool:  
    remove = delete_object(minio_client,bucket_name, object_path)
    if remove:
        create = create_object(minio_client, bucket_name, object_path, data, metadata)
        if create:
            return True
    return False
# works
def create_or_update_object(
    minio_client: any,
    bucket_name: str, 
    object_path: str, 
    data: any, 
    metadata: dict
) -> any:
    bucket_status = check_bucket(minio_client,bucket_name)
    if not bucket_status:
        creation_status = create_bucket(minio_client,bucket_name)
        if not creation_status:
            return None
    object_status = check_object(minio_client,bucket_name, object_path)
    if not object_status:
        return create_object(minio_client,bucket_name, object_path, data, metadata)
    else:
        return update_object(minio_client,bucket_name, object_path, data, metadata)

def get_object_data_and_metadata(
    minio_client: any,
    bucket_name: str, 
    object_path: str
) -> dict:
    MINIO_CLIENT = minio_client
    
    try:
        given_object_info = MINIO_CLIENT.stat_object(
            bucket_name = bucket_name, 
            object_name = object_path + '.pkl'
        )
        # There seems to be some kind of a limit
        # with the amount of request a client 
        # can make, which is why this variable
        # is set here to give more time got the client
        # to complete the request
        given_metadata = given_object_info.metadata
        
        given_object_data = MINIO_CLIENT.get_object(
            bucket_name = bucket_name, 
            object_name = object_path + '.pkl'
        )
        given_pickled_data = given_object_data.data
        
        try:
            given_data = pickle.loads(given_pickled_data)
            relevant_metadata = {} 
            for key, value in given_metadata.items():
                if 'x-amz-meta' in key:
                    key_name = key[11:]
                    relevant_metadata[key_name] = value
            return {'data': given_data, 'metadata': relevant_metadata}
        except Exception as e:
            print('MinIO object pickle decoding error')
            print(e)
            return None 
    except Exception as e:
        print('MinIO object fetching error')
        print(e)
        return None
# Works
def get_object_list(
    minio_client: any,
    bucket_name: str,
    path_prefix: str
) -> dict:
    MINIO_CLIENT = minio_client
    try:
        objects = MINIO_CLIENT.list_objects(bucket_name = bucket_name, prefix = path_prefix, recursive = True)
        object_dict = {}
        for obj in objects:
            object_name = obj.object_name
            object_info = MINIO_CLIENT.stat_object(
                bucket_name = bucket_name,
                object_name = object_name
            )
            given_metadata = {} 
            for key, value in object_info.metadata.items():
                if 'X-Amz-Meta' in key:
                    key_name = key[11:]
                    given_metadata[key_name] = value
            object_dict[obj.object_name] = given_metadata
        return object_dict
    except Exception as e:
        return None  

In [45]:
minio_object = get_object_data_and_metadata(
    minio_client = minio_client,
    bucket_name = 'central', 
    object_path = 'experiments/status'
)
minio_object

{'data': {'experiment-name': 'central-federated-learning-test',
  'experiment': 4,
  'experiment-id': '1',
  'start': True,
  'data-split': True,
  'preprocessed': True,
  'trained': True,
  'worker-split': True,
  'sent': True,
  'updated': True,
  'evaluated': True,
  'complete': True,
  'train-amount': 40000,
  'test-amount': 10000,
  'eval-amount': 50000,
  'collective-amount': 200000,
  'worker-updates': 5,
  'cycle': 4,
  'run-id': '9780bc137c1248b7b5829259536760f4'},
 'metadata': {}}

In [None]:
minio_object = get_object_data_and_metadata(
    minio_client = minio_client,
    bucket_name = 'workers', 
    object_path = 'c496210f-e253-4c1c-b3d7-765092ef5d89/experiments/status'
)
minio_object

## MLflow Interactions

In [None]:
os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://127.0.0.1:9000"
os.environ['AWS_ACCESS_KEY_ID'] = 'minio'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio123'
mlflow_client = MlflowClient(
    tracking_uri = "http://127.0.0.1:5000"
)

In [None]:
# Refactored and works
def start_experiment(
    mlflow_client: any,
    experiment_name: str,
    experiment_tags: dict
) -> int:
    MLFLOW_CLIENT = mlflow_client
    try:
        experiment_id = MLFLOW_CLIENT.create_experiment(
            name = experiment_name,
            tags = experiment_tags,
            artifact_location="s3://mlflow/mlruns"
        )
        return experiment_id
    except Exception as e:
        print(e)
        return None
# Refactored
def check_experiment(
    mlflow_client: any,
    experiment_name: str
) -> dict:
    MLFLOW_CLIENT = mlflow_client
    try:
        experiment_object = MLFLOW_CLIENT.get_experiment_by_name(
            name = experiment_name
        )
        return experiment_object
    except Exception as e:
        print(e)
        return None
# Refactored
def get_experiments(
    mlflow_client: any,
    experiment_type: int,
    max_amount: int,
    filter: str
) -> dict:
    # Types are ACTIVE_ONLY = 1, DELETED_ONLY = 2 and ALL = 3
    MLFLOW_CLIENT = mlflow_client
    try:
        experiment_objects = MLFLOW_CLIENT.search_experiments(
            view_type = experiment_type,
            max_results = max_amount,
            filter_string = filter
        )
        experiment_dict = {}
        for experiment in experiment_objects:
            experiment_dict[experiment.name] = {
                'id': experiment.experiment_id,
                'stage': experiment.lifecycle_stage,
                'tags': experiment.tags,
                'location': experiment.artifact_location,
                'created': experiment.creation_time,
                'updated': experiment.last_update_time
            }
        return experiment_dict
    except Exception as e:
        print(e)
        return None
# Refactored
def start_run(
    mlflow_client: any,
    experiment_id: str,
    tags: dict,
    name: str
) -> dict:
    MLFLOW_CLIENT = mlflow_client 
    try:
        run_object = MLFLOW_CLIENT.create_run(
            experiment_id = experiment_id,
            tags = tags,
            run_name = name
        )
        run_dict = {
            'e_id': run_object.info.experiment_id,
            'id': run_object.info.run_id,
            'name': run_object.info.run_name,
            'stage': run_object.info.lifecycle_stage,
            'status': run_object.info.status
        }
        return run_dict
    except Exception as e:
        print(e)
        return None
# Refactored
def check_run(
    mlflow_client: any,
    run_id: str
) -> dict:
    MLFLOW_CLIENT = mlflow_client
    try:
        run_object = MLFLOW_CLIENT.get_run(
            run_id = run_id
        )
        current_app.logger.info('Checking succeeded')
        run_dict = {
            'e_id': run_object.info.experiment_id,
            'id': run_object.info.run_id,
            'name': run_object.info.run_name,
            'stage': run_object.info.lifecycle_stage,
            'status': run_object.info.status,
            'parameters': run_object.data.params,
            'metrics': run_object.data.metrics,
            'start_time': run_object.info.start_time,
            'end_time': run_object.info.end_time
        }
        return run_dict
    except Exception as e:
        print(e)
        return None
# Refactored
def update_run(
    logger: any,
    mlflow_client: any,
    run_id: str,
    parameters: dict,
    metrics: dict,
    artifacts: dict
) -> bool:
    MLFLOW_CLIENT = mlflow_client 
    try:
        for param_key, param_value in parameters.items():
            MLFLOW_CLIENT.log_param(
                run_id = run_id,
                key = param_key,
                value = param_value
            )
        for metric_key,metric_value in metrics.items():
            MLFLOW_CLIENT.log_metric(
                run_id = run_id,
                key = metric_key,
                value = metric_value
            )
        for path in artifacts:
            MLFLOW_CLIENT.log_artifact(
                run_id = run_id,
                local_path = path
            )
        return True
    except Exception as e:
        print(e)
        return False
# Recatored
def end_run(
    logger: any,
    mlflow_client: any,
    run_id: str,
    status: str
) -> bool:
    # run status are FAILED = 4, FINISHED = 3, KILLED = 5, RUNNING = 1 and SCHEDULED = 2
    MLFLOW_CLIENT = mlflow_client
    try:
        MLFLOW_CLIENT.set_terminated(
            run_id = run_id,
            status = status
        )
        return True
    except Exception as e:
        print(e)
        return False
# Refactored
def get_runs(
    logger: any,
    mlflow_client: any,
    experiment_ids: list,
    filter: str,
    type: int,
    max_amount: int
) -> dict:
    MLFLOW_CLIENT = mlflow_client
    try:
        runs = MLFLOW_CLIENT.search_runs(
            experiment_ids = experiment_ids,
            filter_string = filter,
            run_view_type = type,
            max_results = max_amount
        )
        run_dict = {}
        for run in runs:
            run_dict[run.info.run_id] = {
                'e_id': run.info.experiment_id,
                'id': run.info.run_id,
                'name': run.info.run_name,
                'stage': run.info.lifecycle_stage,
                'status': run.info.status,
                'start_time': run.info.start_time,
                'end_time': run.info.end_time,
                'parameters': run.data.params,
                'metrics': run.data.metrics
            }
        return run_dict
    except Exception as e:
        print(e)
        return None