In [None]:
import os
from datetime import datetime
from typing import Dict, List, Tuple
import pandas as pd
import numpy as np
from pathlib import Path
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from langchain_openai import ChatOpenAI
import yfinance as yf
from dataclasses import dataclass
import logging

# ---------------------------
# 로깅 설정 (한국어 주석)
# ---------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ---------------------------
# Load environment variables (English docstring, commands, etc.)
# ---------------------------
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OpenAI API key not found in environment variables")


@dataclass
class AnalysisConfig:
    """
    Configuration for analysis parameters (English)
    """

    period_years: int = 5
    interval: str = "1wk"
    moving_averages: List[int] = None
    rsi_period: int = 14
    volume_ma_period: int = 20

    def __post_init__(self):
        if self.moving_averages is None:
            self.moving_averages = [50, 200]


class Tasks:
    def financial_analysis(self, agent):
        return Task(
            description="""Perform financial analysis on the company over 5 years. 
            Examine key metrics, growth, profitability, and compare with industry benchmarks.""",
            agent=agent,
            expected_output="""A financial analysis summary describing the company's financial health, 
            including liquidity, leverage, and profitability.""",
        )

    def technical_analysis(self, agent):
        return Task(
            description="""Perform technical analysis on 5-year historical price data. 
            Calculate RSI, MACD, moving averages, and identify support/resistance levels.""",
            agent=agent,
            expected_output="""A technical analysis report including RSI, MACD values, support and resistance levels, 
            and overall trend assessment.""",
        )

    def investment_recommendation(self, agent, context_tasks):
        return Task(
            description="""Combine all findings and provide a final investment recommendation 
            with a clear BUY/HOLD/SELL rating, price targets, and risk factors.""",
            agent=agent,
            context_tasks=context_tasks,
            expected_output="""A final decision on whether to BUY, HOLD, or SELL, 
            along with specific price targets and risk considerations.""",
        )


# ---------------------------
# 마크다운 보고서 저장 함수 (한국어 주석)
# ---------------------------
def save_analysis_to_markdown(result: str, ticker: str) -> Path:
    """
    Save analysis result to a markdown file (English docstring)
    """
    reports_dir = Path("reports")
    reports_dir.mkdir(exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = reports_dir / f"{ticker}_analysis_{timestamp}.md"

    content = f"""# Investment Analysis Report for {ticker}
Generated on: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}

## Analysis Summary
{result}

---
*This report was generated automatically by the Stock Analysis System*
"""

    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)

    return filename


# ---------------------------
# 재무 지표 계산 클래스 (한국어 주석)
# ---------------------------
class FinancialMetrics:
    """
    Calculate financial ratios and other metrics (English docstring)
    """

    @staticmethod
    def calculate_ratios(
        balance_sheet: pd.DataFrame, income_stmt: pd.DataFrame
    ) -> Dict:
        """
        Calculate basic financial ratios such as current ratio, debt-to-equity, quick ratio, and ROE (English docstring)
        """
        try:
            if balance_sheet is None or balance_sheet.empty:
                raise ValueError("Balance Sheet is not available.")
            if income_stmt is None or income_stmt.empty:
                raise ValueError("Income Statement is not available.")

            recent_col = balance_sheet.columns[0]
            required_items = [
                "Total Current Assets",
                "Total Current Liabilities",
                "Total Liab",
                "Total Stockholder Equity",
            ]
            for item in required_items:
                if item not in balance_sheet.index:
                    raise KeyError(f"{item} not found in balance sheet.")

            current_assets = balance_sheet.loc["Total Current Assets", recent_col]
            current_liab = balance_sheet.loc["Total Current Liabilities", recent_col]
            total_liab = balance_sheet.loc["Total Liab", recent_col]
            total_equity = balance_sheet.loc["Total Stockholder Equity", recent_col]

            if "Net Income" not in income_stmt.index:
                raise KeyError("Net Income not found in income statement.")
            net_income = income_stmt.loc["Net Income", recent_col]

            current_ratio = current_assets / current_liab if current_liab != 0 else None
            debt_to_equity = total_liab / total_equity if total_equity != 0 else None
            quick_ratio = (
                (current_assets * 0.8) / current_liab if current_liab != 0 else None
            )
            roe = net_income / total_equity if total_equity != 0 else None

            return {
                "current_ratio": current_ratio,
                "debt_to_equity": debt_to_equity,
                "quick_ratio": quick_ratio,
                "roe": roe,
            }
        except Exception as e:
            logger.error(f"Error calculating financial ratios: {e}")
            return {}


