In [None]:
"""
ABM model

@author: hector@bith.net
"""
import sys

In [None]:
class Config:
    """
    Configuration parameters for the ABM.

    The class attributes are the default values of the parameters; the
    constructor adds the parameters that are derived from them (gamma and the
    initial balance sheet of the bank sector).
    """
    T: int = 1000  # time (1000)
    N: int = 100  # number of firms (100)
    eta: float = 0.25  # ŋ inverse elasticity: ŋ=1/10000 -> perfect competition, ŋ=1/4     -> high market power
    sigma = 0.0  # σ : parameter for the R&D
    sigma_values = []  # list of σ : parameters for the R&D
    alfa: float = 0.08  # α : Basilea's parameter
    mark_up_bank = 0.25  # μ : the mark-up of the bank (μ = 0.0001 for PC; μ = 0.25 for Monopolistic Competition)

    g: float = 1.1
    w: float = 0.005
    k: float = 1

    delta: float = 2  # δ
    b: float = 1
    beta: float = 0.02  # β
    m: float = 0.03
    c: float = 1  # bankruptcy costs' equation
    xi: float = 0.003  # ξ : increase in the productivity of the firms
    rho: float = 0.3  # ρ : parameter for the determination of Bank's expected profits
    share_k: float = 0.5  # this is a parameter that helps to maintain the capital upper a certain threshold
    teta : float = 1 # θ : parameter for the profits of the bank
    fire_sale : float = 1.0 # parameter in order to take into account when selling the firm's own capital in order to repay debts

    # firms and bankSector parameters:
    firms_K_i0: float = 5
    firms_A_i0: float = 1
    firms_L_i0: float = 4
    phi: float = 1.1  # Φ -> this parameter is estimated #todo
    r: float = 0.02  # initial rate of interest charged to firms
    bank_sector_profits : float = 0.0
    bank_sector_B_i0 : float = 0.0

    # there are two models in this code, using False will be without green firms:
    model_green = False

    # seed used:
    default_seed = 20579

    def __init__(self):
        """Compute the parameters that are derived from the other values.

        It is safe to call it again after changing a base parameter: the
        derived values are simply recomputed.
        """
        # γ : operating cost per unit of capital
        self.gamma: float = ((self.w / self.k) + (self.g * self.r))
        # initial balance sheet of the bank sector: loans are the sum of the
        # initial loans of the firms, equity is the share alfa of those loans
        # and deposits are the remainder
        total_loans = self.firms_L_i0 * self.N
        self.bank_sector_L_i0 = total_loans
        self.bank_sector_A_i0 = total_loans * self.alfa
        self.bank_sector_D_i0 = self.bank_sector_L_i0 - self.bank_sector_A_i0

    def __str__(self):
        """Return the script name followed by every int/float attribute as ' name=value'."""
        numeric_items = (
            f" {name}={getattr(self, name)}"
            for name in dir(self)
            if isinstance(getattr(self, name), (int, float))
        )
        return sys.argv[0] + "".join(numeric_items)

In [None]:
"""
ABM model

@author: hector@bith.net
"""

In [None]:
class BankSector:
    """
    The aggregated bank sector of the model.

    It keeps the balance sheet of the bank (equity A, loans L, deposits D, bad
    debt B), its profits and its credit supply cs. The model it belongs to
    provides the configuration and the list of firms.
    """

    def __init__(self, its_model):
        self.model = its_model
        self.__assign_defaults__()
        # TODO  ste

    def __assign_defaults__(self):
        """Load the initial balance sheet from the configuration of the model."""
        config = self.model.config
        self.A = config.bank_sector_A_i0
        self.L = config.bank_sector_L_i0
        self.D = config.bank_sector_D_i0
        self.B = config.bank_sector_B_i0
        self.profits = config.bank_sector_profits
        self.cs = self.A / config.alfa

    def __str__(self):
        return "bankSector"

    def do_step(self):
        """Update profits, then equity, then deposits (each one uses the previous result)."""
        self.profits = self.determineProfits()
        self.A = self.determineEquity()
        self.D = self.determineDeposits()

    def determineProfits(self):
        """Return the sum over all the firms of the profit obtained from each one."""
        config = self.model.config
        total = 0.0
        for firm in self.model.firms:
            # interest collected, weighted by the probability of no bankruptcy
            if_repaid = (1 - firm.pb) * (firm.r * firm.L)
            # NOTE(review): as written, the whole loan firm.L is subtracted and only
            # teta * A is weighted by pb; confirm whether pb * (teta * A - L) was meant
            if_bankrupt = firm.pb * (config.teta * firm.A) - firm.L
            total += (if_repaid + if_bankrupt)
        return total

    def determineEquity(self):
        """Return equity plus profits minus bad debt."""
        return self.A + self.profits - self.B

    def determineDeposits(self):
        """Return loans minus equity."""
        return self.L - self.A

    def determineCreditSupply(self):
        """Return the credit supply: equity divided by the parameter alfa."""
        return self.A / self.model.config.alfa

