Skip to content

Commit

Permalink
GCE list nodes performance improvement. Resolves LIBCLOUD-826.
Browse files Browse the repository at this point in the history
We leverage the aggregated disk call and store the result. For the list node operation, we've added an extra parameter to use the cached data, which defaults to True.

Tests and fixtures updated as well.
  • Loading branch information
supertom committed Jan 6, 2017
1 parent c06497f commit d5efc32
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 29 deletions.
221 changes: 203 additions & 18 deletions libcloud/compute/drivers/gce.py
Expand Up @@ -72,6 +72,7 @@ class GCEConnection(GoogleBaseConnection):
GCEConnection extends :class:`google.GoogleBaseConnection` for 3 reasons:
1. modify request_path for GCE URI.
2. Implement gce_params functionality described below.
3. Add request_aggregated_items method for making aggregated API calls.
If the parameter gce_params is set to a dict prior to calling request(),
the URL parameters will be updated to include those key/values FOR A
Expand Down Expand Up @@ -130,6 +131,80 @@ def request(self, *args, **kwargs):

return response

def request_aggregated_items(self, api_name):
    """
    Collect every page of the aggregated listing for 'api_name'.

    The aggregated 'api_name' endpoint is requested repeatedly (asking for
    up to 500 results per request) and every response that contains an
    'items' key is kept. The kept responses are then combined into a
    single 'items' dictionary by ``_merge_response_items``.

    :param    api_name: Name of API to call (e.g. 'disks'). Consult the
                        API docs for valid names.
    :type     api_name: ``str``

    :return:  dict in the format of the API response.
              format: { 'items': {'key': {api_name: []}} }
              ex: { 'items': {'zones/us-central1-a': {disks: []}} }
    :rtype:   ``dict``
    """
    path = "/aggregated/%s" % api_name
    query = {'maxResults': 500}
    pages = []

    while True:
        # The query dict is handed to the connection before each request.
        # NOTE(review): presumably request() inserts/removes 'pageToken'
        # in this very dict and resets gce_params afterwards -- confirm
        # against GCEConnection.request().
        self.gce_params = query
        page = self.request(path, method='GET').object
        if 'items' in page:
            pages.append(page)
        # Without a 'pageToken' in the query there is nothing left to
        # fetch.
        if 'pageToken' not in query:
            break

    return self._merge_response_items(api_name, pages)

def _merge_response_items(self, list_name, response_list):
    """
    Fold several aggregated API responses into one 'items' dictionary.

    Helper that combines multiple aggregated responses into a single
    dictionary that resembles an API response.

    Note: responses without an 'items' key are skipped, and entries that
          don't have a 'list_name' key (including warnings) are omitted.

    :param   list_name: Name of list in dict.  Practically, this is
                        the name of the API called (e.g. 'disks').
    :type    list_name: ``str``

    :param   response_list: API responses, one entry per API call.
                            Expected format is:
                            [ { items: {
                                    key1: { api_name:[]},
                                    key2: { api_name:[]}
                                }}, ... ]
    :type    response_list: ``list`` of ``dict``

    :return: dict in the format of:
             { items: {key: {api_name:[]}, key2: {api_name:[]}} }
             ex: { items: {
                     'us-east1-a': {'disks': []},
                     'us-east1-b': {'disks': []}
                     }}
    :rtype:  ``dict``
    """
    combined = {}
    for response in response_list:
        if 'items' not in response:
            continue
        # scope is e.g. a zone or region name; payload looks like
        # { "disks" : [], "otherkey" : "..." }
        for scope, payload in response['items'].items():
            if list_name not in payload:
                continue
            # Append to a list owned by the result, creating it on first
            # sight of this scope; the lists in the input stay untouched.
            bucket = combined.setdefault(scope, {}).setdefault(
                list_name, [])
            bucket.extend(payload[list_name])
    return {'items': combined}


class GCEList(object):
"""
Expand Down Expand Up @@ -1719,6 +1794,10 @@ def __init__(self, user_id, key=None, datacenter=None, project=None,
else:
self.region = None

# Volume details are looked up in this name-zone dict.
# It is populated if the volume name is not found or the dict is empty.
self._ex_volume_dict = {}

def ex_add_access_config(self, node, name, nic, nat_ip=None,
config_type=None):
"""
Expand Down Expand Up @@ -2278,14 +2357,19 @@ def ex_list_networks(self):
for n in response.get('items', [])]
return list_networks

