# Model Prediction

In [1]:
import yfinance as yf
# import requests_cache
# session = requests_cache.CachedSession('yfinance.cache')
# session.headers['User-agent'] = 'my-program/1.0'
ticker = yf.Ticker('msft')
# The scraped response will be stored in the cache
ticker.actions

Unnamed: 0_level_0,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
1987-09-21 00:00:00-04:00,0.00,2.0
1990-04-16 00:00:00-04:00,0.00,2.0
1991-06-27 00:00:00-04:00,0.00,1.5
1992-06-15 00:00:00-04:00,0.00,1.5
1994-05-23 00:00:00-04:00,0.00,2.0
...,...,...
2024-11-21 00:00:00-05:00,0.83,0.0
2025-02-20 00:00:00-05:00,0.83,0.0
2025-05-15 00:00:00-04:00,0.83,0.0
2025-08-21 00:00:00-04:00,0.83,0.0


In [None]:
from functools import lru_cache
import yfinance as yf
import numpy as np

@lru_cache(maxsize=100)  # 缓存最近100支股票的查询
def get_pe_ratio(ticker):
    """ArithmeticError: 获取股票的市盈率（P/E Ratio）[working version]"""
    try:
        stock = yf.Ticker(ticker)
        # 优先使用TTM市盈率，若不存在则尝试其他类型
        return {
            "trailingPE": stock.info.get('trailingPE'),
            "forwardPE": stock.info.get('forwardPE'),
            "priceToBook": stock.info.get('priceToBook')
        }
    except Exception as e:
        print(f"Error getting P/E for {ticker}: {str(e)}")
        return np.nan

# 批量获取示例
tickers = ["AAPL", "TSLA", "NVDA", "AMD"]
pe_data = {ticker: get_pe_ratio(ticker) for ticker in tickers}
print(pe_data)

{'AAPL': {'trailingPE': 34.83936, 'forwardPE': 28.438677, 'priceToBook': 52.143856}, 'TSLA': {'trailingPE': 311.77777, 'forwardPE': 205.64027, 'priceToBook': 18.661568}, 'NVDA': {'trailingPE': 45.890816, 'forwardPE': 24.379698, 'priceToBook': 37.804577}, 'AMD': {'trailingPE': 108.73822, 'forwardPE': 31.759455, 'priceToBook': 5.562132}}


In [6]:
import yfinance as yf
import numpy as np

def get_pb_ratio(ticker):
    """
    使用 Yahoo Finance API 获取市净率
    """
    try:
        stock = yf.Ticker(ticker)
        # 获取市净率
        pb_ratio = stock.info.get('priceToBook')
        
        # 如果市净率不存在，尝试计算
        if pb_ratio is None:
            # 获取当前股价
            current_price = stock.history(period='1d')['Close'].iloc[-1]
            
            # 获取每股账面价值
            book_value_per_share = stock.info.get('bookValue')
            
            if book_value_per_share and book_value_per_share > 0:
                pb_ratio = current_price / book_value_per_share
            else:
                pb_ratio = np.nan
        
        return pb_ratio
    except Exception as e:
        print(f"Error getting P/B ratio for {ticker}: {str(e)}")
        return np.nan

# 示例用法
print("AAPL P/B Ratio:", get_pb_ratio("AAPL"))
print("MSFT P/B Ratio:", get_pb_ratio("MSFT"))

AAPL P/B Ratio: 52.143856
MSFT P/B Ratio: 9.77027


In [None]:
from stock_model import Stock_Model, FILE_TYPE
stock = Stock_Model('IFX.DE', '3mo', interval='1d', win_size=60, path='./resource', delay_days=3)
print(stock.path)
stock.load_keras_model()

./resource


AttributeError: 'Stock_Model' object has no attribute 'load_keras_model'

In [None]:
"""
class Model_Prediction
predict trend of stock
NOTE: 新数据应紧接在训练数据之后，保持时间序列连贯性。

"""
import os, sys
sys.path.append('.')
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import load_model
from joblib import load

