## Connection and Data Validation Notebook

## Table of Contents
* [Check for Training Data in Project Space](#DataCheck)
    * [Load the Training Data from Db2 if it does not exist in the project space](#section_1_1)
    
    * [Check the connection and data loading](#section_1_2)
  
* [Data Validation](#chapter2)
    * [Split the Data](#Optional)

    * [Generate Training Stats on both Splits](#section_2_2)
    * [Infer Schema on both Splits](#section_2_3) 
    * [Check for anomalies](#section_2_4) 
    * [Return a boolean to validate the tests](#section_3_1) 


## Imports

In [1]:
from botocore.client import Config
from sklearn.model_selection import train_test_split
from dataclasses import dataclass

import itc_utils.flight_service as itcfs


# TODO: Await (or build) for py3.10 version
# import tensorflow_data_validation as tfdv
import numpy as np
import pandas as pd

from ibm_watson_studio_pipelines import WSPipelines
import ibm_boto3

import logging
import os, types
import warnings

warnings.filterwarnings("ignore")

### Load the Credentials


These environment variables are set by Watson Studio (WS) Pipelines.

In [2]:
# Access token later handed to WSPipelines.from_token; read from the
# USER_ACCESS_TOKEN environment variable (os.getenv yields None when unset).
TOKEN = os.getenv("USER_ACCESS_TOKEN")

In [3]:
# Expects existing path and existing training_file_name within that path.
# Both values are read from environment variables and are None when unset.
training_file_name = os.getenv("training_file_name")
path = os.getenv("path")

In [None]:
# Use the sample dataset and directory only as fallbacks. An unconditional
# assignment here would silently discard whatever was supplied through the
# environment variables of the same names.
training_file_name = os.getenv("training_file_name") or "german_credit_data_biased_training.csv"
path = os.getenv("path") or "mlops-dir"

In [5]:
# Full path of the training file: <path>/<training_file_name>.
training_file_path = os.path.join(path, training_file_name)

## Check for Training Data in Project Space

In [6]:
def check_for_file_in_filesystem(path):
    """Return True when `path` exists on the local filesystem, otherwise False."""
    # os.path.exists already returns a bool, so no if/else wrapper is needed.
    return os.path.exists(path)
    
def read_data_from_db2(data_request):
    """
    Execute a Flight data request and return the result as a DataFrame.

    data_request: request dict forwarded unchanged to itcfs.get_flight_info
        as `nb_data_request`. The caller in this notebook passes a dict with
        'connection_name' and 'interaction_properties' keys.

    Returns the value produced by itcfs.read_pandas_and_concat.
    """
    read_client = itcfs.get_flight_client()
    flight_info = itcfs.get_flight_info(read_client, nb_data_request=data_request)

    # timeout=240 is passed straight through; presumably seconds -- confirm
    # against the itc_utils documentation.
    return itcfs.read_pandas_and_concat(read_client, flight_info, timeout=240)
    
def load_data_from_project(path):
    """
    Load the training data, preferring a local CSV over Db2.

    path: location of the CSV file to look for.

    Returns a DataFrame read from `path` when the file exists; otherwise the
    result of querying Db2 through read_data_from_db2.
    """
    if check_for_file_in_filesystem(path):
        return pd.read_csv(path)

    print("\n")
    # The fallback source is Db2 (see the request below), so say so.
    print(f"{path} file/path is probably not in project. Loading data from Db2.")

    data_request = {
        'connection_name': """DB2_DATA""",
        'interaction_properties': {
            'select_statement': 'SELECT * FROM "CUSTOMER_DATA"."GERMAN_CREDIT_RISK_TRAINING" FETCH FIRST 5000 ROWS ONLY'
        }
    }

    return read_data_from_db2(data_request)

## Load the Training Data from Db2 if the file doesn't exist

In [8]:
gcr_df = load_data_from_project(training_file_path)

## Encode for ease of use with OpenScale
# 'Risk' -> 1 and 'No Risk' -> 0; Series.map yields NaN for any label that is
# not a key of the dict.
gcr_df['Risk'] = gcr_df['Risk'].map({'Risk':1,'No Risk':0})
gcr_df.head()

Unnamed: 0,CheckingStatus,LoanDuration,CreditHistory,LoanPurpose,LoanAmount,ExistingSavings,EmploymentDuration,InstallmentPercent,Sex,OthersOnLoan,...,OwnsProperty,Age,InstallmentPlans,Housing,ExistingCreditsCount,Job,Dependents,Telephone,ForeignWorker,Risk
0,0_to_200,31,credits_paid_to_date,other,1889,100_to_500,less_1,3,female,none,...,savings_insurance,32,none,own,1,skilled,1,none,yes,0
1,less_0,18,credits_paid_to_date,car_new,462,less_100,1_to_4,2,female,none,...,savings_insurance,37,stores,own,2,skilled,1,none,yes,0
2,less_0,15,prior_payments_delayed,furniture,250,less_100,1_to_4,2,male,none,...,real_estate,28,none,own,2,skilled,1,yes,no,0
3,0_to_200,28,credits_paid_to_date,retraining,3693,less_100,greater_7,3,male,none,...,savings_insurance,32,none,own,1,skilled,1,none,yes,0
4,no_checking,28,prior_payments_delayed,education,6235,500_to_1000,greater_7,3,male,none,...,unknown,57,none,own,2,skilled,1,none,yes,1


## Data Validation 

In [9]:
@dataclass
class Datavalidation:
    """
    Data validation helper bound to a single DataFrame.

    Offers a seeded random train/test split, CSV persistence, and thin
    wrappers around the `tfdv` module (tensorflow_data_validation) for
    statistics generation, schema inference, statistics comparison and
    anomaly checks. The tfdv-based methods require the name `tfdv` to be
    available in the enclosing namespace.
    """
    # Frame that split_data partitions.
    dataframe: pd.DataFrame
    # Threshold compared against uniform draws in [0, 1): rows whose draw is
    # <= mask_per go to the training split (e.g. 0.8).
    mask_per: float

    def split_data(self, seed=32):
        """
        Split `self.dataframe` into train and test splits.

        seed: seed for the random mask, so the split is reproducible.

        Returns a (training_data, testing_data) tuple of DataFrames that
        together contain every row of `self.dataframe` exactly once.
        """
        # A private RandomState produces the same draws as
        # np.random.seed(seed) followed by np.random.rand(...), but leaves the
        # global NumPy RNG state untouched.
        rng = np.random.RandomState(seed)
        mask = rng.rand(len(self.dataframe)) <= self.mask_per

        # Slice the instance's own frame rather than a notebook-level global.
        training_data = self.dataframe[mask]
        testing_data = self.dataframe[~mask]

        print(f"No. of training examples: {training_data.shape[0]}")
        print(f"No. of testing examples: {testing_data.shape[0]}")

        return training_data, testing_data

    # TODO: Replace with Db2/filesystem
    def save_data_in_filesystem(self, df, filename):
        """
        Save a DataFrame as CSV (without the index).

        df: frame to persist.
        filename: destination; should include the directory path.

        Best-effort: failures are printed rather than raised.
        """
        try:
            df.to_csv(filename, index=False)
            print(f"File {filename} persisted successfully")
        except Exception as e:
            print(e)
            print(f"File serialization for {filename} failed")

    def generate_statistics(self, df):
        """
        Generate and visualize tfdv statistics for the given DataFrame.

        Returns the statistics object produced by
        tfdv.generate_statistics_from_dataframe.
        """
        train_stats = tfdv.generate_statistics_from_dataframe(df)
        tfdv.visualize_statistics(train_stats)
        return train_stats

    def inferSchema(self, stats):
        """
        Infer and display a tfdv schema from previously computed statistics.

        Returns the schema produced by tfdv.infer_schema.
        """
        schema = tfdv.infer_schema(statistics=stats)
        tfdv.display_schema(schema=schema)
        return schema

    def compare_statistics(self, lhs, rhs):
        """
        Visualize two sets of statistics side by side.

        lhs is labelled 'TEST_DATASET' and rhs 'TRAIN_DATASET' in the output.
        """
        # Compare evaluation data with training data
        tfdv.visualize_statistics(lhs_statistics=lhs, rhs_statistics=rhs,
                                  lhs_name='TEST_DATASET', rhs_name='TRAIN_DATASET')

    def check_for_anomalies(self, testable_stats, ref_schema):
        """
        Validate statistics against a reference schema.

        testable_stats: statistics to validate.
        ref_schema: schema to validate against.

        Returns True when tfdv reports at least one anomaly (which is also
        logged at error level), otherwise False.
        """
        anomalies = tfdv.validate_statistics(statistics=testable_stats, schema=ref_schema)
        tfdv.display_anomalies(anomalies)

        anomaly_items = anomalies.anomaly_info.items()
        if len(anomaly_items) > 0:
            # Resolve the logger here: the notebook defines no module-level one.
            logger = logging.getLogger(__name__)
            logger.error("Anomalies found in dataset...")
            logger.error(str(anomaly_items))
            return True
        return False

###  Split Data into Train and Eval Splits to Check for Consistency

In [10]:
# mask_per=0.8: rows whose uniform random draw is <= 0.8 land in the training split.
classvalidate = Datavalidation(dataframe=gcr_df,mask_per=0.8) 

training_data, testing_data = classvalidate.split_data()

No. of training examples: 3995
No. of testing examples: 1005


## Generate Training Stats on both Splits

In [11]:
# NOTE: raises NameError until the tensorflow_data_validation import (tfdv),
# currently commented out in the imports cell, is restored.
train_stats = classvalidate.generate_statistics(training_data)

NameError: name 'tfdv' is not defined

In [None]:
# Same statistics pass for the held-out split; also depends on tfdv.
test_stats = classvalidate.generate_statistics(testing_data)

## Infer Training Data Schema

In [None]:
# Schema inferred from the training statistics; used below as the reference
# schema passed to check_for_anomalies.
train_schema = classvalidate.inferSchema(train_stats)

## Infer Test Data Schema

In [None]:
# Schema inferred from the test statistics.
# NOTE(review): test_schema is not referenced by any later cell in this notebook.
test_schema = classvalidate.inferSchema(test_stats)

## Compare Eval and Train Data 

In [None]:
# Side-by-side visualization: test statistics as lhs, training statistics as rhs.
classvalidate.compare_statistics(lhs=test_stats,rhs=train_stats)

## Check For Data Anomalies 

### Check eval data for errors by validating the eval data stats using the previously inferred schema.

In [None]:
# anomaly_status is meant to be True when the test statistics show anomalies
# against the schema inferred from the training split, and False otherwise.
anomaly_status = classvalidate.check_for_anomalies(test_stats,train_schema)
anomaly_status

## Save Train and Test Data for Data Preparation Stage

In [12]:
# Output CSV names for the two splits; both are written under the same `path`
# directory that holds the input training file.
train_data_filename = "train_gcr.csv"
test_data_filename = "test_gcr.csv"
train_data_path = os.path.join(path, train_data_filename)
test_data_path = os.path.join(path, test_data_filename)

In [13]:
# NOTE(review): this hard-coded False overrides any result from
# check_for_anomalies, so the save step below always runs. Presumably a
# stopgap while tfdv is unavailable -- revisit once the import is restored.
anomaly_status = False

In [14]:
# TODO: Replace with Db2/filesystem
# Persist both splits only when no anomalies were flagged.
if not anomaly_status:
    classvalidate.save_data_in_filesystem(df=training_data,filename=train_data_path)
    classvalidate.save_data_in_filesystem(df=testing_data,filename=test_data_path)

File mlops-dir/train_gcr.csv persisted successfully
File mlops-dir/test_gcr.csv persisted successfully


## Check if Files Exist in COS

In [None]:
# TODO: Replace with Db2/filesystem
# Despite the variable name, this checks the local filesystem paths written
# by the save step, not a COS bucket.
files_copied_in_cos = check_for_file_in_filesystem(train_data_path) and check_for_file_in_filesystem(test_data_path)
files_copied_in_cos

## Register a Boolean Variable in WS Pipeline

In [None]:
# Results handed to the pipeline: validation outcome, persistence check, and
# the names of the two split files.
validation_params = {
    'anomaly_status': anomaly_status,
    'files_copied_in_cos': files_copied_in_cos,
    'train_data_filename': train_data_filename,
    'test_data_filename': test_data_filename,
}

In [21]:
# Publish validation_params as pipeline results. Per the recorded output of
# this cell, running outside a Watson Studio Pipeline stores the results in
# the local filesystem instead.
pipelines_client = WSPipelines.from_token(TOKEN)
pipelines_client.store_results(validation_params)

Running outside of Watson Studio Pipeline - storing results in the local filesystem for testing purposes...

  output paths:
    - "anomaly_status": .ibm_watson_studio_pipelines/results/anomaly_status
    - "files_copied_in_cos": .ibm_watson_studio_pipelines/results/files_copied_in_cos
    - "train_data_path": .ibm_watson_studio_pipelines/results/train_data_path
    - "test_data_path": .ibm_watson_studio_pipelines/results/test_data_path


<ibm_cloud_sdk_core.detailed_response.DetailedResponse at 0x7f2cd1419a80>