Skip to content

Commit

Permalink
Merge pull request #157 from CartoDB/add-errors-on-null-only
Browse files Browse the repository at this point in the history
catch empty return values and error on them
  • Loading branch information
andy-esch committed Jan 9, 2018
2 parents 65c7841 + 32bb3b1 commit e5a03fc
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 201 deletions.
105 changes: 47 additions & 58 deletions src/py/crankshaft/crankshaft/analysis_data_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,84 +2,73 @@
import plpy
import pysal_utils as pu

NULL_VALUE_ERROR = ('No usable data passed to analysis. Check your input rows '
'for null values and fill in appropriately.')

class AnalysisDataProvider:
def get_getis(self, w_type, params):
"""fetch data for getis ord's g"""

def verify_data(func):
"""decorator to verify data result before returning to algorithm"""
def wrapper(*args, **kwargs):
"""Error checking"""
try:
query = pu.construct_neighbor_query(w_type, params)
result = plpy.execute(query)
# if there are no neighbors, exit
if len(result) == 0:
return pu.empty_zipped_array(4)
data = func(*args, **kwargs)
if not data:
plpy.error(NULL_VALUE_ERROR)
else:
return result
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
return data
except Exception as err:
plpy.error('Analysis failed: {}'.format(err))

def get_markov(self, w_type, params):
"""fetch data for spatial markov"""
try:
query = pu.construct_neighbor_query(w_type, params)
data = plpy.execute(query)
return []

return wrapper

if len(data) == 0:
return pu.empty_zipped_array(4)

return data
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
class AnalysisDataProvider(object):
@verify_data
def get_getis(self, w_type, params):
"""fetch data for getis ord's g"""
query = pu.construct_neighbor_query(w_type, params)
return plpy.execute(query)

@verify_data
def get_markov(self, w_type, params):
"""fetch data for spatial markov"""
query = pu.construct_neighbor_query(w_type, params)
return plpy.execute(query)

@verify_data
def get_moran(self, w_type, params):
"""fetch data for moran's i analyses"""
try:
query = pu.construct_neighbor_query(w_type, params)
data = plpy.execute(query)

# if there are no neighbors, exit
if len(data) == 0:
return pu.empty_zipped_array(2)
return data
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % e)
return pu.empty_zipped_array(2)
query = pu.construct_neighbor_query(w_type, params)
return plpy.execute(query)

@verify_data
def get_nonspatial_kmeans(self, query):
"""fetch data for non-spatial kmeans"""
try:
data = plpy.execute(query)
return data
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
return plpy.execute(query)

@verify_data
def get_spatial_kmeans(self, params):
"""fetch data for spatial kmeans"""
query = ("SELECT "
"array_agg({id_col} ORDER BY {id_col}) as ids,"
"array_agg(ST_X({geom_col}) ORDER BY {id_col}) As xs,"
"array_agg(ST_Y({geom_col}) ORDER BY {id_col}) As ys "
"FROM ({subquery}) As a "
"WHERE {geom_col} IS NOT NULL").format(**params)
try:
data = plpy.execute(query)
return data
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
query = '''
SELECT
array_agg("{id_col}" ORDER BY "{id_col}") as ids,
array_agg(ST_X("{geom_col}") ORDER BY "{id_col}") As xs,
array_agg(ST_Y("{geom_col}") ORDER BY "{id_col}") As ys
FROM ({subquery}) As a
WHERE "{geom_col}" IS NOT NULL
'''.format(**params)
return plpy.execute(query)

@verify_data
def get_gwr(self, params):
"""fetch data for gwr analysis"""
query = pu.gwr_query(params)
try:
query_result = plpy.execute(query)
return query_result
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
return plpy.execute(query)

@verify_data
def get_gwr_predict(self, params):
"""fetch data for gwr predict"""
query = pu.gwr_predict_query(params)
try:
query_result = plpy.execute(query)
return query_result
except plpy.SPIError, err:
plpy.error('Analysis failed: %s' % err)
return plpy.execute(query)
14 changes: 7 additions & 7 deletions src/py/crankshaft/crankshaft/clustering/getis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# High level interface ---------------------------------------


