In [2]:
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import Markdown as md
import pandas as pd

# Echo the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# Passing None removes pandas' display limit on the number of rows / columns shown.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# NOTE(review): time-zone name; not referenced in the cells shown here — confirm where it is consumed.
time_zone = 'Asia/Jakarta'

In [4]:
import data_source
from datetime import datetime, timedelta
import yfinance as yf

In [5]:
# Column labels used by calculate_market_return: the column read from the
# market price frame, and the label given to the computed return column.
price_column_name = 'price'
return_column_name = 'return'

In [None]:
def calculate_market_return(market_price_df: pd.DataFrame, first_period_of_price: int, last_period_of_price: int) -> pd.DataFrame:
    """Compute period-over-period market returns for an inclusive window of periods.

    Parameters
    ----------
    market_price_df : pd.DataFrame
        Frame whose index contains both period labels and which has a
        ``price_column_name`` column.
    first_period_of_price, last_period_of_price
        Index labels bounding the window; both ends are included.

    Returns
    -------
    pd.DataFrame
        Single-column frame labelled ``return_column_name``. Missing prices
        are forward-filled before the percentage change is taken, and the
        first row of the window (which has no predecessor) is dropped.
    """
    period_index = market_price_df.index
    start_pos = period_index.get_loc(first_period_of_price)
    # +1 because the last period itself belongs to the window.
    stop_pos = period_index.get_loc(last_period_of_price) + 1

    window_prices = market_price_df[price_column_name].to_frame().iloc[start_pos:stop_pos]
    window_returns = window_prices.ffill().pct_change()[1:]

    # The frame has exactly one column; relabel it as the return column.
    only_column = window_returns.columns[0]
    return window_returns.rename(columns={only_column: return_column_name})


In [7]:
def get_ticker_prices(tickers: str | list, start_date: str, end_date: str) -> pd.DataFrame:
    """Download history through ``yf.download`` and keep the Close and Volume columns.

    Parameters
    ----------
    tickers : str | list
        Forwarded unchanged as ``tickers`` to ``yf.download``.
    start_date, end_date : str
        Forwarded as ``start`` / ``end`` to ``yf.download``.

    Returns
    -------
    pd.DataFrame
        The ``['Close', 'Volume']`` selection of the download result.
    """
    download_options = {
        'tickers': tickers,
        'start': start_date,
        'end': end_date,
        'threads': False,
        'repair': True,
        'keepna': False,
        'period': None,
    }
    raw_history = yf.download(**download_options)
    return raw_history[['Close', 'Volume']].copy()

In [9]:
def calculate_tickers_return(tickers_stock_prices: pd.DataFrame) -> pd.DataFrame:
    """Compute row-over-row percentage changes for every column of a price frame.

    Missing prices are forward-filled explicitly before the change is taken.
    This pins the behaviour the original relied on implicitly through
    ``pct_change``'s deprecated default fill, so the result no longer depends
    on the installed pandas version, and it matches how
    ``calculate_market_return`` treats gaps.

    Parameters
    ----------
    tickers_stock_prices : pd.DataFrame
        Price observations, one row per period. The frame is not modified.

    Returns
    -------
    pd.DataFrame
        Percentage changes with the first row (which has no predecessor)
        dropped.
    """
    filled_prices = tickers_stock_prices.ffill()
    # fill_method=None: the gaps are already filled above; do not let
    # pct_change apply (or warn about) its own implicit fill.
    tickers_return = filled_prices.pct_change(fill_method=None).iloc[1:]
    return tickers_return

In [10]:
# Labels used when building the per-ticker observation table.
date_column_name = 'date'  # NOTE(review): not referenced in the cells shown here
period_column_name = 'period'
ticker_column_name = 'ticker'
return_column_name = 'return'  # re-assigned here with the same value as in the earlier cell
observation_param_name_for_return = 'param1_r'  # second-level label of each ticker's return column