In [None]:
"""
ABM model

@author: hector@bith.net
"""
import random
import numpy as np

In [None]:
class Firm:
    """
    A firm of the model.

    Its state is the capital K, the net worth A, the loan L, the interest rate
    r, the productivity phi and the values derived from them in each step. The
    model it belongs to provides the configuration.
    """

    def __init__(self, new_id, its_model):
        self.id = new_id
        self.model = its_model
        self.failures = 0
        self.__assign_defaults__()

    def __str__(self, short: bool = False):
        """Return 'firm#id' (or '#id' if short), followed by '.failures' when it has failed before."""
        prefix = "#" if short else "firm#"
        suffix = f".{self.failures}" if self.failures > 0 else ""
        return f"{prefix}{self.id}{suffix}"

    def __assign_defaults__(self):
        """Set the initial state of the firm from the configuration of the model."""
        config = self.model.config
        # values that come from the configuration:
        self.K = config.firms_K_i0
        self.A = config.firms_A_i0
        self.L = config.firms_L_i0
        self.r = config.r
        self.gamma = config.gamma
        self.pb = 0.0
        self.profits = 0.0
        self.dK = 0.0
        self.phi = config.phi
        self.I = 0.0
        self.u = 0.0
        self.output = 0.0
        self.z = 0.0  # amount of R&D invested by the firm
        self.Z = 0.0  # Bernoulli's distribution parameter
        self.success = 0.0  # if the Bernoulli's parameter is 1, success is 1
        self.failed = False

    def do_step(self):
        """Execute one period of the firm. The order of the updates matters:
        each value is computed from the ones updated before it."""
        config = self.model.config

        self.gamma = self.determine_cost_per_unit_of_capital()
        self.c = self.determine_marginal_operating_cost()
        self.output = self.determineOutput()
        self.pb = self.determine_prob_bankruptcy()
        self.r = self.determineInterestRate()

        self.dK = self.determineDesiredCapital()
        # first control to have a minimum amount of dK
        minimum_dK = config.share_k * self.K
        if self.dK < minimum_dK:
            self.dK = minimum_dK
        # second control to avoid a negative dK
        if self.dK < 0:
            self.dK = (config.firms_K_i0 / 2)

        self.I = self.determineInvestment()
        self.L = self.determineLoan()
        if self.L < 0:  # control to avoid negative loans
            self.A = self.A + abs(self.L)  # balance sheet adjustment
            self.L = 0
        # first control for the balance sheet: whatever the desired capital was,
        # it ends up being what net worth and loan can finance
        self.dK = (self.A + self.L)
        # TODO: we need to introduce a control by which: if the bank can provide the loan, dK becomes official K; \
        #  otherwise the firm should produce with the capital at its disposal, and the desired capital is useless

        self.u = self.determineU()
        self.profits = self.determineProfits()
        self.A = self.determineAssets()

        # balance sheet adjustment: with profits K can only grow up to A + L,
        # with losses it can only shrink down to A + L
        financed = (self.A + self.L)
        if self.profits >= 0:
            if self.K < financed:
                self.K = financed
        elif self.K > financed:
            self.K = financed

        self.z = self.determine_z()
        self.Z = self.determineZ()
        self.success = 1 if self.Z == 1 else 0  # I should verify this formula
        self.phi = self.determinePhi()

        if self.A < self.c * self.output:
            # ask for a loan to the banksector
            # TODO : I need to check it with Gabriele because I have never used it
            pass

    def _u_scaled_output_term(self):
        # u * (ŋ + (1 - ŋ) * output): the term shared by Equation 23 and by the amount of R&D
        eta = self.model.config.eta
        return self.u * (eta + ((1 - eta) * self.output))

    def determine_cost_per_unit_of_capital(self):
        # (Before equation 2)  gamma
        # TODO: w is uniform across firms, but not fixed for all t!     # we should talk with Gabriele
        config = self.model.config
        return (config.w / config.k) + (config.g * self.r)

    def determine_marginal_operating_cost(self):
        # (Equation 2)
        return self.gamma / self.phi

    def determine_prob_bankruptcy(self):
        # (Equation 9S or Equation 10H) Pb
        config = self.model.config
        scale = 1 / (2 * (1 - config.sigma) * (1 - config.eta))
        return scale * ((self.gamma / self.phi) - (self.A / self.output))

    def determineInterestRate(self):
        # new formula micro-founded
        odds = self.pb / (1 - self.pb)
        return odds * self.model.config.mark_up_bank

    def determineDesiredCapital(self):
        # Equation 28 in my paper
        config = self.model.config
        one_minus_eta = 1 - config.eta
        one_minus_sigma = 1 - config.sigma
        first = one_minus_eta ** 2 * one_minus_sigma * self.phi
        second = one_minus_eta * self.gamma
        from_parameters = one_minus_sigma * (first - second) / (config.b * self.phi * self.gamma)
        from_net_worth = (1 / (2 * self.gamma)) * self.A
        return from_parameters + from_net_worth

    def determineInvestment(self):
        # page 17 in my paper, in the Bank's chapter
        return self.dK - self.K

    def determineLoan(self):
        # page 17 in the bank's chapter
        return self.L + self.I - self.profits

    def determineU(self):
        """Return a new draw from the uniform distribution on [0, 2]."""
        return random.uniform(0, 2)

    def determineOutput(self):
        # (Before equation 2)
        # Y, output is equal to capital productivity * capital
        return self.K * self.phi

    def determineProfits(self):
        # (Equation 23 in my paper with R&D)
        kept_share = 1 - self.model.config.sigma
        operating_costs = (self.gamma / self.phi) * self.output
        return (self._u_scaled_output_term() * kept_share) - operating_costs

    def determineAssets(self):
        """Return the net worth after adding the profits of the period."""
        return self.A + self.profits

    def determine_z(self):
        # page 16 of my paper : it calculates the amount of R&D invested by the firm
        # TODO: it should be verified
        return self._u_scaled_output_term() * self.model.config.sigma

    def determineZ(self):
        # equation 29 of my paper
        return 1 - np.exp((- self.model.config.delta) * self.z)

    def determinePhi(self):
        """Increase phi by the factor (1 + ξ) when the R&D succeeded; it updates and returns self.phi."""
        if self.success == 1:
            self.phi = self.phi * (1 + self.model.config.xi)
        return self.phi

    def determine_output(self):
        # (Before equation 2)
        # Y, output is equal to capital productivity * capital
        # (same formula as determineOutput, kept under both names)
        return self.K * self.phi