def list_nodes(self, ex_zone=None):
def list_nodes(self, ex_zone=None, ex_use_disk_cache=True):
"""
Return a list of nodes in the current zone or all zones.
:keyword ex_zone: Optional zone name or 'all'
:type ex_zone: ``str`` or :class:`GCEZone` or
:class:`NodeLocation` or ``None``
:keyword ex_use_disk_cache: Disk information for each node will be
retrieved from a dictionary rather
than making a distinct API call for it.
:type ex_use_disk_cache: ``bool``
:return: List of Node objects
:rtype: ``list`` of :class:`Node`
"""
Expand All @@ -2295,16 +2379,20 @@ def list_nodes(self, ex_zone=None):
request = '/aggregated/instances'
else:
request = '/zones/%s/instances' % (zone.name)

response = self.connection.request(request, method='GET').object

if 'items' in response:
# The aggregated response returns a dict for each zone
if zone is None:
# Create volume cache now for fast lookups of disk info.
self._ex_populate_volume_dict()
for v in response['items'].values():
for i in v.get('instances', []):
try:
list_nodes.append(self._to_node(i))
list_nodes.append(
self._to_node(i,
use_disk_cache=ex_use_disk_cache)
)
# If a GCE node has been deleted between
# - it was listed by `request('.../instances', 'GET')`
# - it is converted by `self._to_node(i)`
Expand All @@ -2317,7 +2405,9 @@ def list_nodes(self, ex_zone=None):
else:
for i in response['items']:
try:
list_nodes.append(self._to_node(i))
list_nodes.append(
self._to_node(i, use_disk_cache=ex_use_disk_cache)
)
# If a GCE node has been deleted between
# - it was listed by `request('.../instances', 'GET')`
# - it is converted by `self._to_node(i)`
Expand All @@ -2327,6 +2417,8 @@ def list_nodes(self, ex_zone=None):
# other nodes.
except ResourceNotFoundError:
pass
# Clear the volume cache as lookups are complete.
self._ex_volume_dict = {}
return list_nodes

def ex_list_regions(self):
Expand Down Expand Up @@ -6929,26 +7021,36 @@ def ex_get_snapshot(self, name):
response = self.connection.request(request, method='GET').object
return self._to_snapshot(response)

def ex_get_volume(self, name, zone=None):
def ex_get_volume(self, name, zone=None, use_cache=False):
"""
Return a Volume object based on a volume name and optional zone.
:param name: The name of the volume
:type name: ``str``
To improve performance, we request all disks and allow the user
to consult the cache dictionary rather than making an API call.
:param name: The name of the volume
:type name: ``str``
:keyword zone: The zone to search for the volume in (set to 'all' to
search all zones)
:type zone: ``str`` or :class:`GCEZone` or :class:`NodeLocation`
or ``None``
:keyword use_cache: Search for the volume in the existing cache of
volumes. If True and the cache is already populated,
we omit the API call and search self._ex_volume_dict.
If False (or the cache is empty), a call to
disks/aggregatedList is made prior to searching
self._ex_volume_dict.
:type use_cache: ``bool``
:return: A StorageVolume object for the volume
:rtype: :class:`StorageVolume`
"""
zone = self._set_zone(zone) or self._find_zone_or_region(
name, 'disks', res_name='Volume')
request = '/zones/%s/disks/%s' % (zone.name, name)
response = self.connection.request(request, method='GET').object
return self._to_storage_volume(response)
if not self._ex_volume_dict or use_cache is False:
# Make the API call and build volume dictionary
self._ex_populate_volume_dict()

return self._ex_lookup_volume(name, zone)

