In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
from sklearn.preprocessing import MinMaxScaler
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

class FinancialDataAugmentor:
    """
    Enlarge a small OHLCV price table with synthetic rows.

    Four generators are offered (Gaussian noise, pairwise interpolation,
    bootstrap with jitter, and a moving-average "trend" perturbation);
    `augment_dataset` combines them and returns original + synthetic rows.

    The working copy `self.df` always keeps exactly the caller's columns:
    derived series (returns, moving averages) are held in local variables so
    they can never leak into the generated rows.
    """

    # Rolling-window lengths (in rows) used by trend_based_generation
    SHORT_WINDOW = 5
    LONG_WINDOW = 20

    # Values accepted by augment_dataset(method=...)
    VALID_METHODS = ('mixed', 'noise', 'interpolation', 'bootstrap', 'trend')

    def __init__(self, df, random_state=None):
        """
        Initialize the augmentor with the original dataset

        Parameters:
        df: pandas DataFrame with columns ['Date', 'Adj Close', 'Close', 'High', 'Low', 'Open', 'Volume']
        random_state: optional int seed. When given, every random draw made by
            this instance comes from a private numpy RandomState, so results are
            reproducible. When None (default), draws come from NumPy's global
            random state, which np.random.seed() controls.
        """
        self.original_df = df.copy()
        self.df = df.copy()
        self.price_columns = ['Adj Close', 'Close', 'High', 'Low', 'Open']
        self.volume_column = 'Volume'
        self.date_column = 'Date'

        if random_state is None:
            # The np.random module exposes the same randint/normal/uniform/choice
            # functions as a RandomState, backed by the global state.
            self._rng = np.random
            self._shuffle_state = None
        else:
            self._rng = np.random.RandomState(random_state)
            self._shuffle_state = self._rng

        # Calculate statistics for synthetic generation
        self._calculate_statistics()

    @staticmethod
    def _describe(series):
        """Return the mean/std/min/max of a Series as a dict"""
        return {
            'mean': series.mean(),
            'std': series.std(),
            'min': series.min(),
            'max': series.max()
        }

    def _calculate_statistics(self):
        """Calculate statistical properties of the original data"""
        # Price statistics
        self.price_stats = {col: self._describe(self.df[col]) for col in self.price_columns}

        # Volume statistics
        self.volume_stats = self._describe(self.df[self.volume_column])

        # Daily returns are kept local: storing them as a column on self.df
        # would copy a 'returns' field into every synthetic row.
        returns = self.df['Close'].pct_change()
        self.returns_mean = returns.mean()
        self.returns_std = returns.std()

        # Calculate correlation matrix for price relationships
        self.price_corr = self.df[self.price_columns].corr()

    def _require_rows(self, minimum, purpose):
        """Raise ValueError when the dataset has fewer than `minimum` rows"""
        if len(self.df) < minimum:
            raise ValueError(
                f"{purpose} needs at least {minimum} row(s), "
                f"but the dataset has {len(self.df)}"
            )

    def _random_row(self, first_idx=0):
        """Pick a random row position >= first_idx; return (position, row copy)"""
        idx = self._rng.randint(first_idx, len(self.df))
        return idx, self.df.iloc[idx].copy()

    def _add_gaussian_noise(self, row, scale):
        """
        Perturb the price and volume fields of `row` in place and return it.

        Each field receives zero-mean Gaussian noise whose standard deviation is
        `scale` times that column's standard deviation in the original data.
        Prices are clamped at 0 and volume at 1.
        """
        for col in self.price_columns:
            noise = self._rng.normal(0, self.price_stats[col]['std'] * scale)
            row[col] = max(0, row[col] + noise)  # Ensure positive prices

        volume_noise = self._rng.normal(0, self.volume_stats['std'] * scale)
        row[self.volume_column] = max(1, int(row[self.volume_column] + volume_noise))

        # Ensure price relationships are maintained (High >= Low, etc.)
        return self._fix_price_relationships(row)

    def noise_injection(self, noise_factor=0.02, samples=500):
        """
        Add Gaussian noise to existing data points

        Parameters:
        noise_factor: float, standard deviation of noise as fraction of data std
        samples: int, number of synthetic samples to generate

        Returns:
        DataFrame with one row per synthetic sample (empty when samples <= 0)

        Raises:
        ValueError: if the dataset has no rows
        """
        if samples <= 0:
            return pd.DataFrame([])
        self._require_rows(1, 'noise_injection')

        synthetic_data = []
        for _ in range(samples):
            _, base_row = self._random_row()
            synthetic_data.append(self._add_gaussian_noise(base_row, noise_factor))

        return pd.DataFrame(synthetic_data)

    def interpolation_augmentation(self, samples=300):
        """
        Generate synthetic data by interpolating between existing data points

        Each sample is a convex combination (weight drawn from [0.2, 0.8)) of
        two distinct randomly chosen rows; non-price fields come from the first.

        Raises:
        ValueError: if the dataset has fewer than two rows
        """
        if samples <= 0:
            return pd.DataFrame([])
        self._require_rows(2, 'interpolation_augmentation')

        synthetic_data = []
        n_rows = len(self.df)

        for _ in range(samples):
            # Select two distinct random rows
            idx1, idx2 = self._rng.choice(n_rows, size=2, replace=False)
            row1 = self.df.iloc[idx1]
            row2 = self.df.iloc[idx2]

            # Random interpolation weight
            alpha = self._rng.uniform(0.2, 0.8)

            new_row = row1.copy()

            # Interpolate price columns
            for col in self.price_columns:
                new_row[col] = alpha * row1[col] + (1 - alpha) * row2[col]

            # Interpolate volume
            new_row[self.volume_column] = int(
                alpha * row1[self.volume_column] + (1 - alpha) * row2[self.volume_column]
            )

            # Fix price relationships
            synthetic_data.append(self._fix_price_relationships(new_row))

        return pd.DataFrame(synthetic_data)

    def bootstrap_sampling(self, samples=400):
        """
        Generate synthetic data using bootstrap sampling with slight modifications

        Rows are drawn with replacement and jittered with Gaussian noise equal
        to 1% of each column's standard deviation.

        Raises:
        ValueError: if the dataset has no rows
        """
        if samples <= 0:
            return pd.DataFrame([])
        self._require_rows(1, 'bootstrap_sampling')

        synthetic_data = []
        for _ in range(samples):
            _, base_row = self._random_row()
            synthetic_data.append(self._add_gaussian_noise(base_row, 0.01))

        return pd.DataFrame(synthetic_data)

    def trend_based_generation(self, samples=350):
        """
        Generate synthetic data based on historical trends and patterns

        The trend signal is the gap between the short and long moving averages
        of Close at the chosen row, squashed with tanh after dividing by Close.
        When the dataset has at least LONG_WINDOW rows, only rows with a full
        long window are used as bases; shorter datasets fall back to
        partial-window averages over all rows instead of failing.

        Raises:
        ValueError: if the dataset has no rows
        """
        if samples <= 0:
            return pd.DataFrame([])
        self._require_rows(1, 'trend_based_generation')

        synthetic_data = []

        # Moving averages are local Series, not columns on self.df, so they do
        # not end up as extra fields in the generated rows.
        close = self.df['Close']
        ma_short = close.rolling(window=self.SHORT_WINDOW, min_periods=1).mean()
        ma_long = close.rolling(window=self.LONG_WINDOW, min_periods=1).mean()

        n_rows = len(self.df)
        first_idx = self.LONG_WINDOW - 1 if n_rows >= self.LONG_WINDOW else 0

        for _ in range(samples):
            # Select a base period
            base_idx, base_row = self._random_row(first_idx)

            # Calculate trend direction, normalized by the base close
            base_close = close.iloc[base_idx]
            trend = ma_short.iloc[base_idx] - ma_long.iloc[base_idx]
            trend_factor = np.tanh(trend / base_close) if base_close != 0 else 0.0

            # Generate new prices based on trend
            for col in self.price_columns:
                base_price = base_row[col]
                trend_adjustment = base_price * trend_factor * self._rng.uniform(-0.02, 0.02)
                noise = self._rng.normal(0, self.price_stats[col]['std'] * 0.015)
                base_row[col] = max(0, base_price + trend_adjustment + noise)

            # Volume adjustment based on price movement. Open can be clamped to
            # 0 above; treat that as "no measurable move" rather than dividing.
            open_price = base_row['Open']
            if open_price > 0:
                price_change = (base_row['Close'] - open_price) / open_price
            else:
                price_change = 0.0
            volume_multiplier = 1 + abs(price_change) * self._rng.uniform(0.1, 0.3)
            base_row[self.volume_column] = int(base_row[self.volume_column] * volume_multiplier)

            # Fix price relationships
            synthetic_data.append(self._fix_price_relationships(base_row))

        return pd.DataFrame(synthetic_data)

    def _fix_price_relationships(self, row):
        """
        Ensure price relationships are maintained (High >= Close >= Low, etc.)

        High becomes the maximum and Low the minimum of Open/Close/High/Low.
        If Adj Close is more than 10% of Close away from Close, it is replaced
        by Close plus small Gaussian noise (std = 0.5% of Close).
        """
        prices = [row['Open'], row['Close'], row['High'], row['Low']]

        # Set High as maximum of all prices
        row['High'] = max(prices)

        # Set Low as minimum of all prices
        row['Low'] = min(prices)

        # Ensure Adj Close is reasonable relative to Close
        close = row['Close']
        if abs(row['Adj Close'] - close) > close * 0.1:
            # abs() keeps the noise scale valid even for a non-positive close
            row['Adj Close'] = close + self._rng.normal(0, abs(close) * 0.005)

        return row

    def generate_synthetic_dates(self, num_samples):
        """
        Generate synthetic dates for the augmented data

        Dates are drawn uniformly (whole-day steps) between the earliest and
        latest date in the dataset, inclusive.

        Returns:
        list of `num_samples` timestamps
        """
        # Parse unconditionally into a local Series; this works for string and
        # datetime columns alike and leaves self.df untouched.
        dates = pd.to_datetime(self.df[self.date_column])

        # Find date range
        start_date = dates.min()
        end_date = dates.max()
        time_range = (end_date - start_date).days

        # randint's upper bound is exclusive, hence the +1
        offsets = self._rng.randint(0, time_range + 1, size=num_samples)

        # int() because timedelta rejects numpy integer types
        return [start_date + timedelta(days=int(offset)) for offset in offsets]

    def augment_dataset(self, target_size=2000, method='mixed'):
        """
        Main method to augment the dataset

        Parameters:
        target_size: int, desired size of the augmented dataset
        method: str, augmentation method ('mixed', 'noise', 'interpolation', 'bootstrap', 'trend')

        Returns:
        Shuffled DataFrame of original + synthetic rows with the same columns
        as the input; the date column is converted to datetime for all rows.
        If the dataset already has at least `target_size` rows, an unmodified
        copy of the original data is returned instead.

        Raises:
        ValueError: if `method` is not one of the supported names
        """
        samples_needed = target_size - len(self.original_df)

        if samples_needed <= 0:
            print("Dataset is already larger than target size!")
            # Return a copy so callers cannot mutate the augmentor's state
            return self.original_df.copy()

        if method not in self.VALID_METHODS:
            raise ValueError(
                f"Unknown augmentation method {method!r}; "
                f"expected one of {self.VALID_METHODS}"
            )

        print(f"Generating {samples_needed} synthetic samples...")

        synthetic_dfs = []

        if method == 'mixed':
            # Use all methods; trend-based generation takes the remainder
            noise_samples = int(samples_needed * 0.3)
            interp_samples = int(samples_needed * 0.25)
            bootstrap_samples = int(samples_needed * 0.25)
            trend_samples = samples_needed - noise_samples - interp_samples - bootstrap_samples

            print(f"Noise injection: {noise_samples} samples")
            synthetic_dfs.append(self.noise_injection(samples=noise_samples))

            print(f"Interpolation: {interp_samples} samples")
            synthetic_dfs.append(self.interpolation_augmentation(samples=interp_samples))

            print(f"Bootstrap sampling: {bootstrap_samples} samples")
            synthetic_dfs.append(self.bootstrap_sampling(samples=bootstrap_samples))

            print(f"Trend-based generation: {trend_samples} samples")
            synthetic_dfs.append(self.trend_based_generation(samples=trend_samples))
        else:
            generators = {
                'noise': self.noise_injection,
                'interpolation': self.interpolation_augmentation,
                'bootstrap': self.bootstrap_sampling,
                'trend': self.trend_based_generation,
            }
            synthetic_dfs.append(generators[method](samples=samples_needed))

        # Combine all synthetic data (zero-sample generators return empty frames)
        synthetic_dfs = [frame for frame in synthetic_dfs if not frame.empty]
        all_synthetic = pd.concat(synthetic_dfs, ignore_index=True)

        # Generate synthetic dates
        all_synthetic[self.date_column] = self.generate_synthetic_dates(len(all_synthetic))

        # Parse the original dates too, so the combined column has one dtype
        # instead of a mix of strings and timestamps.
        original = self.original_df.copy()
        original[self.date_column] = pd.to_datetime(original[self.date_column])

        # Combine original and synthetic data
        augmented_df = pd.concat([original, all_synthetic], ignore_index=True)

        # Shuffle the dataset
        augmented_df = augmented_df.sample(
            frac=1, random_state=self._shuffle_state
        ).reset_index(drop=True)

        print(f"Dataset augmented from {len(self.original_df)} to {len(augmented_df)} samples")

        return augmented_df

    def get_statistics_comparison(self, augmented_df):
        """
        Compare statistics between original and augmented datasets

        Prints dataset sizes and, for every price column, the mean and standard
        deviation of both datasets together with their absolute differences.
        """
        print("\n=== STATISTICS COMPARISON ===")
        print(f"Original dataset size: {len(self.original_df)}")
        print(f"Augmented dataset size: {len(augmented_df)}")

        print("\nPrice Statistics Comparison:")
        for col in self.price_columns:
            orig_mean = self.original_df[col].mean()
            orig_std = self.original_df[col].std()
            aug_mean = augmented_df[col].mean()
            aug_std = augmented_df[col].std()

            print(f"{col}:")
            print(f"  Original - Mean: {orig_mean:.2f}, Std: {orig_std:.2f}")
            print(f"  Augmented - Mean: {aug_mean:.2f}, Std: {aug_std:.2f}")
            print(f"  Difference - Mean: {abs(orig_mean - aug_mean):.2f}, Std: {abs(orig_std - aug_std):.2f}")
            print()