In [None]:
"""
ABM model

@author: hector@bith.net
"""
import random

In [None]:
class Model:
    """
    It contains the firms and the logic to execute the simulation
    """
    # class-level defaults; the lists are replaced by per-instance lists in
    # __init__ so that two models never share the same list object
    firms = []
    t: int = 0  # current value of time, t = 0..Model.config.T
    bank_sector: BankSector
    bankruptcies = []

    test = False  # it's true when we are inside a test
    debug = None  # if not None, we will debug at this instant i, entering in interactive mode
    log = None
    statistics = None
    config = None
    export_datafile = None
    export_description = None

    def __init__(self, firm_class=Firm, **configuration):
        """
        Create the model with the default configuration.

        :param firm_class: class used to create each firm, called as
            firm_class(new_id=..., its_model=...)
        :param configuration: optional values for attributes of the
            configuration; when present, configure() is called with them
            (which also initializes the model)
        """
        self.firms = []
        self.bankruptcies = []
        self.config = Config()
        self.log = Log(self)
        self.firm_class = firm_class
        self.statistics = Statistics(self)
        if configuration:
            self.configure(**configuration)

    def configure(self, **configuration):
        """
        Change attributes of the configuration and initialize the model again.

        Each value is converted to the type of the current value (int or
        float).

        :raises LookupError: if the attribute does not exist in the configuration
        :raises Exception: if the current value is neither int nor float
        """
        for attribute in configuration:
            if not hasattr(self.config, attribute):
                raise LookupError("attribute in config not found")
            current_value = getattr(self.config, attribute)
            if isinstance(current_value, int):
                setattr(self.config, attribute, int(configuration[attribute]))
            elif isinstance(current_value, float):
                setattr(self.config, attribute, float(configuration[attribute]))
            else:
                raise Exception(f"type of config {attribute} not allowed: {type(current_value)}")
        self.initialize_model()

    def initialize_model(self, seed=None,
                         export_datafile=None, export_description=None):
        """
        Prepare a new execution: statistics, derived configuration values,
        random seed, firms and bank sector are created again.

        :param seed: seed for the random generator; None means
            config.default_seed (0 is a valid seed)
        :param export_datafile: file name passed later to the export of statistics
        :param export_description: description written in the exported file;
            None means the textual form of the configuration
        """
        self.statistics.reset()
        self.config.__init__()  # recompute the values derived from the (maybe changed) parameters
        # "is None" and not truthiness: otherwise seed=0 would silently become the default seed
        random.seed(self.config.default_seed if seed is None else seed)
        self.export_datafile = export_datafile
        self.export_description = str(self.config) if export_description is None else export_description
        self.bankruptcies = []  # each execution keeps its own record of bankruptcies
        self.firms = []
        for i in range(self.config.N):
            self.firms.append( self.firm_class(new_id=i, its_model=self))
        self.bank_sector = BankSector(self)

    def removeBankruptedFirms(self):
        """
        Replace every firm with negative net worth (A < 0) by a new firm with
        the same id and register the loss of the bank.

        When the loan of the defaulted firm exceeds what the bank recovers
        (fire_sale * K), the difference is added to the bad debt B of the bank
        and subtracted from its equity A, and the deposits D are updated. The
        surviving firms keep their relative order and the new firms go to the
        end of the list, in the order of the firms they replace.

        :return: number of firms that went bankrupt in this step (also
            appended to self.bankruptcies)
        """
        bank = self.bank_sector
        bank.B = 0.0
        survivors = []
        replacements = []
        # the list is not modified while it is traversed: removing items inside
        # the loop would skip the firm that follows each removed one
        for firm in self.firms:
            if firm.A < 0:
                recovered = self.config.fire_sale * firm.K
                if firm.L > recovered:
                    loss = firm.L - recovered
                    bank.B += loss
                    # only the loss of this firm: subtracting the accumulated B
                    # here would charge the losses of previous firms again
                    bank.A -= loss
                    bank.D = (bank.L - bank.A)
                # it creates a new firm with the same ID
                replacements.append(self.firm_class(new_id=firm.id, its_model=self))
            else:
                survivors.append(firm)
        self.firms[:] = survivors + replacements
        defaulted = len(replacements)
        # list that takes into account all the bankruptcies that happen during the time
        self.bankruptcies.append(defaulted)
        return defaulted

    def do_step(self):
        """Execute one period: credit supply of the bank, every firm, the bank
        sector and finally the replacement of the bankrupted firms."""
        self.bank_sector.cs = self.bank_sector.determineCreditSupply()  # firstly, I must determine the credit supply
        # of the bank and later I can perform the operations related to firms
        for firm in self.firms:
            firm.do_step()
            # self.log.info("initialize_step")
            # self.log.debug("hello, this is a debug")
            self.bank_sector.cs -= firm.L  # TODO: we need to introduce a control by which if the bank's credit
                                           # supply finishes, the firms cannot ask for a loan
        self.bank_sector.do_step()
        self.removeBankruptedFirms()

    def finish_step(self):
        """Store the statistics of the current period and log them."""
        self.log.info(self.statistics.current_status_save())
        self.statistics.debug_firms()

    def finish_model(self):
        """Export and plot the statistics (unless we are inside a test) and log the end."""
        if not self.test:
            self.statistics.export_data(self.export_datafile, self.export_description)
            self.statistics.plot()
        self.log.info(f" Finish: model T={self.config.T}  N={self.config.N}")

    def run(self,export_datafile=None):
        """Initialize the model and execute config.T periods; self.t is the current period."""
        self.initialize_model(export_datafile=export_datafile)
        for self.t in range(self.config.T):
            self.do_step()
            self.finish_step()
        self.finish_model()

