Skip to content

Commit

Permalink
- Reformatting code based on Pinot style
Browse files Browse the repository at this point in the history
- Using dynamically generated null-bitmap instead of keeping a copy in memory
  • Loading branch information
icefury71 committed Sep 26, 2019
1 parent d8d5901 commit 005dec8
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import java.util.Collection;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.data.GenericRow;
import org.roaringbitmap.RoaringBitmap;

import static org.apache.pinot.common.utils.CommonConstants.Segment.NULL_FIELDS;


public class NullValueTransformer implements RecordTransformer {
private final Collection<FieldSpec> _fieldSpecs;
Expand Down Expand Up @@ -60,7 +59,7 @@ public GenericRow transform(GenericRow record) {
}
}

record.putField(NULL_FIELDS, nullColumnBitMap);
record.putField(CommonConstants.Segment.NULL_FIELDS, nullColumnBitMap);
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.IndexSegmentUtils;
Expand All @@ -55,6 +56,7 @@
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.core.segment.index.readers.PresenceVectorReader;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnContext;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
Expand All @@ -67,8 +69,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.common.utils.CommonConstants.Segment.NULL_FIELDS;


public class MutableSegmentImpl implements MutableSegment {
// For multi-valued column, forward-index.
Expand Down Expand Up @@ -97,7 +97,6 @@ public class MutableSegmentImpl implements MutableSegment {
private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>();
private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
private final Map<String, RealtimePresenceVectorReaderWriter> _presenceVectorMap = new HashMap<>();
private final Map<Integer, RoaringBitmap> _docIdToNullColumnsBitsetMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
private boolean _aggregateMetrics;

Expand Down Expand Up @@ -395,15 +394,11 @@ private void addInvertedIndex(int docId, Map<String, Object> dictIdMap) {
* @param docId specified docId for this row
*/
private void addPresenceVector(GenericRow row, int docId) {
if (row.getValue(NULL_FIELDS) == null) {
if (row.getValue(CommonConstants.Segment.NULL_FIELDS) == null) {
return;
}

// Keep track of the null columns bitmap associated with this docId
// This is used to populate NULL_FIELDS when re-creating a row
RoaringBitmap nullColumnsBitMap = (RoaringBitmap) row.getValue(NULL_FIELDS);
_docIdToNullColumnsBitsetMap.put(docId, nullColumnsBitMap);

RoaringBitmap nullColumnsBitMap = (RoaringBitmap) row.getValue(CommonConstants.Segment.NULL_FIELDS);
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
String columnName = fieldSpec.getName();
if (nullColumnsBitMap.contains(_schema.getColumnId(columnName))) {
Expand Down Expand Up @@ -502,14 +497,24 @@ public List<StarTreeV2> getStarTrees() {
* @return Generic row with physical columns of the specified row.
*/
public GenericRow getRecord(int docId, GenericRow reuse) {
// Allocated lazily inside the loop, so it stays null when no column is null for this docId.
RoaringBitmap nullColumnBitMap = null;

for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
reuse.putField(column, IndexSegmentUtils
.getValue(docId, fieldSpec, _indexReaderWriterMap.get(column), _dictionaryMap.get(column),
_maxNumValuesMap.getOrDefault(column, 0)));

// If the column has a presence vector and it reports this docId as not present,
// record the column's schema id in the null-column bitmap.
PresenceVectorReader reader = _presenceVectorMap.get(column);
if (reader != null && !reader.isPresent(docId)) {
if (nullColumnBitMap == null) {
nullColumnBitMap = new RoaringBitmap();
}
nullColumnBitMap.add(_schema.getColumnId(column));
}
}

// NOTE(review): the next two statements both write the NULL_FIELDS entry. The first reads
// _docIdToNullColumnsBitsetMap and looks like the pre-change line of this diff, superseded
// by the second, which stores the bitmap built above (possibly null) -- confirm.
reuse.putField(NULL_FIELDS, _docIdToNullColumnsBitsetMap.get(docId));
reuse.putField(CommonConstants.Segment.NULL_FIELDS, nullColumnBitMap);
return reuse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@
import org.apache.pinot.core.segment.index.readers.PresenceVectorReader;
import org.roaringbitmap.buffer.MutableRoaringBitmap;


/**
* Defines a presence vector to be used in real-time ingestion.
*/
public class RealtimePresenceVectorReaderWriter implements PresenceVectorReader {
private final MutableRoaringBitmap _nullBitmap;
private final MutableRoaringBitmap _nullBitmap;

public RealtimePresenceVectorReaderWriter() {
_nullBitmap = new MutableRoaringBitmap();
}
public RealtimePresenceVectorReaderWriter() {
_nullBitmap = new MutableRoaringBitmap();
}

public void setNull(int docId) {
_nullBitmap.add(docId);
}
public void setNull(int docId) {
_nullBitmap.add(docId);
}

public boolean isPresent(int docId) {
return !_nullBitmap.contains(docId);
}
public boolean isPresent(int docId) {
return !_nullBitmap.contains(docId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.StarTreeIndexSpec;
import org.apache.pinot.common.utils.BytesUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.core.data.GenericRow;
Expand Down Expand Up @@ -67,7 +68,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.common.utils.CommonConstants.Segment.NULL_FIELDS;
import static org.apache.pinot.core.segment.creator.impl.V1Constants.MetadataKeys.Column.*;
import static org.apache.pinot.core.segment.creator.impl.V1Constants.MetadataKeys.Segment.*;
import static org.apache.pinot.core.segment.creator.impl.V1Constants.MetadataKeys.StarTree.*;
Expand Down Expand Up @@ -262,11 +262,10 @@ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentG

@Override
public void indexRow(GenericRow row) {

// Determine if we need to process presence vector per row, per column
RoaringBitmap nullColumnsBitMap = null;
if (row.getValue(NULL_FIELDS) != null) {
nullColumnsBitMap = (RoaringBitmap) row.getValue(NULL_FIELDS);
if (row.getValue(CommonConstants.Segment.NULL_FIELDS) != null) {
nullColumnsBitMap = (RoaringBitmap) row.getValue(CommonConstants.Segment.NULL_FIELDS);
}

for (String columnName : _forwardIndexCreatorMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface ColumnIndexContainer {

/**
*
* @return Get the bloom filter for the column if it exists else {@code null}
* @return the presence vector for the column if it exists, else {@code null}
*/
PresenceVectorReaderImpl getPresenceVector();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
*/
public interface PresenceVectorReader {

/**
* Check if the given docId is present in the corresponding column
*
* @param docId specifies ID to check for presence
* @return true if docId is present (non null). False otherwise
*/
boolean isPresent(int docId);

/**
* Check if the given docId is present in the corresponding column
*
* @param docId specifies ID to check for presence
* @return {@code true} if docId is present (non-null), {@code false} otherwise
*/
boolean isPresent(int docId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PresenceVectorReaderImpl implements PresenceVectorReader {
ImmutableRoaringBitmap _nullBitmap;

public PresenceVectorReaderImpl(PinotDataBuffer presenceVectorBuffer) throws IOException {
_nullBitmap = new ImmutableRoaringBitmap(presenceVectorBuffer.toDirectByteBuffer(0, (int)presenceVectorBuffer.size()));
_nullBitmap = new ImmutableRoaringBitmap(presenceVectorBuffer.toDirectByteBuffer(0, (int) presenceVectorBuffer.size()));
}

public boolean isPresent(int docId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.indexsegment.mutable;

import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.JSONRecordReader;
Expand All @@ -39,66 +38,66 @@
import java.util.Collections;
import java.util.List;


import static org.apache.pinot.common.utils.CommonConstants.Segment.NULL_FIELDS;

public class MutableSegmentImplPresenceVectorTest {
private static final String PINOT_SCHEMA_FILE_PATH = "data/test_presence_vector_pinot_schema.json";
private static final String DATA_FILE = "data/test_presence_vector_data.json";
private static CompositeTransformer _recordTransformer;
private static Schema _schema;
private static MutableSegmentImpl _mutableSegmentImpl;
private static List<String> _finalNullColumns;

@BeforeClass
public void setup() throws IOException {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(PINOT_SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer = CompositeTransformer.getDefaultTransformer(_schema);
File avroFile = new File(dataResourceUrl.getFile());
_mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
false);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = new JSONRecordReader(avroFile, _schema)) {
while (recordReader.hasNext()) {
recordReader.next(reuse);
GenericRow transformedRow = _recordTransformer.transform(reuse);
_mutableSegmentImpl.index(transformedRow, null);
}
}
_finalNullColumns = Arrays.asList("signup_email", "cityid");
}
public class MutableSegmentImplPresenceVectorTest {
private static final String PINOT_SCHEMA_FILE_PATH = "data/test_presence_vector_pinot_schema.json";
private static final String DATA_FILE = "data/test_presence_vector_data.json";
private static CompositeTransformer _recordTransformer;
private static Schema _schema;
private static MutableSegmentImpl _mutableSegmentImpl;
private static List<String> _finalNullColumns;

@Test
public void testPresenceVector() throws Exception {
ColumnDataSource cityIdDataSource = _mutableSegmentImpl.getDataSource("cityid");
ColumnDataSource descriptionDataSource = _mutableSegmentImpl.getDataSource("description");
PresenceVectorReader cityIdPresenceVector = cityIdDataSource.getPresenceVector();
PresenceVectorReader descPresenceVector = descriptionDataSource.getPresenceVector();
Assert.assertFalse(cityIdPresenceVector.isPresent(0));
Assert.assertTrue(cityIdPresenceVector.isPresent(1));
Assert.assertTrue(descPresenceVector.isPresent(0));
Assert.assertTrue(descPresenceVector.isPresent(1));
@BeforeClass
public void setup()
throws IOException {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(PINOT_SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer = CompositeTransformer.getDefaultTransformer(_schema);
File avroFile = new File(dataResourceUrl.getFile());
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
false);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = new JSONRecordReader(avroFile, _schema)) {
while (recordReader.hasNext()) {
recordReader.next(reuse);
GenericRow transformedRow = _recordTransformer.transform(reuse);
_mutableSegmentImpl.index(transformedRow, null);
}
}
_finalNullColumns = Arrays.asList("signup_email", "cityid");
}

@Test
public void testGetRecord() {
GenericRow reuse = new GenericRow();
_mutableSegmentImpl.getRecord(0, reuse);
List<String> nullColumns = new ArrayList<>();
RoaringBitmap nullBitmap = (RoaringBitmap) reuse.getValue(NULL_FIELDS);
for (String colName : _schema.getColumnNames()) {
if (nullBitmap.contains(_schema.getColumnId(colName))) {
nullColumns.add(colName);
}
}
Assert.assertEquals(nullColumns, _finalNullColumns);
@Test
public void testPresenceVector()
throws Exception {
ColumnDataSource cityIdDataSource = _mutableSegmentImpl.getDataSource("cityid");
ColumnDataSource descriptionDataSource = _mutableSegmentImpl.getDataSource("description");
PresenceVectorReader cityIdPresenceVector = cityIdDataSource.getPresenceVector();
PresenceVectorReader descPresenceVector = descriptionDataSource.getPresenceVector();
Assert.assertFalse(cityIdPresenceVector.isPresent(0));
Assert.assertTrue(cityIdPresenceVector.isPresent(1));
Assert.assertTrue(descPresenceVector.isPresent(0));
Assert.assertTrue(descPresenceVector.isPresent(1));
}

_mutableSegmentImpl.getRecord(1, reuse);
Assert.assertNull(reuse.getValue(NULL_FIELDS));
@Test
public void testGetRecord() {
GenericRow reuse = new GenericRow();
_mutableSegmentImpl.getRecord(0, reuse);
List<String> nullColumns = new ArrayList<>();
RoaringBitmap nullBitmap = (RoaringBitmap) reuse.getValue(NULL_FIELDS);
for (String colName : _schema.getColumnNames()) {
if (nullBitmap.contains(_schema.getColumnId(colName))) {
nullColumns.add(colName);
}
}
Assert.assertEquals(nullColumns, _finalNullColumns);

_mutableSegmentImpl.getRecord(1, reuse);
Assert.assertNull(reuse.getValue(NULL_FIELDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class RealtimePresenceVectorReaderWriterTest {
private static RealtimePresenceVectorReaderWriter readerWriter = null;
private static RealtimePresenceVectorReaderWriter readerWriter = null;

@BeforeClass
public void setup() {
readerWriter = new RealtimePresenceVectorReaderWriter();
}
@BeforeClass
public void setup() {
readerWriter = new RealtimePresenceVectorReaderWriter();
}

@Test
public void testRealtimePresenceVectorReaderWriter() {
for (int i=0;i<100;i++) {
readerWriter.setNull(i);
}
for (int i=0;i<100;i++) {
Assert.assertFalse(readerWriter.isPresent(i));
}
@Test
public void testRealtimePresenceVectorReaderWriter() {
for (int i = 0; i < 100; i++) {
readerWriter.setNull(i);
}
for (int i = 0; i < 100; i++) {
Assert.assertFalse(readerWriter.isPresent(i));
}
}
}
Loading

0 comments on commit 005dec8

Please sign in to comment.