Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ private void populateTableExecutionContextList(TypedProperties properties, Strin
//copy all the values from config to cfg
String targetBasePath = resetTarget(config, database, currentTable);
Helpers.deepCopyConfigs(config, cfg);
cfg.propsFilePath = configFilePath;
String overriddenTargetBasePath = getStringWithAltKeys(tableProperties, HoodieStreamerConfig.TARGET_BASE_PATH, true);
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), ""))) {
Expand Down Expand Up @@ -255,6 +256,7 @@ static void deepCopyConfigs(Config globalConfig, HoodieStreamer.Config tableConf
tableConfig.clusterSchedulingWeight = globalConfig.clusterSchedulingWeight;
tableConfig.clusterSchedulingMinShare = globalConfig.clusterSchedulingMinShare;
tableConfig.sparkMaster = globalConfig.sparkMaster;
tableConfig.configs.addAll(globalConfig.configs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
Expand All @@ -48,13 +49,17 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -164,6 +169,45 @@ void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata c
assertEquals(expectedResumeCheckpoint,resumeCheckpoint);
}

@ParameterizedTest
@MethodSource("getMultiTableStreamerCases")
void testCloneConfigsFromMultiTableStreamer(HoodieMultiTableStreamer.Config cfg) throws IOException {
  // Verifies that every per-table execution context inherits the global
  // --hoodie-conf overrides and the props file path from the multi-table config.
  Configuration hadoopConf = new Configuration();
  JavaSparkContext sparkContext = mock(JavaSparkContext.class);

  when(sparkContext.hadoopConfiguration()).thenReturn(hadoopConf);

  HoodieMultiTableStreamer streamer = new HoodieMultiTableStreamer(cfg, sparkContext);
  for (TableExecutionContext context : streamer.getTableExecutionContexts()) {
    // global properties must be propagated into each child streamer config
    assertTrue(context.getConfig().configs.containsAll(cfg.configs));

    // the child streamer must pick up propsFilePath from the multi-table
    // streamer config rather than falling back to the built-in default
    assertNotEquals(HoodieStreamer.Config.DEFAULT_DFS_SOURCE_PROPERTIES, context.getConfig().propsFilePath);
  }

  verify(sparkContext).hadoopConfiguration();
}

// Test cases: one config with no extra --hoodie-conf entries, and one with a
// single key-generator override, both driven by the same properties file.
private static Stream<Arguments> getMultiTableStreamerCases() {
  String propFile = "src/test/resources/streamer-config/kafka-source-multi.properties";
  List<String> noOverrides = Collections.emptyList();
  List<String> singleOverride = Collections.singletonList("hoodie.keygen.timebased.output.dateformat=yyyyMMdd");
  return Stream.of(
      Arguments.of(generateMultiTableStreamerConfig(propFile, noOverrides)),
      Arguments.of(generateMultiTableStreamerConfig(propFile, singleOverride)));
}

// Builds a minimal MERGE_ON_READ multi-table streamer config rooted at the
// bundled test resources, with the given props file and --hoodie-conf list.
private static HoodieMultiTableStreamer.Config generateMultiTableStreamerConfig(String propsFilePath, List<String> configs) {
  final String resourceDir = "src/test/resources/streamer-config";
  HoodieMultiTableStreamer.Config streamerConfig = new HoodieMultiTableStreamer.Config();
  streamerConfig.basePathPrefix = resourceDir;
  streamerConfig.configFolder = resourceDir;
  streamerConfig.propsFilePath = propsFilePath;
  streamerConfig.configs = configs;
  streamerConfig.tableType = "MERGE_ON_READ";
  return streamerConfig;
}

private static Stream<Arguments> getCheckpointToResumeCases() {
return Stream.of(
// Checkpoint has been manually overridden (reset-checkpoint)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# schema provider configs
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
#Kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081

hoodie.streamer.ingestion.tablesToBeIngested=uber_hive_db.uber_hive_dummy_table_1,uber_hive_db.uber_hive_dummy_table_2
hoodie.streamer.ingestion.uber_hive_db.uber_hive_dummy_table_1.configFile=src/test/resources/streamer-config/multi-source-1.properties
hoodie.streamer.ingestion.uber_hive_db.uber_hive_dummy_table_2.configFile=src/test/resources/streamer-config/multi-source-2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver
hoodie.streamer.source.kafka.topic=TOPIC_1
# Schema provider props
hoodie.streamer.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
hoodie.streamer.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver
hoodie.streamer.source.kafka.topic=TOPIC_2
# Schema provider props
hoodie.streamer.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
hoodie.streamer.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc