# Imports

## Module

In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import re

from bs4 import BeautifulSoup as bs
from contextlib import closing
import pickle

import os
from os.path import isfile, join
from os import listdir

import sys

import requests
from urllib.parse import urlparse
import urllib

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import time

import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.dates as mdates


import json

import selenium


# Banque du Canada

In [None]:
# Download the "chartered_bank_interest" group of series from the Bank of Canada
# Valet API. The timeout keeps the notebook from hanging on a stalled
# connection, and raise_for_status() surfaces an HTTP error here rather than as
# a confusing JSON/KeyError failure in the cells below.
bank_rate_response = requests.get(
    "https://www.banqueducanada.ca/valet/observations/group/chartered_bank_interest/json",
    timeout=30,
)
bank_rate_response.raise_for_status()
bank_rate_dict = bank_rate_response.json()


# t = json.load("https://www.banqueducanada.ca/valet/observations/group/chartered_bank_interest/json")

In [None]:

# for k,v in bank_rate_dict['seriesDetail'].items():
#     print(k, v["label"])
def bank_of_canada_to_df(bank_dict, use_labels=False):
    """Convert a Bank of Canada Valet "observations" payload into a DataFrame.

    Parameters:

        bank_dict : dict with a 'seriesDetail' mapping (series code -> details
            holding a 'label') and an 'observations' list whose rows carry the
            observation date under 'd' and, for each available series code, a
            mapping holding the value under 'v'.

        use_labels : when True the columns are renamed from series codes to
            their labels; the default keeps the series codes.

    Returns:

        A DataFrame indexed by observation date with one column per series.
        Values are divided by 100; a series missing from a row yields None.
    """
    series_detail = bank_dict['seriesDetail']
    columns = {code: [] for code in series_detail}
    dates = []

    for row in bank_dict['observations']:
        dates.append(datetime.strptime(row['d'], "%Y-%m-%d"))
        for code, values in columns.items():
            cell = row.get(code)
            values.append(None if cell is None else float(cell['v']) / 100)

    frame = pd.DataFrame(columns, index=dates)
    if use_labels:
        labels = {code: detail['label'] for code, detail in series_detail.items()}
        frame = frame.rename(columns=labels)
    return frame

# Series code -> human readable label, and one empty value column per series.
label_dict = {code: detail['label'] for code, detail in bank_rate_dict['seriesDetail'].items()}
data_dict = {code: [] for code in label_dict}

# Walk the observations once: collect the date and, for every series, either
# the value divided by 100 or None when the row has no entry for that series.
date_index = []
for row in bank_rate_dict['observations']:
    date_index.append(datetime.strptime(row['d'], "%Y-%m-%d"))
    for k, column in data_dict.items():
        column.append(float(row[k]['v']) / 100 if k in row else None)

df = pd.DataFrame(data_dict, index=date_index)
# The renamed copy is not assigned: df keeps the series codes as column names.
df.rename(columns=label_dict)
# print(len(df.columns.to_list()), len(label_dict))

In [None]:
df.index

In [None]:
# Plot four selected series (referenced by series code) from 2008-01-01 onward.
df[["V80691333","V80691334","V80691335","V80691339"]]["2008-01-01":].plot()

In [None]:
# Print every observation date that falls in November 2018.
for d in df.index:
    if d.year == 2018 and d.month == 11:
        print(d)

In [None]:
# df[["V80691333","V80691334","V80691335","V80691339"]].loc(datetime.strptime("2023-10-25", "%Y-%m-%d"))
# Keep four series on a handful of observation dates, then transpose so that
# each selected date becomes one line across the four series.
rate_comparison_df = df[["V80691333","V80691334","V80691335","V80691339"]].loc[["2018-11-14",
                                                                                "2023-05-10",
                                                                                "2023-06-14",
                                                                                "2023-07-19",
                                                                                "2023-08-16",
                                                                                "2023-09-13",
                                                                                "2023-10-11",
                                                                                "2023-11-01"]]
rate_comparison_df = rate_comparison_df.transpose()

fig, ax = plt.subplots(figsize=(10, 6))
rate_comparison_df.plot(kind='line', ax=ax)

# Title and labels are set once, on the Axes. The former plt.title/xlabel/ylabel
# calls targeted the same axes and were overwritten right away by these setters.
ax.set_title('Progression of the term structure')
ax.set_xlabel('Term')
ax.set_ylabel('Rates')
ax.legend(loc='best')

# Remove the axes and related elements
ax.spines[['top', 'right', 'bottom', 'left']].set_visible(False)
ax.set_xticks([])
ax.set_yticks([])
plt.show()
# df[["V80691333","V80691334","V80691335","V80691339"]].index

# Functions

## string cleaning

In [None]:
# Sample inputs covering the three formats handled below.
num = "6,77 %"
t = "Ouvert: 9,38 % Fermé: 6,56 %"
v = "6,09 % voir la promo"


def str_to_float(num):
    """Convert a percentage written with a decimal comma ("6,77 %") to a fraction (0.0677)."""
    return float(num.strip("%").strip().replace(',', ".")) / 100


def remove_overt(num):
    """Return a list holding the text that follows "é: " (e.g. after "Fermé: ").

    The list is empty when the marker is absent.
    """
    s = r"(?<=é: ).*"
    return re.findall(s, num)


def remove_promo(num):
    """Return the stripped text located before the first "%" sign.

    Returns "" when the input has no "%" (this used to raise IndexError, which
    made the fallback branch of clean_percentage unreachable).
    """
    match = re.match(r"^(.*?)%", num)
    return match.group(1).strip() if match else ""


def clean_percentage(num):
    """Parse a scraped rate cell into a fraction.

    Handles, in order: "Ouvert: x % Fermé: y %" (the value after "é: " is
    used), "x % some trailing text" (the value before the first "%" is used),
    and a bare number without a percent sign.

    Raises:

        ValueError if the selected text is not a number.
    """
    closed_part = remove_overt(num)
    if closed_part:
        return str_to_float(closed_part[0])

    before_percent = remove_promo(num)
    if before_percent:
        return str_to_float(before_percent)

    return str_to_float(num)

# remove_promo(v)

## JS WebTable

In [None]:
import unittest
import time
from selenium import webdriver
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
 

class WebTable(object):
    """Scrape an HTML table from a JavaScript-rendered page with Selenium.

    The table is located through XPath expressions supplied by the caller:

        tableXPath   : XPath of the table itself (waited on before scraping)
        row_x_path   : XPath matching the table rows
        col_x_path   : XPath matching the header cells (used to count columns)
        title_x_path : XPath prefix of a header cell, ending with an open '['
                       so that "<n>]" can be appended

    After get_infos(), `table_dict` maps the row number to the list of cell
    texts, `table_title_list` holds the header texts and `rate_df_raw` is a
    DataFrame with one column per header.
    """

    # First data row (1-based XPath position) for the pages scraped so far.
    _FIRST_DATA_ROW = {
        "https://www.nesto.ca/mortgage-rates/location/quebec/": 1,
        "https://www.bnc.ca/particuliers/hypotheque/taux.html": 2,
        "https://www.td.com/ca/fr/services-bancaires-personnels/produits/prets-hypothecaires/taux-hypothecaires": 2,
    }
    # Used for any other URL; previously such a URL crashed with an unbound local.
    _DEFAULT_FIRST_DATA_ROW = 1

    def __init__(self, url_str, tableXPath, row_x_path, col_x_path, title_x_path):
        self.url_str = url_str
        self.tableXPath = tableXPath
        self.row_x_path = row_x_path
        self.col_x_path = col_x_path
        self.title_x_path = title_x_path

        # NOTE(review): passing the driver path positionally may not be
        # accepted by the installed Selenium release - confirm.
        self.driver = webdriver.Chrome('html/chromedriver')
        # NOTE(review): the value stored here is whatever driver.get() returns
        # (presumably None); the page is loaded again in get_infos().
        self.page_obj = self.driver.get(self.url_str)
        self.num_rows = None
        self.num_cols = None
        self.table_dict = None
        self.table_title_list = []
        self.rate_df_raw = None

    def tearDown(self):
        """Close the current window and shut the browser session down."""
        self.driver.close()
        self.driver.quit()

    def set_num_rows(self, driver):
        """Count the elements matching row_x_path and print the result."""
        self.num_rows = len(driver.find_elements("xpath", self.row_x_path))
        print("Rows in table are " + repr(self.num_rows))

    def set_num_cols(self, driver):
        """Count the elements matching col_x_path and print the result."""
        self.num_cols = len(driver.find_elements("xpath", self.col_x_path))
        print("Columns in table are " + repr(self.num_cols))

    def get_headers(self):
        """Placeholder: not implemented."""
        pass

    def get_infos(self):
        """Load the page, wait for the table and scrape its cells and headers.

        Waits up to 60 seconds for the table to be present, then fills
        `table_dict`, `table_title_list` and `rate_df_raw`.

        Returns:

            dict mapping each scraped row number to the list of its cell texts
        """
        driver = self.driver
        driver.get(self.url_str)

        WebDriverWait(driver, 60).until(EC.presence_of_element_located((By.XPATH, self.tableXPath)))

        self.set_num_rows(driver)
        self.set_num_cols(driver)

        start_index = self._FIRST_DATA_ROW.get(self.url_str, self._DEFAULT_FIRST_DATA_ROW)
        column_numbers = range(1, self.num_cols + 1)

        table_dict = {}
        for t_row in range(start_index, self.num_rows + 1):
            table_dict[t_row] = [
                driver.find_element(
                    "xpath", self.row_x_path + '[' + str(t_row) + ']/td[' + str(t_col) + ']'
                ).text
                for t_col in column_numbers
            ]
        self.table_dict = table_dict

        # Rebuilt from scratch on every call: appending to the existing list
        # duplicated the headers (and broke the frame) on a second get_infos().
        self.table_title_list = [
            driver.find_element("xpath", self.title_x_path + str(t_col) + ']').text
            for t_col in column_numbers
        ]

        data_dict = {
            title: [cells[position] for cells in table_dict.values()]
            for position, title in enumerate(self.table_title_list)
        }
        self.rate_df_raw = pd.DataFrame.from_dict(data_dict)

        return table_dict

# WebSite class

In [2]:
class WebSite(object):
    """Download a web page once and cache its raw content on disk as a pickle.

    The cache lives under `path`/<network location>/<quoted URL path>.pickle.
    Attributes are computed lazily through the get_*/set_* pairs; subclasses
    may override a set_* method (for instance set_file_name) to change how a
    value is derived.
    """
    path = 'html/'

    def __init__(self, url_str):
        # This is the WebSite Attribute
        self._url_str = url_str
        self._url_object = urlparse(self._url_str)
        self._url = None
        self._host_name = None
        self._file_dir = None
        self._file_name = None
        self._file_path = None
        self._bs4_object = None
        self._scraping_factory = None

    def set_url_str(self):
        """Placeholder: not implemented."""
        pass

    def get_url_str(self):
        """Placeholder: not implemented (returns None)."""
        pass

    def set_url_object(self):
        """Parse the URL string again and store the result."""
        self._url_object = urlparse(self._url_str)

    def get_url_object(self):
        """Return the parsed URL."""
        return self._url_object

    def set_host_name(self):
        """Store "<scheme>://<network location>"."""
        self._host_name = self._url_object.scheme + "://" + self._url_object.netloc

    def get_host_name(self):
        """Return the host name, computing it on first use."""
        if self._host_name is None:
            print("instantiating host name")
            self.set_host_name()
        return self._host_name

    def set_url(self):
        """Store the host name followed by the URL path (query string dropped)."""
        self._url = self.get_host_name() + self._url_object.path

    def get_url(self):
        """Return the URL without query string, computing it on first use."""
        if self._url is None:
            self.set_url()
        return self._url

    def set_file_dir(self):
        """Store the cache directory: `path` joined with the network location."""
        self._file_dir = os.path.join(self.path, self._url_object.netloc)

    def get_file_dir(self):
        """Return the cache directory, computing it on first use."""
        if self._file_dir is None:
            print("Instantiating file path")
            self.set_file_dir()
        return self._file_dir

    def set_file_name(self):
        """Store the cache file name: the percent-quoted URL path plus '.pickle'."""
        # safe=" " means "/" is quoted too, so the name contains no separators.
        self._file_name = urllib.parse.quote(self._url_object.path, " ") + '.pickle'

    def get_file_name(self):
        """Return the cache file name, computing it on first use."""
        if self._file_name is None:
            self.set_file_name()
        return self._file_name

    def set_file_path(self):
        """Store the full path of the cache file."""
        self._file_path = os.path.join(self.get_file_dir(), self.get_file_name())

    def get_file_path(self):
        """Return the full path of the cache file, computing it on first use."""
        if self._file_path is None:
            self.set_file_path()
        return self._file_path

    def file_already_exists(self):
        """
        Check whether the cache file is on disk, creating the site directory
        when it is missing.

        Returns:

            True if the cache file exists

            False otherwise (always the case right after the directory had
            to be created)
        """
        directory = self.get_file_dir()
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
            print("Directory ", directory, "as been created, receipy does not exist")
            # A directory that was just created cannot contain the file. The
            # previous recursive call dropped its result and returned None.
            return False
        return os.path.exists(self.get_file_path())

    def get_html(self):
        """
        Makes a GET request to the specified URL

        Returns:

            True, requests.content if successful

            False, error_message if un-successful

        """
        headers = {'Content-Type': 'application/x-www-form-urlencoded',
                   'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15',
                   'Host': self._url_object.netloc,
                   'Origin': self.get_host_name(),
                   'Referer': self.get_host_name()
                  }
        try:
            # NOTE(review): verify=False disables TLS certificate validation.
            # Kept as is because some target sites may depend on it - confirm.
            with closing(requests.get(self.get_url(), headers=headers, verify=False)) as page_request:
                return True, page_request.content
        except requests.exceptions.RequestException as e:
            # The exception must be converted explicitly: concatenating it to
            # a str raised TypeError and hid the real network error.
            error_message = "Could not open " + self.get_url() + " error " + str(e)
            return False, error_message

    def get_data(self):
        """this function should get html or csv depending on implementation"""
        pass

    def save_file(self, data): #save_to_pickle
        """
        Save data as a pickle file at get_file_path().

        If pickling hits a RecursionError, the recursion limit is raised to
        100000 for a second attempt. The limit that was in force before the
        call is always restored afterwards.

        Parameters:

            data : the object to pickle

        Returns:

            True if the operation is successful

        Raises:

            Any error raised while opening the file or pickling the data
            (these used to be silently swallowed by a return inside finally).
        """
        previous_limit = sys.getrecursionlimit()
        try:
            with open(self.get_file_path(), 'wb') as handle:
                try:
                    pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
                except RecursionError:
                    sys.setrecursionlimit(max(previous_limit, 100000))
                    # Drop whatever the failed attempt already wrote so the
                    # file holds exactly one pickle.
                    handle.seek(0)
                    handle.truncate()
                    pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
        finally:
            sys.setrecursionlimit(previous_limit)

        print(self.get_file_path(), 'as been saved')
        return True

    def acquire_data(self):
        """
        Make sure the page content is cached on disk, downloading it with
        get_html() when the cache file is missing.

        Returns:

            True if the file is there or could be acquired and saved

            False if the file isn't there and could not be acquired via the get_html() method

        """
        if self.file_already_exists():
            print("File is already on disk")
            return True

        data_acquired, data = self.get_html() # self.get_data
        if not data_acquired:
            # On failure `data` holds the error message built by get_html().
            print("Unable to acquire html file thru the get_html function" + data)
            return False

        print("Data succeffuly acquired, saving on file")
        self.save_file(data)
        print('The file has been saved on disk')
        return True

    def set_bs4_object(self):
        """
        Load the cached page into a BeautifulSoup object, acquiring the page
        first when it is not on disk yet.

        Raises:

            RuntimeError if the page is not cached and cannot be downloaded
            (get_bs4_object() used to recurse without end in that case).
        """
        if not self.file_already_exists() and not self.acquire_data():
            raise RuntimeError("Unable to acquire " + self.get_url())

        with open(self.get_file_path(), "rb") as f:
            # NOTE: pickle.load can execute arbitrary code; this is only
            # acceptable because the cache file is written by save_file().
            self._bs4_object = bs(pickle.load(f), "html.parser")

    def get_bs4_object(self):
        """Return the parsed page, loading (and downloading) it on first use."""
        if self._bs4_object is None:
            self.set_bs4_object()
        return self._bs4_object

    def __str__(self):
        return self.get_url()


