In [22]:
import numpy as np
import gym
from gym import spaces
import json
from collections import defaultdict
import ptan



class EpistasisEnv(gym.Env):
    
#metadata = {'render.modes': ['human']}
    def __init__(self):
        self.SAMPLE_SIZE = 300 #t1 = t2 = SAMPLE_SIZE
        filename = "./epigen/sim/0_2_ASW.json"

        def establish_phen_gen(self, file):
            with open(file) as f:
                data = json.load(f)
                genotype = np.array(data["genotype"])
                self.phenotype = np.array(data["phenotype"])
                self.genotype = genotype.T
                num_phenotypes = max(self.phenotype)+1
                self.disease_snps = data["disease_snps"]
                self.phen_gen = [[] for _ in range(num_phenotypes)]
                for i in range(len(self.genotype)):
                    self.phen_gen[self.phenotype[i]].append(i)  
                return  self.genotype.shape[0], self.genotype.shape[1]

        super(EpistasisEnv, self).__init__()

        self.N_IDV, self.N_SNPS = establish_phen_gen(self, filename)
        self.action_space = spaces.Box(low=0, high=1, shape=(self.N_SNPS,), dtype=np.uint8)
        self.observation_space = spaces.Box(low=0, high=2, shape=
                        (2*self.SAMPLE_SIZE, self.N_SNPS), dtype=np.uint8)
        self.obs_phenotypes = None
        self.obs = None

    
    def step(self, action):
        snp_ids = self._take_action(action)
        print(f"{snp_ids=}")
        reward = self._count_reward(snp_ids) 
        self.current_step += 1
        done = self.current_step == 1
        obs = self._next_observation()
        return obs, reward, done, {}
    def _count_reward(self, snp_ids):
    
        all_existing_seq = defaultdict(lambda: {'control' : 0, 'case' : 0})
        

        for i, idv in enumerate(self.obs):
            snp_to_cmp = tuple(idv[snp_id] for snp_id in snp_ids) #tuple of SNP that 
            if self.obs_phenotypes[i] == 0:
                all_existing_seq[snp_to_cmp]['control'] += 1
            else:
                all_existing_seq[snp_to_cmp]['case'] += 1

        ###count reward      
        TP = 0 #HR case
        FP = 0 #HR control
        TN = 0 #LR control
        FN = 0 #LR case

        for case_control_count in all_existing_seq.values():
          # if seq is in LR group
            if case_control_count['case'] <= case_control_count['control']: #вопрос <= или <
                FN += case_control_count['case']
                TN += case_control_count['control']
            else:
          # if seq is in HR group
                TP += case_control_count['case']
                FP += case_control_count['control']
        R = (FP + TN) / (TP + FN)
        delta = FP / (TP+0.001)
        gamma = (TP + FP + TN + FN) / (TP+0.001)
        CCR = 0.5 * (TP / (TP + FN) + TN / (FP + TN))
#         print("delta:",delta,"gamma:", gamma)
        U = (R - delta)**2 / ((1 + delta) * (gamma - delta - 1 + 0.001))

        return CCR + U

  
    def reset(self):
        self.current_step = 0
        self.obs = self._next_observation()
        return self.obs

    def render(self, mode='human', close=False):
        pass
    
    def _take_action(self, action):
        chosen_snp_ids = []
        for i, choice in enumerate(action):
            if choice == 1:
                chosen_snp_ids.append(i)
        return chosen_snp_ids    
    def _next_observation(self):
        id_0 = np.random.choice(self.phen_gen[0], self.SAMPLE_SIZE)
        id_1 = np.random.choice(self.phen_gen[1], self.SAMPLE_SIZE)
        sample_ids = np.array(list(zip(id_0,id_1))).flatten()
        self.obs = np.array([self.genotype[idv] for idv in sample_ids])
        self.obs_phenotypes = [self.phenotype[idv] for idv in sample_ids]

        return self.obs

In [19]:
class EpiProbabilityActionSelector(ptan.actions.ActionSelector):
    """
    Converts probabilities of actions into action by sampling them
    """
    def __call__(self, probs):
        assert isinstance(probs, np.ndarray)
        assert isinstance(probs[0], np.ndarray)
        actions = []
        print("EpiProbabilityActionSelector - probs shape:", probs.shape)
        for prob in probs:
