Skip to content

Commit

Permalink
Added managed memcache service and an AWS ElastiCache backend
Browse files Browse the repository at this point in the history
  • Loading branch information
tedsta committed Apr 3, 2017
1 parent 678c254 commit 1b4dd7c
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 28 deletions.
112 changes: 88 additions & 24 deletions perfkitbenchmarker/linux_benchmarks/memcached_ycsb_benchmark.py
@@ -1,4 +1,4 @@
# Copyright 2016 PerfKitBenchmarker Authors. All rights reserved.
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs YCSB against memcached.
"""Runs YCSB against different memcached-like offerings.
This benchmark runs two workloads against memcached using YCSB (the Yahoo! Cloud
Serving Benchmark).
Expand All @@ -21,15 +21,45 @@
"""

import functools
import logging

from perfkitbenchmarker import configs
from perfkitbenchmarker import flags
from perfkitbenchmarker import providers
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import memcached_server
from perfkitbenchmarker.linux_packages import ycsb
from perfkitbenchmarker.providers.aws import aws_network

FLAGS = flags.FLAGS

# Allowed values for the AWS ElastiCache flags defined below.
_ELASTICACHE_REGIONS = [
    'ap-northeast-1', 'ap-northeast-2', 'ap-southeast-1',
    'ap-southeast-2', 'ap-south-1', 'cn-north-1', 'eu-central-1',
    'eu-west-1', 'us-gov-west-1', 'sa-east-1', 'us-east-1',
    'us-east-2', 'us-west-1', 'us-west-2',
]
_ELASTICACHE_NODE_TYPES = [
    'cache.t2.micro', 'cache.t2.small', 'cache.t2.medium',
    'cache.m3.medium', 'cache.m3.large', 'cache.m3.xlarge',
    'cache.m3.2xlarge', 'cache.m4.large', 'cache.m4.xlarge',
    'cache.m4.2xlarge', 'cache.m4.4xlarge', 'cache.m4.10xlarge',
]

# Which provider's managed memcached offering to benchmark.
flags.DEFINE_enum(
    'memcached_managed', providers.GCP,
    [providers.GCP, providers.AWS],
    'Managed memcached provider (GCP/AWS) to use.')

# Whether memcached is self-installed on VMs or provided as a managed service.
flags.DEFINE_enum(
    'memcached_scenario', 'custom',
    ['custom', 'managed'],
    'select one scenario to run: \n'
    'custom: Provision VMs and install memcached ourselves. \n'
    'managed: Use the specified provider\'s managed memcache.')

# AWS ElastiCache settings.
flags.DEFINE_enum(
    'memcached_elasticache_region', 'us-west-1',
    _ELASTICACHE_REGIONS,
    'The region to use for AWS ElastiCache memcached servers.')

flags.DEFINE_enum(
    'memcached_elasticache_node_type', 'cache.m3.medium',
    _ELASTICACHE_NODE_TYPES,
    'The node type to use for AWS ElastiCache memcached servers.')

flags.DEFINE_integer(
    'memcached_elasticache_num_servers', 1,
    'The number of memcached instances for AWS ElastiCache.')