def ex_get_region(self, name):
"""
Expand Down Expand Up @@ -7201,6 +7303,85 @@ def _ex_connection_class_kwargs(self):
'scopes': self.scopes,
'credential_file': self.credential_file}

def _build_volume_dict(self, zone_dict):
    """
    Re-key aggregated disk data into [name][zone]=disk format.

    :param  zone_dict: dict keyed by zone (e.g. 'zones/us-central1-a'),
                       in the format of:
                       { key: {'disks': []}, key2: {'disks': []} }
    :type   zone_dict: ``dict``

    :return:  dict of volumes, organized by name, then zone.  Format:
              { 'disk_name':
               {'zone_name1': disk_info, 'zone_name2': disk_info} }
    :rtype: ``dict``
    """
    volumes_by_name = {}
    for zone_key, zone_data in zone_dict.items():
        # Drop the 'zones/' prefix so the inner keys are bare zone names.
        zone_name = zone_key.replace('zones/', '')
        # Entries without a 'disks' list contribute nothing.
        for disk in zone_data.get('disks', []):
            volumes_by_name.setdefault(disk['name'], {})[zone_name] = disk
    return volumes_by_name

def _ex_lookup_volume(self, volume_name, zone=None):
    """
    Look up volume by name and zone in volume dict.

    If zone isn't specified or equals 'all', we return the volume
    for the first zone, as determined alphabetically.

    If the volume name is missing from the volume dict, the dict is
    re-populated once before giving up.

    :param    volume_name: The name of the volume.
    :type     volume_name: ``str``

    :keyword  zone: The zone to search for the volume in (set to 'all' to
                    search all zones)
    :type     zone: ``str`` or ``None``

    :return:  A StorageVolume object for the volume.
    :rtype:   :class:`StorageVolume`

    :raises:  ``ResourceNotFoundError`` if the volume name is unknown even
              after re-populating the volume dict, or if the volume does
              not exist in the requested zone.
    """
    if volume_name not in self._ex_volume_dict:
        # Possibly added through another thread/process, so re-populate
        # _ex_volume_dict and try again.  If still not found, raise
        # exception.
        self._ex_populate_volume_dict()
        if volume_name not in self._ex_volume_dict:
            raise ResourceNotFoundError(
                'Volume name: \'%s\' not found. Zone: %s' % (
                    volume_name, zone), None, None)

    zones_for_volume = self._ex_volume_dict[volume_name]

    # Disk names are not unique across zones, so if zone is None or
    # 'all', we return the first one we find for that disk name.  For
    # consistency, we sort by keys and set the zone to the first key.
    # Compare by value: an identity test against a string literal is not
    # guaranteed to match an equal string.
    if zone is None or zone == 'all':
        zone = sorted(zones_for_volume)[0]

    volume = zones_for_volume.get(zone)
    if not volume:
        raise ResourceNotFoundError(
            'Volume \'%s\' not found for zone %s.' % (volume_name,
                                                      zone), None, None)
    return self._to_storage_volume(volume)

def _ex_populate_volume_dict(self):
    """
    Rebuild ``_ex_volume_dict`` from an aggregated "disks" request.

    :return:  ``None``
    """
    # Ask the connection for the aggregated "disks" items ...
    disks_by_zone = self.connection.request_aggregated_items(
        "disks")['items']

    # ... and store them re-keyed in the format of:
    # { 'disk_name' : { 'zone1': disk, 'zone2': disk, ... }}
    self._ex_volume_dict = self._build_volume_dict(disks_by_zone)

def _catch_error(self, ignore_errors=False):
"""
Catch an exception and raise it unless asked to ignore it.
Expand Down Expand Up @@ -8083,15 +8264,18 @@ def _to_node_location(self, location):
country=location['name'].split('-')[0],
driver=self)

def _to_node(self, node):
def _to_node(self, node, use_disk_cache=False):
"""
Return a Node object from the JSON-response dictionary.
:param node: The dictionary describing the node.
:type node: ``dict``
:param node: The dictionary describing the node.
:type node: ``dict``
:return: Node object
:rtype: :class:`Node`
:keyword use_disk_cache: If true, ex_get_volume call will use cache.
:type use_disk_cache: ``bool``
:return: Node object
:rtype: :class:`Node`
"""
public_ips = []
private_ips = []
Expand Down Expand Up @@ -8122,7 +8306,8 @@ def _to_node(self, node):
for disk in extra['disks']:
if disk.get('boot') and disk.get('type') == 'PERSISTENT':
bd = self._get_components_from_path(disk['source'])
extra['boot_disk'] = self.ex_get_volume(bd['name'], bd['zone'])
extra['boot_disk'] = self.ex_get_volume(
bd['name'], bd['zone'], use_cache=use_disk_cache)

if 'items' in node['tags']:
tags = node['tags']['items']
Expand Down

0 comments on commit d5efc32

Please sign in to comment.