#             print("prob", prob.shape)
            num_selected_snps = 0
            for oneprob in prob:
                if oneprob > 1/len(prob):
                    num_selected_snps += 1
#             sum(1 for oneprob in prob if oneprob > 1/len(prob))
            chosen_snp = np.random.choice(len(prob), size=num_selected_snps, replace=False, p=prob)
            action = np.zeros(len(prob))
            for snp in chosen_snp:
                action[snp] = 1
            actions.append(action)
        return np.array(actions)

In [5]:
probs = [[0.8, 0.1, 0.1], [0.4, 0.4, 0.2]]
for prob in probs:
            num_selected_snps = sum(1 for oneprob in prob if oneprob > 1/len(prob))
            print(num_selected_snps)
            chosen_snp = np.random.choice(len(prob),size=2, p=prob, replace=False)
            print(chosen_snp)

1
[0 2]
2
[1 2]


In [4]:
import math
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.nn import Parameter
from torchvision import datasets, transforms

class LinearARD(nn.Module):
    def __init__(self, in_features, out_features, threshold, bias=True):
        super(LinearARD, self).__init__()
        """
            in_features: int, a number of input features
            out_features: int, a number of neurons
            threshold: float, a threshold for clipping weights
        """
        
        self.in_features = in_features
        self.out_features = out_features
        self.threshold = threshold

        self.mu = nn.parameter.Parameter(torch.Tensor(self.out_features, self.in_features)) # torch.nn.parameter.Parameter of size out_features x in_features
        self.log_sigma = nn.parameter.Parameter(torch.Tensor(self.out_features, self.in_features)) # torch.nn.parameter.Parameter of size out_features x in_features
        self.bias = nn.parameter.Parameter(torch.Tensor(1, self.out_features)) # torch.nn.parameter.Parameter of size 1 x out_features
        self.reset_parameters()
        
    def reset_parameters(self):
        self.bias.data.zero_()
        self.mu.data.normal_(0, 0.02)
        self.log_sigma.data.fill_(-5)        
        
    def forward(self, x):      
        # x is a torch.Tensor of shape (number_of_objects, in_features)
        # log_alpha is a torch.Tensor of shape (out_features, in_features)
        self.log_alpha = 2*self.log_sigma-torch.log(self.mu**2+1e-16)# Compute using self.log_sigma and self.mu
        # clipping for a numerical stability
        self.log_alpha = torch.clamp(self.log_alpha, -10, 10)   
        
        if self.training:
            # LRT = local reparametrization trick
            # lrt_mean is a torch.Tensor of shape (x.shape[0], out_features)
            lrt_mean = F.linear(input=x, weight=self.mu, bias=self.bias) # compute mean activation using LRT; you can use F.linear
            # lrt_std is a torch.Tensor of shape (x.shape[0], out_features)
            lrt_std = torch.sqrt(1e-8+F.linear(input=x**2, weight=torch.exp(2*self.log_sigma), bias=None)) # compute std of activations unsig lrt; you can use F.linear
                      # do not forget use torch.sqrt(x + 1e-8) instead of torch.sqrt(x)
            # eps is a torch.Tensor of shape (x.shape[0], out_features)
            eps = torch.randn_like(lrt_std)# sample of noise for reparametrization
            return lrt_mean+lrt_std*eps# sample of activation
        
        # compute the output of the layer
        # use weights W = E q = self.mu
        # clip all weight with log_alpha > threshold
        return F.linear(input=x, weight=self.mu*(self.log_alpha < self.threshold).float(), bias=self.bias)
        
    def kl_reg(self):
        # kl is a scalar torch.Tensor 
        # kl = # eval the KL divergence
        log_alpha = 2*self.log_sigma-torch.log(self.mu**2+1e-16) # Eval log alpha as a function(log_sigma, W)
        log_alpha = torch.clamp(log_alpha, -10, 10)# Clip log alpha to be in [-10, 10] for numerical suability 
        kl = - 0.5 * torch.log1p(torch.exp(-log_alpha))
        KL  = - torch.sum(kl)
        return KL
    