BENCHMARK_NAME = 'memcached_ycsb'
BENCHMARK_CONFIG = """
Expand Down Expand Up @@ -74,23 +104,53 @@ def Prepare(benchmark_spec):
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
loaders = benchmark_spec.vm_groups['clients']
assert loaders, benchmark_spec.vm_groups

# Memcached cluster
memcached_vms = benchmark_spec.vm_groups['servers']
assert memcached_vms, 'No memcached VMs: {0}'.format(benchmark_spec.vm_groups)
clients = benchmark_spec.vm_groups['clients']
assert clients, benchmark_spec.vm_groups

hosts = []

if FLAGS.memcached_scenario == 'managed':
# We need to delete the managed memcached backend when we're done
benchmark_spec.always_call_cleanup = True

if FLAGS.memcached_managed == providers.GCP:
raise NotImplementedError("GCP managed memcached backend not implemented "
"yet")
elif FLAGS.memcached_managed == providers.AWS:
cluster_id = 'pkb%s' % FLAGS.run_uri
service = providers.aws.elasticache.ElastiCacheMemcacheService(
aws_network.AwsNetwork.GetNetwork(clients[0]),
cluster_id, FLAGS.memcached_elasticache_region,
FLAGS.memcached_elasticache_node_type,
FLAGS.memcached_elasticache_num_servers)
service.Create()
hosts = service.GetHosts()
benchmark_spec.service = service
benchmark_spec.metadata = service.GetMetadata()
else:
# custom scenario
# Install memcached on all the servers
servers = benchmark_spec.vm_groups['servers']
assert servers, 'No memcached servers: {0}'.format(benchmark_spec.vm_groups)
memcached_install_fns = \
[functools.partial(memcached_server.ConfigureAndStart, vm)
for vm in servers]
vm_util.RunThreaded(lambda f: f(), memcached_install_fns)
hosts = ['%s:%s' % (vm.internal_ip, memcached_server.MEMCACHED_PORT)
for vm in servers]
benchmark_spec.metadata = {'ycsb_client_vms': FLAGS.ycsb_client_vms,
'ycsb_server_vms': FLAGS.ycsb_server_vms,
'num_vms': len(servers),
'cache_size': FLAGS.memcached_size_mb}

assert len(hosts) > 0

memcached_install_fns = [functools.partial(memcached_server.ConfigureAndStart,
vm)
for vm in memcached_vms]
ycsb_install_fns = [functools.partial(vm.Install, 'ycsb')
for vm in loaders]

vm_util.RunThreaded(lambda f: f(), memcached_install_fns + ycsb_install_fns)
for vm in clients]
vm_util.RunThreaded(lambda f: f(), ycsb_install_fns)
benchmark_spec.executor = ycsb.YCSBExecutor(
'memcached',
**{'memcached.hosts': ','.join([vm.internal_ip for vm in memcached_vms])})
**{'memcached.hosts': ','.join(hosts)})


def Run(benchmark_spec):
Expand All @@ -103,17 +163,16 @@ def Run(benchmark_spec):
Returns:
A list of sample.Sample instances.
"""
loaders = benchmark_spec.vm_groups['clients']
memcached_vms = benchmark_spec.vm_groups['servers']

metadata = {'ycsb_client_vms': FLAGS.ycsb_client_vms,
'num_vms': len(memcached_vms),
'cache_size': FLAGS.memcached_size_mb}
logging.info('Start benchmarking memcached service, scenario is %s.',
FLAGS.memcached_scenario)

clients = benchmark_spec.vm_groups['clients']

samples = list(benchmark_spec.executor.LoadAndRun(loaders))
samples = list(benchmark_spec.executor.LoadAndRun(clients))

for sample in samples:
sample.metadata.update(metadata)
sample.metadata.update(benchmark_spec.metadata)

return samples

Expand All @@ -125,5 +184,10 @@ def Cleanup(benchmark_spec):
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
memcached_vms = benchmark_spec.vm_groups['servers']
vm_util.RunThreaded(memcached_server.StopMemcached, memcached_vms)
if FLAGS.memcached_scenario == 'managed':
service = benchmark_spec.service
service.Destroy()
else:
# Custom scenario
servers = benchmark_spec.vm_groups['servers']
vm_util.RunThreaded(memcached_server.StopMemcached, servers)
5 changes: 5 additions & 0 deletions perfkitbenchmarker/linux_packages/memcached_server.py
Expand Up @@ -122,6 +122,11 @@ def StopMemcached(server):
(server.internal_ip, MEMCACHED_PORT))


def FlushMemcachedServer(ip, port):
  """Sends the memcached 'flush_all' command to one server using netcat.

  The command is issued through vm_util.IssueCommand rather than through a
  VM's RemoteCommand.

  Args:
    ip: Address of the memcached server to flush.
    port: Port the memcached server is listening on.
  """
  # The subshell and pipe only work inside a shell, and vm_util.IssueCommand
  # takes the command as a list of arguments, so hand the pipeline to bash
  # explicitly instead of passing a bare command string.
  pipeline = '(echo -e "flush_all\n" ; sleep 1)| netcat %s %s' % (ip, port)
  vm_util.IssueCommand(['bash', '-c', pipeline])


