[HUDI-159] Redesigning bundles for lighter-weight integrations
 - Documented the principles applied for the redesign in packaging/README.md
 - No longer depends on commons-codec, commons-io, commons-pool, commons-dbcp, commons-lang, commons-logging, or avro-mapred
 - Introduced a new FileIOUtils utility (sketched below) and added a checkstyle rule flagging illegal imports of the libraries above
 - Moved Parquet and Avro dependencies to provided scope so they are picked up from Hive/Spark/Presto instead
 - Picked up Jackson jars for the Hive sync tool from HIVE_HOME and unbundled Jackson everywhere
 - Stopped bundling the standalone hive-jdbc jar in the Spark/Hive/Utilities bundles
 - Reduced the number of classes across bundles by 6.5x
vinothchandar authored and bvaradar committed Sep 11, 2019
1 parent 0e6f078 commit 7a973a6
Showing 60 changed files with 678 additions and 1,369 deletions.
6 changes: 0 additions & 6 deletions hudi-cli/pom.xml
@@ -176,12 +176,6 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>

<!-- Apache Commons -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.shell</groupId>
<artifactId>spring-shell</artifactId>
@@ -26,7 +26,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
@@ -37,6 +36,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
16 changes: 3 additions & 13 deletions hudi-cli/src/main/java/org/apache/hudi/cli/utils/HiveUtil.java
@@ -19,11 +19,10 @@
package org.apache.hudi.cli.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.joda.time.DateTime;

@@ -42,17 +41,8 @@ public class HiveUtil {
private static Connection connection;

private static Connection getConnection(String jdbcUrl, String user, String pass) throws SQLException {
DataSource ds = getDatasource(jdbcUrl, user, pass);
return ds.getConnection();
}

private static DataSource getDatasource(String jdbcUrl, String user, String pass) {
BasicDataSource ds = new BasicDataSource();
ds.setDriverClassName(driverName);
ds.setUrl(jdbcUrl);
ds.setUsername(user);
ds.setPassword(pass);
return ds;
connection = DriverManager.getConnection(jdbcUrl, user, pass);
return connection;
}

public static long countRecords(String jdbcUrl, HoodieTableMetaClient source, String dbName, String user, String pass)
@@ -20,10 +20,10 @@

import java.io.File;
import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -43,7 +43,7 @@ public static SparkLauncher initLauncher(String propertiesFile) throws URISyntax
SparkLauncher sparkLauncher = new SparkLauncher().setAppResource(currentJar)
.setMainClass(SparkMain.class.getName());

if (StringUtils.isNotEmpty(propertiesFile)) {
if (!StringUtils.isEmpty(propertiesFile)) {
sparkLauncher.setPropertiesFile(propertiesFile);
}

@@ -18,13 +18,14 @@

package org.apache.hudi.io.compact.strategy;

import com.google.common.annotations.VisibleForTesting;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -45,8 +46,8 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// The earliest partition to compact - current day minus the target partitions limit
String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
.getTargetPartitionsPerDayBasedCompaction()));
String earliestPartitionPathToCompact = dateFormat.format(
getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Filter out all partitions greater than earliestPartitionPathToCompact
List<HoodieCompactionOperation> eligibleCompactionOperations = operations.stream()
.collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream()
@@ -61,13 +62,20 @@ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteCon
@Override
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
// The earliest partition to compact - current day minus the target partitions limit
String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
.getTargetPartitionsPerDayBasedCompaction()));
String earliestPartitionPathToCompact = dateFormat.format(
getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
// Get all partitions and sort them
List<String> filteredPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-"))
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0)
.collect(Collectors.toList());
return filteredPartitionPaths;
}