class Net(nn.Module):
    def __init__(self, threshold):
        super(Net, self).__init__()
        self.fc1 = LinearARD(100, 300, threshold)
        self.fc2 = LinearARD(300,  100, threshold)
#         self.fc3 = LinearARD(100,  10, threshold)
        self.threshold=threshold

    def forward(self, x):
        x = F.relu(self.fc1(x))
#         x = F.relu(self.fc2(x))
        x = F.log_softmax(self.fc2(x), dim=1)
        return x    

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class SnpPGN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(SnpPGN, self).__init__()

        self.conv = nn.Sequential(
            nn.Conv1d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
#         fx = x.float() / 256
        fx = x.float() / 3
        conv_out = self.conv(fx).view(fx.size()[0], -1)
        return self.fc(conv_out)

In [16]:
import wandb
import gym
# import ptan
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim



GAMMA = 0.99
LEARNING_RATE = 0.01
EPISODES_TO_TRAIN = 4


# class PGN(nn.Module):
#     def __init__(self, input_size, n_actions):
#         super(PGN, self).__init__()

#         self.net = nn.Sequential(
#             nn.Linear(input_size, 128),
#             nn.ReLU(),
#             nn.Linear(128, n_actions)
#         )

#     def forward(self, x):
#         return self.net(x)


def calc_qvals(rewards):
    res = []
    sum_r = 0.0
    for r in reversed(rewards):
        sum_r *= GAMMA
        sum_r += r
        res.append(sum_r)
    return list(reversed(res))


if __name__ == "__main__":
    SAMPLE_SIZE = 300 #t1 = t2 = SAMPLE_SIZE
#     filename = "./epigen/sim/0_2_ASW.json"
    env = EpistasisEnv()
#     wandb.init(project="epistasis", entity="taisikus")
#     wandb.config = {
#       "learning_rate": 0.01,
#       "gamma": 0.99,
#       "episodes_to_train": 4
#     }
#     writer = SummaryWriter(comment="-cartpole-reinforce")

#     net = PGN(env.observation_space.shape[1], env.N_SNPS)
    net = SnpPGN(env.observation_space.shape, env.N_SNPS)
    print(net)
    agent = ptan.agent.PolicyAgent(net, action_selector=EpiProbabilityActionSelector(),preprocessor=ptan.agent.float32_preprocessor,
                                   apply_softmax=True)
    exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=GAMMA)

    optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

    total_rewards = []
    step_idx = 0
    done_episodes = 0

    batch_episodes = 0
    batch_states, batch_actions, batch_qvals = [], [], []
    cur_rewards = []

    for step_idx, exp in enumerate(exp_source):
        batch_states.append(exp.state)
#         batch_actions.append(int(exp.action))
        batch_actions.append(exp.action)
        cur_rewards.append(exp.reward)

        if exp.last_state is None:
            batch_qvals.extend(calc_qvals(cur_rewards))
            cur_rewards.clear()
            batch_episodes += 1

        # handle new rewards
        new_rewards = exp_source.pop_total_rewards()
        if new_rewards:
            done_episodes += 1
            reward = new_rewards[0]
            total_rewards.append(reward)
            mean_rewards = float(np.mean(total_rewards[-100:]))
            print("%d: reward: %6.2f, mean_100: %6.2f, episodes: %d" % (
                step_idx, reward, mean_rewards, done_episodes))
#             wandb.log({"reward": reward, "mean_100": mean_rewards, "episodes": done_episodes})
#             writer.add_scalar("reward", reward, step_idx)
#             writer.add_scalar("reward_100", mean_rewards, step_idx)
#             writer.add_scalar("episodes", done_episodes, step_idx)
            if mean_rewards > 0.96:
                print("Solved in %d steps and %d episodes!" % (step_idx, done_episodes))
                break

        if batch_episodes < EPISODES_TO_TRAIN:
            continue

        optimizer.zero_grad()
        states_v = torch.FloatTensor(batch_states)
        batch_actions_t = torch.LongTensor(batch_actions)
        batch_qvals_v = torch.FloatTensor(batch_qvals)

        logits_v = net(states_v)
        log_prob_v = F.log_softmax(logits_v, dim=1)
        log_prob_actions_v = batch_qvals_v * log_prob_v[range(len(batch_states)), batch_actions_t]
        loss_v = -log_prob_actions_v.mean()

        loss_v.backward()
        optimizer.step()

        batch_episodes = 0
        batch_states.clear()
        batch_actions.clear()
        batch_qvals.clear()