class Getis:
class Getis(object):
def __init__(self, data_provider=None):
if data_provider is None:
self.data_provider = AnalysisDataProvider()
Expand All @@ -31,13 +31,13 @@ def getis_ord(self, subquery, attr,
# geometries whose attributes are null are ignored, so if kNN is chosen
# the resulting neighbors may not be the truly nearest geometries

qvals = OrderedDict([("id_col", id_col),
("attr1", attr),
("geom_col", geom_col),
("subquery", subquery),
("num_ngbrs", num_ngbrs)])
params = OrderedDict([("id_col", id_col),
("attr1", attr),
("geom_col", geom_col),
("subquery", subquery),
("num_ngbrs", num_ngbrs)])

result = self.data_provider.get_getis(w_type, qvals)
result = self.data_provider.get_getis(w_type, params)
attr_vals = pu.get_attributes(result)

# build PySAL weight object
Expand Down
10 changes: 5 additions & 5 deletions src/py/crankshaft/crankshaft/clustering/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from crankshaft.analysis_data_provider import AnalysisDataProvider


class Kmeans:
class Kmeans(object):
def __init__(self, data_provider=None):
if data_provider is None:
self.data_provider = AnalysisDataProvider()
Expand All @@ -20,12 +20,12 @@ def spatial(self, query, no_clusters, no_init=20):
"geom_col": "the_geom",
"id_col": "cartodb_id"}

data = self.data_provider.get_spatial_kmeans(params)
result = self.data_provider.get_spatial_kmeans(params)

# Unpack query response
xs = data[0]['xs']
ys = data[0]['ys']
ids = data[0]['ids']
xs = result[0]['xs']
ys = result[0]['ys']
ids = result[0]['ids']

km = KMeans(n_clusters=no_clusters, n_init=no_init)
labels = km.fit_predict(zip(xs, ys))
Expand Down
2 changes: 1 addition & 1 deletion src/py/crankshaft/crankshaft/clustering/moran.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# High level interface ---------------------------------------


class Moran:
class Moran(object):
def __init__(self, data_provider=None):
if data_provider is None:
self.data_provider = AnalysisDataProvider()
Expand Down
77 changes: 29 additions & 48 deletions src/py/crankshaft/crankshaft/pysal_utils/pysal_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ def get_weight(query_res, w_type='knn', num_ngbrs=5):
Construct PySAL weight from return value of query
@param query_res dict-like: query results with attributes and neighbors
"""
# if w_type.lower() == 'knn':
# row_normed_weights = [1.0 / float(num_ngbrs)] * num_ngbrs
# weights = {x['id']: row_normed_weights for x in query_res}
# else:
# weights = {x['id']: [1.0 / len(x['neighbors'])] * len(x['neighbors'])
# if len(x['neighbors']) > 0
# else [] for x in query_res}

neighbors = {x['id']: x['neighbors'] for x in query_res}
print 'len of neighbors: %d' % len(neighbors)
Expand Down Expand Up @@ -148,22 +141,21 @@ def knn(params):
"attr_where_i": attr_where.replace("idx_replace", "i"),
"attr_where_j": attr_where.replace("idx_replace", "j")}

query = "SELECT " \
"i.\"{id_col}\" As id, " \
"%(attr_select)s" \
"(SELECT ARRAY(SELECT j.\"{id_col}\" " \
"FROM ({subquery}) As j " \
"WHERE " \
"i.\"{id_col}\" <> j.\"{id_col}\" AND " \
"%(attr_where_j)s " \
"ORDER BY " \
"j.\"{geom_col}\" <-> i.\"{geom_col}\" ASC " \
"LIMIT {num_ngbrs})" \
") As neighbors " \
"FROM ({subquery}) As i " \
"WHERE " \
"%(attr_where_i)s " \
"ORDER BY i.\"{id_col}\" ASC;" % replacements
query = '''
SELECT
i."{id_col}" As id,
%(attr_select)s
(SELECT ARRAY(SELECT j."{id_col}"
FROM ({subquery}) As j
WHERE i."{id_col}" <> j."{id_col}" AND
%(attr_where_j)s AND
j."{geom_col}" IS NOT NULL
ORDER BY j."{geom_col}" <-> i."{geom_col}" ASC
LIMIT {num_ngbrs})) As neighbors
FROM ({subquery}) As i
WHERE %(attr_where_i)s AND i."{geom_col}" IS NOT NULL
ORDER BY i."{id_col}" ASC;
''' % replacements

return query.format(**params)

Expand All @@ -180,19 +172,20 @@ def queen(params):
"attr_where_i": attr_where.replace("idx_replace", "i"),
"attr_where_j": attr_where.replace("idx_replace", "j")}

query = "SELECT " \
"i.\"{id_col}\" As id, " \
"%(attr_select)s" \
"(SELECT ARRAY(SELECT j.\"{id_col}\" " \
"FROM ({subquery}) As j " \
"WHERE i.\"{id_col}\" <> j.\"{id_col}\" AND " \
"ST_Touches(i.\"{geom_col}\", j.\"{geom_col}\") AND " \
"%(attr_where_j)s)" \
") As neighbors " \
"FROM ({subquery}) As i " \
"WHERE " \
"%(attr_where_i)s " \
"ORDER BY i.\"{id_col}\" ASC;" % replacements
query = '''
SELECT
i."{id_col}" As id,
%(attr_select)s
(SELECT ARRAY(SELECT j."{id_col}"
FROM ({subquery}) As j
WHERE i."{id_col}" <> j."{id_col}" AND
ST_Touches(i."{geom_col}", j."{geom_col}") AND
%(attr_where_j)s)) As neighbors
FROM ({subquery}) As i
WHERE
%(attr_where_i)s
ORDER BY i."{id_col}" ASC;
''' % replacements

return query.format(**params)

Expand Down Expand Up @@ -256,15 +249,3 @@ def get_attributes(query_res, attr_num=1):
"""
return np.array([x['attr' + str(attr_num)] for x in query_res],
dtype=np.float)


