<a href="https://colab.research.google.com/github/Nagisa1002/Anomaly_detection/blob/main/LightGBM_github.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab only: mount Google Drive at /content/drive (LOG_DIR below lives under /content/drive/MyDrive).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from datetime import datetime 
import os 
import sys
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import joblib

from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from keras.utils import np_utils

##data preprocessing##
'''
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

#over/under sampling
!pip install imbalanced-learn
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import SMOTE
'''

##model##
'''
import lightgbm as lgb
from sklearn import svm
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegressionCV
!pip install catboost
from catboost import CatBoostClassifier
from catboost import Pool
'''

##parameter tuning##
'''
from sklearn.model_selection import GridSearchCV

#lgb用
! pip install optuna
import optuna.integration.lightgbm as lgb
'''

In [None]:
# --- run parameters ---
test_size = 0.1          # fraction passed to train_test_split for the validation split
model_type = 'LightGBM'  # one of: 'SVC', 'RF', 'LR', 'CatBoost', 'LightGBM'

# sampling_type   -> 0: None, 1: undersampling, 2: oversampling
# processing_type -> 0: None, 1: StandardScaler, 2: MinMaxScaler, 3: RobustScaler
sampling_type = 0
processing_type = 3

# --- output locations (one timestamped directory per run) ---
EXEC_TIME = f'{datetime.now():%Y%m%d-%H%M%S}'
LOG_DIR = f'/content/drive/MyDrive/{EXEC_TIME}'
RESULT_DIR = f'{LOG_DIR}/score.txt'  # note: despite the name, this is the score *file* path

In [None]:
def data_sampling(X_train:np.ndarray, y_train:np.ndarray):
    '''
    Rebalance the training set according to the module-level `sampling_type`.

    sampling_type:
      0 -> no resampling (the inputs are returned unchanged)
      1 -> random undersampling (RandomUnderSampler)
      2 -> oversampling (SMOTE)

    Returns the (possibly resampled) X_train and y_train.
    Raises ValueError for any other sampling_type.
    '''
    if sampling_type == 0:
        return X_train, y_train

    if sampling_type == 1:
        sampler = RandomUnderSampler(random_state=100)
    elif sampling_type == 2:
        # Seeded like the undersampler so repeated runs produce the same synthetic samples.
        sampler = SMOTE(random_state=100)
    else:
        raise ValueError(f'Unknown sampling_type: {sampling_type!r} (expected 0, 1 or 2).')

    # fit_sample() was removed from imbalanced-learn; fit_resample() is the supported API.
    X_train, y_train = sampler.fit_resample(X_train, y_train)
    return X_train, y_train
    
def preprocessing(X_train, X_test, type:str=None):
  '''
  Scale the features according to the module-level `processing_type`.

  processing_type:
    0 -> no scaling (the inputs are returned unchanged)
    1 -> StandardScaler
    2 -> MinMaxScaler with feature_range=(-1, 1)
    3 -> RobustScaler with quantile_range=(5.0, 95.0)

  The scaler is fitted on X_train only and then applied to X_test.
  `type` is not used; it is kept (now optional) so existing two- and
  three-argument calls both keep working.

  Returns the scaled X_train and X_test.
  Raises ValueError for any other processing_type.
  '''
  if processing_type == 0:
    return X_train, X_test

  if processing_type == 1:
    sc = StandardScaler()
  elif processing_type == 2:
    sc = MinMaxScaler(feature_range=(-1, 1), copy=True)
  elif processing_type == 3:
    sc = RobustScaler(with_centering=True, with_scaling=True, quantile_range=(5.0, 95.0), copy=True)
  else:
    raise ValueError(f'Unknown processing_type: {processing_type!r} (expected 0, 1, 2 or 3).')

  X_train = sc.fit_transform(X_train)
  X_test = sc.transform(X_test)
  return X_train, X_test