#     writer.close()

SnpPGN(
  (conv): Sequential(
    (0): Conv1d(600, 32, kernel_size=(8,), stride=(4,))
    (1): ReLU()
    (2): Conv1d(32, 64, kernel_size=(4,), stride=(2,))
    (3): ReLU()
    (4): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
    (5): ReLU()
  )
  (fc): Sequential(
    (0): Linear(in_features=576, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=100, bias=True)
  )
)
probs 1
prob (100,)


UnboundLocalError: local variable 'done' referenced before assignment

In [2]:
env = EpistasisEnv()
print(env.observation_space)
env.observation_space.shape

Box(0, 2, (600, 100), uint8)


(600, 100)

In [4]:
env.reset()
observation, reward, done, _ = env.step(env.action_space.sample())
print('Observation : ' + str(observation.shape))
print('Reward      : ' + str(reward))
print('Done        : ' + str(done))

ZeroDivisionError: division by zero

## Создали среду и запустили

In [23]:
env = EpistasisEnv()
env.reset()

array([[0, 0, 1, ..., 0, 0, 0],
       [0, 0, 1, ..., 1, 0, 0],
       [0, 0, 1, ..., 0, 0, 0],
       ...,
       [0, 1, 1, ..., 2, 0, 0],
       [0, 0, 2, ..., 2, 0, 0],
       [0, 1, 0, ..., 2, 0, 0]])

In [24]:
env.action_space.sample()

array([0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1,
       0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1,
       0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1,
       0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1,
       0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1], dtype=uint8)

In [25]:
probs = np.random.random(size=100)
probs /= probs.sum()

In [26]:
action_selector = EpiProbabilityActionSelector()
probs = np.array([probs])

action = action_selector(probs)
print(action.shape, action)

EpiProbabilityActionSelector - probs shape: (1, 100)
(1, 100) [[0. 0. 1. 0. 0. 0. 1. 0. 0. 0. 1. 0. 1. 0. 0. 1. 0. 0. 0. 1. 1. 1. 1. 1.
  1. 0. 1. 0. 1. 1. 1. 1. 0. 1. 1. 0. 1. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 1.
  1. 1. 1. 0. 1. 1. 1. 0. 0. 1. 1. 1. 1. 0. 1. 0. 1. 1. 0. 0. 1. 1. 0. 1.
  1. 0. 1. 1. 0. 1. 0. 0. 1. 0. 1. 1. 1. 1. 0. 0. 0. 1. 0. 0. 1. 0. 1. 0.
  0. 0. 0. 0.]]


In [27]:
env.step(env.action_space.sample())

snp_ids=[0, 3, 4, 5, 8, 11, 12, 15, 16, 19, 21, 22, 27, 28, 29, 32, 34, 35, 37, 39, 40, 42, 43, 45, 47, 48, 50, 52, 53, 54, 57, 60, 61, 64, 68, 74, 76, 78, 79, 84, 86, 87, 89, 92, 96, 99]


(array([[0, 0, 1, ..., 1, 0, 0],
        [0, 0, 2, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 1, ..., 1, 0, 0],
        [0, 0, 2, ..., 1, 0, 0],
        [0, 1, 1, ..., 1, 0, 0]]),
 1.9990076523764393,
 True,
 {})

In [11]:
env = gym.make("CartPole-v0")
print(env.observation_space)
env.observation_space.shape

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)


(4,)

In [None]:
env = gym.make('CartPole-v0')
env.reset()
for i in range(500):
   observation, reward, done, _ = env.step(env.action_space.sample())
   print('Observation : ' + str(observation.shape))
   print('Reward      : ' + str(reward))
   print('Done        : ' + str(done))
   if done:
        print(observation)
   print('---------------------')
env.close()