# ---------------------------
# 기술적 분석 관련 함수 (한국어 주석)
# ---------------------------
class TechnicalAnalysis:
    """
    Provides methods to calculate RSI, MACD, and identify support/resistance levels (English docstring)
    """

    @staticmethod
    def calculate_rsi(data: pd.DataFrame, period: int = 14) -> pd.Series:
        """
        RSI calculation using Wilder's Smoothing (EMA) method (English docstring)
        """
        delta = data["Close"].diff()
        gain = delta.clip(lower=0)
        loss = -1 * delta.clip(upper=0)

        alpha = 1 / period
        avg_gain = gain.ewm(alpha=alpha, adjust=False).mean()
        avg_loss = loss.ewm(alpha=alpha, adjust=False).mean()

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    @staticmethod
    def calculate_macd(
        data: pd.DataFrame,
        short_window: int = 12,
        long_window: int = 26,
        signal_window: int = 9,
    ):
        """
        Calculate MACD line, signal line, and histogram (English docstring)
        """
        close = data["Close"]
        ema_short = close.ewm(span=short_window, adjust=False).mean()
        ema_long = close.ewm(span=long_window, adjust=False).mean()

        macd_line = ema_short - ema_long
        signal_line = macd_line.ewm(span=signal_window, adjust=False).mean()
        macd_hist = macd_line - signal_line

        return macd_line, signal_line, macd_hist

    @staticmethod
    def calculate_support_resistance(
        data: pd.DataFrame, window: int = 20
    ) -> Tuple[float, float]:
        """
        Calculate support/resistance based on rolling min/max over a given window (English docstring)
        """
        if len(data) < window:
            return data["Low"].min(), data["High"].max()

        rolling_min = data["Low"].rolling(window=window).min()
        rolling_max = data["High"].rolling(window=window).max()
        return rolling_min.iloc[-1], rolling_max.iloc[-1]


class ComprehensivePriceTool(BaseTool):
    name: str = "Comprehensive Price Analysis"
    description: str = (
        "Provides detailed historical price data with technical indicators (RSI, MACD, etc.)"
    )

    def __init__(self, config: AnalysisConfig):
        super().__init__()
        self._config = config

    def _run(self, ticker: str) -> str:
        """
        Fetch price history using yfinance, then calculate MAs, RSI, MACD, support/resistance, etc. (English docstring)
        """
        try:
            stock = yf.Ticker(ticker)
            history = stock.history(
                period=f"{self._config.period_years}y", interval=self._config.interval
            )

            if history.empty:
                return "No price data available for this ticker"

            for ma in self._config.moving_averages:
                history[f"MA_{ma}"] = history["Close"].rolling(window=ma).mean()

            history["RSI"] = TechnicalAnalysis.calculate_rsi(
                history, self._config.rsi_period
            )
            macd_line, signal_line, macd_hist = TechnicalAnalysis.calculate_macd(
                history
            )
            history["MACD"] = macd_line
            history["MACD_signal"] = signal_line
            history["MACD_hist"] = macd_hist

            history["Volume_MA"] = (
                history["Volume"].rolling(window=self._config.volume_ma_period).mean()
            )

            support, resistance = TechnicalAnalysis.calculate_support_resistance(
                history
            )

            analysis_summary = {
                "current_price": float(history["Close"].iloc[-1]),
                "support_level": float(support),
                "resistance_level": float(resistance),
                "RSI": float(history["RSI"].iloc[-1]),
                "MACD": float(history["MACD"].iloc[-1]),
                "MACD_signal": float(history["MACD_signal"].iloc[-1]),
                "Volume": int(history["Volume"].iloc[-1]),
                "Volume_MA": float(history["Volume_MA"].iloc[-1]),
            }

            return str(analysis_summary)
        except Exception as e:
            logger.error(f"Error in price analysis: {e}")
            return f"Error analyzing price data: {str(e)}"