@VisibleForTesting
public static Date getDateAtOffsetFromToday(int offset) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DATE, offset);
return calendar.getTime();
}
}
@@ -22,7 +22,6 @@
import com.codahale.metrics.MetricRegistry;
import com.google.common.io.Closeables;
import java.io.Closeable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@@ -37,9 +36,9 @@ public class Metrics {
private static volatile boolean initialized = false;
private static Metrics metrics = null;
private final MetricRegistry registry;
private MetricsReporter reporter = null;
private MetricsReporter reporter;

private Metrics(HoodieWriteConfig metricConfig) throws ConfigurationException {
private Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();

reporter = MetricsReporterFactory.createReporter(metricConfig, registry);
@@ -72,7 +71,7 @@ public static synchronized void init(HoodieWriteConfig metricConfig) {
}
try {
metrics = new Metrics(metricConfig);
} catch (ConfigurationException e) {
} catch (Exception e) {
throw new HoodieException(e);
}
initialized = true;
@@ -38,7 +38,6 @@
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
@@ -55,6 +54,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -562,9 +562,8 @@ public void testCommitWritesRelativePaths() throws Exception {
// Read from commit file
String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime);
FileInputStream inputStream = new FileInputStream(filename);
String everything = IOUtils.toString(inputStream, "UTF-8");
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString(),
HoodieCommitMetadata.class);
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
inputStream.close();

@@ -600,7 +599,7 @@ public void testRollingStatsInMetadata() throws Exception {
// Read from commit file
String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime);
FileInputStream inputStream = new FileInputStream(filename);
String everything = IOUtils.toString(inputStream, "UTF-8");
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString(),
HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata()
@@ -629,7 +628,7 @@ public void testRollingStatsInMetadata() throws Exception {
// Read from commit file
filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime);
inputStream = new FileInputStream(filename);
everything = IOUtils.toString(inputStream, "UTF-8");
everything = FileIOUtils.readAsUTFString(inputStream);
metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata()
.get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY), HoodieRollingStatMetadata.class);
@@ -22,7 +22,6 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,11 +31,11 @@
import java.util.zip.InflaterInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

/**
@@ -132,10 +131,9 @@ private byte[] compressData(String jsonData) throws IOException {


private String unCompressData(byte[] data) throws IOException {
InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
StringWriter sw = new StringWriter(dataSize);
IOUtils.copy(iis, sw);
return sw.toString();
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
return FileIOUtils.readAsUTFString(iis, dataSize);
}
}

/**
@@ -36,11 +36,11 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
@@ -82,7 +82,7 @@ public void testRecordReading() throws Exception {
final int numRecords = 128;
final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
new BoundedInMemoryQueue(FileUtils.ONE_KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
// Produce
Future<Boolean> resFuture =
executorService.submit(() -> {
@@ -122,7 +122,7 @@ public void testCompositeProducerRecordReading() throws Exception {
final List<List<HoodieRecord>> recs = new ArrayList<>();

final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
new BoundedInMemoryQueue(FileUtils.ONE_KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));

// Record Key to <Producer Index, Rec Index within a producer>
Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
@@ -27,7 +27,6 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
@@ -39,6 +38,7 @@
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -151,7 +151,7 @@ public void testSchemaEvolutionOnUpdate() throws Exception {

private HoodieWriteConfig makeHoodieClientConfig(String schema) throws Exception {
// Prepare the AvroParquetIO
String schemaStr = IOUtils.toString(getClass().getResourceAsStream(schema), "UTF-8");
String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream(schema));
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr).build();
}
}
@@ -37,7 +37,6 @@
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.BloomFilter;
@@ -48,6 +47,7 @@
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -103,7 +103,7 @@ public void init() throws IOException {
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
}

@@ -34,7 +34,6 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
@@ -44,6 +43,7 @@
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -78,7 +78,7 @@ public void init() throws IOException {
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
}

@@ -31,7 +31,6 @@
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -161,11 +160,12 @@ public void testBoundedPartitionAwareCompactionSimple() {
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
Date today = new Date();
String currentDay = format.format(today);
String currentDayMinus1 = format.format(DateUtils.addDays(today, -1));
String currentDayMinus2 = format.format(DateUtils.addDays(today, -2));
String currentDayMinus3 = format.format(DateUtils.addDays(today, -3));
String currentDayPlus1 = format.format(DateUtils.addDays(today, 1));
String currentDayPlus5 = format.format(DateUtils.addDays(today, 5));

String currentDayMinus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-1));
String currentDayMinus2 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-2));
String currentDayMinus3 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-3));
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));

Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder()
.put(120 * MB, currentDay)
@@ -208,11 +208,12 @@ public void testUnboundedPartitionAwareCompactionSimple() {
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
Date today = new Date();
String currentDay = format.format(today);
String currentDayMinus1 = format.format(DateUtils.addDays(today, -1));
String currentDayMinus2 = format.format(DateUtils.addDays(today, -2));
String currentDayMinus3 = format.format(DateUtils.addDays(today, -3));
String currentDayPlus1 = format.format(DateUtils.addDays(today, 1));
String currentDayPlus5 = format.format(DateUtils.addDays(today, 5));

String currentDayMinus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-1));
String currentDayMinus2 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-2));
String currentDayMinus3 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-3));
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));

Map<Long, String> keyToPartitionMap = new ImmutableMap.Builder()
.put(120 * MB, currentDay)