In [None]:
def load_data(path:str, drop_columns=None):
  '''
  Load a CSV file and split it into explanatory variables (X) and the
  objective variable (y).

  Assumption1: the objective variable (label) is in the column 'y_true'.
  Assumption2: the data file is a CSV whose first column is the index.

  Labels are encoded as integers 0..num_classes-1 following the sorted order
  of the distinct 'y_true' values, so the encoding is deterministic. Row order
  of the file is preserved.
  NOTE: the encoding is computed per file; a file that lacks one of the
  classes will be encoded differently from one that has them all.

  path         : path of the CSV file.
  drop_columns : optional extra column names to exclude from X
                 ('y_true' is always excluded).

  Returns (X_data, y_data, num_classes).
  Raises KeyError if 'y_true' or one of drop_columns is not in the file.
  '''
  df = pd.read_csv(path, index_col=0)
  print('all_data:',df.shape, end='')

  # np.unique gives the sorted distinct labels and, per row, the index of its label.
  labels, y_data = np.unique(df['y_true'].to_numpy(), return_inverse=True)
  num_classes = len(labels)

  excluded = ['y_true'] + [col for col in (drop_columns or []) if col != 'y_true']
  X_data = df.drop(columns=excluded).to_numpy()

  print(f' -> X:{X_data.shape}, y:{y_data.shape}')
  return X_data, y_data, num_classes

def get_data(train_path:str='', test_path:str=''):
  '''
  Load the training and test CSV files, optionally resample and scale them,
  and split the training data into training and validation parts.

  train_path, test_path : locations of the CSV files (must be set).

  Steps:
    1. load both files with load_data
    2. resample the training data when sampling_type != 0
    3. scale train/test when processing_type != 0
    4. stratified split of the training data (test_size goes to validation)
    5. for model_type 'CatBoost', wrap each split in a catboost Pool

  NOTE(review): resampling and scaling happen before the train/validation
  split, so the validation part is also resampled and contributes to the
  scaler statistics. Kept as in the original flow.

  Returns X_tr, y_tr, X_val, y_val, X_test, y_test, num_classes
  (num_classes is taken from the training file).
  Raises ValueError if a path is empty.
  '''
  if not train_path or not test_path:
    raise ValueError('Set train_path and test_path to the CSV files to load.')

  X_train, y_train, num_classes = load_data(train_path)
  X_test, y_test, _ = load_data(test_path)

  # The flags are integers (0 means "None"), not the string 'None'.
  if sampling_type != 0:
    X_train, y_train = data_sampling(X_train, y_train)

  if processing_type != 0:
    X_train, X_test = preprocessing(X_train, X_test, str(processing_type))

  X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, test_size=test_size, random_state=0, shuffle=True, stratify=y_train)

  if model_type == 'CatBoost':
    # CatBoost is trained/evaluated on Pool objects that carry their labels.
    X_tr = Pool(X_tr, label=y_tr)
    X_val = Pool(X_val, label=y_val)
    X_test = Pool(X_test, label=y_test)

  return X_tr, y_tr, X_val, y_val, X_test, y_test, num_classes

In [None]:
def plot_cm(y, y_pred, num_classes):
  '''
  Plot the confusion matrix of y (true labels) against y_pred and save it
  as {LOG_DIR}/cm.png.

  Labels are expected to be the integers 0..num_classes-1; passing them
  explicitly keeps the matrix num_classes x num_classes even when a class
  does not occur in y or y_pred.
  '''
  class_ids = np.arange(num_classes)
  confmat = confusion_matrix(y, y_pred, labels=class_ids)

  # A single figure only: the extra plt.figure() the function used to create
  # was never drawn on and never closed.
  fig, ax = plt.subplots(figsize=(num_classes, num_classes))
  ax.matshow(confmat, cmap=plt.cm.Blues, alpha=0.3)

  # Write the count into every cell.
  for i in range(confmat.shape[0]):
    for j in range(confmat.shape[1]):
      ax.text(x=j, y=i, s=confmat[i, j], va='center', ha='center')

  ax.set_xticks(class_ids)
  ax.set_yticks(class_ids)
  # matshow puts the x ticks on top, so the title doubles as the x-axis label.
  ax.set_title('Predicted Label')
  ax.set_ylabel('True Label')
  fig.savefig(f'{LOG_DIR}/cm.png')
  plt.close(fig)