def empty_zipped_array(num_nones):
    """
        Prepare return values for cases of empty weights objects (no
        neighbors).

        Input:
        @param num_nones int: number of columns (e.g., 4)
        Output:
        A one-element list holding a single tuple of `num_nones` Nones,
        e.g. [(None, None, None, None)] for num_nones=4
    """
    # Repeating a one-element tuple builds the row directly, without first
    # creating a throwaway list and converting it.
    return [(None,) * num_nones]
1 change: 1 addition & 0 deletions src/py/crankshaft/crankshaft/random_seeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import random
import numpy


def set_random_seeds(value):
"""
Set the seeds of the RNGs (Random Number Generators)
Expand Down
8 changes: 4 additions & 4 deletions src/py/crankshaft/crankshaft/space_time_dynamics/markov.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from crankshaft.analysis_data_provider import AnalysisDataProvider


class Markov:
class Markov(object):
def __init__(self, data_provider=None):
if data_provider is None:
self.data_provider = AnalysisDataProvider()
Expand Down Expand Up @@ -61,14 +61,14 @@ def spatial_trend(self, subquery, time_cols, num_classes=7,
"subquery": subquery,
"num_ngbrs": num_ngbrs}

query_result = self.data_provider.get_markov(w_type, params)
result = self.data_provider.get_markov(w_type, params)

# build weight
weights = pu.get_weight(query_result, w_type)
weights = pu.get_weight(result, w_type)
weights.transform = 'r'

# prep time data
t_data = get_time_data(query_result, time_cols)
t_data = get_time_data(result, time_cols)

sp_markov_result = ps.Spatial_Markov(t_data,
weights,
Expand Down
3 changes: 3 additions & 0 deletions src/py/crankshaft/test/mock_plpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def debug(self, msg):
def info(self, msg):
    """Record `msg` by appending it to `self.infos`; nothing else is done."""
    self.infos.append(msg)

def error(self, msg):
    """Record `msg` by appending it to `self.notices`.

    The method returns normally after storing the message.
    NOTE(review): PL/Python's real `plpy.error` raises an exception, so
    code following a `plpy.error(...)` call keeps running under this mock
    but would not in the database -- confirm that is intended.
    """
    self.notices.append(msg)

def cursor(self, query):
    """Run `query` through `self.execute` and wrap the rows in a MockCursor."""
    return MockCursor(self.execute(query))
Expand Down
Loading

0 comments on commit e5a03fc

Please sign in to comment.