# Tangerine

In [8]:
x_path_tangerine_cpg_rate = '//*[@id="mainContentSection"]/section/div[8]/div[1]/div/div/div/div[url_3]'
url_tangerine_cpg_rate = "https://www.tangerine.ca/fr/products/saving/guaranteed-investments/tax-free-guaranteed-investment"
class BankWebSite(WebSite):
    """WebSite whose cache file name starts with today's date (YYYY-MM-DD)."""

    def __init__(self, url_str):
        super().__init__(url_str)

    def set_file_name(self):
        """Store "<today><quoted URL path>.pickle" as the cache file name."""
        date_prefix = datetime.today().date().strftime("%Y-%m-%d")
        quoted_path = urllib.parse.quote(self._url_object.path, " ")
        self._file_name = date_prefix + quoted_path + '.pickle'
tangerine = BankWebSite(url_tangerine_cpg_rate)


In [9]:
t = "#mainContentSection > section > div:nth-child(11) > div.widget-hidden > div > div > div > div.tngCol-6"
tangerine_cpg_rate = tangerine.get_bs4_object().select(t)

Instantiating file path
instantiating host name
Data succeffuly acquired, saving on file
html/www.tangerine.ca/2023-11-20%2Ffr%2Fproducts%2Fsaving%2Fguaranteed-investments%2Ftax-free-guaranteed-investment.pickle as been saved
System recursions limits has been re-set at 1000
The file has been saved on disk




In [10]:
# For every "tngLeft" label, print its following siblings: <span> elements are
# read as "<duration> <unit>" and anything else as a percentage.
for row in tangerine_cpg_rate[0].find_all("div", {"class": "tngLeft"}):
    print("****", row)
    for child in row.find_next_siblings():
        if child.name != "span":
            print(child)
            rate = float(child.text.replace("%", "").replace(",", "."))
            print(rate)
            continue

        dure_unite = child.text.split()
        duré = float(dure_unite[0].replace(",", "."))
        unité = dure_unite[1]
        if unité == "jours":
            # Durations in days are printed as a fraction of a 365-day year.
            print(duré / 365, unité)
        elif unité in ("an", "ans"):
            print(duré, unité)
            
        
# tangerine_cpg_rate[0].find_all("div", {"class":"tngBodyBase-prx"})
    

**** <div class="tngLeft tngBodyBase-prx width-40pn">Placement garanti</div>
0.2465753424657534 jours
<div class="tngRight tngBodyBase-prx" data-toggle="rate" data-type="shorttermtfsagic90days">0,20 %</div>
0.2
**** <div class="tngLeft tngBodyBase-prx width-40pn">Placement garanti</div>
0.4931506849315068 jours
<div class="tngRight tngBodyBase-prx" data-toggle="rate" data-type="shorttermtfsagic180days">0,60 %</div>
0.6
**** <div class="tngLeft tngBodyBase-prx width-40pn">Placement garanti</div>
0.7397260273972602 jours
<div class="tngRight tngBodyBase-prx" data-toggle="rate" data-type="shorttermtfsagic270days">0,75 %</div>
0.75
**** <div class="tngLeft tngBodyBase-prx width-40pn">Placement garanti</div>
1.0 an
<div class="tngRight tngBodyBase-prx" data-toggle="rate" data-type="tfsagic1yr">0,65 %</div>
0.65
**** <div class="tngLeft tngBodyBase-prx width-40pn">Placement garanti</div>
1.5 an
<div class="tngRight tngBodyBase-prx" data-toggle="rate" data-type="tfsagic18month">0,70 %</div>
0

# BNC

In [None]:
url_bnc_str = 'https://www.bnc.ca/particuliers/hypotheque/taux.html'
bnc_tableXPath = '//*[@id="TableStatic-0"]/div/table'
bnc_row_x_path = '//*[@id="TableStatic-0"]/div/table/tbody/tr'
bnc_col_x_path = '//*[@id="TableStatic-0"]/div/table/tbody/tr[1]/th'
bnc_title_x_path = '//*[@id="TableStatic-0"]/div/table/tbody/tr[1]/th['

