Skip to content

Commit

Permalink
Merge pull request #380 from CartoDB/372-Add_simplification_to_pipeline
Browse files Browse the repository at this point in the history
Add simplifications to the ETL pipeline
  • Loading branch information
antoniocarlon committed Nov 20, 2017
2 parents 32f8a48 + aee99be commit aea76e6
Show file tree
Hide file tree
Showing 25 changed files with 403 additions and 240 deletions.
2 changes: 1 addition & 1 deletion tasks/au/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class BCP(TableTask):
resolution = Parameter()

def version(self):
return 2
return 3

def requires(self):
import_data = {}
Expand Down
18 changes: 13 additions & 5 deletions tasks/au/geo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from luigi import Parameter, WrapperTask

from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, TableTask, TagsTask
from tasks.base_tasks import (ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, SimplifiedTempTableTask, TableTask,
TagsTask)
from tasks.util import shell
from tasks.meta import GEOM_REF, GEOM_NAME, OBSColumn, current_session, OBSTag
from tasks.tags import SectionTags, SubsectionTags, BoundaryTags
Expand Down Expand Up @@ -121,6 +122,14 @@ def input_shp(self):
yield shp


class SimplifiedRawGeometry(SimplifiedTempTableTask):
    '''
    :class:`SimplifiedTempTableTask` whose only requirement is
    :class:`ImportGeography` for the same ``year`` and ``resolution``.
    '''

    year = Parameter()
    resolution = Parameter()

    def requires(self):
        # Forward this task's own parameters to the raw import it depends on.
        upstream_params = {'year': self.year, 'resolution': self.resolution}
        return ImportGeography(**upstream_params)


class GeographyColumns(ColumnsTask):

resolution = Parameter()
Expand Down Expand Up @@ -169,7 +178,6 @@ def columns(self):
tags=[source, license, sections['au'], subsections['names']],
)


cartographic_boundaries = [GEO_LGA, GEO_POA, GEO_CED, GEO_SED, GEO_SSC,
GEO_SA1, GEO_SA2, GEO_SA3, GEO_SA4,
GEO_STE, GEO_GCCSA, GEO_UCL,
Expand All @@ -178,7 +186,6 @@ def columns(self):
GEO_STE, GEO_GCCSA, GEO_UCL,
GEO_SOS, GEO_SOSR, GEO_SUA, GEO_RA]


