Skip to content

Commit

Permalink
Merge pull request #567 from CartoDB/559-Base_task_for_interpolations
Browse files Browse the repository at this point in the history
Base task for interpolations
  • Loading branch information
antoniocarlon committed Aug 29, 2018
2 parents 39e4975 + 039273f commit 6c44fd9
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 129 deletions.
137 changes: 136 additions & 1 deletion tasks/base_tasks.py
Expand Up @@ -26,7 +26,7 @@
from lib.logger import get_logger

from tasks.meta import (OBSColumn, OBSTable, metadata, current_session,
session_commit, session_rollback)
session_commit, session_rollback, GEOM_REF)
from tasks.targets import (ColumnTarget, TagTarget, CartoDBTarget, PostgresTarget, TableTarget)
from tasks.util import (classpath, query_cartodb, sql_to_cartodb_table, underscore_slugify, shell,
create_temp_schema, unqualified_task_id, generate_tile_summary, uncompress_file)
Expand Down Expand Up @@ -1735,3 +1735,138 @@ def complete(self):

def output(self):
return LocalTarget(self._get_filepath())


class BaseInterpolationTask(TableTask):
    '''
    Common base for tasks that derive the data of one geography level (the
    target) from the data and geometries of another geography level (the source).

    Subclasses declare their dependencies in ``requires`` and the names of the
    id/geometry columns in ``get_interpolation_parameters``.
    '''

    def requires(self):
        '''
        Declare the tasks this interpolation depends on.

        This method MUST be overridden in subclasses, and MUST return a
        dictionary with the following keys:

            'source_geom_columns'  a task with the source geometry columns
            'source_geom'          a task with the source geometries (interpolated from)
            'source_data_columns'  a task with the source data columns
            'source_data'          a task with the source data (interpolated from)
            'target_geom_columns'  a task with the target geometry columns
            'target_geom'          a task with the target geometries (interpolated to)
            'target_data_columns'  a task with the target data columns

        :raises NotImplementedError: always, in this base implementation.
        '''
        message = 'The requires method must be overriden in subclasses'
        raise NotImplementedError(message)

    def get_interpolation_parameters(self):
        '''
        Declare the column names used to build the interpolation query.

        This method MUST be overridden in subclasses, and MUST return a
        dictionary with the following keys:

            'source_data_geoid'      name of the id column in the source data table
            'source_geom_geoid'      name of the id column in the source geometries table
            'target_data_geoid'      name of the id column in the target data table
            'target_geom_geoid'      name of the id column in the target geometries table
            'source_geom_geomfield'  name of the geometry column in the source geometries table
            'target_geom_geomfield'  name of the geometry column in the target geometries table

        :raises NotImplementedError: always, in this base implementation.
        '''
        message = 'The get_interpolation_parameters method must be overriden in subclasses'
        raise NotImplementedError(message)

    def targets(self):
        '''
        Return a dictionary mapping the ``obs_table`` of the 'target_geom'
        input to ``GEOM_REF``.
        '''
        target_geom_table = self.input()['target_geom'].obs_table
        return {target_geom_table: GEOM_REF}