class BncWebTable(WebTable):
    """WebTable preconfigured with the BNC URL and XPath constants of this notebook."""

    def __init__(self):
        self.rate_df = None
        # The Chrome Options object that used to be built here was never handed
        # to the driver (WebTable.__init__ takes no options), so it was removed.
        super().__init__(url_bnc_str, bnc_tableXPath, bnc_row_x_path, bnc_col_x_path, bnc_title_x_path)

    def set_rate_df(self):
        """Convert the "Fermé" and "TAC8" columns of rate_df_raw to fractions, in place."""
        # NOTE(review): despite the name, self.rate_df is left untouched - confirm intent.
        for column in ("Fermé", "TAC8"):
            self.rate_df_raw[column] = self.rate_df_raw[column].apply(clean_percentage)

    def click_area(self):
        """Click the page element located by the absolute XPath below."""
        allAreaClicable = "/html/body/main/div/div/div[5]/div[2]/div/div/div[2]/div[2]/div/div/div[4]/div/div[1]"
        # Previously referenced an undefined name (XPath), raising NameError.
        self.driver.find_element("xpath", allAreaClicable).click()

    def get_other_table(self):
        """Click the page area, then prepare the XPaths of the second table.

        NOTE(review): work in progress - the XPaths are not used yet and
        nothing is returned.
        """
        self.click_area()
        tableXPath = '//*[@id="TableStatic-1"]/div/table'
        row_x_path = '//*[@id="TableStatic-1"]/div/table/tbody/tr'
        col_x_path = '//*[@id="TableStatic-1"]/div/table/tbody/tr[1]/th'
        title_x_path = '//*[@id="TableStatic-1"]/div/table/tbody/tr[1]/th['
        
from selenium import webdriver
from selenium.webdriver.chrome.options import Options



        
        
        
    
        

In [None]:
bnc = BncWebTable()
bnc.get_infos()
# bnc.click_area()

In [None]:
# bnc.set_rate_df()
bnc.rate_df_raw

# Desjardins

In [None]:
desjardins_df_list = pd.read_html("https://www.desjardins.com/taux-rendement/financement/prets-hypothecaires/index.jsp?utm_id=e-p-0-118901178170&campagne=e-p-0-118901178170&gclid=CjwKCAiAuOieBhAIEiwAgjCvcvc_T3-q5QNFozoSiJQu4xe4ZTf5gHbDX2JhleOACRFfpMKEpXeFFhoCSL0QAvD_BwE&gclsrc=aw.ds")

In [None]:
desjardins_df_list[0]["Taux d\'intérêt"] = desjardins_df_list[0]["Taux d\'intérêt"].apply(lambda x: clean_percentage(x))
desjardins_df_list[0]

In [None]:
desjardins_df_list[1]

In [None]:
desjardins_df_list[2]

In [None]:
desjardins_df_list[3]

In [None]:
desjardins_df_list[4]

In [None]:
desjardins_df_list[5]

# HSBC

In [None]:
url_str_hsbc = "https://www.hsbc.ca/mortgages/rates/"

In [None]:
hsbc_df_list = pd.read_html(url_str_hsbc)

In [None]:
hsbc_df_list[0]

In [None]:
# hsbc_df_list[6]["Rate"] = hsbc_df_list[6]["Rate"].apply(lambda x: clean_percentage(x))
hsbc_df_list[6]

# TD

In [None]:
# TD mortgage-rate page and the XPath expressions locating its table:
# the table itself, its body rows, its header cells (used to count columns)
# and the header-cell prefix to which WebTable appends "<n>]".
url_td_str = 'https://www.td.com/ca/fr/services-bancaires-personnels/produits/prets-hypothecaires/taux-hypothecaires'
td_tableXPath = '//*[@id="container-57afe9bd25"]/div/div/div/table'
td_row_x_path = '//*[@id="container-57afe9bd25"]/div/div/div/table/tbody/tr'
td_col_x_path = '//*[@id="container-57afe9bd25"]/div/div/div/table/thead/tr/th'
td_title_x_path = '//*[@id="container-57afe9bd25"]/div/div/div/table/thead/tr/th['

# Scrape the table; the result is available on td.table_dict / td.rate_df_raw.
td = WebTable(url_td_str, td_tableXPath, td_row_x_path,  td_col_x_path, td_title_x_path)
td.get_infos()
# Bare string kept as a note: XPath of a <th> cell inside a body row.
'//*[@id="container-57afe9bd25"]/div/div/div/table/tbody/tr[2]/th[2]'

# BMO

# RBC

# CIBC

# Nesto

## Static data

In [None]:
url_nesto_str = "https://www.nesto.ca/mortgage-rates/location/quebec/"
nesto_tableXPath = '//*[@id="allrates"]/div/div/div[1]/div[1]/div/div[2]/figure/div/table'
nesto_row_x_path = '//*[@id="allrates"]/div/div/div[1]/div[1]/div/div[2]/figure/div/table/tbody/tr'
nesto_col_x_path = '//*[@id="allrates"]/div/div/div[1]/div[1]/div/div[2]/figure/div/table/thead/tr/th'
nesto_title_x_path = '//*[@id="allrates"]/div/div/div[1]/div[1]/div/div[2]/figure/div/table/thead/tr/th['

# "/html/body/div[2]/div/div/main/section[3]/div/div/div[1]/div[1]/div/div[2]/figure/div/table/tbody/tr[1]/td[1]"

## Get the data

In [None]:
# Scrape the Nesto table, then derive "Period" and "Type" from the first cell
# of each row (its first and second whitespace-separated words).
nesto = WebTable(url_nesto_str, nesto_tableXPath, nesto_row_x_path, nesto_col_x_path, nesto_title_x_path)
nesto.get_infos()
nesto.rate_df_raw["Period"] = [cells[0].split()[0] for cells in nesto.table_dict.values()]
nesto.rate_df_raw["Type"] = [cells[0].split()[1] for cells in nesto.table_dict.values()]
# Rates become fractions; periods such as "5-year" become "5 ans".
nesto.rate_df_raw["Rate"] = nesto.rate_df_raw["Rate"].apply(clean_percentage)
nesto.rate_df_raw["Period"] = (
    nesto.rate_df_raw["Period"]
    .str.replace("-", " ")
    .str.strip()
    .str.replace("year", "ans")
)

In [None]:
nesto.rate_df_raw[nesto.rate_df_raw["Type"] == "fixed"][["Period", "Rate"]]

# hypotheque

In [None]:
def get_closest_rate_to_date(serie, date):
    """Return the value of *serie* at *date*, or at the first later day found in its index.

    The date is moved forward one day at a time until it matches an index
    entry. This is done with a loop: the former one-recursion-per-day version
    exceeded the recursion limit on long gaps and rebuilt the index as a list
    at every step.

    Parameters:

        serie : Series indexed by dates

        date : the date to look up

    Returns:

        The matching value, or None when *date* is outside the range covered
        by the index (or steps past its end without finding a match).
    """
    index = serie.index
    min_date = index.min()
    max_date = index.max()

    while date >= min_date and date <= max_date:
        if date in index:
            return serie[date]
        date = date + timedelta(days=1)
    return None
            
            
