ARROW-542: Adding dictionary encoding to FileWriter #334

Closed
wants to merge 23 commits

Conversation

Contributor

@elahrvivaz elahrvivaz commented Feb 10, 2017

WIP for comments

Member

wesm commented Feb 12, 2017

Will review when I can (possibly tomorrow, or Monday). I will make an effort to do the C++ side of the implementation in the next 2 weeks to help move us toward integration tests.

Member

wesm commented Feb 13, 2017

Watch out for git merge -- it's almost always better to rebase

@elahrvivaz
Contributor Author

thanks, I'll squash/rebase once I've gotten closer to finishing.

Member

wesm commented Feb 16, 2017

I'll give this some more careful review tomorrow -- I will start on the C++ implementation tomorrow or this weekend sometime

Member

wesm commented Feb 16, 2017

@julienledem if you have time to review, that would be very helpful. thanks!

@elahrvivaz
Contributor Author

thanks, I think it's pretty much done, at least for a first cut

@elahrvivaz elahrvivaz changed the title from "WIP: ARROW-542: Adding dictionary encoding to FileWriter" to "ARROW-542: Adding dictionary encoding to FileWriter" Feb 16, 2017
Member

wesm commented Feb 16, 2017

Awesome. Really excited to get this working

@julienledem
Member

I'm currently reviewing

Member

@julienledem julienledem left a comment

Thanks a lot for your contribution.
This is a big patch.
Please see my comments below.

for (ArrowRecordBatch batch: batches) {
writer.writeRecordBatch(batch);
batch.close();
Member

why was this not needed before?

Contributor Author

I think it was just an oversight - ArrowRecordBatch is Closeable, but they weren't ever being closed here.

Byte type = reader.nextBatchType();
if (type == null) {
break;
} else if (type == MessageHeader.RecordBatch) {
Member

How about:
else { switch (...
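A sketch of the suggested shape (assuming the MessageHeader constants are plain byte values, as in the generated flatbuffers code, so they can be used as case labels):

Byte type = reader.nextBatchType();
if (type == null) {
  break;
} else {
  switch (type) {
    case MessageHeader.RecordBatch:
      // read and handle a record batch
      break;
    case MessageHeader.DictionaryBatch:
      // read and handle a dictionary batch
      break;
    default:
      throw new IOException("Unexpected message type: " + type);
  }
}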

Byte type = reader.nextBatchType();
if (type == null) {
break;
} else if (type == MessageHeader.DictionaryBatch) {
Member

switch?

Byte type = reader.nextBatchType();
assertEquals(new Byte(MessageHeader.RecordBatch), type);
try (ArrowRecordBatch result = reader.nextRecordBatch();) {
assertTrue(result != null);
Member

assertNotNull

ArrowRecordBatch result = reader.nextRecordBatch();
assertTrue(result == null);
Byte type = reader.nextBatchType();
assertTrue(type == null);
Member

assertNull
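For reference, the two suggestions applied to the snippets above (assuming JUnit's static imports of assertNotNull and assertNull):

try (ArrowRecordBatch result = reader.nextRecordBatch()) {
  assertNotNull(result);
}

assertNull(reader.nextRecordBatch());
assertNull(reader.nextBatchType());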

@@ -49,12 +53,12 @@ public DictionaryVector(ValueVector indices, Dictionary dictionary) {
* @param vector vector to encode
* @return dictionary encoded vector
*/
public static DictionaryVector encode(ValueVector vector) {
public static DictionaryVector encode(FieldVector vector) {
Member

sorry I missed in the previous review, but the dictionary should be provided as a param at this point.

Contributor Author

fixed

dictionaryVector.allocateNewSafe();
for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) {
dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue());
}
dictionaryVector.getMutator().setValueCount(transfers.size());
Dictionary dictionary = new Dictionary(dictionaryVector, false);
Dictionary dictionary = new Dictionary(dictionaryVector);
Member

We should not create Dictionaries without giving them an ID.

Member

Sorry that I've added confusion on this point. At least in the C++ implementation I don't intend to assign dictionary ids until entering the IPC code paths

throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
return batch;
}
Member

There is some common code with the method below.

@@ -64,7 +67,7 @@ public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
private void checkStarted() throws IOException {
if (!started) {
started = true;
start();
writeMagic();
Member

it looks like we don't write the schema anymore?

Contributor Author

The schema is written in the footer - as far as I can tell, there was no reason to write it out a second time, and removing it didn't break anything.

Member

I think @nongli ran into this issue (schema being written twice)?

Contributor

There's some ambiguity on whether the file serialization should be the stream bytes exactly, with a magic number header and additional metadata (schema, block index for random access, maybe stats in the future). In this case, the schema would be there twice.

The goal would be to make it as simple as possible (and a minor benefit of being more efficient going from file -> stream: just one seek and read).

Contributor Author

ok, I'll add it back in

@@ -36,6 +38,7 @@
private ReadChannel in;
private final BufferAllocator allocator;
private Schema schema;
Member

should be final

Contributor Author

schema is read from the input stream now, so can't be final as it's not initially loaded

Member

makes sense. Thanks

@elahrvivaz
Contributor Author

thanks, I will fix this up next week

Member

wesm commented Feb 21, 2017

I'm working on the C++ patch, should have it up later today or tomorrow. Integration tests will require follow up patches to add dictionary support to the JSON readers and writers.

Member

wesm commented Feb 22, 2017

As I believe @elahrvivaz noted elsewhere, this is pretty tricky to implement because the schema for the record batch embedded in the DictionaryBatch message is contained in the schema for the "parent" record batches. Having not appropriately planned ahead for this, it's resulting in a lot of refactoring

I don't think this experience suggests we should change anything about the metadata, but we may be able to offer some advice to future language implementations (cough JavaScript) to make the overall experience a little kinder.

Contributor Author

elahrvivaz commented Feb 22, 2017

@julienledem @wesm to consolidate the schema logic per your suggestion, I've merged the VectorLoader/Unloader with the ArrowWriter/Reader. Using them is a lot more straightforward now, but this is a pretty massive change - let me know if it's too much or hides too much functionality.
edit: I haven't added very much in the way of comments or code cleanup - wanted to get an initial approval first before spending more time.

Member

@wesm wesm left a comment

This is nice work -- I think we're pretty close except for the code formatting issues and the issue around dictionaries in nested schemas.

You should add 2 test cases to account for the 2 more complicated dictionary-encoded cases: a dictionary-encoded nested vector (e.g. a List<Int32>, where the lists are dictionary encoded -- i.e. if we had [[0, 1], [0], [0, 1], [0]], you could encode this to [0, 1, 0, 1] with dictionary [[0, 1], [0]] ) and a nested vector containing dictionary-encoded values (I wrote a test case with List<String: dictionary-encoded>) -- this will force resolving the schema issue.

I think the refactoring to combine the VectorLoader/Unloader makes sense.

@@ -57,30 +57,28 @@ public ClientConnection(Socket socket) {

public void run() throws IOException {
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
List<ArrowRecordBatch> batches = new ArrayList<>();
List<ArrowDictionaryBatch> dictionaries = new ArrayList<>();
Member

Not used here?

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.ArrowStreamReader;
Member

Seems you might have an import ordering problem. Some other code formatting issues below

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
Member

import ordering

Contributor Author

think I fixed the ordering, let me know if you see anything still wrong


private ValueVector indices;
private Dictionary dictionary;
private final FieldVector indices;
Member

agreed

public Field getField() { return indices.getField(); }
public Field getField() {
Field field = indices.getField();
return new Field(field.getName(), field.isNullable(), field.getType(), dictionary.getEncoding(), field.getChildren());
Member

The transference of the field metadata from the indices to the dictionary should probably happen elsewhere (per above)


// go through to add dictionary id to the schema fields and to unload the dictionary batches
for (FieldVector vector: vectors) {
if (vector instanceof DictionaryVector) {
Member

Is vectors the flattened tree (I don't think so but just checking)? Dictionaries can occur anywhere in a type tree, but the one constraint is that dictionaries cannot contain dictionary encoded data in their descendants.

As an example, consider the type List<String>, where the strings are dictionary encoded, so you might have

[['foo', 'bar'], ['foo'], ['bar']]

but really it's

[[0, 1], [0], [1]], and the inner child vector is a DictionaryVector. I have a test case for this in the C++ patch: https://github.com/wesm/arrow/blob/ARROW-459/cpp/src/arrow/ipc/test-common.h#L377
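For illustration only (plain Java collections, not the Arrow API), the encoding in that example looks like this: the dictionary holds the distinct values, and the inner child carries indices while the outer list structure is unchanged.

List<List<String>> values = Arrays.asList(
    Arrays.asList("foo", "bar"), Arrays.asList("foo"), Arrays.asList("bar"));

// dictionary: index 0 -> "foo", index 1 -> "bar"
List<String> dictionary = Arrays.asList("foo", "bar");

// same outer list structure, but the inner values are dictionary indices
List<List<Integer>> indices = Arrays.asList(
    Arrays.asList(0, 1), Arrays.asList(0), Arrays.asList(1));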

Member

I confirm that vectors is the top level. A vector for a complex type will have children.

Contributor Author

I don't think the current dictionary encoding will work with complex types (lists or structs). I asked about that before and you guys said to hold off. I think that will require making dictionary a minor type so that dictionary encoded fields can be handled by the writers and readers.

Member

@wesm wesm Feb 24, 2017

I guess I need to examine the minor type stuff more closely so I understand this. We don't have the concept of a minor type in the C++ library, because the data type class is the logical type. Seems like things are inverted here (the type classes are physical types, and the minor type is the logical type), and that is making things more difficult. I'm sorry if I've created more work with incorrect cross-communication

Member

I'm confirming that it is fine to hold off on dictionary for complex types. However a primitive type can be dictionary encoded and be a child of a complex type.
See my comment below:
#334 (comment)
This should clarify how to create the DictionaryVector in the right place.

Field replacement = new Field(field.getName(), field.isNullable(), dictionaryType, dictionary.getEncoding(), field.getChildren());

updatedFields.remove(fieldIndex);
updatedFields.add(fieldIndex, replacement);
Member

The way I dealt with the tree traversal issue (with dictionaries occurring in children perhaps) was in 3 stages:

so I believe this is the reverse order of what is in this patch now. I'm not sure which is easier

Member

agreed.
When Reading a File:

  • you first get the schema. From the schema you can produce Map<DictionaryID, Field> which will capture the schema of each DictionaryBatch.
  • Then you read each DictionaryBatch. You know their schema from the mapping above, which lets you load each dictionary into a vector (possibly a complex one with children). You get a list of Dictionaries. Each dictionary is defined as follows: Dictionary { DictionaryID id; Field schema; ValueVector values}
  • Then you read each RecordBatch. Fields that are Dictionary encoded will have a DictionaryVector instead of the ValueVector of the original type. Possibly you need to create a new MinorType for that (basically one MinorType per Vector class)
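A minimal sketch of the Dictionary holder described in the two bullets above (field names follow the comment; this is not the class from the patch):

public class Dictionary {
  private final long id;            // DictionaryID from the schema
  private final Field schema;       // Field describing the dictionary values
  private final ValueVector values; // the dictionary values themselves

  public Dictionary(long id, Field schema, ValueVector values) {
    this.id = id;
    this.schema = schema;
    this.values = values;
  }

  public long getId() { return id; }
  public Field getSchema() { return schema; }
  public ValueVector getValues() { return values; }
}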

Contributor Author

@julienledem I originally went down the path of adding a new minor type for dictionaries, but @wesm asked me to roll those changes back and keep dictionary vector as a synthetic type... making it a minor type would modify all the generated classes.

Member

@julienledem having a "synthetic type" (that composes the dictionary vector and the indices type) made things significantly easier, at least on the C++ side. See, for example: https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/test-common.h#L360

I'm not sure what the benefits are of adding a MinorType (and touching all the generated code)

Member

Let me clarify this.
We should not add a type to the Messages.fbs for dictionary. I think that was part of the original PR and was rolled back.
The MinorType class is more of a Vector Class ID. It should be renamed IMO but this is a separate discussion. In my comment I said "possibly" meaning I was not quite sure, so let's take the MinorType comment out for now.
Here is what I was thinking about:
We need to change the implementation of FieldVector.initializeChildrenFromFields() [1]. Look in particular at the implementation in MapVector [2] that uses AbstractMapVector.add [3] which uses MinorType.getNewVector [4] to create a new vector.
initializeChildrenFromFields is responsible for instantiating the right Vector implementation based on each Field in the Schema. So really [2]/[3] should be changed to create a DictionaryVector instead of the type that would be used if not dictionary encoded.
That would replace some of the logic in ArrowReader.initialize() in this PR. This may not require a new MinorType, you'll see when you're in there.

[1]

void initializeChildrenFromFields(List<Field> children);

[2]
public void initializeChildrenFromFields(List<Field> children) {

[3]
FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale);

[4]
public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);

Contributor Author

@julienledem I'll look into this more closely, but wouldn't the problem be that we don't have the actual dictionary in FieldVector.initializeChildrenFromFields? Currently a DictionaryVector requires a dictionary.

Contributor Author

another possibility would be to require the user to dictionary encode their own vectors - e.g. if you define a field as dictionary encoded, you have to set the appropriate index values yourself, instead of the dictionary values. Then the dictionaries themselves are only required when converting to/from message format

Contributor Author

I feel like a lot of the friction comes from the fact that much of the code assumes Fields are interchangeable with vectors (in that you can create a vector instance from a field when needed), but that is not true for dictionary vectors.

Member

@elahrvivaz I'd suggest that we decouple the Dictionary from the DictionaryVector for now. For context, one of the goals of dictionary encoding is to work with dictionary ids directly (to do aggregations, joins, etc faster). We don't always need the Dictionary around when doing so.
The DictionaryVector can have a reference to the dictionary id instead of the dictionary itself. You could have a DictionaryProvider { Dictionary getDictionary(ID) } that returns the dictionary for the id. That would remove the dependency on having a reference to the Dictionary to create the DictionaryVector and still allow decoding a vector by looking up the corresponding dictionary.
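A rough sketch of that suggestion (the interface shape follows the comment above; the map-backed implementation is a hypothetical helper, not part of the patch):

public interface DictionaryProvider {
  Dictionary getDictionary(long id);
}

public class MapDictionaryProvider implements DictionaryProvider {
  private final Map<Long, Dictionary> dictionaries = new HashMap<>();

  public void put(Dictionary dictionary) {
    dictionaries.put(dictionary.getId(), dictionary);
  }

  @Override
  public Dictionary getDictionary(long id) {
    return dictionaries.get(id);
  }
}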

Contributor Author

that sounds good, thanks

@Override
public int hashCode() {
return Objects.hash(id, dictionary, ordered);
}
Member

formatting issues in this file beyond indentation


public boolean isOrdered() {
return ordered;
}
Member

formatting issues

org.apache.arrow.flatbuf.DictionaryEncoding.startDictionaryEncoding(builder);
org.apache.arrow.flatbuf.DictionaryEncoding.addId(builder, dictionary.getId());
org.apache.arrow.flatbuf.DictionaryEncoding.addIsOrdered(builder, dictionary.isOrdered());
dictionaryOffset = org.apache.arrow.flatbuf.DictionaryEncoding.endDictionaryEncoding(builder);
Member

I'm not handling the null case for the indexType yet in C++, but we can address during integration testing

Member

@julienledem julienledem left a comment

@elahrvivaz, thanks for working on this.
I think we should not be merging VectorLoader/Unloader with the ArrowWriter/Reader. The goal is to be able to use VectorLoader and Unloader in contexts other than file reading/writing. They're general purpose tools to load/unload buffers into vectors. If you want to define a class that encompasses reading a file and loading it into vectors then this should be a separate class that uses both the ArrowReader and VectorLoader. (favor composition over inheritance).
Sorry to spoil the fun. I do appreciate you working on this and I'm trying to not make you do extra work. I realize my previous review may have been a little unclear. Feel free to ask clarification on comments. Hopefully this one is better. We can also do a hangout if there's a need to discuss something.

arrowWriter.start();
while (true) {
int loaded = arrowReader.loadNextBatch();
if (loaded == 0) {
Member

int loaded;
while ((loaded = arrowReader.loadNextBatch()) != 0) {
  arrowWriter.writeBatch(loaded);
}

writer.write(root);
}
int loaded = arrowReader.loadRecordBatch(rbBlock);
root.setRowCount(loaded);
Member

It seems strange that we need to call setRowCount() separately from calling loadRecordBatch()
This makes the API less clear.
We should be able to load with one call. This seems to indicate that the loading happens at the wrong level.
(in this change the ArrowFileReader doesn't know the VectorSchemaRoot)

VectorSchemaRoot is supposed to bundle together everything that defines a record batch:

  • schema
  • vectors
  • rowCount (which is the same for all vectors)

While the schema and the list of vectors are immutable, the content of the vectors and the rowCount change every time we load a new RecordBatch.

Possibly the problem with VectorSchemaRoot is that the logic in it's constructors should be moved outside of it (for example, creating vectors from the schema)

Contributor Author

In this case I was just re-using the VectorSchemaRoot in the unit test for simplicity, because the test checks it, so I wrapped the fields that are loaded into it.
The normal use case would be:

  • construct a reader on an input stream
  • schema and fields that are loaded are available via getters in the reader - they will read from the input stream when retrieved if not already loaded
  • call loadNextBatch or loadRecordBatch (for random access) - returns number of records loaded, and populates fields with those records
  • (do something with loaded fields)

It seemed redundant to have the VectorSchemaRoot, when most methods were retrieving the schema from the footer, not the VectorSchemaRoot. If we keep it around, the VectorSchemaRoot would have to be constructed in the reader, since we don't have the schema until something is actually read.
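A rough sketch of the reading flow just described (loadNextBatch and the getter names are taken from this thread and are assumptions about the final API; exact signatures may differ):

void consume(ArrowReader<? extends ReadChannel> reader) throws IOException {
  Schema schema = reader.getSchema();              // read lazily from the stream/footer if not already loaded
  List<FieldVector> vectors = reader.getVectors(); // the vectors that get populated on each load
  int loaded;
  while ((loaded = reader.loadNextBatch()) > 0) {
    // 'loaded' rows are now populated in the reader's vectors; consume them here
  }
}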

FieldVector from = rootVectors.get(i);
FieldVector to = arrowWriter.getVectors().get(i);
TransferPair transfer = from.makeTransferPair(to);
transfer.transfer();
Member

From an API point of view, I don't think the writer should need to have its own vectors where the buffers need to be transferred.

Contributor Author

fixed

/**
* Loads buffers into vectors
*/
public class VectorLoader implements AutoCloseable {
Member

The intent was to be able to use this in other contexts than reading from a file.

Contributor Author

re-added


private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class);

public static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.UTF_8);
Member

since this is mutable it should not be public.
If it needs to be accessed from somewhere else, you can add helper methods in this class that perform the operations needed. (comparing to data somewhere?)

Contributor Author

fixed

@@ -47,6 +47,17 @@ public VectorSchemaRoot(FieldVector parent) {
}
}

public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors) {
Member

possibly this is the only constructor we should have in VectorSchemaRoot (+ rowCount if vectors have data in them)
Then we would have separate (static?) methods to create a VectorSchemaRoot either from a Schema (when reading a file) or from existing vectors (when writing a file)
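A sketch of the factory-method idea (the method names and the Field.createVector helper are hypothetical; the two-argument constructor is the one added in this PR):

public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) {
  List<FieldVector> vectors = new ArrayList<>();
  for (Field field : schema.getFields()) {
    vectors.add(field.createVector(allocator)); // hypothetical helper to build a vector from a Field
  }
  return new VectorSchemaRoot(schema.getFields(), vectors);
}

public static VectorSchemaRoot of(List<Field> fields, List<FieldVector> vectors) {
  return new VectorSchemaRoot(fields, vectors);
}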


private ValueVector indices;
private Dictionary dictionary;
private final FieldVector indices;
Member

meaning a regular ValueVector


public abstract class ArrowReader<T extends ReadChannel> implements AutoCloseable {

private final T in;
Member

yes 2 spaces. Sorry for being unclear.

dictionaries.put(dictionaryEncoding.getId(), dictionaryVector);
}
// create index vector
ArrowType dictionaryType = new ArrowType.Int(32, true); // TODO check actual index type
Member

I think it's fine to start with just one size. Just throw UnsupportedOperationException if someone tries to use another type.
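A minimal sketch of that suggestion, assuming the index type is exposed as an ArrowType.Int (the method name is hypothetical):

static void checkIndexType(ArrowType.Int indexType) {
  if (indexType != null && (indexType.getBitWidth() != 32 || !indexType.getIsSigned())) {
    throw new UnsupportedOperationException(
        "Only 32-bit signed dictionary indices are supported, got: " + indexType);
  }
}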


* Schema is modified in VectorLoader/Unloader to conform to message format
* Dictionary IDs will be assigned at that time if not predefined
* Stream reader must check for message type being read (dictionary or regular batch)
* VectorLoader now creates the VectorSchemaRoot, instead of it being passed in
Creating base class for stream/file writer
Creating base class with visitors for arrow messages
Indentation fixes
Other cleanup
…dictionary encoding to fields, restoring vector loader/unloader
@elahrvivaz
Contributor Author

@julienledem @wesm I've gone through another round of refactoring. Main changes since last time:

  • threaded dictionary encoding throughout Fields and Field-related methods
  • fields are recursively examined for dictionary encodings
  • dictionaries are not passed around with the dictionary-encoded vectors - there is a DictionaryProvider interface for retrieving dictionaries
  • restored VectorLoader/VectorUnloader/VectorSchemaRoot

There's still some cleanup to do, but I'd appreciate your thoughts on the current approach.

Thanks,

@elahrvivaz
Contributor Author

@wesm I added a test for a vector of List<String:encoded> - I think the other example you mentioned of List:encoded won't work right now since I haven't implemented dictionary encoding for lists and structs (maps)

@elahrvivaz
Contributor Author

@wesm I think I know what the issue is - ran into it with the echo endpoint - let me see if I can push a fix

break;
} else {
writer.writeBatch();
reader.loadNextBatch();
Member

I don't think it's a blocker, but having these two classes "wired" together through a mutable row batch (the root) seems like a bad code smell. It might be worth eventually changing the API to be

reader.loadNextBatch(root)
writer.writeBatch(root)

Contributor Author

I don't disagree... It's complicated by the fact that both the reader and writer have to manipulate the schema to account for dictionaries.

Member

@wesm wesm Mar 16, 2017

If you had an array or list of in-memory record batches in Java, can you write them with this API? I was able to add dictionaries to the C++ stream reader/writers without modifying the stream / file API, and schemas are immutable. I think it would be worth working toward that eventually, maybe we should open some JIRAs to cover

  • Making Java schemas immutable
  • Looser coupling between the reader and writer classes here

Contributor Author

The writeRecordBatch method is still there, but it's protected visibility now. Is there a use case for creating batches outside the VectorUnloader? I consolidated the logic so that the file-reading/writing API was a lot simpler, but the underlying logic is the same.
The problem with schemas is that the java api doesn't represent things the same as the message format. In java, the vector type is the actual type of the vector, whereas in the message format it's the logical type. So in java a dictionary encoded varchar vector is type Int, but in the message format it's type varchar.
I think reading and then writing to another source seems like kind of an artificial use case that we're using for tests. The general workflow for writing is:

  • create a VectorSchemaRoot object with your vectors
  • create a writer with the VectorSchemaRoot and an output stream
  • modify your vectors, call writer.writeBatch, repeat

For reading it's:

  • create a reader from an input stream - this will construct the VectorSchemaRoot object based on the schema read
  • call reader.loadBatch, do something with the vectors loaded into the VectorSchemaRoot, repeat

Since they both use VectorSchemaRoot for the loaded vectors, it is easy to couple them together for the read/write.
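A sketch of the write side of that workflow (start(), writeBatch(), and setRowCount() are the calls that appear elsewhere in this thread; how the vectors get filled is elided):

void writeBatches(VectorSchemaRoot root, ArrowWriter writer, int numBatches, int rowsPerBatch) throws IOException {
  writer.start();
  for (int i = 0; i < numBatches; i++) {
    // ... fill root's vectors with rowsPerBatch rows for this batch ...
    root.setRowCount(rowsPerBatch);
    writer.writeBatch();
  }
}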

Member

probably a question for @julienledem -- at least in C++, a basic unit of currency is std::vector<std::shared_ptr<RecordBatch>>, so to be unable to accumulate record batches in some kind of collection (like a std::deque or std::vector), particularly in a concurrent context, and write them out, would be very limiting.

With this API, I am not sure how you would do concurrent reads and writes of record batches. I don't think there's anything to be done right now, but I think we should open JIRAs at least pointing out the potential problems

Contributor Author

Hmm, how would you do concurrent reads/writes anyway? Wouldn't you have to read/write from/to the underlying streams?

Member

Reading from one stream and writing to a different one (e.g. read from a socket, write to a file on disk)

Contributor Author

there are still methods to directly read/write the RecordBatches, they are just protected access. Possibly extracting out the lower-level methods into a subclass would be useful, so that they could be used directly if wanted but bypassed if not.

Member

+1 on having the schema immutable.
On the Java side, here is what we expect:

  • there's a single writer for a batch.
  • once the writer is done writing (usually ending by calling setRowCount()) there can be multiple concurrent readers.
  • as one of the goals is controlling how much memory is in use, the RecordBatch acts as an iterator. You load a Record Batch, work with it, then load the next one once you're done. With a fixed-width schema, no re-allocation needs to happen.

Agreed, we could improve the APIs around this.

Member

wesm commented Mar 15, 2017

Integration tests still fail for me with the latest changes

Member

wesm commented Mar 15, 2017

I can try to help with your C++ build, are you using at least gcc 4.8? This part of the codebase doesn't work on Windows yet

Contributor Author

elahrvivaz commented Mar 15, 2017

thanks, I'm on a pretty vanilla ubuntu with gcc 5.4.0:

$ gcc --version
gcc (Ubuntu 5.4.0-6ubuntu1~16.04.4) 5.4.0 20160609

I installed cmake through sudo apt install cmake, but the cmake build is failing to find boost:

CMake Error at /usr/share/cmake-3.5/Modules/FindBoost.cmake:1677 (message):
  Unable to find the requested Boost libraries.

  Unable to find the Boost header files.  Please set BOOST_ROOT to the root
  directory containing Boost or BOOST_INCLUDEDIR to the directory containing
  Boost's headers.
Call Stack (most recent call first):
  CMakeLists.txt:428 (find_package)


-- Boost include dir: Boost_INCLUDE_DIR-NOTFOUND
-- Boost libraries: 
CMake Error at CMakeLists.txt:400 (message):
  No static or shared library provided for boost_system
Call Stack (most recent call first):
  CMakeLists.txt:440 (ADD_THIRDPARTY_LIB)

Member

wesm commented Mar 15, 2017

Looks like we need to add boost installation instructions to the README. Here are the package requirements we're using in Travis:

sudo apt-get install libboost-dev libboost-filesystem-dev libboost-system-dev

should work

@elahrvivaz
Contributor Author

thanks, I've got it running now

Member

wesm commented Mar 15, 2017

I'm looking at the C++ stream reader. It may not be dealing with a 0 control message properly, is that new?

@elahrvivaz
Contributor Author

It was there before but I'm not sure it was always being sent. Should I just remove it?

Member

wesm commented Mar 15, 2017

That's the problem then -- @nongli and I discussed having this in the format so it's fine to keep sending it. Let me put up a commit for you to cherry pick
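For context, the "0 control message" is just a zero metadata length terminating the stream. A minimal sketch of emitting it, assuming the 4-byte little-endian length prefix the streaming format used at the time (plain java.nio, not the Arrow API):

void writeEndOfStream(WritableByteChannel channel) throws IOException {
  ByteBuffer zero = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
  zero.putInt(0);
  zero.flip();
  channel.write(zero); // readers treat a metadata length of 0 as end-of-stream
}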

@elahrvivaz
Contributor Author

great, thanks

Member

wesm commented Mar 15, 2017

see wesm@c2f216a

the integration tests still fail, but for a different reason:

-- Creating binary inputs
java -cp /home/wesm/code/arrow/java/tools/target/arrow-tools-0.2.1-SNAPSHOT-jar-with-dependencies.jar org.apache.arrow.tools.Integration -a /tmp/tmp_di6xk1g/34e1af729a2f4f2c8b1e8aa1db164d76 -j /home/wesm/code/arrow/integration/data/struct_example.json -c JSON_TO_ARROW
-- Validating file
/home/wesm/code/arrow/cpp/test-build/debug/json-integration-test --integration --arrow=/tmp/tmp_di6xk1g/34e1af729a2f4f2c8b1e8aa1db164d76 --json=/home/wesm/code/arrow/integration/data/struct_example.json --mode=VALIDATE
-- Validating stream
java -cp /home/wesm/code/arrow/java/tools/target/arrow-tools-0.2.1-SNAPSHOT-jar-with-dependencies.jar org.apache.arrow.tools.FileToStream /tmp/tmp_di6xk1g/34e1af729a2f4f2c8b1e8aa1db164d76 /tmp/tmp_di6xk1g/55cb603413b845bc88e003c3004992d1
cat /tmp/tmp_di6xk1g/55cb603413b845bc88e003c3004992d1 | /home/wesm/code/arrow/cpp/test-build/debug/stream-to-file > /tmp/tmp_di6xk1g/8ac9b39fc8414b2582174c73cda0a512
/home/wesm/code/arrow/cpp/test-build/debug/json-integration-test --integration --arrow=/tmp/tmp_di6xk1g/8ac9b39fc8414b2582174c73cda0a512 --json=/home/wesm/code/arrow/integration/data/struct_example.json --mode=VALIDATE
Command failed: /home/wesm/code/arrow/cpp/test-build/debug/json-integration-test --integration --arrow=/tmp/tmp_di6xk1g/8ac9b39fc8414b2582174c73cda0a512 --json=/home/wesm/code/arrow/integration/data/struct_example.json --mode=VALIDATE
With output:
--------------
Error message: Invalid: Record batch 0 did not match
JSON:
struct_nullable: 
  -- is_valid: [false, true, true, true, false, true, false]
  -- child 0 type: int32 values: [1402032511, null, 137773603, 410361374, 1959836418, null, null]
  -- child 1 type: string values: [null, "MhRNxD4", "3F9HBxK", "aVd88fp", null, "3loZrRf", null]

Arrow:
struct_nullable: 
  -- is_valid: all not null
  -- child 0 type: int32 values: [1402032511, null, 137773603, 410361374, 1959836418, null, null]
  -- child 1 type: string values: [null, "MhRNxD4", "3F9HBxK", "aVd88fp", null, "3loZrRf", null]


--------------
Traceback (most recent call last):
  File "integration_test.py", line 767, in <module>
    run_all_tests(debug=args.debug)
  File "integration_test.py", line 757, in run_all_tests
    runner.run()
  File "integration_test.py", line 624, in run
    consumer.validate(json_path, consumer_file_path)
  File "integration_test.py", line 725, in validate
    return self._run(arrow_path, json_path, 'VALIDATE')
  File "integration_test.py", line 722, in _run
    run_cmd(cmd)
  File "integration_test.py", line 84, in run_cmd
    raise e
  File "integration_test.py", line 76, in run_cmd
    output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
  File "/home/wesm/anaconda3/envs/arrow-test/lib/python3.5/subprocess.py", line 629, in check_output
    **kwargs).stdout
  File "/home/wesm/anaconda3/envs/arrow-test/lib/python3.5/subprocess.py", line 711, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/home/wesm/code/arrow/cpp/test-build/debug/json-integration-test', '--integration', '--arrow=/tmp/tmp_di6xk1g/8ac9b39fc8414b2582174c73cda0a512', '--json=/home/wesm/code/arrow/integration/data/struct_example.json', '--mode=VALIDATE']' returned non-zero exit status 1

this seems like an unloading/loading problem. let me attach the JSON file that causes this issue -- you can see the command lines to run in that output

Member

wesm commented Mar 15, 2017

ah it's failing on the struct_example.json which is in the repo

Change-Id: I770e7400d9a4eab32086c0a0f3b92b0a65c8c0e1
@elahrvivaz
Contributor Author

Thanks, I'm able to reproduce the issue now

@elahrvivaz
Contributor Author

@wesm I think I found the problem - BitVector wasn't loading itself correctly. Not entirely sure why this wasn't a problem before...
The integration tests are still failing for me on io-hdfs-test, but I'm wondering if I need to set up hdfs for that or something.

Member

wesm commented Mar 16, 2017

@elahrvivaz ah, don't worry about the C++ unit tests. I need to make the io-hdfs-test fail more gracefully on systems where HDFS is unavailable. let me try out the integration tests locally

Member

wesm commented Mar 16, 2017

Cool, integration tests are passing for me locally!

@elahrvivaz
Contributor Author

woot!

Member

wesm commented Mar 16, 2017

The OS X build timed out, but the rest passed. I'm merging. Thanks a lot for your work on this!!

@asfgit asfgit closed this in 49f666e Mar 16, 2017
@elahrvivaz
Contributor Author

awesome, thanks!

Member

wesm commented Mar 16, 2017

oops there's a java/tools/tmptestfilesio, I'll open a patch to remove