In [None]:
"""
ABM model auxiliary file: logging facilities
@author: hector@bith.net
"""
import logging

In [None]:
class Log:
    """
    Logging facilities of the model: every message is prefixed with the
    current instant t of the model, and numbers can be formatted in a compact
    way.
    """
    logger = logging.getLogger("model")
    model = None
    logLevel = "ERROR"

    def __init__(self, its_model):
        self.model = its_model

    @staticmethod
    def __format_number__(number):
        """Format a number with two decimals and then try to shorten it to five characters."""
        text = f"{number:5.2f}"
        # first the trailing zeros are dropped...
        while len(text) > 5 and text.endswith("0"):
            text = text[:-1]
        # ...and later the rest of the decimal part (and the dot itself)
        while len(text) > 5 and text.find('.') > 0:
            text = text[:-1]
        return text

    @staticmethod
    def get_level(option):
        """Return the attribute of the logging module with that name in upper
        case; if it does not exist, the error is reported and the program exits."""
        try:
            level = getattr(logging, option.upper())
        except AttributeError:
            logging.error(f" '--log' must contain a valid logging level and {option.upper()} is not.")
            sys.exit(-1)
        return level

    def debug(self, text):
        self.logger.debug(f"t={self.model.t:03} {text}")

    def info(self, text):
        self.logger.info(f" t={self.model.t:03} {text}")

    def error(self, text):
        self.logger.error(f"t={self.model.t:03} {text}")

    def define_log(self, log: str, logfile: str = ''):
        """
        Set the level of the logger and add a handler to it.

        :param log: name of the logging level (ERROR, DEBUG, INFO...)
        :param logfile: if given, the messages are appended to that file
            (placed inside Statistics.OUTPUT_DIRECTORY when the name does not
            start with it); otherwise a stream handler is used
        """
        formatter = logging.Formatter('%(levelname)s%(message)s')
        self.logLevel = Log.get_level(log.upper())
        self.logger.setLevel(self.logLevel)
        if logfile:
            if not logfile.startswith(Statistics.OUTPUT_DIRECTORY):
                logfile = f"{Statistics.OUTPUT_DIRECTORY}/{logfile}"
            handler = logging.FileHandler(logfile, 'a', 'utf-8')
        else:
            handler = logging.StreamHandler()
        # the same level and format are used whatever the destination is
        handler.setLevel(self.logLevel)
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