In [11]:
def init_observation_df(input_df: pd.DataFrame) -> pd.DataFrame:
    """Reshape a period-by-ticker return frame into the observation table layout.

    Every cell of ``input_df`` becomes one (period, ticker, return) record; the
    records are pivoted back with ``pivot_table`` (called with its defaults)
    so that the result is indexed by period and its columns are
    ``(ticker, observation_param_name_for_return)`` pairs.

    Parameters
    ----------
    input_df : pd.DataFrame
        Rows are periods (taken from the index), columns are tickers, values
        are returns.

    Returns
    -------
    pd.DataFrame
        Observation table with two-level columns, ticker first.
    """
    long_records = [
        {
            period_column_name: period,
            ticker_column_name: ticker,
            return_column_name: daily_return,
        }
        for period, row in input_df.iterrows()
        for ticker, daily_return in row.items()
    ]
    long_df = pd.DataFrame(long_records)

    # Pivot: columns come out as (return label, ticker).
    pivoted = long_df.pivot_table(index=period_column_name, columns=ticker_column_name)

    # Relabel the value level, then put the ticker level first.
    relabelled = pivoted.rename(columns={return_column_name: observation_param_name_for_return})
    return relabelled.swaplevel(i=0, j=1, axis=1)

In [12]:
# Second-level label of the per-ticker column that calculate_u fills with
# the ticker's return minus the market return.
observation_param_name_for_u = 'param2_u'

In [13]:
def calculate_u(observation_df: pd.DataFrame, returns_of_market: pd.DataFrame) -> pd.DataFrame:
    """Add, for every ticker, a column holding its return minus the market return.

    For each ticker found in the first column level, the
    ``observation_param_name_for_return`` column is cast to float, the market
    return (``returns_of_market[return_column_name]``, cast to float) is
    subtracted with pandas index alignment, and the difference is stored under
    ``(ticker, observation_param_name_for_u)``.

    Parameters
    ----------
    observation_df : pd.DataFrame
        Observation table with two-level ``(ticker, parameter)`` columns.
        It is left unmodified; the work is done on a copy.
    returns_of_market : pd.DataFrame
        Frame with a ``return_column_name`` column.

    Returns
    -------
    pd.DataFrame
        Copy of the table with the new columns added and all columns sorted.
    """
    result = observation_df.copy()
    market_return = returns_of_market[return_column_name].astype(float)

    # Snapshot the distinct tickers up front: columns are added inside the
    # loop, and each ticker only needs to be processed once.
    tickers = list(dict.fromkeys(ticker for ticker, _ in result.columns))
    for ticker in tickers:
        ticker_return = result[ticker, observation_param_name_for_return].astype(float)
        result[ticker, observation_param_name_for_u] = ticker_return - market_return

    return result.reindex(sorted(result.columns), axis=1)

In [14]:
# Second-level label of the per-ticker column that calculate_cu fills with
# the running sum of the ticker's param2_u column.
observation_param_name_for_cu = 'param3_cu'

In [15]:
def calculate_cu(observation_df: pd.DataFrame) -> pd.DataFrame:
    """Add, for every ticker, the running sum of its ``observation_param_name_for_u`` column.

    The running sum is stored under ``(ticker, observation_param_name_for_cu)``.
    It is computed with a single vectorised ``cumsum`` per ticker instead of
    copying the whole frame for every cell, and without first seeding the
    column with integer zeros and then overwriting them with floats.

    Parameters
    ----------
    observation_df : pd.DataFrame
        Observation table with two-level ``(ticker, parameter)`` columns that
        already contains the ``observation_param_name_for_u`` column of every
        ticker. It is left unmodified; the work is done on a copy.

    Returns
    -------
    pd.DataFrame
        Copy of the table with the new columns added and all columns sorted.
    """
    result = observation_df.copy()

    # Snapshot the distinct tickers up front: columns are added inside the
    # loop, and each ticker only needs to be processed once.
    tickers = list(dict.fromkeys(ticker for ticker, _ in result.columns))
    for ticker in tickers:
        u_values = result[ticker, observation_param_name_for_u]
        # skipna=False keeps the semantics of a plain running `+=`: once a
        # missing value is met, the sum stays missing for all later rows.
        result[ticker, observation_param_name_for_cu] = u_values.cumsum(skipna=False)

    return result.reindex(sorted(result.columns), axis=1)