# Example usage, kept as a bare string literal rather than live code.
# NOTE: because this string is the last expression in the cell, the notebook
# echoes its repr as the cell's output.
"""
# Load your dataset
df = pd.read_csv('your_financial_data.csv')

# Create augmentor
augmentor = FinancialDataAugmentor(df)

# Augment dataset to 2000 samples using mixed methods
augmented_data = augmentor.augment_dataset(target_size=2000, method='mixed')

# Compare statistics
augmentor.get_statistics_comparison(augmented_data)

# Save augmented dataset
augmented_data.to_csv('augmented_financial_data.csv', index=False)

# Now you can create better train/test splits
from sklearn.model_selection import train_test_split

# Create train/test split (80/20)
train_data, test_data = train_test_split(augmented_data, test_size=0.2, random_state=42)

print(f"Training set size: {len(train_data)}")
print(f"Test set size: {len(test_data)}")
"""

'\n# Load your dataset\ndf = pd.read_csv(\'your_financial_data.csv\')\n\n# Create augmentor\naugmentor = FinancialDataAugmentor(df)\n\n# Augment dataset to 2000 samples using mixed methods\naugmented_data = augmentor.augment_dataset(target_size=2000, method=\'mixed\')\n\n# Compare statistics\naugmentor.get_statistics_comparison(augmented_data)\n\n# Save augmented dataset\naugmented_data.to_csv(\'augmented_financial_data.csv\', index=False)\n\n# Now you can create better train/test splits\nfrom sklearn.model_selection import train_test_split\n\n# Create train/test split (80/20)\ntrain_data, test_data = train_test_split(augmented_data, test_size=0.2, random_state=42)\n\nprint(f"Training set size: {len(train_data)}")\nprint(f"Test set size: {len(test_data)}")\n'

