# 1. Importing some Libraries

In [133]:
import numpy as np
import pandas as pd

from datetime import datetime
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
import seaborn as sns
import plotly.graph_objects as go
import plotly.io as pio
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt

import holoviews as hv
from holoviews import opts
hv.extension('bokeh')

import tensorflow as tf
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score, classification_report

from tensorflow.keras.layers import Input, Dropout, Dense, LSTM, TimeDistributed, RepeatVector
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
from tensorflow.keras.models import Sequential
from keras.utils import to_categorical
from keras.optimizers import Adam

from sklearn import metrics
import tensorflow as tf
import keras.backend as K
import matplotlib.pyplot as plt

from lime import lime_tabular
from IPython.display import Image 
import tempfile
from sklearn.manifold import TSNE

pd.set_option('display.max_rows', 15)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)


# 2. Read The data

In [2]:
# Read The data
# The CSV is read from a path relative to the notebook's working directory.
df = pd.read_csv('Data/creditcard.csv')
df.head()

In [3]:
df['Class'].value_counts()

# 3. Exploratory Data Analysis

In [4]:
# Line plot of the features V1, V2 and V3 against the 'Time' column.
fig = go.Figure()

for component in ("V1", "V2", "V3"):
    fig.add_trace(go.Scatter(x=df["Time"], y=df[component], mode='lines', name=component))

# All three traces share the primary y-axis. The previous title ("V1 vs Cyl ...")
# and the never-used secondary axis were leftovers from a different example.
fig.update_layout(title_text="V1 vs V2 vs V3 over Time",
                  xaxis=dict(title="Time"),
                  yaxis=dict(title="Values", side='left'))

fig.show()

In [5]:
df.describe()

In [6]:
df.shape

In [7]:
# Correlation
dataplot = sns.heatmap(df.corr(), cmap="YlGnBu") 
# displaying heatmap 
plt.show() 

In [8]:
df.isnull().sum()

In [9]:
df.info()

In [10]:
# Checking for class distribution
sns.countplot(x="Class",data=df)

In [11]:
# Share of each class in percent. normalize=True divides by the actual number of
# rows, so the figure stays correct if the data set changes (the row count was
# previously hard-coded as 284807).
df["Class"].value_counts(normalize=True) * 100

In [12]:
X = df.drop("Class",axis=1)
y = df["Class"]

In [13]:
X

# 4. LSTM Autoencoder

## Split The Data into Train and Test Set

In [14]:
df.columns

In [15]:
# Keep the 'Time' column in its own one-column frame.
df_timestamp = df[['Time']]

# Model input: every column except 'Time' (the alternative that also drops
# 'Class' is left commented out below, so 'Class' stays in df_).
df_ = df.drop('Time', axis=1)
#df_ = df.drop(['Time','Class'], axis=1)
df_.shape

In [16]:
df_

In [17]:
# Ordered split: the first `train_prp` share of the rows is used for training,
# the remainder for testing.
train_prp = .80
# Split by position on an integer boundary. The earlier label-based slices
# (`.loc[:n * train_prp]` and `.loc[n * train_prp:]`) are inclusive on both
# ends, so whenever n * train_prp is a whole number the boundary row ended up
# in BOTH train and test. Rounding up keeps the same rows in `train` as before
# when the product is fractional.
# Assumes df_ carries the default 0..n-1 index (labels == positions) — TODO confirm.
split_idx = int(np.ceil(df_.shape[0] * train_prp))
train = df_.iloc[:split_idx]
test = df_.iloc[split_idx:]

In [18]:
train

In [19]:
test

## Feature Scaling

In [20]:
# Standardize The Data
# The scaler is fitted on the training split only and then applied unchanged to the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(train)
X_test = scaler.transform(test)
# Extra references to the scaled 2-D arrays (no copy is made here).
X_train_origin = X_train
X_test_origin = X_test
print("X train Shape:", X_train.shape)
print("X test Shape:", X_test.shape)

In [21]:
X_train

In [22]:
# Insert a singleton time-step axis for the LSTM: (samples, features) -> (samples, 1, features).
n_train_rows, n_features = X_train.shape
X_train = X_train.reshape(n_train_rows, 1, n_features)
n_test_rows = X_test.shape[0]
X_test = X_test.reshape(n_test_rows, 1, X_test.shape[1])

print("X train Shape:", X_train.shape)
print("X test Shape:", X_test.shape)

In [23]:
def autoencoder_model(X):
    """Build an uncompiled LSTM autoencoder sized from a 3-D array.

    Only the shape of ``X`` is used: ``X.shape[1]`` is taken as the number of
    time steps and ``X.shape[2]`` as the number of features. The caller is
    responsible for compiling and fitting the returned ``Sequential`` model.
    """
    n_steps, n_features = X.shape[1], X.shape[2]
    layers = [
        # Encoder: two stacked LSTMs, the second returning a single vector.
        LSTM(100, activation='relu', input_shape=(n_steps, n_features), return_sequences=True),
        LSTM(50, activation='relu', return_sequences=False),
        # Repeat the encoded vector once per time step so the decoder receives a sequence.
        RepeatVector(n_steps),
        # Decoder: mirrored LSTM stack followed by a per-step projection back to n_features.
        LSTM(50, activation='relu', return_sequences=True),
        LSTM(100, activation='relu', return_sequences=True),
        TimeDistributed(Dense(n_features)),
    ]
    return Sequential(layers)