class InterestRate(object):
    """Nominal annual rate together with its per-period rate.

    Attributes:

        nominal : the nominal annual rate, as a fraction
        compunding_freq_per_year : number of compounding periods per year
        effective : rate per compounding period (nominal / frequency)
    """

    def __init__(self, nominal_int_rate, compunding_freq_per_year):
        self.nominal = nominal_int_rate
        self.compunding_freq_per_year = compunding_freq_per_year
        self.effective = self.nominal / self.compunding_freq_per_year

    def an(self, n):
        """Return the annuity factor (1 - (1 + i) ** -n) / i for n periods.

        Returns 1 when the numerator is zero (notably n == 0), so that a
        balance divided by an(0) is the balance itself. With a zero rate and
        n != 0 the factor is n (it used to be 1).
        """
        if self.effective == 0 and n != 0:
            return n
        discount = 1 - (1 + self.effective) ** - n
        if discount == 0:
            return 1
        return discount / self.effective

    def sn(self, n):
        """Return the accumulation factor ((1 + i) ** n - 1) / i for n periods.

        With a zero rate the factor is n (it used to raise ZeroDivisionError).
        """
        if self.effective == 0:
            return n
        return ((1 + self.effective) ** (n) - 1) / self.effective

    def compute_accumulation_function(self, t):
        """Return (1 + i) ** t."""
        return (1 + self.effective) ** t

    def compute_actualisation_function(self, t):
        """Return (1 + i) ** -t."""
        # Previously read self.eff_interest_rate, an attribute that is never set.
        return (1 + self.effective) ** -t

    def __str__(self):
        return str(self.nominal)
    
class Mortage2(object):
    """Amortisation schedules for a mortgage, at a fixed or a historical rate.

    The constructor reads the historical rate series from
    '~/Downloads/chartered_bank_interest.csv'. Call set_up() (which builds the
    payment dates) before asking for a schedule.
    """

    def __init__(self, start_date, principal, amortisation_period_in_year, compunding_freq_per_year, fixed_rate=None):
        """
        Parameters:

            start_date : date of the first period
            principal : amount borrowed
            amortisation_period_in_year : amortisation length, in years
            compunding_freq_per_year : periods per year (set_date_list
                supports 12, 1 and 26)
            fixed_rate : nominal annual rate as a fraction; when None the rate
                is looked up in the historical "V80691335" series at start_date
        """
        self.start_date_dt = start_date
        self.principal = principal
        self.historical_rate_df = pd.read_csv('~/Downloads/chartered_bank_interest.csv', sep=",", header=21, parse_dates=True, index_col="date")
        self.amortisation_period_in_year = amortisation_period_in_year
        self.compunding_freq_per_year = compunding_freq_per_year
        self.total_compunding_period = amortisation_period_in_year * compunding_freq_per_year
        self.date_list = None
        self.var_rate_serie = self.historical_rate_df["V80691311"]
        self.five_year_fixed_rate_serie = self.historical_rate_df["V80691335"]

        if fixed_rate is None:
            # Historical values are divided by 100 before use.
            self.five_year_fixed_rate = InterestRate(get_closest_rate_to_date(self.historical_rate_df["V80691335"], self.start_date_dt) / 100, self.compunding_freq_per_year)
        else:
            # Uses the mortgage's own frequency: the previous hard-coded 26
            # produced a wrong per-period rate for any other schedule.
            self.five_year_fixed_rate = InterestRate(fixed_rate, self.compunding_freq_per_year)

        self.fixed_mortgage_renewal_date_list = {}
        self.renewal_date_dict = None
        self.amort_df = None
        self.fixed_over_var = 0

    def set_date_list(self):
        """Build the list of period dates, from start_date over the whole amortisation.

        Raises:

            ValueError if compunding_freq_per_year is not 12, 1 or 26 (the
            list used to be silently left as None).
        """
        if self.compunding_freq_per_year == 12:
            self.date_list = [self.start_date_dt + relativedelta(months=x) for x in range(0, self.total_compunding_period + 1)]
        elif self.compunding_freq_per_year == 1:
            self.date_list = [self.start_date_dt + relativedelta(years=x) for x in range(0, self.total_compunding_period + 1)]
        elif self.compunding_freq_per_year == 26:
            self.date_list = [self.start_date_dt + relativedelta(weeks= 2 * x) for x in range(0, self.total_compunding_period + 1)]
        else:
            raise ValueError("Unsupported compounding frequency: " + repr(self.compunding_freq_per_year))

    def set_renewal_dates(self):
        """Map the period index of the first five 5-year renewals (0, 5, 10, 15 and 20 years) to its date.

        Requires date_list to be long enough to hold those periods.
        """
        periods_between_renewals = 5 * self.compunding_freq_per_year
        self.renewal_date_dict = {
            int(i * periods_between_renewals): self.date_list[int(i * periods_between_renewals)]
            for i in range(0, 5)
        }

    @staticmethod
    def _empty_schedule():
        """Return the empty column containers of an amortisation schedule."""
        return {'interet_rate': [],
                'balance': [],
                'pmt': [],
                'interet': [],
                'capital': []
               }

    def _append_period(self, dict_df, Index, date_index, rate, b):
        """Append the schedule row of period date_index and return the balance after its payment."""
        i = rate.effective
        int_amount = i * b
        n = self.total_compunding_period - date_index
        # The payment is recomputed every period from the current balance and
        # the remaining number of periods.
        p = b / rate.an(n)
        capital = p - int_amount
        b = b * (1 + i)

        Index.append(self.date_list[date_index])
        dict_df['interet_rate'].append(i)
        dict_df['balance'].append(round(b, 2))
        dict_df['pmt'].append(round(p, 2))
        dict_df['interet'].append(round(int_amount, 2))
        dict_df['capital'].append(round(capital, 2))
        return b - p

    def set_amort_dict(self, fixed=True):
        """Build a schedule limited to the dates covered by the historical variable-rate series.

        Parameters:

            fixed : True uses the fixed rate; False looks the variable rate up
                in the historical series for every period.

        Returns:

            DataFrame indexed by period date with the columns interet_rate,
            balance, pmt, interet and capital.
        """
        b = self.principal
        date_index = 0
        Index = []
        dict_df = self._empty_schedule()
        last_known_date = self.var_rate_serie.index.max()

        while date_index <= self.total_compunding_period:
            if self.date_list[date_index] < last_known_date:
                if fixed:
                    # NOTE(review): repricing at renewal dates only happens when
                    # the amortisation is exactly 50 years, and needs
                    # set_renewal_dates() to have been called - confirm intent.
                    if self.amortisation_period_in_year == 50:
                        if date_index in self.renewal_date_dict:
                            rate = InterestRate(get_closest_rate_to_date(self.five_year_fixed_rate_serie, self.renewal_date_dict[date_index]) / 100, self.compunding_freq_per_year)
                    else:
                        rate = self.five_year_fixed_rate
                else:
                    rate = InterestRate(get_closest_rate_to_date(self.var_rate_serie, self.date_list[date_index]) / 100, self.compunding_freq_per_year)

                b = self._append_period(dict_df, Index, date_index, rate, b)
            date_index += 1

        return pd.DataFrame(dict_df, index=Index)

    def set_amort_dict_2(self, fixed=True):
        """Build the full schedule at the fixed rate, for every date of date_list.

        The `fixed` parameter is accepted but not used.

        Returns:

            DataFrame indexed by period date with the columns interet_rate,
            balance, pmt, interet and capital.
        """
        b = self.principal
        date_index = 0
        Index = []
        dict_df = self._empty_schedule()

        while date_index <= self.total_compunding_period:
            b = self._append_period(dict_df, Index, date_index, self.five_year_fixed_rate, b)
            date_index += 1

        return pd.DataFrame(dict_df, index=Index)

    def set_fixed_over_var(self):
        """Store the interest paid at the fixed rate minus the interest paid at the variable rate."""
        self.fixed_over_var = sum(self.set_amort_dict()['interet']) - sum(self.set_amort_dict(fixed=False)['interet'])

    def set_up(self):
        """Prepare the instance: currently only builds the date list."""
        self.set_date_list()
        # self.set_renewal_dates()
        # self.set_amort_dict()
        # self.set_fixed_over_var()
        # self.amort_df = pd.merge(self.set_amort_df(fixed=False), self.set_amort_df(), left_index=True, right_index=True, suffixes=('_var', '_fixed'))

    def diff_in_interest_paid(self):
        """Return the fixed minus variable interest total read from amort_df.

        amort_df is not filled by set_up(); it must be assigned beforehand.
        """
        return self.amort_df['interet_fixed'].sum() - self.amort_df['interet_var'].sum()