In [2]:
from sklearn.model_selection import train_test_split

# Configuration: everything a reader might want to tune lives here.
SEED = 42
TARGET_SIZE = 2000
# NOTE(review): machine-specific absolute path; point this at your own copy of
# the dataset (ideally a path relative to the project).
DATA_PATH = 'D:\\ML PROJECTS\\AML lab\\dataset\\apple_stock.csv'

# The augmentation is stochastic; seed the global generators so that
# Restart Kernel -> Run All reproduces the same augmented dataset.
random.seed(SEED)
np.random.seed(SEED)

# Load your dataset
df = pd.read_csv(DATA_PATH)

# Create augmentor
augmentor = FinancialDataAugmentor(df)

# Augment dataset to TARGET_SIZE samples using mixed methods
augmented_data = augmentor.augment_dataset(target_size=TARGET_SIZE, method='mixed')

# Compare statistics
augmentor.get_statistics_comparison(augmented_data)

# Save augmented dataset
augmented_data.to_csv('augmented_financial_data.csv', index=False)

# Create train/test split (80/20)
# NOTE(review): every synthetic row is derived from original rows, and the
# augmented frame is shuffled before this random split, so rows derived from
# the same original can land on both sides. Consider splitting the original
# data first and augmenting only the training portion.
train_data, test_data = train_test_split(augmented_data, test_size=0.2, random_state=SEED)

