This repository has been archived by the owner on Dec 11, 2022. It is now read-only.

Commit

Adding kubernetes orchestrator for rollouts, adding requirements for incremental docker builds
Ajay Deshpande authored and zach-nervana committed Oct 23, 2018
1 parent 6541bc7 commit ce9838a
Showing 10 changed files with 327 additions and 15 deletions.
16 changes: 16 additions & 0 deletions requirements.txt
@@ -0,0 +1,16 @@
annoy==1.8.3
Pillow==4.3.0
matplotlib==2.0.2
numpy==1.14.5
pandas==0.22.0
pygame==1.9.3
PyOpenGL==3.1.0
scipy==0.19.0
scikit-image==0.13.0
box2d==2.3.2
gym==0.10.5
bokeh==0.13.0
futures==3.1.1
wxPython==4.0.1
kubernetes==7.0.0
redis==2.10.6
15 changes: 15 additions & 0 deletions rl_coach/agents/dqn_agent.py
@@ -27,6 +27,7 @@
from rl_coach.core_types import EnvironmentSteps
from rl_coach.exploration_policies.e_greedy import EGreedyParameters
from rl_coach.memories.non_episodic.experience_replay import ExperienceReplayParameters
from rl_coach.memories.non_episodic.distributed_experience_replay import DistributedExperienceReplayParameters
from rl_coach.schedules import LinearSchedule


@@ -50,6 +51,20 @@ def __init__(self):
self.create_target_network = True


class DQNAgentParametersDistributed(AgentParameters):
def __init__(self):
super().__init__(algorithm=DQNAlgorithmParameters(),
exploration=EGreedyParameters(),
memory=DistributedExperienceReplayParameters(),
networks={"main": DQNNetworkParameters()})
self.exploration.epsilon_schedule = LinearSchedule(1, 0.1, 1000000)
self.exploration.evaluation_epsilon = 0.05

@property
def path(self):
return 'rl_coach.agents.dqn_agent:DQNAgent'


class DQNAgentParameters(AgentParameters):
def __init__(self):
super().__init__(algorithm=DQNAlgorithmParameters(),
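The new DQNAgentParametersDistributed class swaps the in-process experience replay for the Redis-backed DistributedExperienceReplayParameters while keeping the rest of the DQN configuration unchanged. As a minimal sketch (not part of this commit), pointing the memory at a shared Redis service looks roughly like this; the service address is a placeholder:

    from rl_coach.agents.dqn_agent import DQNAgentParametersDistributed

    agent_params = DQNAgentParametersDistributed()
    # DistributedExperienceReplayParameters defaults to localhost:6379, db 0;
    # override these to point at the Redis service the orchestrator creates.
    agent_params.memory.redis_ip = 'redis-service.default.svc'  # placeholder address
    agent_params.memory.redis_port = 6379
    agent_params.memory.redis_db = 0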
24 changes: 13 additions & 11 deletions rl_coach/memories/non_episodic/distributed_experience_replay.py
@@ -14,7 +14,7 @@
# limitations under the License.
#

from typing import List, Tuple, Union, Dict, Any
from typing import List, Tuple, Union

import numpy as np
import redis
@@ -23,14 +23,16 @@

from rl_coach.core_types import Transition
from rl_coach.memories.memory import Memory, MemoryGranularity, MemoryParameters
from rl_coach.utils import ReaderWriterLock


class DistributedExperienceReplayParameters(MemoryParameters):
def __init__(self):
super().__init__()
self.max_size = (MemoryGranularity.Transitions, 1000000)
self.allow_duplicates_in_batch_sampling = True
self.redis_ip = 'localhost'
self.redis_port = 6379
self.redis_db = 0

@property
def path(self):
@@ -41,19 +43,19 @@ class DistributedExperienceReplay(Memory):
"""
A Redis-backed replay buffer which stores transitions without any additional structure
"""
def __init__(self, max_size: Tuple[MemoryGranularity, int], allow_duplicates_in_batch_sampling: bool=True,
redis_ip = 'localhost', redis_port = 6379, db = 0):
def __init__(self, max_size: Tuple[MemoryGranularity, int], allow_duplicates_in_batch_sampling: bool=True,
redis_ip='localhost', redis_port=6379, redis_db=0):
"""
:param max_size: the maximum number of transitions or episodes to hold in the memory
:param allow_duplicates_in_batch_sampling: allow having the same transition multiple times in a batch
"""

super().__init__(max_size)
if max_size[0] != MemoryGranularity.Transitions:
raise ValueError("Experience replay size can only be configured in terms of transitions")
self.allow_duplicates_in_batch_sampling = allow_duplicates_in_batch_sampling

self.db = db
self.db = redis_db
self.redis_connection = redis.Redis(redis_ip, redis_port, self.db)

def length(self) -> int:
@@ -67,15 +69,15 @@ def num_transitions(self) -> int:
Get the number of transitions in the ER
"""
return self.redis_connection.info(section='keyspace')['db{}'.format(self.db)]['keys']

def sample(self, size: int) -> List[Transition]:
"""
Sample a batch of transitions from the replay buffer. If the requested size is larger than the number
of samples available in the replay buffer then the batch will return empty.
:param size: the size of the batch to sample
:param beta: the beta parameter used for importance sampling
:return: a batch (list) of selected transitions from the replay buffer
"""
"""
transition_idx = dict()
if self.allow_duplicates_in_batch_sampling:
while len(transition_idx) != size:
@@ -129,7 +131,7 @@ def get_transition(self, transition_index: int, lock: bool=True) -> Union[None,
:return: the corresponding transition
"""
return pickle.loads(self.redis_connection.get(transition_index))

def remove_transition(self, transition_index: int, lock: bool=True) -> None:
"""
Remove the transition in the given index.
@@ -140,7 +142,7 @@ def remove_transition(self, transition_index: int, lock: bool=True) -> None:
:return: None
"""
self.redis_connection.delete(transition_index)

# for API compatibility
def get(self, transition_index: int, lock: bool=True) -> Union[None, Transition]:
"""
@@ -173,5 +175,5 @@ def mean_reward(self) -> np.ndarray:
:return: the mean reward
"""
mean = np.mean([pickle.loads(self.redis_connection.get(key)).reward for key in self.redis_connection.keys()])

return mean
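
Because the buffer is just a set of pickled Transition objects keyed in Redis, any process that can reach the Redis service can sample from it. A minimal sketch of reading from the shared buffer (assuming a reachable Redis server that workers have already populated; not part of this commit):

    from rl_coach.memories.memory import MemoryGranularity
    from rl_coach.memories.non_episodic.distributed_experience_replay import DistributedExperienceReplay

    # Connect to the shared buffer; host/port/db mirror the parameter defaults above.
    memory = DistributedExperienceReplay(
        max_size=(MemoryGranularity.Transitions, 1000000),
        redis_ip='localhost',
        redis_port=6379,
        redis_db=0,
    )

    print(memory.num_transitions())  # number of keys in the configured Redis db
    batch = memory.sample(32)        # list of 32 unpickled Transition objects
    print(memory.mean_reward())      # mean reward over all stored transitions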
15 changes: 15 additions & 0 deletions rl_coach/orchestrators/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
19 changes: 19 additions & 0 deletions rl_coach/orchestrators/deploy.py
@@ -0,0 +1,19 @@



class DeployParameters(object):

def __init__(self):
pass


class Deploy(object):

def __init__(self, deploy_parameters):
self.deploy_parameters = deploy_parameters

def setup(self) -> bool:
pass

def deploy(self) -> bool:
pass
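
Deploy is deliberately bare: setup() prepares any shared infrastructure and deploy() launches the workers, each returning a success flag. A hypothetical non-Kubernetes orchestrator (sketch only, not part of this commit) would implement the same two methods, for example by running rollout workers as local subprocesses:

    import subprocess

    from rl_coach.orchestrators.deploy import Deploy, DeployParameters


    class LocalProcessParameters(DeployParameters):
        def __init__(self, command: list, num_workers: int = 1):
            super().__init__()
            self.command = command
            self.num_workers = num_workers


    class LocalProcessDeploy(Deploy):
        # Hypothetical orchestrator that runs each worker as a local subprocess.
        def setup(self) -> bool:
            # Nothing to provision locally; a real implementation might start Redis here.
            return True

        def deploy(self) -> bool:
            for _ in range(self.deploy_parameters.num_workers):
                subprocess.Popen(self.deploy_parameters.command)
            return True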
153 changes: 153 additions & 0 deletions rl_coach/orchestrators/kubernetes_orchestrator.py
@@ -0,0 +1,153 @@

from rl_coach.orchestrators.deploy import Deploy, DeployParameters
from kubernetes import client, config


class KubernetesParameters(DeployParameters):

def __init__(self, image: str, command: list(), arguments: list() = list(), synchronized: bool = False,
num_workers: int = 1, kubeconfig: str = None, namespace: str = None, redis_ip: str = None,
redis_port: int = None, redis_db: int = 0):
self.image = image
self.synchronized = synchronized
self.command = command
self.arguments = arguments
self.kubeconfig = kubeconfig
self.num_workers = num_workers
self.namespace = namespace
self.redis_ip = redis_ip
self.redis_port = redis_port
self.redis_db = redis_db


class Kubernetes(Deploy):

def __init__(self, deploy_parameters: KubernetesParameters):
super().__init__(deploy_parameters)
self.deploy_parameters = deploy_parameters

def setup(self) -> bool:
if self.deploy_parameters.kubeconfig:
config.load_kube_config()
else:
config.load_incluster_config()

if not self.deploy_parameters.namespace:
_, current_context = config.list_kube_config_contexts()
self.deploy_parameters.namespace = current_context['context']['namespace']

if not self.deploy_parameters.redis_ip:
# Need to spin up a redis service and a deployment.
if not self.deploy_redis():
print("Failed to setup redis")
return False

self.deploy_parameters.command += ['-r', self.deploy_parameters.redis_ip, '-p', '{}'.format(self.deploy_parameters.redis_port)]

return True

def deploy_redis(self) -> bool:
container = client.V1Container(
name="redis-server",
image='redis:4-alpine',
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': 'redis-server'}),
spec=client.V1PodSpec(
containers=[container]
)
)
deployment_spec = client.V1DeploymentSpec(
replicas=1,
template=template,
selector=client.V1LabelSelector(
match_labels={'app': 'redis-server'}
)
)

deployment = client.V1Deployment(
api_version='apps/v1',
kind='Deployment',
metadata=client.V1ObjectMeta(name='redis-server', labels={'app': 'redis-server'}),
spec=deployment_spec
)

api_client = client.AppsV1Api()
try:
api_client.create_namespaced_deployment(self.deploy_parameters.namespace, deployment)
except client.rest.ApiException as e:
print("Got exception: %s\n while creating redis-server", e)
return False

core_v1_api = client.CoreV1Api()

service = client.V1Service(
api_version='v1',
kind='Service',
metadata=client.V1ObjectMeta(
name='redis-service'
),
spec=client.V1ServiceSpec(
selector={'app': 'redis-server'},
ports=[client.V1ServicePort(
protocol='TCP',
port=6379,
target_port=6379
)]
)
)

try:
core_v1_api.create_namespaced_service(self.deploy_parameters.namespace, service)
self.deploy_parameters.redis_ip = 'redis-service.{}.svc'.format(self.deploy_parameters.namespace)
self.deploy_parameters.redis_port = 6379
return True
except client.rest.ApiException as e:
print("Got exception: %s\n while creating a service for redis-server", e)
return False

def deploy(self) -> bool:
if self.deploy_parameters.synchronized:
return self.create_k8s_job()
else:
return self.create_k8s_deployment()

def create_k8s_deployment(self) -> bool:
container = client.V1Container(
name="worker",
image=self.deploy_parameters.image,
command=self.deploy_parameters.command,
args=self.deploy_parameters.arguments,
image_pull_policy='Always'
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'app': 'worker'}),
spec=client.V1PodSpec(
containers=[container]
)
)
deployment_spec = client.V1DeploymentSpec(
replicas=self.deploy_parameters.num_workers,
template=template,
selector=client.V1LabelSelector(
match_labels={'app': 'worker'}
)
)

deployment = client.V1Deployment(
api_version='apps/v1',
kind='Deployment',
metadata=client.V1ObjectMeta(name='rollout-worker'),
spec=deployment_spec
)

api_client = client.AppsV1Api()
try:
api_client.create_namespaced_deployment(self.deploy_parameters.namespace, deployment)
return True
except client.rest.ApiException as e:
print("Got exception: %s\n while creating deployment", e)
return False

def create_k8s_job(self):
pass
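
The orchestrator only creates resources; nothing in this commit tears them down. A hedged sketch of removing the worker and Redis deployments and the Redis service with the same kubernetes client (resource names match the ones created above; exact delete signatures depend on the pinned client version):

    from kubernetes import client, config

    config.load_kube_config()
    apps_api = client.AppsV1Api()
    core_api = client.CoreV1Api()
    namespace = 'default'  # placeholder; use the namespace handed to KubernetesParameters

    # Delete the rollout workers and the Redis server created by deploy()/deploy_redis().
    for name in ('rollout-worker', 'redis-server'):
        apps_api.delete_namespaced_deployment(name, namespace, body=client.V1DeleteOptions())

    # Delete the ClusterIP service fronting Redis.
    core_api.delete_namespaced_service('redis-service', namespace)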
18 changes: 18 additions & 0 deletions rl_coach/orchestrators/test.py
@@ -0,0 +1,18 @@
from rl_coach.orchestrators.kubernetes_orchestrator import KubernetesParameters, Kubernetes

# image = 'gcr.io/constant-cubist-173123/coach:latest'
image = 'ajaysudh/testing:coach'
command = ['python3', 'rl_coach/rollout_worker.py']
# command = ['sleep', '10h']

params = KubernetesParameters(image, command, kubeconfig='~/.kube/config', redis_ip='redis-service.ajay.svc', redis_port=6379, num_workers=10)
# params = KubernetesParameters(image, command, kubeconfig='~/.kube/config')

obj = Kubernetes(params)
if not obj.setup():
print("Could not setup")

if obj.deploy():
print("Successfully deployed")
else:
print("Could not deploy")
