In [1]:
import numpy as np
import pandas as pd
import joblib
import os
from sklearn import metrics
from sklearn.preprocessing import MinMaxScaler
from collections import Counter

#--------------------------Functions---------------------------#
def create_speckle_column(df):
    """
    Create the binary SPECKLE column from DELTA and place it first.

    SPECKLE is 0 where DELTA equals 0 and 1 everywhere else. A missing (NaN)
    DELTA does not compare equal to 0, so such rows are labelled 1.

    df[DataFrame]: input data, must contain a "DELTA" column. It is not modified.

    Returns a new DataFrame with SPECKLE as the first column, followed by the
    other columns in their original order.
    Raises KeyError if "DELTA" is not a column of df.
    """
    speckle = np.where(df["DELTA"] == 0, 0, 1)

    # Build the column order by name rather than "move the last column to the
    # front": when SPECKLE already exists (e.g. the cell is re-run) assigning it
    # does not append a new last column, and the wrong column would be moved.
    other_cols = [col for col in df.columns if col != "SPECKLE"]

    # assign() works on a copy, so the caller's frame is left untouched.
    df = df.assign(SPECKLE=speckle)[["SPECKLE"] + other_cols]
    print("Distribution of data:", Counter(df["SPECKLE"]))

    return df


def featuremapping(df,feature_mapping,old,new):
    """
    Rename the columns of df from their `new` names to their `old` names.

    The two name columns of feature_mapping are paired row by row; every df
    column whose name appears in the `new` column is renamed to the entry of
    the `old` column on the same row. Columns not listed are left as they are.

    df[DataFrame]: data whose columns are renamed (renamed in place and returned)
    feature_mapping[DataFrame]: feature file that contains the old and new feature names
    old[str]: name of the old feature column (names to rename to)
    new[str]: name of the new feature column (names currently used in df)
    """
    target_names = feature_mapping[old].values.tolist()
    current_names = feature_mapping[new].values.tolist()

    # current name -> target name
    rename_map = dict(zip(current_names, target_names))
    df.rename(columns=rename_map, inplace=True)
    return df

def check_duplicates(df,output_path):
    """
    Report whether the VID column contains duplicated values.

    When duplicates exist, every row whose VID occurs more than once (the first
    occurrence as well as the repeats) is written to output_path + "duplicates.csv".

    df[DataFrame]: input df, must contain a "VID" column
    output_path[str]: prefix for the report file; it is concatenated as-is, so a
                      folder path needs its trailing separator
    """
    vid = df['VID']

    if not vid.duplicated().any():
        print("There are no duplicates in VID")
        return

    print("There are duplicates in VID")
    # duplicated() flags only the repeated rows, not the first occurrence ...
    repeated_vids = df.loc[df.duplicated(subset=['VID']), 'VID']
    # ... so select by VID value to get the originals together with their repeats
    all_duplicate_rows = df[vid.isin(repeated_vids)]
    all_duplicate_rows.to_csv(output_path + "duplicates.csv", index=False)
        
def check_colna(df,output_path):
    """
    Check each column for nulls and write a report to output_path + "NA_Columns.csv".

    The report has one row per column that contains at least one null, with the
    columns "Feature", "Total Null" and "% of Null" (nulls as a percentage of the
    number of rows), sorted by "% of Null" in descending order. When no column
    has nulls the file contains only the header.

    df[DataFrame]: dataframe
    output_path[str]: prefix for the report file; it is concatenated as-is, so a
                      folder path needs its trailing separator
    """
    # Count nulls for all columns at once. DataFrame.append, which used to grow
    # the report row by row, was removed in pandas 2.0.
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]

    colna_df = pd.DataFrame({
        "Feature": null_counts.index,
        "Total Null": null_counts.values,
        "% of Null": null_counts.values * 100 / len(df),
    })

    colna_df = colna_df.sort_values("% of Null", ascending=False)
    colna_df.to_csv(output_path + "NA_Columns.csv", index=False)

