In [1]:
%autosave 0

Autosave disabled


## Idea.
We will use as much class inheritance as possible so we are making few changes.

We will be copying from the source code we downloaded for version 1.8.0 of Stable Baselines3.

## Create custom environment.

In [2]:
import gym
from gym.spaces import MultiDiscrete
from gym.spaces import Discrete
from gym.spaces import Box
import numpy as np
from numpy.random import default_rng
from stable_baselines3.common.env_checker import check_env

# https://www.youtube.com/watch?v=R5S2FmtFnt8&ab_channel=DibyaChakravorty
class CustomEnv(gym.Env):
    def __init__(self):
        # Need to define observation and action space.
        action_lows = np.array([1, 0, 0, 0.05])
        action_highs = np.array([4, 2, 4, 10])
        self.action_space = Box(low=action_lows, high=action_highs)

        # Observation space will have just t and r. Both are floats. t is in [0, 100] and r is in [-100, 100].
        obs_lows = np.array([0, -100])
        obs_highs = np.array([100, 100])
        self.observation_space = Box(low=obs_lows, high=obs_highs)

        # Also declare a random number generator.
        self.rng = default_rng()

        # This will always store the current observation.
        self.current_obs = None
    
    def reset(self):
        # The reset() method will initialise an episode (fix the problem parameters by sampling from the parameter space).
        # Each episode, we will have different starting observation space.
        normal = min(100, max(0, self.rng.normal(loc=48.57142857, scale=15.89249598)))
        uniform = self.rng.uniform(-100, 100)

        # Return the first (initial) observation.
        self.current_obs = np.array([normal, uniform], dtype="float32")

        return self.current_obs
    
    def step(self, action):
        # The input "action" will be a numpy array like np.array([2, 3, 2, 4]).
        c_r = action[0]
        m_r = action[1]
        s_r = action[2]
        p = action[3]

        normal, uniform = self.current_obs

        # Need to calculate the next observation (even though wont need it).
        # Dont have randomness, have a deterministic value for r_gain and prof as just using regression. Sals is bounded by 0.
        sals = max(0, 44.06153 + -4.42618 * p + 0.167923 * uniform + -0.31689 * normal)

        r_per_sal = -0.13906 + 0.067546 * c_r + 0.266495 * m_r + 0.097398 * s_r + -0.20013 * p
        r_at_end = min(100, max(-100, uniform + sals * r_per_sal))
        r_gain = r_at_end - uniform

        prof = sals * p - sals * (0.3 * c_r + 0.09 * m_r + 0.16 * s_r + 0.12)

        self.current_obs = np.array([normal, r_at_end], dtype="float32")

        # Need to calculate reward. Will just have 1 reward for now, prof.
        reward = np.array([prof, r_gain])

        # Need to compute done (always True).
        done = True

        return self.current_obs, reward, done, {}


