More sensible example, fixed process.py
CristianCantoro committed Apr 15, 2014
1 parent 1e8a1fa commit f572918
Showing 2 changed files with 123 additions and 119 deletions.
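The key fix in examples/process.py is in generate_id: sort_keys was being passed to hashlib.md5 (which does not accept that keyword) instead of json.dumps. A trimmed sketch of the corrected logic, in Python 2 like the rest of the script; the real function also sorts the payload's feature list before hashing:

    import hashlib
    import json

    def generate_id(slug, osmid, payload):
        # sort_keys belongs to json.dumps: equal payloads then serialize
        # identically, so the md5 digest is stable across runs.
        digest = hashlib.md5(json.dumps(payload, sort_keys=True)).hexdigest()
        return "{slug}-{osmid}-{digest}".format(slug=slug,
                                                osmid=osmid,
                                                digest=digest)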
39 changes: 22 additions & 17 deletions examples/example.json
@@ -6,32 +6,37 @@
"geometry": {
"coordinates": [
[
41.6238076,
15.1027029
12.491951286792755,
41.89017016073198
],
[
41.6237129,
15.1024066
12.492468953132628,
41.89031791683654
],
[
41.6234308,
15.1027489
12.492184638977049,
41.890429732039834
],
[
41.6232767,
15.1030065
],
[
41.6224843,
15.1030852
],
[
41.6226488,
15.1035361
12.491945922374725,
41.890333890448986
]
],
"type": "LineString"
},
"properties": {
"osmid": 123456
},
"type": "Feature"
},
{
"geometry": {
"coordinates": [
12.49195396900177,
41.89027598608483
],
"type": "Point"
},
"properties": {
"osmid": 265752390
},
@@ -40,6 +45,6 @@
],
"type": "FeatureCollection"
},
"instruction": "This is molise"
"instruction": "A task in Rome"
}
]
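Each entry in examples/example.json pairs an instruction with a GeoJSON FeatureCollection. A minimal sketch of how the updated get_tasks_from_json in process.py (diff below) reads the osmid out of the first feature, assuming the FeatureCollection sits under each task's geometries key, as that function expects:

    import json

    with open('examples/example.json', 'r') as infile:
        tasks = json.load(infile)

    for task in tasks:
        # the first feature of the task's FeatureCollection carries the OSM id
        osmid = task['geometries']['features'][0]['properties']['osmid']
        print osmid, task['instruction']  # e.g. 123456 A task in Rome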
203 changes: 101 additions & 102 deletions examples/process.py
@@ -17,6 +17,7 @@

HEADERS = {'content-type': 'application/json'}


def is_running_instance(api_url):
try:
r = requests.get(base + 'ping')
@@ -41,6 +42,72 @@ def create_challenge_if_not_exists(slug, title):
print 'challenge existed.'


def get_tasks_from_db(args):
db_user = args.user
if not args.user:
db_user = getpass.getuser()

db_name = args.database
if not args.database:
db_name = 'osm'

db_query = args.query

db_string = "dbname={db_name} user={db_user}".format(db_name=db_name,
db_user=db_user
)

if args.host:
db_string += " host={db_host}".format(db_host=args.host)

# open a connection, get a cursor
conn = psycopg2.connect(db_string)
cur = conn.cursor(cursor_factory=DictCursor)
register_hstore(cur)
# get our results
cur.execute(db_query)
nodes = cur.fetchall()

for node in nodes:
osmid = node["id"]

geom = {
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"properties": {"osmid": osmid},
"geometry": json.loads(geojson.dumps(
wkb.loads(node["geom"].decode("hex"))))
}]
}

yield prepare_task(node=node,
args=args,
osmid=osmid,
geom=geom
)


def get_tasks_from_json(args):

with open(args.json_file, 'r') as infile:
tasks = json.load(infile)

for task in tasks:
if not args.close:
osmid = task['geometries']['features'][0]['properties']['osmid']
geom = task['geometries']

yield prepare_task(node=task,
args=args,
osmid=osmid,
geom=geom
)
else:
for task in tasks:
yield task


def get_current_task_statuses(slug):
r = requests.get(mr_api_querystatuses_endpoint.format(slug=slug))
return dict((s['identifier'], s['status']) for s in r.json())
@@ -51,7 +118,7 @@ def generate_id(slug, osmid, payload):
payload['geometries']['features'] = \
sorted(payload['geometries']['features'])

digest = hashlib.md5(json.dumps(payload), sort_keys=True).hexdigest()
digest = hashlib.md5(json.dumps(payload, sort_keys=True)).hexdigest()
return "{slug}-{osmid}-{digest}".format(slug=slug,
osmid=osmid,
digest=digest
@@ -61,10 +128,9 @@ def generate_id(slug, osmid, payload):
def prepare_task(node, args, osmid, geom):
instruction = node.get("instruction", None) or args.instruction or ''

payload = json.dumps({"instruction": instruction,
"geometries": geom
}
)
payload = {"instruction": instruction,
"geometries": geom
}

identifier = node.get('id', None)

@@ -89,17 +155,17 @@ def select_tasks(newtasks, oldtasks):
continue
else:
# otherwise (the identifier does not exist)
# it is a new task, upload it
# new task, upload it
yield identifier, payload


def post_tasks(slug, newtasks, oldtasks):
def post_tasks(slug, tasks):
# and fire!
s = requests.session()

task_requests = []
newids = set()
for identifier, payload in select_tasks(newtasks, oldtasks):
for identifier, payload in tasks:
newids.add(identifier)
task_requests.append(
grequests.put(
@@ -111,38 +177,31 @@ def post_tasks(slug, newtasks, oldtasks):
return grequests.map(task_requests), newids


def update_tasks_instruction(slug, tasks, instruction):
def update_tasks(slug, tasks, instruction=None, statuses=None):
s = requests.session()

task_requests = []
for identifier, payload in tasks:
task_requests.append(
grequests.put(
mr_api_addtask_endpoint.format(slug=slug, id=identifier),
session=s,
data=payload,
headers=HEADERS))
if instruction is not None:
payload = {"instruction": instruction,
"geometries": payload["geometries"]
}

return grequests.map(task_requests), newids


def update_tasks_geometries(slug, tasks):
s = requests.session()
if identifier not in statuses.keys():
continue

task_requests = []
for identifier in closeids:
task_requests.append(
grequests.put(
mr_api_addtask_endpoint.format(slug=slug, id=identifier),
session=s,
data=payload,
headers=HEADERS))

return grequests.map(task_requests), newids
return grequests.map(task_requests)


def close_tasks(slug, closeids):
payload = {"status" : "deleted"}
payload = {"status": "deleted"}

s = requests.session()

@@ -155,69 +214,7 @@ def close_tasks(slug, closeids):
data=payload,
headers=HEADERS))

return grequests.map(task_requests), newids


def get_tasks_from_db(args):
db_user = args.user
if not args.user:
db_user = getpass.getuser()

db_name = args.database
if not args.database:
db_name = 'osm'

db_query = args.query

db_string = "dbname={db_name} user={db_user}".format(db_name=db_name,
db_user=db_user
)

if args.host:
db_string += " host={db_host}".format(db_host=args.host)

# open a connection, get a cursor
conn = psycopg2.connect(db_string)
cur = conn.cursor(cursor_factory=DictCursor)
register_hstore(cur)
# get our results
cur.execute(db_query)
nodes = cur.fetchall()

for node in nodes:
osmid = node["id"]

geom = {
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"properties": {"osmid": osmid},
"geometry": json.loads(geojson.dumps(
wkb.loads(node["geom"].decode("hex"))))
}]
}

yield prepare_task(node=node,
args=args,
osmid=osmid,
geom=geom
)


def get_tasks_from_json(args):

with open(args.json_file, 'r') as infile:
tasks = json.load(infile)

for task in tasks:
osmid = task['geometries']['features'][0]['properties']['osmid']
geom = task['geometries']

yield prepare_task(node=task,
args=args,
osmid=osmid,
geom=geom
)
return grequests.map(task_requests)


def write_responses(responses, output):
@@ -318,7 +315,6 @@ def write_responses(responses, output):
help='JSON file with tasks')

args = parser.parse_args()
print args

if args.verbose:
rootlogger.setLevel(logging.INFO)
@@ -357,7 +353,9 @@ def write_responses(responses, output):
else:
create_challenge_if_not_exists(slug, challenge_title)

statuses = get_current_task_statuses(slug)
statuses = {}
if not args.dry:
statuses = get_current_task_statuses(slug)

tasks = []
if 'query' in args:
@@ -370,32 +368,33 @@
if not args.dry or args.force_post:
if not (args.close or
args.update_geometries or args.update_instruction):

tasks = select_tasks(tasks, statuses)
responses, newids = post_tasks(slug=slug, tasks=tasks)

write_responses(responses, args.output)

oldids = set(old
oldids = set(old
for old in statuses.keys()
if old['status'] in CFS_STATUSES)

closeids = oldids - newids
responses = close_tasks(slug, closeids)

else:
if args.update_instruction:
# prepare stuff to update/close
responses = update_tasks_instruction(
slug=slug,
tasks=tasks,
instructions=args.instruction)

elif args.update_geometries:
responses = update_tasks_geometries(slug=slug, tasks=tasks)

if args.update_instruction or args.update_geometries:
instructions = args.instruction or None
responses = update_tasks(slug=slug,
tasks=tasks,
instructions=instructions,
statuses=statuses
)
else:
# args.close = True
# calculate closeids
responses = close_tasks(slug=slug, closeids=closeids)
responses = close_tasks(slug=slug,
closeids=tasks,
statuses=statuses
)

write_responses(responses, args.output)
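The commit also folds update_tasks_instruction and update_tasks_geometries into the single update_tasks(slug, tasks, instruction=None, statuses=None) shown above. A hedged sketch of how a caller could use it to push an updated instruction only for identifiers the server already knows about; the names are taken from the diff above, and this is not the exact call made in the main block:

    # tasks yields (identifier, payload) pairs, e.g. from select_tasks();
    # statuses comes from get_current_task_statuses(slug).
    responses = update_tasks(slug=slug,
                             tasks=tasks,
                             instruction=args.instruction,
                             statuses=statuses)
    write_responses(responses, args.output)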