class FinancialMetricsTool(BaseTool):
    name: str = "Financial Metrics Analysis"
    description: str = (
        "Analyze comprehensive financial statements to derive key ratios and growth metrics"
    )

    def _run(self, ticker: str) -> str:
        """
        Use yfinance to retrieve balance sheet, income statement, and cash flow data, then compute key ratios (English docstring)
        """
        try:
            stock = yf.Ticker(ticker)
            balance_sheet = stock.balance_sheet
            income_stmt = stock.financials
            cash_flow = stock.cashflow

            if balance_sheet is None or balance_sheet.empty:
                return "No balance sheet data available."
            if income_stmt is None or income_stmt.empty:
                return "No income statement data available."
            if cash_flow is None or cash_flow.empty:
                return "No cash flow data available."

            key_ratios = FinancialMetrics.calculate_ratios(balance_sheet, income_stmt)

            if "Total Revenue" in income_stmt.index:
                revenue_series = income_stmt.loc["Total Revenue"]
                revenue_growth = revenue_series.pct_change().mean()
            else:
                revenue_growth = None

            if (
                "Net Income" in income_stmt.index
                and "Total Revenue" in income_stmt.index
            ):
                profit_margin = (
                    income_stmt.loc["Net Income"] / income_stmt.loc["Total Revenue"]
                ).mean()
            else:
                profit_margin = None

            metrics = {
                "key_ratios": key_ratios,
                "revenue_growth_avg": revenue_growth,
                "profit_margin_avg": profit_margin,
            }

            return str(metrics)
        except Exception as e:
            logger.error(f"Error in financial analysis: {e}")
            return f"Error analyzing financial data: {str(e)}"


class AdvancedAgents:
    """
    Creates agents for financial analysis, technical analysis, and final recommendation (English docstring)
    """

    def __init__(self, config: AnalysisConfig):
        self.config = config

    def financial_analyst(self) -> Agent:
        return Agent(
            role="Senior Financial Analyst",
            goal="Provide a thorough evaluation of the company's long-term financial health.",
            backstory="""A veteran financial analyst with 20+ years of experience in equity research, 
                         specialized in fundamental analysis.""",
            tools=[FinancialMetricsTool()],
            verbose=True,
        )

    def technical_analyst(self) -> Agent:
        return Agent(
            role="Technical Analysis Expert",
            goal="Identify major price trends, key technical indicators, and support/resistance levels.",
            backstory="""A specialist in chart patterns and multi-timeframe analysis, 
                         known for pinpointing market turning points.""",
            tools=[ComprehensivePriceTool(self.config)],
            verbose=True,
        )

    def hedge_fund_manager(self) -> Agent:
        return Agent(
            role="Hedge Fund Manager",
            goal="Synthesize the analysis to provide a decisive final investment recommendation (BUY/HOLD/SELL).",
            backstory="""A seasoned hedge fund manager with 25 years in global markets, 
                         making strong calls on whether to buy or sell based on combined analysis.""",
            tools=[],
            verbose=True,
        )


def create_analysis_crew(
    ticker: str, config: AnalysisConfig = AnalysisConfig()
) -> Tuple[str, Path]:
    """
    Create a crew of agents to perform financial analysis, technical analysis,
    and a final investment recommendation in sequential order (English docstring)
    """
    try:
        agents = AdvancedAgents(config)
        tasks = Tasks()

        financial_analyst = agents.financial_analyst()
        technical_analyst = agents.technical_analyst()
        hedge_fund_manager = agents.hedge_fund_manager()

        financial_task = tasks.financial_analysis(financial_analyst)
        technical_task = tasks.technical_analysis(technical_analyst)
        recommend_task = tasks.investment_recommendation(
            hedge_fund_manager,
            context_tasks=[financial_task, technical_task],
        )

        crew = Crew(
            agents=[financial_analyst, technical_analyst, hedge_fund_manager],
            tasks=[financial_task, technical_task, recommend_task],
            verbose=True,
            process=Process.sequential,
            manager_llm=ChatOpenAI(
                model_name="gpt-4o-mini",
                temperature=0.6,
                api_key=OPENAI_API_KEY,
            ),
        )

        result = crew.kickoff(inputs={"company": ticker})
        report_file = save_analysis_to_markdown(result, ticker)

        return result, report_file

    except Exception as e:
        logger.error(f"Error in analysis execution: {e}")
        raise


if __name__ == "__main__":
    try:
        # 아래는 메인 실행 로직의 예시 (한국어 주석)
        config = AnalysisConfig(
            period_years=5,
            interval="1wk",
            moving_averages=[50, 100, 200],
            rsi_period=14,
            volume_ma_period=20,
        )

        result, report_file = create_analysis_crew("005930.KS", config)
        print(f"Analysis completed. Report saved to: {report_file}")
        print("\n=== Final Analysis Result ===")
        print(result)

    except Exception as e:
        logger.error(f"Analysis failed: {e}")