In [None]:
# Core imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import torch
import torch.nn as nn

# Preprocessing utilities used before training
from sklearn.preprocessing import LabelEncoder, StandardScaler

In [None]:
class BaseModel(nn.Module):
    """
    Embedding-based tabular classifier. Do not modify the model architecture.

    Every categorical column gets its own ``nn.Embedding(cardinality, 5)``.
    The embeddings are concatenated with the numerical features, projected to
    64 units by ``fc_cat``, and then passed through ``encoder`` and
    ``classifier``.

    Args:
        encoding_dim: Output width of ``encoder``. ``classifier`` begins with
            ``nn.Linear(64, 32)`` and is fed the encoder output directly, so
            this has to be 64 for ``forward`` to run.
        cat_features: Sized collection with one entry per categorical column;
            only its ``len()`` is used. The length has to equal
            ``len(cat_cardinalities)`` so that ``fc_cat``'s input width matches
            what ``forward`` concatenates.
        num_features: Sized collection with one entry per numerical column;
            only its ``len()`` is used.
        num_classes: Width of the final linear layer (one logit per class).
        cat_cardinalities: Number of distinct values of each categorical
            column, in column order. Used as ``num_embeddings`` of the
            matching embedding table.

    Note:
        ``decoder`` is constructed here but is never called in ``forward``.
    """
    def __init__(self, encoding_dim, cat_features, num_features, num_classes, cat_cardinalities):
        super(BaseModel, self).__init__()
        # cat_cardinalities lists the number of unique values of each categorical variable;
        # one 5-dimensional embedding table is created per entry.
        self.cat_embeddings = nn.ModuleList([nn.Embedding(cardinality, 5) for cardinality in cat_cardinalities])
        # Input width = 5 dims per categorical column + one dim per numerical column.
        self.fc_cat = nn.Linear(len(cat_features) * 5 + len(num_features), 64)
        self.encoder = nn.Sequential(
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, encoding_dim),
            nn.ReLU()
        )
        # Maps encoding_dim back to 64 units; not used by forward().
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
        )
        # Consumes the encoder output, hence the fixed input width of 64.
        self.classifier = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, num_classes)
        )

    def forward(self, x_cat, x_num):
        """Compute class logits for a batch.

        Args:
            x_cat: Integer tensor indexed as ``x_cat[:, i]``; column ``i`` is
                looked up in the ``i``-th embedding table, so values in that
                column must be valid indices for that table.
            x_num: Numerical features, concatenated with the embeddings along
                ``dim=1``.

        Returns:
            The output of ``classifier`` (no softmax is applied here).
        """
        # Apply embedding layers: column i goes through embedding table i.
        embeddings = [emb(x_cat[:, i]) for i, emb in enumerate(self.cat_embeddings)]
        # Join all embeddings and the numerical features feature-wise.
        x = torch.cat(embeddings + [x_num], dim=1)
        x = self.fc_cat(x)
        encoded = self.encoder(x)
        out = self.classifier(encoded)
        return out