In [24]:
# Build the autoencoder from the shape of X_train and compile it with MAE as the loss.
model = autoencoder_model(X_train)
# NOTE(review): 'accuracy' is tracked although the model is trained to reconstruct
# its input — confirm that this metric is meaningful here.
model.compile(optimizer='adam', loss='mae', metrics=['accuracy'])
model.summary()

In [25]:
model.layers[-1].get_weights()[0].shape   # 100, 30 
model.layers[-2].get_weights()[0].shape   # 50, 400
model.layers[-3].get_weights()[0].shape   # 50, 200

model.layers[0].get_weights()[0].shape   # 30,400
model.layers[1].get_weights()[0].shape   # 100,200
model.layers[3].get_weights()[0].shape   # 50,200
model.layers[4].get_weights()[0].shape   # 50,400
model.layers[5].get_weights()[0].shape   # 100,30

In [26]:
model.layers

In [27]:
# Training hyper-parameters.
epochs = 25
batch = 25
# Autoencoder training: X_train is both input and target. validation_split=.2 holds
# out part of X_train for validation; only the History.history dict is kept.
history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch, validation_split=.2, verbose=1).history

In [28]:
# Training and validation loss per epoch (interactive plot).
fig = go.Figure()
for history_key, trace_name in (('loss', 'loss'), ('val_loss', 'validation loss')):
    epoch_axis = list(range(len(history[history_key])))
    fig.add_trace(go.Scatter(x=epoch_axis, y=history[history_key], mode='lines', name=trace_name))
fig.update_layout(
    title="LSTM AE Error Loss Over Epochs",
    yaxis=dict(title="Loss"),
    xaxis=dict(title="Epoch"),
)
fig.show()

In [29]:
# Training and validation accuracy per epoch (interactive plot).
fig = go.Figure()
for history_key, trace_name in (('accuracy', 'accuracy'), ('val_accuracy', 'validation accuracy')):
    epoch_axis = list(range(len(history[history_key])))
    fig.add_trace(go.Scatter(x=epoch_axis, y=history[history_key], mode='lines', name=trace_name))
fig.update_layout(
    title="LSTM AE Accuracy Over Epochs",
    yaxis=dict(title="Accuracy"),
    xaxis=dict(title="Epoch"),
)
fig.show()

In [30]:
# Plot how the training and validation loss evolved over the epochs and save the figure.
epoch_loss = history['loss']
epoch_val_loss = history['val_loss']

plt.figure(figsize=(8,5))
plt.plot(range(len(epoch_loss)), epoch_loss, 'b-', linewidth=2, label='Train Loss')
# 'val_loss' is a validation curve recorded during fitting, not a test-set result,
# so it is labelled accordingly (it was previously shown as "Test Loss").
plt.plot(range(len(epoch_val_loss)), epoch_val_loss, 'r-', linewidth=2, label='Validation Loss')
plt.xlabel("Epochs")
plt.ylabel("Loss")

plt.legend(loc='best')
plt.savefig('CreditCard_Figure_Loss_LSTM_AE.jpeg')
plt.show()

In [31]:
# Plot how the training and validation accuracy evolved over the epochs and save the figure.
# (These series were previously stored in variables named *_mae although they hold accuracy.)
epoch_acc = history['accuracy']
epoch_val_acc = history['val_accuracy']

plt.figure(figsize=(8,5))

plt.plot(range(len(epoch_acc)), epoch_acc, 'b-', linewidth=2, label='Train Acc')
# 'val_accuracy' is a validation curve recorded during fitting, not a test-set result.
plt.plot(range(len(epoch_val_acc)), epoch_val_acc, 'r-', linewidth=2, label='Validation Acc')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend(loc='lower right')

plt.savefig('CreditCard_Figure_Acc_LSTM_AE.jpeg')
plt.show()

# Saving The Model

In [32]:
# Save the model and architecture to single file
# The file is written to the current working directory.
model.save('CreditCard_LSTM_AE_Model.h5')
print("Model Saved to a Disk")

## Test Data

In [33]:
# Reconstruct the training data, drop the singleton time-step axis and undo the scaling.
train_recon = model.predict(X_train)
train_recon_2d = train_recon.reshape(train_recon.shape[0], train_recon.shape[2])
X_pred = pd.DataFrame(scaler.inverse_transform(train_recon_2d), columns=train.columns)

In [66]:
X_pred

In [35]:
X_pred2 = model.predict(X_test)

# Reconstruction error on the scaled test data, over all rows and features.
test_residual = X_pred2 - X_test
mse_loss = np.mean(np.square(test_residual))  # MSE loss
mae_loss = np.mean(np.abs(test_residual))  # Mean absolute error (MAE) loss

print(f"MSE Loss: {mse_loss}")
print(f"MAE Loss: {mae_loss}")

# Drop the singleton time-step axis, undo the scaling and wrap the result in a DataFrame.
n_test_pred, _, n_test_features = X_pred2.shape
X_pred_scaled = X_pred2.reshape(n_test_pred, n_test_features)
X_pred2 = pd.DataFrame(scaler.inverse_transform(X_pred_scaled), columns=train.columns)
X_pred_df2 = pd.DataFrame(X_pred2, columns=train.columns)
# Give the reconstructed rows the index labels of the test split.
X_pred2.index = test.index

In [65]:
X_pred2

