Skip to content

Commit

Permalink
Merge pull request #564 from CartoDB/560-Redesign_of_DownloadUnzipTask
Browse files Browse the repository at this point in the history
DownloadUncompressTask redesigned
  • Loading branch information
antoniocarlon committed Sep 11, 2018
2 parents 6c44fd9 + f760824 commit f902c39
Show file tree
Hide file tree
Showing 29 changed files with 238 additions and 424 deletions.
2 changes: 1 addition & 1 deletion docs/source/development.rst
Expand Up @@ -52,7 +52,7 @@ over again. These tasks are meant to take care of the most repetitive aspects.
:members:
:show-inheritance:

.. autoclass:: tasks.base_tasks.DownloadUnzipTask
.. autoclass:: tasks.base_tasks.RepoFileUnzipTask
:members:
:show-inheritance:

Expand Down
21 changes: 10 additions & 11 deletions docs/source/example.rst
Expand Up @@ -40,7 +40,7 @@ The actual flow of ``Task`` dependencies could be charted like this:
node_width = 240;
node_height = 60;

download_data [ label = "Download data", description = ":class:`~.tasks.DownloadUnzipTask`, :class:`Task` " ];
download_data [ label = "Download data", description = ":class:`~.tasks.RepoFileUnzipTask`, :class:`Task` " ];
import_data [ label = "Import data", description = ":class:`~.tasks.CSV2TempTableTask`, :class:`~.tasks.GeoFile2TempTableTask`, :class:`~.tasks.TempTableTask` "];
process_data [ label = "Preprocess data", description = ":class:`~.tasks.TempTableTask`", stacked ];
generate_metadata [ label = "Write metadata", description = ":class:`~.tasks.ColumnsTask`" ];
Expand Down Expand Up @@ -85,7 +85,7 @@ And navigate to it in your browser.
from tasks.util import underscore_slugify, shell, classpath
from tasks.base_tasks import (TempTableTask, TableTask, ColumnsTask,
DownloadUnzipTask, CSV2TempTableTask)
RepoFileUnzipTask, CSV2TempTableTask)
from tasks.meta import current_session, DENOMINATOR
# We like OrderedDict because it makes it easy to pass dicts
Expand Down Expand Up @@ -113,23 +113,22 @@ And navigate to it in your browser.
The first step of most ETLs is going to be downloading the source and
saving it to a temporary folder.

``DownloadUnzipTask`` is a utility class that handles the file naming
and unzipping of the temporary output for you. You just have to write
the code which will do the download to the output file name.
``RepoFileUnzipTask`` is a utility class that handles the file naming
and unzipping of the temporary output for you. It requires RepoFile
so all the files are managed by the repository. You just have to write
the code which will tell the downloader where to get the source file
from.

.. code:: python
class DownloadQCEW(DownloadUnzipTask):
class DownloadQCEW(RepoFileUnzipTask):
year = IntParameter()
URL = 'http://www.bls.gov/cew/data/files/{year}/csv/{year}_qtrly_singlefile.zip'
def download(self):
shell('wget -O {output}.zip {url}'.format(
output=self.output().path,
url=self.URL.format(year=self.year)
))
def get_url(self):
return URL.format(year=self.year)
Within the IPython environment, we can create and run the task within a
sandbox.
Expand Down
16 changes: 8 additions & 8 deletions docs/source/validation.rst
Expand Up @@ -16,7 +16,7 @@ Proper use of utility classes

There are extensive :ref:`abstract-classes` available for development. These
can do things like download and unzip a file to disk
(:ref:`tasks.base_tasks.DownloadUnzipTask`) and import a CSV on disk to a temporary
(:ref:`tasks.base_tasks.RepoFileUnzipTask`) and import a CSV on disk to a temporary
table (:ref:`tasks.base_tasks.CSV2TempTableTask`). These classes should be used when
possible to minimize specialized ETL code. In particular, these tasks save
output to well-known locations so as to avoid redundantly running the same
Expand Down Expand Up @@ -79,17 +79,17 @@ An example of this:

.. code:: python
from tasks.base_tasks import DownloadUnzipTask
from tasks.base_tasks import RepoFileUnzipTask
class MyBadTask(DownloadUnzipTask):
class MyBadTask(RepoFileUnzipTask):
goodparam = Parameter()
badparam = Parameter()
def url(self):
def get_url(self):
return 'http://somesite/with/data/{}'.format(self.goodparam)
:ref:`tasks.base_tasks.DownloadUnzipTask` will generate the location for a unique
:ref:`tasks.base_tasks.RepoFileUnzipTask` will generate the location for a unique
output file automatically based off of all its params, but ``badparam`` above
doesn't actually affect the file being downloaded. That means if we change
``badparam`` we'll download the same file twice.
Expand All @@ -102,17 +102,17 @@ parameters. For example:

.. code:: python
from tasks.base_tasks import DownloadUnzipTask
from tasks.base_tasks import RepoFileUnzipTask
class MyBadTask(DownloadUnzipTask):
class MyBadTask(RepoFileUnzipTask):
'''
My URL doesn't depend on `badparam`!
'''
goodparam = Parameter()
badparam = Parameter(default='foo')
def url(self):
def get_url(self):
return 'http://somesite/with/data/{}'.format(self.goodparam)
Now it's easy to simply forget that ``badparam`` even exists! But it still
Expand Down
24 changes: 8 additions & 16 deletions tasks/au/data.py
Expand Up @@ -7,7 +7,7 @@

from lib.timespan import get_timespan
from tasks.util import shell, copyfile
from tasks.base_tasks import ColumnsTask, DownloadUnzipTask, TableTask, CSV2TempTableTask, MetaWrapper, RepoFile
from tasks.base_tasks import ColumnsTask, RepoFileUnzipTask, TableTask, CSV2TempTableTask, MetaWrapper
from tasks.meta import current_session, OBSColumn, GEOM_REF
from tasks.au.geo import (SourceTags, LicenseTags, GEOGRAPHIES, GeographyColumns, Geography, GEO_MB, GEO_SA1)
from tasks.tags import SectionTags, SubsectionTags, UnitTags
Expand Down Expand Up @@ -39,28 +39,20 @@
URL = 'http://www.censusdata.abs.gov.au/CensusOutput/copsubdatapacks.nsf/All%20docs%20by%20catNo/{year}_{profile}_{resolution}_for_{state}/$File/{year}_{profile}_{resolution}_for_{state}_{header}-header.zip'


class DownloadData(DownloadUnzipTask):
class DownloadData(RepoFileUnzipTask):

year = Parameter()
resolution = Parameter()
profile = Parameter()
state = Parameter()
header = Parameter()

def version(self):
return 1

def requires(self):
return RepoFile(resource_id=self.task_id,
version=self.version(),
url=URL.format(year=self.year,
profile=self.profile,
resolution=self.resolution,
state=self.state,
header=self.header))

def download(self):
copyfile(self.input().path, '{output}.zip'.format(output=self.output().path))
def get_url(self):
return URL.format(year=self.year,
profile=self.profile,
resolution=self.resolution,
state=self.state,
header=self.header)


class ImportData(CSV2TempTableTask):
Expand Down
15 changes: 4 additions & 11 deletions tasks/au/geo.py
Expand Up @@ -3,7 +3,7 @@

from lib.timespan import get_timespan

from tasks.base_tasks import (ColumnsTask, DownloadUnzipTask, GeoFile2TempTableTask, SimplifiedTempTableTask, TableTask,
from tasks.base_tasks import (ColumnsTask, RepoFileUnzipTask, GeoFile2TempTableTask, SimplifiedTempTableTask, TableTask,
TagsTask, RepoFile)
from tasks.util import shell, copyfile, classpath, uncompress_file
from tasks.meta import GEOM_REF, GEOM_NAME, OBSColumn, current_session, OBSTag, OBSTable
Expand Down Expand Up @@ -97,23 +97,16 @@ def tags(self):
]


class DownloadGeography(DownloadUnzipTask):
class DownloadGeography(RepoFileUnzipTask):

year = Parameter()
resolution = Parameter()

URL = 'http://www.censusdata.abs.gov.au/CensusOutput/copsubdatapacks.nsf/All%20docs%20by%20catNo/Boundaries_{year}_{resolution}/\$File/{year}_{resolution}_shape.zip'

def version(self):
return 1

def requires(self):
return RepoFile(resource_id=self.task_id,
version=self.version(),
url=self.URL.format(resolution=self.resolution, year=self.year))
def get_url(self):
return self.URL.format(resolution=self.resolution, year=self.year)

def download(self):
copyfile(self.input().path, '{output}.zip'.format(output=self.output().path))

class DownloadAndMergeMeshBlocks(Task):

Expand Down
107 changes: 45 additions & 62 deletions tasks/base_tasks.py
Expand Up @@ -27,9 +27,10 @@

from tasks.meta import (OBSColumn, OBSTable, metadata, current_session,
session_commit, session_rollback, GEOM_REF)
from tasks.targets import (ColumnTarget, TagTarget, CartoDBTarget, PostgresTarget, TableTarget)
from tasks.targets import (ColumnTarget, TagTarget, CartoDBTarget, PostgresTarget, TableTarget, RepoTarget)
from tasks.util import (classpath, query_cartodb, sql_to_cartodb_table, underscore_slugify, shell,
create_temp_schema, unqualified_task_id, generate_tile_summary, uncompress_file)
create_temp_schema, unqualified_task_id, generate_tile_summary, uncompress_file,
copyfile)
from tasks.simplification import SIMPLIFIED_SUFFIX
from tasks.simplify import Simplify

Expand Down Expand Up @@ -408,31 +409,35 @@ def output(self):
return target


class DownloadUncompressTask(Task):
class RepoFileUncompressTask(Task):
'''
Download a compressed file to location {output} and uncompresses it to the folder
{output}. Subclasses only need to define the following methods:
:meth:`~.tasks.DownloadUncompressTask.download`
:meth:`~.tasks.DownloadUncompressTask.uncompress`
:meth:`~.tasks.RepoFileUncompressTask.uncompress`
:meth:`~.tasks.RepoFileUncompressTask.get_url`
'''