class InterpolationTask(BaseInterpolationTask):
    '''
    This task interpolates the data for one geography level from the data/geometries from
    another geography level by computing the intersections and area relations between the
    geometries.
    '''

    def populate(self):
        '''
        Fill the output table with area-weighted sums of the source data.

        For every intersecting (source geometry, target geometry) pair the query
        computes ``area_ratio``, the fraction of the source geometry's area that
        lies inside the target geometry:

            * target within source: area(target) / area(source)
            * source within target: 1
            * otherwise:            area(intersection) / area(source)

        Each source value is multiplied by that fraction, and the products are
        summed per target id and rounded.
        '''
        input_ = self.input()
        params = self.get_interpolation_parameters()

        # The id column is listed explicitly in the INSERT/SELECT below, so it must
        # not also appear among the data columns. The comparison is case-insensitive
        # on both sides, and covers the data id as well as the geometry id, so a
        # mixed-case or differing id name cannot end up duplicated in the statement.
        id_columns = {
            params['target_data_geoid'].lower(),
            params['target_geom_geoid'].lower(),
        }
        colnames = [x for x in self.columns() if x.lower() not in id_columns]

        # area_ratio is the share of the source area falling in the target, so the
        # apportioned value is `value * area_ratio`. A NULL ratio (zero-area source
        # geometry) yields NULL products, which sum() ignores.
        sum_colnames = ', '.join(
            'round(sum({x} * area_ratio)) {x}'.format(x=x) for x in colnames
        )
        plain_colnames = ', '.join(colnames)

        stmt = '''
                INSERT INTO {output} ({target_data_geoid}, {out_colnames})
                SELECT {target_geom_geoid}, {sum_colnames}
                  FROM (
                    SELECT CASE WHEN ST_Within(target_geom_table.{target_geom_geomfield},
                                               source_geom_table.{source_geom_geomfield})
                                THEN ST_Area(target_geom_table.{target_geom_geomfield}) /
                                     Nullif(ST_Area(source_geom_table.{source_geom_geomfield}), 0)
                                WHEN ST_Within(source_geom_table.{source_geom_geomfield},
                                               target_geom_table.{target_geom_geomfield})
                                THEN 1
                                ELSE ST_Area(ST_Intersection(source_geom_table.{source_geom_geomfield},
                                                             target_geom_table.{target_geom_geomfield})) /
                                     Nullif(ST_Area(source_geom_table.{source_geom_geomfield}), 0)
                            END area_ratio,
                            target_geom_table.{target_geom_geoid}, {in_colnames}
                      FROM {source_data_table} source_data_table,
                           {source_geom_table} source_geom_table,
                           {target_geom_table} target_geom_table
                     WHERE source_data_table.{source_data_geoid} = source_geom_table.{source_geom_geoid}
                       AND ST_Intersects(source_geom_table.{source_geom_geomfield},
                                         target_geom_table.{target_geom_geomfield}) = True
                    ) q GROUP BY {target_geom_geoid}
               '''.format(
            output=self.output().table,
            sum_colnames=sum_colnames,
            out_colnames=plain_colnames,
            in_colnames=plain_colnames,
            source_data_table=input_['source_data'].table,
            source_geom_table=input_['source_geom'].table,
            target_geom_table=input_['target_geom'].table,
            source_data_geoid=params['source_data_geoid'],
            source_geom_geoid=params['source_geom_geoid'],
            target_data_geoid=params['target_data_geoid'],
            target_geom_geoid=params['target_geom_geoid'],
            source_geom_geomfield=params['source_geom_geomfield'],
            target_geom_geomfield=params['target_geom_geomfield'],
        )

        current_session().execute(stmt)


class CoupledInterpolationTask(BaseInterpolationTask):
    '''
    This task interpolates the data for one geography level from the data/geometries from
    another geography level when both layers are coupled.
    Calculating the measurements for the target layer is a matter of adding up the values
    from the source layer.
    '''

    def populate(self):
        '''
        Fill the output table by summing the source data per target geometry.

        A source geometry is matched to a target geometry when the target
        intersects ``ST_PointOnSurface`` of the source geometry. The values of
        all matched source rows are then summed per target id.
        '''
        input_ = self.input()
        params = self.get_interpolation_parameters()

        # The id column is listed explicitly in the INSERT/SELECT below, so it must
        # not also appear among the data columns. The comparison is case-insensitive
        # on both sides, and covers the data id as well as the geometry id, so a
        # mixed-case or differing id name cannot end up duplicated in the statement.
        id_columns = {
            params['target_data_geoid'].lower(),
            params['target_geom_geoid'].lower(),
        }
        colnames = [x for x in self.columns() if x.lower() not in id_columns]

        stmt = '''
                INSERT INTO {output} ({target_data_geoid}, {out_colnames})
                SELECT target_geom_table.{target_geom_geoid}, {sum_colnames}
                  FROM {source_data_table} source_data_table,
                       {source_geom_table} source_geom_table,
                       {target_geom_table} target_geom_table
                 WHERE source_geom_table.{source_geom_geoid} = source_data_table.{source_data_geoid}
                   AND ST_Intersects(target_geom_table.{target_geom_geomfield},
                                     ST_PointOnSurface(source_geom_table.{source_geom_geomfield}))
                 GROUP BY target_geom_table.{target_geom_geoid}
               '''.format(
            output=self.output().table,
            out_colnames=', '.join(colnames),
            sum_colnames=', '.join('sum({x}) {x}'.format(x=x) for x in colnames),
            source_data_table=input_['source_data'].table,
            source_geom_table=input_['source_geom'].table,
            target_geom_table=input_['target_geom'].table,
            target_data_geoid=params['target_data_geoid'],
            target_geom_geoid=params['target_geom_geoid'],
            source_data_geoid=params['source_data_geoid'],
            source_geom_geoid=params['source_geom_geoid'],
            source_geom_geomfield=params['source_geom_geomfield'],
            target_geom_geomfield=params['target_geom_geomfield'],
        )

        current_session().execute(stmt)