def Uninstall(vm):
vm.RemoteCommand('pkill memcached')
vm.RemoteCommand('rm -rf %s' % MEMCACHED_DIR)
2 changes: 1 addition & 1 deletion perfkitbenchmarker/linux_packages/ycsb.py
Expand Up @@ -265,7 +265,7 @@ def LineFilter(line):
# Drop ">" from ">1000"
if name.startswith('>'):
name = name[1:]
val = float(val) if '.' in val else int(val)
val = float(val) if '.' in val or 'nan' in val.lower() else int(val)
if name.isdigit():
if val:
op_result[data_type].append((int(name), val))
Expand Down
35 changes: 35 additions & 0 deletions perfkitbenchmarker/memcache_service.py
@@ -0,0 +1,35 @@
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class MemcacheService(object):
  """Interface for a memcache service backend.

  Every operation on this base class raises NotImplementedError; concrete
  backends are expected to subclass it and override CLOUD and the methods.
  """

  # Provider identifier; left as None on the base class.
  CLOUD = None

  def __init__(self):
    """Takes no arguments and sets up no state."""

  def Create(self):
    """Not implemented on the base class."""
    raise NotImplementedError()

  def Destroy(self):
    """Not implemented on the base class."""
    raise NotImplementedError()

  def Flush(self):
    """Not implemented on the base class."""
    raise NotImplementedError()

  def GetHosts(self):
    """Not implemented on the base class."""
    raise NotImplementedError()

  def GetMetadata(self):
    """Not implemented on the base class."""
    raise NotImplementedError()
