Affected Version
Druid v0.17.0 and v0.18.0
Description
When you run a native ingestion spec without explicitly specifying partitionDimensions, the observed data rollup is worse than when you specify it explicitly. This behavior contradicts both what Hadoop-based ingestion does and what the documentation describes.
Steps to Reproduce
To generate data you can use this Python script:

import json
import random
import sys
import time

# number of rows to generate; can be overridden on the command line
n = 10
if len(sys.argv) == 2:
    n = int(sys.argv[1])

UNITS_IN_HOUR = 3600

# truncate the current time to the start of the hour
now = int(time.time())
now -= now % UNITS_IN_HOUR

for i in range(n):
    # random timestamp somewhere within the current hour
    obj = {"time": now + random.randrange(UNITS_IN_HOUR)}
    # v1..v6 cycle through i with periods 10, 100, ..., 1000000
    for k in range(1, 7):
        obj["v" + str(k)] = i % (10 ** k)
    print(json.dumps(obj))

Create 3 files with n = 1000000.
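As a convenience, the three input files can also be produced in a single run by reusing the generator's logic directly. This is a sketch, not part of the original reproduction: the `data/` directory and `data1.json`..`data3.json` file names are illustrative, and N is set low here so it runs quickly (set it to 1000000 to match the reproduction steps).

```python
import json
import os
import random
import time

UNITS_IN_HOUR = 3600
N = 1000  # rows per file; use 1_000_000 to match the reproduction steps

def write_rows(path, n):
    # same row format as the generator script above
    now = int(time.time())
    now -= now % UNITS_IN_HOUR  # truncate to the start of the hour
    with open(path, "w") as f:
        for i in range(n):
            obj = {"time": now + random.randrange(UNITS_IN_HOUR)}
            for k in range(1, 7):
                obj["v" + str(k)] = i % (10 ** k)
            f.write(json.dumps(obj) + "\n")

# the directory name is illustrative; point baseDir in the spec at it
os.makedirs("data", exist_ok=True)
for idx in range(1, 4):
    write_rows(os.path.join("data", "data%d.json" % idx), N)
```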
Ingest this data using the following spec
{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "filter": "*",
        "baseDir": "/path/to/data/"
      },
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "hashed",
        "numShards": 2,
        "partitionDimensions": [ "v1" ]
      },
      "forceGuaranteedRollup": true,
      "maxNumConcurrentSubTasks": 10
    },
    "dataSchema": {
      "dataSource": "1m_json_data",
      "granularitySpec": {
        "type": "uniform",
        "queryGranularity": "HOUR",
        "rollup": true,
        "intervals": [
          <change this range based on your time>
        ],
        "segmentGranularity": "DAY"
      },
      "timestampSpec": {
        "column": "time",
        "format": "posix"
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "long",
            "name": "v1"
          }
        ]
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "sum_v2",
          "type": "longSum",
          "fieldName": "v2"
        },
        {
          "name": "sum_v3",
          "type": "longSum",
          "fieldName": "v3"
        },
        {
          "name": "sum_v4",
          "type": "longSum",
          "fieldName": "v4"
        },
        {
          "name": "sum_v5",
          "type": "longSum",
          "fieldName": "v5"
        },
        {
          "name": "sum_v6",
          "type": "longSum",
          "fieldName": "v6"
        }
      ]
    }
  }
}
Once the ingestion completes, you will find that 10 rows were created.
Now remove the line "partitionDimensions": [ "v1" ] and re-run the ingestion.
You will find that 20 rows are created.
Problem Diagnosis
The problem arises in this piece of code in src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java:

  @VisibleForTesting
  List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
  {
    if (partitionDimensions.isEmpty()) {
      return Rows.toGroupKey(timestamp, inputRow);
    } else {
      return Lists.transform(partitionDimensions, inputRow::getDimension);
    }
  }

If partitionDimensions is empty, we include the timestamp as well as the dimensions in the hash key, whereas we ignore the timestamp when an explicit partitionDimensions is present. I am not completely sure why that is the case, but we can discuss.
That being said, I found that this piece of code is also used for Hadoop ingestion, where we do not see this problem. The difference is that in Hadoop we pass a constant timestamp, using something like rollupGran.bucketStart(inputRow.getTimestamp()).getMillis(), so every row in the same rollup bucket hashes on the same timestamp. This truncation is missing in the native ingestion code path.
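The effect can be sketched with a small Python simulation. This is a simplified stand-in for Druid's actual hashing and sharding (Python's hash() and a mod-2 shard assignment are assumptions for illustration): when the raw per-row timestamp is part of the group key, rows that would be identical after rollup can land in different shards, and per-shard rollup then yields duplicate result rows; truncating the timestamp to the rollup bucket first, as Hadoop does, keeps them together.

```python
UNITS_IN_HOUR = 3600
NUM_SHARDS = 2
base = 1_600_000_000 - (1_600_000_000 % UNITS_IN_HOUR)  # start of some hour

# 100 rows per v1 value, each with a distinct timestamp inside the same hour
rows = [(base + s, v1) for v1 in range(10) for s in range(100)]

def rolled_up_row_count(key_fn):
    # assign each row to a shard by hashing its group key, then roll up
    # within each shard by (hour bucket, v1) and count the surviving rows
    shards = [set() for _ in range(NUM_SHARDS)]
    for ts, v1 in rows:
        shard = hash(key_fn(ts, v1)) % NUM_SHARDS
        shards[shard].add((ts - ts % UNITS_IN_HOUR, v1))
    return sum(len(s) for s in shards)

# native path without partitionDimensions: raw timestamp is in the key,
# so rows with the same v1 scatter across shards and rollup degrades
raw = rolled_up_row_count(lambda ts, v1: (ts, v1))

# Hadoop path: timestamp is truncated to the rollup bucket first,
# so all rows with the same v1 land in one shard and roll up fully
bucketed = rolled_up_row_count(lambda ts, v1: (ts - ts % UNITS_IN_HOUR, v1))

print(raw, bucketed)  # bucketed is exactly 10; raw is typically larger
```

With 10 distinct v1 values and 2 shards, the bucketed key gives exactly 10 rows, mirroring the 10-vs-20 difference in the reproduction above.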
Next Steps
I have implemented and tested both types of fixes. I am hoping to hear your thoughts on this before moving ahead.