175 changes: 47 additions & 128 deletions tasks/uk/census/wrapper.py
Expand Up @@ -4,7 +4,7 @@

from tasks.meta import current_session, GEOM_REF
from lib.timespan import get_timespan
from tasks.base_tasks import MetaWrapper, TableTask
from tasks.base_tasks import TableTask, InterpolationTask, CoupledInterpolationTask
from tasks.uk.cdrc import OutputAreas, OutputAreaColumns
from tasks.uk.census.metadata import CensusColumns
from tasks.uk.datashare import PostcodeAreas, PostcodeAreasColumns
Expand Down Expand Up @@ -157,80 +157,44 @@ def table_column_expression(data):
current_session().execute(stmt)


class CensusPostcodeAreas(TableTask):
def requires(self):
deps = {
'geom_oa_columns': OutputAreaColumns(),
'geom_pa_columns': PostcodeAreasColumns(),
'data_columns': CensusColumns(),
'geo_oa': OutputAreas(),
'geo_pa': PostcodeAreas(),
'census': CensusOutputAreas(),
}

return deps

def targets(self):
return {
self.input()['geo_pa'].obs_table: GEOM_REF,
}

class CensusPostcodeAreas(InterpolationTask):
def table_timespan(self):
return get_timespan('2011')

def columns(self):
cols = OrderedDict()
input_ = self.input()
cols['GeographyCode'] = input_['geom_pa_columns']['pa_id']
cols.update(input_['data_columns'])
cols['pa_id'] = input_['target_geom_columns']['pa_id']
cols.update(input_['source_data_columns'])
return cols

def populate(self):
input_ = self.input()
colnames = [x for x in list(self.columns().keys()) if x != 'GeographyCode']

stmt = '''
INSERT INTO {output} (geographycode, {out_colnames})
SELECT pa_id, {sum_colnames}
FROM (
SELECT CASE WHEN ST_Within(geo_pa.the_geom, geo_oa.the_geom)
THEN ST_Area(geo_pa.the_geom) / Nullif(ST_Area(geo_oa.the_geom), 0)
WHEN ST_Within(geo_oa.the_geom, geo_pa.the_geom)
THEN 1
ELSE ST_Area(ST_Intersection(geo_oa.the_geom, geo_pa.the_geom)) / Nullif(ST_Area(geo_oa.the_geom), 0)
END area_ratio,
pa_id, {in_colnames}
FROM {census_table} census,
{geo_oa_table} geo_oa,
{geo_pa_table} geo_pa
WHERE census.geographycode = geo_oa.oa_sa
AND ST_Intersects(geo_oa.the_geom, geo_pa.the_geom) = True
) q GROUP BY pa_id
'''.format(
output=self.output().table,
sum_colnames=', '.join(['round(sum({x} / area_ratio)) {x}'.format(x=x) for x in colnames]),
out_colnames=', '.join(colnames),
in_colnames=', '.join(colnames),
census_table=input_['census'].table,
geo_oa_table=input_['geo_oa'].table,
geo_pa_table=input_['geo_pa'].table,
)
def requires(self):
deps = {
'source_geom_columns': OutputAreaColumns(),
'source_geom': OutputAreas(),
'source_data_columns': CensusColumns(),
'source_data': CensusOutputAreas(),
'target_geom_columns': PostcodeAreasColumns(),
'target_geom': PostcodeAreas(),
'target_data_columns': CensusColumns(),
}

current_session().execute(stmt)
return deps

def get_interpolation_parameters(self):
params = {
'source_data_geoid': 'geographycode',
'source_geom_geoid': 'oa_sa',
'target_data_geoid': 'pa_id',
'target_geom_geoid': 'pa_id',
'source_geom_geomfield': 'the_geom',
'target_geom_geomfield': 'the_geom',
}

class CensusPostcodeEntitiesFromOAs(TableTask):
def requires(self):
'''
This method must be overriden in subclasses.
'''
raise NotImplementedError('The requires method must be overriden in subclasses')
return params

def targets(self):
return {
self.input()['target_geom'].obs_table: GEOM_REF,
}