In [67]:
# Cut-off on the per-row mean absolute reconstruction error above which a row is flagged.
ANOMALY_THRESHOLD = 0.75

# Reconstructed rows for the whole data set (train part followed by test part).
# (A dead `pd.DataFrame()` assignment and a no-op bare expression were removed.)
scores_ = pd.concat([X_pred, X_pred2])
# Attach the transaction time. Selecting the 'Time' Series makes the index-aligned
# assignment explicit instead of assigning a one-column DataFrame.
scores_['datetime'] = df_timestamp['Time']

# Mean absolute difference between every reconstructed row and its original row,
# computed from the frame built above instead of concatenating the parts again.
# NOTE(review): the comparison is made against the unscaled `train`/`test` frames —
# confirm that a fixed threshold on this scale is intended.
originals = pd.concat([train, test])
reconstruction_errors = (scores_[originals.columns] - originals).abs().mean(axis=1)
scores_['loss_mae'] = reconstruction_errors

scores_['Threshold'] = ANOMALY_THRESHOLD

# 1 = reconstruction error above the threshold, 0 = otherwise.
scores_['Anomaly'] = np.where(scores_["loss_mae"] > scores_["Threshold"], 1, 0)
scores_.head(10)

In [68]:
scores_.shape

In [38]:
# scores_ = pd.DataFrame()
# scores_ = X_pred
# scores_['datetime'] = df_timestamp.loc[227846:]
# reconstruction_errors = np.mean(np.abs(X_pred - test), axis=1)
# scores_['loss_mae'] = reconstruction_errors
# scores_['Threshold'] = 0.75
# scores_['Anomaly'] = np.where(scores_["loss_mae"] > scores_["Threshold"], 1, 0)
# scores_.head(10)

In [69]:
# Distribution of the per-row reconstruction error stored in scores_
loss_histogram = go.Histogram(x=scores_['loss_mae'])
fig = go.Figure(data=[loss_histogram])
fig.update_layout(
    title="Error distribution",
    xaxis=dict(title="Loss Distribution between predicted and original Data of Credit Card"),
    yaxis=dict(title="Data point counts"),
)
fig.show()

In [70]:
scores_['Anomaly'].value_counts()

In [71]:
# Reconstruction error over time together with the constant threshold line.
fig = go.Figure()
for score_column, trace_name in (('loss_mae', "Loss"), ('Threshold', "Threshold")):
    fig.add_trace(go.Scatter(x=scores_['datetime'],  y=scores_[score_column], name=trace_name))
fig.update_layout(
    title="Error Time Series and Threshold",
    xaxis=dict(title="DateTime"),
    yaxis=dict(title="Loss"),
)
fig.show()

In [73]:
scores_["Anomaly"].value_counts()

In [74]:
# Feature columns that get an '<feature>_anomalies' counterpart.
feature_cols = [f'V{i}' for i in range(1, 29)] + ['Amount']

# Rows flagged as anomalies, with every feature renamed to '<feature>_anomalies'.
# The mapping is generated rather than hand-written: the previous literal dict
# mapped 'V4' to 'V1_anomalies', which produced a duplicate 'V1_anomalies' column
# and no 'V4_anomalies' column at all.
anomalies = (
    scores_.loc[scores_['Anomaly'] == 1, feature_cols + ['datetime']]
    .rename(columns={col: f'{col}_anomalies' for col in feature_cols})
)

# Left join on the index: rows that are not anomalies get NaN in the *_anomalies
# columns. 'datetime' exists on both sides and is therefore suffixed by pandas
# into 'datetime_x' (from scores_) and 'datetime_y' (from anomalies).
scores_1a = scores_.merge(anomalies, left_index=True, right_index=True, how='left')


In [75]:
# Anomalous rows again, this time with the original (un-renamed) column names.
anomaly_view_cols = [f'V{n}' for n in range(1, 29)] + ['Amount', 'datetime']
is_anomaly = scores_['Anomaly'] == 1
anomalies = scores_[is_anomaly][anomaly_view_cols]
print(anomalies.shape)
anomalies

In [76]:
scores_1a.head(4)

In [77]:
# V1 and V2 from the merged score frame, plotted against time.
fig = go.Figure()
for feature_name in ("V1", "V2"):
    fig.add_trace(go.Scatter(x=scores_1a["datetime_x"], y=scores_1a[feature_name], mode='lines', name=feature_name))
fig.update_layout(title_text="Test Data")
fig.show()

In [78]:
# V1 and V2 as lines, with the rows flagged as anomalies overlaid as markers for V2.
fig = go.Figure()
fig.add_trace(go.Scatter(x=scores_1a["datetime_x"], y=scores_1a["V1"], mode='lines', name='V1'))
#fig.add_trace(go.Scatter(x=scores_1a["datetime_x"], y=scores_1a["V1_anomalies"], name='Anomaly in V1', mode='markers', marker=dict(color="blue", size=11, line=dict(color="blue", width=2))))


fig.add_trace(go.Scatter(x=scores_1a["datetime_x"], y=scores_1a["V2"], mode='lines', name='V2'))
# V2_anomalies is NaN for non-anomalous rows, so markers appear only on flagged rows.
fig.add_trace(go.Scatter(x=scores_1a["datetime_x"], y=scores_1a["V2_anomalies"], name='Anomaly in V2 ', mode='markers', marker=dict(color="red", size=11, line=dict(color="red", width=2))))
# No fig.show() here: the figure is rendered because this call is the cell's last expression.
fig.update_layout(title_text="Anomalies Detected in V1 and V2 with LSTM-AE")