cols = OrderedDict([
('geom_name', geom_name),
('geom_id', geom_id),
Expand All @@ -192,17 +199,18 @@ def columns(self):
col.tags.append(boundary_type['cartographic_boundary'])
return cols


class Geography(TableTask):

year = Parameter()
resolution = Parameter()

def version(self):
return 3
return 4

def requires(self):
return {
'data': ImportGeography(resolution=self.resolution, year=self.year),
'data': SimplifiedRawGeometry(resolution=self.resolution, year=self.year),
'columns': GeographyColumns(resolution=self.resolution)
}

Expand Down
22 changes: 22 additions & 0 deletions tasks/base_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from tasks.targets import (ColumnTarget, TagTarget, CartoDBTarget, PostgresTarget, TableTarget)
from tasks.util import (classpath, query_cartodb, sql_to_cartodb_table, underscore_slugify, shell,
create_temp_schema, unqualified_task_id, generate_tile_summary)
from tasks.simplification import SIMPLIFIED_SUFFIX
from tasks.simplify import Simplify

LOGGER = get_logger(__name__)

Expand Down Expand Up @@ -478,6 +480,26 @@ def clear_temp_table(task):
session.flush()


class SimplifiedTempTableTask(TempTableTask):
    '''
    Temp-table task that yields a :class:`Simplify` task for the table
    produced by its input.

    The output target lives in the input's schema and is named after the
    input's table with ``SIMPLIFIED_SUFFIX`` appended.
    '''

    def get_table_id(self):
        '''
        Subclasses may override this method if an ETL task
        needs a custom table_id.
        Returns schema.tablename_without_hash
        '''
        source = self.input()
        # Keep everything before the last underscore, which drops the
        # trailing hash chunk (yields '' when there is no underscore).
        name_without_hash = source.tablename.rpartition('_')[0]
        return '.'.join((source.schema, name_without_hash))

    def run(self):
        # The simplification itself is delegated to a Simplify task that is
        # yielded from run().
        source = self.input()
        yield Simplify(schema=source.schema,
                       table=source.tablename,
                       table_id=self.get_table_id())

    def output(self):
        source = self.input()
        simplified_name = source.tablename + SIMPLIFIED_SUFFIX
        return PostgresTarget(source.schema, simplified_name)


class GdbFeatureClass2TempTableTask(TempTableTask):
'''
A task that extracts one vector shape layer from a geodatabase to a
Expand Down
2 changes: 1 addition & 1 deletion tasks/br/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class Censos(TableTask):
resolution = Parameter()

def version(self):
return 5
return 6

def states(self):
'''
Expand Down
22 changes: 17 additions & 5 deletions tasks/br/geo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from luigi import Task, Parameter, WrapperTask

from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, TableTask
from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, SimplifiedTempTableTask, TableTask
from tasks.util import shell
from tasks.meta import GEOM_REF, GEOM_NAME, OBSColumn, current_session
from tasks.tags import SectionTags, SubsectionTags, BoundaryTags
Expand Down Expand Up @@ -157,18 +157,29 @@ def input_shp(self):
yield shp


class SimplifiedImportGeography(BaseParams, SimplifiedTempTableTask):
    '''
    :class:`SimplifiedTempTableTask` that requires :class:`ImportGeography`
    for this task's ``state`` and ``resolution``.
    '''

    def get_table_id(self):
        # Drop the last two underscore-separated parts of the input table
        # name: the hash and the State.
        # For example the table_id for 'br.geo.importgeography_setores_censitar_ac_b5dd2616fb'
        # will be 'br.geo.importgeography_setores_censitar'
        source = self.input()
        without_hash = source.tablename.rpartition('_')[0]
        without_state = without_hash.rpartition('_')[0]
        return '.'.join((source.schema, without_state))

    def requires(self):
        upstream_params = {'state': self.state, 'resolution': self.resolution}
        return ImportGeography(**upstream_params)


class ImportAllStates(BaseParams, WrapperTask):

def requires(self):
for state in STATES:
yield ImportGeography(state=state)
yield SimplifiedImportGeography(state=state)


class ImportAllGeographies(BaseParams, WrapperTask):

def requires(self):
for resolution in GEOGRAPHIES:
yield ImportGeography(resolution=resolution)
yield SimplifiedImportGeography(resolution=resolution)


class GeographyColumns(ColumnsTask):
Expand Down Expand Up @@ -239,17 +250,18 @@ def columns(self):

return cols


class Geography(TableTask):

resolution = Parameter()

def version(self):
return 3
return 4

def requires(self):
import_data = {}
for state in STATES:
import_data[state] = ImportGeography(state=state, resolution=self.resolution)
import_data[state] = SimplifiedImportGeography(state=state, resolution=self.resolution)
return {
'data': import_data,
'columns': GeographyColumns(resolution=self.resolution)
Expand Down
15 changes: 11 additions & 4 deletions tasks/ca/statcan/geo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from luigi import Parameter, WrapperTask

from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, TableTask
from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, Shp2TempTableTask, TableTask, SimplifiedTempTableTask
from tasks.util import shell
from tasks.meta import GEOM_REF, GEOM_NAME, OBSColumn, current_session
from tasks.tags import SectionTags, SubsectionTags, BoundaryTags
Expand Down Expand Up @@ -64,7 +64,6 @@
}



# http://www12.statcan.gc.ca/census-recensement/2011/geo/bound-limit/bound-limit-2011-eng.cfm
# 2011 Boundary Files
class DownloadGeography(DownloadUnzipTask):
Expand Down Expand Up @@ -97,6 +96,13 @@ def input_shp(self):
yield shp


class SimplifiedImportGeography(SimplifiedTempTableTask):
    '''
    :class:`SimplifiedTempTableTask` whose only requirement is
    :class:`ImportGeography` for the same ``resolution``
    (``GEO_PR`` when none is given).
    '''

    resolution = Parameter(default=GEO_PR)

    def requires(self):
        # Forward this task's resolution to the raw import it depends on.
        upstream_params = {'resolution': self.resolution}
        return ImportGeography(**upstream_params)


class GeographyColumns(ColumnsTask):

resolution = Parameter(default=GEO_PR)
Expand Down Expand Up @@ -160,11 +166,11 @@ class Geography(TableTask):
resolution = Parameter(default=GEO_PR)

def version(self):
return 8
return 9

def requires(self):
return {
'data': ImportGeography(resolution=self.resolution),
'data': SimplifiedImportGeography(resolution=self.resolution),
'columns': GeographyColumns(resolution=self.resolution)
}

Expand Down Expand Up @@ -193,6 +199,7 @@ def requires(self):
for resolution in GEOGRAPHIES:
yield Geography(resolution=resolution)


class AllGeographyColumns(WrapperTask):

def requires(self):
Expand Down
28 changes: 20 additions & 8 deletions tasks/es/cnig.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from luigi import Task, Parameter, LocalTarget, WrapperTask

from tasks.base_tasks import ColumnsTask, TableTask, TagsTask, Shp2TempTableTask
from tasks.base_tasks import ColumnsTask, TableTask, TagsTask, Shp2TempTableTask, SimplifiedTempTableTask
from tasks.util import shell, classpath

from tasks.tags import SectionTags, SubsectionTags, BoundaryTags
Expand Down Expand Up @@ -58,6 +58,15 @@ def input_shp(self):
)).strip()