def check_rowna(df,output_path):
    """
    Check each row for nulls and report the rows that contain any.

    For every row with at least one null the report holds the values of the
    first two columns (labelled "SPECKLE" and "VID"), the number of nulls and
    the nulls as a percentage of the remaining columns (len(df.columns) - 2).
    The report is sorted by "% of Null" in descending order and written to
    output_path + "NA_Rows.csv" only when at least one such row exists.

    df[DataFrame]: dataframe; the first two columns are assumed to be SPECKLE
                   and VID, in that order
    output_path[str]: prefix for the report file; it is concatenated as-is, so a
                      folder path needs its trailing separator
    """
    n_feature_cols = len(df.columns) - 2  # the first two columns are not features
    null_counts = df.isnull().sum(axis=1).to_numpy()

    # Work with row positions throughout: the previous loop fed index labels
    # into iloc, which only works when the index is the default 0..n-1 range.
    # The records are collected in a list because DataFrame.append was removed
    # in pandas 2.0.
    records = [
        {
            'SPECKLE': df.iat[pos, 0],
            'VID': df.iat[pos, 1],
            'Total Null': null_counts[pos],
            '% of Null': round(null_counts[pos] * 100 / n_feature_cols, 2),
        }
        for pos in np.flatnonzero(null_counts > 0)
    ]

    colrow_df = pd.DataFrame(records, columns=["SPECKLE", "VID", "Total Null", "% of Null"])
    colrow_df = colrow_df.sort_values("% of Null", ascending=False)

    if colrow_df.empty:
        print('No more NA rows!')
    else:
        print('There are NA rows!')
        colrow_df.to_csv(output_path + "NA_Rows.csv", index=False)
        
def convert_neg_to_pos(df,cols_to_keep):
    """
    Replace values by their absolute value in every column except the ones
    whose name starts with one of the given tokens.

    df[DataFrame]: dataframe
    cols_to_keep[tuple]: name prefixes of the columns that are left unchanged

    Returns a new DataFrame with the unchanged columns first, followed by the
    converted columns. Progress information is printed along the way.
    """
    keep_labels = df.columns[df.columns.str.startswith(cols_to_keep)]

    # columns that keep their sign
    untouched = df[keep_labels]
    print("df_keep shape",untouched.shape)

    # everything else gets converted
    convertible = df.drop(columns=list(keep_labels))
    print("df_to_convert shape",convertible.shape)

    absolute = convertible.abs()
    print("df_positive shape",convertible.shape)

    # sanity check: nothing negative may remain in the converted part
    print("Is there negative value left:",(absolute < 0).any().any())

    # put both parts back together side by side
    combined = pd.concat([untouched, absolute], axis=1)
    print("Shape after converting neg to pos:", combined.shape)

    return combined

def check_col_negative(df,output_path,value):
    """
    Check each column for one specific (negative) value and write a report.

    The report has one row per column that contains `value` at least once, with
    the columns "Feature", "Total Negative Values" and "% of Negative Values"
    (occurrences as a percentage of the number of rows, rounded to 2 decimals),
    sorted by the percentage in descending order. It is written to
    output_path + "Check%of" + str(value) + "values.csv".

    df[DataFrame]: dataframe
    output_path[str]: prefix for the report file; it is concatenated as-is, so a
                      folder path needs its trailing separator
    value: the value to look for
    """
    # Count the occurrences for all columns at once. DataFrame.append, which
    # used to grow the report row by row, was removed in pandas 2.0.
    hits = df.isin([value]).sum()
    hits = hits[hits > 0]

    col_negative_df = pd.DataFrame({
        "Feature": hits.index,
        "Total Negative Values": hits.values,
        "% of Negative Values": (hits.values * 100 / len(df)).round(2),
    })

    col_negative_df = col_negative_df.sort_values("% of Negative Values", ascending=False)
    col_negative_df.to_csv(output_path + "Check%of"+str(value)+"values.csv", index=False)

def Negative_value_impute(df,value,imptype):
    """
    Impute one specific (negative) placeholder value.

    df[DataFrame]: df; it is not modified, a new DataFrame is returned
    value: the placeholder value to replace
    imptype[string]: "mean" to impute with the column mean, "median" to impute
                     with the column median, "zero" to impute with 0

    The mean/median of a column is computed from its other values only, i.e.
    the placeholder itself and nulls are excluded. Columns without a usable
    statistic (non-numeric columns, or columns holding nothing but the
    placeholder) are left unchanged.

    Raises ValueError for an unknown imptype.
    """
    if imptype == "zero":
        return df.replace(value, 0)

    if imptype not in ("mean", "median"):
        # An unknown type used to be ignored silently, returning the data with
        # the placeholder still in it.
        raise ValueError(
            'imptype must be "mean", "median" or "zero", got ' + repr(imptype)
        )

    # Hide the placeholder before computing the statistic; otherwise the value
    # being removed drags the mean/median towards itself.
    valid_only = df.mask(df == value)
    if imptype == "mean":
        stats = valid_only.mean(numeric_only=True)
    else:
        stats = valid_only.median(numeric_only=True)

    fill_values = stats.dropna().to_dict()
    if not fill_values:
        return df.copy()

    # A dict as replacement value is applied column by column.
    return df.replace(value, fill_values)

