diff --git a/pom.xml b/pom.xml index ed6ef485c26d..95792e31260c 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ 2.17.1 42.5.1 2.3 - 1.5.6 + 1.5.8 1.10.19 1.7.4 2.0.0-M5 diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java index 2931c043e277..220431a444e5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java @@ -1928,52 +1928,22 @@ private BigRow createRandomRow(long[] intValues, double[] doubleValues, new MiddleStruct(inner, inner2), list(), map(inner,inner2)); } - private static class MyMemoryManager implements MemoryManager { - final long totalSpace; - double rate; - Path path = null; - long lastAllocation = 0; - int rows = 0; - MemoryManager.Callback callback; - - MyMemoryManager(Configuration conf, long totalSpace, double rate) { - this.totalSpace = totalSpace; - this.rate = rate; - } - - @Override - public void addWriter(Path path, long requestedAllocation, - MemoryManager.Callback callback) { - this.path = path; - this.lastAllocation = requestedAllocation; - this.callback = callback; - } - - @Override - public synchronized void removeWriter(Path path) { - this.path = null; - this.lastAllocation = 0; - } - - @Override - public void addedRow(int count) throws IOException { - rows += count; - if (rows >= 100) { - callback.checkMemory(rate); - rows = 0; - } - } - } - @Test public void testMemoryManagementV11() throws Exception { + OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 100); + final long poolSize = 50_000; ObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = ObjectInspectorFactory.getReflectionObjectInspector (InnerStruct.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } - MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1); + MemoryManager memoryManager = new MemoryManagerImpl(poolSize); + // set up 10 files that all request the full size. + MemoryManager.Callback ignore = newScale -> false; + for(int f=0; f < 9; ++f) { + memoryManager.addWriter(new Path("file-" + f), poolSize, ignore); + } Writer writer = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf) .inspector(inspector) @@ -1981,15 +1951,14 @@ public void testMemoryManagementV11() throws Exception { .stripeSize(50000) .bufferSize(100) .rowIndexStride(0) - .memory(memory) + .memory(memoryManager) .batchSize(100) .version(OrcFile.Version.V_0_11)); - assertEquals(testFilePath, memory.path); + assertEquals(0.1, ((MemoryManagerImpl) memoryManager).getAllocationScale()); for(int i=0; i < 2500; ++i) { writer.addRow(new InnerStruct(i*300, Integer.toHexString(10*i))); } writer.close(); - assertEquals(null, memory.path); Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); int i = 0; @@ -2004,29 +1973,35 @@ public void testMemoryManagementV11() throws Exception { @Test public void testMemoryManagementV12() throws Exception { + OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 100); + final long poolSize = 50_000; ObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = ObjectInspectorFactory.getReflectionObjectInspector (InnerStruct.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } - MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1); + MemoryManager memoryManager = new MemoryManagerImpl(poolSize); + // set up 10 files that all request the full size. + MemoryManager.Callback ignore = newScale -> false; + for(int f=0; f < 9; ++f) { + memoryManager.addWriter(new Path("file-" + f), poolSize, ignore); + } Writer writer = OrcFile.createWriter(testFilePath, - OrcFile.writerOptions(conf) - .inspector(inspector) - .compress(CompressionKind.NONE) - .stripeSize(50000) - .bufferSize(100) - .rowIndexStride(0) - .memory(memory) - .batchSize(100) - .version(OrcFile.Version.V_0_12)); - assertEquals(testFilePath, memory.path); + OrcFile.writerOptions(conf) + .inspector(inspector) + .compress(CompressionKind.NONE) + .stripeSize(50000) + .bufferSize(100) + .rowIndexStride(0) + .memory(memoryManager) + .batchSize(100) + .version(OrcFile.Version.V_0_12)); + assertEquals(0.1, ((MemoryManagerImpl) memoryManager).getAllocationScale()); for(int i=0; i < 2500; ++i) { writer.addRow(new InnerStruct(i*300, Integer.toHexString(10*i))); } writer.close(); - assertEquals(null, memory.path); Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); int i = 0; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index cc29384e3734..753c70738609 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -23,9 +23,8 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.orc.CompressionKind; -import org.apache.orc.MemoryManager; +import org.apache.orc.OrcConf; import org.apache.orc.StripeInformation; -import org.apache.orc.impl.MemoryManagerImpl; import org.apache.orc.impl.OrcAcidUtils; import org.junit.Assert; import org.junit.Rule; @@ -1064,6 +1063,7 @@ static String getColumnTypesProperty() { public void testRecordReaderOldBaseAndDelta() throws Exception { final int BUCKET = 10; Configuration conf = new Configuration(); + OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 2); OrcOutputFormat of = new OrcOutputFormat(); FileSystem fs = FileSystem.getLocal(conf); Path root = new Path(tmpDir, "testOldBaseAndDelta").makeQualified(fs); @@ -1073,25 +1073,11 @@ public void testRecordReaderOldBaseAndDelta() throws Exception { inspector = ObjectInspectorFactory.getReflectionObjectInspector (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } - - // write the base - MemoryManager mgr = new MemoryManagerImpl(conf){ - int rowsAddedSinceCheck = 0; - - @Override - public synchronized void addedRow(int rows) throws IOException { - rowsAddedSinceCheck += rows; - if (rowsAddedSinceCheck >= 2) { - notifyWriters(); - rowsAddedSinceCheck = 0; - } - } - }; // make 5 stripes with 2 rows each Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"), OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs) .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE) - .stripeSize(1).memory(mgr).batchSize(2).version(OrcFile.Version.V_0_11)); + .stripeSize(1).batchSize(2).version(OrcFile.Version.V_0_11)); String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3", "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"}; for(int i=0; i < values.length; ++i) { @@ -1195,6 +1181,7 @@ public synchronized void addedRow(int rows) throws IOException { public void testRecordReaderNewBaseAndDelta() throws Exception { final int BUCKET = 11; Configuration conf = new Configuration(); + OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 2); OrcOutputFormat of = new OrcOutputFormat(); FileSystem fs = FileSystem.getLocal(conf); Path root = new Path(tmpDir, "testRecordReaderNewBaseAndDelta").makeQualified(fs); @@ -1205,20 +1192,6 @@ public void testRecordReaderNewBaseAndDelta() throws Exception { (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } - // write the base - MemoryManager mgr = new MemoryManagerImpl(conf){ - int rowsAddedSinceCheck = 0; - - @Override - public synchronized void addedRow(int rows) throws IOException { - rowsAddedSinceCheck += rows; - if (rowsAddedSinceCheck >= 2) { - notifyWriters(); - rowsAddedSinceCheck = 0; - } - } - }; - // make 5 stripes with 2 rows each OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions) new OrcRecordUpdater.OrcOptions(conf) @@ -1228,8 +1201,9 @@ public synchronized void addedRow(int rows) throws IOException { final int BUCKET_PROPERTY = BucketCodec.V1.encode(options); options.orcOptions(OrcFile.writerOptions(conf) - .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) - .memory(mgr).batchSize(2)); + .stripeSize(1).blockPadding(false) + .compress(CompressionKind.NONE) + .batchSize(2)); options.finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",