This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-678: Multithread the flat file loader #428

Closed
wants to merge 9 commits into from

Conversation

cestella
Member

@cestella cestella commented Jan 27, 2017

Currently the flat file loader is single threaded in its writing to HBase. We could make this a lot faster by multithreading the HBase puts.

I ran this on a single-node Vagrant box for a 100k-row, 2-column CSV enrichment import with the following configuration:

  • a batch size of 128
  • number of threads varying between 1 and 6

A reasonable speedup was achieved:

Number of Threads    Time (in seconds)
1                    91.019
2                    76.07
3                    39.974
4                    35.039
5                    30.531
6                    30.559
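
To put the timings in terms of speedup over the single-threaded run, here is a minimal sketch that divides the 1-thread time by each measured time. The class and method names are illustrative only and are not part of this PR; the numbers are copied from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ThreadSpeedup {
  // Speedup of a run relative to the single-threaded baseline.
  static double speedup(double baselineSeconds, double seconds) {
    return baselineSeconds / seconds;
  }

  public static void main(String[] args) {
    // Timings from the table: number of threads -> seconds.
    Map<Integer, Double> timings = new LinkedHashMap<>();
    timings.put(1, 91.019);
    timings.put(2, 76.07);
    timings.put(3, 39.974);
    timings.put(4, 35.039);
    timings.put(5, 30.531);
    timings.put(6, 30.559);

    double baseline = timings.get(1);
    for (Map.Entry<Integer, Double> entry : timings.entrySet()) {
      System.out.printf("%d thread(s): %.2fx%n",
          entry.getKey(), speedup(baseline, entry.getValue()));
    }
  }
}
```

On these numbers the speedup is just under 3x at 5 threads, and the sixth thread adds nothing measurable.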

[Chart: import time in seconds versus number of threads]

@cestella
Member Author

cestella commented Jan 27, 2017

Testing Plan

Preliminaries

  • Download the Alexa top-1m dataset:
wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
unzip top-1m.csv.zip
  • Create a 100k-entry subset and a single-entry subset:
head -n 100000 top-1m.csv > top-100k.csv
head -n 1 top-1m.csv > top-1.csv
  • Create an extractor.json for the CSV data by editing extractor.json and pasting in these contents:
{
  "config" : {
    "columns" : {
       "domain" : 1,
       "rank" : 0
                }
    ,"indicator_column" : "domain"
    ,"type" : "alexa"
    ,"separator" : ","
             },
  "extractor" : "CSV"
}
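
As I read this config, each name under "columns" maps to a zero-based position in a CSV line, and "indicator_column" names the column used as the lookup key. A rough sketch of that mapping follows, under that assumption. CsvColumnSketch is a hypothetical class written for illustration, not Metron's CSV extractor, and the sample line is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class CsvColumnSketch {
  // Map each configured column name to the token at its zero-based index.
  static Map<String, String> extract(String line, String separator,
                                     Map<String, Integer> columns) {
    String[] tokens = line.split(Pattern.quote(separator), -1);
    Map<String, String> row = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> column : columns.entrySet()) {
      row.put(column.getKey(), tokens[column.getValue()].trim());
    }
    return row;
  }

  public static void main(String[] args) {
    // Same column layout as the extractor.json above: rank first, domain second.
    Map<String, Integer> columns = new LinkedHashMap<>();
    columns.put("domain", 1);
    columns.put("rank", 0);

    // Illustrative input line in "rank,domain" form.
    Map<String, String> row = extract("1,example.com", ",", columns);
    String indicator = row.get("domain"); // the configured indicator_column
    System.out.println("indicator=" + indicator + " columns=" + row);
  }
}
```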

Verify 100k import with 5 threads:

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase using 5 threads
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-100k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 100k
echo "count 'enrichment'" | hbase shell

Verify 100k import with 5 threads and a batch size of 1000:

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase using 5 threads and a batch size of 1000
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-100k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 1000
# count data written and verify it's 100k
echo "count 'enrichment'" | hbase shell

Verify 100k import with 1 thread:

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase using 1 thread
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-100k.csv -t enrichment -c t -e ./extractor.json -p 1 -b 128
# count data written and verify it's 100k
echo "count 'enrichment'" | hbase shell

Verify 1 entry import with 5 threads:

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase using 5 threads
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-1.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128
# count data written and verify it's 1
echo "count 'enrichment'" | hbase shell

Verify 1 entry import with 1 thread:

# truncate hbase
echo "truncate 'enrichment'" | hbase shell
# import data into hbase using 1 thread
/usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-1.csv -t enrichment -c t -e ./extractor.json -p 1 -b 128
# count data written and verify it's 1
echo "count 'enrichment'" | hbase shell

@cestella
Member Author

Note, the batch size is just the split size (number of lines processed in each split) for the spliterator. The default of 128 is probably pretty good for almost all cases, I think.
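
To illustrate what "split size" means here, the following is a minimal sketch of a line spliterator that hands off up to batchSize lines per split. It is not the code in this PR: BatchingLineSpliterator and its lines helper are hypothetical names, and only standard java.util and java.util.stream calls are used.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class BatchingLineSpliterator implements Spliterator<String> {
  private final BufferedReader reader;
  private final int batchSize;

  BatchingLineSpliterator(BufferedReader reader, int batchSize) {
    this.reader = reader;
    this.batchSize = batchSize;
  }

  // Sequential path: emit one line at a time.
  @Override
  public boolean tryAdvance(Consumer<? super String> action) {
    try {
      String line = reader.readLine();
      if (line == null) {
        return false;
      }
      action.accept(line);
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Parallel path: read up to batchSize lines and hand them off as an
  // independent, array-backed split that another thread can process.
  @Override
  public Spliterator<String> trySplit() {
    List<String> batch = new ArrayList<>(batchSize);
    try {
      String line;
      while (batch.size() < batchSize && (line = reader.readLine()) != null) {
        batch.add(line);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (batch.isEmpty()) {
      return null; // nothing left to split off
    }
    return Spliterators.spliterator(batch.toArray(), ORDERED | NONNULL);
  }

  @Override
  public long estimateSize() {
    return Long.MAX_VALUE; // the number of remaining lines is unknown
  }

  @Override
  public int characteristics() {
    return ORDERED | NONNULL;
  }

  // Convenience wrapper: a (possibly parallel) stream of lines.
  static Stream<String> lines(BufferedReader reader, int batchSize, boolean parallel) {
    return StreamSupport.stream(new BatchingLineSpliterator(reader, batchSize), parallel);
  }

  public static void main(String[] args) {
    BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
    List<String> lines = lines(reader, 2, true).collect(Collectors.toList());
    System.out.println(lines);
  }
}
```

In a sketch like this, a larger batch size means fewer, larger units of work per thread, and a smaller one means more frequent hand-offs.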

@cestella
Member Author

Just as a comment: on my vagrant box, importing the alexa 1m took 4 minutes with 6 threads and 14 minutes with 1 thread.

@cestella cestella closed this Jan 28, 2017
@cestella cestella reopened this Jan 28, 2017
@cestella cestella closed this Jan 28, 2017
@cestella cestella reopened this Jan 28, 2017
@mmiklavc
Contributor

Tested in Vagrant quick-dev and all numbers return as expected. Reviewing code now.

.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
Assert.assertEquals(5, count.size());
Assert.assertEquals(3, (int) count.get("foo"));
Assert.assertEquals(2, (int) count.get("bar"));
Contributor

Minor Q - was grok intentionally excluded?

Member Author

hah, no, not intentionally. I can add it in.


@Test
public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
//With 9 elements and a batch of 2, we should have only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
Contributor

Good idea for a test
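
The arithmetic in that test comment can be worked through with a small sketch. BatchMath and its method names are hypothetical, written only to illustrate the bound, and are not part of this PR.

```java
class BatchMath {
  // Number of batches for `elements` lines at `batchSize` lines per batch,
  // i.e. ceil(elements / batchSize) in integer arithmetic.
  static int numBatches(int elements, int batchSize) {
    return (elements + batchSize - 1) / batchSize;
  }

  // Upper bound on threads that can do useful work: never more than the
  // number of batches, and never more than the threads requested.
  static int maxUsefulThreads(int elements, int batchSize, int threads) {
    return Math.min(numBatches(elements, batchSize), threads);
  }

  public static void main(String[] args) {
    System.out.println(numBatches(9, 2));            // 5
    System.out.println(maxUsefulThreads(9, 2, 2));   // 2
    System.out.println(numBatches(100000, 128));     // 782
    System.out.println(maxUsefulThreads(1, 128, 5)); // 1
  }
}
```

The last two lines apply the same bound to the manual test plan: a 100k import at batch size 128 yields 782 batches, and a single-entry import has only one batch regardless of the thread count.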

@mmiklavc
Contributor

Manual tests all checked out. Code looks good to me. The parallelism tests were a nice addition. +1