def unary(df):
    """
    Print the names of the unary columns, i.e. the columns that hold exactly
    one distinct value (a null counts as a value).

    df[DataFrame]: input dataframe
    """
    single_valued = []
    for name in df.columns:
        if df[name].nunique(dropna=False) == 1:
            single_valued.append(name)

    if not single_valued:
        print("There are no unary columns!")
        return

    print("The unary column are:",single_valued)
        
def scale_data(X_train,X_test):
    """
    Min-max scale the test data with a scaler fitted on the training data.

    params:
    X_train[DataFrame]: input X train; only used to fit the scaler
    X_test[DataFrame]: input X validation (test); must have the same features as X_train

    Returns the scaled X_test as a DataFrame with the column names of X_test
    and a fresh 0..n-1 index (the index of X_test is not carried over).
    """
    scaler = MinMaxScaler()
    # Only the fitted minimum/maximum of the training data are needed; the
    # scaled training data itself was never used, so it is no longer computed.
    scaler.fit(X_train)

    X_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)

    return X_test_scaled

def prediction(X_test_scaled,X_test_sf,y_test,model,output_path,ascend=None):
    """
    Predict the classes of the test set and write the evaluation reports.

    The following files are written to output_path + "ModelResults/" (the folder
    is created when missing):
      Overall_Accuracy.csv       overall accuracy in %
      Classification_Report.csv  sklearn classification report
      Confusion_Matrix_test.csv  confusion matrix for the labels 0/1 plus the
                                 accuracy in % per true label
      Val_results.csv            X_test_sf, scaled features, true label and
                                 predicted label ("PRED_SPECKLE") side by side

    X_test_scaled[DataFrame]: scaled features passed to model.predict
    X_test_sf[Series/DataFrame]: extra column(s) to put in front of Val_results.csv
    y_test[Series]: true labels (0/1), in the same row order as X_test_scaled
    model: fitted model with a predict method
    output_path[str]: prefix for the result folder; it is concatenated as-is, so a
                      folder path needs its trailing separator
    ascend[None/bool]: None keeps the confusion matrix in label order; True/False
                       sorts it by "Accuracy %" ascending/descending
    """
    modeloutput_path = output_path + "ModelResults/"
    os.makedirs(modeloutput_path, exist_ok=True)

    # Predicting the classes for validation set
    y_pred = np.asarray(model.predict(X_test_scaled))
    y_true = np.asarray(y_test)

    print("Distribution of prediction:", Counter(y_pred))

    #overall accuracy
    overall_acc = round(metrics.accuracy_score(y_test, y_pred)*100,2)
    overall_acc = pd.DataFrame([{'Overall Acc %':overall_acc}])
    overall_acc.to_csv(modeloutput_path+"Overall_Accuracy.csv")

    #classification report
    report = metrics.classification_report(y_test, y_pred,zero_division=0,output_dict=True)
    report = pd.DataFrame(report).transpose()
    report.to_csv(modeloutput_path+"Classification_Report.csv")

    #confusion matrix with accuracies for each label
    labels = [0, 1]
    row_names = ['true:0', 'true:1']

    # Always produce one accuracy per label of the confusion matrix. Looping
    # over the classes found in y_test broke the two-row frame below whenever
    # the test set did not contain exactly the two classes.
    class_accuracies = []
    for label in labels:
        in_class = y_true == label
        if in_class.any():
            class_accuracies.append(round(np.mean(y_pred[in_class] == label)*100,2))
        else:
            class_accuracies.append(np.nan)  # label absent from the test set

    class_acc = pd.DataFrame(class_accuracies,index=row_names,columns= ["Accuracy %"])

    cf_matrix = pd.DataFrame(
        metrics.confusion_matrix(y_test, y_pred, labels=labels),
        index=row_names,
        columns=['pred:0', 'pred:1']
    )

    cf_matrix = pd.concat([cf_matrix,class_acc],axis=1)
    if ascend is not None:
        cf_matrix = cf_matrix.sort_values(by=['Accuracy %'], ascending=ascend)

    cf_matrix.to_csv(modeloutput_path+"Confusion_Matrix_test.csv")

    #validation results
    # concat aligns on the index, and the predictions always carry a fresh
    # 0..n-1 index. Resetting the other parts keeps the rows paired by position
    # even when the inputs still carry the index of a filtered frame.
    val_results = pd.concat(
        [
            X_test_sf.reset_index(drop=True),
            X_test_scaled.reset_index(drop=True),
            pd.DataFrame(y_test).reset_index(drop=True),
            pd.DataFrame(y_pred,columns = ["PRED_SPECKLE"]),
        ],
        axis=1,
    )
    val_results.to_csv(modeloutput_path+"Val_results.csv",index=False)

    print("Model testing completed!")
    

