From c46e31cfb3b98d1d99e339d7815d681b1fddc554 Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Mon, 2 Dec 2019 08:18:06 +0800 Subject: [PATCH 1/4] [checkstyle] Add ConstantName java checkstyle rule --- .../org/apache/hudi/cli/utils/HiveUtil.java | 6 ++-- .../org/apache/hudi/AbstractHoodieClient.java | 12 +++---- .../org/apache/hudi/HoodieReadClient.java | 2 +- .../apache/hudi/HoodieClientTestHarness.java | 8 ++--- .../hudi/common/HoodieClientTestUtils.java | 4 +-- .../hudi/common/TestRawTripPayload.java | 4 +-- .../hudi/avro/MercifulJsonConverter.java | 4 +-- .../hudi/common/table/HoodieTableConfig.java | 4 +-- .../common/table/HoodieTableMetaClient.java | 12 +++---- .../log/AbstractHoodieLogRecordScanner.java | 32 +++++++++---------- .../common/table/log/HoodieLogFileReader.java | 14 ++++---- .../common/table/log/HoodieLogFormat.java | 12 +++---- .../table/log/HoodieLogFormatReader.java | 4 +-- .../table/log/HoodieLogFormatWriter.java | 24 +++++++------- .../log/HoodieMergedLogRecordScanner.java | 12 +++---- .../table/timeline/HoodieActiveTimeline.java | 26 +++++++-------- .../timeline/HoodieArchivedTimeline.java | 2 +- .../table/timeline/HoodieDefaultTimeline.java | 2 +- .../common/util/FailSafeConsistencyGuard.java | 12 +++---- .../apache/hudi/common/util/RocksDBDAO.java | 18 +++++------ .../hudi/common/util/SerializationUtils.java | 6 ++-- .../common/util/collection/DiskBasedMap.java | 4 +-- .../util/collection/ExternalSpillableMap.java | 6 ++-- .../queue/FunctionBasedQueueProducer.java | 6 ++-- .../queue/IteratorBasedQueueProducer.java | 6 ++-- .../common/minicluster/HdfsTestService.java | 10 +++--- .../minicluster/ZookeeperTestService.java | 10 +++--- .../common/table/log/TestHoodieLogFormat.java | 12 +++---- .../view/TestHoodieTableFileSystemView.java | 4 +-- .../table/view/TestIncrementalFSViewSync.java | 12 +++---- .../hudi/common/util/TestCompactionUtils.java | 6 ++-- .../service/FileSystemViewHandler.java | 18 +++++------ .../sources/helpers/KafkaOffsetGen.java | 8 ++--- .../utilities/sources/TestKafkaSource.java | 6 ++-- style/checkstyle.xml | 3 ++ 35 files changed, 167 insertions(+), 164 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/HiveUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/HiveUtil.java index 8ebb41ca48c89..d41f2f5c64944 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/HiveUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/HiveUtil.java @@ -31,13 +31,13 @@ */ public class HiveUtil { - private static final String driverName = "org.apache.hive.jdbc.HiveDriver"; + private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"; static { try { - Class.forName(driverName); + Class.forName(DRIVER_NAME); } catch (ClassNotFoundException e) { - throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e); + throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java index 5f3a27eb96322..1d60840ae5421 100644 --- a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java @@ -39,7 +39,7 @@ */ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable { - private static final Logger logger = LogManager.getLogger(AbstractHoodieClient.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class); protected final transient FileSystem fs; protected final transient JavaSparkContext jsc; @@ -79,7 +79,7 @@ public void close() { private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) { if (timelineServer.isPresent() && shouldStopTimelineServer) { // Stop only if owner - logger.info("Stopping Timeline service !!"); + LOG.info("Stopping Timeline service !!"); timelineServer.get().stop(); } @@ -94,7 +94,7 @@ private synchronized void startEmbeddedServerView() { if (config.isEmbeddedTimelineServerEnabled()) { if (!timelineServer.isPresent()) { // Run Embedded Timeline Server - logger.info("Starting Timeline service !!"); + LOG.info("Starting Timeline service !!"); timelineServer = Option.of(new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(), config.getClientSpecifiedViewStorageConfig())); try { @@ -102,14 +102,14 @@ private synchronized void startEmbeddedServerView() { // Allow executor to find this newly instantiated timeline service config.setViewStorageConfig(timelineServer.get().getRemoteFileSystemViewConfig()); } catch (IOException e) { - logger.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e); + LOG.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e); stopEmbeddedServerView(false); } } else { - logger.info("Timeline Server already running. Not restarting the service"); + LOG.info("Timeline Server already running. Not restarting the service"); } } else { - logger.info("Embedded Timeline Server is disabled. Not starting timeline service"); + LOG.info("Embedded Timeline Server is disabled. Not starting timeline service"); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java index 4bfdc3229dca3..16f79fb3c7d1a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java @@ -58,7 +58,7 @@ */ public class HoodieReadClient extends AbstractHoodieClient { - private static final Logger logger = LogManager.getLogger(HoodieReadClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class); /** * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java index 3e5f4983b297e..163debed1e871 100644 --- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java @@ -47,7 +47,7 @@ */ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable { - private static final Logger logger = LoggerFactory.getLogger(HoodieClientTestHarness.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class); protected transient JavaSparkContext jsc = null; protected transient SQLContext sqlContext; @@ -119,13 +119,13 @@ protected void initSparkContexts() { */ protected void cleanupSparkContexts() { if (sqlContext != null) { - logger.info("Clearing sql context cache of spark-session used in previous test-case"); + LOG.info("Clearing sql context cache of spark-session used in previous test-case"); sqlContext.clearCache(); sqlContext = null; } if (jsc != null) { - logger.info("Closing spark context used in previous test-case"); + LOG.info("Closing spark context used in previous test-case"); jsc.close(); jsc.stop(); jsc = null; @@ -157,7 +157,7 @@ protected void initFileSystemWithDefaultConfiguration() { */ protected void cleanupFileSystem() throws IOException { if (fs != null) { - logger.warn("Closing file-system instance used in previous test-run"); + LOG.warn("Closing file-system instance used in previous test-run"); fs.close(); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java index f6cf940be2494..a7bd7af9ee93a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java @@ -72,7 +72,7 @@ */ public class HoodieClientTestUtils { - private static final transient Logger log = LogManager.getLogger(HoodieClientTestUtils.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); private static final Random RANDOM = new Random(); public static List collectStatuses(Iterator> statusListItr) { @@ -166,7 +166,7 @@ public static Dataset readCommit(String basePath, SQLContext sqlContext, Ho try { HashMap paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); - log.info("Path :" + paths.values()); + LOG.info("Path :" + paths.values()); return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()])) .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); } catch (Exception e) { diff --git a/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java b/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java index 15fc3c332c908..63c5b53277821 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java @@ -46,7 +46,7 @@ */ public class TestRawTripPayload implements HoodieRecordPayload { - private static final transient ObjectMapper mapper = new ObjectMapper(); + private static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private String partitionPath; private String rowKey; private byte[] jsonDataCompressed; @@ -71,7 +71,7 @@ public TestRawTripPayload(String jsonData, String rowKey, String partitionPath, public TestRawTripPayload(String jsonData) throws IOException { this.jsonDataCompressed = compressData(jsonData); this.dataSize = jsonData.length(); - Map jsonRecordMap = mapper.readValue(jsonData, Map.class); + Map jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class); this.rowKey = jsonRecordMap.get("_row_key").toString(); this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/"); this.isDeleted = false; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java index c57dc84ad97ac..57552c77cc57f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java @@ -41,7 +41,7 @@ */ public class MercifulJsonConverter { - private static final Map fieldTypeProcessors = getFieldTypeProcessors(); + private static final Map FIELD_TYPE_PROCESSORS = getFieldTypeProcessors(); private final ObjectMapper mapper; @@ -125,7 +125,7 @@ private static Object convertJsonToAvroField(Object value, String name, Schema s throw new HoodieJsonToAvroConversionException(null, name, schema); } - JsonToAvroFieldProcessor processor = fieldTypeProcessors.get(schema.getType()); + JsonToAvroFieldProcessor processor = FIELD_TYPE_PROCESSORS.get(schema.getType()); if (null != processor) { return processor.convertToAvro(value, name, schema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index a5f643a68d95b..5a637f99fedfd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -47,7 +47,7 @@ */ public class HoodieTableConfig implements Serializable { - private static final transient Logger log = LogManager.getLogger(HoodieTableConfig.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieTableConfig.class); public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name"; @@ -67,7 +67,7 @@ public class HoodieTableConfig implements Serializable { public HoodieTableConfig(FileSystem fs, String metaPath) { Properties props = new Properties(); Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); - log.info("Loading dataset properties from " + propertyPath); + LOG.info("Loading dataset properties from " + propertyPath); try { try (FSDataInputStream inputStream = fs.open(propertyPath)) { props.load(inputStream); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index cd86b7ab96044..601996933082d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -64,7 +64,7 @@ */ public class HoodieTableMetaClient implements Serializable { - private static final transient Logger log = LogManager.getLogger(HoodieTableMetaClient.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class); public static String METAFOLDER_NAME = ".hoodie"; public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp"; public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux"; @@ -92,7 +92,7 @@ public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadAc public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig) throws DatasetNotFoundException { - log.info("Loading HoodieTableMetaClient from " + basePath); + LOG.info("Loading HoodieTableMetaClient from " + basePath); this.basePath = basePath; this.consistencyGuardConfig = consistencyGuardConfig; this.hadoopConf = new SerializableConfiguration(conf); @@ -103,10 +103,10 @@ public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadAc DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir); this.tableConfig = new HoodieTableConfig(fs, metaPath); this.tableType = tableConfig.getTableType(); - log.info("Finished Loading Table of type " + tableType + " from " + basePath); + LOG.info("Finished Loading Table of type " + tableType + " from " + basePath); this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad; if (loadActiveTimelineOnLoad) { - log.info("Loading Active commit timeline for " + basePath); + LOG.info("Loading Active commit timeline for " + basePath); getActiveTimeline(); } } @@ -303,7 +303,7 @@ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, Stri */ public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException { - log.info("Initializing " + basePath + " as hoodie dataset " + basePath); + LOG.info("Initializing " + basePath + " as hoodie dataset " + basePath); Path basePathDir = new Path(basePath); final FileSystem fs = FSUtils.getFs(basePath, hadoopConf); if (!fs.exists(basePathDir)) { @@ -340,7 +340,7 @@ public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration ha // We should not use fs.getConf as this might be different from the original configuration // used to create the fs in unit tests HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); - log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath); + LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath); return metaClient; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index bdd010edc7376..e6b1b4bcd5031 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -66,7 +66,7 @@ */ public abstract class AbstractHoodieLogRecordScanner { - private static final Logger log = LogManager.getLogger(AbstractHoodieLogRecordScanner.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class); // Reader schema for the records protected final Schema readerSchema; @@ -131,7 +131,7 @@ public void scan() { Set scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); - log.info("Scanning log file " + logFile); + LOG.info("Scanning log file " + logFile); scannedLogFiles.add(logFile); totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file @@ -145,7 +145,7 @@ public void scan() { } switch (r.getBlockType()) { case AVRO_DATA_BLOCK: - log.info("Reading a data block from file " + logFile.getPath()); + LOG.info("Reading a data block from file " + logFile.getPath()); if (isNewInstantBlock(r) && !readBlocksLazily) { // If this is an avro data block belonging to a different commit/instant, // then merge the last blocks and records into the main result @@ -155,7 +155,7 @@ public void scan() { currentInstantLogBlocks.push(r); break; case DELETE_BLOCK: - log.info("Reading a delete block from file " + logFile.getPath()); + LOG.info("Reading a delete block from file " + logFile.getPath()); if (isNewInstantBlock(r) && !readBlocksLazily) { // If this is a delete data block belonging to a different commit/instant, // then merge the last blocks and records into the main result @@ -177,7 +177,7 @@ public void scan() { // written per ingestion batch for a file but in reality we need to rollback (B1 & B2) // The following code ensures the same rollback block (R1) is used to rollback // both B1 & B2 - log.info("Reading a command block from file " + logFile.getPath()); + LOG.info("Reading a command block from file " + logFile.getPath()); // This is a command block - take appropriate action based on the command HoodieCommandBlock commandBlock = (HoodieCommandBlock) r; String targetInstantForCommandBlock = @@ -196,34 +196,34 @@ public void scan() { HoodieLogBlock lastBlock = currentInstantLogBlocks.peek(); // handle corrupt blocks separately since they may not have metadata if (lastBlock.getBlockType() == CORRUPT_BLOCK) { - log.info("Rolling back the last corrupted log block read in " + logFile.getPath()); + LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath()); currentInstantLogBlocks.pop(); numBlocksRolledBack++; } else if (lastBlock.getBlockType() != CORRUPT_BLOCK && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) { // rollback last data block or delete block - log.info("Rolling back the last log block read in " + logFile.getPath()); + LOG.info("Rolling back the last log block read in " + logFile.getPath()); currentInstantLogBlocks.pop(); numBlocksRolledBack++; } else if (!targetInstantForCommandBlock .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) { // invalid or extra rollback block - log.warn("TargetInstantTime " + targetInstantForCommandBlock + LOG.warn("TargetInstantTime " + targetInstantForCommandBlock + " invalid or extra rollback command block in " + logFile.getPath()); break; } else { // this should not happen ideally - log.warn("Unable to apply rollback command block in " + logFile.getPath()); + LOG.warn("Unable to apply rollback command block in " + logFile.getPath()); } } - log.info("Number of applied rollback blocks " + numBlocksRolledBack); + LOG.info("Number of applied rollback blocks " + numBlocksRolledBack); break; default: throw new UnsupportedOperationException("Command type not yet supported."); } break; case CORRUPT_BLOCK: - log.info("Found a corrupt block in " + logFile.getPath()); + LOG.info("Found a corrupt block in " + logFile.getPath()); totalCorruptBlocks.incrementAndGet(); // If there is a corrupt block - we will assume that this was the next data block currentInstantLogBlocks.push(r); @@ -234,13 +234,13 @@ public void scan() { } // merge the last read block when all the blocks are done reading if (!currentInstantLogBlocks.isEmpty()) { - log.info("Merging the final data blocks"); + LOG.info("Merging the final data blocks"); processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size()); } // Done progress = 1.0f; } catch (Exception e) { - log.error("Got exception when reading log file", e); + LOG.error("Got exception when reading log file", e); throw new HoodieIOException("IOException when reading log file "); } finally { try { @@ -249,7 +249,7 @@ public void scan() { } } catch (IOException ioe) { // Eat exception as we do not want to mask the original exception that can happen - log.error("Unable to close log format reader", ioe); + LOG.error("Unable to close log format reader", ioe); } } } @@ -297,7 +297,7 @@ private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exceptio */ private void processQueuedBlocksForInstant(Deque lastBlocks, int numLogFilesSeen) throws Exception { while (!lastBlocks.isEmpty()) { - log.info("Number of remaining logblocks to merge " + lastBlocks.size()); + LOG.info("Number of remaining logblocks to merge " + lastBlocks.size()); // poll the element at the bottom of the stack since that's the order it was inserted HoodieLogBlock lastBlock = lastBlocks.pollLast(); switch (lastBlock.getBlockType()) { @@ -308,7 +308,7 @@ private void processQueuedBlocksForInstant(Deque lastBlocks, int Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey); break; case CORRUPT_BLOCK: - log.warn("Found a corrupt block which was not rolled back"); + LOG.warn("Found a corrupt block which was not rolled back"); break; default: break; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 897af55836640..65aef8cfa6966 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -54,11 +54,11 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB - private static final Logger log = LogManager.getLogger(HoodieLogFileReader.class); + private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class); private final FSDataInputStream inputStream; private final HoodieLogFile logFile; - private static final byte[] magicBuffer = new byte[6]; + private static final byte[] MAGIC_BUFFER = new byte[6]; private final Schema readerSchema; private HoodieLogFormat.LogFormatVersion nextBlockVersion; private boolean readBlockLazily; @@ -112,7 +112,7 @@ public void run() { try { close(); } catch (Exception e) { - log.warn("unable to close input stream for log file " + logFile, e); + LOG.warn("unable to close input stream for log file " + logFile, e); // fail silently for any sort of exception } } @@ -210,12 +210,12 @@ private HoodieLogBlock readBlock() throws IOException { } private HoodieLogBlock createCorruptBlock() throws IOException { - log.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos()); + LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos()); long currentPos = inputStream.getPos(); long nextBlockOffset = scanForNextAvailableBlockOffset(); // Rewind to the initial start and read corrupted bytes till the nextBlockOffset inputStream.seek(currentPos); - log.info("Next available block in " + logFile + " starts at " + nextBlockOffset); + LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset); int corruptedBlockSize = (int) (nextBlockOffset - currentPos); long contentPosition = inputStream.getPos(); byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily); @@ -313,8 +313,8 @@ private boolean readMagic() throws IOException { private boolean hasNextMagic() throws IOException { long pos = inputStream.getPos(); // 1. Read magic header from the start of the block - inputStream.readFully(magicBuffer, 0, 6); - if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) { + inputStream.readFully(MAGIC_BUFFER, 0, 6); + if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) { return false; } return true; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 0c04d74d4cf40..678a2764d0614 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -51,7 +51,7 @@ public interface HoodieLogFormat { * The current version of the log format. Anytime the log format changes this version needs to be bumped and * corresponding changes need to be made to {@link HoodieLogFormatVersion} */ - int currentVersion = 1; + int CURRENT_VERSION = 1; String UNKNOWN_WRITE_TOKEN = "1-0-1"; @@ -104,7 +104,7 @@ interface Reader extends Closeable, Iterator { */ class WriterBuilder { - private static final Logger log = LogManager.getLogger(WriterBuilder.class); + private static final Logger LOG = LogManager.getLogger(WriterBuilder.class); // Default max log file size 512 MB public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L; @@ -188,7 +188,7 @@ public WriterBuilder onParentPath(Path parentPath) { } public Writer build() throws IOException, InterruptedException { - log.info("Building HoodieLogFormat Writer"); + LOG.info("Building HoodieLogFormat Writer"); if (fs == null) { throw new IllegalArgumentException("fs is not specified"); } @@ -210,7 +210,7 @@ public Writer build() throws IOException, InterruptedException { } if (logVersion == null) { - log.info("Computing the next log version for " + logFileId + " in " + parentPath); + LOG.info("Computing the next log version for " + logFileId + " in " + parentPath); Option> versionAndWriteToken = FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime); if (versionAndWriteToken.isPresent()) { @@ -222,7 +222,7 @@ public Writer build() throws IOException, InterruptedException { // Use rollover write token as write token to create new log file with tokens logWriteToken = rolloverLogWriteToken; } - log.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion + LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion + " with write-token " + logWriteToken); } @@ -234,7 +234,7 @@ public Writer build() throws IOException, InterruptedException { Path logPath = new Path(parentPath, FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken)); - log.info("HoodieLogFile on path " + logPath); + LOG.info("HoodieLogFile on path " + logPath); HoodieLogFile logFile = new HoodieLogFile(logPath); if (bufferSize == null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 6128b2c720821..62c694bdd7d94 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -43,7 +43,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final boolean reverseLogReader; private int bufferSize; - private static final Logger log = LogManager.getLogger(HoodieLogFormatReader.class); + private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader, int bufferSize) throws IOException { @@ -100,7 +100,7 @@ public boolean hasNext() { } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } - log.info("Moving to the next reader for logfile " + currentReader.getLogFile()); + LOG.info("Moving to the next reader for logfile " + currentReader.getLogFile()); return this.currentReader.hasNext(); } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index fe8e4a95b1fa1..0914902590ad8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -44,7 +44,7 @@ */ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { - private static final Logger log = LogManager.getLogger(HoodieLogFormatWriter.class); + private static final Logger LOG = LogManager.getLogger(HoodieLogFormatWriter.class); private HoodieLogFile logFile; private final FileSystem fs; @@ -76,11 +76,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { if (fs.exists(path)) { boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme()); if (isAppendSupported) { - log.info(logFile + " exists. Appending to existing file"); + LOG.info(logFile + " exists. Appending to existing file"); try { this.output = fs.append(path, bufferSize); } catch (RemoteException e) { - log.warn("Remote Exception, attempting to handle or recover lease", e); + LOG.warn("Remote Exception, attempting to handle or recover lease", e); handleAppendExceptionOrRecoverLease(path, e); } catch (IOException ioe) { if (ioe.getMessage().toLowerCase().contains("not supported")) { @@ -93,11 +93,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { } if (!isAppendSupported) { this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); - log.info("Append not supported.. Rolling over to " + logFile); + LOG.info("Append not supported.. Rolling over to " + logFile); createNewFile(); } } else { - log.info(logFile + " does not exist. Create a new file"); + LOG.info(logFile + " does not exist. Create a new file"); // Block size does not matter as we will always manually autoflush createNewFile(); } @@ -120,7 +120,7 @@ public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedE // Find current version HoodieLogFormat.LogFormatVersion currentLogFormatVersion = - new HoodieLogFormatVersion(HoodieLogFormat.currentVersion); + new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION); long currentSize = this.output.size(); // 1. Write the magic header for the start of the block @@ -179,7 +179,7 @@ private Writer rolloverIfNeeded() throws IOException, InterruptedException { if (getCurrentSize() > sizeThreshold) { // TODO - make an end marker which seals the old log file (no more appends possible to that // file). - log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold + LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold + ". Rolling over to the next version"); HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken); // close this writer and return the new writer @@ -230,12 +230,12 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325 - log.warn("Failed to open an append stream to the log file. Opening a new log file..", e); + LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", e); // Rollover the current log file (since cannot get a stream handle) and create new one this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); createNewFile(); } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) { - log.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over"); + LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over"); // Rollover the current log file (since cannot get a stream handle) and create new one this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); createNewFile(); @@ -244,13 +244,13 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e) // this happens when either another task executor writing to this file died or // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem. // ViewFileSystem unfortunately does not support this operation - log.warn("Trying to recover log on path " + path); + LOG.warn("Trying to recover log on path " + path); if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) { - log.warn("Recovered lease on path " + path); + LOG.warn("Recovered lease on path " + path); // try again this.output = fs.append(path, bufferSize); } else { - log.warn("Failed to recover lease on path " + path); + LOG.warn("Failed to recover lease on path " + path); throw new HoodieException(e); } } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 3ac121c11e1f3..e6246c48d8729 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -54,7 +54,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner implements Iterable> { - private static final Logger log = LogManager.getLogger(HoodieMergedLogRecordScanner.class); + private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class); // Final map of compacted/merged records private final ExternalSpillableMap> records; @@ -81,12 +81,12 @@ public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List scan(); this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer(); this.numMergedRecordsInLog = records.size(); - log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes); - log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); - log.info( + LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes); + LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); + LOG.info( "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize()); - log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); - log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); + LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); + LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); } catch (IOException e) { throw new HoodieIOException("IOException when reading log file "); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 8c50ef1a4c661..117de84e220ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -67,7 +67,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { INFLIGHT_CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); - private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); @@ -92,7 +92,7 @@ protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set inc try { this.setInstants(HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(), new Path(metaClient.getMetaPath()), includedExtensions)); - log.info("Loaded instants " + getInstants()); + LOG.info("Loaded instants " + getInstants()); } catch (IOException e) { throw new HoodieIOException("Failed to scan metadata", e); } @@ -210,30 +210,30 @@ protected Stream filterInstantsByAction(String action) { } public void createInflight(HoodieInstant instant) { - log.info("Creating a new in-flight instant " + instant); + LOG.info("Creating a new in-flight instant " + instant); // Create the in-flight file createFileInMetaPath(instant.getFileName(), Option.empty()); } public void saveAsComplete(HoodieInstant instant, Option data) { - log.info("Marking instant complete " + instant); + LOG.info("Marking instant complete " + instant); Preconditions.checkArgument(instant.isInflight(), "Could not mark an already completed instant as complete again " + instant); transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data); - log.info("Completed " + instant); + LOG.info("Completed " + instant); } public void revertToInflight(HoodieInstant instant) { - log.info("Reverting " + instant + " to inflight "); + LOG.info("Reverting " + instant + " to inflight "); revertStateTransition(instant, HoodieTimeline.getInflightInstant(instant)); - log.info("Reverted " + instant + " to inflight"); + LOG.info("Reverted " + instant + " to inflight"); } public HoodieInstant revertToRequested(HoodieInstant instant) { - log.warn("Reverting " + instant + " to requested "); + LOG.warn("Reverting " + instant + " to requested "); HoodieInstant requestedInstant = HoodieTimeline.getRequestedInstant(instant); revertStateTransition(instant, HoodieTimeline.getRequestedInstant(instant)); - log.warn("Reverted " + instant + " to requested"); + LOG.warn("Reverted " + instant + " to requested"); return requestedInstant; } @@ -249,12 +249,12 @@ public void deleteCompactionRequested(HoodieInstant instant) { } private void deleteInstantFile(HoodieInstant instant) { - log.info("Deleting instant " + instant); + LOG.info("Deleting instant " + instant); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); try { boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); if (result) { - log.info("Removed in-flight " + instant); + LOG.info("Removed in-flight " + instant); } else { throw new HoodieIOException("Could not delete in-flight instant " + instant); } @@ -391,7 +391,7 @@ private void revertStateTransition(HoodieInstant curr, HoodieInstant revert) { if (!success) { throw new HoodieIOException("Could not rename " + currFilePath + " to " + revertFilePath); } - log.info("Renamed " + currFilePath + " to " + revertFilePath); + LOG.info("Renamed " + currFilePath + " to " + revertFilePath); } } catch (IOException e) { throw new HoodieIOException("Could not complete revert " + curr, e); @@ -429,7 +429,7 @@ private void createFileInPath(Path fullPath, Option content) { // If the path does not exist, create it first if (!metaClient.getFs().exists(fullPath)) { if (metaClient.getFs().createNewFile(fullPath)) { - log.info("Created a new file in meta path: " + fullPath); + LOG.info("Created a new file in meta path: " + fullPath); } else { throw new HoodieIOException("Failed to create file " + fullPath); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index a58ff39ad7b16..952af9fab3a5c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -54,7 +54,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { private HoodieTableMetaClient metaClient; private Map readCommits = new HashMap<>(); - private static final transient Logger log = LogManager.getLogger(HoodieArchivedTimeline.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class); public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { // Read back the commits to make sure diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 90067f27ebd7e..97749bdebeb3a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -47,7 +47,7 @@ */ public class HoodieDefaultTimeline implements HoodieTimeline { - private static final transient Logger log = LogManager.getLogger(HoodieDefaultTimeline.class); + private static final transient Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class); private static final String HASHING_ALGORITHM = "SHA-256"; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java index 88786066e9607..ff4e1c11ab346 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java @@ -39,7 +39,7 @@ */ public class FailSafeConsistencyGuard implements ConsistencyGuard { - private static final transient Logger log = LogManager.getLogger(FailSafeConsistencyGuard.class); + private static final transient Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class); private final FileSystem fs; private final ConsistencyGuardConfig consistencyGuardConfig; @@ -86,7 +86,7 @@ public void waitForFilesVisibility(String dirPath, List files, FileVisib retryTillSuccess((retryNum) -> { try { - log.info("Trying " + retryNum); + LOG.info("Trying " + retryNum); FileStatus[] entries = fs.listStatus(dir); List gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath())) .map(p -> p.toString()).collect(Collectors.toList()); @@ -95,7 +95,7 @@ public void waitForFilesVisibility(String dirPath, List files, FileVisib switch (event) { case DISAPPEAR: - log.info("Following files are visible" + candidateFiles); + LOG.info("Following files are visible" + candidateFiles); // If no candidate files gets removed, it means all of them have disappeared return !altered; case APPEAR: @@ -104,7 +104,7 @@ public void waitForFilesVisibility(String dirPath, List files, FileVisib return candidateFiles.isEmpty(); } } catch (IOException ioe) { - log.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe); + LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe); } return false; }, "Timed out waiting for files to become visible"); @@ -155,7 +155,7 @@ private void waitForFileVisibility(Path filePath, FileVisibility visibility) thr return; } } catch (IOException ioe) { - log.warn("Got IOException waiting for file visibility. Retrying", ioe); + LOG.warn("Got IOException waiting for file visibility. Retrying", ioe); } sleepSafe(waitMs); @@ -176,7 +176,7 @@ private void waitForFileVisibility(Path filePath, FileVisibility visibility) thr private void retryTillSuccess(Function predicate, String timedOutMessage) throws TimeoutException { long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs(); int attempt = 0; - log.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks()); + LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks()); while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) { boolean success = predicate.apply(attempt); if (success) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java index 9453ec5d61038..8f9498f747075 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java @@ -55,7 +55,7 @@ */ public class RocksDBDAO { - protected static final transient Logger log = LogManager.getLogger(RocksDBDAO.class); + protected static final transient Logger LOG = LogManager.getLogger(RocksDBDAO.class); private transient ConcurrentHashMap managedHandlesMap; private transient ConcurrentHashMap managedDescriptorMap; @@ -86,7 +86,7 @@ private RocksDB getRocksDB() { */ private void init() throws HoodieException { try { - log.info("DELETING RocksDB persisted at " + rocksDBBasePath); + LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath); FileIOUtils.deleteDirectory(new File(rocksDBBasePath)); managedHandlesMap = new ConcurrentHashMap<>(); @@ -99,7 +99,7 @@ private void init() throws HoodieException { dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) { @Override protected void log(InfoLogLevel infoLogLevel, String logMsg) { - log.info("From Rocks DB : " + logMsg); + LOG.info("From Rocks DB : " + logMsg); } }); final List managedColumnFamilies = loadManagedColumnFamilies(dbOptions); @@ -121,7 +121,7 @@ protected void log(InfoLogLevel infoLogLevel, String logMsg) { managedDescriptorMap.put(familyNameFromDescriptor, descriptor); } } catch (RocksDBException | IOException re) { - log.error("Got exception opening rocks db instance ", re); + LOG.error("Got exception opening rocks db instance ", re); throw new HoodieException(re); } } @@ -135,10 +135,10 @@ private List loadManagedColumnFamilies(DBOptions dbOptio List existing = RocksDB.listColumnFamilies(options, rocksDBBasePath); if (existing.isEmpty()) { - log.info("No column family found. Loading default"); + LOG.info("No column family found. Loading default"); managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); } else { - log.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList())); + LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList())); managedColumnFamilies .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList())); } @@ -352,7 +352,7 @@ public Stream> prefixSearch(String colu } } - log.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)=" + LOG.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)=" + timer.endTimer() + ". Serialization Time taken(micro)=" + timeTakenMicro + ", num entries=" + results.size()); return results.stream(); } @@ -366,7 +366,7 @@ public Stream> prefixSearch(String colu */ public void prefixDelete(String columnFamilyName, String prefix) { Preconditions.checkArgument(!closed); - log.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName); + LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName); final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); it.seek(prefix.getBytes()); // Find first and last keys to be deleted @@ -389,7 +389,7 @@ public void prefixDelete(String columnFamilyName, Strin // Delete the last entry getRocksDB().delete(lastEntry.getBytes()); } catch (RocksDBException e) { - log.error("Got exception performing range delete"); + LOG.error("Got exception performing range delete"); throw new HoodieException(e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java index 7bc0f5fa3dfcb..1c17e83346e4c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java @@ -40,7 +40,7 @@ public class SerializationUtils { // Caching kryo serializer to avoid creating kryo instance for every serde operation - private static final ThreadLocal serializerRef = + private static final ThreadLocal SERIALIZER_REF = ThreadLocal.withInitial(() -> new KryoSerializerInstance()); // Serialize @@ -56,7 +56,7 @@ public class SerializationUtils { * @throws IOException if the serialization fails */ public static byte[] serialize(final Object obj) throws IOException { - return serializerRef.get().serialize(obj); + return SERIALIZER_REF.get().serialize(obj); } // Deserialize @@ -83,7 +83,7 @@ public static T deserialize(final byte[] objectData) { if (objectData == null) { throw new IllegalArgumentException("The byte[] must not be null"); } - return (T) serializerRef.get().deserialize(objectData); + return (T) SERIALIZER_REF.get().deserialize(objectData); } private static class KryoSerializerInstance implements Serializable { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java index 97b897bb9678a..cf137709e0951 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java @@ -54,7 +54,7 @@ */ public final class DiskBasedMap implements Map, Iterable { - private static final Logger log = LogManager.getLogger(DiskBasedMap.class); + private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class); // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; // Write only file @@ -111,7 +111,7 @@ private void initFile(File writeOnlyFile) throws IOException { writeOnlyFile.getParentFile().mkdir(); } writeOnlyFile.createNewFile(); - log.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host (" + LOG.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host (" + InetAddress.getLocalHost().getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName() + ")"); // Make sure file is deleted when JVM exits diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index b120c08232fad..6c344afa1952a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -56,7 +56,7 @@ public class ExternalSpillableMap " + estimatedPayloadSize); + LOG.info("Estimated Payload size => " + estimatedPayloadSize); } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { // Re-estimate the size of a record by calculating the size of the entire map containing // N entries and then dividing by the number of entries present (N). This helps to get a @@ -186,7 +186,7 @@ public R put(T key, R value) { this.currentInMemoryMapSize = totalMapSize; this.estimatedPayloadSize = totalMapSize / inMemoryMap.size(); shouldEstimatePayloadSize = false; - log.info("New Estimated Payload size => " + this.estimatedPayloadSize); + LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize); } if (!inMemoryMap.containsKey(key)) { // TODO : Add support for adjusting payloadSize for updates to the same key diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java index e6266f7e23e43..549683754cbd8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java @@ -30,7 +30,7 @@ */ public class FunctionBasedQueueProducer implements BoundedInMemoryQueueProducer { - private static final Logger logger = LogManager.getLogger(FunctionBasedQueueProducer.class); + private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class); private final Function, Boolean> producerFunction; @@ -40,8 +40,8 @@ public FunctionBasedQueueProducer(Function, Boolean> @Override public void produce(BoundedInMemoryQueue queue) { - logger.info("starting function which will enqueue records"); + LOG.info("starting function which will enqueue records"); producerFunction.apply(queue); - logger.info("finished function which will enqueue records"); + LOG.info("finished function which will enqueue records"); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java index a5948ac84161c..a85dffbbf17d5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java @@ -30,7 +30,7 @@ */ public class IteratorBasedQueueProducer implements BoundedInMemoryQueueProducer { - private static final Logger logger = LogManager.getLogger(IteratorBasedQueueProducer.class); + private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class); // input iterator for producing items in the buffer. private final Iterator inputIterator; @@ -41,10 +41,10 @@ public IteratorBasedQueueProducer(Iterator inputIterator) { @Override public void produce(BoundedInMemoryQueue queue) throws Exception { - logger.info("starting to buffer records"); + LOG.info("starting to buffer records"); while (inputIterator.hasNext()) { queue.insertRecord(inputIterator.next()); } - logger.info("finished buffering records"); + LOG.info("finished buffering records"); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java index f6d035a383086..c11b4afac95e7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java @@ -38,7 +38,7 @@ */ public class HdfsTestService { - private static final Logger logger = LogManager.getLogger(HdfsTestService.class); + private static final Logger LOG = LogManager.getLogger(HdfsTestService.class); /** * Configuration settings @@ -72,7 +72,7 @@ public MiniDFSCluster start(boolean format) throws IOException { // If clean, then remove the work dir so we can start fresh. String localDFSLocation = getDFSLocation(workDir); if (format) { - logger.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh."); + LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh."); File file = new File(localDFSLocation); FileIOUtils.deleteDirectory(file); } @@ -83,12 +83,12 @@ public MiniDFSCluster start(boolean format) throws IOException { datanodePort, datanodeIpcPort, datanodeHttpPort); miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true) .checkDataNodeHostConfig(true).build(); - logger.info("HDFS Minicluster service started."); + LOG.info("HDFS Minicluster service started."); return miniDfsCluster; } public void stop() throws IOException { - logger.info("HDFS Minicluster service being shut down."); + LOG.info("HDFS Minicluster service being shut down."); miniDfsCluster.shutdown(); miniDfsCluster = null; hadoopConf = null; @@ -132,7 +132,7 @@ private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean c private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP, int namenodeRpcPort, int namenodeHttpPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) { - logger.info("HDFS force binding to ip: " + bindIP); + LOG.info("HDFS force binding to ip: " + bindIP); config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort); config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort); config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java index bac93ee486729..383279f1c9e8f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java @@ -53,7 +53,7 @@ */ public class ZookeeperTestService { - private static final Logger logger = LogManager.getLogger(ZookeeperTestService.class); + private static final Logger LOG = LogManager.getLogger(ZookeeperTestService.class); private static final int TICK_TIME = 2000; private static final int CONNECTION_TIMEOUT = 30000; @@ -103,7 +103,7 @@ public ZooKeeperServer start() throws IOException, InterruptedException { // NOTE: Changed from the original, where InetSocketAddress was // originally created to bind to the wildcard IP, we now configure it. - logger.info("Zookeeper force binding to: " + this.bindIP); + LOG.info("Zookeeper force binding to: " + this.bindIP); standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000); // Start up this ZK server @@ -120,7 +120,7 @@ public ZooKeeperServer start() throws IOException, InterruptedException { } started = true; - logger.info("Zookeeper Minicluster service started on client port: " + clientPort); + LOG.info("Zookeeper Minicluster service started on client port: " + clientPort); return zooKeeperServer; } @@ -139,7 +139,7 @@ public void stop() throws IOException { standaloneServerFactory = null; zooKeeperServer = null; - logger.info("Zookeeper Minicluster service shut down."); + LOG.info("Zookeeper Minicluster service shut down."); } private void recreateDir(File dir, boolean clean) throws IOException { @@ -221,7 +221,7 @@ private static boolean waitForServerUp(String hostname, int port, long timeout) } } catch (IOException e) { // ignore as this is expected - logger.info("server " + hostname + ":" + port + " not up " + e); + LOG.info("server " + hostname + ":" + port + " not up " + e); } if (System.currentTimeMillis() > start + timeout) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java index f716c01104d51..536e73b29f0bf 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java @@ -493,7 +493,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep // Write out a length that does not confirm with the content outputStream.writeLong(1000); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content outputStream.writeLong(500); // Write out some bytes @@ -521,7 +521,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep // Write out a length that does not confirm with the content outputStream.writeLong(1000); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content outputStream.writeLong(500); // Write out some bytes @@ -691,7 +691,7 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock() // Write out a length that does not confirm with the content outputStream.writeLong(1000); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); // Write out some header @@ -1063,7 +1063,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback() outputStream.write(HoodieLogFormat.MAGIC); outputStream.writeLong(1000); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content outputStream.writeLong(100); outputStream.flush(); @@ -1076,7 +1076,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback() outputStream.write(HoodieLogFormat.MAGIC); outputStream.writeLong(1000); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content outputStream.writeLong(100); outputStream.flush(); @@ -1096,7 +1096,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback() outputStream.write(HoodieLogFormat.MAGIC); outputStream.writeLong(1000); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); - outputStream.writeInt(HoodieLogFormat.currentVersion); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); // Write out a length that does not confirm with the content outputStream.writeLong(100); outputStream.flush(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 26cea6817eef9..29a9134646d30 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -68,7 +68,7 @@ @SuppressWarnings("ResultOfMethodCallIgnored") public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { - private static final transient Logger log = LogManager.getLogger(TestHoodieTableFileSystemView.class); + private static final transient Logger LOG = LogManager.getLogger(TestHoodieTableFileSystemView.class); private static String TEST_WRITE_TOKEN = "1-0-1"; @@ -495,7 +495,7 @@ protected void testViewForFileSlicesWithAsyncCompaction(boolean skipCreatingData roView.getAllDataFiles(partitionPath); fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); - log.info("FILESLICE LIST=" + fileSliceList); + LOG.info("FILESLICE LIST=" + fileSliceList); dataFiles = fileSliceList.stream().map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get) .collect(Collectors.toList()); assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index adc860dfb09d1..a80629ed9e75c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -73,7 +73,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { - private static final transient Logger log = LogManager.getLogger(TestIncrementalFSViewSync.class); + private static final transient Logger LOG = LogManager.getLogger(TestIncrementalFSViewSync.class); private static String TEST_WRITE_TOKEN = "1-0-1"; @@ -314,7 +314,7 @@ private void testCleans(SyncableFileSystemView view, List newCleanerInst Assert.assertEquals(newCleanerInstants.size(), cleanedInstants.size()); long initialFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong(); long exp = initialFileSlices; - log.info("Initial File Slices :" + exp); + LOG.info("Initial File Slices :" + exp); for (int idx = 0; idx < newCleanerInstants.size(); idx++) { String instant = cleanedInstants.get(idx); try { @@ -331,8 +331,8 @@ private void testCleans(SyncableFileSystemView view, List newCleanerInst Assert.assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); Assert.assertEquals(HoodieTimeline.CLEAN_ACTION, view.getLastInstant().get().getAction()); partitions.forEach(p -> { - log.info("PARTTITION : " + p); - log.info("\tFileSlices :" + view.getAllFileSlices(p).collect(Collectors.toList())); + LOG.info("PARTTITION : " + p); + LOG.info("\tFileSlices :" + view.getAllFileSlices(p).collect(Collectors.toList())); }); partitions.forEach(p -> Assert.assertEquals(fileIdsPerPartition.size(), view.getLatestFileSlices(p).count())); @@ -373,7 +373,7 @@ private void testRestore(SyncableFileSystemView view, List newRestoreIns isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) * fileIdsPerPartition.size()); view.sync(); Assert.assertTrue(view.getLastInstant().isPresent()); - log.info("Last Instant is :" + view.getLastInstant().get()); + LOG.info("Last Instant is :" + view.getLastInstant().get()); if (isRestore) { Assert.assertEquals(newRestoreInstants.get(idx), view.getLastInstant().get().getTimestamp()); Assert.assertEquals(isRestore ? HoodieTimeline.RESTORE_ACTION : HoodieTimeline.ROLLBACK_ACTION, @@ -610,7 +610,7 @@ private Map> testMultipleWriteSteps(SyncableFileSystemView int multiple = begin; for (int idx = 0; idx < instants.size(); idx++) { String instant = instants.get(idx); - log.info("Adding instant=" + instant); + LOG.info("Adding instant=" + instant); HoodieInstant lastInstant = lastInstants.get(idx); // Add a non-empty ingestion to COW table List filePaths = diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 47503194342a2..6600efd035401 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -56,9 +56,9 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { private static String TEST_WRITE_TOKEN = "1-0-1"; - private static final Map metrics = + private static final Map METRICS = new ImmutableMap.Builder().put("key1", 1.0).put("key2", 3.0).build(); - private Function, Map> metricsCaptureFn = (partitionFileSlice) -> metrics; + private Function, Map> metricsCaptureFn = (partitionFileSlice) -> METRICS; @Before public void init() throws IOException { @@ -249,7 +249,7 @@ private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompaction version == COMPACTION_METADATA_VERSION_1 ? paths.get(idx) : new Path(paths.get(idx)).getName(), op.getDeltaFilePaths().get(idx)); }); - Assert.assertEquals("Metrics set", metrics, op.getMetrics()); + Assert.assertEquals("Metrics set", METRICS, op.getMetrics()); } @Override diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index da2f305cdb3ed..fcc053fc85b43 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -53,8 +53,8 @@ */ public class FileSystemViewHandler { - private static final ObjectMapper mapper = new ObjectMapper(); - private static final Logger logger = LogManager.getLogger(FileSystemViewHandler.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger LOG = LogManager.getLogger(FileSystemViewHandler.class); private final FileSystemViewManager viewManager; private final Javalin app; @@ -88,8 +88,8 @@ private boolean isLocalViewBehind(Context ctx) { String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants(); - if (logger.isDebugEnabled()) { - logger.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient + if (LOG.isDebugEnabled()) { + LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList())); } @@ -119,7 +119,7 @@ private boolean syncIfLocalViewBehind(Context ctx) { synchronized (view) { if (isLocalViewBehind(ctx)) { HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); - logger.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient + LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient + " as last known instant but server has the folling timeline :" + localTimeline.getInstants().collect(Collectors.toList())); view.sync(); @@ -134,9 +134,9 @@ private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingEx boolean prettyPrint = ctx.queryParam("pretty") != null ? true : false; long beginJsonTs = System.currentTimeMillis(); String result = - prettyPrint ? mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : mapper.writeValueAsString(obj); + prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj); long endJsonTs = System.currentTimeMillis(); - logger.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs)); + LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs)); ctx.result(result); } @@ -347,12 +347,12 @@ public void handle(@NotNull Context context) throws Exception { } } catch (RuntimeException re) { success = false; - logger.error("Got runtime exception servicing request " + context.queryString(), re); + LOG.error("Got runtime exception servicing request " + context.queryString(), re); throw re; } finally { long endTs = System.currentTimeMillis(); long timeTakenMillis = endTs - beginTs; - logger + LOG .info(String.format( "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + "Success=%s, Query=%s, Host=%s, synced=%s", diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 6bde10f5ae328..278a35246daeb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -176,8 +176,8 @@ public static class Config { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST; - public static final long defaultMaxEventsFromKafkaSource = 5000000; - public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource; + public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; + public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } private final HashMap kafkaParams; @@ -234,9 +234,9 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP, - Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE); + Config.maxEventsFromKafkaSource); maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE) - ? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka; + ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka; long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit; OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index f2aa79465ef7d..c6066212e1708 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -90,7 +90,7 @@ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSo props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : - String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE)); + String.valueOf(Config.maxEventsFromKafkaSource)); return props; } @@ -151,7 +151,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException { Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); - Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500; + Config.maxEventsFromKafkaSource = 500; /* 1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and @@ -168,7 +168,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException { assertEquals(1500, fetch2.getBatch().get().count()); //reset the value back since it is a static variable - Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource; + Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } @Test diff --git a/style/checkstyle.xml b/style/checkstyle.xml index ebfb569af0b65..864e63d7781bc 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -295,5 +295,8 @@ + + + From c33212851e051d970ce85e37c8c088b810fedccd Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Mon, 2 Dec 2019 08:36:35 +0800 Subject: [PATCH 2/4] Add SimplifyBooleanExpression java checkstyle rule --- .../java/org/apache/hudi/common/util/SpillableMapUtils.java | 4 ++-- .../hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java | 4 ++-- style/checkstyle.xml | 3 +++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index da9524706ce78..85e0e9ea5f8ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -54,12 +54,12 @@ private static FileEntry readInternal(RandomAccessFile file, long valuePosition, byte[] key = new byte[keySize]; file.read(key, 0, keySize); byte[] value = new byte[valueSize]; - if (!(valueSize == valueLength)) { + if (valueSize != valueLength) { throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted"); } file.read(value, 0, valueSize); long crcOfReadValue = generateChecksum(value); - if (!(crc == crcOfReadValue)) { + if (crc != crcOfReadValue) { throw new HoodieCorruptedDataException( "checksum of payload written to external disk does not match, " + "data may be corrupted"); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 69c338564e9c4..b186a3ce9e442 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -75,7 +75,7 @@ import java.util.Set; import java.util.stream.Collectors; - +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class TestHoodieRealtimeRecordReader { @@ -505,7 +505,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception JobConf jobConf = new JobConf(); List fields = schema.getFields(); - assert (firstSchemaFields.containsAll(fields) == false); + assertFalse(firstSchemaFields.containsAll(fields)); // Try to read all the fields passed by the new schema setHiveColumnNameProps(fields, jobConf, true); diff --git a/style/checkstyle.xml b/style/checkstyle.xml index 864e63d7781bc..a5bdb35e46194 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -298,5 +298,8 @@ + + + From 1da661b539c04a5ce26e8ba576cb05bba81cbd24 Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Wed, 4 Dec 2019 09:20:44 +0800 Subject: [PATCH 3/4] collapse empty tags in scalastyle file --- style/scalastyle-config.xml | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/style/scalastyle-config.xml b/style/scalastyle-config.xml index 388afbfc8b732..c3d551df1943c 100644 --- a/style/scalastyle-config.xml +++ b/style/scalastyle-config.xml @@ -16,15 +16,15 @@ --> Scalastyle standard configuration - + - - - + + + @@ -46,7 +46,7 @@ - + @@ -62,14 +62,14 @@ - - - - - - - - + + + + + + + + @@ -85,8 +85,8 @@ - - + + @@ -108,7 +108,7 @@ - - - + + + From 9a25cc79a0a2db1d60e29c40e660375eaa60201e Mon Sep 17 00:00:00 2001 From: lamber-ken Date: Wed, 4 Dec 2019 15:56:14 +0800 Subject: [PATCH 4/4] trigger rebuild