class CustomEnvSing(gym.Env):
    def __init__(self):
        # Need to define observation and action space.
        action_lows = np.array([1, 0, 0, 0.05])
        action_highs = np.array([4, 2, 4, 10])
        self.action_space = Box(low=action_lows, high=action_highs)

        # Observation space will have just t and r. Both are floats. t is in [0, 100] and r is in [-100, 100].
        obs_lows = np.array([0, -100])
        obs_highs = np.array([100, 100])
        self.observation_space = Box(low=obs_lows, high=obs_highs)

        # Also declare a random number generator.
        self.rng = default_rng()

        # This will always store the current observation.
        self.current_obs = None
    
    def reset(self):
        # The reset() method will initialise an episode (fix the problem parameters by sampling from the parameter space).
        # Each episode, we will have different starting observation space.
        normal = min(100, max(0, self.rng.normal(loc=48.57142857, scale=15.89249598)))
        uniform = self.rng.uniform(-100, 100)

        # Return the first (initial) observation.
        self.current_obs = np.array([normal, uniform], dtype="float32")

        return self.current_obs
    
    def step(self, action):
        # The input "action" will be a numpy array like np.array([2, 3, 2, 4]).
        c_r = action[0]
        m_r = action[1]
        s_r = action[2]
        p = action[3]

        normal, uniform = self.current_obs

        # Need to calculate the next observation (even though wont need it).
        # Dont have randomness, have a deterministic value for r_gain and prof as just using regression. Sals is bounded by 0.
        sals = max(0, 44.06153 + -4.42618 * p + 0.167923 * uniform + -0.31689 * normal)

        r_per_sal = -0.13906 + 0.067546 * c_r + 0.266495 * m_r + 0.097398 * s_r + -0.20013 * p
        r_at_end = min(100, max(-100, uniform + sals * r_per_sal))
        r_gain = r_at_end - uniform

        prof = sals * p - sals * (0.3 * c_r + 0.09 * m_r + 0.16 * s_r + 0.12)

        self.current_obs = np.array([normal, r_at_end], dtype="float32")

        # Need to calculate reward. Will just have 1 reward for now, prof.
        reward = prof

        # Need to compute done (always True).
        done = True

        return self.current_obs, reward, done, {}

## Custom PPO class.
This class will be exactly the same (via inheritance) but we need to change the MlpExtractor class to one of my own.

In [3]:
import torch as th
from torch import nn
from typing import Dict, List, Tuple, Type, Union

class CustomMlpExtractor(nn.Module):
    def __init__(
        self,
        feature_dim: int,
        net_arch: Union[List[int], Dict[str, List[int]]],
        activation_fn: Type[nn.Module],
        device: Union[th.device, str] = "auto",
    ) -> None:
        super().__init__()
        device = get_device(device)
        
        print("In CustomMlpExtractor")

        # Policy net.
        policy_ray_hidden_dim=100
        self.policy_ray_mlp = nn.Sequential(
            nn.Linear(2, policy_ray_hidden_dim),
            nn.Tanh(),
            nn.Linear(policy_ray_hidden_dim, policy_ray_hidden_dim),
            nn.Tanh(),
            nn.Linear(policy_ray_hidden_dim, policy_ray_hidden_dim),
        )

        self.policy_targetnet_in_dim = 2
        self.policy_targetnet_dims = [64, 64]
        
        prvs_dim = self.policy_targetnet_in_dim
        for i, dim in enumerate(self.policy_targetnet_dims):
            setattr(self, f"policy_fc_{i}_weights", nn.Linear(policy_ray_hidden_dim, prvs_dim * dim))
            setattr(self, f"policy_fc_{i}_bias", nn.Linear(policy_ray_hidden_dim, dim))
            prvs_dim = dim
        
        # Value net.
        value_ray_hidden_dim=100
        self.value_ray_mlp = nn.Sequential(
            nn.Linear(2, value_ray_hidden_dim),
            nn.Tanh(),
            nn.Linear(value_ray_hidden_dim, value_ray_hidden_dim),
            nn.Tanh(),
            nn.Linear(value_ray_hidden_dim, value_ray_hidden_dim),
        )

        self.value_targetnet_in_dim = 2
        self.value_targetnet_dims = [64, 64]
        
        prvs_dim = self.value_targetnet_in_dim
        for i, dim in enumerate(self.value_targetnet_dims):
            setattr(self, f"value_fc_{i}_weights", nn.Linear(value_ray_hidden_dim, prvs_dim * dim))
            setattr(self, f"value_fc_{i}_bias", nn.Linear(value_ray_hidden_dim, dim))
            prvs_dim = dim
            

        # Save dim, used to create the distributions
        self.latent_dim_pi = 64
        self.latent_dim_vf = 64
    

    def forward(self, features: th.Tensor) -> Tuple[th.Tensor, th.Tensor]:
        """
        :return: latent_policy, latent_value of the specified network.
            If all layers are shared, then ``latent_policy == latent_value``
        """
        return self.forward_actor(features), self.forward_critic(features)

    def forward_actor(self, features: th.Tensor) -> th.Tensor:
        out_dict = dict()
        hypnet_output = self.policy_ray_mlp(th.Tensor([0.5, 0.5]))

        prvs_dim = self.policy_targetnet_in_dim
        for i, dim in enumerate(self.policy_targetnet_dims):
            out_dict[f"policy_fc_{i}_weights"] = self.__getattr__(f"policy_fc_{i}_weights")(hypnet_output).reshape(dim, prvs_dim)
            out_dict[f"policy_fc_{i}_bias"] = self.__getattr__(f"policy_fc_{i}_bias")(hypnet_output).flatten()
            prvs_dim = dim

        tnet = MyPolicyTargetFCNet()
        tnet_output = tnet(features, out_dict)
        return tnet_output

    def forward_critic(self, features: th.Tensor) -> th.Tensor:
        out_dict = dict()
        hypnet_output = self.value_ray_mlp(th.Tensor([0.5, 0.5]))

        prvs_dim = self.value_targetnet_in_dim
        for i, dim in enumerate(self.value_targetnet_dims):
            out_dict[f"value_fc_{i}_weights"] = self.__getattr__(f"value_fc_{i}_weights")(hypnet_output).reshape(dim, prvs_dim)
            out_dict[f"value_fc_{i}_bias"] = self.__getattr__(f"value_fc_{i}_bias")(hypnet_output).flatten()
            prvs_dim = dim

        tnet = MyValueTargetFCNet()
        tnet_output = tnet(features, out_dict)
        return tnet_output


