Skip to content

Commit

Permalink
Merge cd58bf7 into b0ac489
Browse files Browse the repository at this point in the history
  • Loading branch information
amitsharmaak committed Sep 28, 2016
2 parents b0ac489 + cd58bf7 commit b59489f
Show file tree
Hide file tree
Showing 56 changed files with 1,573 additions and 1,150 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ buildscript {
}

dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.7.RELEASE")
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.8.RELEASE")
classpath("io.spring.gradle:dependency-management-plugin:0.6.0.RELEASE")
classpath("org.asciidoctor:asciidoctor-gradle-plugin:1.5.3")
classpath("gradle.plugin.com.gorylenko.gradle-git-properties:gradle-git-properties:1.4.11")
Expand All @@ -16,7 +16,7 @@ buildscript {

plugins {
id 'com.github.kt3k.coveralls' version '2.6.3'
id 'nebula.netflixoss' version '3.2.3'
id 'nebula.netflixoss' version '3.4.0'
id 'nebula.optional-base' version '3.0.3'
id 'nebula.provided-base' version '3.0.3'
}
Expand Down Expand Up @@ -63,7 +63,7 @@ subprojects {

dependencyManagement {
imports {
mavenBom 'io.spring.platform:platform-bom:2.0.7.RELEASE'
mavenBom 'io.spring.platform:platform-bom:2.0.8.RELEASE'
// mavenBom 'org.springframework.cloud:spring-cloud-starter-parent:Brixton.RC1'
}
}
Expand Down
5 changes: 5 additions & 0 deletions genie-client/src/main/python/pygenie/adapter/genie_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def construct_base_payload(job):
'version': job.get('job_version')
}

if job.get('genie_cpu'):
payload['cpu'] = job.get('genie_cpu')
if job.get('genie_memory'):
payload['memory'] = job.get('genie_memory')

return payload

def get(self, job_id, path=None, if_not_found=None, **kwargs):
Expand Down
26 changes: 18 additions & 8 deletions genie-client/src/main/python/pygenie/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ def __init__(self, conf=None):
self.path_job = self.host \
+ self.conf.genie.get('job_url', '/api/v3/jobs')

def get_applications(self, filters=None):
def get_applications(self, filters=None, req_size=1000):
"""
Get a list of applications.
Args:
filters (dict): a dictionary of filters to use in the query.
req_size (int): the number of items to return per request.
Yields:
dict: an application
Expand All @@ -129,10 +130,12 @@ def get_applications(self, filters=None):
"""
params = filters or {}
_verify_filters(params, ['name', 'user', 'status', 'tag', 'type', 'page'])
_verify_filters(params, ['name', 'user', 'size', 'status', 'tag',
'type', 'page'])

# Iterate through any responses until we get to the end
params['page'] = 0
params['size'] = req_size
while True:
resp = _call(self.path_application, method='GET', params=params)

Expand Down Expand Up @@ -665,7 +668,7 @@ def delete_all_commands(self):
_call(self.path_command, method='DELETE', raise_not_status=204)

# TODO: Page and size should be separate arguments
def get_commands(self, filters=None):
def get_commands(self, filters=None, req_size=1000):
"""
Get all of the commands in a cluster.
Expand All @@ -677,6 +680,7 @@ def get_commands(self, filters=None):
Args:
filters (dict): a dictionary of filters to use. Valid key parameters
are: name, user, status, tag
req_size (int): the number of items to return per request.
Yields:
dict: a command dictionary
Expand All @@ -693,10 +697,11 @@ def get_commands(self, filters=None):
_check_type(filters, dict)

params = filters or {}
_verify_filters(params, ['name', 'user', 'status', 'tag'])
_verify_filters(params, ['name', 'user', 'size', 'status', 'tag'])

# Iterate through any responses until we get to the end
params['page'] = 0
params['size'] = req_size
while True:
resp = _call(self.path_command, method='GET', params=params)

Expand All @@ -716,7 +721,7 @@ def get_commands(self, filters=None):
logger.info('Fetching additional commands from genie [%s/%s]',
params['page'], resp['page']['totalPages'])

def get_clusters(self, filters=None):
def get_clusters(self, filters=None, req_size=1000):
"""
Get all of the clusters.
Expand All @@ -728,6 +733,7 @@ def get_clusters(self, filters=None):
Args:
filters (optional[dict]): a dictionary of filters to use. Valid key parameters
are: name, status, tag
req_size (int): the number of items to return per request.
Yields:
dict: a cluster configuration
Expand All @@ -741,10 +747,11 @@ def get_clusters(self, filters=None):
"""
params = filters or {}
_verify_filters(params, ['name', 'status', 'tag', 'page'])
_verify_filters(params, ['name', 'size', 'status', 'tag', 'page'])

# Iterate through any responses until we get to the end
params['page'] = 0
params['size'] = req_size
while True:
resp = _call(self.path_cluster, method='GET', params=params)

Expand Down Expand Up @@ -1440,14 +1447,15 @@ def remove_tags_for_command(self, command_id, tag):
path = self.path_command + '/' + command_id + '/' + tag
_call(path, method='DELETE', raise_not_status=204)

def get_jobs(self, filters=None):
def get_jobs(self, filters=None, req_size=1000):
"""
Get jobs. This command pages through results from Genie and will
make multiple API calls until all of the results have been returned.
Args:
filters (dict): filter the jobs by these value(s). Valid parameters
are id, clusterName, user, status, and tag.
req_size (int): the number of items to return per request.
Yields:
dict: a job
Expand All @@ -1461,10 +1469,12 @@ def get_jobs(self, filters=None):
"""
params = filters or {}
_verify_filters(params, ['id', 'clusterName', 'user', 'status', 'tag'])
# 'status' and 'tag' must be separate entries: without the comma Python
# joins the adjacent string literals into the single item 'statustag',
# which drops both filters from the list passed to _verify_filters.
_verify_filters(params, ['id', 'clusterName', 'user', 'size', 'status',
                         'tag'])

# Iterate through any responses until we get to the end
params['page'] = 0
params['size'] = req_size
while True:
resp = _call(self.path_job, method='GET', params=params)

Expand Down
47 changes: 47 additions & 0 deletions genie-client/src/main/python/pygenie/jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ def __init__(self, conf=None):
self._dependencies = list()
self._description = None
self._email = None
self._genie_cpu = None
self._genie_memory = None
self._group = None
self._job_id = uuid_str()
self._job_name = None
Expand Down Expand Up @@ -464,6 +466,28 @@ def sig_handler(signum, frame):
running_job = execute_job(self)
return running_job

@add_to_repr('overwrite')
def genie_cpu(self, cpu):
    """
    Set the number of CPUs for Genie to allocate when executing the job.

    Example:
        >>> job = GenieJob() \\
        ...     .genie_cpu(2)

    Args:
        cpu (int): Number of CPUs to allocate. The value is passed through
            ``int()`` and must convert to a number of at least 1.

    Returns:
        :py:class:`GenieJob`: self

    Raises:
        AssertionError: if ``cpu`` converts to a value less than 1.
    """

    # Convert once so the value that is validated is the value stored.
    cpu_count = int(cpu)

    # Raised explicitly instead of through an ``assert`` statement so the
    # check still runs under ``python -O``; the exception type and message
    # seen by callers are unchanged.
    if cpu_count <= 0:
        raise AssertionError('number of CPUs cannot be less than 1')

    self._genie_cpu = cpu_count

    return self

@unicodify
@arg_string
@add_to_repr('overwrite')
Expand All @@ -488,6 +512,29 @@ def email(self, email):
logger.warning("Use .genie_email('%s') to set Genie email.", email)
return self.genie_email(email)

@add_to_repr('overwrite')
def genie_memory(self, memory):
    """
    Set the amount of memory (MB) for Genie to allocate when executing the job.

    Example:
        >>> # set Genie to allocate 6 GB of memory
        >>> job = GenieJob() \\
        ...     .genie_memory(6000)

    Args:
        memory (int): Amount of memory (MB) to allocate. The value is passed
            through ``int()`` and must convert to a number of at least 1.

    Returns:
        :py:class:`GenieJob`: self

    Raises:
        AssertionError: if ``memory`` converts to a value less than 1.
    """

    # Convert once so the value that is validated is the value stored.
    memory_mb = int(memory)

    # Raised explicitly instead of through an ``assert`` statement so the
    # check still runs under ``python -O``; the exception type and message
    # seen by callers are unchanged.
    if memory_mb <= 0:
        raise AssertionError('memory amount (MB) cannot be less than 1')

    self._genie_memory = memory_mb

    return self

@unicodify
@add_to_repr('overwrite')
def genie_setup_file(self, setup_file):
Expand Down
30 changes: 23 additions & 7 deletions genie-client/src/main/python/pygenie/jobs/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ def cmd_args(self):
elif self._script is not None:
self._add_dependency({'name': filename, 'data': self._script})

params_str = ' '.join([
"-d '{name}={value}'" \
.format(name=k,
value=unicode(v).replace("'", "''")) \
for k, v in self._parameters.items()
])
# put parameters into a parameter file and specify parameter file on command line
# this is to get around weird quoting issues in parameter values, etc
param_str = self._parameter_file
if param_str:
self._add_dependency({
'name': '_hive_parameters.txt',
'data': param_str
})

props_str = ' '.join([
'--hiveconf {name}={value}'.format(name=k, value=v) \
Expand All @@ -83,9 +85,23 @@ def cmd_args(self):
.format(prop_file=prop_file_str,
props=props_str,
filename=filename,
params=params_str) \
params='-i _hive_parameters.txt' if param_str else '') \
.strip()

@property
def _parameter_file(self):
    """
    Render the job's parameters as the text of a Hive parameter file.

    Every entry of ``self._parameters`` contributes one
    ``SET hivevar:<name>=<value>;`` line, following the iteration order of
    ``self._parameters.items()``.

    Returns:
        The accumulated statements with leading/trailing whitespace
        stripped; an empty string when there are no parameters.
    """

    # {p} carries everything rendered so far, so each pass appends one line.
    line_template = '{p}SET hivevar:{name}={value};\n'

    contents = ""
    for key, val in self._parameters.items():
        contents = line_template.format(p=contents,
                                        name=key,
                                        value=unicode(val))

    return contents.strip()

def headers(self):
"""
Sets hive.cli.print.header so that if the hive query is outputing
Expand Down
4 changes: 2 additions & 2 deletions genie-client/src/main/python/pygenie/jobs/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def cmd_args(self):
for p in self._parameter_files
])