In [None]:
def encode_and_standardize_data(data, mode):
    """Label-encode the categorical columns and standardize the continuous ones.

    ``data`` is modified in place: missing ``'Error Message'`` values become
    the string ``'None'``, categorical columns are replaced by integer codes,
    ``'Zipcode'`` and ``'Merchandise Code'`` are bucketed by integer division
    by 100 before encoding, ``'Has Chip'`` becomes 0/1, ``'Birth Year'`` and
    ``'Year PIN last Changed'`` are shifted so their minimum is 0, and the
    continuous columns are standardized.

    The target column ``'Is Fraud?'`` is label-encoded in ``data`` when
    ``mode == 'Train'`` but is deliberately *not* part of the returned
    features. Returning it as an input feature would leak the label into the
    model and would give the training set one more categorical column than
    the test set, so a model sized from the training output could not be
    applied to the test output. After the call the encoded labels can be read
    from ``data['Is Fraud?']``.

    NOTE(review): encoders and the scaler are re-fit on every call, so integer
    codes and scaling statistics of two separately processed frames (e.g.
    train and test) are not guaranteed to agree.

    Args:
        data: ``pandas.DataFrame`` containing the columns referenced below.
            Mutated in place.
        mode: ``'Train'`` additionally encodes the ``'Is Fraud?'`` column of
            ``data``; any other value leaves that column untouched.

    Returns:
        tuple: ``(cat_features, num_features, cat_cardinalities)`` where
        ``cat_features`` is an integer DataFrame of the categorical input
        columns, ``num_features`` is a DataFrame of the standardized
        continuous columns, and ``cat_cardinalities`` is a list with the
        number of codes of each categorical column, in the column order of
        ``cat_features``.

    Raises:
        KeyError: If a required column is missing from ``data``.
    """
    data['Error Message'] = data['Error Message'].fillna('None')

    if mode == 'Train':
        # Encode the target in place so callers can read integer labels from
        # `data`, but keep it out of the model inputs.
        data['Is Fraud?'] = LabelEncoder().fit_transform(data['Is Fraud?'])

    # Coarsen high-cardinality numeric codes before label-encoding them.
    for col in ('Zipcode', 'Merchandise Code'):
        data[col] = (data[col] // 100).astype(int)

    # Same input columns, in the same order, for every mode.
    categorical_columns = [
        'Card Brand', 'Card Type', 'Card Number', 'Expires', 'Acct Open Date',
        'Error Message', 'Zipcode', 'Merchandise Code',
    ]
    cat_cardinalities = []
    for col in categorical_columns:
        encoder = LabelEncoder()
        data[col] = encoder.fit_transform(data[col])
        cat_cardinalities.append(len(encoder.classes_))

    # NOTE(review): only values equal to True map to 1; confirm the raw column
    # is boolean rather than e.g. 'YES'/'NO' strings.
    data['Has Chip'] = np.where(data['Has Chip'] == True, 1, 0)
    categorical_columns.append('Has Chip')
    # The codes are 0 and 1 regardless of which of them occur in this frame.
    # Counting distinct values would yield 1 for an all-True column and make
    # index 1 invalid for an embedding table of that size.
    cat_cardinalities.append(2)

    # Shift year columns so they start at 0. 'Year PIN last Changed' is only
    # rewritten in `data`; it is not part of the returned features.
    data['Birth Year'] = data['Birth Year'] - data['Birth Year'].min()
    data['Year PIN last Changed'] = data['Year PIN last Changed'] - data['Year PIN last Changed'].min()

    # Continuous columns for StandardScaler
    continuous_columns = [
        'Current Age', 'Retirement Age', 'Birth Year', 'Birth Month', 'Per Capita Income - Zipcode',
        'Yearly Income', 'Total Debt', 'Credit Score', 'Credit Limit', 'Year', 'Month', 'Day', 'Amount'
    ]
    scaler = StandardScaler()
    data[continuous_columns] = scaler.fit_transform(data[continuous_columns])

    cat_features = data[categorical_columns].astype(int)  # embeddings need integer indices
    num_features = data[continuous_columns]

    return cat_features, num_features, cat_cardinalities


In [None]:
# Data preprocessing: read both raw CSV files, then encode/standardize each one.
train_data, test_data = (pd.read_csv(csv_path) for csv_path in ('data/train.csv', 'data/test.csv'))

x_cat_train, x_num_train, cat_cardinalities_train = encode_and_standardize_data(train_data, 'Train')
x_cat_test, x_num_test, cat_cardinalities_test = encode_and_standardize_data(test_data, 'Test')

In [None]:
# Show the second test row of each feature frame and the test cardinalities, one per line.
print('\n'.join(str(part) for part in (x_cat_test.iloc[1], x_num_test.iloc[1], cat_cardinalities_test)))

In [None]:
# Convert the feature frames to torch tensors:
# categorical codes as int64 (embedding indices), continuous features as float32.
x_cat_train_tensor = torch.tensor(x_cat_train.to_numpy(), dtype=torch.long)
x_cat_test_tensor = torch.tensor(x_cat_test.to_numpy(), dtype=torch.long)

x_num_train_tensor = torch.tensor(x_num_train.to_numpy(), dtype=torch.float32)
x_num_test_tensor = torch.tensor(x_num_test.to_numpy(), dtype=torch.float32)


In [None]:
num_classes = 2  # binary target ("Is Fraud?")
# BaseModel.classifier starts with nn.Linear(64, 32) and is fed the encoder
# output directly, so the encoding width has to stay at 64.
encoding_dim = 64

model = BaseModel(
    encoding_dim=encoding_dim,
    cat_features=x_cat_train.columns,
    num_features=x_num_train.columns,
    num_classes=num_classes,
    cat_cardinalities=cat_cardinalities_train,
)

# Smoke-test the forward pass. Nothing is trained in this cell, so disable
# autograd to avoid recording a graph over the whole training tensor.
with torch.no_grad():
    output = model(x_cat_train_tensor, x_num_train_tensor)
print(output)



In [None]:
import torch.optim as optim
from sklearn.model_selection import train_test_split

# Hold out 20% of the training rows for validation (fixed seed).
(
    x_cat_train_split,
    x_cat_val_split,
    x_num_train_split,
    x_num_val_split,
) = train_test_split(
    x_cat_train_tensor,
    x_num_train_tensor,
    test_size=0.2,
    random_state=42,
)

# Loss function and optimizer (the optimizer is bound to the current `model`).
criterion = nn.CrossEntropyLoss()  # for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Datasets pair the categorical and numerical tensors row by row.
train_dataset = torch.utils.data.TensorDataset(x_cat_train_split, x_num_train_split)
val_dataset = torch.utils.data.TensorDataset(x_cat_val_split, x_num_val_split)

# Batches of 64; only the training loader shuffles.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

# Training loop
def train_model(model, train_loader, val_loader, num_epochs=10, optimizer=None, criterion=None, lr=0.001):
    """Train ``model`` and report the summed train/validation loss per epoch.

    The optimizer is created from ``model.parameters()`` unless one is passed
    in. Relying on a module-level optimizer is unsafe: it keeps updating the
    model it was constructed with, so a model re-created afterwards would
    never receive any weight updates.

    Args:
        model: Module called as ``model(x_cat, x_num)`` that returns class logits.
        train_loader: Iterable of batches ``(x_cat, x_num)`` or
            ``(x_cat, x_num, targets)``.
        val_loader: Iterable of batches in the same format, used for the
            validation pass after every epoch.
        num_epochs: Number of passes over ``train_loader``.
        optimizer: Optional optimizer. It must have been built from the
            parameters of ``model``. Defaults to ``Adam`` with learning rate ``lr``.
        criterion: Optional loss taking ``(logits, targets)``. Defaults to
            ``nn.CrossEntropyLoss()``.
        lr: Learning rate of the default optimizer; ignored when ``optimizer``
            is given.

    Returns:
        None. Progress is printed; the printed losses are sums over batches,
        not averages. The model is left in training mode.

    Note:
        When a batch carries no target tensor, random 0/1 targets are drawn as
        a placeholder so the loop can run end to end. A model trained that way
        has learned nothing meaningful; supply real labels as the third tensor
        of each batch.
    """
    if criterion is None:
        criterion = nn.CrossEntropyLoss()
    if optimizer is None:
        # Bind the optimizer to the model that is actually being trained.
        optimizer = optim.Adam(model.parameters(), lr=lr)

    def _targets_for(labels, batch_size):
        # Use real labels when the batch provides them, else the placeholder.
        if labels:
            return labels[0]
        return torch.randint(0, 2, (batch_size,))  # temporary target (binary class)

    model.train()  # training mode
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in train_loader:
            x_cat_batch, x_num_batch, *labels = batch
            optimizer.zero_grad()  # reset gradients
            outputs = model(x_cat_batch, x_num_batch)
            loss = criterion(outputs, _targets_for(labels, x_cat_batch.size(0)))
            loss.backward()  # backpropagation
            optimizer.step()  # weight update
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss:.4f}")

        # Validation loop
        model.eval()  # evaluation mode
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                x_cat_batch, x_num_batch, *labels = batch
                outputs = model(x_cat_batch, x_num_batch)
                loss = criterion(outputs, _targets_for(labels, x_cat_batch.size(0)))
                val_loss += loss.item()
        print(f"Validation Loss: {val_loss:.4f}")
        model.train()  # back to training mode for the next epoch


In [None]:
# Re-initialise the model (binary classification, encoding width 64).
model = BaseModel(
    64,
    x_cat_train.columns,
    x_num_train.columns,
    2,
    cat_cardinalities_train,
)

# Run training.
train_model(model=model, train_loader=train_loader, val_loader=val_loader, num_epochs=10)

# Inference on the test data: evaluation mode, no gradient tracking.
model.eval()
with torch.no_grad():
    outputs = model(x_cat_test_tensor, x_num_test_tensor)
print("Test Outputs:", outputs)