import torch.nn.functional as F
class MyPolicyTargetFCNet(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x, weights):
        for i in range(int(len(weights) / 2)):
            x = F.linear(x, weights[f"policy_fc_{i}_weights"], weights[f"policy_fc_{i}_bias"])
            if i < int(len(weights) / 2) - 1:
                x = F.tanh(x)
        
        return x

class MyValueTargetFCNet(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x, weights):
        for i in range(int(len(weights) / 2)):
            x = F.linear(x, weights[f"value_fc_{i}_weights"], weights[f"value_fc_{i}_bias"])
            if i < int(len(weights) / 2) - 1:
                x = F.tanh(x)
        
        return x

In [4]:
from stable_baselines3.common.torch_layers import (BaseFeaturesExtractor, FlattenExtractor)
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
from gym import spaces
from stable_baselines3.common.type_aliases import Schedule
from stable_baselines3.common.utils import get_device
from stable_baselines3.common.policies import ActorCriticPolicy

from functools import partial

from stable_baselines3.common.distributions import (
    BernoulliDistribution,
    CategoricalDistribution,
    DiagGaussianDistribution,
    MultiCategoricalDistribution,
    StateDependentNoiseDistribution,
)

class CustomActorCriticPolicy(ActorCriticPolicy):
    def __init__(
        self,
        observation_space: spaces.Space,
        action_space: spaces.Space,
        lr_schedule: Schedule,
        net_arch: Optional[Union[List[int], Dict[str, List[int]]]] = None,
        activation_fn: Type[nn.Module] = nn.Tanh,
        ortho_init: bool = True,
        use_sde: bool = False,
        log_std_init: float = 0.0,
        full_std: bool = True,
        use_expln: bool = False,
        squash_output: bool = False,
        features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
        features_extractor_kwargs: Optional[Dict[str, Any]] = None,
        share_features_extractor: bool = True,
        normalize_images: bool = True,
        optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Dict[str, Any]] = None,
        env_num_rewards = None
    ):
        if optimizer_kwargs is None:
            optimizer_kwargs = {}
            # Small values to avoid NaN in Adam optimizer
            if optimizer_class == th.optim.Adam:
                optimizer_kwargs["eps"] = 1e-5
        
        self.env_num_rewards = env_num_rewards
        
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            net_arch,
            activation_fn,
            ortho_init,
            use_sde,
            log_std_init,
            full_std,
            use_expln,
            squash_output,
            features_extractor_class,
            features_extractor_kwargs,
            share_features_extractor,
            normalize_images,
            optimizer_class,
            optimizer_kwargs
        )
    
    def _build_mlp_extractor(self) -> None:
        """
        Create the policy and value networks.
        Part of the layers can be shared.
        """
        self.mlp_extractor = CustomMlpExtractor(
            self.features_dim,
            net_arch=self.net_arch,
            activation_fn=self.activation_fn,
            device=self.device,
        )

    def _build(self, lr_schedule: Schedule) -> None:
        """
        Create the networks and the optimizer.

        :param lr_schedule: Learning rate schedule
            lr_schedule(1) is the initial learning rate
        """
        self._build_mlp_extractor()

        latent_dim_pi = self.mlp_extractor.latent_dim_pi

        if isinstance(self.action_dist, DiagGaussianDistribution):
            self.action_net, self.log_std = self.action_dist.proba_distribution_net(
                latent_dim=latent_dim_pi, log_std_init=self.log_std_init
            )
        elif isinstance(self.action_dist, StateDependentNoiseDistribution):
            self.action_net, self.log_std = self.action_dist.proba_distribution_net(
                latent_dim=latent_dim_pi, latent_sde_dim=latent_dim_pi, log_std_init=self.log_std_init
            )
        elif isinstance(self.action_dist, (CategoricalDistribution, MultiCategoricalDistribution, BernoulliDistribution)):
            self.action_net = self.action_dist.proba_distribution_net(latent_dim=latent_dim_pi)
        else:
            raise NotImplementedError(f"Unsupported distribution '{self.action_dist}'.")

        self.value_net = nn.Linear(self.mlp_extractor.latent_dim_vf, self.env_num_rewards)
        # Init weights: use orthogonal initialization
        # with small initial weight for the output
        if self.ortho_init:
            # TODO: check for features_extractor
            # Values from stable-baselines.
            # features_extractor/mlp values are
            # originally from openai/baselines (default gains/init_scales).
            module_gains = {
                self.features_extractor: np.sqrt(2),
                self.mlp_extractor: np.sqrt(2),
                self.action_net: 0.01,
                self.value_net: 1,
            }
            if not self.share_features_extractor:
                # Note(antonin): this is to keep SB3 results
                # consistent, see GH#1148
                del module_gains[self.features_extractor]
                module_gains[self.pi_features_extractor] = np.sqrt(2)
                module_gains[self.vf_features_extractor] = np.sqrt(2)

            for module, gain in module_gains.items():
                module.apply(partial(self.init_weights, gain=gain))

        # Setup optimizer with initial learning rate
        self.optimizer = self.optimizer_class(self.parameters(), lr=lr_schedule(1), **self.optimizer_kwargs)

