Skip to content

Commit

Permalink
Simplify time conversion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Neha Pawar authored and Neha Pawar committed Apr 14, 2020
1 parent 55c789a commit 2570bf7
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 360 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ public class ExpressionTransformer implements RecordTransformer {

private final Map<String, ExpressionEvaluator> _expressionEvaluators = new HashMap<>();

private final String _timeColumn;

public ExpressionTransformer(Schema schema) {
_timeColumn = schema.getTimeColumnName();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn()) {
ExpressionEvaluator expressionEvaluator = ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
Expand All @@ -55,9 +52,9 @@ public GenericRow transform(GenericRow record) {
for (Map.Entry<String, ExpressionEvaluator> entry : _expressionEvaluators.entrySet()) {
String column = entry.getKey();
ExpressionEvaluator transformExpressionEvaluator = entry.getValue();
// Skip transformation if column value already exist. Value can exist for time transformation (incoming name = outgoing name)
// Skip transformation if column value already exist.
// NOTE: column value might already exist for OFFLINE data
if (record.getValue(column) == null || column.equals(_timeColumn)) {
if (record.getValue(column) == null) {
Object result = transformExpressionEvaluator.evaluate(record);
record.putValue(column, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.spi.data.FieldSpec;
Expand All @@ -28,6 +29,7 @@
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -88,8 +90,8 @@ public void testRecordReaders()
GenericRow avroRecord = defaultTransformer.transform(avroRecordReader.next());
GenericRow csvRecord = defaultTransformer.transform(csvRecordReader.next());
GenericRow jsonRecord = defaultTransformer.transform(jsonRecordReader.next());
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
checkEqualCSV(avroRecord, csvRecord);
checkEqual(avroRecord, jsonRecord);

// Check the values from the first record
if (numRecords == 1) {
Expand Down Expand Up @@ -133,13 +135,13 @@ public void testSameIncomingOutgoing()
GenericRow avroRecord = avroRecordReader.next();
GenericRow csvRecord = csvRecordReader.next();
GenericRow jsonRecord = jsonRecordReader.next();
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
checkEqualCSV(avroRecord, csvRecord);
checkEqual(avroRecord, jsonRecord);

// Check the values from the first record
if (numRecords == 1) {
// Should be in incoming time data type (LONG)
assertEquals(avroRecord.getValue("time_day"), 1072889503L);
assertEquals(Long.valueOf(avroRecord.getValue("time_day").toString()), new Long(1072889503L));
}
}
assertEquals(numRecords, 10001);
Expand All @@ -164,13 +166,13 @@ public void testDifferentIncomingOutgoing()
GenericRow avroRecord = avroRecordReader.next();
GenericRow csvRecord = csvRecordReader.next();
GenericRow jsonRecord = jsonRecordReader.next();
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
checkEqualCSV(avroRecord, csvRecord);
checkEqual(avroRecord, jsonRecord);

// Check the values from the first record
if (numRecords == 1) {
// Incoming time column
assertEquals(avroRecord.getValue("time_day"), 1072889503L);
assertEquals(Long.valueOf(avroRecord.getValue("time_day").toString()), new Long(1072889503L));

// Outgoing time column
assertEquals(avroRecord.getValue("column2"), 231355578);
Expand Down Expand Up @@ -198,8 +200,8 @@ public void testNoIncoming()
GenericRow avroRecord = avroRecordReader.next();
GenericRow csvRecord = csvRecordReader.next();
GenericRow jsonRecord = jsonRecordReader.next();
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
checkEqualCSV(avroRecord, csvRecord);
checkEqual(avroRecord, jsonRecord);

// Check the values from the first record
if (numRecords == 1) {
Expand Down Expand Up @@ -232,13 +234,13 @@ public void testNoOutgoing()
GenericRow avroRecord = avroRecordReader.next();
GenericRow csvRecord = csvRecordReader.next();
GenericRow jsonRecord = jsonRecordReader.next();
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
checkEqualCSV(avroRecord, csvRecord);
checkEqual(avroRecord, jsonRecord);

// Check the values from the first record
if (numRecords == 1) {
// Incoming time column
assertEquals(avroRecord.getValue("time_day"), 1072889503L);
assertEquals(Long.valueOf(avroRecord.getValue("time_day").toString()), new Long(1072889503L));

// Outgoing time column should be null
assertNull(avroRecord.getValue("outgoing"));
Expand All @@ -247,4 +249,35 @@ public void testNoOutgoing()
assertEquals(numRecords, 10001);
}
}

/**
* True data types are not achieved until DataType transformer. Hence, pure equality might not work in most cases (Integer Long etc)
*/
private void checkEqual(GenericRow row1, GenericRow row2) {
for (Map.Entry<String, Object> entry : row1.getFieldToValueMap().entrySet()) {
if (entry.getValue() == null) {
Assert.assertNull(row2.getValue(entry.getKey()));
} else {
Assert.assertEquals(entry.getValue().toString(), row2.getValue(entry.getKey()).toString());
}
}
}

/**
* True data types are not achieved until DataType transformer. Hence, pure equality might not work in most cases (Integer Long etc)
* Empty string gets treated as null value in CSV, because we no longer have data types
*/
private void checkEqualCSV(GenericRow row, GenericRow csvRecord) {
for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
Object row1Value = entry.getValue();
String row1Key = entry.getKey();
if (row1Value == null) {
Assert.assertNull(csvRecord.getValue(row1Key));
} else if (row1Value.toString().isEmpty()) {
Assert.assertEquals(csvRecord.getValue(row1Key), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING);
} else {
Assert.assertEquals(row1Value.toString(), csvRecord.getValue(row1Key).toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.BytesUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -123,136 +119,4 @@ public void testGroovyExpressionTransformer()
// convert to LONG
Assert.assertEquals(genericRow.getValue("hoursSinceEpoch"), 437222L);
}

@Test
public void testTimeSpecConversion() {
long validMillis = 1585724400000L;
long validHours = 440479;
ExpressionTransformer expressionTransformer;

// all combinations of timespec, and values in the incoming outgoing
Schema pinotSchema;

// 1] only incoming defined
pinotSchema = new Schema.SchemaBuilder()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "incoming")).build();

// correct value - use incoming
expressionTransformer = new ExpressionTransformer(pinotSchema);
GenericRow genericRow = new GenericRow();
genericRow.putValue("incoming", validMillis);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("incoming"), validMillis);

// incorrect value - use whatever is
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", -1);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("incoming"), -1);

// 2] both incoming and outgoing defined - exactly the same
pinotSchema = new Schema.SchemaBuilder()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"),
new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time")).build();

// correct value - use incoming
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("time", validMillis);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("time"), validMillis);

// incorrect value - use whatever is
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("time", -1);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("time"), -1);

// 3] both incoming and outgoing defined - same column name only
pinotSchema = new Schema.SchemaBuilder()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"),
new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, "time")).build();

// value matches incoming - convert
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("time", validMillis);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("time"), validHours);

// value matches outgoing - use outgoing
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("time", validHours);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("time"), validHours);

// value matches neither - exception
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("time", -1);
try {
expressionTransformer.transform(genericRow);
Assert.fail("Value matches neither incoming nor outgoing, should have failed");
} catch (Exception e) {
// expected
}

// 4] both incoming and outgoing defined - different column names
pinotSchema = new Schema.SchemaBuilder()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "incoming"),
new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, "outgoing")).build();

// only valid incoming value exists - convert
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", validMillis);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("outgoing"), validHours);

// only valid outgoing value exists - use outgoing
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("outgoing", validHours);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("outgoing"), validHours);

// both valid incoming and outgoing exist - convert
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", validMillis);
genericRow.putValue("outgoing", validHours);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("outgoing"), validHours);

// invalid incoming, valid outgoing - use outgoing
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", -1);
genericRow.putValue("outgoing", validHours);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("outgoing"), validHours);

// valid incoming, invalid outgoing - convert
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", validMillis);
genericRow.putValue("outgoing", -1);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("outgoing"), validHours);

// invalid incoming and outgoing
expressionTransformer = new ExpressionTransformer(pinotSchema);
genericRow = new GenericRow();
genericRow.putValue("incoming", -1);
genericRow.putValue("outgoing", -1);
try {
expressionTransformer.transform(genericRow);
Assert.fail("Invalid incoming and outgoing time, should have failed");
} catch (Exception e) {
// expected
}
}
}

0 comments on commit 2570bf7

Please sign in to comment.