class CensusPostcodeEntitiesFromOAs(InterpolationTask):
def table_timespan(self):
return get_timespan('2011')

Expand All @@ -241,39 +205,17 @@ def columns(self):
cols.update(input_['target_data_columns'])
return cols

def populate(self):
input_ = self.input()
colnames = [x for x in list(self.columns().keys()) if x != 'GeographyCode']

stmt = '''
INSERT INTO {output} (geographycode, {out_colnames})
SELECT geographycode, {sum_colnames}
FROM (
SELECT CASE WHEN ST_Within(geo_pe.the_geom, geo_oa.the_geom)
THEN ST_Area(geo_pe.the_geom) / Nullif(ST_Area(geo_oa.the_geom), 0)
WHEN ST_Within(geo_oa.the_geom, geo_pe.the_geom)
THEN 1
ELSE ST_Area(ST_Intersection(geo_oa.the_geom, geo_pe.the_geom)) / Nullif(ST_Area(geo_oa.the_geom), 0)
END area_ratio,
geo_pe.geographycode, {in_colnames}
FROM {census_table} census,
{geo_oa_table} geo_oa,
{geo_pe_table} geo_pe
WHERE census.geographycode = geo_oa.oa_sa
AND ST_Intersects(geo_oa.the_geom, geo_pe.the_geom) = True
) q GROUP BY geographycode
'''.format(
output=self.output().table,
sum_colnames=', '.join(['round(sum({x} / area_ratio)) {x}'.format(x=x) for x in colnames]),
out_colnames=', '.join(colnames),
in_colnames=', '.join(colnames),
census_table=input_['source_data'].table,
geo_oa_table=input_['source_geom'].table,
geo_pe_table=input_['target_geom'].table,
geo_id='pa_id'
)
def get_interpolation_parameters(self):
params = {
'source_data_geoid': 'geographycode',
'source_geom_geoid': 'oa_sa',
'target_data_geoid': 'geographycode',
'target_geom_geoid': 'geographycode',
'source_geom_geomfield': 'the_geom',
'target_geom_geomfield': 'the_geom',
}

current_session().execute(stmt)
return params


class CensusPostcodeDistricts(CensusPostcodeEntitiesFromOAs):
Expand Down Expand Up @@ -306,25 +248,14 @@ def requires(self):
return deps


class CensusSOAsFromOAs(TableTask):
class CensusSOAsFromOAs(CoupledInterpolationTask):
'''
As the SOAs and OAs layers are coupled and SOAs are bigger than OAs,
calculating the measurements for the SOAs is a matter of adding up
the values, so the data for the Super Output Areas is currently extracted
from the Output Areas.
'''

def requires(self):
'''
This method must be overriden in subclasses.
'''
raise NotImplementedError('The requires method must be overriden in subclasses')

def targets(self):
return {
self.input()['target_geom'].obs_table: GEOM_REF,
}

def table_timespan(self):
return get_timespan('2011')

Expand All @@ -335,29 +266,17 @@ def columns(self):
cols.update(input_['target_data_columns'])
return cols

def populate(self):
input_ = self.input()
colnames = [x for x in list(self.columns().keys()) if x != 'GeographyCode']

stmt = '''
INSERT INTO {output} (geographycode, {out_colnames})
SELECT geo_target.geographycode, {sum_colnames}
FROM {data_source} data_source,
{geo_source} geo_source,
{geo_target} geo_target
WHERE geo_source.oa_sa = data_source.geographycode
AND ST_Intersects(geo_target.the_geom, ST_PointOnSurface(geo_source.the_geom))
GROUP BY geo_target.geographycode
'''.format(
output=self.output().table,
out_colnames=', '.join(colnames),
sum_colnames=', '.join(['sum({x}) {x}'.format(x=x) for x in colnames]),
data_source=input_['source_data'].table,
geo_source=input_['source_geom'].table,
geo_target=input_['target_geom'].table,
)
def get_interpolation_parameters(self):
params = {
'source_data_geoid': 'geographycode',
'source_geom_geoid': 'oa_sa',
'target_data_geoid': 'geographycode',
'target_geom_geoid': 'geographycode',
'source_geom_geomfield': 'the_geom',
'target_geom_geomfield': 'the_geom',
}

current_session().execute(stmt)
return params


class CensusLowerSuperOutputAreas(CensusSOAsFromOAs):
Expand Down

0 comments on commit 6c44fd9

Please sign in to comment.