In [5]:
import importlib
import stable_baselines3
importlib.reload(stable_baselines3)

<module 'stable_baselines3' from 'C:\\Users\\callu\\anaconda3\\lib\\site-packages\\stable_baselines3\\__init__.py'>

In [8]:
from stable_baselines3 import PPO

env_sing = CustomEnvSing()
env_sing.reset()

model_sing = PPO(CustomActorCriticPolicy, env_sing, verbose=1)

TIMESTEPS = 100
iters = 0
while True:
    iters += 1
    model_sing.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name=f"PPO")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
In CustomMlpExtractor
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | -3.36    |
| time/              |          |
|    fps             | 558      |
|    iterations      | 1        |
|    time_elapsed    | 3        |
|    total_timesteps | 2048     |
---------------------------------
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(9.7789e-09, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.0058, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.s

advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1303, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1407, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1489, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1506, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1456,

advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1359, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1321, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1430, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1384, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1459,

advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1386, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1514, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1428, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1369, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1359,

advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1338, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1387, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1215, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1423, grad_fn=<NegBackward0>)
advantages.shape: torch.Size([64])
ratio.shape: torch.Size([64])
policy_loss_1.shape: torch.Size([64])
doing original
policy_loss.shape: torch.Size([])
policy_loss: tensor(-0.1498,

KeyboardInterrupt: 

In [7]:
from stable_baselines3 import PPO

env = CustomEnv()
env.reset()

model = PPO(CustomActorCriticPolicy, env, verbose=1)

TIMESTEPS = 100
iters = 0
while True:
    iters += 1
    model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name=f"PPO")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