import yfinance as yf
import json
SYMBOL = "IFX.DE"
HISTORY = "3mo"
class Model_Prediction:
    CONFIG_FILE = 'config.json'
    def __init__(self, stock, period='3mo', interval="1d", win_size=60, delay_days=3, rsc_path='.'):
        self._config = None
        self._stock_model = Stock_Model(stock, period, interval=interval, win_size=win_size, path=rsc_path)
        self._stock = stock if stock is not None else self._stock
        self._period = period if period is not None else self._period
        self._interval = interval if interval is not None else self._interval    
        #self._session = requests.Session(impersonate="chrome")
        self._new_data = None
        self._window_size = win_size

    def load_config(self):
        try:
            with open(self.CONFIG_FILE, 'r') as cfg:
                json.load(self._config, cfg)
        except Exception as e:
            print(f"Load configure file fails: {e}")

    def process_predicting_data(self):
        self.load_stock()
        self._stock_model.load_scaler()
        self._stock_model.set_working_data()
        self._stock_model.scale_data(create=False, save=False)
        self._create_sequence()
    
    def load_stock(self):
        # self._new_data = yf.Ticker(self._stock, session=self.session).history(period=self._period, interval=self._interval)
        self._stock_model.load_stock(self._stock, self._period, self._interval)
        

    def _prepare_predict_data(self):
        scaled_new_data = self._scaler.transform(self._new_data)
        self._prepared_data = np.array([scaled_new_data[-self._window_size:]])

    def _predict_new_data(self):
        self._model_predict_data = self._model.predict(self._predict_data)

    def _invert_normalized_data(self):
        # 反归一化需要重建完整的多变量矩阵（仅Close列有值，其他列置0）
        dummy_matrix = np.zeros((1, self._features.shape[1]))
        dummy_matrix[:, 3] = self._model_predict_data.flatten()  # 第4列是Close
        self._predicted_data = self._scaler.inverse_transform(dummy_matrix)[0, 3]

    def sigle_day_predict(self):
        self._prepare_predict_data()
        self._predict_new_data()
        self._invert_normalized_data()
        return self._predicted_data
        
    def multi_day_predict(self, days=5) -> list:
        self.predictions = []
        scaled_new_data = self._scaler.transform(self._new_data)
        initial_sequence = scaled_new_data[-self._window_size:]
        current_sequence = initial_sequence.copy()
        self._prepare_predict_data()

        for _ in range(days):
            self._model_predict_data = self._model.predict(current_sequence.reshape(1, self._window_size, -1))

            # update sequence: slide window
            new_row = current_sequence[-1].copy()
            new_row[3] = next_day_scaled[0][0]  # 更新Close列
            current_sequence = np.vstack([current_sequence[1:], new_row])
            self._invert_normalized_data()
            self.predictions.append(self._predicted_data)            
        return self.predictions
        


### prepare to predict data

In [None]:
mp = Model_Prediction(SYMBOL)

In [None]:
mp.load_stock()

### predict data

In [None]:
predict_scaled_data = model.predict(prepared_data)

### invert normalized data

In [None]:
def _invert_normalized_data(self):
    # 反归一化需要重建完整的多变量矩阵（仅Close列有值，其他列置0）
    dummy_matrix = np.zeros((len(self._Y_predict), self.features.shape[1]))
    dummy_matrix[:, 3] = self._Y_predict.flatten()  # 第4列是Close
    self._Y_pred_actual = self._scaler.inverse_transform(dummy_matrix)[:, 3]
    return dummy_matrix

### visual result

In [None]:
def visual_predict_result(predict_prices, actual_prices=None):
    plt.figure(figsize=(12, 6))
    plt.plot(predict_prices, marker='o', label='Predicted Prices')
    if actual_prices:
        plt.plot(actual_prices, marker='x', label='Actual Prices')
    plt.title(f'Next {future_days} Days Price Prediction')
    plt.xlabel('Days into Future')
    plt.ylabel('Price')
    plt.legend()
    plt.show()