class SimplifiedImportGeometry(SimplifiedTempTableTask):
    '''
    :class:`SimplifiedTempTableTask` whose only requirement is
    :class:`ImportGeometry` for the same ``resolution``, ``timestamp``
    and ``id_aux``.
    '''

    resolution = Parameter()
    timestamp = Parameter()
    id_aux = Parameter()  # X for Peninsula and Balearic Islands, Y for Canary Islands

    def requires(self):
        # Forward all three parameters unchanged to the raw import.
        upstream_params = {
            'resolution': self.resolution,
            'timestamp': self.timestamp,
            'id_aux': self.id_aux,
        }
        return ImportGeometry(**upstream_params)


class LicenseTags(TagsTask):

def version(self):
Expand Down Expand Up @@ -145,6 +154,7 @@ def columns(self):

return columns


class GeomRefColumns(ColumnsTask):

def version(self):
Expand All @@ -168,6 +178,7 @@ def columns(self):
)
return cols


class GeomNameColumns(ColumnsTask):

def version(self):
Expand Down Expand Up @@ -200,19 +211,19 @@ class Geometry(TableTask):
timestamp = Parameter(default='20150101')

def version(self):
return 11
return 12

def requires(self):
return {
'geom_columns': GeometryColumns(),
'geomref_columns': GeomRefColumns(),
'geomname_columns': GeomNameColumns(),
'peninsula_data': ImportGeometry(resolution=self.resolution,
timestamp=self.timestamp,
id_aux='x'),
'canary_data': ImportGeometry(resolution=self.resolution,
timestamp=self.timestamp,
id_aux='y')
'peninsula_data': SimplifiedImportGeometry(resolution=self.resolution,
timestamp=self.timestamp,
id_aux='x'),
'canary_data': SimplifiedImportGeometry(resolution=self.resolution,
timestamp=self.timestamp,
id_aux='y')
}

def timespan(self):
Expand Down Expand Up @@ -252,6 +263,7 @@ def populate(self):
session.execute(peninsula_query)
session.execute(canary_query)


class AllGeometries(WrapperTask):

def requires(self):
Expand Down
15 changes: 10 additions & 5 deletions tasks/es/ine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from lib.logger import get_logger

from tasks.base_tasks import ColumnsTask, TableTask, TagsTask, TempTableTask, MetaWrapper
from tasks.base_tasks import ColumnsTask, TableTask, TagsTask, TempTableTask, SimplifiedTempTableTask, MetaWrapper
from tasks.meta import OBSColumn, OBSTag, current_session, DENOMINATOR, GEOM_REF
from tasks.util import shell, classpath
from tasks.tags import SectionTags, SubsectionTags, UnitTags, BoundaryTags
Expand Down Expand Up @@ -55,6 +55,11 @@ def run(self):
shell(cmd)


class SimplifiedRawGeometry(SimplifiedTempTableTask):
    '''
    :class:`SimplifiedTempTableTask` whose only requirement is
    :class:`RawGeometry`.
    '''

    def requires(self):
        raw_geometry = RawGeometry()
        return raw_geometry


class GeometryColumns(ColumnsTask):

def version(self):
Expand Down Expand Up @@ -99,12 +104,12 @@ def columns(self):
class Geometry(TableTask):

def version(self):
return 6
return 7

def requires(self):
return {
'meta': GeometryColumns(),
'data': RawGeometry()
'data': SimplifiedRawGeometry()
}

def columns(self):
Expand Down Expand Up @@ -1747,7 +1752,7 @@ def requires(self):
}

def version(self):
return 3
return 4

def timespan(self):
return '2011'
Expand Down Expand Up @@ -1993,7 +1998,7 @@ def requires(self):
}

def version(self):
return 3
return 4

def columns(self):
'''
Expand Down
2 changes: 1 addition & 1 deletion tasks/eu/eurostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class TableEU(TableTask):
year = Parameter()

def version(self):
return 8
return 9

def timespan(self):
return str(self.year).replace('_',' - ')
Expand Down
Loading

0 comments on commit aea76e6

Please sign in to comment.