In [2]:
# Project root. NOTE: machine-specific absolute path; every other path below is derived from it.
path = "C:/Users/nchong/OneDrive - Intel Corporation/Documents/ML based speckle POC/"

# Input files
testdata_path = f"{path}DataPreparation/TestData/Set3_Na0/Test_Set3.csv"
model_path = f"{path}ModelBuilding/MergedData_TrainValExchange_Na0/Ensemble300fs/SVM/Weight_6/SVMmodel.joblib"
X_train_path = f"{path}DataPreparation/MergedData_TrainValExchange_Na0/TrainData_NegHandling_WithXY_WithHVQKDiff_Na0.csv"
features_path = f"{path}FeatureSelection/SNR_R5_ww51.4/EnsembleTop300Fs.csv"

# Folder that receives the reports written by this notebook
output_path = f"{path}DataPreparation/C0_Testing/"

# Feature-name mapping file and the names of its two columns
featuremapping_path = f"{path}DataPreparation/TestData/Set3_Na0/FeatureMapping_C0_B3_Edited.csv"
old = "B3"
new = "C0"

In [3]:
 #----------read file-----------# 
# Load the raw test set and report its size
df = pd.read_csv(filepath_or_buffer=testdata_path)
print("Test data read")
print(f"Shape of test data: {df.shape}")

Test data read
Shape of test data: (35957, 3747)


In [4]:
# Create the output folder when it is missing. exist_ok=True replaces the
# separate existence check, which could still fail if the folder appeared
# between the check and the creation.
os.makedirs(output_path, exist_ok=True)

In [5]:
# Spot check: show one IDV feature under its C0 name, before the feature mapping cell renames it
df["IDV_0148_SVTNLK12_FULLDIE_CORE_TALL_0950_MED_132150"] #c0

0        0.55
1        0.49
2        0.68
3        0.63
4        0.81
         ... 
35952    0.66
35953    0.53
35954    0.62
35955    0.89
35956    0.61
Name: IDV_0148_SVTNLK12_FULLDIE_CORE_TALL_0950_MED_132150, Length: 35957, dtype: float64

In [6]:
#create speckle column from DELTA
try:
    df = create_speckle_column(df)
except KeyError as err:
    # Only a missing column is tolerated here. A bare except would also hide
    # programming errors and swallow a keyboard interrupt.
    print("Unable to create SPECKLE column, missing column:", err)

Distribution of data: Counter({0: 35866, 1: 91})


In [7]:
# map features if needed
if featuremapping_path is not None:
    try:
        feature_mapping = pd.read_csv(featuremapping_path)
        print("Feature mapping file read")
        df = featuremapping(df,feature_mapping,old,new)
        print("Shape of test data after feature mapping:", df.shape)

    except (OSError, KeyError, ValueError) as err:
        # unreadable/missing file, missing mapping column, or unparsable file;
        # the reason is shown so the failure can be diagnosed
        print("Feature mapping failed!", repr(err))

Feature mapping file read
Shape of test data after feature mapping: (35957, 3747)


In [8]:
# Spot check: after feature mapping, the same IDV feature is shown under its B3 name
df["IDV_0148_SVTNLK12_FULLDIE_CORE_TALL_0950S_MED_132150"] #b3

0        0.55
1        0.49
2        0.68
3        0.63
4        0.81
         ... 
35952    0.66
35953    0.53
35954    0.62
35955    0.89
35956    0.61
Name: IDV_0148_SVTNLK12_FULLDIE_CORE_TALL_0950S_MED_132150, Length: 35957, dtype: float64

In [9]:
#filter the test data with the features used in model building, maintain columns "SPECKLE" and "VID"

features = pd.read_csv(features_path)
print("Important features file read")

wanted_cols = ["SPECKLE", "VID"] + features["Feature"].tolist()
# Check for missing columns explicitly so they can be named. A bare except
# around the selection hid which features were missing and also masked
# unrelated errors.
missing_cols = [col for col in wanted_cols if col not in df.columns]
if missing_cols:
    print("Some feature(s) used for model building are not in test data")
    print("Missing columns:", missing_cols)