In [79]:
anomalies

In [80]:
# Work on a copy of the scores without the constant 'Threshold' column.
df_anomaly = scores_.drop(columns="Threshold")
df_anomaly

In [81]:
df_anomaly = df_anomaly.drop("loss_mae", axis=1)

In [82]:
df_anomaly = df_anomaly.drop("Class", axis=1)

In [83]:
df_anomaly

In [84]:
df_anomaly.columns

In [85]:
# Share of each Anomaly label in percent. normalize=True divides by the actual
# number of rows instead of the previously hard-coded 284807.
# Original note: highly imbalanced data set with about 91% of rows as not-fraud and
# only about 8% as fraud (re-check these figures against the output below).
df_anomaly["Anomaly"].value_counts(normalize=True) * 100

In [56]:
#df_anomaly['Anomaly'] = np.where(df_anomaly['Anomaly'] == 0 ,'True', 'False')

In [86]:
df_anomaly

## Oversampling

In [58]:
# # Separate features and target variable
# X = df_anomaly.drop('Anomaly', axis=1)
# y = df_anomaly['Anomaly']

# # Split the data into training and testing sets
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# # Apply SMOTE only to the training data
# smote = SMOTE(sampling_strategy='auto', random_state=42)
# X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# # Now, X_train_resampled and y_train_resampled contain the balanced training data
# y_train_resampled.value_counts()

### Augmentation

In [87]:
# extract only anomaly data 
df_anomaly.loc[df_anomaly['Anomaly']==1]  

In [88]:
# Random data generator used for data augmentation
def generate_random_data(df, dist=1, count=100, random_state=None):
    '''
    Draw synthetic rows from a multivariate normal distribution fitted to `df`.

    df: source data (rows are observations, columns are features)
    dist: desired Mahalanobis distance setting, e.g. 1, 0.5, 0.1, 0.01; the
          covariance is multiplied by sqrt(dist / largest eigenvalue)
    count: number of rows to generate
    random_state: optional seed; when given, a dedicated numpy Generator is used so
          the draw is reproducible. When None (default) the global numpy random
          state is used, exactly as before.

    Returns an ndarray of shape (count, number of columns in df).
    '''
    # Covariance matrix of the features (columns are the variables)
    cov_matrix = np.cov(df, rowvar=False)
    # A covariance matrix is symmetric, so the symmetric solver is used: it returns
    # real eigenvalues, whereas the general `eig` routine may return complex values
    # for such input. The eigenvectors were never used and are no longer computed.
    largest_eigenvalue = np.linalg.eigvalsh(cov_matrix).max()

    # Scaling factor derived from the requested distance (0.5, 0.1, 0.01, ...)
    scaling_factor = np.sqrt(dist / largest_eigenvalue)
    scaled_cov_matrix = cov_matrix * scaling_factor

    # Generate `count` rows centred on the column means
    if random_state is None:
        random_data = np.random.multivariate_normal(df.mean(axis=0), scaled_cov_matrix, count)
    else:
        rng = np.random.default_rng(random_state)
        random_data = rng.multivariate_normal(df.mean(axis=0), scaled_cov_matrix, count)
    print(f'generated data  : {random_data.shape},  count: {count} , dist: {dist}')
    print('generated data 앞에서 3건만: ', random_data[:1, :3])
    return random_data


# temporary test for function
# Draws 100 rows from the anomalous rows (columns 'V1' through 'Amount') for three dist settings;
# only the result of the last call is shown as the cell output.
generate_random_data(df_anomaly.loc[df_anomaly['Anomaly']==1, 'V1':'Amount'], 0.5, 100)
generate_random_data(df_anomaly.loc[df_anomaly['Anomaly']==1, 'V1':'Amount'], 0.1, 100)
generate_random_data(df_anomaly.loc[df_anomaly['Anomaly']==1, 'V1':'Amount'], 0.01, 100)


### data preprocessing for LSTM
- augmentation + sequence  

In [110]:
train_prp = .80

# Split by position on an integer boundary. The earlier label-based slices
# (`.loc[:n * train_prp]` and `.loc[n * train_prp:]`) are inclusive on both ends,
# so a whole-number boundary placed the same row in both train and test.
# Rounding up keeps the same training rows as before for a fractional boundary.
# Assumes df_anomaly's index labels equal the row positions — TODO confirm.
split_idx = int(np.ceil(df_anomaly.shape[0] * train_prp))
df_anomaly_train = df_anomaly.iloc[:split_idx]
df_anomaly_test = df_anomaly.iloc[split_idx:]

print(f'train : {df_anomaly_train.shape}')
print(f'test: {df_anomaly_test.shape}')
# Peek at the flagged training rows, all columns up to and including 'datetime'.
print(df_anomaly_train.loc[df_anomaly_train['Anomaly']==1, :'datetime'].head())

# Column names of the `anomalies` frame, reused later as the feature names.
data_columns = anomalies.columns 
data_columns

In [111]:
N_TIMESTEPS = 12

