# Reinforcement - Syft Duet - Data Scientist 🥁

Contributed by [@Koukyosyumei](https://github.com/Koukyosyumei)

This example trains a reinforcement learning policy network on Gym's CartPole environment over Syft.
This notebook is mainly based on the original pytorch [example](https://github.com/OpenMined/PySyft/tree/dev/examples/duet/reinforcement_learning/original).

## PART 1: Connect to a Remote Duet Server

As the Data Scientist, you want to perform data science on data that is sitting in the Data Owner's Duet server in their Notebook.

In order to do this, we must run the code that the Data Owner sends us, which includes their Duet Server ID. The code will look like this, but with their real Server ID in place of the placeholder.

```
import syft as sy
duet = sy.duet('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
```

This will create a direct connection from my notebook to the remote Duet server. Once the connection is established all traffic is sent directly between the two nodes.

Paste the code or Server ID that the Data Owner gives you and run it in the cell below. It will return your Client ID which you must send to the Data Owner to enter into Duet so it can pair your notebooks.

In [None]:
# stdlib
import os
import time
import asyncio
from pathlib import Path

# third party
import gym
import numpy as np
import time
import torch
from torch.distributions import Categorical
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Event loop reused by the training cell, which runs asyncio.sleep on it
# between polls for the Data Owner's checkpoint files.
loop = asyncio.get_event_loop()

In [None]:
import syft as sy

# Join the Data Owner's Duet session; every remote object below is reached
# through this handle.
# NOTE(review): loopback=True presumably targets a Duet server running on the
# same machine -- confirm before using across hosts.
duet = sy.join_duet(loopback=True)

### <img src="https://github.com/OpenMined/design-assets/raw/master/logos/OM/mark-primary-light.png" alt="he-black-box" width="100"/> Checkpoint 0 : Now STOP and run the Data Owner notebook until Checkpoint 1.

## PART 2: Check GPU, Define a model, optimizer, and other functions

In [None]:
# Hyperparameters and runtime switches read by the cells below.
# "log_interval" used to be listed twice (10, then 1). Python keeps only the
# last value of a repeated key, so the effective value 1 is the one retained.
config = {
    "gamma": 0.99,  # discount factor applied to future rewards in finish_episode
    "seed": 543,  # passed to remote_torch.manual_seed
    "render": False,
    "log_interval": 1,  # print progress every this many episodes
    "no_cuda": False,  # True keeps use_cuda False even if CUDA is available
    "wait_interval": 1,  # seconds slept between checkpoint-file polls
}

In [None]:
# Handle to the torch module exposed by the Duet session; calls made through it
# go to the Data Owner's side rather than the local torch import.
remote_torch = duet.torch
# Seed the remote torch with the configured seed.
remote_torch.manual_seed(config["seed"])

In [None]:
# Start from False; the value is overwritten below once the request returns.
has_cuda = False
# Handle to the result of cuda.is_available() called through the remote torch;
# the actual value is fetched with .get() below.
has_cuda_ptr = remote_torch.cuda.is_available()

# let's ask to see if our Data Owner has CUDA
has_cuda = bool(
    has_cuda_ptr.get(
        request_block=True,
        reason="To run test and inference locally",
        timeout_secs=3,  # presumably seconds to wait for an answer; raise it if the Data Owner needs longer
    )
)
print("Is cuda available ? : ", has_cuda)


# CUDA is used only when it is available remotely AND not disabled in config
use_cuda = not config["no_cuda"] and has_cuda
# now we can set the seed
remote_torch.manual_seed(config["seed"])

# Remote device object matching the decision above; passed to remote_policy.cuda later.
device = remote_torch.device("cuda" if use_cuda else "cpu")
# print(f"Data Owner device is {device.type.get()}")

In [None]:
class Policy(sy.Module):
    """Policy network: Linear(4, 128) -> Dropout(0.6) -> ReLU -> Linear(128, 2) -> softmax.

    All layers and the final softmax are built from ``torch_ref``, so the same
    class works with whichever torch module it is constructed with instead of
    depending on a notebook-level global.

    Attributes kept for the training loop:
        saved_log_probs: log-probabilities of the actions sampled this episode.
        rewards: rewards received this episode.
    """

    def __init__(self, torch_ref):
        """Create the layers from ``torch_ref`` and empty per-episode buffers."""
        super().__init__(torch_ref=torch_ref)
        self.affine1 = self.torch_ref.nn.Linear(4, 128)
        self.dropout = self.torch_ref.nn.Dropout(p=0.6)
        self.relu = self.torch_ref.nn.ReLU(True)
        self.affine2 = self.torch_ref.nn.Linear(128, 2)
        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        """Return action probabilities (softmax over dim 1) for input ``x``."""
        x = self.affine1(x)
        x = self.dropout(x)
        x = self.relu(x)
        action_scores = self.affine2(x)
        # Use the module's own torch reference rather than the global
        # remote_torch, matching how the layers above are created.
        return self.torch_ref.softmax(action_scores, dim=1)


def select_action(state):
    """Sample an action for ``state`` and record its log-probability.

    The state itself is never fetched here (you cannot see it); it is passed
    straight to ``remote_policy`` and only the resulting action probabilities
    are requested from the Data Owner.

    Returns:
        The sampled action as a plain Python number (``.item()`` of the sample).
    """
    action_probs = remote_policy(state).get(request_block=True, delete_obj=False)
    distribution = Categorical(action_probs)
    sampled = distribution.sample()
    # Kept on the policy so finish_episode can build the loss from it.
    remote_policy.saved_log_probs.append(distribution.log_prob(sampled))
    return sampled.item()


def finish_episode():
    """Apply one policy-gradient update from the episode stored on ``remote_policy``.

    Computes the discounted return for every step from ``remote_policy.rewards``
    using ``config["gamma"]``, normalises the returns (``eps`` guards the
    division), weights each saved log-probability by its negated return, sums
    that into a loss, and steps ``optimizer``. Both per-episode buffers are
    emptied afterwards.

    NOTE(review): saved_log_probs are built in select_action from probabilities
    fetched with .get(); confirm that gradients from this loss actually reach
    the parameters the optimizer updates.
    """
    # Accumulate returns walking backwards through the episode, appending as we
    # go and reversing once at the end. This yields the same chronological list
    # as repeatedly inserting at index 0, without the quadratic shifting.
    discounted = 0
    returns = []
    for reward in reversed(remote_policy.rewards):
        discounted = reward + config["gamma"] * discounted
        returns.append(discounted)
    returns.reverse()

    returns = torch.tensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + eps)

    policy_loss = [
        -log_prob * step_return
        for log_prob, step_return in zip(remote_policy.saved_log_probs, returns)
    ]
    optimizer.zero_grad()
    policy_loss = torch.cat(policy_loss).sum()
    policy_loss.backward()
    optimizer.step()

    # Clear in place so the lists on the policy object are reused next episode.
    del remote_policy.rewards[:]
    del remote_policy.saved_log_probs[:]

In [None]:
# send our model to remote
# Build the policy with the local torch module, then send it through the Duet
# session; remote_policy is the object the cells below call and train.
policy = Policy(torch)
remote_policy = policy.send(duet)

In [None]:
# Place the remote model on the device selected earlier.
# Branch on use_cuda rather than has_cuda: use_cuda also honours
# config["no_cuda"], and `device` is only a CUDA device when use_cuda is True.
if use_cuda:
    remote_policy.cuda(device)
else:
    remote_policy.cpu()

In [None]:
# Adam optimizer created through the remote torch handle over the sent model's parameters.
optimizer = remote_torch.optim.Adam(remote_policy.parameters(), lr=1e-2)
# float32 machine epsilon as a Python float; added to the std in finish_episode
# so the normalisation never divides by zero.
eps = np.finfo(np.float32).eps.item()

In [None]:
# Look up the entry stored under "reward_threshold" in the Duet store and fetch
# its value; the training loop stops once running_reward exceeds it.
# NOTE(review): delete_obj=False presumably leaves the object in the store after
# retrieval -- confirm.
reward_threshold_ptr = duet.store["reward_threshold"]
reward_threshold = reward_threshold_ptr.get(request_block=True, delete_obj=False)
print(f"reward_threshold is {reward_threshold}")

In [None]:
# Display the Duet store as the cell output (presumably a pandas table of the
# objects the Data Owner has made available -- confirm).
duet.store.pandas

In [None]:
# Directory used to exchange marker files with the Data Owner notebook: the
# training cell polls it for DO_checkpoint_* files and touches DS_checkpoint_*
# files in it.
checkpoints_folder = "./nb_checkpoints"

### <img src="https://github.com/OpenMined/design-assets/raw/master/logos/OM/mark-primary-light.png" alt="he-black-box" width="100"/> Checkpoint 1 : Now STOP and run the Data Owner notebook until Checkpoint 2.

## PART 3: Training

In [None]:
def _wait_for_checkpoint(name, max_retries=360):
    """Block until the marker file ``name`` exists in ``checkpoints_folder``.

    Checks for the file up to ``max_retries`` times, sleeping
    ``config["wait_interval"]`` seconds on the shared event loop after each
    unsuccessful check.

    Raises:
        TimeoutError: if the file still does not exist after the final sleep.
    """
    marker = Path(f"{checkpoints_folder}/{name}")
    for _ in range(max_retries):
        if marker.exists():
            return
        task = loop.create_task(asyncio.sleep(config["wait_interval"]))
        loop.run_until_complete(task)
    # One last look after the final sleep before giving up.
    if not marker.exists():
        raise TimeoutError(f"Timed out waiting for checkpoint file {marker}")


# Global step counter across all episodes; it names the per-step store entries
# (state_N, action_N, reward_N, done_N) and the matching marker files.
count = 0
running_reward = 10
episodes = 10
steps = 30

for i_episode in range(episodes):
    ep_reward = 0

    # t runs from 1 to steps - 1 (the original note here read "10000").
    for t in range(1, steps):

        # wait for data owner to send state, then take a handle to it
        _wait_for_checkpoint(f"DO_checkpoint_state_{count}")
        state = duet.store[f"state_{count}"]

        action = select_action(state)
        # send action to data owner and signal that it is available
        sy_action = sy.lib.python.Int(action)
        sy_action.tag(f"action_{count}")
        sy_action.send(duet)
        Path(f"{checkpoints_folder}/DS_checkpoint_action_{count}").touch()

        # wait for data owner to send reward, then fetch its value
        _wait_for_checkpoint(f"DO_checkpoint_reward_{count}")
        reward_ptr = duet.store[f"reward_{count}"]
        reward = reward_ptr.get(request_block=True, delete_obj=False)
        remote_policy.rewards.append(reward)
        ep_reward += reward

        # wait for data owner to send done, then fetch its value
        _wait_for_checkpoint(f"DO_checkpoint_done_{count}")
        done_ptr = duet.store[f"done_{count}"]
        done = done_ptr.get(request_block=True, delete_obj=False)

        count += 1

        if done:
            break

    # Exponential moving average of the episode reward (weight 0.05 on the newest).
    running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
    finish_episode()
    if i_episode % config["log_interval"] == 0:
        print(
            "Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}".format(
                i_episode, ep_reward, running_reward
            )
        )
    if running_reward > reward_threshold:
        print(
            "Solved! Running reward is now {} and "
            "the last episode runs to {} time steps!".format(running_reward, t)
        )
        break

In [None]:
local_policy = remote_policy.get(request_block=True, timeout_secs=5)
local_policy.save("./cartpole.pt")

### <img src="https://github.com/OpenMined/design-assets/raw/master/logos/OM/mark-primary-light.png" alt="he-black-box" width="100"/> Checkpoint 2 : Now STOP and run the Data Owner notebook until the next checkpoint.