In [None]:
class Mortgage(object):
    """Common data of a mortgage contract and the calendar of its term."""

    def __init__(self, start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate, term=5):
        """
        Parameters:

            start_date : date of the first period
            principal : amount borrowed
            amortisation_period_in_year : amortisation length, in years
            compunding_freq_per_year : periods per year (set_date_list
                supports 12, 1 and 26)
            rate : rate of the contract (stored as is)
            term : length of the contract in years (1 / 3 / 5)
        """
        self.start_date_dt = start_date
        self.principal = principal
        self.amortisation_period_in_year = amortisation_period_in_year
        self.compunding_freq_per_year = compunding_freq_per_year
        self.total_compunding_period = amortisation_period_in_year * compunding_freq_per_year # this is used for pmt calculation
        self.rate = rate  # was accepted but never stored
        self.term = term # this is used for calendar
        self.total_compunding_period_in_contract = term * compunding_freq_per_year
        self.date_list = None

    def set_date_list(self):
        """Build the list of period dates covering the term of the contract.

        Raises:

            ValueError if compunding_freq_per_year is not 12, 1 or 26 (the
            list used to be silently left as None).
        """
        periods = self.total_compunding_period_in_contract
        if self.compunding_freq_per_year == 12:
            self.date_list = [self.start_date_dt + relativedelta(months=x) for x in range(0, periods + 1)]
        elif self.compunding_freq_per_year == 1:
            self.date_list = [self.start_date_dt + relativedelta(years=x) for x in range(0, periods + 1)]
        elif self.compunding_freq_per_year == 26:
            # One date every two weeks for each of the term * 26 periods. The
            # count used to be halved, which covered only half of the term.
            self.date_list = [self.start_date_dt + timedelta(weeks=2 * x) for x in range(0, periods + 1)]
        else:
            raise ValueError("Unsupported compounding frequency: " + repr(self.compunding_freq_per_year))


class MortgageRetroEvaluation(Mortgage):
    """Mortgage (without a rate) carrying the historical rate series read from
    '~/Downloads/chartered_bank_interest.csv'."""

    def __init__(self, start_date, principal, amortisation_period_in_year, compunding_freq_per_year):
        super().__init__(start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate=None)
        self.historical_rate_df = pd.read_csv('~/Downloads/chartered_bank_interest.csv', sep=",", header=21, parse_dates=True, index_col="date")
        self.var_rate_serie = self.historical_rate_df["V80691311"]
        self.five_year_fixed_rate_serie = self.historical_rate_df["V80691335"]


class FixedRateMortgage(Mortgage):
    """Mortgage with an explicit term."""

    def __init__(self, start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate, term):
        # term is now forwarded; it used to be dropped, so every instance
        # silently got the default 5-year term.
        super().__init__(start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate, term)


class VariableRateMortgage(Mortgage):
    """Mortgage using the default term."""

    def __init__(self, start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate):
        super().__init__(start_date, principal, amortisation_period_in_year, compunding_freq_per_year, rate)

In [None]:
class VariableRateMortgage(object):
    """Stand-alone variable-rate mortgage with a payment calendar.

    Args:
        start_date: first date of the contract. Used directly for date
            arithmetic, so it must already be a date/datetime-like object.
        principal: amount borrowed.
        loan_amortisation_period_in_year: amortisation period, in years.
        compunding_freq_per_year: periods per year; ``set_date_list`` supports
            12, 1 and 26.
        loan_term_in_years: contract term, in years.
    """

    def __init__(self, start_date, principal, loan_amortisation_period_in_year, compunding_freq_per_year, loan_term_in_years):
        self.start_date = start_date
        # set_date_list reads ``start_date_dt``; it was never assigned before.
        # NOTE(review): assumes start_date needs no parsing — confirm.
        self.start_date_dt = start_date
        self.principal = principal
        self.loan_amortisation_period_in_year = loan_amortisation_period_in_year
        # Previously accepted but never stored, although set_date_list needs it.
        self.compunding_freq_per_year = compunding_freq_per_year
        self.loan_term_in_years = loan_term_in_years
        # Number of periods covered by the contract term (drives the calendar).
        self.total_compunding_period_in_contract = loan_term_in_years * compunding_freq_per_year
        self.date_list = None

        self.rate_dict = {}

    def set_date_list(self):
        """Populate ``self.date_list`` with one date per period of the contract.

        The list holds ``total_compunding_period_in_contract + 1`` dates,
        starting at ``start_date_dt``. Unsupported frequencies leave
        ``self.date_list`` untouched.
        """
        period_count = self.total_compunding_period_in_contract
        frequency = self.compunding_freq_per_year
        if frequency == 12:
            self.date_list = [self.start_date_dt + relativedelta(months=x) for x in range(period_count + 1)]
        elif frequency == 1:
            self.date_list = [self.start_date_dt + relativedelta(years=x) for x in range(period_count + 1)]
        elif frequency == 26:
            # Two-week steps, one per period: the period count must not be
            # halved, otherwise the calendar stops at half the term.
            self.date_list = [self.start_date_dt + timedelta(weeks=2 * x) for x in range(period_count + 1)]
            
        
# Sample instance; positional arguments follow the constructor above: start
# date, principal, amortisation (years), periods per year, term (years).
# NOTE(review): ``dt`` is not defined in this cell — presumably an alias of
# datetime.datetime created elsewhere in the notebook; confirm.
v = VariableRateMortgage(dt(2022, 1, 1), 150000, 25, 26, 5)

## Rate test

### Implied bank rate

In [None]:
# https://towardsdatascience.com/develop-your-own-newton-raphson-algorithm-in-python-a20a5b68c7dd

# Annual rates for the variable product and the 1-, 3- and 5-year terms.
# The formulas below treat them as nominal rates compounded monthly.
rate_var, rate_1, rate_3, rate_5 = 0.067, 0.0634, 0.0614, 0.0649

rate_dict = dict(zip((1, 3, 5), (rate_1, rate_3, rate_5)))


def _implied_rate(long_rate, long_months, short_rate, short_months):
    """Return the nominal annual rate bridging the short horizon to the long one.

    Solves ``(1 + short/12) ** short_months * (1 + i/12) ** gap ==
    (1 + long/12) ** long_months`` for ``i``, where ``gap`` is the number of
    months between the two horizons.
    """
    growth_ratio = (1 + long_rate / 12) ** long_months / (1 + short_rate / 12) ** short_months
    return (growth_ratio ** (1 / (long_months - short_months)) - 1) * 12