# Original data (DataFrame) + augmented data
# concatenate original data and augmented data
def train_data_with_concat(df, dist, count):
    """Append `count` synthetic anomaly rows to `df` and return (features, labels).

    df: frame whose last column is the label and which has 'Anomaly' and
        'datetime' columns; features are all columns up to and including 'datetime'.
    dist, count: forwarded to generate_random_data.

    Returns the concatenated feature frame and the matching label Series, both
    with a fresh 0..n-1 index. Synthetic rows are labelled 1.
    """
    # Fit the generator on the flagged rows only.
    anomaly_features = df.loc[df['Anomaly'] == 1, :'datetime']
    # Column names come from the frame itself rather than from the notebook-level
    # `data_columns`, so the function no longer depends on another cell's state.
    synthetic = pd.DataFrame(generate_random_data(anomaly_features, dist, count),
                             columns=anomaly_features.columns)
    X_concated = pd.concat([df.iloc[:, :-1], synthetic], ignore_index=True)
    # Labels for the synthetic rows are integer ones. They used to be created as
    # booleans, which mixed two dtypes in one label Series and forced callers to
    # cast before counting or encoding the labels.
    synthetic_labels = pd.Series(np.ones(count, dtype=int))
    new_Y = pd.concat([df.loc[:, 'Anomaly'], synthetic_labels], ignore_index=True)
    return X_concated, new_Y 

# def train_array_with_concat(ar1, dist, count):
#     X_concated = np.vstack((ar1, generate_random_data(ar1, dist, count)))
#     X_concated = pd.DataFrame(X_concated, columns=X.columns)
#     new_Y = pd.concat([df_anomaly.loc[:, 'Anomaly'], pd.Series(np.ones((count,)).astype('bool'))], ignore_index=True)
#     return X_concated, new_Y 

# Sequence data builder
def create_sequences(df, y,  seq_length):
    """Slice `df` into overlapping windows of `seq_length` consecutive rows.

    Window i covers rows i .. i+seq_length-1 and is paired with the label at
    position i+seq_length, i.e. the row right after the window. Returns a tuple
    (windows, labels) of numpy arrays.
    """
    window_starts = range(len(df) - seq_length)
    sequences = [df.iloc[start:start + seq_length].values for start in window_starts]
    labels = [y.iloc[start + seq_length] for start in window_starts]
    return np.array(sequences), np.array(labels)


# Build the augmented data: dist 0.1, 10000 synthetic rows each
# Synthetic rows are appended to both the train and the test split.
X_train, y_train = train_data_with_concat(df_anomaly_train, 0.1, 10000)
X_test, y_test = train_data_with_concat(df_anomaly_test, 0.1, 10000)
# Keep a reference to the augmented test data under separate names.
X_test_aug, y_test_aug = X_test, y_test 
print(f'증폭후 데이터:X_train:{X_train.shape},  y_train: {y_train.shape}, y_train :{np.bincount(y_train)} ') 
print(f'증폭후 데이터:X_test:{X_test.shape},  y_test: {y_test.shape}, y_test :{np.bincount(y_test)} ') 
# print(f'증폭후 데이터:X_test:{X_test.shape},  y_test: {y_test.shape}, y_test :{y_test} ') 



In [112]:
from imblearn.under_sampling import RandomUnderSampler

# Under-sampling trial code
# x_resampled, y_resampled = RandomUnderSampler(random_state=0).fit_resample(X_train_origin, y_train_origin.astype('int'))
# Labels are cast to int before resampling.
x_resampled, y_resampled = RandomUnderSampler(random_state=0).fit_resample(X_test, y_test.astype('int'))
# This expression is not the last one in the cell, so it produces no output.
y_resampled


# Helper that wraps the random under-sampling step
def make_undersample(x, y, sampling_strategy):
    """Randomly under-sample (x, y) with a fixed seed.

    `sampling_strategy` is passed straight to RandomUnderSampler. Returns the
    resampled (x, y) pair.
    """
    sampler = RandomUnderSampler(sampling_strategy=sampling_strategy, random_state=0)
    x_resampled, y_resampled = sampler.fit_resample(x, y)
    return x_resampled, y_resampled
    

# make_undersample(X_test, y_test.astype('int'), {0:10000, 1:10000})
# make_undersample(X_train_origin, y_train_origin.astype('int'), {0:100, 1:100})

# Apply under-sampling
# Target number of rows per class, defined once so that the two calls and the two
# log lines cannot drift apart (the dict used to be repeated four times).
UNDERSAMPLE_STRATEGY = {0:10000, 1:10000}
X_train, y_train = make_undersample(X_train, y_train.astype('int'), UNDERSAMPLE_STRATEGY)
X_test, y_test = make_undersample(X_test, y_test.astype('int'), UNDERSAMPLE_STRATEGY)

# The first line reports the training labels; it was previously labelled "y_test".
print(f'언더샘플링후 데이터: X_train: {X_train.shape} y_train: {y_train.shape}, y_train 0/1 개수:{np.bincount(y_train)} sample strategy:{UNDERSAMPLE_STRATEGY}') 
print(f'언더샘플링후 데이터: X_test: {X_test.shape} y_test: {y_test.shape}, y_test 0/1 개수:{np.bincount(y_test)} sample strategy:{UNDERSAMPLE_STRATEGY}') 


