Skip to content
This repository has been archived by the owner on May 26, 2018. It is now read-only.

Commit

Permalink
fix #34 #35
Browse files Browse the repository at this point in the history
  • Loading branch information
ikit committed Oct 21, 2016
1 parent deddf00 commit e967d3f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 142 deletions.
154 changes: 64 additions & 90 deletions pirus/api_v1/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,60 @@ def notify_all(src, msg):
ws[0].send_str(msg)


def process_generic_get(query_string, allowed_fields):
# 1- retrieve query parameters
get_params = MultiDict(parse_qsl(query_string))
r_range = get_params.get('range', "0-" + str(RANGE_DEFAULT))
r_fields = get_params.get('fields', None)
r_order = get_params.get('order', None)
r_sort = get_params.get('sort', None)
r_filter = get_params.get('filter', None)

# 2- fields to extract
fields = allowed_fields
if r_fields is not None:
fields = []
for f in r_fields.split(','):
f = f.strip().lower()
if f in allowed_fields:
fields.append(f)
if len(fields) == 0:
return rest_error("No valid fields provided : " + get_params.get('fields'))

# 3- Build json query for mongoengine
query = {}
if r_filter is not None:
query = {"$or" : []}
for k in fields:
query["$or"].append({k : {'$regex': r_filter}})

# 4- Order
order = ['-create_date', "name"]
if r_sort is not None and r_order is not None:
r_sort = r_sort.split(',')
r_order = r_order.split(',')
if len(r_sort) == len(r_order):
order = []
for i in range(0, len(r_sort)):
f = r_sort[i].strip().lower()
if f in allowed_fields:
if r_order[i] == "desc":
f = "-" + f
order.append(f)
order = tuple(order)

# 5- limit
r_range = r_range.split("-")
offset=0
limit=RANGE_DEFAULT
try:
offset = int(r_range[0])
limit = int(r_range[1])
except:
return rest_error("No valid range provided : " + get_params.get('range') )

# 6- Return processed data
return fields, query, order, offset, limit



Expand Down Expand Up @@ -108,58 +161,10 @@ def __init__(self):
pass

def get(self, request):
# 1- retrieve query parameters
get_params = MultiDict(parse_qsl(request.query_string))
r_range = get_params.get('range', "0-" + str(RANGE_DEFAULT))
r_fields = get_params.get('fields', None)
r_order = get_params.get('order', None)
r_sort = get_params.get('sort', None)
r_filter = get_params.get('filter', None)

# 2- fields to extract
fields = PirusFile.public_fields
if r_fields is not None:
fields = []
for f in r_fields.split(','):
f = f.strip().lower()
if f in PirusFile.public_fields:
fields.append(f)
if len(fields) == 0:
return rest_error("No valid fields provided : " + get_params.get('fields'))

# 3- Build json query for mongoengine
query = {}
if r_filter is not None:
query = {"$or" : []}
for k in fields:
query["$or"].append({k : {'$regex': r_filter}})

# 4- Order
order = ['-create_date', "name"]
if r_sort is not None and r_order is not None:
r_sort = r_sort.split(',')
r_order = r_order.split(',')
if len(r_sort) == len(r_order):
order = []
for i in range(0, len(r_sort)):
f = r_sort[i].strip().lower()
if f in PirusFile.public_fields:
if r_order[i] == "desc":
f = "-" + f
order.append(f)
order = tuple(order)
# Generic processing of the get query
fields, query, order, offset, limit = process_generic_get(request.query_string, PirusFile.public_fields)

# 5- limit
r_range = r_range.split("-")
offset=0
limit=RANGE_DEFAULT
try:
offset = int(r_range[0])
limit = int(r_range[1])
except:
return rest_error("No valid range provided : " + get_params.get('range') )

# 6- Return result of the query !
# Return result of the query for PirusFile
return rest_success([p.export_client_data(fields) for p in PirusFile.objects(__raw__=query).order_by(*order)[offset:limit]])


Expand Down Expand Up @@ -276,7 +281,9 @@ def __init__(self):
pass

def get(self, request):
return rest_success([i for i in list_pipelines()])
fields, query, order, offset, limit = process_generic_get(request.query_string, Pipeline.public_fields)
return rest_success([p.export_client_data(fields) for p in Pipeline.objects(__raw__=query).order_by(*order)[offset:limit]])


async def post(self, request):
# 1- Retrieve pirus package from post request
Expand Down Expand Up @@ -339,7 +346,8 @@ def __init__(self):
pass

def get(self, request):
return rest_success([r.export_client_data() for r in Run.objects.all().limit(10)])
fields, query, order, offset, limit = process_generic_get(request.query_string, Run.public_fields)
return rest_success([p.export_client_data(fields) for p in Run.objects(__raw__=query).order_by(*order)[offset:limit]])


async def post(self, request):
Expand Down Expand Up @@ -368,7 +376,7 @@ async def post(self, request):
run.import_data({
"pipe_id" : pipe_id,
"name" : config["run"]["name"],
"celery_id" : str(cw.id),
"private_id" : str(cw.id),
"start" : str(datetime.datetime.now().timestamp()),
"status" : "WAITING",
"config" : json.dumps(config),
Expand Down Expand Up @@ -398,7 +406,7 @@ def download_file(self, run_id, filename, location=RUNS_DIR):
run = Run.from_id(run_id)
if run == None:
return rest_error("Unable to find the run with id " + str(run_id))
path = os.path.join(location, run.celery_id, filename)
path = os.path.join(location, run.private_id, filename)

if not os.path.exists(path):
return rest_error("File not found. " + filename + " doesn't exists for the run " + str(run_id))
Expand Down Expand Up @@ -444,46 +452,12 @@ def get_file(self, request):
filename = request.match_info.get('filename', "")
return self.download_file(run_id, filename)

# async def up_progress(self, request):
# # 1- Retrieve data from request
# data = await request.json()
# pipe_id = data["pipeline_id"]
# config = data["config"]
# inputs = data["inputs"]
# run_id = request.match_info.get('run_id', -1)
# complete = request.match_info.get('complete', None)
# print("RunHandler[up_progress] : taskid=" + run_id, complete)
# run = Run.from_celery_id(run_id)
# if run is not None:
# p = run.progress.copy()
# p.update({"value" : complete, "label" : str(complete) + " %"})
# # TODO FIXME : workaround to force the update of dynamic field "progress" - only updating progress dictionary doesn't work :(
# run.progress = 0
# run.save()
# run.progress = p
# run.save()
# msg = '{"action":"run_progress", "data" : ' + json.dumps(run.export_client_data()) + '}'
# print (msg)
# notify_all(None, msg)
# return web.Response()

# def up_status(self, request):
# run_id = request.match_info.get('run_id', -1)
# status = request.match_info.get('status', None)
# print("RunHandler[up_status] : taskid=" + run_id , status)
# run = Run.from_celery_id(run_id)
# if run is not None:
# run.status = status
# run.save()
# msg = '{"action":"run_progress", "data" : ' + json.dumps(last_runs()) + '}'
# notify_all(None, msg)
# return web.Response()

async def up_data(self, request):
# 1- Retrieve data from request
data = await request.json()
run_id = request.match_info.get('run_id', -1)
run = Run.from_celery_id(run_id)
run = Run.from_private_id(run_id)
if run is not None:
if "progress" in data.keys():
run.progress = data["progress"]
Expand Down

0 comments on commit e967d3f

Please sign in to comment.