<a href="https://colab.research.google.com/github/Ha-Hien/DBA/blob/main/REPORT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# prompt: 10 random numbers using numpy

import numpy as np

random_numbers = np.random.rand(10)
random_numbers


array([0.49342994, 0.25535295, 0.54496899, 0.75628277, 0.02966272,
       0.31630551, 0.15051841, 0.79569293, 0.68205871, 0.47701371])

In [None]:
!git clone https://github.com/Ha-Hien/DBA.git

Cloning into 'DBA'...
remote: Enumerating objects: 3, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Total 3 (delta 0), reused 3 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (3/3), done.


In [None]:
!git config --global user.email "hathithuhien07082003@gmail.com"
!git config --global user.name "Ha-Hien"

In [None]:
!git remote set-url origin https://YOUR_TOKEN@github.com/Ha-Hien/DBA.git

In [None]:
!git config --global pull.rebase true

In [None]:
!git pull origin main

From https://github.com/Ha-Hien/DBA
 * branch            main       -> FETCH_HEAD
Rebasing (1/1)[KSuccessfully rebased and updated refs/heads/main.


In [None]:
# 1. Copy file mnist_test.csv vào repository DBA
!cp /content/sample_data/mnist_test.csv /content/DBA/

# 2. Di chuyển vào thư mục DBA
%cd /content/DBA

# 3. Thêm file vào Git
!git add mnist_test.csv

# 4. Commit (đảm bảo đã cấu hình git config)
!git commit -m "Add MNIST test dataset"

# 5. Push
!git push origin main

/content/DBA
On branch main
Your branch is ahead of 'origin/main' by 1 commit.
  (use "git push" to publish your local commits)

nothing to commit, working tree clean
Enumerating objects: 4, done.
Counting objects: 100% (4/4), done.
Delta compression using up to 2 threads
Compressing objects: 100% (3/3), done.
Writing objects: 100% (3/3), 2.10 MiB | 2.32 MiB/s, done.
Total 3 (delta 0), reused 0 (delta 0), pack-reused 0
To https://github.com/Ha-Hien/DBA.git
   ec6e2ff..a60103b  main -> main


In [None]:
# Import các thư viện cần thiết
from google.colab import drive
import pandas as pd
import os

# Gắn kết Google Drive để truy cập các tệp dữ liệu
drive.mount('/content/drive')

Mounted at /content/drive


In [48]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
from pathlib import Path
import glob
import logging
from tqdm import tqdm
import os
from datetime import datetime

class OHLCProcessor:
    def __init__(self, start_date: str = '2019-08-26', end_date: str = '2024-08-21', log_dir: str = 'logs'):
        self.start_date = pd.to_datetime(start_date).tz_localize(None)
        self.end_date = pd.to_datetime(end_date).tz_localize(None)
        self.scaler = RobustScaler()
        self.log_dir = log_dir

        # Tạo thư mục logs nếu chưa tồn tại
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        self._setup_logging()

    def _setup_logging(self):
        """Setup logging với file và console output"""
        # Tạo tên file log với timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = os.path.join(self.log_dir, f'processing_log_{timestamp}.txt')

        # Setup file handler
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.INFO)

        # Setup console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        # Setup format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        # Setup logger
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # Remove any existing handlers
        self.logger.handlers = []

        # Add handlers
        self.logger.addHandler(fh)
        self.logger.addHandler(ch)

        print(f"Logging setup complete. Processing will be saved to '{log_file}'")
        self.logger.info("Starting OHLC data processing...")
        self.logger.info(f"Date range: {self.start_date} to {self.end_date}")

    def add_candlestick_patterns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Thêm candlestick patterns"""
        try:
            # Doji Pattern
            df['doji'] = (
                (abs(df['Close'] - df['Open']) <= 0.1 * (df['High'] - df['Low']))
            ).astype(int)

            # Hammer Pattern
            df['hammer'] = (
                (df['Close'] > df['Open']) &
                ((df['High'] - df['Low']) > 2 * (df['Close'] - df['Open'])) &
                ((df['Close'] - df['Low']) / (.001 + df['High'] - df['Low']) > 0.6)
            ).astype(int)

            # Bullish Engulfing
            df['bullish_engulfing'] = (
                (df['Close'] > df['Open'].shift(1)) &
                (df['Open'] < df['Close'].shift(1)) &
                (df['Close'] > df['Open'])
            ).astype(int)

            return df
        except Exception as e:
            self.logger.error(f"Error in add_candlestick_patterns: {str(e)}")
            return df

    def add_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Thêm time-based features"""
        try:
            df['day_of_week'] = df['Date'].dt.dayofweek
            df['month'] = df['Date'].dt.month
            df['quarter'] = df['Date'].dt.quarter
            df['is_month_end'] = df['Date'].dt.is_month_end.astype(int)
            df['is_quarter_end'] = df['Date'].dt.is_quarter_end.astype(int)
            return df
        except Exception as e:
            self.logger.error(f"Error in add_time_features: {str(e)}")
            return df

    def add_lag_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
        """Thêm lagged features"""
        try:
            # Returns và Volume lags
            for lag in [1, 2, 3, 5]:
                df[f'return_lag_{lag}'] = df['daily_return'].shift(lag)
                df[f'volume_lag_{lag}'] = df['volume_ratio'].shift(lag)

            # Rolling statistics
            windows = [5, 10, 20]
            for window in windows:
                df[f'return_mean_{window}d'] = df['daily_return'].rolling(window).mean()
                df[f'return_std_{window}d'] = df['daily_return'].rolling(window).std()
                df[f'momentum_{window}d'] = df['Close'].diff(window)

            return df
        except Exception as e:
            self.logger.error(f"Error in add_lag_features for {symbol}: {str(e)}")
            return df

    def process_single_stock(self, file_path: str) -> pd.DataFrame:
        """Xử lý dữ liệu cho một cổ phiếu với logging chi tiết"""
        try:
            # 1. Đọc dữ liệu
            df = pd.read_csv(file_path)
            symbol = Path(file_path).stem
            self.logger.info(f"\n{'='*50}")
            self.logger.info(f"Processing {symbol}")
            self.logger.info(f"Initial shape: {df.shape}")

            # 2. Convert types
            numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
            for col in numeric_cols:
                df[col] = df[col].astype('float64')
            self.logger.info(f"Converted {len(numeric_cols)} columns to float64")

            # 3. Xử lý datetime và filter
            df['symbol'] = symbol
            df['Date'] = pd.to_datetime(df['Date'], utc=True).dt.tz_localize(None)
            self.logger.info(f"Date range: {df['Date'].min()} to {df['Date'].max()}")

            mask = (df['Date'] >= self.start_date) & (df['Date'] <= self.end_date)
            df = df[mask].copy()
            self.logger.info(f"Shape after date filtering: {df.shape}")

            if len(df) == 0:
                self.logger.warning(f"No data in specified date range for {symbol}")
                return None

            # 4. Add Covid-19 Features
            self.logger.info("\nAdding Covid-19 features...")
            df['is_covid_crash'] = ((df['Date'] >= '2020-02-01') &
                                  (df['Date'] <= '2020-04-30')).astype(int)
            df['is_covid_recovery'] = ((df['Date'] >= '2020-05-01') &
                                      (df['Date'] <= '2020-12-31')).astype(int)
            df['is_post_covid'] = (df['Date'] >= '2021-01-01').astype(int)

            covid_dist = df[['is_covid_crash', 'is_covid_recovery', 'is_post_covid']].sum()
            self.logger.info("Covid period distribution:")
            self.logger.info(f"Crash period: {covid_dist['is_covid_crash']} days")
            self.logger.info(f"Recovery period: {covid_dist['is_covid_recovery']} days")
            self.logger.info(f"Post-covid period: {covid_dist['is_post_covid']} days")

            # 5. Price Features
            self.logger.info("\nCalculating price features...")
            initial_cols = set(df.columns)

            df['daily_return'] = df['Close'].pct_change()
            df['hl_ratio'] = (df['High'] - df['Low']) / df['Low']
            df['co_ratio'] = (df['Close'] - df['Open']) / df['Open']

            price_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(price_features)} price features: {price_features}")

            # Log các statistics cơ bản
            self.logger.info("\nPrice feature statistics:")
            self.logger.info(df[['daily_return', 'hl_ratio', 'co_ratio']].describe())

            # 6. Volume Features
            self.logger.info("\nCalculating volume features...")
            initial_cols = set(df.columns)

            df['volume_ratio'] = df['Volume'] / df['Volume'].rolling(5, min_periods=1).mean()
            df['volume_ma5'] = df['Volume'].rolling(window=5, min_periods=1).mean()
            df['volume_ma20'] = df['Volume'].rolling(window=20, min_periods=1).mean()

            volume_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(volume_features)} volume features: {volume_features}")

            # 7. Technical Indicators
            self.logger.info("\nCalculating technical indicators...")
            initial_cols = set(df.columns)

            windows = [5, 10, 20]
            for window in windows:
                # Moving Averages
                df[f'sma_{window}'] = df['Close'].rolling(window=window, min_periods=1).mean()
                df[f'ema_{window}'] = df['Close'].ewm(span=window, adjust=False).mean()

                # Volatility
                df[f'volatility_{window}'] = df['daily_return'].rolling(
                    window=window, min_periods=1).std()

                # RSI
                delta = df['Close'].diff()
                gain = delta.where(delta > 0, 0).rolling(window=window, min_periods=1).mean()
                loss = -delta.where(delta < 0, 0).rolling(window=window, min_periods=1).mean()
                rs = gain / loss
                df[f'rsi_{window}'] = 100 - (100 / (1 + rs))

            tech_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(tech_features)} technical features: {tech_features}")

            # 8. Candlestick Patterns
            self.logger.info("\nAdding candlestick patterns...")
            initial_cols = set(df.columns)

            df = self.add_candlestick_patterns(df)
            candlestick_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(candlestick_features)} candlestick features: {candlestick_features}")

            # Log pattern distribution
            pattern_dist = df[list(candlestick_features)].sum()
            self.logger.info("\nPattern occurrences:")
            self.logger.info(pattern_dist)

            # 9. Time Features
            self.logger.info("\nAdding time features...")
            initial_cols = set(df.columns)

            df = self.add_time_features(df)
            time_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(time_features)} time features: {time_features}")

            # 10. Lag Features
            self.logger.info("\nAdding lag features...")
            initial_cols = set(df.columns)

            df = self.add_lag_features(df, symbol)
            lag_features = set(df.columns) - initial_cols
            self.logger.info(f"Added {len(lag_features)} lag features: {lag_features}")

            # 11. Handle Missing Values
            self.logger.info("\nHandling missing values...")
            missing_before = df.isnull().sum().sum()

            numeric_cols = df.select_dtypes(include=['float64']).columns
            for col in numeric_cols:
                df[col] = df[col].ffill(limit=5).bfill(limit=5)

            missing_after = df.isnull().sum().sum()
            self.logger.info(f"Filled {missing_before - missing_after} missing values")

            if missing_after > 0:
                self.logger.warning(f"Remaining missing values: {missing_after}")
                missing_cols = df.columns[df.isnull().any()].tolist()
                self.logger.warning(f"Columns with missing values: {missing_cols}")

            # Final Summary
            self.logger.info(f"\nFinal processing summary for {symbol}:")
            self.logger.info(f"Initial shape: {df.shape}")
            self.logger.info(f"Total features added: {len(df.columns) - len(['Date', 'symbol', 'Open', 'High', 'Low', 'Close', 'Volume'])}")
            self.logger.info(f"Final memory usage: {df.memory_usage().sum() / 1024**2:.2f} MB")

            return df

        except Exception as e:
            self.logger.error(f"Error processing {symbol}: {str(e)}")
            return None

    def normalize_stock_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize dữ liệu theo từng cổ phiếu và nhóm features"""
        feature_groups = {
            'price': ['Open', 'High', 'Low', 'Close'],
            'volume': ['Volume'] + [col for col in df.columns if 'volume' in col.lower()],
            'momentum': [col for col in df.columns if any(x in col.lower() for x in ['momentum', 'macd'])],
            'volatility': [col for col in df.columns if 'volatility' in col.lower()],
            'oscillators': [col for col in df.columns if any(x in col.lower() for x in ['rsi', 'bb'])],
            'ratios': [col for col in df.columns if 'ratio' in col.lower()]
        }

        normalized_df = df.copy()

        for symbol in df['symbol'].unique():
            try:
                mask = df['symbol'] == symbol
                symbol_data = df[mask].copy()

                for group_name, cols in feature_groups.items():
                    cols_exist = [col for col in cols if col in df.columns]
                    if cols_exist:
                        try:
                            data_to_normalize = symbol_data[cols_exist].astype('float64')
                            if not data_to_normalize.empty and not data_to_normalize.isnull().all().all():
                                normalized_values = self.scaler.fit_transform(data_to_normalize)
                                for col_idx, col in enumerate(cols_exist):
                                    normalized_df.loc[mask, col] = normalized_values[:, col_idx]
                        except Exception as e:
                            self.logger.error(f"Error normalizing {group_name} for {symbol}: {str(e)}")

            except Exception as e:
                self.logger.error(f"Error processing {symbol}: {str(e)}")

        return normalized_df

    def process_all_stocks(self, data_dir: str) -> pd.DataFrame:
        """Xử lý tất cả các file với logging chi tiết"""
        all_files = glob.glob(f"{data_dir}/*.csv")
        self.logger.info(f"\n{'='*50}")
        self.logger.info(f"Starting processing of {len(all_files)} files")

        # Process từng file với progress bar
        all_stocks = []
        total_rows = 0
        total_features = 0

        for file in tqdm(all_files, desc="Processing stocks"):
            df = self.process_single_stock(file)
            if df is not None and not df.empty:
                total_rows += len(df)
                total_features = len(df.columns)
                all_stocks.append(df)

        if not all_stocks:
            raise ValueError("No valid stock data found")

        # Gộp dữ liệu
        self.logger.info(f"\n{'='*50}")
        self.logger.info("Combining processed stocks...")
        combined_df = pd.concat(all_stocks, ignore_index=True)

        self.logger.info("\nCombined data statistics:")
        self.logger.info(f"Total stocks processed: {len(all_stocks)}")
        self.logger.info(f"Total rows: {total_rows}")
        self.logger.info(f"Total features: {total_features}")
        self.logger.info(f"Combined shape: {combined_df.shape}")

        # Normalize
        self.logger.info(f"\n{'='*50}")
        self.logger.info("Starting normalization...")
        normalized_df = self.normalize_stock_data(combined_df)

        self.logger.info("\nNormalization summary:")
        self.logger.info(f"Shape before normalization: {combined_df.shape}")
        self.logger.info(f"Shape after normalization: {normalized_df.shape}")

        # Final cleaning
        self.logger.info(f"\n{'='*50}")
        self.logger.info("Final cleaning...")
        initial_rows = len(normalized_df)
        final_df = normalized_df.dropna()

        self.logger.info("\nFinal statistics:")
        self.logger.info(f"Initial rows: {initial_rows}")
        self.logger.info(f"Final rows: {len(final_df)}")
        self.logger.info(f"Rows removed: {initial_rows - len(final_df)}")
        self.logger.info(f"Final shape: {final_df.shape}")
        self.logger.info(f"Unique symbols: {final_df['symbol'].nunique()}")
        self.logger.info(f"Memory usage: {final_df.memory_usage().sum() / 1024**2:.2f} MB")

        return final_df

In [49]:
def main():
    # Khởi tạo processor với logging
    processor = OHLCProcessor(
        start_date='2019-08-26',
        end_date='2024-08-21',
        log_dir='ohlc_processing_logs'
    )

    # Xử lý dữ liệu
    input_dir = "/content/drive/MyDrive/DATASET/OHLC/"
    output_file = "/content/drive/MyDrive/DATASET/processed_ohlc_data.csv"

    try:
        # Process data
        processed_data = processor.process_all_stocks(input_dir)

        # Save results
        processed_data.to_csv(output_file, index=False)

        # Log final summary
        processor.logger.info("\nProcessing Complete!")
        processor.logger.info(f"Output file saved to: {output_file}")
        processor.logger.info(f"File size: {round(os.path.getsize(output_file) / (1024*1024), 2)} MB")
        processor.logger.info(f"Total rows: {len(processed_data)}")
        processor.logger.info(f"Total columns: {len(processed_data.columns)}")
        processor.logger.info(f"Memory usage: {processed_data.memory_usage().sum() / 1024**2:.2f} MB")

    except Exception as e:
        processor.logger.error(f"Error in main processing: {str(e)}")

if __name__ == "__main__":
    main()

2024-12-22 10:07:39,017 - INFO - Starting OHLC data processing...
INFO:__main__:Starting OHLC data processing...
2024-12-22 10:07:39,026 - INFO - Date range: 2019-08-26 00:00:00 to 2024-08-21 00:00:00
INFO:__main__:Date range: 2019-08-26 00:00:00 to 2024-08-21 00:00:00
2024-12-22 10:07:39,056 - INFO - 
INFO:__main__:
2024-12-22 10:07:39,060 - INFO - Starting processing of 502 files
INFO:__main__:Starting processing of 502 files


Logging setup complete. Processing will be saved to 'ohlc_processing_logs/processing_log_20241222_100739.txt'


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Calculating volume features...
2024-12-22 10:11:14,718 - INFO - Added 3 volume features: {'volume_ma20', 'volume_ratio', 'volume_ma5'}
INFO:__main__:Added 3 volume features: {'volume_ma20', 'volume_ratio', 'volume_ma5'}
2024-12-22 10:11:14,727 - INFO - 
Calculating technical indicators...
INFO:__main__:
Calculating technical indicators...
2024-12-22 10:11:14,758 - INFO - Added 12 technical features: {'volatility_10', 'sma_5', 'volatility_5', 'sma_20', 'ema_5', 'sma_10', 'rsi_20', 'ema_20', 'ema_10', 'volatility_20', 'rsi_5', 'rsi_10'}
INFO:__main__:Added 12 technical features: {'volatility_10', 'sma_5', 'volatility_5', 'sma_20', 'ema_5', 'sma_10', 'rsi_20', 'ema_20', 'ema_10', 'volatility_20', 'rsi_5', 'rsi_10'}
2024-12-22 10:11:14,767 - INFO - 
Adding candlestick patterns...
INFO:__main__:
Adding candlestick patterns...
2024-12-22 10:11:14,783 - INFO - Added 3 candlestick features: {'bullish_engulfing', 'hammer', 'doji'}