Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,37 @@ public int getLen() {
return this.isNull.length;
}

// ---------------------------------------------------------------------------------------------
// Flink 2.1-compatible accessors. Backed by the existing public {@code offsets}, {@code lengths}
// and {@code child} fields so legacy callers continue to work; the new {@link
// org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and any
// future Flink-2.1-style caller use these accessors.
// ---------------------------------------------------------------------------------------------

public long[] getOffsets() {
return offsets;
}

public void setOffsets(long[] offsets) {
this.offsets = offsets;
}

public long[] getLengths() {
return lengths;
}

public void setLengths(long[] lengths) {
this.lengths = lengths;
}

public ColumnVector getChild() {
return child;
}

public void setChild(ColumnVector child) {
this.child = child;
}

@Override
public ArrayData getArray(int i) {
long offset = offsets[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.Getter;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.MapColumnVector;
import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
Expand All @@ -31,16 +32,74 @@ public class HeapMapColumnVector extends AbstractHeapVector
implements WritableColumnVector, MapColumnVector {

@Getter
private final WritableColumnVector keys;
private WritableColumnVector keys;
@Getter
private final WritableColumnVector values;
private WritableColumnVector values;

// ---------------------------------------------------------------------------------------------
// Flink 2.1 Dremel-style state. Populated by {@link
// org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port). The
// legacy {@link #getMap(int)} implementation below continues to use {@code ColumnarGroupMapData}
// — wiring it through these offsets/lengths happens in a follow-up PR that switches the read
// path. Left here so the new readers can compile against the additive surface.
// ---------------------------------------------------------------------------------------------
private long[] offsets;
private long[] lengths;
private int size;

public HeapMapColumnVector(int len, WritableColumnVector keys, WritableColumnVector values) {
super(len);
this.offsets = new long[len];
this.lengths = new long[len];
this.keys = keys;
this.values = values;
}

public long[] getOffsets() {
return offsets;
}

public void setOffsets(long[] offsets) {
this.offsets = offsets;
}

public long[] getLengths() {
return lengths;
}

public void setLengths(long[] lengths) {
this.lengths = lengths;
}

public int getSize() {
return size;
}

public void setSize(int size) {
this.size = size;
}

public void setKeys(WritableColumnVector keys) {
this.keys = keys;
}

public void setValues(WritableColumnVector values) {
this.values = values;
}

/**
* Returns the keys child vector typed as {@link ColumnVector}, matching the Flink 2.1 contract
* consumed by {@code NestedColumnReader}. Functionally equivalent to {@link #getKeys()}.
*/
public ColumnVector getKeyColumnVector() {
return keys;
}

/** Counterpart of {@link #getKeyColumnVector()} for the values child vector. */
public ColumnVector getValueColumnVector() {
return values;
}

@Override
public MapData getMap(int rowId) {
return new ColumnarGroupMapData(keys, values, rowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
this.vectors = vectors;
}

/**
* Flink 2.1-compatible accessor for the children vectors. Backed by the existing public {@code
* vectors} field so legacy callers continue to work; the new {@link
* org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and any
* future Flink-2.1-style caller use this accessor.
*/
public WritableColumnVector[] getFields() {
return vectors;
}

/** Counterpart of {@link #getFields()}. */
public void setFields(WritableColumnVector[] fields) {
this.vectors = fields;
}

@Override
public ColumnarRowData getRow(int i) {
ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
Expand Down
Loading
Loading