<a href="https://colab.research.google.com/github/SimplifyHub19/ACED/blob/main/my_stock_screener.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# %%
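!pip install TA-Lib # Installs the TA-Lib Python wrapper; a source build needs the native TA-Lib C library installed first (likely why the build in the cell output failed)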
# %%
# Restart the kernel after running this cell.
# This is important for the changes to take effect.
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
import requests
import json
import logging
from datetime import datetime, timedelta
import talib as ta  # Technical Analysis Library

# Setup logging: INFO level, each record formatted as
# "<timestamp> - <LEVEL>: <message>". All methods below log through `logger`.
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

class MalaysianStockScreener:
    """Rank Malaysian (.KL) tickers by technical, AI-model and fundamental scores.

    Each pipeline step stores its result on the instance:

    1. ``fetch_malaysian_stocks``          -> ``klse_stocks`` (symbol -> name)
    2. ``fetch_historical_data``           -> ``stock_data`` (symbol -> price DataFrame)
    3. ``fetch_financial_ratios``          -> ``financial_data`` (symbol -> dict)
    4. ``calculate_technical_indicators``  adds indicator columns to ``stock_data``
    5. ``create_screening_model``          -> ``model`` (classifier, scaler, metadata)
    6. ``screen_stocks``                   -> ``screened_stocks`` (sorted by total score)
    """

    # Number of boolean signals evaluated by _technical_score().
    TECHNICAL_SIGNAL_COUNT = 6

    # Derived feature name -> moving-average column that Close is divided by.
    RELATIVE_PRICE_COLS = {
        'Price_to_SMA20': 'SMA_20',
        'Price_to_SMA50': 'SMA_50',
        'Price_to_SMA200': 'SMA_200',
    }

    # Model inputs, in the column order used for both training and prediction.
    FEATURE_COLS = [
        'RSI', 'MACD', 'MACD_Hist', 'ROC', 'ATR',
        'SMA_20', 'SMA_50', 'SMA_200',
        'BB_Upper', 'BB_Middle', 'BB_Lower',
        'Price_to_SMA20', 'Price_to_SMA50', 'Price_to_SMA200',
    ]

    def __init__(self):
        """Initialize the Malaysian Stock Screener with empty state."""
        self.klse_stocks = None      # symbol -> company name; None until fetched
        self.stock_data = {}         # symbol -> price history (+ indicator columns)
        self.financial_data = {}     # symbol -> dict of fundamental metrics
        self.screened_stocks = {}    # symbol -> result dict, best total_score first
        self.model = None            # dict built by create_screening_model()

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------
    @staticmethod
    def _to_percent(fraction):
        """Return ``fraction * 100``, or None when the value is missing/zero."""
        return fraction * 100 if fraction else None

    @staticmethod
    def _as_float(value):
        """Return ``value`` as a finite float, or None if it is not one.

        Guards the ratio comparisons against None, booleans, strings such as
        'Infinity', and NaN/inf values.
        """
        if value is None or isinstance(value, bool):
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if np.isfinite(number) else None

    @classmethod
    def _add_relative_price_features(cls, df):
        """Return a copy of ``df`` with the Close / moving-average ratio columns.

        Used by both training and screening so the model always sees the same
        feature set. The input frame is not modified.
        """
        enriched = df.copy()
        for feature, sma_col in cls.RELATIVE_PRICE_COLS.items():
            enriched[feature] = enriched['Close'] / enriched[sma_col]
        return enriched

    @staticmethod
    def _technical_score(latest):
        """Count how many of the six bullish technical signals hold for a row.

        Args:
            latest: Mapping/Series with Close, SMA_20, SMA_50, SMA_200, RSI,
                MACD, MACD_Signal, BB_Middle and BB_Upper.

        Returns:
            int: Number of satisfied signals, between 0 and 6.
        """
        close = latest['Close']
        signals = (
            # Trend following
            close > latest['SMA_50'],
            close > latest['SMA_200'],
            latest['SMA_20'] > latest['SMA_50'],
            # Momentum: strong but not overbought
            50 < latest['RSI'] < 70,
            # MACD above its signal line
            latest['MACD'] > latest['MACD_Signal'],
            # Upper half of the Bollinger band
            latest['BB_Middle'] < close < latest['BB_Upper'],
        )
        return sum(bool(signal) for signal in signals)

    @classmethod
    def _financial_score(cls, fd):
        """Score fundamentals as the fraction of four value criteria met.

        Criteria: 0 < P/E < 15, 0 < P/B < 1.5, dividend yield > 3 (percent),
        ROE > 15 (percent). Negative P/E or P/B never counts as "cheap".

        Args:
            fd (dict): Financial record as stored in ``financial_data``.

        Returns:
            float | None: ``passed / 4``, or None when none of the four metrics
            is available (so "no data" is distinguishable from a 0.0 score).
        """
        criteria = (
            ('peRatio', lambda v: 0 < v < 15),
            ('pbRatio', lambda v: 0 < v < 1.5),
            ('dividendYield', lambda v: v > 3),
            ('roe', lambda v: v > 15),
        )
        available = 0
        passed = 0
        for key, rule in criteria:
            value = cls._as_float(fd.get(key))
            if value is None:
                continue
            available += 1
            passed += bool(rule(value))
        return passed / len(criteria) if available else None

    def _ai_score(self, symbol, latest):
        """Return the model's probability of a positive 5-day return for a row.

        Falls back to the neutral value 0.5 (and logs the error) if the
        features cannot be built or the model cannot be applied.
        """
        try:
            row = latest[self.model['feature_cols']].values.astype(float).reshape(1, -1)
            scaled_row = self.model['scaler'].transform(row)
            classifier = self.model['model']
            # predict_proba columns follow classes_; class 1 may be absent if
            # the training targets never contained a positive example.
            classes = list(classifier.classes_)
            if 1 not in classes:
                return 0.0
            return float(classifier.predict_proba(scaled_row)[0, classes.index(1)])
        except Exception as e:
            logger.error(f"Error applying model to {symbol}: {e}")
            return 0.5  # Neutral if error

    # ------------------------------------------------------------------
    # Data acquisition
    # ------------------------------------------------------------------
    def fetch_malaysian_stocks(self):
        """Fetch list of Malaysian stocks from KLSE.

        Currently returns a hard-coded sample; replace with an API call or
        scraper in production.

        Returns:
            dict: symbol -> company name (also stored in ``klse_stocks``).
        """
        logger.info("Fetching Malaysian stocks list...")

        # NOTE(review): 'MAYBANK' and 'MALAYAN BANKING' are the same company,
        # so the name attached to 1961.KL (and possibly 4065.KL) looks
        # mismatched - verify codes against the Bursa Malaysia listing.
        sample_stocks = {
            "1155.KL": "MAYBANK",
            "5819.KL": "HONG LEONG BANK",
            "1961.KL": "MALAYAN BANKING",
            "6012.KL": "MAXIS",
            "5347.KL": "TENAGA NASIONAL",
            "4863.KL": "TELEKOM MALAYSIA",
            "7277.KL": "DIALOG GROUP",
            "4065.KL": "CARLSBERG",
            "4715.KL": "GENTING MALAYSIA",
            "3182.KL": "GENTING BERHAD"
        }

        self.klse_stocks = sample_stocks
        logger.info(f"Fetched {len(self.klse_stocks)} Malaysian stocks")
        return self.klse_stocks

    def fetch_historical_data(self, period='1y'):
        """
        Fetch historical stock data for all Malaysian stocks

        Args:
            period (str): Period to fetch data for (e.g., '1d', '1mo', '1y')

        Returns:
            dict: symbol -> history DataFrame for every symbol that returned
            data (also stored in ``stock_data``). Failures are logged and skipped.
        """
        if self.klse_stocks is None:
            self.fetch_malaysian_stocks()

        logger.info(f"Fetching historical data for {len(self.klse_stocks)} stocks...")

        for symbol, name in self.klse_stocks.items():
            try:
                hist = yf.Ticker(symbol).history(period=period)
                if hist.empty:
                    logger.warning(f"No data found for {symbol}")
                    continue
                self.stock_data[symbol] = hist
                logger.info(f"Fetched data for {symbol} ({name})")
            except Exception as e:
                logger.error(f"Error fetching data for {symbol}: {e}")

        logger.info(f"Successfully fetched data for {len(self.stock_data)} stocks")
        return self.stock_data

    def fetch_financial_ratios(self):
        """Fetch financial ratios and metrics for Malaysian stocks.

        Returns:
            dict: symbol -> metrics dict (also stored in ``financial_data``).
            When detailed info is unavailable the record holds only
            ``symbol`` and ``name``.
        """
        # Same lazy initialisation as fetch_historical_data(), so this method
        # can be called first without crashing on klse_stocks being None.
        if self.klse_stocks is None:
            self.fetch_malaysian_stocks()

        logger.info("Fetching financial ratios...")

        for symbol, name in self.klse_stocks.items():
            try:
                stock = yf.Ticker(symbol)
            except Exception as e:
                logger.error(f"Error fetching financial data for {symbol}: {e}")
                continue

            try:
                info = stock.info
                # NOTE(review): dividendYield/returnOnEquity are assumed to be
                # fractions (0.05 == 5%) - confirm for the yfinance version in use.
                self.financial_data[symbol] = {
                    'symbol': symbol,
                    'name': name,
                    'sector': info.get('sector', 'Unknown'),
                    'industry': info.get('industry', 'Unknown'),
                    'marketCap': info.get('marketCap', None),
                    'peRatio': info.get('trailingPE', None),
                    'pbRatio': info.get('priceToBook', None),
                    'dividendYield': self._to_percent(info.get('dividendYield')),
                    'roe': self._to_percent(info.get('returnOnEquity')),
                    'debtToEquity': info.get('debtToEquity', None),
                }
                logger.info(f"Fetched financial data for {symbol}")
            except Exception as e:
                logger.warning(f"Limited financial data for {symbol}: {e}")
                # Create with minimal information if detailed info not available
                self.financial_data[symbol] = {
                    'symbol': symbol,
                    'name': name,
                }

        return self.financial_data

    # ------------------------------------------------------------------
    # Feature engineering and model
    # ------------------------------------------------------------------
    def calculate_technical_indicators(self):
        """Calculate technical indicators for all stocks.

        Adds SMA_20/50/200, RSI, MACD (+ signal, histogram), Bollinger Bands,
        ATR and ROC columns to every frame in ``stock_data``. Stocks whose
        calculation fails are logged and left unchanged.

        Returns:
            dict: The updated ``stock_data`` mapping.
        """
        logger.info("Calculating technical indicators...")

        for symbol, data in self.stock_data.items():
            try:
                df = data.copy()
                # TA-Lib functions are fed float arrays explicitly.
                close = df['Close'].values.astype(float)
                high = df['High'].values.astype(float)
                low = df['Low'].values.astype(float)

                # Moving Averages
                df['SMA_20'] = ta.SMA(close, timeperiod=20)
                df['SMA_50'] = ta.SMA(close, timeperiod=50)
                df['SMA_200'] = ta.SMA(close, timeperiod=200)

                # Relative Strength Index
                df['RSI'] = ta.RSI(close, timeperiod=14)

                # MACD
                df['MACD'], df['MACD_Signal'], df['MACD_Hist'] = ta.MACD(
                    close,
                    fastperiod=12,
                    slowperiod=26,
                    signalperiod=9
                )

                # Bollinger Bands
                df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = ta.BBANDS(
                    close,
                    timeperiod=20,
                    nbdevup=2,
                    nbdevdn=2
                )

                # Average True Range (ATR)
                df['ATR'] = ta.ATR(high, low, close, timeperiod=14)

                # Momentum
                df['ROC'] = ta.ROC(close, timeperiod=10)

                # Replacing the value of an existing key is safe while iterating.
                self.stock_data[symbol] = df
                logger.info(f"Calculated indicators for {symbol}")

            except Exception as e:
                logger.error(f"Error calculating indicators for {symbol}: {e}")

        return self.stock_data

    def create_screening_model(self):
        """Create and train AI model for stock screening.

        Trains a Random Forest on rows pooled from all stocks; the target is 1
        when the close 5 rows later is higher than the current close.

        Returns:
            dict | None: ``{'model', 'scaler', 'feature_cols', 'accuracy'}``
            (also stored in ``model``), or None if no stock yielded enough rows.
        """
        logger.info("Creating AI screening model...")

        feature_cols = list(self.FEATURE_COLS)
        features = []
        targets = []

        for symbol, data in self.stock_data.items():
            try:
                df = data.dropna()

                if len(df) < 30:  # Skip if not enough data
                    continue

                df = self._add_relative_price_features(df)

                # 5-day future return; the last 5 rows have no known future
                # and become NaN, so dropna() below removes them.
                df['Return_5d'] = df['Close'].pct_change(5).shift(-5)
                df_valid = df.dropna()

                if len(df_valid) > 30:  # Ensure we have enough data
                    features.append(df_valid[feature_cols].values)
                    targets.append((df_valid['Return_5d'] > 0).astype(int).values)

            except Exception as e:
                logger.error(f"Error processing {symbol} for model training: {e}")

        if not features:
            logger.error("No valid training data available")
            return None

        # Combine all stock data
        X = np.vstack(features)
        y = np.hstack(targets)

        # NOTE: this split shuffles rows, so overlapping 5-day windows from the
        # same stock land in both sets; the reported accuracy is optimistic.
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Standardize features (scaler is fitted on the training rows only)
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        # Train Random Forest model
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate model
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        logger.info(f"Model accuracy: {accuracy:.4f}")
        logger.info("\nClassification Report:\n" + classification_report(y_test, y_pred))

        self.model = {
            'model': model,
            'scaler': scaler,
            'feature_cols': feature_cols,
            'accuracy': accuracy
        }

        return self.model

    # ------------------------------------------------------------------
    # Screening and reporting
    # ------------------------------------------------------------------
    def screen_stocks(self):
        """Screen stocks based on technical indicators and AI model predictions.

        ``total_score`` is ``0.6 * technical + 0.4 * ai``; ``financial_score``
        is reported alongside but is not part of the total.

        Returns:
            dict: symbol -> result dict, ordered by descending ``total_score``
            (also stored in ``screened_stocks``). Empty if no model could be built.
        """
        logger.info("Screening stocks based on technical indicators and AI model...")

        if not self.model:
            logger.warning("AI model not created, creating now...")
            self.create_screening_model()
            if not self.model:
                logger.error("Failed to create AI model")
                return {}

        # Tolerate stock_data having been populated without the name list.
        stock_names = self.klse_stocks or {}
        signal_count = self.TECHNICAL_SIGNAL_COUNT
        screening_results = {}

        for symbol, data in self.stock_data.items():
            try:
                df = data.dropna()

                if len(df) < 20:
                    logger.warning(f"Skipping {symbol}: fewer than 20 complete rows")
                    continue

                # Rebuild the derived model features here: they are not stored
                # in stock_data, and without them the model lookup fails and
                # every stock silently gets the neutral AI score.
                latest = self._add_relative_price_features(df).iloc[-1]

                technical_hits = self._technical_score(latest)
                model_score = self._ai_score(symbol, latest)
                total_score = (0.6 * technical_hits / signal_count) + (0.4 * model_score)

                # Financial score (if available)
                fundamentals = self.financial_data.get(symbol)
                financial_score = (
                    self._financial_score(fundamentals) if fundamentals is not None else 0
                )
                fundamentals = fundamentals or {}

                screening_results[symbol] = {
                    'symbol': symbol,
                    'name': stock_names.get(symbol, 'Unknown'),
                    'last_price': latest['Close'],
                    'technical_score': technical_hits / signal_count,
                    'ai_score': model_score,
                    'financial_score': financial_score,
                    'total_score': total_score,
                    'volume': latest['Volume'],
                    'rsi': latest['RSI'],
                    'sector': fundamentals.get('sector', 'Unknown'),
                    'market_cap': fundamentals.get('marketCap', None),
                }

            except Exception as e:
                logger.error(f"Error screening {symbol}: {e}")

        # Sort by total score, best first (dicts preserve insertion order)
        self.screened_stocks = dict(sorted(
            screening_results.items(),
            key=lambda item: item[1]['total_score'],
            reverse=True
        ))

        logger.info(f"Screened {len(self.screened_stocks)} stocks")
        return self.screened_stocks

    def get_top_stocks(self, top_n=10):
        """Get top N stocks from screening results.

        Runs ``screen_stocks`` first if no results are stored yet.

        Args:
            top_n (int): Maximum number of entries to return.

        Returns:
            dict: The first ``top_n`` entries of ``screened_stocks``.
        """
        if not self.screened_stocks:
            self.screen_stocks()

        return dict(list(self.screened_stocks.items())[:top_n])

    def visualize_top_stocks(self, top_n=5):
        """Plot price/moving averages, volume and RSI for the top stocks.

        Args:
            top_n (int): Number of top-ranked stocks to plot (one figure each).
        """
        for symbol, info in self.get_top_stocks(top_n).items():
            fig = None
            try:
                df = self.stock_data[symbol].copy()
                fig = plt.figure(figsize=(12, 8))

                # Price and moving averages
                plt.subplot(3, 1, 1)
                plt.title(f"{info['name']} ({symbol}) - Price Chart")
                plt.plot(df.index, df['Close'], label='Close Price')
                plt.plot(df.index, df['SMA_20'], label='20-day MA')
                plt.plot(df.index, df['SMA_50'], label='50-day MA')
                plt.plot(df.index, df['SMA_200'], label='200-day MA')
                plt.fill_between(df.index, df['BB_Upper'], df['BB_Lower'], alpha=0.2, color='gray')
                plt.legend()
                plt.grid(True)

                # Volume
                plt.subplot(3, 1, 2)
                plt.title('Volume')
                plt.bar(df.index, df['Volume'], color='blue', alpha=0.5)
                plt.grid(True)

                # RSI with overbought (70) / oversold (30) guide lines
                plt.subplot(3, 1, 3)
                plt.title('RSI')
                plt.plot(df.index, df['RSI'], color='purple')
                plt.axhline(y=70, color='r', linestyle='-')
                plt.axhline(y=30, color='g', linestyle='-')
                plt.grid(True)

                plt.tight_layout()
                plt.show()

            except Exception as e:
                logger.error(f"Error visualizing {symbol}: {e}")
                # Do not leave a half-drawn figure open after a failure.
                if fig is not None:
                    plt.close(fig)

    def generate_report(self, top_n=20):
        """Generate a detailed report of screened stocks.

        Args:
            top_n (int): Maximum number of rows in the report.

        Returns:
            pandas.DataFrame: One row per top stock, columns taken from the
            result dicts produced by ``screen_stocks``.
        """
        return pd.DataFrame(list(self.get_top_stocks(top_n).values()))

def main():
    """Run the whole screening pipeline once and print the ranked results."""
    pipeline = MalaysianStockScreener()

    # Data acquisition: ticker list, one year of prices, then fundamentals
    pipeline.fetch_malaysian_stocks()
    pipeline.fetch_historical_data(period='1y')
    pipeline.fetch_financial_ratios()

    # Indicators -> model training -> scoring
    pipeline.calculate_technical_indicators()
    pipeline.create_screening_model()
    pipeline.screen_stocks()

    # Short ranking of the ten best-scoring stocks
    best = pipeline.get_top_stocks(top_n=10)
    print("\nTop 10 Malaysian Stocks:")
    for ticker, row in best.items():
        line = f"{row['name']} ({ticker}): Score {row['total_score']:.2f}, RSI: {row['rsi']:.2f}"
        print(line)

    # Full table (generate_report defaults to the top 20)
    table = pipeline.generate_report()
    print("\nDetailed Stock Report:")
    print(table)

    # Visualize top 5 stocks
    # Uncomment in a Jupyter notebook environment
    # pipeline.visualize_top_stocks(top_n=5)

# Run the screener only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()


Collecting TA-Lib
  Using cached ta_lib-0.6.3.tar.gz (376 kB)
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Installing backend dependencies ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: TA-Lib
  error: subprocess-exited-with-error
  
  × Building wheel for TA-Lib (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> See above for output.
  
  note: This error originates from a subprocess, and is likely not a problem with pip.
  Building wheel for TA-Lib (pyproject.toml) ... error
  ERROR: Failed building wheel for TA-Lib
Failed to build TA-Lib
ERROR: ERROR: Failed to build installable wheels for some pyproject.toml based projects (TA-Lib)


ModuleNotFoundError: No module named 'talib'