In CustomMlpExtractor
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | -1.04    |
| time/              |          |
|    fps             | 633      |
|    iterations      | 1        |
|    time_elapsed    | 3        |
|    total_timesteps | 2048     |
---------------------------------
advantages.shape: torch.Size([64, 2])
ratio.shape: torch.Size([64, 2])
policy_loss_1.shape: torch.Size([64, 2])
doing modified
policy_loss.shape: torch.Size([2])
policy_loss: tensor([ 0.1749, -0.1749], grad_fn=<NegBackward0>)


ValueError: only one element tensors can be converted to Python scalars

In [32]:
te = th.tensor([[1, 2], [3, 4], [5, 6]], dtype=th.float32)
rati = th.tensor([1, 1, 1])
te * rati.unsqueeze(1)

tensor([[1., 2.],
        [3., 4.],
        [5., 6.]])

In [40]:
te * rati.repeat(2, 1).T

tensor([[1., 2.],
        [3., 4.],
        [5., 6.]])

In [21]:
[[th.tensor([1, 2, 3]) - th.tensor([4, 5, 6])] * 2][0]

[tensor([-3, -3, -3]), tensor([-3, -3, -3])]

In [28]:
th.exp(th.tensor([1, 2, 3]) - th.tensor([4, 5, 6])).shape

torch.Size([3])

In [None]:
model.policy.mlp_extractor.parameters

In [None]:
print(model.policy.mlp_extractor.policy_net)
print(model.policy.mlp_extractor.value_net)

In [None]:
class ClassB():
    def __init__(self):
        pass
    
    def printer(self):
        print("Yo")

class ClassA():
    def __init__(self):
        pass
    
    def run_class_b(self):
        dummy_class_b = ClassB()
        dummy_class_b.printer()

In [None]:
dummy_class_a = ClassA()
dummy_class_a.run_class_b()

In [None]:
def decorator(func):
    def wrapper(*args, **kwargs):
        print("Before function call")
        result = func(*args, **kwargs)
        print("After function call")
        return result
    return wrapper

def my_function():
    print("Inside my_function")

# Manually apply the decorator when calling the function
decorated_function = decorator(my_function)
decorated_function()


In [None]:
def new_printer(self):
    print("Yeah")

def modified_b_printer(printer):
    def wrapper(self):
        #self.printer = new_printer
        #self.printer(self)
        new_printer(self)
    return wrapper

In [None]:
dum_class_b = ClassB()
decorated_method = modified_b_printer(dum_class_b.printer)
decorated_method(dum_class_b)

In [None]:
dum_class_b = ClassB()

#modified_b_printer(dum_class_b.printer())
class NewClassB(ClassB):
    @modified_b_printer
    def printer(self):
        super.printer()

dum_class_b = NewClassB()
dum_class_b.printer()

In [None]:
def modified_a_method(printer):
    def wrapper(self):
        @modified_b_printer
        def printer(self):
            self.printer = new_printer
    return wrapper


class NewClassA(ClassA):
    @modified_a_method
    def run_class_b(self):
        super.run_class_b()

In [None]:
dummy_class_a = NewClassA()
dummy_class_a.run_class_b()