In [None]:
def plot_history(evaluation_results):
  '''
  Draw the learning curve (multi_logloss per boosting round) for the
  'train' and 'valid' entries of evaluation_results and save it as
  {LOG_DIR}/history.png.
  '''
  fig, ax = plt.subplots()

  for split_name in ('train', 'valid'):
    ax.plot(evaluation_results[split_name]['multi_logloss'], label=split_name)

  ax.set_ylabel('Log loss')
  ax.set_xlabel('Boosting round')
  ax.set_title('Training performance')
  ax.legend()

  fig.savefig(f'{LOG_DIR}/history.png')
  plt.close(fig)

In [None]:
def train_model(X_tr:np.ndarray, y_tr:np.ndarray, X_val:np.ndarray, y_val:np.ndarray, num_classes, f):
  '''
  Build the model selected by the module-level `model_type`, train it and
  save it to {LOG_DIR}/model.learn.

  model_type: 'SVC', 'RF', 'LR', 'CatBoost' or 'LightGBM'.
    - 'SVC' / 'RF' / 'LR' are fitted on (X_tr, y_tr) only.
    - 'CatBoost' is fitted on X_tr with X_val as eval_set (no separate label
      argument is passed; get_data wraps these splits in Pool objects).
    - 'LightGBM' trains a multiclass booster with early stopping on the
      validation set and plots the learning curve via plot_history.

  num_classes : number of classes (used by LightGBM).
  f           : open score file; currently unused, kept for the caller.

  Returns the trained model.
  Raises ValueError for an unknown model_type.
  '''
  if model_type in ('SVC', 'RF', 'LR'):
    if model_type == 'SVC':
      model = SVC(kernel='rbf')
    elif model_type == 'RF':
      model = RandomForestClassifier(random_state=0)
    else:
      model = LogisticRegressionCV(cv=10, random_state=0)
    model.fit(X_tr, y_tr)
    #gscv = GridSearchCV(model, param_grid=params, scoring='f1_macro', verbose=1)
    #print(gscv.best_params_, gscv.best_score_)

  elif model_type == 'CatBoost':
    model = CatBoostClassifier(custom_loss=['Accuracy'], random_seed=0)
    model.fit(X_tr,
              eval_set=X_val,
              early_stopping_rounds=20,
              use_best_model=True,
              plot=True)

  elif model_type == 'LightGBM':
    evaluation_results = {}

    lgb_tr = lgb.Dataset(X_tr, label=y_tr)
    lgb_val = lgb.Dataset(X_val, label=y_val, reference=lgb_tr)

    params = {
        'objective': 'multiclass',
        'num_class': num_classes,
    }

    # Early stopping and metric recording are passed as callbacks: the
    # early_stopping_rounds / evals_result keyword arguments of lgb.train
    # were removed in LightGBM 4.
    model = lgb.train(
        params,
        train_set=lgb_tr,
        num_boost_round=100,
        valid_sets=[lgb_tr, lgb_val],
        valid_names=['train', 'valid'],
        callbacks=[
            lgb.early_stopping(stopping_rounds=15),
            lgb.record_evaluation(evaluation_results),
        ],
    )
    plot_history(evaluation_results)
    #print(model.params,file=f)

  else:
    # Raise instead of sys.exit(): a bad setting should not stop the kernel.
    raise ValueError(f'Please input correct model_type (got {model_type!r}).')

  filepath = f'{LOG_DIR}/model.learn'
  joblib.dump(model, filepath)
  return model

In [None]:
def predict_model(model, X:np.ndarray, f=None):
  '''
  Predict class labels for X with the trained model.

  For model_type 'LightGBM', model.predict is called with the best iteration
  and the argmax over axis 1 of its output is returned as the predicted class.
  Every other model_type returns model.predict(X) as is.

  f : open score file; unused, kept (now optional) for the caller.
  '''
  if model_type == 'LightGBM':
    class_proba = model.predict(X, num_iteration=model.best_iteration)
    return np.argmax(class_proba, axis=1)
  return model.predict(X)