def download(self):
'''
Subclasses must override this. A good starting point is:
def version(self):
return 1

.. code:: python
def requires(self):
return RepoFile(resource_id=self.task_id, version=self.version(), url=self.get_url())

shell('wget -O {output}.zip {url}'.format(
output=self.output().path,
url=<URL>
))
def get_url(self):
'''
raise NotImplementedError('DownloadUncompressTask must define download()')
Subclasses must override this.
'''
raise NotImplementedError('RepoFileUncompressTask must define get_url()')

def copy_from_repo(self):
copyfile(self.input().path,
'{output}.{extension}'.format(output=self.output().path,
extension=self.compressed_extension))

def run(self):
os.makedirs(self.output().path)
try:
self.download()
self.copy_from_repo()
self.uncompress()
except:
os.rmdir(self.output().path)
Expand All @@ -442,7 +447,7 @@ def uncompress(self):
'''
Subclasses must override this.
'''
raise NotImplementedError('DownloadUncompressTask must define uncompress()')
raise NotImplementedError('RepoFileUncompressTask must define uncompress()')

def output(self):
'''
Expand All @@ -453,29 +458,29 @@ def output(self):
return LocalTarget(os.path.join('tmp', classpath(self), self.task_id))


class DownloadUnzipTask(DownloadUncompressTask):
class RepoFileUnzipTask(RepoFileUncompressTask):
'''
Download a zip file to location {output}.zip and unzip it to the folder
{output}. Subclasses only need to define a
:meth:`~.tasks.DownloadUnzipTask.download` method.
{output}.
'''
compressed_extension = 'zip'

def uncompress(self):
output = self.output().path
uncompress_file(self.output().path)


class DownloadGUnzipTask(DownloadUncompressTask):
class RepoFileGUnzipTask(RepoFileUncompressTask):
'''
Download a gz file to location {output}.gz and unzip it to the file
{output}/task_id.{file_extension} . Subclasses only need to define a
:meth:`~.tasks.DownloadGUnzipTask.download` method.
{output}/task_id.{file_extension}.
'''

compressed_extension = 'gz'
file_extension = Parameter(default='csv')

def uncompress(self):
gunzip = gzip.GzipFile('{output}.gz'.format(output=self.output().path), 'rb')
gunzip = gzip.GzipFile('{output}.{extension}'.format(output=self.output().path,
extension=self.compressed_extension),
'rb')
with open(os.path.join(self.output().path, '{filename}.{extension}'.format(
filename=self.task_id, extension=self.file_extension)), 'wb') as outfile:
outfile.write(gunzip.read())
Expand Down Expand Up @@ -1666,7 +1671,7 @@ class RepoFile(Task):
downloader = Parameter(default=base_downloader, significant=False)

_repo_dir = 'repository'
_path = None
_new_file_name = str(uuid.uuid4())

def requires(self):
return CreateRepoTable()
Expand All @@ -1677,32 +1682,25 @@ def run(self):
digest = digest_file(self.output().path)
self._to_db(self.resource_id, self.version, digest, self.url, self.output().path)

def _create_filepath(self):
return self._build_path(str(uuid.uuid4()))

def _build_path(self, filename):
return os.path.join(self._repo_dir, self.resource_id, str(self.version), filename)

def _retrieve_remote_file(self):
LOGGER.info('Downloading remote file')
if os.path.isfile(self.output().path):
os.remove(self.output().path)
self.downloader(self.url, self.output().path)

def _from_db(self, resource_id, version):
checksum, url, path = None, None, None
def _to_db(self, resource_id, version, checksum, url, path):
LOGGER.info('Storing entry in repository')
query = '''
SELECT checksum, url, path FROM "{schema}".{table}
WHERE id = '{resource_id}'
AND version = {version}
DELETE FROM "{schema}".{table}
WHERE id = '{resource_id}' AND version = {version};
'''.format(schema=self.input().schema,
table=self.input().tablename,
resource_id=resource_id,
version=version)
result = current_session().execute(query).fetchone()
if result:
checksum, url, path = result

return checksum, url, path

def _to_db(self, resource_id, version, checksum, url, path):
version=version,
url=url,
checksum=checksum,
path=path)
current_session().execute(query)
query = '''
INSERT INTO "{schema}".{table}
(id, version, url, checksum, path, added)
Expand All @@ -1717,24 +1715,9 @@ def _to_db(self, resource_id, version, checksum, url, path):
current_session().execute(query)
current_session().commit()

def _get_filepath(self):
path = self._from_db(self.resource_id, self.version)[2]
if not path:
if not self._path:
self._path = self._create_filepath()
path = self._path

return path

def complete(self):
deps = self.deps()
if deps and not all([d.complete() for d in deps]):
return False

return self._from_db(self.resource_id, self.version)[2] is not None

def output(self):
return LocalTarget(self._get_filepath())
return RepoTarget(self.input().schema, self.input().tablename,
self._repo_dir, self.resource_id, self.version, self._new_file_name)


class BaseInterpolationTask(TableTask):
Expand Down

0 comments on commit f902c39

Please sign in to comment.