Skip to content


hub tutorial (up to uploaders)
Browse files Browse the repository at this point in the history
  • Loading branch information
sirloon committed Jan 31, 2017
1 parent d2a8d23 commit 32e3ee9
Showing 1 changed file with 350 additions and 3 deletions.
353 changes: 350 additions & 3 deletions docs/doc/hub_tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Let’s generate it, following instruction. Now we can run it again and try to c
Warning: Permanently added '[localhost]:7022' (RSA) to the list of known hosts.
Welcome to Species hub, guest!
Let’s try a command:
Expand Down Expand Up @@ -199,7 +199,7 @@ We now need to tell the dumper what to download, that is, create that self.to_du
current_localfile = new_localfile
if force or not os.path.exists(current_localfile) or self.remote_is_better(file_to_dump, current_localfile):
# register new release (will be stored in backend)
self.to_dump.append({"remote": file_to_dump, "local":new_localfile})
self.to_dump.append({"remote": file_to_dump, "local":new_localfile})
That method tries to get the latest downloaded file and then compare that file with the remote file using
``self.remote_is_better(file_to_dump, current_localfile)``, which compares the dates and return True if the remote is more recent.
Expand Down Expand Up @@ -334,7 +334,7 @@ We can notice the timestamp used to create the folder. Let’s also have a look
We have some information about the download process, how long it took to download files, etc… We have the path to the
We have some information about the download process, how long it took to download files, etc… We have the path to the
``data_folder`` containing the latest version, the ``release`` number (here, it’s a timestamp), and a flag named ``pending_to_upload``.
That will be used later to automatically trigger an upload after a dumper has run.
Expand Down Expand Up @@ -407,3 +407,350 @@ Code is available on github:
`d3b3486f71e865235efd673d2f371b53eaa0bc5b <>`_
for whole changes and `dataload/sources/geneinfo/ <>`_ for the dumper.
Now that we have local data available, we can process them. We’re going to create 3 different uploaders, one for each datasource.
Each uploader will load data into MongoDB, into individual/single collections. Those will then be used in the last merging step.
Before going further, we’ll first create an UploaderManager instance and register some of its commands in the hub:
.. code-block:: python
import biothings.dataload.uploader as uploader
# will check every 10 seconds for sources to upload
umanager = uploader.UploaderManager(poll_schedule = '* * * * * */10', job_manager=jmanager)
# upload commands
"um" : umanager,
"upload" : umanager.upload_src,
Running the hub, we’ll see the kind of log statements:
.. code:: bash
INFO:species.hub:Found 2 resources to upload (['species', 'geneinfo'])
INFO:species.hub:Launch upload for 'species'
ERROR:species.hub:Resource 'species' needs upload but is not registerd in manager
INFO:species.hub:Launch upload for 'geneinfo'
ERROR:species.hub:Resource 'geneinfo' needs upload but is not registerd in manager
Indeed, datasources have been dumped, and a ``pending_to_upload`` flag has been to True in ``src_dump``. UploadManager polls this ``src_dump``
internal collection, looking for this flag. If set, it runs automatically the corresponding uploader(s). Since we didn’t implement any uploaders yet,
manager complains… Let’s fix that.
Taxonomy uploader
The taxonomy files we downloaded need to be parsed and stored into a MongoDB collection. We won’t go in too much details regarding the actual parsing,
there are two parsers, one for **nodes.dmp** and another for **names.dmp** files. They yield dictionaries as the result of this parsing step. We just
need to “connect” those parsers to uploaders.
Following the same approach as for dumpers, we’re going to implement our first uploaders by inheriting one the base classes available in Biothings SDK.
We have two files to parse, data will stored in two different mongoDB collections, so we’re going to have two uploaders. Each inherits from
``biothings.dataload.uploader.BaseSourceUploader``, ``load_data`` method has to be implemented, this is where we “connect” parsers.
Beside this method, another important point relates to the storage engine. ``load_data`` will, through the parser, yield documents (dictionaries).
This data is processed internally by the base uploader class (``BaseSourceUploader``) using a storage engine. ``BaseSourceUploader`` uses
```` as its engine. This storage inserts data in mongoDB collection using bulk operations for better performances.
There are other storages available, depending on how data should be inserted (eg. IgnoreDuplicatedStorage will ignore any duplicated data error).
While choosing a base uploader class, we need to consider which storage class it’s actually using behind-the-scene (an alternative way to do this is
using ``BaseSourceUploader`` and set the class attribute storage_class, such as in this uploader:
`biothings/dataload/ <>`_).
The first uploader will take care of nodes.dmp parsing and storage.
.. code-block:: python
import biothings.dataload.uploader as uploader
from .parser import parse_refseq_names, parse_refseq_nodes
class TaxonomyNodesUploader(uploader.BaseSourceUploader):
main_source = "taxonomy"
name = "nodes"
def load_data(self,data_folder):
nodes_file = os.path.join(data_folder,"nodes.dmp")"Load data from file '%s'" % nodes_file)
return parse_refseq_nodes(open(nodes_file))
* ``TaxonomyNodesUploader`` derives from ``BaseSourceUploader``
* ``name`` gives the name of the collection used to store the data. If ``main_source`` is *not* defined,
it must match ``SRC_NAME`` in dumper’s attributes
* ``main_source`` is optional and allows to define main sources and sub-sources. Since we have 2 parsers here,
we’re going to have 2 collections created. For this one, we want the collection named “nodes”. But this parser
relates to *taxonomy* datasource, so we define a ``main source`` called **taxonomy**, which matches ``SRC_NAME`` in dumper’s attributes.
* ``load_data()`` has ``data_folder`` as parameter. It will be set accordingly, to the path of the last version dumped.
Also, that method gets data from parsing function ``parse_refseq_nodes``. It’s where we “connect” the parser. We just need to
return parser’s result so the storage can actually store the data.
The other parser, for names.dmp, is almost the same:
.. code-block:: python
class TaxonomyNamesUploader(uploader.BaseSourceUploader):
main_source = "taxonomy"
name = "names"
def load_data(self,data_folder):
names_file = os.path.join(data_folder,"names.dmp")"Load data from file '%s'" % names_file)
return parse_refseq_names(open(names_file))
We then need to “expose” those parsers in taxonomy package, in `dataload/sources/taxonomy/ <>`_:
.. code-block:: python
from .uploader import TaxonomyNodesUploader, TaxonomyNamesUploader
Now let’s try to run the hub again. We should see uploader manager has automatically triggered some uploads:
.. code:: bash
INFO:species.hub:Launch upload for 'taxonomy'
INFO:taxonomy.names_upload:Uploading 'names' (collection: names)
INFO:taxonomy.nodes_upload:Uploading 'nodes' (collection: nodes)
INFO:taxonomy.nodes_upload:Load data from file './data/taxonomy/20170125/nodes.dmp'
INFO:taxonomy.names_upload:Load data from file './data/taxonomy/20170125/names.dmp'
INFO:root:Uploading to the DB...
INFO:root:Uploading to the DB...
While running, we can check what jobs are running, using top() command:
.. code:: bash
hub> top()
5795 | taxonomy.nodes | uploader | update_data | | 49.7MiB | 0.0% | 2017/01/25 14:58:40|15.49s
5796 | taxonomy.names | uploader | update_data | | 54.6MiB | 0.0% | 2017/01/25 14:58:40|15.49s
2 running job(s)
0 pending job(s), type 'top(pending)' for more
16 finished job(s), type 'top(done)' for more
We can see two uploaders running at the same time, one for each file. ``top(done)`` can also display jobs that are done and finally
``top(pending)`` can give an overview of jobs that are going to be launched when a worker is available (it happens when there are more
jobs created than the available number of workers overtime).
In ``src_dump`` collection, we can see some more information about the resource and its upload processes. Two jobs were created,
we have information about the duration, log files, etc...
.. code:: javascript
> db.src_dump.find({_id:"taxonomy"})
"_id" : "taxonomy",
"download" : {
"started_at" : ISODate("2017-01-25T13:09:26.423Z"),
"status" : "success",
"time" : "3.31s",
"logfile" : "./data/taxonomy/taxonomy_20170125_dump.log"
"data_folder" : "./data/taxonomy/20170125",
"release" : "20170125",
"upload" : {
"status" : "success",
"jobs" : {
"names" : {
"started_at" : ISODate("2017-01-25T14:58:40.034Z"),
"pid" : 5784,
"logfile" : "./data/taxonomy/taxonomy.names_20170125_upload.log",
"step" : "names",
"temp_collection" : "names_temp_eJUdh1te",
"status" : "success",
"time" : "26.61s",
"count" : 1552809,
"time_in_s" : 27
"nodes" : {
"started_at" : ISODate("2017-01-25T14:58:40.043Z"),
"pid" : 5784,
"logfile" : "./data/taxonomy/taxonomy.nodes_20170125_upload.log",
"step" : "nodes",
"temp_collection" : "nodes_temp_T5VnzRQC",
"status" : "success",
"time" : "22.4s",
"time_in_s" : 22,
"count" : 1552809
In the end, two collections were created, containing parsed data:
.. code:: javascript
> db.names.count()
> db.nodes.count()
> db.names.find().limit(2)
"_id" : 1,
"taxid" : 1,
"other_names" : [
"scientific_name" : "root"
"_id" : 2,
"other_names" : [
"not bacteria haeckel 1894"
"genbank_common_name" : "eubacteria",
"in-part" : [
"taxid" : 2,
"scientific_name" : "bacteria"
> db.nodes.find().limit(2)
{ "_id" : 1, "rank" : "no rank", "parent_taxid" : 1, "taxid" : 1 }
"_id" : 2,
"rank" : "superkingdom",
"parent_taxid" : 131567,
"taxid" : 2
Species uploader
Following the same guideline, we’re going to create another uploader for species file.
.. code-block:: python
import biothings.dataload.uploader as uploader
from .parser import parse_uniprot_speclist
class SpeciesUploader(uploader.BaseSourceUploader):
name = "species"
def load_data(self,data_folder):
nodes_file = os.path.join(data_folder,"speclist.txt")"Load data from file '%s'" % nodes_file)
return parse_uniprot_speclist(open(nodes_file))
In that case, we need only one uploader, so we just define “name” (no need to define main_source here).
We need to expose that uploader from the package, in `dataload/sources/species/ <>`_:
.. code-block:: python
from .uploader import SpeciesUploader
Let’s run this through the hub. We can use the “upload” command there (though manager should trigger the upload itself):
.. code:: bash
hub> upload("species")
[1] RUN {0.0s} upload("species")
Similar to dumpers, there are different steps we can individually call for an uploader:
* **data**: will take care of storing data
* **post**: calls post_update() method, once data has been inserted. Useful to post-process data or create an index for instance
* **master**: will register the source in src_master collection, which is used during the merge step.
Uploader method ``get_mapping()<`` can optionally returns an ElasticSearch mapping, it will be stored in src_master during
that step. We’ll see more about this later.
* **clean**: will clean temporary collections and other leftovers...
Within the hub, we can specify these steps manually (they’re all executed by default).
.. code:: bash
hub> upload("species",steps="clean")
Or using a list:
.. code:: bash
hub> upload("species",steps=["data","clean"])
Gene information uploader
Let’s move forward and implement the last uploader. The goal for this uploader is to identify whether, for a taxonomy ID, there are
existing/known genes. File contains information about genes, first column is the ``taxid``. We want to know all taxonomy IDs present
in the file, and the merged document, we want to add key such as ``{'has_gene' : True/False}``.
Obviously, we’re going to have a lot of duplicates, because for one taxid we can have many genes present in the files.
We have options here 1) remove duplicates before inserting data in database, or 2) let the database handle the duplicates (rejecting them).
Though we could process data in memory -- processed data is rather small in the end --, for demo purpose, we’ll go for the second option.
.. code-block:: python
import biothings.dataload.uploader as uploader
import as storage
from .parser import parse_geneinfo_taxid
class GeneInfoUploader(uploader.BaseSourceUploader):
storage_class = storage.IgnoreDuplicatedStorage
name = "geneinfo"
def load_data(self,data_folder):
gene_file = os.path.join(data_folder,"gene_info")"Load data from file '%s'" % gene_file)
return parse_geneinfo_taxid(open(gene_file))
* ``storage_class``: this is the most important setting in this case, we want to use a storage that will ignore any duplicated records.
* ``parse_geneinfo_taxid`` : is the parsing function, yield documents as ``{“_id” : "taxid"}``
The rest is closed to what we already encountered. Code is available on github in
`dataload/sources/geneinfo/ <>`_
When running the uploader, logs show statements like these:
.. code:: bash
INFO:species.hub:Found 1 resources to upload (['geneinfo'])
INFO:species.hub:Launch upload for 'geneinfo'
INFO:species.hub:Building task: functools.partial(<bound method UploaderManager.create_and_load of <UploaderManager [3 registered]: ['geneinfo', 'species', 'taxonomy']>>, <class 'dataload.sources.gen
einfo.uploader.GeneInfoUploader'>, job_manager=<biothings.utils.manager.JobManager object at 0x7fbf5f8c69b0>)
INFO:geneinfo_upload:Uploading 'geneinfo' (collection: geneinfo)
INFO:geneinfo_upload:Load data from file './data/geneinfo/20170125/gene_info'
INFO:root:Uploading to the DB...
INFO:root:Inserted 62 records, ignoring 9938 [0.3s]
INFO:root:Inserted 15 records, ignoring 9985 [0.28s]
INFO:root:Inserted 0 records, ignoring 10000 [0.23s]
INFO:root:Inserted 31 records, ignoring 9969 [0.25s]
INFO:root:Inserted 16 records, ignoring 9984 [0.26s]
INFO:root:Inserted 4 records, ignoring 9996 [0.21s]
INFO:root:Inserted 4 records, ignoring 9996 [0.25s]
INFO:root:Inserted 1 records, ignoring 9999 [0.25s]
INFO:root:Inserted 26 records, ignoring 9974 [0.23s]
INFO:root:Inserted 61 records, ignoring 9939 [0.26s]
INFO:root:Inserted 77 records, ignoring 9923 [0.24s]
While processing data in batch, some are inserted, others (duplicates) are ignored and discarded. The file is quite big, so the process can be long…
Note: should we want to implement the first option, the parsing function would build a dictionary indexed by taxid and would read the whole,
extracting taxid. The whole dict would then be returned, and then processed by storage engine.
So far, we’ve defined dumpers and uploaders, made them working together through some managers defined in the hub. We’re now ready to move the last step: merging data.

0 comments on commit 32e3ee9

Please sign in to comment.