In [67]:
!python -m ipykernel install --user --name=myenv

Installed kernelspec myenv in C:\Users\arman\AppData\Roaming\jupyter\kernels\myenv


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt

# تنظیمات پایه
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# پارامترهای مدل
SEQ_LENGTH = 60
BATCH_SIZE = 64
EPOCHS = 50  # کاهش تعداد دوره‌ها
HIDDEN_SIZE = 64

# کلاس Dataset اصلاح شده
class PriceDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = sequences
        self.targets = targets
        
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return (
            torch.FloatTensor(self.sequences[idx]),
            torch.FloatTensor([self.targets[idx]]).squeeze()  # اصلاح شکل تنسور هدف
        )

# مدل LSTM پایدار
class StableLSTM(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=HIDDEN_SIZE,
            num_layers=1,
            batch_first=True,
            dropout=0.3
        )
        self.fc = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )
        
    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :]).squeeze()

# پیش‌پردازش داده با کنترل خطا
def preprocess_data(filepath):
    try:
        df = pd.read_csv(filepath, sep='\t')
        
        # تبدیل تاریخ و زمان
        df['datetime'] = pd.to_datetime(df['<DATE>'] + ' ' + df['<TIME>'], 
                                      format='%Y.%m.%d %H:%M:%S')
        df = df.sort_values('datetime')
        
        # انتخاب و نامگذاری ستون‌ها
        price_data = df[['<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOL>']]
        price_data.columns = ['open', 'high', 'low', 'close', 'volume']
        
        # محاسبه اندیکاتورهای ساده
        price_data['returns'] = price_data['close'].pct_change()
        price_data['volatility'] = price_data['high'] - price_data['low']
        price_data = price_data.dropna()
        
        # مقیاس‌گذاری
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(price_data)
        
        return scaled_data, scaler, price_data.columns
    
    except Exception as e:
        print(f"خطا در پردازش داده: {str(e)}")
        return None, None, None

# ایجاد دنباله‌ها با کنترل شکل داده
def create_sequences(data, targets, seq_length):
    try:
        sequences = []
        final_targets = []
        for i in range(len(data)-seq_length):
            sequences.append(data[i:i+seq_length])
            final_targets.append(targets[i+seq_length])
        return np.array(sequences), np.array(final_targets).reshape(-1, 1)  # اصلاح شکل targets
    except Exception as e:
        print(f"خطا در ایجاد دنباله‌ها: {str(e)}")
        return None, None

# محاسبه معیارها با کنترل ابعاد
def calculate_metrics(preds, trues, scaler, close_idx=3):
    try:
        # اطمینان از شکل صحیح داده‌ها
        preds = np.array(preds).reshape(-1)
        trues = np.array(trues).reshape(-1)
        
        dummy = np.zeros((len(preds), scaler.n_features_in_))
        dummy[:, close_idx] = preds
        pred_prices = scaler.inverse_transform(dummy)[:, close_idx]
        
        dummy[:, close_idx] = trues
        true_prices = scaler.inverse_transform(dummy)[:, close_idx]
        
        # محاسبه دقت جهتدار
        direction_acc = np.mean(np.sign(pred_prices[1:]-pred_prices[:-1]) == 
                              np.sign(true_prices[1:]-true_prices[:-1])) * 100
        mae = mean_absolute_error(true_prices, pred_prices)
        rmse = np.sqrt(mean_squared_error(true_prices, pred_prices))
        
        return direction_acc, mae, rmse, pred_prices, true_prices
    
    except Exception as e:
        print(f"خطا در محاسبه معیارها: {str(e)}")
        return 0, 0, 0, [], []

# آموزش مدل با گزارش‌دهی پیشرفته
def train_and_report(model, train_loader, test_loader, scaler):
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    
    history = {
        'train_loss': [],
        'test_loss': [],
        'train_acc': [],
        'test_acc': [],
        'train_mae': [],
        'test_mae': []
    }
    
    for epoch in range(EPOCHS):
        # آموزش
        model.train()
        train_loss = 0
        train_preds, train_trues = [], []
        
        for X, y in train_loader:
            optimizer.zero_grad()
            preds = model(X)
            loss = criterion(preds, y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            train_preds.extend(preds.detach().numpy())
            train_trues.extend(y.numpy())
        
        # ارزیابی آموزش
        train_acc, train_mae, _, _, _ = calculate_metrics(
            train_preds, 
            train_trues,
            scaler
        )
        
        # اعتبارسنجی
        model.eval()
        test_loss = 0
        test_preds, test_trues = [], []
        with torch.no_grad():
            for X, y in test_loader:
                preds = model(X)
                test_loss += criterion(preds, y).item()
                test_preds.extend(preds.numpy())
                test_trues.extend(y.numpy())
        
        # ارزیابی آزمون
        test_acc, test_mae, test_rmse, preds, trues = calculate_metrics(
            test_preds,
            test_trues,
            scaler
        )
        
        # ذخیره نتایج
        history['train_loss'].append(train_loss/len(train_loader))
        history['test_loss'].append(test_loss/len(test_loader))
        history['train_acc'].append(train_acc)
        history['test_acc'].append(test_acc)
        history['train_mae'].append(train_mae)
        history['test_mae'].append(test_mae)
        
        # چاپ گزارش
        print(f'\nEpoch {epoch+1}/{EPOCHS}')
        print(f'Train Loss: {history["train_loss"][-1]:.4f} | Test Loss: {history["test_loss"][-1]:.4f}')
        print(f'Train Acc: {train_acc:.2f}% | Test Acc: {test_acc:.2f}%')
        print(f'Train MAE: {train_mae:.2f} | Test MAE: {test_mae:.2f}')
        print(f'Test RMSE: {test_rmse:.2f}')
        
        # نمایش نمونه پیش‌بینی‌ها
        if epoch % 10 == 0 or epoch == EPOCHS-1:
            plt.figure(figsize=(12, 6))
            plt.plot(trues[:100], label='Actual Prices')
            plt.plot(preds[:100], label='Predicted Prices')
            plt.title(f'Epoch {epoch+1} - Sample Predictions')
            plt.legend()
            plt.show()
    
    # نمایش نمودارهای یادگیری
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['test_loss'], label='Test Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['test_acc'], label='Test Accuracy')
    plt.title('Directional Accuracy')
    plt.legend()
    
    plt.tight_layout()
    plt.show()
    
    return history

if __name__ == "__main__":
    # بارگذاری و پیش‌پردازش داده
    data, scaler, cols = preprocess_data('XAUUSD-VIP_M30.txt')
    if data is None:
        exit()
    
    close_idx = list(cols).index('close')
    sequences, targets = create_sequences(data, data[:, close_idx], SEQ_LENGTH)
    if sequences is None:
        exit()
    
    # تقسیم داده
    tscv = TimeSeriesSplit(n_splits=3)
    for train_idx, test_idx in tscv.split(sequences):
        train_seq, test_seq = sequences[train_idx], sequences[test_idx]
        train_tgt, test_tgt = targets[train_idx], targets[test_idx]
    
    # ایجاد DataLoader
    train_loader = DataLoader(
        PriceDataset(train_seq, train_tgt),
        batch_size=BATCH_SIZE,
        shuffle=True
    )
    test_loader = DataLoader(
        PriceDataset(test_seq, test_tgt),
        batch_size=BATCH_SIZE
    )
    
    # ایجاد و آموزش مدل
    model = StableLSTM(input_size=data.shape[1])
    history = train_and_report(model, train_loader, test_loader, scaler)

ValueError: could not broadcast input array from shape (10896,1) into shape (10896,)