In [None]:
"""
ABM model auxiliary file: to have statistics and plot
@author:  hector@bith.net
"""
import numpy as np

In [None]:
class Statistics:
    """
    It stores, for each instant t, the aggregated values of the firms and of
    the bank sector, and it exports them to a text file.
    """
    OUTPUT_DIRECTORY = "../output"

    # This time the idea is to use pandas to store the statistics
    def __init__(self, its_model):
        """Create the output directory if it does not exist and allocate the series."""
        self.model = its_model
        import os
        # makedirs with exist_ok: it also creates missing parent directories
        # and it does not fail if the directory appears in the meantime
        os.makedirs(self.OUTPUT_DIRECTORY, exist_ok=True)
        self.reset()

    def debug_firms(self):
        """Send to the debug log one line for each firm of the model."""
        for firm in self.model.firms:
            self.debug_firm(firm)

    def debug_firm(self, firm):
        """Send to the debug log the values K, A, L and Φ of the firm."""
        text = f" {firm.__str__()} K={Log.__format_number__(firm.K)}"
        text += f" | A={Log.__format_number__(firm.A)} L={Log.__format_number__(firm.L)}"
        text += f",  Φ={Log.__format_number__(firm.phi)}"
        if firm.failed:
            text += f" FAILED "
        self.model.log.debug(text)

    def current_status_save(self):
        """
        Store the aggregated values of the current instant (model.t) in the
        series.

        :return: a string with the same values, to be logged
        """
        # it returns also a string with the status
        result = ""

        failures = sum(int(firm.failed) for firm in self.model.firms)
        self.failures[self.model.t] = failures
        result += f" fails={failures:3}"

        firmsK = sum(float(firm.K) for firm in self.model.firms)
        self.firmsK[self.model.t] = firmsK
        result += f" firmsK={firmsK:10.2f}"

        firmsA = sum(float(firm.A) for firm in self.model.firms)
        self.firmsA[self.model.t] = firmsA
        result += f" firmsA={firmsA:10.2f}"

        firmsL = sum(float(firm.L) for firm in self.model.firms)
        self.firmsL[self.model.t] = firmsL
        result += f" firmsL={firmsL:10.2f}"

        self.bankA[self.model.t] = self.model.bank_sector.A
        result += f"\n\t\t\t\t   bankA={self.model.bank_sector.A:10.2f}"

        self.bankL[self.model.t] = self.model.bank_sector.L
        result += f"  bankL={self.model.bank_sector.L:10.2f}"

        self.bankD[self.model.t] = self.model.bank_sector.D
        result += f"  bankD={self.model.bank_sector.D:10.2f}"

        return result

    def reset(self):
        """Allocate again every series, with config.T zeros each."""
        self.failures = np.zeros(self.model.config.T, dtype=int)
        self.firmsK = np.zeros(self.model.config.T, dtype=float)
        self.firmsA = np.zeros(self.model.config.T, dtype=float)
        self.firmsL = np.zeros(self.model.config.T, dtype=float)
        self.bankA = np.zeros(self.model.config.T, dtype=float)
        self.bankL = np.zeros(self.model.config.T, dtype=float)
        self.bankD = np.zeros(self.model.config.T, dtype=float)

    def export_data(self, export_datafile=None, export_description=None):
        """Save the data only if a file name has been given."""
        if export_datafile:
            self.save_data(export_datafile, export_description)

    @staticmethod
    def get_export_path(filename):
        """Return the file name inside OUTPUT_DIRECTORY and with the extension .txt."""
        if not filename.startswith(Statistics.OUTPUT_DIRECTORY):
            filename = f"{Statistics.OUTPUT_DIRECTORY}/{filename}"
        return filename if filename.endswith('.txt') else f"{filename}.txt"

    def save_data(self, export_datafile=None, export_description=None):
        """
        Write the series to a text file with tab separated columns.

        The file has two comment lines (the names of the columns and the
        description) followed by one line for each instant t. Only the series
        that this class collects are written: the attributes that were
        written before (policy, fitness, liquidity...) are never defined here
        and made every export fail.

        :param export_datafile: name of the file (see get_export_path);
            nothing is written if it is empty
        :param export_description: text of the second comment line; by
            default the name of the module and the values of T and N
        """
        if not export_datafile:
            return
        columns = ("failures", "firmsK", "firmsA", "firmsL", "bankA", "bankL", "bankD")
        series = [getattr(self, name) for name in columns]
        with open(Statistics.get_export_path(export_datafile), 'w', encoding="utf-8") as savefile:
            savefile.write("# t\t" + "\t".join(columns) + "\n")
            if export_description:
                savefile.write(f"# {export_description}\n")
            else:
                savefile.write(f"# {__name__} T={self.model.config.T} N={self.model.config.N}\n")
            for i in range(self.model.config.T):
                values = "\t".join(f"{column[i]:19}" for column in series)
                savefile.write(f"{i:3}\t{values}\n")

    def plot(self):
        pass  # TODO ste -> what do you need to plot