def get_score(y, y_pred, num_classes=None, f=None):
  '''
  Compute classification scores, write them to f and plot the confusion matrix.

  y, y_pred   : true and predicted labels.
  num_classes : number of classes for the confusion matrix; when omitted it
                is inferred as (largest label seen) + 1, which assumes labels
                are the integers 0..k-1.
  f           : writable text file for the report; None prints to stdout.

  Reported: accuracy, micro/macro/weighted F1 and per-class F1, precision
  and recall.
  '''
  acc = accuracy_score(y, y_pred)
  micro_f1 = f1_score(y, y_pred, average='micro')
  macro_f1 = f1_score(y, y_pred, average='macro')
  weighted_f1 = f1_score(y, y_pred, average='weighted')
  class_f1 = f1_score(y, y_pred, average=None)
  class_precision = precision_score(y, y_pred, average=None)
  class_recall = recall_score(y, y_pred, average=None)

  print('--- SCORE---', file=f)
  print('accuracy', acc, file=f)
  print('micro_f1', micro_f1, file=f)
  print('macro_f1', macro_f1, file=f)
  print('weighted_f1', weighted_f1, file=f)
  # Per-class F1 (this line used to print the weighted F1 a second time).
  print('class_f1', class_f1, file=f)
  print('class_precision', class_precision, file=f)
  print('class_recall', class_recall, file=f)
  print(file=f)

  if num_classes is None:
    num_classes = int(max(np.max(y), np.max(y_pred))) + 1
  plot_cm(y, y_pred, num_classes)
  return

In [None]:
def train(X_tr, y_tr, X_val, y_val, X_test, y_test, num_classes):
    '''
    Train the configured model, predict on the test data and write the
    scores to the file at RESULT_DIR.

    The file is opened in write mode; its first line is model_type.
    '''
    with open(RESULT_DIR, mode='w') as f:
        print(model_type, file=f)
        model = train_model(X_tr, y_tr, X_val, y_val, num_classes, f)
        y_pred = predict_model(model, X_test, f)
        # Score against the test labels and send the report to the same file.
        get_score(y_test, y_pred, num_classes, f=f)

    return

In [None]:
def stacking(X_val, y_val, X_test, y_test, model_paths=('', ''), meta_model_path=''):
  '''
  Combine previously saved models and score the combined prediction on the
  test data.

  model_paths     : paths of the saved base models (at least two). Each model
                    is called as model.predict(X, num_iteration=model.best_iteration)
                    and is expected to return per-class probabilities
                    (swap in predict_proba() for models without that API).
  meta_model_path : optional path of a saved, unfitted meta-model.

  pattern1 (meta_model_path empty): average the base-model probabilities and
    take the argmax.
  pattern2 (meta_model_path given): fit the meta-model on the base-model
    predictions for X_val and predict from the base-model predictions for
    X_test. Only use it if X_val was not used when training the base models.

  Raises ValueError if fewer than two non-empty model paths are given.
  '''
  if len(model_paths) < 2 or not all(model_paths):
    raise ValueError('model_paths must contain the paths of at least two saved models.')

  def _predict_proba(model, X):
    return model.predict(X, num_iteration=model.best_iteration) #or predict_proba()

  # NOTE: joblib.load unpickles the file - only load model files you trust.
  base_models = [joblib.load(path) for path in model_paths]
  test_proba = [_predict_proba(model, X_test) for model in base_models]

  if meta_model_path:
    #pattern2
    meta_model = joblib.load(meta_model_path)
    val_proba = [_predict_proba(model, X_val) for model in base_models]
    meta_model.fit(np.column_stack(val_proba), y_val)
    y_pred = meta_model.predict(np.column_stack(test_proba))
  else:
    #pattern1
    y_pred = np.argmax(np.mean(test_proba, axis=0), axis=1)

  num_classes = test_proba[0].shape[1]
  get_score(y_test, y_pred, num_classes)
  return

In [None]:
def main():
    '''
    Run one experiment: create the log directory, prepare the data,
    train and evaluate, then report where the results were written.
    '''
    os.makedirs(LOG_DIR, exist_ok=True)

    (train_X, train_y,
     valid_X, valid_y,
     test_X, test_y,
     n_classes) = get_data()

    train(train_X, train_y, valid_X, valid_y, test_X, test_y, n_classes)
    print('LOG_DIR:', LOG_DIR)
# Entry point: run the pipeline when this code is executed directly (not when imported).
if __name__ == "__main__":
    main()