In [16]:
# NOTE(review): `factor` is not referenced in the cells shown here — confirm where it is consumed.
factor = 30
# Column label under which sort_by_last_formation_day holds each ticker's last-row cumulative value.
last_cu_column_name = 'last_cu'

In [17]:
def sort_by_last_formation_day(observation_df: pd.DataFrame) -> pd.DataFrame:
    """Reorder the ticker column groups by their cumulative value in the last row, highest first.

    The cumulative column is identified through
    ``observation_param_name_for_cu`` (the same constant ``calculate_cu``
    writes with) rather than a repeated string literal, so the two cannot
    drift apart.

    Parameters
    ----------
    observation_df : pd.DataFrame
        Observation table with two-level ``(ticker, parameter)`` columns that
        includes the ``observation_param_name_for_cu`` column of each ticker.
        It is not modified.

    Returns
    -------
    pd.DataFrame
        New frame containing the columns of every ranked ticker, with the
        tickers ordered by descending last-row cumulative value.
    """
    last_row = observation_df.iloc[-1]

    # One record per ticker: its cumulative value on the last row.
    ranking_records = [
        {ticker_column_name: key[0], last_cu_column_name: value}
        for key, value in last_row.to_dict().items()
        if key[1] == observation_param_name_for_cu
    ]
    ranking = pd.DataFrame(ranking_records).sort_values(by=last_cu_column_name, ascending=False)
    tickers_sorted = ranking[ticker_column_name].to_list()

    return observation_df[tickers_sorted].copy()

In [None]:
def generate_observation_aggregated_df(observation_df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the observation table into per-period ``ar`` and ``car`` columns.

    For each row, ``ar`` is the mean of that row's
    ``observation_param_name_for_u`` values across all tickers, and ``car`` is
    the running sum of ``ar`` up to and including that row.

    Parameters
    ----------
    observation_df : pd.DataFrame
        Observation table with two-level ``(ticker, parameter)`` columns.

    Returns
    -------
    pd.DataFrame
        Frame indexed by ``period`` with columns ``ar`` and ``car``, produced
        by ``pivot_table`` with its default settings.
    """
    records = []
    running_car = 0
    for period, row in observation_df.iterrows():
        # Cross-section of the row: the u value of every ticker.
        abnormal_return = row[:, observation_param_name_for_u].mean()
        running_car = running_car + abnormal_return
        records.append({
            'period': period,
            'ar': abnormal_return,
            'car': running_car,
        })

    return pd.DataFrame(records).pivot_table(index='period')


In [None]:
def generate_winner_loser_comparison(winner_obs_aggr_df: pd.DataFrame, loser_obs_aggr_df: pd.DataFrame) -> pd.DataFrame:
    """Place the ``car`` columns of the winner and loser frames side by side.

    Parameters
    ----------
    winner_obs_aggr_df, loser_obs_aggr_df : pd.DataFrame
        Frames that each have a ``car`` column.

    Returns
    -------
    pd.DataFrame
        Two columns, ``car_winner`` and ``car_loser``, aligned on the index
        by ``pd.concat``.
    """
    car_columns = [winner_obs_aggr_df['car'], loser_obs_aggr_df['car']]
    return pd.concat(car_columns, axis=1, keys=['car_winner', 'car_loser'])


In [None]:
def plot_car_winner_loser(winner_loser_comparison: pd.DataFrame, case_code: str):
    """Draw the comparison frame as a line chart titled with the case code.

    Parameters
    ----------
    winner_loser_comparison : pd.DataFrame
        Frame whose ``plot`` method is called; one line is drawn per column.
    case_code : str
        Interpolated into the chart title.

    Returns
    -------
    None
    """
    plot_options = {
        'title': f'CAR Movement on the Case {case_code}',
        'ylabel': 'CAR',
        'xlabel': 'Period',
        'grid': True,
        # One marker/line style per plotted column.
        'style': ['o-', '^-'],
    }
    winner_loser_comparison.plot(**plot_options)