In [None]:
import os
import time
import requests
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime
import logging

# Logging: timestamped INFO-level messages for the whole notebook/script.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Data locations (relative to the working directory).
# price  -> cache of downloaded price history (written by this script)
# amount -> per-broker traded amounts (read only)
# lot    -> per-broker traded lots (read only)
price_data_dir = "../data_sample/price/"
amount_data_dir = "../data_sample/chip/amount/"
lot_data_dir = "../data_sample/chip/lot/"

# Only the price cache directory is created here; the chip directories are inputs.
os.makedirs(price_data_dir, exist_ok=True)

def get_stock_codes():
    """Download the TWSE ISIN listings and map stock codes to market types.

    The ``strMode=2`` and ``strMode=4`` ISIN pages are fetched in turn. For each
    page, the rows between the '股票' section header and the next section header
    are kept, and the first run of four digits in each name cell is taken as
    the stock code.

    Returns:
        dict: stock code (four-digit string) -> value of the page's
        ``market_type`` column. A page that returns a non-200 status or lacks
        the expected section markers is skipped with a logged warning.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    # Local import keeps the block self-contained; StringIO avoids pandas'
    # deprecation of passing literal HTML text to read_html.
    from io import StringIO

    print('Downloading stock data...')
    urls = {
        2: "https://isin.twse.com.tw/isin/C_public.jsp?strMode=2",
        4: "https://isin.twse.com.tw/isin/C_public.jsp?strMode=4"
    }
    column_names = ['full_name', 'isin_code', 'listed_date', 'market_type',
                    'industry_type', 'cfic_code', 'remarks']

    stock_dict = {}
    for mode, url in urls.items():
        # A timeout prevents a stalled connection from hanging the run forever.
        res = requests.get(url, timeout=30)
        if res.status_code != 200:
            logging.warning(f"Stock list download failed for strMode={mode}: HTTP {res.status_code}")
            continue

        df = pd.read_html(StringIO(res.text))[0]
        df.columns = column_names

        # The stock section ends at a different header row depending on the page.
        end_marker = r'上市認購\(售\)權證' if mode == 2 else '特別股'
        start_hits = df.index[df['full_name'].str.contains('股票', na=False)]
        end_hits = df.index[df['full_name'].str.contains(end_marker, na=False)]
        if start_hits.empty or end_hits.empty:
            logging.warning(f"Stock section markers not found for strMode={mode}; skipping.")
            continue

        stock_start = start_hits[0] + 1  # first row after the section header
        stock_end = end_hits[0]

        # Copy the slice so that adding a column does not write into a view of df.
        stock_df = df.iloc[stock_start:stock_end].copy()

        # Extract the numeric stock code from the combined code/name cell.
        stock_df['code'] = stock_df['full_name'].str.extract(r'(\d{4})', expand=False)

        # Rows without a four-digit code are ignored.
        valid = stock_df.dropna(subset=['code'])
        stock_dict.update(zip(valid['code'], valid['market_type']))

    return stock_dict

# Load the stock-code -> market-type mapping once, when this cell/module runs.
# NOTE: get_stock_codes() performs HTTP requests, so this line needs network access.
stock_dict = get_stock_codes()

# Function to determine the correct Yahoo Finance stock suffix
def get_stock_suffix(market_type):
    """Return the Yahoo Finance ticker suffix for *market_type*.

    "上市" maps to ".TW"; any other value maps to ".TWO".
    """
    if market_type == "上市":
        return ".TW"
    return ".TWO"

# Function to fetch historical stock prices with retry and caching
def query_historical_price(stock_code, market_type, end_date, period=390, max_retries=5, retry_delay=300):
    """Return daily price history for one stock, backed by a CSV cache.

    Args:
        stock_code: Stock code without the Yahoo Finance suffix.
        market_type: Market type passed to ``get_stock_suffix`` to pick the suffix.
        end_date: Last wanted day as "YYYY-MM-DD". One day is added before it is
            used as the download ``end`` (presumably because yfinance treats
            ``end`` as exclusive).
        period: Number of calendar days between the download start and the
            shifted end date.
        max_retries: Maximum number of download attempts on connection errors.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        A DataFrame with a 'Date' column plus the price columns, or ``None``
        when no data is available or the download fails.
    """
    suffix = get_stock_suffix(market_type)
    ticker = f"{stock_code}{suffix}"

    # Shift the end date by one day, then count the look-back window from it.
    end_date = (datetime.strptime(end_date, "%Y-%m-%d") + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
    start_date = (datetime.strptime(end_date, "%Y-%m-%d") - pd.Timedelta(days=period)).strftime("%Y-%m-%d")

    cache_path = os.path.join(price_data_dir, f"{stock_code}_{end_date}_{period}.csv")

    # Serve from the cache when possible; a corrupt cache file falls through to a refetch.
    if os.path.exists(cache_path):
        logging.info(f"Loading cached price data for {ticker} from {cache_path}")
        try:
            return pd.read_csv(cache_path, parse_dates=['Date'])
        except Exception as e:
            logging.warning(f"Error reading cached file {cache_path}: {e}, refetching data...")

    logging.info(f"Fetching historical data for {ticker} from {start_date} to {end_date}")

    for attempt in range(1, max_retries + 1):
        try:
            data = yf.download(ticker, start=start_date, end=end_date)
            if data.empty:
                logging.warning(f"No data found for {ticker}")
                return None

            # Newer yfinance releases return (field, ticker) MultiIndex columns even
            # for a single ticker. Keep only the field level so that 'High'/'Low'
            # are plain columns and the CSV cache has a single header row.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = data.columns.get_level_values(0)

            data = data.reset_index()
            data['Date'] = pd.to_datetime(data['Date'])

            # Save fetched data to cache
            data.to_csv(cache_path, index=False)
            logging.info(f"Saved price data to {cache_path}")

            return data

        except requests.exceptions.ConnectionError as e:
            logging.error(f"Connection error fetching data for {ticker}: {e}")
            if attempt < max_retries:
                logging.info(f"Retrying in {retry_delay // 60} minutes... (Attempt {attempt}/{max_retries})")
                time.sleep(retry_delay)
            else:
                logging.error(f"Failed to fetch data for {ticker} after {max_retries} attempts.")
                return None
        except Exception as e:
            # Anything other than a connection problem is not retried.
            logging.error(f"Unexpected error fetching data for {ticker}: {e}")
            return None

    # Reached only when max_retries is not positive.
    return None

def _list_dated_csvs(directory, end_date):
    """Return file names in *directory* ending with ``{end_date}.csv`` ([] if it is missing)."""
    try:
        entries = os.listdir(directory)
    except FileNotFoundError:
        return []
    return [f for f in entries if f.endswith(f"{end_date}.csv")]


def process_chip_data(end_date):
    """Compare brokers' average buy/sell prices with the day's high-low range.

    Reads every CSV ending with ``{end_date}.csv`` from the amount and lot
    directories, merges them on broker/branch/date/stock_code, computes
    ``buy_amount / buy_lot`` and ``sell_amount / sell_lot`` per row, and checks
    whether each average lies within the stock's High/Low for ``end_date``.

    Args:
        end_date: Trading day as "YYYY-MM-DD".

    Returns:
        tuple: ``(df_results, results)`` where ``results`` is a list of per-row
        dicts and ``df_results`` is the same data as a DataFrame. Both are
        empty when no amount/lot files exist for ``end_date``.
    """
    amount_files = _list_dated_csvs(amount_data_dir, end_date)
    lot_files = _list_dated_csvs(lot_data_dir, end_date)

    if not amount_files or not lot_files:
        logging.warning(f"No amount/lot data found for {end_date}.")
        # Keep the return shape consistent so callers can always unpack two values.
        return pd.DataFrame(), []

    # Combine all CSV files
    df_amount = pd.concat([pd.read_csv(os.path.join(amount_data_dir, f)) for f in amount_files], ignore_index=True)
    df_lot = pd.concat([pd.read_csv(os.path.join(lot_data_dir, f)) for f in lot_files], ignore_index=True)

    # Merge amount and lot data on common keys
    df = pd.merge(df_amount, df_lot, on=["broker", "branch", "date", "stock_code"], suffixes=('_amount', '_lot'))

    # Calculate average buy and sell prices
    df['avg_buy_price'] = df['buy_amount'] / df['buy_lot']
    df['avg_sell_price'] = df['sell_amount'] / df['sell_lot']

    # Division by a zero lot yields inf/NaN; rows missing either average are dropped.
    df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=['avg_buy_price', 'avg_sell_price'])

    # Fetch the day's high/low once per stock code.
    stock_price_data = {}
    for stock_code in df['stock_code'].unique():
        # stock_dict is keyed by string codes, while codes read from CSV are
        # typically parsed as integers, so normalise the key before the lookup.
        market_type = stock_dict.get(str(stock_code), "上市")
        stock_data = query_historical_price(stock_code, market_type, end_date)

        if stock_data is None or stock_data.empty:
            logging.warning(f"Skipping {stock_code}, no stock data found.")
            continue

        # Store latest stock price for comparison
        latest_stock_data = stock_data[stock_data['Date'] == end_date]
        if latest_stock_data.empty:
            logging.warning(f"No stock price found for {stock_code} on {end_date}.")
            continue

        stock_price_data[stock_code] = {
            "high": latest_stock_data["High"].values[0],
            "low": latest_stock_data["Low"].values[0]
        }

    # Check if avg prices are within the high-low range
    results = []
    for _, row in df.iterrows():
        stock_code = row["stock_code"]
        price_range = stock_price_data.get(stock_code)
        if price_range is None:
            continue  # no price data for this stock; already logged above

        high, low = price_range["high"], price_range["low"]
        results.append({
            "stock_code": stock_code,
            "date": end_date,
            "avg_buy_price": row["avg_buy_price"],
            "avg_sell_price": row["avg_sell_price"],
            "high": high,
            "low": low,
            "buy_within_range": low <= row["avg_buy_price"] <= high,
            "sell_within_range": low <= row["avg_sell_price"] <= high
        })

    # Return both the DataFrame view and the raw list of dicts.
    df_results = pd.DataFrame(results)
    return df_results, results

# Example usage
end_date = "2025-02-07"  # Change to required date

# process_chip_data may return None when no amount/lot files exist for the
# date; fall back to empty results instead of failing on tuple unpacking.
chip_output = process_chip_data(end_date)
if chip_output is None:
    df_results, results = pd.DataFrame(), []
else:
    df_results, results = chip_output

In [None]:
# Bare expression: shows the results table as this notebook cell's output.
df_results