HIVE-21671: Replicate Streaming ingestion with transactional batch size as 1. #615
sankarh wants to merge 2 commits into apache:master
Conversation
    public static void classLevelSetup() throws Exception {
      HashMap<String, String> overrides = new HashMap<>();
      overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
          GzipJSONMessageEncoder.class.getCanonicalName());
Reviewer: Add a test suite for the JSON message format as well.

Author: I think it is not needed, as no new event is added for streaming support. The other suite covers the write events.
    new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
    HashMap<String, String> acidEnableConf = new HashMap<String, String>() {{
      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
      put("hive.support.concurrency", "true");
Reviewer: Set strict managed tables to true.

Author: I think there is no need, as it has significance only for the table type, which we set explicitly.
    .run("select msg from " + tblName + " order by msg")
    .verifyResults(new String[] {"val1", "val2"});

    // Begin another transaction, write more records and commit 2nd transaction
Reviewer: The comment should say the second transaction is committed after the load.

Author: OK, updating the comment.
    "clustered by (id) into 5 buckets " +
    "stored as orc tblproperties(\"transactional\"=\"true\")");

    // Static partition values
Reviewer: The two test cases are identical; the common code can be consolidated into one method.

Author: Keeping them separate is more readable. Also, they are not exactly the same: the static partition test has a where clause that the unpartitioned one does not.
    String key = (partitionValues == null) ? tableObject.getFullyQualifiedName()
        : partitionValues.toString();
    if (!writePaths.containsKey(key)) {
      writePaths.put(key, new WriteDirInfo(partitionValues, writeDir));
Reviewer: What if a path is already present for the same partition value? Is it OK to ignore it? I think an assert can be added to check that writeDir is the same.

Author: It is not possible to have more than one delta directory for a partition from the same streaming transaction. So, even if multiple bucket files are created within the same delta directory, it is guaranteed that writeDir is the same. Will add the assert.
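The exchange above can be sketched as follows. The class and method names here (`WritePathTracker`, `addWriteDir`) are hypothetical stand-ins for the patch's per-partition write-path tracking, with the reviewer's suggested same-directory check made explicit instead of silently ignoring a repeat write:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the write-path bookkeeping discussed above.
// Within one streaming transaction, a partition can have only one delta
// directory, so a second registration for the same key must carry the
// same writeDir; anything else indicates a bug.
class WritePathTracker {
  static class WriteDirInfo {
    final String partitionValues; // null for unpartitioned tables
    final String writeDir;
    WriteDirInfo(String partitionValues, String writeDir) {
      this.partitionValues = partitionValues;
      this.writeDir = writeDir;
    }
  }

  private final Map<String, WriteDirInfo> writePaths = new HashMap<>();

  void addWriteDir(String tableName, String partitionValues, String writeDir) {
    String key = (partitionValues == null) ? tableName : partitionValues;
    WriteDirInfo existing = writePaths.get(key);
    if (existing == null) {
      writePaths.put(key, new WriteDirInfo(partitionValues, writeDir));
    } else if (!existing.writeDir.equals(writeDir)) {
      // Mirrors the assert the author agreed to add: same partition,
      // same streaming txn, so the delta directory must match.
      throw new IllegalStateException("Conflicting write dirs for " + key);
    }
  }

  int size() {
    return writePaths.size();
  }
}
```

Multiple bucket files written into the same delta directory simply hit the existing entry and pass the check, so the map ends up with one entry per partition.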
    .finalDestination(partitionPath);

    // Add write directory information in the connection object.
    conn.addWriteDirectoryInfo(partitionValues, AcidUtils.baseOrDeltaSubdirPath(partitionPath, options));
Reviewer: This will always be a delta directory; using baseOrDeltaSubdirPath may give the wrong impression that it can be a base or delete delta in some cases.

Author: It is a common method used in multiple places, and the input options decide the directory type. I don't think it is confusing, as a reader would understand it from the code. I will keep it as it is.
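To illustrate the author's point, here is a minimal sketch (not the real AcidUtils API) of how the options passed in decide whether a base or delta subdirectory name is produced. For a streaming ingest with transactional batch size 1, the minimum and maximum write IDs coincide, so the result is always a `delta_N_N` name:

```java
// Hypothetical simplification of a "base or delta" subdirectory helper:
// the caller's options (here, a boolean plus the write-ID range) fully
// determine the directory type, so one shared method serves both cases.
class AcidPathSketch {
  static String baseOrDeltaSubdir(boolean isBase, long minWriteId, long maxWriteId) {
    return isBase
        ? ("base_" + maxWriteId)                       // compacted base
        : ("delta_" + minWriteId + "_" + maxWriteId);  // insert delta
  }
}
```

A streaming commit with batch size 1 would call it with `isBase = false` and equal write IDs, e.g. producing `delta_5_5`, which is the "always delta" case the reviewer describes.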
    recordWriter.close();

    // Add write notification events if it is enabled.
    conn.addWriteNotificationEvents();
Reviewer: The write notification events should be logged before the commit txn event.

Author: Yes, it is already called before commit; otherwise the test wouldn't pass.
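The ordering the reviewer asks about can be sketched as follows. The class and method names are hypothetical stand-ins for the connection's commit path; the point is only that the write notification events are recorded before the commit-txn event, so replication replays the writes together with the transaction:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the event ordering inside a streaming commit:
// notification events for the written directories are logged first,
// then the commit-txn event follows.
class CommitOrderingSketch {
  final List<String> events = new ArrayList<>();

  void addWriteNotificationEvents() { events.add("WRITE_NOTIFICATION"); }
  void addCommitTxnEvent()          { events.add("COMMIT_TXN"); }

  void commitTransaction() {
    addWriteNotificationEvents(); // must precede the commit event
    addCommitTxnEvent();
  }
}
```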
1da72bb to 179720b
No description provided.