print(f"Training set size: {len(train_data)}")
print(f"Test set size: {len(test_data)}")


Generating 1748 synthetic samples...
Noise injection: 524 samples
Interpolation: 437 samples
Bootstrap sampling: 437 samples
Trend-based generation: 350 samples
Dataset augmented from 252 to 2000 samples

=== STATISTICS COMPARISON ===
Original dataset size: 252
Augmented dataset size: 2000

Price Statistics Comparison:
Adj Close:
  Original - Mean: 199.09, Std: 21.51
  Augmented - Mean: 198.74, Std: 20.35
  Difference - Mean: 0.35, Std: 1.16

Close:
  Original - Mean: 199.45, Std: 21.32
  Augmented - Mean: 199.11, Std: 20.17
  Difference - Mean: 0.35, Std: 1.16

High:
  Original - Mean: 201.09, Std: 21.57
  Augmented - Mean: 200.76, Std: 20.32
  Difference - Mean: 0.33, Std: 1.25

Low:
  Original - Mean: 197.61, Std: 20.96
  Augmented - Mean: 197.24, Std: 19.80
  Difference - Mean: 0.37, Std: 1.16

Open:
  Original - Mean: 199.32, Std: 21.37
  Augmented - Mean: 199.00, Std: 20.17
  Difference - Mean: 0.31, Std: 1.19

Training set size: 1600
Test set size: 400