In [113]:
# Build the sequence data
# The target shape uses N_TIMESTEPS and len(data_columns) — the same two values the
# classifier's input_shape is built from — instead of a hard-coded feature count
# of 30, so a column mismatch fails here rather than inside model.fit.
X_train, y_train = create_sequences(X_train, y_train, N_TIMESTEPS)
X_train = X_train.reshape((X_train.shape[0], N_TIMESTEPS, len(data_columns)))
# One-hot encode the labels for the two-unit softmax output.
y_train = to_categorical(y_train).astype(int)

X_test, y_test = create_sequences(X_test, y_test, N_TIMESTEPS)
X_test = X_test.reshape((X_test.shape[0], N_TIMESTEPS, len(data_columns)))
y_test = to_categorical(y_test).astype(int)

# Shapes after augmentation and sequencing
print(f'증폭후 데이터: X_train: {X_train.shape}, X_test:{X_test.shape} ') 
print(f'증폭후 데이터: y_train: {y_train.shape}, y_test:{y_test.shape} ') 


In [115]:
# Build sequence data from the original data (for comparison, no augmentation)
X_train_origin = df_anomaly_train.iloc[:, :-1]  
X_test_origin = df_anomaly_test.iloc[:, :-1]  

# Build the windows from the train/test frames prepared above, with the matching
# 'Anomaly' labels. Previously both calls were create_sequences(X, y, 12): the two
# frames above were discarded and the train and test sets were the SAME sequences
# built from the complete raw data, so the baseline was evaluated on its own
# training data.
X_train_origin, y_train_origin  = create_sequences(X_train_origin, df_anomaly_train['Anomaly'], N_TIMESTEPS)
X_test_origin, y_test_origin  = create_sequences(X_test_origin, df_anomaly_test['Anomaly'], N_TIMESTEPS)
# One-hot encoded copies for training; the plain label arrays are kept as well.
y_train_origin2 = to_categorical(y_train_origin).astype(int)
y_test_origin2 = to_categorical(y_test_origin).astype(int)

# The first line reports the training arrays; its label used to read "y_test_origin".
print(f'  증강안한 원본데이터를 시퀀스 형태로 변경후 X_train_origin: {X_train_origin.shape}, y_train_origin :{y_train_origin.shape}')
print(f'  증강안한 원본데이터를 시퀀스 형태로 변경후 X_test_origin: {X_test_origin.shape}, y_test_origin :{y_test_origin.shape}')


## Define the XAI LSTM model

In [117]:
# Factory that returns the LSTM classifier
def xai_lstm_model():
    """Create and compile the two-class LSTM classifier.

    The input shape is (N_TIMESTEPS, len(data_columns)), both read from
    notebook-level names. The network is LSTM(32) -> Dropout(0.2) ->
    Dense(2, softmax), compiled with categorical cross-entropy and Adam at a
    learning rate of 1e-4.
    """
    classifier = Sequential([
        LSTM(32, input_shape=(N_TIMESTEPS, len(data_columns))),
        Dropout(0.2),
        Dense(2, activation='softmax'),
    ])
    classifier.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate=1e-4),
        metrics=['accuracy'],
    )
    return classifier

model2 = xai_lstm_model()

In [120]:
# Train on the augmented data
# NOTE(review): the test arrays are also used as validation data here, so the
# validation metrics are not independent of the later evaluation on X_test.
model2.fit(X_train, y_train, batch_size=100, epochs=100,
                validation_data=(X_test,  y_test), verbose=1)

In [121]:
# Turn the per-class model outputs and the one-hot labels back into class indices before scoring.
y_pred = np.argmax(model2.predict(X_test), axis=1)
y_true = np.argmax(y_test, axis=1)

print(classification_report(y_true, y_pred))


In [122]:
# Train on the non-augmented data

# A fresh classifier with the same architecture, trained for 10 epochs.
model_no_aug = xai_lstm_model()
model_no_aug.fit(X_train_origin, y_train_origin2, batch_size=100, epochs=10,
                validation_data=(X_test_origin,  y_test_origin2), verbose=1)

In [123]:
# Class indices predicted by the non-augmented model vs. the one-hot labels turned back into indices.
y_pred2 = np.argmax(model_no_aug.predict(X_test_origin), axis=1)
y_true2 = np.argmax(y_test_origin2, axis=1)

print(classification_report(y_true2, y_pred2))

In [None]:
### SMOTE로 oversampling 한 결과 모델링 및 결과. -> 이상함. 시계열데이터를 SMOTE기반으로 데이터 증강하는것은 이상함. 

# X_train_resampled, y_train_resampled 

# X_test_s, y_test_s = create_sequences(X_train_resampled, y_train_resampled, N_TIMESTEPS)
# X_test_s = X_test_s.reshape((X_test_s.shape[0], X_test_s.shape[1], 30))
# y_test_s2 = to_categorical(y_test_s).astype(int)

# # 증폭 후 데이터
# # print(f'증폭후 데이터: X_train: {X_train.shape}, X_test:{X_test.shape} ') 

# model_smote = xai_lstm_model()
# model_smote.fit(X_test_s, y_test_s2, batch_size=100, epochs=10,
#                 validation_data=(X_test_s,  y_test_s2), verbose=1)

# y_pred_s = np.argmax(model_smote.predict(X_test_s), axis=1)
# y_true_s = np.argmax(y_test_s2, axis=1)

# print(classification_report(y_true_s, y_pred_s))

### XAI