19 changes: 16 additions & 3 deletions perfkitbenchmarker/providers/aws/aws_network.py
Expand Up @@ -62,9 +62,22 @@ def AllowPort(self, vm, start_port, end_port=None):
"""
if vm.is_static:
return
self.AllowPortInSecurityGroup(vm.region, vm.group_id, start_port, end_port)

def AllowPortInSecurityGroup(self, region, security_group,
start_port, end_port=None):
"""Opens a port on the firewall for a security group.
Args:
region: The region of the security group
security_group: The security group in which to open the ports
start_port: The first local port to open in a range.
end_port: The last local port to open in a range. If None, only start_port
will be opened.
"""
if end_port is None:
end_port = start_port
entry = (start_port, end_port, vm.group_id)
entry = (start_port, end_port, region, security_group)
if entry in self.firewall_set:
return
with self._lock:
Expand All @@ -73,8 +86,8 @@ def AllowPort(self, vm, start_port, end_port=None):
authorize_cmd = util.AWS_PREFIX + [
'ec2',
'authorize-security-group-ingress',
'--region=%s' % vm.region,
'--group-id=%s' % vm.group_id,
'--region=%s' % region,
'--group-id=%s' % security_group,
'--port=%s-%s' % (start_port, end_port),
'--cidr=0.0.0.0/0']
util.IssueRetryableCommand(
Expand Down
138 changes: 138 additions & 0 deletions perfkitbenchmarker/providers/aws/elasticache.py
@@ -0,0 +1,138 @@
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging

from perfkitbenchmarker import errors
from perfkitbenchmarker import providers
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import memcached_server
from perfkitbenchmarker.memcache_service import MemcacheService
from perfkitbenchmarker.providers.aws import aws_network


ELASTICACHE_PORT = 11211


class ElastiCacheMemcacheService(MemcacheService):
  """Memcache service backed by an AWS ElastiCache memcached cluster.

  The cluster is created, inspected and deleted by running the AWS CLI
  ('aws elasticache ...') through vm_util.IssueCommand.
  """

  CLOUD = providers.AWS

  def __init__(self, network, cluster_id, region, node_type, num_servers=1):
    """Records the cluster parameters; nothing is created until Create().

    Args:
      network: Network object providing `subnet.vpc_id`, `subnet.id` and
        `regional_network.vpc.default_security_group_id`.
      cluster_id: Identifier for the cache cluster. Also used as the prefix of
        the cache subnet group name.
      region: AWS region passed to every CLI call.
      node_type: ElastiCache node type, e.g. 'cache.m3.medium'.
      num_servers: Number of cache nodes to request.
    """
    super(ElastiCacheMemcacheService, self).__init__()
    self.cluster_id = cluster_id
    self.region = region
    self.node_type = node_type
    self.num_servers = num_servers
    self.hosts = []  # [(address, port)], filled in by Create().

    self.vpc_id = network.subnet.vpc_id
    self.security_group_id = (
        network.regional_network.vpc.default_security_group_id)
    self.subnet_id = network.subnet.id
    self.subnet_group_name = '%ssubnet' % cluster_id

  def Create(self):
    """Opens the memcached port, then creates the subnet group and cluster.

    Blocks until the cluster reports the 'available' status (see
    _WaitForClusterUp) and then stores every cache node endpoint in
    self.hosts.
    """
    # Open the port memcached needs in the VPC's default security group.
    firewall = aws_network.AwsFirewall.GetFirewall()
    firewall.AllowPortInSecurityGroup(
        self.region, self.security_group_id, ELASTICACHE_PORT)

    # Create a cache subnet group. The command is an argument list, so the
    # description must not carry shell-style quotes: they would become part
    # of the description text.
    cmd = ['aws', 'elasticache', 'create-cache-subnet-group',
           '--region=%s' % self.region,
           '--cache-subnet-group-name=%s' % self.subnet_group_name,
           '--cache-subnet-group-description=PKB memcached_ycsb benchmark',
           '--subnet-ids=%s' % self.subnet_id]
    vm_util.IssueCommand(cmd)

    # Create the cluster.
    cmd = ['aws', 'elasticache', 'create-cache-cluster',
           '--engine=memcached',
           '--cache-subnet-group-name=%s' % self.subnet_group_name,
           '--cache-cluster-id=%s' % self.cluster_id,
           '--num-cache-nodes=%s' % self.num_servers,
           '--region=%s' % self.region,
           '--cache-node-type=%s' % self.node_type]
    vm_util.IssueCommand(cmd)

    # Wait for the cluster to come up.
    cluster_info = self._WaitForClusterUp()

    # Parse out the hosts.
    self.hosts = [(node['Endpoint']['Address'], node['Endpoint']['Port'])
                  for node in cluster_info['CacheNodes']]
    assert len(self.hosts) == self.num_servers, (
        'Expected %s cache nodes, got %s' % (self.num_servers,
                                             len(self.hosts)))

  def Destroy(self):
    """Deletes the ElastiCache cluster."""
    cmd = ['aws', 'elasticache', 'delete-cache-cluster',
           '--cache-cluster-id=%s' % self.cluster_id,
           '--region=%s' % self.region]
    vm_util.IssueCommand(cmd)
    # Don't have to delete the subnet group. It will be deleted with the subnet.

  def Flush(self):
    """Issues a flush to every cache node recorded in self.hosts."""
    # RunThreaded hands a plain list item to the target as ONE argument, so a
    # raw (ip, port) tuple would not match FlushMemcachedServer(ip, port).
    # Use its (args, kwargs) item form to spread the pair into two arguments.
    flush_params = [((ip, port), {}) for ip, port in self.hosts]
    vm_util.RunThreaded(memcached_server.FlushMemcachedServer, flush_params)

  def GetHosts(self):
    """Returns the cache node endpoints as a list of 'address:port' strings."""
    return ["%s:%s" % (ip, port) for ip, port in self.hosts]

  def GetMetadata(self):
    """Returns a dict describing the cluster, for attaching to samples."""
    return {'num_servers': self.num_servers,
            'elasticache_region': self.region,
            'elasticache_node_type': self.node_type}

  def _GetClusterInfo(self):
    """Returns the first 'CacheClusters' entry from describe-cache-clusters.

    Raises:
      ValueError: if the CLI output is not valid JSON.
      KeyError, IndexError: if the JSON contains no cluster entry.
    """
    cmd = ['aws', 'elasticache', 'describe-cache-clusters',
           '--cache-cluster-id=%s' % self.cluster_id,
           '--region=%s' % self.region,
           '--show-cache-node-info']
    out, _, _ = vm_util.IssueCommand(cmd)
    return json.loads(out)["CacheClusters"][0]

  @vm_util.Retry(poll_interval=15, timeout=300,
                 retryable_exceptions=(errors.Resource.RetryableCreationError,))
  def _WaitForClusterUp(self):
    """Block until the ElastiCache memcached cluster is up.

    The status is checked with the AWS CLI. The Retry decorator is configured
    with a 15 second poll interval and a 300 second timeout, and retries
    whenever this method raises RetryableCreationError.

    Returns:
      The cluster info json as a dict.

    Raises:
      errors.Resource.RetryableCreationError: when the cluster info cannot be
        fetched or parsed, or the cluster status is not yet 'available'.
    """
    logging.info("Trying to get ElastiCache cluster info for %s",
                 self.cluster_id)
    try:
      cluster_info = self._GetClusterInfo()
      cluster_status = cluster_info['CacheClusterStatus']
    except (errors.VirtualMachine.RemoteCommandError,
            ValueError, KeyError, IndexError) as e:
      # An empty or unexpected CLI response shows up as a JSON/lookup error;
      # treat it like "not up yet" so the check is retried instead of aborting.
      raise errors.Resource.RetryableCreationError(
          "ElastiCache memcached cluster not up yet: %s." % str(e))

    if cluster_status == 'available':
      logging.info("ElastiCache memcached cluster is up and running.")
      return cluster_info

    raise errors.Resource.RetryableCreationError(
        "ElastiCache memcached cluster not up yet. Status: %s" %
        cluster_status)

0 comments on commit 1b4dd7c

Please sign in to comment.