In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class InterpretableTimeSeriesModel(nn.Module):
    def __init__(self, fundamental_period, n_channels, n_auxiliary_features):
        """
        Args:
            fundamental_period (int): 기본 주기 (예: 7일)
            n_channels (int): 컨볼루션 출력 채널 수
            n_auxiliary_features (int): 보조 특성 수 (주말여부, 이벤트, 가격변화 등)
        """
        super().__init__()
        self.n_channels = n_channels
        self.fundamental_period = fundamental_period
        
        # 단일 주기를 사용한 1D 컨볼루션
        # Input: (batch_size, 1, sequence_length)
        # Output: (batch_size, n_channels, sequence_length)
        self.conv = nn.Conv1d(
            in_channels=1, 
            out_channels=n_channels, 
            kernel_size=fundamental_period,
            stride=1,  # 모든 시점의 패턴을 분석하기 위해 stride=1 사용
            padding=fundamental_period-1  # 시퀀스 앞부분의 패턴도 캡쳐하기 위한 패딩
        )
        
        # 컨볼루션 가중치 해석을 위한 어텐션
        # Input: (batch_size, n_channels * sequence_length)
        # Output: (batch_size, n_channels)
        self.conv_attention = nn.Sequential(
            nn.Linear(n_channels * fundamental_period, n_channels),
            nn.Softmax(dim=-1)
        )
        
        # 특성 중요도 계산 레이어
        # Input: (batch_size, n_channels + n_auxiliary_features)
        # Output: (batch_size, n_channels + n_auxiliary_features)
        self.feature_importance = nn.Sequential(
            nn.Linear(n_channels + n_auxiliary_features, 
                     n_channels + n_auxiliary_features),
            nn.Sigmoid()
        )
        
        # 최종 예측 레이어
        # Input: (batch_size, n_channels + n_auxiliary_features)
        # Output: (batch_size, 1)
        self.predictor = nn.Sequential(
            nn.Linear(n_channels + n_auxiliary_features, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        
    def forward(self, sales, period_strength, auxiliary_features):
        """
        Args:
            sales (torch.Tensor): (batch_size, sequence_length)
            period_strength (torch.Tensor): (batch_size, 1) 주기성 강도
            auxiliary_features (torch.Tensor): (batch_size, n_auxiliary_features)
            
        Returns:
            prediction (torch.Tensor): (batch_size, 1)
            conv_weights (torch.Tensor): (batch_size, n_channels)
            feature_weights (torch.Tensor): (batch_size, n_channels + n_auxiliary_features)
        """
        batch_size = sales.shape[0]
        
        # 1D 컨볼루션 적용
        conv_out = self.conv(sales.unsqueeze(1))  # (batch_size, n_channels, sequence_length)
        
        # 주기성 강도로 가중치 적용
        conv_out = conv_out * period_strength.view(batch_size, 1, 1)
        
        # 컨볼루션 패턴의 중요도 계산
        # 기본 주기만큼의 패턴만 분석 (최근 데이터)
        recent_patterns = conv_out[:, :, -self.fundamental_period:]
        conv_weights = self.conv_attention(recent_patterns.flatten(1))
        weighted_conv = conv_out * conv_weights.unsqueeze(-1)
        
        # 시퀀스를 통합하여 각 채널의 최종 특성 추출
        pooled_features = F.adaptive_avg_pool1d(weighted_conv, 1).squeeze(-1)
        
        # 보조 특성과 결합
        combined_features = torch.cat([pooled_features, auxiliary_features], dim=1)
        
        # 특성 중요도 계산
        feature_weights = self.feature_importance(combined_features)
        weighted_features = combined_features * feature_weights
        
        # 최종 예측
        prediction = self.predictor(weighted_features)
        
        return prediction, conv_weights, feature_weights

class InterpretabilityAnalyzer:
    def __init__(self, model):
        self.model = model
        
    def analyze_contribution(self, sales, period_strength, auxiliary_features):
        """
        예측에 대한 각 컴포넌트의 기여도 분석
        
        Returns:
            dict: {
                'prediction': 예측값,
                'conv_pattern_contributions': 컨볼루션 패턴별 기여도,
                'auxiliary_contributions': 보조 특성별 기여도
            }
        """
        with torch.no_grad():
            pred, conv_weights, feature_weights = self.model(
                sales, period_strength, auxiliary_features)
            
            # 컨볼루션 특성과 보조 특성의 중요도 분리
            conv_importance = feature_weights[:, :self.model.n_channels]
            aux_importance = feature_weights[:, self.model.n_channels:]
            
            # 컨볼루션 패턴의 기여도 분석
            pattern_contributions = {}
            for i, weight in enumerate(conv_weights[0]):
                pattern_contributions[f'pattern_{i}'] = float(weight)
            
            # 보조 특성의 기여도 분석
            aux_contributions = {
                'is_weekend': float(aux_importance[0, 0]),
                'cultural_event': float(aux_importance[0, 1]),
                'national_event': float(aux_importance[0, 2]),
                'religious_event': float(aux_importance[0, 3]),
                'sporting_event': float(aux_importance[0, 4]),
                'price_change': float(aux_importance[0, 5])
            }
            
            return {
                'prediction': float(pred[0]),
                'conv_pattern_contributions': pattern_contributions,
                'auxiliary_contributions': aux_contributions
            }

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model

class ProductSpecificConv1D(layers.Layer):
    def __init__(self, n_products, fundamental_period, n_channels):
        """
        각 제품별로 독립적인 1D 컨볼루션을 수행하는 레이어
        
        Args:
            n_products: 전체 제품 수
            fundamental_period: 기본 주기 (예: 7일)
            n_channels: 컨볼루션 출력 채널 수
        """
        super(ProductSpecificConv1D, self).__init__()
        self.n_products = n_products
        self.fundamental_period = fundamental_period
        self.n_channels = n_channels
        
        # 각 제품별 독립적인 컨볼루션 레이어
        self.conv_layers = [
            layers.Conv1D(
                filters=n_channels,
                kernel_size=fundamental_period,
                padding='same'
            ) for _ in range(n_products)
        ]
    
    def call(self, inputs, product_ids):
        batch_size = tf.shape(inputs)[0]
        outputs = []
        
        # 각 배치 항목에 대해 해당 제품의 컨볼루션 레이어 적용
        for i in range(batch_size):
            prod_id = product_ids[i]
            conv_out = self.conv_layers[prod_id](tf.expand_dims(inputs[i], 0))
            outputs.append(conv_out)
            
        return tf.concat(outputs, axis=0)

class DomainKnowledgeWeights(layers.Layer):
    def __init__(self, n_auxiliary_features):
        """도메인 지식 기반의 가중치 계산 레이어"""
        super(DomainKnowledgeWeights, self).__init__()
        self.n_auxiliary_features = n_auxiliary_features
        
        # 보조 특성의 베이스라인 중요도
        self.base_importance = self.add_weight(
            shape=(n_auxiliary_features,),
            initializer='ones',
            trainable=True,
            name='base_importance'
        )
    
    def call(self, auxiliary_features, period_strength):
        # 비-제로 비율 계산
        non_zero_ratio = tf.reduce_mean(
            tf.cast(auxiliary_features != 0, tf.float32),
            axis=0
        )
        
        # 기본 중요도와 비-제로 비율 결합
        importance = self.base_importance * non_zero_ratio
        
        # Softmax로 정규화
        importance = tf.nn.softmax(importance)
        
        return tf.tile(tf.expand_dims(importance, 0), [tf.shape(auxiliary_features)[0], 1])

class InterpretableTimeSeriesModel(Model):
    def __init__(self, n_products, fundamental_period, n_channels, n_auxiliary_features):
        super(InterpretableTimeSeriesModel, self).__init__()
        self.n_channels = n_channels
        self.fundamental_period = fundamental_period
        
        # 제품별 독립적인 1D 컨볼루션
        self.conv = ProductSpecificConv1D(
            n_products=n_products,
            fundamental_period=fundamental_period,
            n_channels=n_channels
        )
        
        # 도메인 지식 기반 가중치
        self.domain_weights = DomainKnowledgeWeights(n_auxiliary_features)
        
        # 예측 레이어
        self.predictor = tf.keras.Sequential([
            layers.Dense(64, activation='relu'),
            layers.Dense(1)
        ])
        
        # 컨볼루션 활성화 저장용
        self.conv_activations = None
    
    def call(self, inputs, training=None):
        sales, product_ids, period_strength, auxiliary_features = inputs
        
        # 제품별 1D 컨볼루션 적용
        conv_out = self.conv(sales, product_ids)
        
        # 주기성 강도 적용
        conv_out = conv_out * tf.expand_dims(tf.expand_dims(period_strength, -1), -1)
        
        # 컨볼루션 활성화 저장 (해석용)
        self.conv_activations = conv_out
        
        # 특성 추출 (시간 차원 보존)
        feature_maps = conv_out  # (batch_size, sequence_length, n_channels)
        
        # 도메인 지식 기반 가중치 계산
        feature_weights = self.domain_weights(auxiliary_features, period_strength)
        
        # 보조 특성 가중치 적용
        weighted_aux_features = auxiliary_features * feature_weights
        
        # 시간 차원에 대한 평균
        pooled_conv = tf.reduce_mean(feature_maps, axis=1)
        
        # 특성 결합
        combined_features = tf.concat([pooled_conv, weighted_aux_features], axis=1)
        
        # 예측
        prediction = self.predictor(combined_features)
        
        return prediction, feature_weights, feature_maps

def weighted_loss(y_true, y_pred, sparse_mask, period_strength, alpha=0.5, beta=0.3):
    """
    희소 데이터 발생 시점과 주기성 강도를 고려한 가중치 손실 함수
    
    Args:
        y_true: 실제 값
        y_pred: 예측 값
        sparse_mask: 희소 데이터 발생 마스크
        period_strength: 주기성 강도
        alpha: 희소 데이터 가중치 계수
        beta: 주기성 강도 가중치 계수
    """
    # 기본 손실
    base_loss = tf.keras.losses.mean_squared_error(y_true, y_pred)
    
    # 가중치 계산
    weights = tf.ones_like(sparse_mask)
    
    # 희소 데이터 발생 시점 가중치
    weights = weights + (sparse_mask * alpha)
    
    # 주기 강도 가중치
    weights = weights * (1 + period_strength * beta)
    
    # 가중치 정규화
    weights = weights / tf.reduce_mean(weights)
    
    # 가중치 적용된 손실
    weighted_loss = base_loss * weights
    return tf.reduce_mean(weighted_loss)

class TimeSeriesAnalyzer:
    def __init__(self, model):
        self.model = model
    
    def analyze_contribution(self, sales, product_ids, period_strength, auxiliary_features):
        """예측에 대한 각 컴포넌트의 기여도 분석"""
        prediction, feature_weights, feature_maps = self.model([
            sales, product_ids, period_strength, auxiliary_features
        ])
        
        # 시간별 활성화 점수 계산
        temporal_importance = tf.reduce_mean(feature_maps, axis=-1)  # (batch_size, sequence_length)
        
        # 컨볼루션 필터별 중요도
        filter_importance = tf.reduce_mean(feature_maps, axis=[0, 1])  # (n_channels,)
        
        aux_contributions = {
            'is_weekend': float(feature_weights[0, 0].numpy()),
            'cultural_event': float(feature_weights[0, 1].numpy()),
            'national_event': float(feature_weights[0, 2].numpy()),
            'religious_event': float(feature_weights[0, 3].numpy()),
            'sporting_event': float(feature_weights[0, 4].numpy()),
            'price_change': float(feature_weights[0, 5].numpy())
        }
        
        return {
            'prediction': float(prediction[0].numpy()),
            'period_strength': float(period_strength[0].numpy()),
            'auxiliary_contributions': aux_contributions,
            'temporal_importance': temporal_importance.numpy(),
            'filter_importance': filter_importance.numpy()
        }

In [6]:
import pickle
import numpy as np
import pandas as pd
import tensorflow as tf

In [5]:
### 데이터 로드

# 판매 시작 시점 
with open('../data/preprocessed/first_sales_column_dict.pkl', 'rb') as f:
    first_sales_column_dict = pickle.load(f)

# 판매량 데이터 (아이템별 상이)
detrended_sales = pd.read_csv("../data/loess/detrended_sales.csv")
# 판매량 데이터의 기본 주기, 주기 세기 (아이템별 상이)
with open('../data/fourier/results.pkl', 'rb') as f:
    fourier_results = pickle.load(f)

# 판매 가격 데이터 (아이템별 상이)
log_differenced_sell_prices = pd.read_csv("../data/log_differencing/log_differenced_sell_prices.csv")
# 주말 여부 데이터 (1941일)
is_weekend = pd.read_csv("../data/preprocessed/is_weekend.csv")
# 이벤트 데이터 (1941일)
event_type_cultural = pd.read_csv("../data/preprocessed/event_type_cultural.csv")
event_type_national = pd.read_csv("../data/preprocessed/event_type_national.csv")
event_type_religious = pd.read_csv("../data/preprocessed/event_type_religious.csv")
event_type_sporting = pd.read_csv("../data/preprocessed/event_type_sporting.csv")

In [16]:
def slice_each_item_based_on_first_sales(state_item_id, start_day, sequence_length=None):
    """
    아이템 ID와 시작 시점을 기준으로 데이터를 슬라이싱.
    
    Args:
        item_id: 아이템 ID.
        start_day: 판매 시작 시점.
        sequence_length: 슬라이싱할 시퀀스 길이 (None이면 끝까지).
        
    Returns:
        아이템별 판매량, 가격, 주말, 이벤트 데이터를 슬라이싱한 결과.
    """
    # 주 ID, 아이템 ID
    state_id, item_id = state_item_id[0], state_item_id[1]

    # 판매량 데이터
    sales_data = detrended_sales[(detrended_sales['state_id']==state_id) & (detrended_sales['item_id']==item_id)][start_day:]

    # 가격 데이터
    price_data = log_differenced_sell_prices[item_id][start_day:]
    
    # 주말 데이터 
    weekend_data = is_weekend[start_day:]

    # 이벤트 데이터
    cultural_data = event_type_cultural[start_day:]
    national_data = event_type_national[start_day:]
    religious_data = event_type_religious[start_day:]
    sporting_data = event_type_sporting[start_day:]
    
    # 슬라이싱
    if sequence_length:
        sales_data = sales_data[:sequence_length]
        price_data = price_data[:sequence_length]
        weekend_data = weekend_data[:sequence_length]
        cultural_data = cultural_data[:sequence_length]
        national_data = national_data[:sequence_length]
        religious_data = religious_data[:sequence_length]
        sporting_data = sporting_data[:sequence_length]
    
    # 출력
    return {
        "sales": sales_data,
        "price": price_data,
        "weekend": weekend_data,
        "cultural": cultural_data,
        "national": national_data,
        "religious": religious_data,
        "sporting": sporting_data,
    }

In [None]:
def create_data_generator(sequence_length=None):
    """
    데이터를 아이템별로 제너레이터 형태로 반환.
    
    Args:
        sequence_length: 슬라이싱할 시퀀스 길이.
        
    Yields:
        아이템별 데이터를 딕셔너리 형태로 반환.
    """
    for state_item_id, first_sales_day in first_sales_column_dict.items():
        item_data = slice_each_item_based_on_first_sales(state_item_id, first_sales_day, sequence_length)
        yield (
            {
                "sales": tf.convert_to_tensor(item_data["sales"], dtype=tf.float32),
                "price": tf.convert_to_tensor(item_data["price"], dtype=tf.float32),
                "weekend": tf.convert_to_tensor(item_data["weekend"], dtype=tf.float32),
                "cultural": tf.convert_to_tensor(item_data["cultural"], dtype=tf.float32),
                "national": tf.convert_to_tensor(item_data["national"], dtype=tf.float32),
                "religious": tf.convert_to_tensor(item_data["religious"], dtype=tf.float32),
                "sporting": tf.convert_to_tensor(item_data["sporting"], dtype=tf.float32),
            },
            tf.convert_to_tensor(0.0, dtype=tf.float32),  # 예제 라벨 (추후 수정 가능)
        )

룩백 윈도우 크기와 예측 크기를 조합해 최적의 성능을 확인
가장 큰 주기로

In [None]:
data = np.sin(np.linspace(0, 100, 1000))  # 주기적인 데이터
lookback = 65  # 룩백 윈도우 크기
X, y = [], []
for i in range(len(data) - lookback):
    X.append(data[i:i+lookback])
    y.append(data[i+lookback])
X, y = np.array(X), np.array(y)