In [125]:
def lime(x, y, model, s): 
    """Explain the first sequence of `x` with LIME, save the chart and show it inline.

    x: 3-D array of sequences, used both as explainer training data and as the
       source of the explained instance (x[0:1]).
    y: labels passed to the explainer as training_labels.
    model: object whose `predict` method is used as the prediction function.
    s: suffix inserted into the output file name
       'lime_explanation_credit<s>.png'.

    Returns None. Side effects: writes the PNG to the working directory and
    renders the explanation and the saved image in the notebook.
    """
    # Local import: only needed to render the saved image from inside a function.
    from IPython.display import display

    explainer1 = lime_tabular.RecurrentTabularExplainer(x, training_labels=y, feature_names=data_columns,
                                                    discretize_continuous=True,
                                                    class_names=['False', 'True'],
                                                    discretizer='decile')
    exp = explainer1.explain_instance(x[0:1], model.predict, num_features=30, labels=(1,))
    fig = exp.as_pyplot_figure()
    # Widen the left margin so the feature labels on the y-axis are not cut off.
    fig.subplots_adjust(left=0.4)

    out_path = 'lime_explanation_credit'+s+'.png'
    fig.savefig(out_path)

    exp.show_in_notebook()       
    plt.close(fig)
    # Show the saved chart explicitly. The previous version saved the same figure a
    # second time into a NamedTemporaryFile(delete=False) that was never closed or
    # removed, and ended with a bare `Image(...)` expression, which displays
    # nothing when it is evaluated inside a function.
    display(Image(filename=out_path))

# Explain the classifier trained on augmented data; the last argument is only the file-name suffix.
# (XAI code is planned to be revised.)
lime(X_test, y_test, model2, "_".join(["0.1", "1000", "1000", "1000"]))


In [126]:
# Same explanation for the classifier trained without augmentation.
lime(X_test, y_test, model_no_aug, "_".join(["0.1", "no_aug", "1000", "1000"]))

### Visualization

In [127]:
plt.figure(figsize=(10, 6))

# Density of 'Amount' for rows flagged normal (0) and anomalous (1)
sns.kdeplot(data=df_anomaly.loc[df_anomaly['Anomaly'] == 0], x='Amount', label='Normal', fill=True)
sns.kdeplot(data=df_anomaly.loc[df_anomaly['Anomaly'] == 1], x='Amount', label='Anomaly', fill=True)

# Title and x-label now describe the plotted column; the previous texts
# ("Sepal Length and Sepal Width" / "Measurement") came from an unrelated example.
plt.title('Distribution of Amount: Normal vs Anomaly')
plt.xlabel('Amount')
plt.ylabel('Density')
plt.legend()

# Show the figure
plt.show()

In [128]:
df_anomaly.describe()

In [129]:
df_anomaly.loc[:, :'Amount'].plot(kind='box', figsize=(10, 8), rot=30)

#### Decision boundary visualization

In [130]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from keras.models import Sequential
from keras.layers import LSTM, Dense

# Settings left over from the synthetic example below (kept for reference)
# np.random.seed(42)
seq_length = 12
num_samples = 1000

# X = np.random.rand(num_samples, seq_length, 1)
# y = (np.sum(X, axis=1) > seq_length / 2).astype(int)

# Binary LSTM classifier definition (synthetic example, disabled)
# model = Sequential()
# model.add(LSTM(10, input_shape=(seq_length, 1)))
# model.add(Dense(1, activation='sigmoid'))
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Model training (synthetic example, disabled)
# model.fit(X, y, epochs=10, batch_size=32)

# Use an intermediate layer to obtain the LSTM representation of each sequence.
# The representation is read from the TRAINED classifier (output of its first
# layer, the LSTM). Previously a brand-new two-layer LSTM with untrained weights
# was built here, so the plot showed a projection unrelated to the fitted model.
intermediate_layer_model = Model(inputs=model2.input, outputs=model2.layers[0].output)
intermediate_output = intermediate_layer_model.predict(X_test)

# Reduce the intermediate output to 2-D with PCA
pca = PCA(n_components=2)
reduced_output = pca.fit_transform(intermediate_output)

# Scatter plot coloured by the true class (index of the one-hot label)
plt.scatter(reduced_output[:, 0], reduced_output[:, 1], c=np.argmax(y_test, axis=1), cmap='viridis')
plt.title('Decision Boundary using PCA')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.show()

### T-SNE 
- aug 하기 전과 후의 비교

In [None]:
# Result before data augmentation

# Extract the features that feed the final Dense layer.
# The model trained WITHOUT augmentation is used here, matching the comment and the
# plot title; the cell previously read the features from model2, the model trained
# on augmented data.
layer_output = model_no_aug.layers[-2].output
feature_extraction_model = Model(inputs=model_no_aug.input, outputs=layer_output)
features = feature_extraction_model.predict(X_test_origin)

# Visualize with t-SNE
tsne = TSNE(n_components=2, random_state=0)
tsne_features = tsne.fit_transform(features)

# Colour the points by their binary label
plt.scatter(tsne_features[:, 0], tsne_features[:, 1], c=y_test_origin, cmap=plt.cm.Spectral)
plt.title("t-SNE Visualization of LSTM Decision Boundary without augmentation")
plt.show()




In [None]:
# Result after data augmentation
from sklearn.manifold import TSNE


# Extract the features that feed the final layer (layers[-2] is the layer just before the last one)
layer_output = model2.layers[-2].output
feature_extraction_model = Model(inputs=model2.input, outputs=layer_output)
features = feature_extraction_model.predict(X_test)