else:
    df = df[wanted_cols]
    print("Shape of test data:",df.shape)

Important features file read
Shape of test data: (35957, 302)


In [10]:
# Report duplicated VIDs; duplicates.csv is written to the output folder when any exist
check_duplicates(df=df, output_path=output_path)
print("Duplicates checked")

There are no duplicates in VID
Duplicates checked


In [11]:
# Per-column null report, written to NA_Columns.csv in the output folder
check_colna(df=df, output_path=output_path)
print("Checked columns for NA")

Checked columns for NA


In [12]:
# Per-row null report; NA_Rows.csv is written to the output folder when any row has nulls
check_rowna(df=df, output_path=output_path)
print("Checked rows for NA")

There are NA rows!
Checked rows for NA


In [13]:
# Split the frame: target (y_test), the VID column (X_test_sf) and the feature matrix (X_test)
y_test, X_test_sf = df["SPECKLE"], df["VID"]
X_test = df.drop(columns=["SPECKLE", "VID"])
print(f"Shape of X_test: {X_test.shape}")

Shape of X_test: (35957, 300)


In [14]:
# Display the feature matrix for a visual check (large frames are rendered truncated)
X_test

Unnamed: 0,HVQK_VMIN_POST_ARRAY_CORE_PBIST_L2SSA_132110,IDV_0048_SVTNLK12_FULLDIE_SOC_0950_MED_132110,HVQK_VMIN_POST_ARRAY_CORE_PBIST_L2SSAC6S_132110,IDV_0148_SVTNLK12_FULLDIE_SOC_0950_MED_132110,TPI_VCC::CONT_VCCCOREM45_HC_K_START_X_X_X_X_POSTSURGE_SNR_OLD_VCCCORE_M45_132110,TPI_VCC::CONT_PARALLEL_HC_E_FINAL_X_X_X_X_SNR_MIMS_SNR_ALL_VCCPAR_HCDPS_VCCINF_132110,TPI_VCC_X::CONT_X_SCREEN_K_BEGIN_X_X_X_X_VCCADTL_VCCIN_EHV_LC_ADTL_132150,TPI_VCC::CONT_PARALLEL_LC_E_FINAL_X_X_X_X_SNR_MIMS_SNR_ALL_VCCPAR_LCDPS_VCCAOPI_132110,TPI_VCC::CONT_PARALLEL_HC_E_START_X_X_X_X_POSTSURGE_SNR_MIMS_SNR_ALL_VCCPAR_HCDPS_VCCINF_132110,TPI_VCC::CONT_VCCCOREM23_HC_K_START_X_X_X_X_POSTSURGE_SNR_OLD_VCCCORE_M23_132110,...,SIO_LKG_X::HI1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132110_FBRK_N_CPU,HVQK_VMIN_PRE_SCAN_UNCORE_SA_132110,HVQK_VMIN_POST_ARRAY_NAC_MBIST_LSARMN2_132110,HVQK_VMIN_POST_SCAN_UNCORE_VNN_132110,SIO_LKG_X::HI1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132150_NAC_NET_CLK_REF1_P,SIO_LKG_X::HI1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132110_ERROR2_N_CPU,SIO_LKG_X::HI1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132150_NAC_CLK_SYNCE0_P,SIO_LKG_X::LO1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132110_NAC_KR23_DFTANA_0,SIO_LKG_X::HI1P05_X_LKG_K_END_X_X_VMAX_X_1MA_PC1_132110_FIVR_PRB_DIG_1,IDV_0001_SVT3GNES12_FULLDIE_SOC_0650_MED_132110
0,0.7549,0.20,0.7169,1.44,0.005184,0.000677,-0.003693,0.000186,0.000811,0.005375,...,2.460000e-06,0.7550,0.6544,0.6988,0.000002,1.710000e-06,6.940000e-06,1.300000e-08,0.000002,5695.77
1,0.7761,0.23,0.6990,1.06,0.004574,0.000805,-0.003495,0.000268,0.000989,0.005484,...,2.720000e-06,0.7574,0.6762,0.7008,-0.000002,1.840000e-06,6.670000e-06,1.090000e-08,0.000002,5656.34
2,0.7356,0.30,0.7118,1.72,0.001820,0.000137,-0.003594,0.000033,0.000192,0.001345,...,1.570000e-06,0.7948,0.6840,0.7317,0.000006,1.210000e-06,1.820000e-06,1.940000e-08,0.000003,5290.14
3,0.6656,0.25,0.6749,1.23,0.002274,0.000297,-0.003732,0.000066,0.000413,0.002234,...,1.640000e-06,0.7600,0.6498,0.6958,0.000006,1.210000e-06,2.520000e-06,2.170000e-08,0.000003,6033.80
4,0.6646,0.27,0.6533,1.60,0.003263,0.000615,-0.003416,0.000158,0.000707,0.004066,...,1.230000e-06,0.7600,0.6884,0.7142,0.000003,4.200000e-07,5.420000e-07,-2.630000e-08,0.000003,5622.54
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
35952,0.7054,0.40,0.6545,2.06,0.001448,0.000217,-0.003529,0.000066,0.000251,0.002026,...,9.080000e-07,0.7951,0.6894,0.7354,0.000002,2.490000e-06,-6.680000e-06,-2.820000e-09,0.000002,5318.31
35953,0.7301,0.27,0.6200,1.36,0.001379,0.000128,-0.003415,0.000027,0.000139,0.001999,...,-7.930000e-07,0.8242,0.7362,0.7630,0.000003,2.040000e-06,-2.750000e-06,-6.490000e-09,0.000002,5030.99
35954,0.7611,0.35,0.6674,1.17,0.003757,0.000496,-0.003357,0.000124,0.000657,0.003725,...,1.050000e-06,0.7600,0.6634,0.6887,-0.000003,2.540000e-06,4.900000e-06,-4.580000e-09,0.000001,6090.14
35955,0.7652,0.49,0.6805,1.47,0.005564,0.000912,-0.003846,0.000213,0.001088,0.005991,...,7.480000e-07,0.7363,0.6405,0.6833,0.000005,2.560000e-06,5.460000e-06,8.390000e-10,0.000002,6456.34