# put parameters into a parameter file and specify paramter file on command line
# put parameters into a parameter file and specify parameter file on command line
# this is to get around weird quoting issues in parameter values, etc
param_str = self._parameter_file
if param_str:
Expand Down Expand Up @@ -108,7 +108,7 @@ def _parameter_file(self):
name=name,
value=unicode(value).replace('"', '\\"'))

return param_file
return param_file.strip()

@unicodify
@arg_list
Expand Down
30 changes: 30 additions & 0 deletions genie-client/src/main/python/pygenie/jobs/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ def command_name(self):
str: The command name.
"""

@property
def cpu(self):
    """
    Number of CPUs allocated to the job.

    The value is looked up under the ``'cpu'`` key of ``self.request_data``.

    Example:
        >>> running_job.cpu
        10

    Returns:
        int: Number of allocated CPUs (whatever ``request_data.get('cpu')``
            yields when the key is missing -- ``None`` for a plain dict).
    """

    allocated_cpus = self.request_data.get('cpu')
    return allocated_cpus

@property
@get_from_info('cluster_name', info_section='cluster')
def cluster_name(self):
Expand Down Expand Up @@ -416,6 +431,21 @@ def kill(self):
resp = self._adapter.kill_job(job_id=self._job_id)
return resp

@property
def memory(self):
    """
    Amount of memory (MB) allocated to the job.

    The value is looked up under the ``'memory'`` key of
    ``self.request_data``.

    Example:
        >>> running_job.memory
        1600

    Returns:
        int: Allocated memory in MB (whatever ``request_data.get('memory')``
            yields when the key is missing -- ``None`` for a plain dict).
    """

    allocated_memory = self.request_data.get('memory')
    return allocated_memory

@property
@get_from_info('output_uri', info_section='job')
def output_uri(self):
Expand Down
2 changes: 1 addition & 1 deletion genie-client/src/main/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name='nflx-genie-client',
version='3.0.42',
version='3.0.45',
author='Netflix Inc.',
author_email='genieoss@googlegroups.com',
keywords='genie hadoop cloud netflix client bigdata presto',
Expand Down

0 comments on commit b59489f

Please sign in to comment.