In [None]:
def modify_method(original_method):
    def wrapper(self, x, y):
        # Call the original method with the modified arguments
        result = original_method(self, x, y)

        # Add code to modify the result or perform additional operations if needed
        modified_result = result + 10

        # Return the modified result
        return modified_result

    return wrapper

class MyClass:
    @modify_method
    def my_method(self, x, y):
        # Original method logic
        return x + y

# Create an instance of MyClass
my_instance = MyClass()

# Call the modified method
my_instance.my_method(3, 4)

In [None]:
class ClassC1:
    def printer(self):
        print("Yo")

class ClassB1:
    def __init__(self):
        self.class_c1_instance = ClassC1()
    
    def call_class_c1(self):
        self.class_c1_instance.printer()
        

class ClassA1:
    def create_and_call_class_b1(self):
        class_b1_instance = ClassB1()
        class_b1_instance.call_class_c1()

class_a1_instance = ClassA1()
class_a1_instance.create_and_call_class_b1()

In [None]:
# Since ClassA1.create_and_call_class_b1 creates and calls in one function, cant use a wrapper so need to use inheritance.
class NewClassB1(ClassB1):
    def __init__(self):
        super.__init__(self)
        print("in new class")
    
    def call_class_c1(self):
        self.class_c1_instance.printer()


# Define a decorator and wrapper that will change the method create_and_call_class_b1 so that it thinks ClassB1 is different?
def modify_create_and_call_class_b1(create_and_call_class_b1):
    def wrapper(self):
        ClassB1 = NewClassB1
        self.create_and_call_class_b1()
    return wrapper

class_a1_instance = ClassA1()
decorated_method = modify_create_and_call_class_b1(class_a1_instance.create_and_call_class_b1)
decorated_method(class_a1_instance)

In [None]:
class NewClassA1(ClassA1):
    @modify_create_and_call_class_b1
    def create_and_call_class_b1(self):
        class_b1_instance = ClassB1()
        class_b1_instance.call_class_c1()

te = NewClassA1()
te.create_and_call_class_b1()

In [None]:
class ClassC2:
    def printer(self):
        print("Yo")

class ClassB2:
    def __init__(self):
        self.class_c2_instance = ClassC2()
    
    def call_class_c2(self):
        self.class_c2_instance.printer()
        

class ClassA2:
    def __init__(self):
        self.class_b2_instance = ClassB2()
    
    def call_class_b2(self):
        self.class_b2_instance.call_class_c2()

class_a2_instance = ClassA2()
class_a2_instance.call_class_b2()

In [None]:
def new_printer(self):
    print("Yeah")

def modify_call_class_c2(call_class_c2):
    def wrapper(self):
        new_printer(self)
    
    return wrapper

def modify_call_class_b2(call_class_b2):
    def wrapper(self):
        decorated_method = modify_call_class_c2(self.call_class_b2)
        decorated_method(self.class_b2_instance)
    
    return wrapper


class_a2_instance = ClassA2()
decorated_method = modify_call_class_b2(class_a2_instance.call_class_b2)
decorated_method(class_a2_instance)

In [None]:
class ClassB():
    def __init__(self):
        pass
    
    def printer(self):
        print("Yo")

In [None]:
def new_printer(self):
    print("Yeah")

def modified_b_printer(printer):
    def wrapper(self):
        #self.printer = new_printer
        #self.printer(self)
        new_printer(self)
    return wrapper

In [None]:
dum_class_b = ClassB()
decorated_method = modified_b_printer(dum_class_b.printer)
decorated_method(dum_class_b)

In [None]:
class ClassBTEMP():
    def __init__(self):
        pass
    
    def printer(self):
        print("Yo")

def modified_b_printerTEMP(printer):
    def wrapper(self):
        return wrapper

dum_class_b = ClassB()
decorated_method = modified_b_printer(dum_class_b.printer)
decorated_method(dum_class_b)