In [15]:
# Replace every null with 0, then re-run the row check to confirm none is left.
# X_test no longer has the SPECKLE/VID columns that check_rowna reads from the first
# two positions; that only matters for the report it writes when nulls are found.
X_test = X_test.fillna(value=0)
check_rowna(df=X_test, output_path=output_path)

No more NA rows!


In [16]:
# Take absolute values everywhere except in the columns whose name starts with IDV or HVQK
X_test = convert_neg_to_pos(df=X_test, cols_to_keep=("IDV", "HVQK"))

df_keep shape (35957, 85)
df_to_convert shape (35957, 215)
df_positive shape (35957, 215)
Is there negative value left: False
Shape after converting neg to pos: (35957, 300)


In [17]:
# Write one occurrence report per placeholder value that can remain in the IDV and HVQK columns
for placeholder in (-5555, -555, -999, -9999):
    check_col_negative(X_test, output_path, value=placeholder)
print("Checked IDV and HVQK columns for negative values ")

Checked IDV and HVQK columns for negative values 


In [18]:
# Replace each placeholder value (invalid data) of the IDV and HVQK columns with 0
for placeholder in (-5555, -999, -9999, -555):
    X_test = Negative_value_impute(X_test, value=placeholder, imptype="zero")
print("Negative value (invalid data) imputation of IDV and HVQK features done")

# nothing negative should remain after the steps above
any_negative_left = (X_test < 0).any().any()
print("Is there still negative values in the test data:",any_negative_left)

Negative value (invalid data) imputation of IDV and HVQK features done
Is there still negative values in the test data: False


In [19]:
#Normalize the data
X_train = pd.read_csv(X_train_path)
print("Train data read!")

try:
    X_train = X_train[features["Feature"]]
    print("X_train filtered with important features only!")
    print("Shape of X_train:", X_train.shape)
except KeyError:
    print("Features in X_train not the same as features used for model building!")
    # Stop here: continuing with the unfiltered training data would fit the
    # scaler on a different set of columns than the model was built on.
    raise

X_test = X_test[X_train.columns] #make sequence of features in test same as train before normalization
X_test_scaled = scale_data(X_train,X_test)
print("Shape of X_test_scaled:", X_test_scaled.shape)

Train data read!
X_train filtered with important features only!
Shape of X_train: (36434, 300)
Shape of X_test_scaled: (35957, 300)


In [20]:
#-------------Model Testing----------#
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that come from a trusted source.
model = joblib.load(model_path) #read model to be tested
# Predict on the scaled test set; the reports are written to output_path + "ModelResults/"
prediction(X_test_scaled,X_test_sf,y_test,model,output_path)

Distribution of prediction: Counter({0: 35747, 1: 210})
Model testing completed!
