
added ETL #120

Merged
merged 41 commits into master on Dec 21, 2020

Conversation

Giulio2002
Contributor

No description provided.

@codecov-io

codecov-io commented Dec 19, 2020

Codecov Report

Merging #120 (2536bbb) into master (a79b571) will decrease coverage by 0.01%.
The diff coverage is 0.00%.


@@            Coverage Diff             @@
##           master     #120      +/-   ##
==========================================
- Coverage   85.80%   85.79%   -0.02%     
==========================================
  Files          60       60              
  Lines        5559     5560       +1     
==========================================
  Hits         4770     4770              
- Misses        789      790       +1     
Impacted Files            Coverage Δ
db/silkworm/db/util.hpp   90.90% <0.00%> (-9.10%) ⬇️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a79b571...ca6676f. Read the comment docs.

@AndreaLanfranchi
Contributor

I am struggling to understand what collector does, particularly in its load method.

@AndreaLanfranchi
Contributor

@Giulio2002
About this

    while (heap.size() != 0) {
        std::pop_heap(heap.begin(), heap.end(), std::greater<Entry>());
        auto entry{heap.back()};
        heap.pop_back();
        auto pairs{load(entry.key, entry.value)};
        for (auto pair : pairs) table->put(pair.key, pair.value);
        auto next{data_providers_.at(entry.i).read_entry()};
        next.i = entry.i;
        if (next.key.size() == 0) {
            data_providers_.at(entry.i).reset();
            continue;
        }
        heap.push_back(next);
        std::push_heap(heap.begin(), heap.end(), std::greater<Entry>());
    }

Can you explain what it does? My gut feeling is that you're mixing a heap with a stack or a queue.
Correct me if I am wrong, but my understanding is that you're reading data from files and inserting the results of the reads sequentially into db tables ... when the first file is completely read you pass to the next one, and so on. Right?
So why are you processing data backwards, from the last file to the first one?
I'm puzzled

@Giulio2002
Contributor Author

> Can you explain what it does? [...] So why are you processing data backwards, from the last file to the first one?

ETL uses external merge sort in order to put all the entries in sorted order by key. The FileProvider class just helps read each entry. What this loop does is keep in the heap (which I have now replaced with a priority queue) at least one entry from each file at all times, and gradually put the entries into the database as they come out in sorted order. The i member in struct Entry just indicates which file each entry came from when it was pushed onto the heap, so that when it's an entry's turn to be processed, we can read another entry from that same file (given we are not at EOF).
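
To make the pattern concrete, here is a minimal self-contained sketch of the same k-way merge (a sketch only: the in-memory providers vectors stand in for the sorted files on disk, and the names are illustrative, not the PR's actual API):

    #include <cstddef>
    #include <iostream>
    #include <queue>
    #include <string>
    #include <vector>

    struct Entry {
        std::string key;
        std::string value;
        std::size_t i;  // index of the provider this entry came from
    };

    int main() {
        // Each inner vector stands in for one sorted file on disk.
        std::vector<std::vector<Entry>> providers{
            {{"a", "1", 0}, {"c", "3", 0}},
            {{"b", "2", 0}, {"d", "4", 0}},
        };
        std::vector<std::size_t> cursors(providers.size(), 0);

        // Min-heap on key: the globally smallest key is always on top.
        auto cmp = [](const Entry& l, const Entry& r) { return l.key > r.key; };
        std::priority_queue<Entry, std::vector<Entry>, decltype(cmp)> queue(cmp);

        // Seed the queue with the first entry of every provider.
        for (std::size_t i = 0; i < providers.size(); ++i) {
            Entry e{providers[i][cursors[i]++]};
            e.i = i;
            queue.push(e);
        }

        // Pop the smallest key, emit it, then refill from the same provider,
        // so the queue always holds one candidate per non-exhausted file.
        while (!queue.empty()) {
            Entry entry{queue.top()};
            queue.pop();
            std::cout << entry.key << " -> " << entry.value << "\n";
            if (cursors[entry.i] < providers[entry.i].size()) {
                Entry next{providers[entry.i][cursors[entry.i]++]};
                next.i = entry.i;
                queue.push(next);
            }
        }
        // Prints a, b, c, d: sorted output across both providers.
    }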

@Giulio2002
Contributor Author

> I am struggling to understand what collector does, particularly in its load method.

The load method is the equivalent of the LoadFunc in the golang implementation; it's just a component carried over from the original design. In ETL, this function is called for each entry right before it is put in the db, so it can modify the entry at the last moment before the write, or just do something additional.
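
In code, the idea boils down to a transform hook along these lines (a sketch only; the signature here is illustrative and uses std::string instead of the real ByteView):

    #include <string>
    #include <vector>

    // Illustrative stand-in for the real etl::Entry.
    struct Entry {
        std::string key;
        std::string value;
    };

    // A LoadFunc-style hook: invoked for each entry right before it is
    // written to the db; it may rewrite the entry or expand it into
    // zero or more entries.
    using LoadFunc = std::vector<Entry> (*)(const std::string& key,
                                            const std::string& value);

    // Default behaviour: pass the entry through unchanged.
    std::vector<Entry> identity_load(const std::string& key,
                                     const std::string& value) {
        return {{key, value}};
    }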

@AndreaLanfranchi
Contributor

Thank you so much for the explanation but, forgive me for being so dumb, after your last commits I'm even more puzzled (if possible) :D

Let me recap and please correct me where I'm wrong:

  • collector is a hybrid component: it stores data in memory (buffer_) and, whenever the buffer overflows its suggested capacity, it flushes the data to a file on disk. Right? This operation is carried out by ...
  • flush_buffer, which sorts the buffer data and inserts (into data_providers_) a new instance of FileProvider, which eventually writes the data to disk. buffer_ is then emptied, ready to accept new data until max capacity is reached again.

Am I right so far ?

Now come my problems in understanding Collector::load. My understanding is that while Collector::collect behaves as the data producer, Collector::load should be the data consumer. Right? My points are:

  • the load method accepts as parameters *table and load ... a parameter should not carry the same name as the method it is passed to (but this is negligible for now). I assume the load parameter is a pointer to a function which returns a vector of data.
  • if I enter load with an empty data_providers_ vector, I see buffer_ data sorted and written to table. Eventually buffer_ is emptied and the function returns.
  • if, instead, some data already exists on disk, then buffer_ is flushed to disk too (regardless of whether or not it has reached capacity) and the data_providers_ vector grows by one.

The first problem I see is that you flush_buffer in any case, which means you might end up writing an empty file when buffer_ is empty: every time I call Collector::load, if there are previously flushed files, I will for sure create a new empty one. In fact flush_buffer does not check that buffer_ has data.
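
For illustration, a guard along these lines at the top of flush_buffer would avoid the empty file (a sketch only, assuming Buffer exposes its size as suggested in the last bullets below):

    void Collector::flush_buffer() {
        if (!buffer_.size()) {
            return;  // nothing buffered: don't create an empty file on disk
        }
        // ... sort buffer_ and hand it over to a new FileProvider ...
    }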

Now you create a priority queue, populating it with one read from each data_provider and expecting the queue to have the smallest Entry on top. Here I observe that you also insert empty values into the queue.
I'd rather change the range loop from this

    auto queue{std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>>()};
    for (auto& data_provider: data_providers_)
    {
        queue.push(data_provider.read_entry());
    }

to this

    auto queue{std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>>()};
    for (auto& data_provider : data_providers_) {
        auto item{data_provider.read_entry()};
        if (item.key.size()) {
            queue.push(item);
        }
    }

Even in this case, however, we do not know whether data_provider returned an empty item because EOF was reached (so there is effectively no data to process) or because some other error occurred. But besides this, you will notice that Entry is defined as

    struct Entry {
        ByteView key;
        ByteView value;
        int i; // Used only for heap operations
    };

but data_provider.read_entry() does not set that i, so all of your inserted records have i == 0 and, by consequence, all subsequent code that refers to data_providers_.at(entry.i) will always point to the first file.

At this point my understanding is that you want to keep processing the smallest possible Entry obtained from any file. But this is a bit misleading, as you don't want the smallest Entry ... you want the smallest Entry.key. So you may want to create a custom comparator for the queue, such as:

    auto key_comparer = [](Entry left, Entry right) { return left.key > right.key; };
    std::priority_queue<Entry, std::vector<Entry>, decltype(key_comparer)> queue(key_comparer);

Then, in the bottom loop, you pick the smallest key by copy and immediately remove it from the queue. There is no need for the copy: simply defer the removal until you no longer need the reference.
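
Something like this, reusing the names from the snippet above (a sketch, not a drop-in patch):

    while (!queue.empty()) {
        const Entry& entry{queue.top()};  // a reference is enough here
        auto pairs{load(entry.key, entry.value)};
        for (const auto& pair : pairs) {
            table->put(pair.key, pair.value);
        }
        std::size_t i{entry.i};           // remember the source file
        queue.pop();                      // removal deferred until after use
        auto next{data_providers_.at(i).read_entry()};
        if (next.key.size()) {
            next.i = i;
            queue.push(next);
        } else {
            data_providers_.at(i).reset();
        }
    }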

Other caveats:

  • your FileProvider constructor accepts an int as a parameter, but you create instances passing data_providers_.size(), which is an unsigned long long: this causes an implicit narrowing conversion.
  • change FileProvider so that it stores in id_ the ordinal number it was created with: that way you can modify the read_entry method to create an etl::Entry with its i member already set (see the sketch after this list).
  • change Buffer to expose its size() so callers can decide whether or not it has data.
  • don't use auto when the declared type does not come from a function return: use an explicit declaration instead.
  • collector does not detect or handle whether or not one or more data_providers are in error.
  • be verbose in comments: comments help others understand.
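
For instance, a sketch of the second and third bullets combined (member names other than id_ and read_entry are illustrative):

    class FileProvider {
      public:
        explicit FileProvider(size_t id) : id_{id} {}  // size_t avoids the narrowing

        Entry read_entry() {
            Entry entry{/* read key and value from the backing file */};
            entry.i = id_;  // tag with the source ordinal (Entry::i should become size_t too)
            return entry;
        }

      private:
        size_t id_;  // the ordinal this provider was created with
    };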

@yperbasis yperbasis mentioned this pull request Dec 21, 2020
@AndreaLanfranchi AndreaLanfranchi merged commit 50852de into master Dec 21, 2020