# i_1: rate for the remaining 11 months after one month at rate_var, such that
#   (1 + i_1 / 12) ** 11 * (1 + rate_var / 12) == (1 + rate_1 / 12) ** 12
i_1 = _implied_rate(rate_1, 12, rate_var, 1)
# i_2: rate for the 24 months between the 1-year and the 3-year horizons.
i_2 = _implied_rate(rate_3, 3 * 12, rate_1, 12)
# i_4: rate for the 24 months between the 3-year and the 5-year horizons.
i_4 = _implied_rate(rate_5, 5 * 12, rate_3, 36)
i_4

### Cinqplex

In [None]:
# Five-unit building: net monthly cost, in total and per unit.
# NOTE(review): Mortage2 is defined outside this cell; the arguments look like
# (start date, principal, amortisation years, periods per year, annual rate).
m = Mortage2(dt(2022,5,18), 1750000, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]
# Earlier scratch calculation, kept disabled for reference:
# (1 * 7 * 300 + 40 * 2 * 300, 12 * 2038)
# taxes = 3000
# chauffage = 200 * 12
# reparations = 50000 / 20
# nombre_de_nuit_totale = (12 * 2038 + taxes + chauffage + reparations) / 300
# nombre_de_semaines = 5 * 7
# print(nombre_de_nuit_totale, "Nuits")
# print("ou 5 semaines complete et", (nombre_de_nuit_totale - nombre_de_semaines) / 2, "week end")
nombre_de_porte = 5
taxe_totale = 6575
# Named like the same term in the later property cells of this notebook.
# NOTE(review): presumably the yearly rental income — confirm.
revenus_potentiel = 68340
# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12
pmt_mensuel / nombre_de_porte

### Quadruplex

In [None]:
# Four-unit building: net monthly cost, in total and per unit.
m = Mortage2(dt(2022,5,18), 1150000, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]

nombre_de_porte = 4
taxe_totale = 8000
# Named like the same term in the later property cells of this notebook.
# NOTE(review): presumably the yearly rental income — confirm.
revenus_potentiel = 50700

# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12

pmt_mensuel / nombre_de_porte

In [None]:
# Three-unit building at 869000: net monthly cost, in total and per unit.
m = Mortage2(dt(2022,5,18), 869000, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]
nombre_de_porte = 3
taxe_totale = 5707
revenus_potentiel = 49920

# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12

print(pmt_mensuel, pmt_mensuel / nombre_de_porte )

In [None]:
# Three-unit building at 799000: net monthly cost, in total and per unit.
m = Mortage2(dt(2022,5,18), 799000, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]
nombre_de_porte = 3
taxe_totale = 3812
revenus_potentiel = 31380

# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12

print(pmt_mensuel, pmt_mensuel / nombre_de_porte )

In [None]:
# Three-unit building at 799900: net monthly cost, in total and per unit.
m = Mortage2(dt(2022,5,18), 799900, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]
nombre_de_porte = 3
taxe_totale = 4568
revenus_potentiel = 33900

# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12

print(pmt_mensuel, pmt_mensuel / nombre_de_porte )

In [None]:
# Three-unit building at 729000: net monthly cost, in total and per unit.
m = Mortage2(dt(2022,5,18), 729000, 30, 12, 0.0594)
m.set_up()
df = m.set_amort_dict_2()
# First scheduled payment. ``.iloc[0]`` is explicit positional access; plain
# ``[0]`` on a Series relies on a positional fallback that pandas deprecates.
pmt = df["pmt"].iloc[0]
nombre_de_porte = 3
taxe_totale = 5112
revenus_potentiel = 37560

# Yearly payments minus yearly income plus yearly taxes, spread over 12 months.
pmt_mensuel = (12 * pmt - revenus_potentiel + taxe_totale) / 12

print(pmt_mensuel, pmt_mensuel / nombre_de_porte )

# Interest accumulated from the start of the schedule up to 2023-05-18.
# The unfinished ``m.`` that followed was a syntax error that prevented the
# whole cell from running; it has been removed.
df['interet'][:'2023-05-18'].sum()

In [None]:
# NOTE(review): "immobilier" is French for real estate; presumably a
# real-estate rate assumption. Its period and usage are not shown in this
# cell — confirm.
r_immobilier = 0.0053


## New object

In [None]:
# Two loans starting on the same date, compared in the cells below.
# m_489 uses 12 periods per year and passes a sixth positional argument (2);
# m_619 uses 26 periods per year and relies on the default for that argument.
# NOTE(review): m_619 carries a rate of 0.0619, matching its name, but m_489
# carries 0.0594 rather than 0.0489 — confirm which one is intended.
m_489 = Mortage(dt(2022, 2, 2), 46733.28, 5, 12, 0.0594, 2)
m_619 = Mortage(dt(2022, 2, 2), 48000, 5, 26, 0.0619)
m_489.set_up()
m_619.set_up()

In [None]:
# Total interest over the whole schedule: m_619 minus m_489.
interet_619 = m_619.set_amort_dict_2()["interet"].sum()
interet_489 = m_489.set_amort_dict_2()["interet"].sum()
interet_619 - interet_489

In [None]:
# m_619.set_amort_dict_2()["2024-02-03"]

# Same comparison as the full-schedule difference, restricted to the rows
# between the two dates below (row slice on the schedule's index).
window = slice('2022-02-02', '2024-02-14')
interet_619_window = m_619.set_amort_dict_2()[window]["interet"].sum()
interet_489_window = m_489.set_amort_dict_2()[window]["interet"].sum()
interet_619_window - interet_489_window
# m_619.set_amort_dict_2().index.to_list()
# dt(2022, 2, 2) + relativedelta(years=3)

In [None]:
# Schedule row(s) labelled 2024-01-31, i.e. 104 weeks after the 2022-02-02
# start date used for m_619.
m_619.set_amort_dict_2().loc['2024-01-31']
# m_619.set_amort_dict_2()[m_619.set_amort_dict_2()[]]

In [None]:
# Load the previously pickled list of mortgages. The ``with`` block closes
# the file handle, which the bare ``open(...)`` call left open.
# NOTE(review): unpickling can execute arbitrary code — only load a file you
# produced yourself.
with open("mortgage_liste.pkl", "rb") as pickle_file:
    mortgage_list = pickle.load(pickle_file)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.dates as mdates

# y = []
# x = []
# for mortgage in mortgage_list:
#     y.append(mortgage.fixed_over_var)
#     x.append(mortgage.start_date_dt)
#     relativedelta(mortgage.start_date_dt, mortgage_list[-1])
    
# ax = plt.figure(figsize=(16, 9))
# plt.bar( x, y)  

# ax.xaxis.set_major_locator(mdates.DayLocator(interval=25))
# ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))


In [None]:
# Number of entries in the loaded mortgage list (displayed as cell output).
len(mortgage_list)



In [None]:


# Print the summed "interet" column of the variable and fixed schedules next
# to the five-year fixed rate held by ``m``.
# NOTE(review): df_var, df_fixed, m.five_year_fixed_rate and the
# "interet_var"/"interet_fixed" columns of df are not created in this cell —
# they must come from cells run earlier; confirm they still exist.
print("variable", df_var["interet"].sum(), "fixed", df_fixed["interet"].sum(), "fixed_rate", m.five_year_fixed_rate)

ax = df[["interet_var", "interet_fixed"]].plot(kind="bar", figsize=(16, 9))

# NOTE(review): pandas bar plots position bars at integer locations, so a
# date locator/formatter may not produce the intended tick labels — verify
# the rendered axis.
ax.xaxis.set_major_locator(mdates.DayLocator(interval=25))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))


# plt.xticks([])
# df_fixed["interet"].plot(kind="bar", figsize=(16, 9))

# Inflation

In [None]:
# Raw inflation data in long format; the cells below use its VECTOR, REF_DATE
# and VALUE columns.
# NOTE(review): the file name suggests a Statistics Canada table
# 18-10-0004-01 export — confirm the source.
inflation_data_df = pd.read_csv("~/Downloads/1810000401_databaseLoadingData.csv")

In [None]:
# inflation_data_df[inflation_data_df["VECTOR"].isin(inflation_data_df["VECTOR"].unique()[:3])][["REF_DATE", "VALUE"]].plot(x="REF_DATE", y="VALUE")

# One (REF_DATE, VALUE) frame per series, for the first three distinct
# VECTOR ids in order of appearance.
vector_column = inflation_data_df["VECTOR"]
df_list = [
    inflation_data_df[vector_column == vector_id][["REF_DATE", "VALUE"]]
    for vector_id in vector_column.unique()[:3]
]

In [None]:
from functools import reduce

# pd.merge joins exactly two frames, so passing the whole list raised an
# error. The frames are folded pairwise on REF_DATE instead. Each VALUE
# column is first renamed with the frame's position in df_list so the merged
# columns stay distinguishable.
# (pd.concat(df_list) would stack the rows rather than align them by date.)
merged_inflation_df = reduce(
    lambda left, right: pd.merge(left, right, on="REF_DATE"),
    [frame.rename(columns={"VALUE": f"VALUE_{position}"}) for position, frame in enumerate(df_list)],
)
merged_inflation_df

# Yield Curve

In [None]:
# Earlier variant that forced every column to float (kept disabled):
# yield_curve_pd = pd.read_csv('~/Downloads/yield_curves.csv', sep=",", header=0, dtype=float, na_values=' na', parse_dates=True, index_col=0)
# Load the yield curves: first row holds the column names, first column is
# parsed as the date index, and the literal ' na' marks a missing value.
yield_curve_pd = pd.read_csv('~/Downloads/yield_curves.csv', sep=",", header=0, na_values=' na', parse_dates=True, index_col=0)
yield_curve_pd.head()

In [None]:
# Keep rows from 2020-01-01 onward and drop every row that has a missing value.
no_na_df = yield_curve_pd['2020-01-01':].dropna()

In [None]:
# Three snapshots of the curve: the most recent row plus two hard-coded dates.
most_recent_row = no_na_df.loc[no_na_df.index.max()]
latest_rate_serie = most_recent_row.to_list()
one_year_ago_rate_serie = no_na_df.loc['2022-02-03'].to_list()
six_months_ago_rate_serie = no_na_df.loc['2022-08-15'].to_list()
x = most_recent_row.index.to_list()

# Turn each column label into a number: strip the ' ', 'Z', 'C' characters and
# then 'Y', 'R' from both ends, and put a decimal point before the last two
# remaining characters (so a remainder of "025" becomes 0.25). The label that
# is a single space is skipped.
l = [
    float(digits[:-2] + "." + digits[-2:])
    for digits in (label.strip(' ZC').strip('YR') for label in x if label != " ")
]

series_dict = {
    'Current': latest_rate_serie,
    'Last year': one_year_ago_rate_serie,
    '6 months ago': six_months_ago_rate_serie,
}

def plot_test(series_dict, x_val, x_lable=None, y_lable=None, x_lim_min=None, x_lim_max=None, y_lim_min=None, y_lim_max=None, title=None):
    """Draw every series of ``series_dict`` against ``x_val`` on one figure.

    Args:
        series_dict: mapping of legend label -> sequence of y values. Each
            sequence is plotted without its last element.
        x_val: x coordinates shared by all the series.
        x_lable, y_lable: axis labels.
        x_lim_min, x_lim_max, y_lim_min, y_lim_max: axis limits; ``None``
            lets matplotlib choose that bound.
        title: figure title.

    The figure is shown with ``plt.show()``; nothing is returned.
    """
    fig, ax = plt.subplots(figsize=(13, 7), layout='constrained')
    for serie in series_dict.values():
        # Plot against the x_val argument: the global ``l`` was used here
        # before, which made the parameter dead and tied the function to
        # notebook state.
        # NOTE(review): the last element is dropped, presumably to discard the
        # blank-named trailing column that callers also leave out of x_val —
        # confirm.
        ax.plot(x_val, serie[:-1])

    ax.set_title(title, fontsize=20)
    ax.set_ylim(y_lim_min, y_lim_max)
    ax.set_xlim(x_lim_min, x_lim_max)
    ax.set_xlabel(x_lable, fontsize=14)
    ax.set_ylabel(y_lable, fontsize=14)
    # Legend entries follow the insertion order of the plotted series.
    ax.legend(list(series_dict))

    # Keep only the bottom and left borders of the plot area.
    ax.spines["top"].set_visible(False)
    ax.spines["bottom"].set_visible(True)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(True)

    plt.show()
    
# Plot the three snapshots against the maturities in ``l``; the y axis is
# capped at 0.05 and the x axis is left to matplotlib.
plot_test(series_dict, 
          l, 
          x_lable="Years",
          y_lable = "rates",
          y_lim_min=None, 
          y_lim_max=0.05,
          title = "Yield curve, current - 6 months prior - last year"
         )

In [None]:
# Weekly snapshots counted back from the most recent date: week_0 is the
# latest row and week_4 lies 28 days earlier, which gives five curves.
# NOTE(review): each lookup needs an exact row for that date; a date missing
# from no_na_df would raise — confirm all five exist.
last_day = no_na_df.index.max().to_pydatetime()
series_dict = {
    'week_' + str(weeks_back): no_na_df.loc[last_day - timedelta(days=7 * weeks_back)]
    for weeks_back in range(5)
}

# Zoom on the 0.25 to 4 range of the x axis.
plot_test(
    series_dict,
    l,
    x_lable="Years",
    y_lable="rates",
    x_lim_min=0.25,
    x_lim_max=4,
    y_lim_min=None,
    y_lim_max=0.05,
    title="Yield curve, 4 last weeks, first four week",
)
# latest_rate_serie = no_na_df.loc[no_na_df.index.max()].to_list()
# one_week = no_na_df.loc['2022-02-03'].to_list()

In [None]:
# Same series as the previous plot, with the x axis widened to 0.25 - 30.
plot_test(series_dict, 
          l, 
          x_lable="Years",
          y_lable = "rates",
          x_lim_min=0.25, 
          x_lim_max=30, 
          y_lim_min=None, 
          y_lim_max=0.05, 
          title = "Yield curve"
         )