In [None]:
"""
ABM model executer, to run interactively the model
@author: hector@bith.net
"""
import typer

In [None]:
def run_interactive(log: str = typer.Option('ERROR', help="Log level messages (ERROR,DEBUG,INFO...)"),
                    logfile: str = typer.Option(None, help="File to send logs to"),
                    save: str = typer.Option(None, help=f"Saves the output of this execution"),
                    n: int = typer.Option(Config.N, help=f"Number of firms"),
                    t: int = typer.Option(Config.T, help=f"Time repetitions")):
    # Entry point for the command line: it applies the options to the global
    # model, configures the log and runs the simulation.
    # (comments and not a docstring: typer would show the docstring as the
    # help text of the command)
    config = model.config
    # T and N are stored in the configuration only when they differ from the
    # current values
    if config.T != t:
        config.T = t
    if config.N != n:
        config.N = n
    model.log.define_log(log, logfile)
    run(save)

In [None]:
def run(save=None):
    """Run the global model; save is the name of the file to export the data to, if any."""
    model.run(export_datafile=save)

In [None]:
def is_notebook():
    """Return True when running under IPython in a shell other than Spyder's one.

    Outside IPython the names used below do not exist, and the resulting
    NameError means that we are not in a notebook.
    """
    try:
        __IPYTHON__
        shell_name = get_ipython().__class__.__name__
    except NameError:
        return False
    return shell_name != "SpyderShell"

In [None]:
# the global model used by run() and run_interactive()
model = Model()
if is_notebook():
    # if we are running in a Notebook:
    run()
elif __name__ == "__main__":
    # if we are running interactively:
    typer.run(run_interactive)