# Visualize with t-SNE
tsne = TSNE(n_components=2, random_state=0)
tsne_features = tsne.fit_transform(features)

# Colour the points by their binary label (index of the one-hot label)
plt.scatter(tsne_features[:, 0], tsne_features[:, 1], c=np.argmax(y_test, axis=1), cmap=plt.cm.Spectral)
plt.title("t-SNE Visualization of LSTM Decision Boundary with aumentation data")
plt.show()

In [135]:
# Result after data augmentation
# NOTE(review): this cell repeats the preceding t-SNE cell line for line — consider keeping only one.
from sklearn.manifold import TSNE


# Extract the features that feed the final layer (layers[-2] is the layer just before the last one)
layer_output = model2.layers[-2].output
feature_extraction_model = Model(inputs=model2.input, outputs=layer_output)
features = feature_extraction_model.predict(X_test)

# Visualize with t-SNE
tsne = TSNE(n_components=2, random_state=0)
tsne_features = tsne.fit_transform(features)

# Colour the points by their binary label (index of the one-hot label)
plt.scatter(tsne_features[:, 0], tsne_features[:, 1], c=np.argmax(y_test, axis=1), cmap=plt.cm.Spectral)
plt.title("t-SNE Visualization of LSTM Decision Boundary with aumentation data")
plt.show()

In [None]:
# Extract the output of the autoencoder's last layer for the augmented test rows
# NOTE(review): X_test_aug is fed to the autoencoder as-is — confirm that its columns
# and scale match what the autoencoder was trained on.
layer_output = model.layers[-1].output
feature_extraction_model = Model(inputs=model.input, outputs=layer_output)
features = feature_extraction_model.predict(X_test_aug.values.reshape(X_test_aug.shape[0], 1, X_test_aug.shape[1]))
print(features.shape)
# Collapse the singleton time-step axis. The target dimensions must come from
# features.shape; the cell used to pass features[0] and features[2] (array
# slices, not sizes) to reshape, which is what made it raise.
features_2d = features.reshape(features.shape[0], features.shape[2])
print(features_2d)

# Visualize with t-SNE
tsne = TSNE(n_components=2, random_state=0)
tsne_features = tsne.fit_transform(features_2d)

# Colour by the labels that belong to X_test_aug (one per row). y_test holds the
# one-hot labels of the under-sampled sequences and has a different length.
plt.scatter(tsne_features[:, 0], tsne_features[:, 1], c=y_test_aug.astype(int), cmap=plt.cm.Spectral)
plt.title("t-SNE Visualization of LSTMAE Decision Boundary")
plt.show()

In [None]:
X_test.shape

In [None]:
X_test_origin.reshape(X_test_origin.shape[0], 1, X_test_origin.shape[1]).shape

### LOF 테스트해보기

In [None]:
### LOF example 

import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor

# Generate synthetic data: 200 normally distributed points plus 10 uniformly spread points
# NOTE(review): this reassigns the notebook-level names X_train and y_pred used by earlier cells.
np.random.seed(42)
X_train = np.random.normal(0, 1, (200, 2))
X_outliers = np.random.uniform(low=-4, high=4, size=(10, 2))
X_train = np.vstack([X_train, X_outliers])

# Fit the LOF model; fit_predict returns one label per row (-1 is treated as outlier below)
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred = lof.fit_predict(X_train)

# Print the neighbour scores for every point labelled as an outlier
def show_neighbor_contributions(X, y_pred, lof_model):
    """For each row with y_pred == -1, print its neighbours and their LOF scores.

    The neighbours come from `lof_model.kneighbors` and the printed value for
    each neighbour is its entry in `lof_model.negative_outlier_factor_`.
    The point number in the header line is 1-based; neighbour ids are printed
    as returned by the model.
    """
    for row in np.flatnonzero(y_pred == -1):  # outliers only
        query_point = X[row].reshape(1, -1)
        neighbor_ids = lof_model.kneighbors(query_point, return_distance=False)[0]

        # Score of each neighbour
        neighbor_scores = lof_model.negative_outlier_factor_[neighbor_ids]

        print(f"이상치 데이터 포인트 {row+1}에 대한 이웃의 기여도:")
        for neighbor_id, score in zip(neighbor_ids, neighbor_scores):
            print(f"이웃 {neighbor_id} - 기여도: {score:.4f}")

# Plot inliers and outliers, then print the neighbour scores
def visualize_lof(X, y_pred, lof_model):
    """Scatter-plot the points by LOF label and print neighbour scores for the outliers.

    Rows with y_pred == 1 are drawn in green as 'Inliers', rows with
    y_pred == -1 in red as 'Outliers' (first two columns of X). After the
    figure is shown, show_neighbor_contributions is called with the same
    arguments.
    """
    plt.figure(figsize=(10, 6))
    for flag, colour, legend_label in ((1, 'green', 'Inliers'), (-1, 'red', 'Outliers')):
        group = X[y_pred == flag]
        plt.scatter(group[:, 0], group[:, 1], color=colour, label=legend_label)

    plt.title('LOF anomal detection result 이상치 감지 결과')
    plt.legend()
    plt.show()

    # Print the neighbour scores
    show_neighbor_contributions(X, y_pred, lof_model)

# Plot the outliers and print the neighbour scores for the synthetic LOF example
visualize_lof(X_train, y_pred, lof)
