From 9b51c1b6be541afd6eaf38055d0a6e1d5a36bcd1 Mon Sep 17 00:00:00 2001 From: Stas Panasiuk Date: Fri, 4 Aug 2023 15:49:01 -0400 Subject: [PATCH 1/3] [CYB-170] Added TableApi indexer table mapping validation --- .../hive/tableapi/TableApiAbstractJob.java | 563 ++++++++++-------- 1 file changed, 303 insertions(+), 260 deletions(-) diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java index c9e8116d5..89dbe1d39 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java @@ -9,20 +9,6 @@ import com.cloudera.cyber.scoring.ScoredMessage; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Streams; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.FormatDescriptor; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.SqlDialect; -import org.apache.flink.table.api.TableDescriptor; -import org.apache.flink.table.api.bridge.java.StreamStatementSet; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; -import org.springframework.util.StringUtils; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -33,263 +19,320 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.FormatDescriptor; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.TableDescriptor; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; public abstract class TableApiAbstractJob { - private static final String TABLES_INIT_FILE_PARAM = "flink.tables-init-file"; - private static final String MAPPING_FILE_PARAM = "flink.mapping-file"; - protected static final String KAFKA_TABLE = "KafkaTempView"; - - protected static final String BASE_COLUMN_MAPPING_JSON = "base-column-mapping.json"; - - private final List defaultMappingList; - private final List defaultColumnList; - protected final DataStream source; - protected final StreamExecutionEnvironment env; - protected final ParameterTool params; - protected final String connectorName; - - public 
TableApiAbstractJob(ParameterTool params, StreamExecutionEnvironment env, DataStream source, - String connectorName, String baseTableJson) throws IOException { - this.params = params; - this.env = env; - this.source = source; - this.connectorName = connectorName; - defaultMappingList = Utils.readResourceFile(BASE_COLUMN_MAPPING_JSON, getClass(), - new TypeReference>() { - }); - defaultColumnList = Utils.readResourceFile(baseTableJson, getClass(), - new TypeReference>() { - }); - } - - public StreamExecutionEnvironment startJob() throws Exception { - System.out.println("Creating Table env..."); - final StreamTableEnvironment tableEnv = getTableEnvironment(); - - System.out.println("Configuring Table env..."); - configure(tableEnv); - - System.out.printf("Registering %s catalog... %s%n", connectorName, String.join(", ", tableEnv.listCatalogs())); - registerCatalog(tableEnv); - - System.out.println("Getting tables config..."); - final Map> tablesConfig = getTablesConfig(); - - System.out.println("Creating tables..."); - final Set tableList = getExistingTableList(tableEnv); - setConnectorDialect(tableEnv); - - tablesConfig.forEach( - (tableName, columnList) -> createTableIfNotExists(tableEnv, tableList, tableName, columnList)); - tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); - - System.out.println("Getting topic mapping..."); - final Map topicMapping = getTopicMapping(); - - System.out.println("Creating Kafka table..."); - createKafkaTable(tableEnv); - - System.out.printf("Executing %s insert...%n", connectorName); - executeInsert(tableEnv, topicMapping, tablesConfig); - - System.out.println("TableApiJob is done!"); - return jobReturnValue(); - } - - protected HashSet getExistingTableList(StreamTableEnvironment tableEnv) { - return new HashSet<>(Arrays.asList(tableEnv.listTables())); - } - - protected StreamExecutionEnvironment jobReturnValue() { - return env; - } - - protected void executeInsert(StreamTableEnvironment tableEnv, Map topicMapping, Map> tablesConfig) { - System.out.printf("Filling Insert statement list...%s%n", Arrays.toString(tableEnv.listTables())); - final StreamStatementSet insertStatementSet = tableEnv.createStatementSet(); - - topicMapping.forEach((topic, mappingDto) -> { - final String insertSql = buildInsertSql(topic, mappingDto); - try { - insertStatementSet.addInsertSql(insertSql); - System.out.printf("Insert SQL added to the queue for the table: %s%nSQL: %s%n", mappingDto.getTableName(), insertSql); - } catch (Exception e) { - System.err.printf("Error adding insert to the statement set: %s%n", insertSql); - throw e; - } + private static final String TABLES_INIT_FILE_PARAM = "flink.tables-init-file"; + private static final String MAPPING_FILE_PARAM = "flink.mapping-file"; + protected static final String KAFKA_TABLE = "KafkaTempView"; + + protected static final String BASE_COLUMN_MAPPING_JSON = "base-column-mapping.json"; + + private final List defaultMappingList; + private final List defaultColumnList; + protected final DataStream source; + protected final StreamExecutionEnvironment env; + protected final ParameterTool params; + protected final String connectorName; + + public TableApiAbstractJob(ParameterTool params, StreamExecutionEnvironment env, DataStream source, + String connectorName, String baseTableJson) throws IOException { + this.params = params; + this.env = env; + this.source = source; + this.connectorName = connectorName; + defaultMappingList = Utils.readResourceFile(BASE_COLUMN_MAPPING_JSON, getClass(), + new TypeReference>() { }); - - 
System.out.println("Executing Insert statement list..."); - insertStatementSet.execute(); - } - - protected abstract void registerCatalog(StreamTableEnvironment tableEnv); - - protected void setConnectorDialect(StreamTableEnvironment tableEnv) { - //to be overwritten if needed - } - - protected void createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, List columnList) { - if (tableList.contains(tableName)) { - System.out.printf("%s table [%s] already exists. Skipping its creation.%n", connectorName, tableName); - } else { - System.out.printf("Creating %s table %s...%n", connectorName, tableName); - final Schema schema = FlinkSchemaUtil.buildTableSchema(columnList); - final TableDescriptor tableDescriptor = buildTableDescriptor(schema); - try { - System.out.printf("Creating %s table %s: %s%n", connectorName, tableName, tableDescriptor); - tableEnv.createTable(tableName, tableDescriptor); - } catch (Exception e) { - System.err.printf("Error creating the %s: %s%n", connectorName, tableDescriptor); - throw e; - } - System.out.printf("%s table created: %s%n", connectorName, tableName); - } - } - - private TableDescriptor buildTableDescriptor(Schema schema) { - return fillTableOptions(TableDescriptor - .forConnector(getTableConnector()) - .schema(schema) - .partitionedBy("dt", "hr") - .format(getFormatDescriptor())) - .build(); - } - - protected abstract String getTableConnector(); - - protected abstract FormatDescriptor getFormatDescriptor(); - - - protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder builder) { - return builder - .option("partition.time-extractor.timestamp-pattern", "$dt $hr:00:00") - .option("sink.partition-commit.trigger", "process-time") - .option("sink.partition-commit.delay", "1 h") - .option("sink.partition-commit.policy.kind", "metastore,success-file") - .option("hive.storage.file-format", "orc"); - } - - private StreamTableEnvironment getTableEnvironment() { - return StreamTableEnvironment.create(env); - } - - private void createKafkaTable(StreamTableEnvironment tableEnv) { - final SingleOutputStreamOperator newSource = source.map(ScoredMessage::toRow, - ScoredMessage.FLINK_TYPE_INFO); - tableEnv.createTemporaryView(KAFKA_TABLE, newSource); - tableEnv.executeSql("DESCRIBE " + KAFKA_TABLE).print(); - } - - private void configure(StreamTableEnvironment tableEnv) { - final HashMap conf = new HashMap<>(); - conf.put("pipeline.name", params.get("flink.job.name", String.format("Indexing - %s TableApi", connectorName))); - tableEnv.getConfig().addConfiguration(Configuration.fromMap(conf)); - tableEnv.createTemporarySystemFunction("filterMap", FilterMapFunction.class); - } - - protected final String buildInsertSql(String topic, MappingDto mappingDto) { - return String.join("\n", getInsertSqlPrefix() + " " + mappingDto.getTableName() + "(" + getInsertColumns(mappingDto) + ") " + getInsertSqlSuffix(), - " SELECT " + getFromColumns(mappingDto), - " from " + KAFKA_TABLE, - String.format(" where `source`='%s'", topic)); - } - - protected String getInsertSqlPrefix() { - return "INSERT INTO "; - } - - protected String getInsertSqlSuffix() { - return ""; - } - - protected String getInsertColumns(MappingDto mappingDto) { - return mappingDto.getColumnMapping().stream() - .map(MappingColumnDto::getName) - .collect(Collectors.joining(", ", " ", " ")); - } - - private String getFromColumns(MappingDto mappingDto) { - return mappingDto.getColumnMapping().stream() - .map(mappingColumnDto -> { - final String kafkaName = 
mappingColumnDto.getKafkaName(); - final String path = mappingColumnDto.getPath(); - - String fullPath; - if (path.startsWith("..")) { - fullPath = path.substring(2); - } else { - fullPath = String.format("message.%s", path); - } - if (StringUtils.hasText(fullPath)) { - fullPath = String.join(".", fullPath.split("\\.")); - } - - fullPath = fullPath + kafkaName; - - final String transformation = mappingColumnDto.getTransformation(); - return StringUtils.hasText(transformation) - ? String.format(transformation, "(" + fullPath + ")", mappingDto.getIgnoreFields().stream() - .collect(Collectors.joining("','", "'", "'"))) - : fullPath; - }) - .collect(Collectors.joining(", ", " ", " ")); - } - - protected Map getTopicMapping() throws IOException { - TypeReference> typeRef - = new TypeReference>() { - }; - final HashMap columnMappingMap = Utils.readFile(params.getRequired(MAPPING_FILE_PARAM), typeRef); - - //adding the default column mappings to each topic - columnMappingMap.values().forEach(mapping -> { - final List customMappings = Optional.ofNullable(mapping.getColumnMapping()) - .orElse(Collections.emptyList()); - final Collection columnMapping = Streams.concat( - customMappings.stream(), - defaultMappingList.stream()) - .collect(Collectors.toMap(MappingColumnDto::getName, Function.identity(), (f, s) -> f)).values(); - - mapping.setColumnMapping(new ArrayList<>(columnMapping)); + defaultColumnList = Utils.readResourceFile(baseTableJson, getClass(), + new TypeReference>() { }); - return columnMappingMap; + } + + public StreamExecutionEnvironment startJob() throws Exception { + System.out.println("Creating Table env..."); + final StreamTableEnvironment tableEnv = getTableEnvironment(); + + System.out.println("Configuring Table env..."); + configure(tableEnv); + + System.out.printf("Registering %s catalog... 
%s%n", connectorName, String.join(", ", tableEnv.listCatalogs())); + registerCatalog(tableEnv); + + System.out.println("Getting tables config..."); + final Map> tablesConfig = getTablesConfig(); + + System.out.println("Creating tables..."); + final Set tableList = getExistingTableList(tableEnv); + setConnectorDialect(tableEnv); + + tablesConfig.forEach( + (tableName, columnList) -> createTableIfNotExists(tableEnv, tableList, tableName, columnList)); + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + + System.out.println("Getting topic mapping..."); + final Map topicMapping = getTopicMapping(); + + System.out.println("Validating output tables mappings..."); + validateMappings(tablesConfig, topicMapping); + + System.out.println("Creating Kafka table..."); + createKafkaTable(tableEnv); + + System.out.printf("Executing %s insert...%n", connectorName); + executeInsert(tableEnv, topicMapping, tablesConfig); + + System.out.println("TableApiJob is done!"); + return jobReturnValue(); + } + + private void validateMappings(Map> tablesConfig, + Map topicMapping) { + for (Entry entry : topicMapping.entrySet()) { + final String source = entry.getKey(); + final MappingDto mappingDto = entry.getValue(); + final String tableName = mappingDto.getTableName(); + + if (!StringUtils.hasText(tableName)) { + throw new RuntimeException(String.format("Provided empty table name for the [%s] source!", source)); + } + + final List tableColumnList = tablesConfig.get(tableName); + if (CollectionUtils.isEmpty(tableColumnList)) { + throw new RuntimeException( + String.format("Configuration for table with name [%s] has no columns specified", tableName)); + } + + final Set tableColumns = tableColumnList.stream() + .map(TableColumnDto::getName) + .collect(Collectors.toSet()); + + final List invalidColumnList = mappingDto.getColumnMapping().stream() + .map(MappingColumnDto::getName) + .filter(columnName -> !StringUtils.hasText(columnName) || !tableColumns.contains(columnName)) + .collect(Collectors.toList()); + if (!invalidColumnList.isEmpty()) { + throw new RuntimeException( + String.format("Found invalid column mappings for source [%s]. 
Those columns are either not present in the table config or have empty names: %s", source, + invalidColumnList)); + } } - - protected Map> getTablesConfig() throws IOException { - TypeReference>> typeRef - = new TypeReference>>() { - }; - final String filePath = params.get(TABLES_INIT_FILE_PARAM); - if (filePath == null) { - return Collections.emptyMap(); - } - final HashMap> columnMap = Utils.readFile(filePath, typeRef); - final List partitionColumns = Arrays.asList(TableColumnDto.builder() - .name("dt") - .type("string") - .build(), TableColumnDto.builder() - .name("hr") - .type("string") - .build()); - //adding the default columns to each table - columnMap.forEach((tableName, columnList) -> { - final List customColumns = Optional.ofNullable(columnList) - .orElse(Collections.emptyList()); - final Map combinedColumnMap = Streams.concat(customColumns.stream(), defaultColumnList.stream(), partitionColumns.stream()) - .collect(Collectors.toMap(TableColumnDto::getName, Function.identity(), (f, s) -> f, LinkedHashMap::new)); - //partition columns should be placed last - partitionColumns.forEach(col -> combinedColumnMap.put(col.getName(), col)); - - columnMap.put(tableName, new ArrayList<>(combinedColumnMap.values())); - }); - return columnMap; + } + + protected HashSet getExistingTableList(StreamTableEnvironment tableEnv) { + return new HashSet<>(Arrays.asList(tableEnv.listTables())); + } + + protected StreamExecutionEnvironment jobReturnValue() { + return env; + } + + protected void executeInsert(StreamTableEnvironment tableEnv, Map topicMapping, + Map> tablesConfig) { + System.out.printf("Filling Insert statement list...%s%n", Arrays.toString(tableEnv.listTables())); + final StreamStatementSet insertStatementSet = tableEnv.createStatementSet(); + + topicMapping.forEach((topic, mappingDto) -> { + final String insertSql = buildInsertSql(topic, mappingDto); + try { + insertStatementSet.addInsertSql(insertSql); + System.out.printf("Insert SQL added to the queue for the table: %s%nSQL: %s%n", mappingDto.getTableName(), + insertSql); + } catch (Exception e) { + System.err.printf("Error adding insert to the statement set: %s%n", insertSql); + throw e; + } + }); + + System.out.println("Executing Insert statement list..."); + insertStatementSet.execute(); + } + + protected abstract void registerCatalog(StreamTableEnvironment tableEnv); + + protected void setConnectorDialect(StreamTableEnvironment tableEnv) { + //to be overwritten if needed + } + + protected void createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, + List columnList) { + if (tableList.contains(tableName)) { + System.out.printf("%s table [%s] already exists. 
Skipping its creation.%n", connectorName, tableName); + } else { + System.out.printf("Creating %s table %s...%n", connectorName, tableName); + final Schema schema = FlinkSchemaUtil.buildTableSchema(columnList); + final TableDescriptor tableDescriptor = buildTableDescriptor(schema); + try { + System.out.printf("Creating %s table %s: %s%n", connectorName, tableName, tableDescriptor); + tableEnv.createTable(tableName, tableDescriptor); + } catch (Exception e) { + System.err.printf("Error creating the %s: %s%n", connectorName, tableDescriptor); + throw e; + } + System.out.printf("%s table created: %s%n", connectorName, tableName); + } + } + + private TableDescriptor buildTableDescriptor(Schema schema) { + return fillTableOptions(TableDescriptor + .forConnector(getTableConnector()) + .schema(schema) + .partitionedBy("dt", "hr") + .format(getFormatDescriptor())) + .build(); + } + + protected abstract String getTableConnector(); + + protected abstract FormatDescriptor getFormatDescriptor(); + + + protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder builder) { + return builder + .option("partition.time-extractor.timestamp-pattern", "$dt $hr:00:00") + .option("sink.partition-commit.trigger", "process-time") + .option("sink.partition-commit.delay", "1 h") + .option("sink.partition-commit.policy.kind", "metastore,success-file"); + } + + private StreamTableEnvironment getTableEnvironment() { + return StreamTableEnvironment.create(env); + } + + private void createKafkaTable(StreamTableEnvironment tableEnv) { + final SingleOutputStreamOperator newSource = source.map(ScoredMessage::toRow, + ScoredMessage.FLINK_TYPE_INFO); + tableEnv.createTemporaryView(KAFKA_TABLE, newSource); + tableEnv.executeSql("DESCRIBE " + KAFKA_TABLE).print(); + } + + private void configure(StreamTableEnvironment tableEnv) { + final HashMap conf = new HashMap<>(); + conf.put("pipeline.name", params.get("flink.job.name", String.format("Indexing - %s TableApi", connectorName))); + tableEnv.getConfig().addConfiguration(Configuration.fromMap(conf)); + tableEnv.createTemporarySystemFunction("filterMap", FilterMapFunction.class); + } + + protected final String buildInsertSql(String topic, MappingDto mappingDto) { + return String.join("\n", + getInsertSqlPrefix() + " " + mappingDto.getTableName() + "(" + getInsertColumns(mappingDto) + ") " + + getInsertSqlSuffix(), + " SELECT " + getFromColumns(mappingDto), + " from " + KAFKA_TABLE, + String.format(" where `source`='%s'", topic)); + } + + protected String getInsertSqlPrefix() { + return "INSERT INTO "; + } + + protected String getInsertSqlSuffix() { + return ""; + } + + protected String getInsertColumns(MappingDto mappingDto) { + return mappingDto.getColumnMapping().stream() + .map(MappingColumnDto::getName) + .collect(Collectors.joining(", ", " ", " ")); + } + + private String getFromColumns(MappingDto mappingDto) { + return mappingDto.getColumnMapping().stream() + .map(mappingColumnDto -> { + final String kafkaName = mappingColumnDto.getKafkaName(); + final String path = mappingColumnDto.getPath(); + + String fullPath; + if (path.startsWith("..")) { + fullPath = path.substring(2); + } else { + fullPath = String.format("message.%s", path); + } + if (StringUtils.hasText(fullPath)) { + fullPath = String.join(".", fullPath.split("\\.")); + } + + fullPath = fullPath + kafkaName; + + final String transformation = mappingColumnDto.getTransformation(); + return StringUtils.hasText(transformation) + ? 
String.format(transformation, "(" + fullPath + ")", mappingDto.getIgnoreFields().stream() + .collect(Collectors.joining("','", "'", "'"))) + : fullPath; + }) + .collect(Collectors.joining(", ", " ", " ")); + } + + protected Map getTopicMapping() throws IOException { + TypeReference> typeRef + = new TypeReference>() { + }; + final HashMap columnMappingMap = Utils.readFile(params.getRequired(MAPPING_FILE_PARAM), + typeRef); + + //adding the default column mappings to each topic + columnMappingMap.values().forEach(mapping -> { + final List customMappings = Optional.ofNullable(mapping.getColumnMapping()) + .orElse(Collections.emptyList()); + final Collection columnMapping = Streams.concat( + customMappings.stream(), + defaultMappingList.stream()) + .collect(Collectors.toMap(MappingColumnDto::getName, Function.identity(), (f, s) -> f)).values(); + + mapping.setColumnMapping(new ArrayList<>(columnMapping)); + }); + return columnMappingMap; + } + + protected Map> getTablesConfig() throws IOException { + TypeReference>> typeRef + = new TypeReference>>() { + }; + final String filePath = params.get(TABLES_INIT_FILE_PARAM); + if (filePath == null) { + return Collections.emptyMap(); } + final HashMap> columnMap = Utils.readFile(filePath, typeRef); + final List partitionColumns = Arrays.asList(TableColumnDto.builder() + .name("dt") + .type("string") + .build(), TableColumnDto.builder() + .name("hr") + .type("string") + .build()); + //adding the default columns to each table + columnMap.forEach((tableName, columnList) -> { + final List customColumns = Optional.ofNullable(columnList) + .orElse(Collections.emptyList()); + final Map combinedColumnMap = Streams.concat(customColumns.stream(), + defaultColumnList.stream(), partitionColumns.stream()) + .collect(Collectors.toMap(TableColumnDto::getName, Function.identity(), (f, s) -> f, LinkedHashMap::new)); + //partition columns should be placed last + partitionColumns.forEach(col -> combinedColumnMap.put(col.getName(), col)); + + columnMap.put(tableName, new ArrayList<>(combinedColumnMap.values())); + }); + return columnMap; + } } From 5ee7cc0a59883dc46608b87e29f7e81a00426658 Mon Sep 17 00:00:00 2001 From: Stas Panasiuk Date: Mon, 7 Aug 2023 23:25:11 -0400 Subject: [PATCH 2/3] [CYB-170] Updated TableApi indexer mapping validation logic to check the existing table as well as table config --- .../hive/tableapi/TableApiAbstractJob.java | 79 +++++++++++++------ .../hive/tableapi/impl/TableApiKafkaJob.java | 17 ++-- .../indexing/hive/util/FlinkSchemaUtil.java | 15 ++-- 3 files changed, 69 insertions(+), 42 deletions(-) diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java index 89dbe1d39..a5ed9f89d 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java @@ -35,8 +35,9 @@ import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.bridge.java.StreamStatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.types.Row; -import 
org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -45,6 +46,8 @@ public abstract class TableApiAbstractJob { private static final String TABLES_INIT_FILE_PARAM = "flink.tables-init-file"; private static final String MAPPING_FILE_PARAM = "flink.mapping-file"; protected static final String KAFKA_TABLE = "KafkaTempView"; + protected static final String TEMP_INPUT_TABLE = "TEMP_TABLE_API_INPUT_TABLE"; + protected static final String TEMP_OUTPUT_TABLE = "TEMP_TABLE_API_OUTPUT_TABLE"; protected static final String BASE_COLUMN_MAPPING_JSON = "base-column-mapping.json"; @@ -83,18 +86,17 @@ public StreamExecutionEnvironment startJob() throws Exception { final Map> tablesConfig = getTablesConfig(); System.out.println("Creating tables..."); - final Set tableList = getExistingTableList(tableEnv); setConnectorDialect(tableEnv); - tablesConfig.forEach( - (tableName, columnList) -> createTableIfNotExists(tableEnv, tableList, tableName, columnList)); - tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + final Map tableSchemaMap = createTables(tableEnv, tablesConfig); System.out.println("Getting topic mapping..."); final Map topicMapping = getTopicMapping(); System.out.println("Validating output tables mappings..."); - validateMappings(tablesConfig, topicMapping); + validateMappings(tableSchemaMap, topicMapping); + + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); System.out.println("Creating Kafka table..."); createKafkaTable(tableEnv); @@ -106,7 +108,23 @@ public StreamExecutionEnvironment startJob() throws Exception { return jobReturnValue(); } - private void validateMappings(Map> tablesConfig, + /** + * Creates tables in the tableEnv based on the tablesConfig. If table already exists, its schema is fetched. + * + * @param tableEnv is the Flink environment in which the tables are going to be created. + * @param tablesConfig config map with the table name as a key and column map as a value. + * @return Map with table name as a key and table schema as a value. 
+ */ + private Map createTables(StreamTableEnvironment tableEnv, + Map> tablesConfig) { + final Set tableList = getExistingTableList(tableEnv); + + return tablesConfig.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, + entry -> createTableIfNotExists(tableEnv, tableList, entry.getKey(), entry.getValue()))); + } + + private void validateMappings(Map tableSchemaMap, Map topicMapping) { for (Entry entry : topicMapping.entrySet()) { final String source = entry.getKey(); @@ -117,14 +135,13 @@ private void validateMappings(Map> tablesConfig, throw new RuntimeException(String.format("Provided empty table name for the [%s] source!", source)); } - final List tableColumnList = tablesConfig.get(tableName); - if (CollectionUtils.isEmpty(tableColumnList)) { - throw new RuntimeException( - String.format("Configuration for table with name [%s] has no columns specified", tableName)); + final ResolvedSchema schema = tableSchemaMap.get(tableName); + if (schema == null) { + throw new RuntimeException(String.format("Table [%s] is not found!", tableName)); } - final Set tableColumns = tableColumnList.stream() - .map(TableColumnDto::getName) + final Set tableColumns = schema.getColumns().stream() + .map(Column::getName) .collect(Collectors.toSet()); final List invalidColumnList = mappingDto.getColumnMapping().stream() @@ -174,23 +191,27 @@ protected void setConnectorDialect(StreamTableEnvironment tableEnv) { //to be overwritten if needed } - protected void createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, + protected ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, + String tableName, List columnList) { if (tableList.contains(tableName)) { System.out.printf("%s table [%s] already exists. Skipping its creation.%n", connectorName, tableName); - } else { - System.out.printf("Creating %s table %s...%n", connectorName, tableName); - final Schema schema = FlinkSchemaUtil.buildTableSchema(columnList); - final TableDescriptor tableDescriptor = buildTableDescriptor(schema); - try { - System.out.printf("Creating %s table %s: %s%n", connectorName, tableName, tableDescriptor); - tableEnv.createTable(tableName, tableDescriptor); - } catch (Exception e) { - System.err.printf("Error creating the %s: %s%n", connectorName, tableDescriptor); - throw e; - } - System.out.printf("%s table created: %s%n", connectorName, tableName); + return tableEnv.from(tableName).getResolvedSchema(); + } + + System.out.printf("Creating %s table %s...%n", connectorName, tableName); + final ResolvedSchema resolvedSchema = FlinkSchemaUtil.getResolvedSchema(columnList); + final Schema schema = FlinkSchemaUtil.buildSchema(resolvedSchema); + final TableDescriptor tableDescriptor = buildTableDescriptor(schema); + try { + System.out.printf("Creating %s table %s: %s%n", connectorName, tableName, tableDescriptor); + tableEnv.createTable(tableName, tableDescriptor); + } catch (Exception e) { + System.err.printf("Error creating the %s: %s%n", connectorName, tableDescriptor); + throw e; } + System.out.printf("%s table created: %s%n", connectorName, tableName); + return resolvedSchema; } private TableDescriptor buildTableDescriptor(Schema schema) { @@ -304,6 +325,12 @@ protected Map getTopicMapping() throws IOException { return columnMappingMap; } + /** + * Method provides table schemas from the config file. + * + * @return Map with table name as a key, and a list of columns as a value. 
+ * @throws IOException in case it can't read the config file + */ protected Map> getTablesConfig() throws IOException { TypeReference>> typeRef = new TypeReference>>() { diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java index c082aadc7..6a46427e6 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java @@ -5,7 +5,13 @@ import com.cloudera.cyber.indexing.TableColumnDto; import com.cloudera.cyber.indexing.hive.tableapi.TableApiAbstractJob; import com.cloudera.cyber.indexing.hive.util.AvroSchemaUtil; +import com.cloudera.cyber.indexing.hive.util.FlinkSchemaUtil; import com.cloudera.cyber.scoring.ScoredMessage; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -16,12 +22,7 @@ import org.apache.flink.table.api.FormatDescriptor; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import org.apache.flink.table.catalog.ResolvedSchema; public class TableApiKafkaJob extends TableApiAbstractJob { @@ -32,8 +33,8 @@ public TableApiKafkaJob(ParameterTool params, StreamExecutionEnvironment env, Da } @Override - protected void createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, List columnList) { - + protected ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, List columnList) { + return FlinkSchemaUtil.getResolvedSchema(columnList); } @Override diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/FlinkSchemaUtil.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/FlinkSchemaUtil.java index 9d7e883b9..58c3856e6 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/FlinkSchemaUtil.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/FlinkSchemaUtil.java @@ -1,23 +1,22 @@ package com.cloudera.cyber.indexing.hive.util; import com.cloudera.cyber.indexing.TableColumnDto; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.types.DataType; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - public class FlinkSchemaUtil { - public static Schema buildTableSchema(List columnList) { + public static Schema buildSchema(ResolvedSchema resolvedSchema) { return Schema.newBuilder() - .fromResolvedSchema(getResolvedSchema(columnList)) - .build(); 
+ .fromResolvedSchema(resolvedSchema) + .build(); } public static ResolvedSchema getResolvedSchema(List columnList) { From 5e50b7e2574d9146d54b0ad8aed4e0b6082083f3 Mon Sep 17 00:00:00 2001 From: Stas Panasiuk Date: Mon, 14 Aug 2023 01:22:29 -0400 Subject: [PATCH 3/3] [CYB-170] Updated TableApi indexer mapping validation logic + unit test for validation --- .../hive/tableapi/TableApiAbstractJob.java | 94 ++++++++--- .../hive/tableapi/impl/TableApiKafkaJob.java | 152 +++++++++--------- .../tableapi/TableApiAbstractJobTest.java | 84 ++++++++++ 3 files changed, 237 insertions(+), 93 deletions(-) create mode 100644 flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJobTest.java diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java index a5ed9f89d..2adf5f7d3 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java @@ -37,7 +37,9 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -46,8 +48,6 @@ public abstract class TableApiAbstractJob { private static final String TABLES_INIT_FILE_PARAM = "flink.tables-init-file"; private static final String MAPPING_FILE_PARAM = "flink.mapping-file"; protected static final String KAFKA_TABLE = "KafkaTempView"; - protected static final String TEMP_INPUT_TABLE = "TEMP_TABLE_API_INPUT_TABLE"; - protected static final String TEMP_OUTPUT_TABLE = "TEMP_TABLE_API_OUTPUT_TABLE"; protected static final String BASE_COLUMN_MAPPING_JSON = "base-column-mapping.json"; @@ -83,12 +83,13 @@ public StreamExecutionEnvironment startJob() throws Exception { registerCatalog(tableEnv); System.out.println("Getting tables config..."); - final Map> tablesConfig = getTablesConfig(); + final Map> rawTablesConfig = getRawTablesConfig(); + final Map> tablesConfig = appendDefaultTablesConfig(rawTablesConfig); System.out.println("Creating tables..."); setConnectorDialect(tableEnv); - final Map tableSchemaMap = createTables(tableEnv, tablesConfig); + final Map tableSchemaMap = createTables(tableEnv, rawTablesConfig, tablesConfig); System.out.println("Getting topic mapping..."); final Map topicMapping = getTopicMapping(); @@ -111,20 +112,21 @@ public StreamExecutionEnvironment startJob() throws Exception { /** * Creates tables in the tableEnv based on the tablesConfig. If table already exists, its schema is fetched. * - * @param tableEnv is the Flink environment in which the tables are going to be created. - * @param tablesConfig config map with the table name as a key and column map as a value. + * @param tableEnv is the Flink environment in which the tables are going to be created. + * @param rawTablesConfig map that contains contents of tables config file without any modifications. + * @param tablesConfig modified version of rawTablesConfig that contains default and partition columns. 
* @return Map with table name as a key and table schema as a value. */ private Map createTables(StreamTableEnvironment tableEnv, - Map> tablesConfig) { + Map> rawTablesConfig, Map> tablesConfig) { final Set tableList = getExistingTableList(tableEnv); return tablesConfig.entrySet().stream() .collect(Collectors.toMap(Entry::getKey, - entry -> createTableIfNotExists(tableEnv, tableList, entry.getKey(), entry.getValue()))); + entry -> createTableIfNotExists(tableEnv, tableList, entry.getKey(), rawTablesConfig.get(entry.getKey()), entry.getValue()))); } - private void validateMappings(Map tableSchemaMap, + protected void validateMappings(Map tableSchemaMap, Map topicMapping) { for (Entry entry : topicMapping.entrySet()) { final String source = entry.getKey(); @@ -150,8 +152,9 @@ private void validateMappings(Map tableSchemaMap, .collect(Collectors.toList()); if (!invalidColumnList.isEmpty()) { throw new RuntimeException( - String.format("Found invalid column mappings for source [%s]. Those columns are either not present in the table config or have empty names: %s", source, - invalidColumnList)); + String.format( + "Found invalid column mappings for source [%s]. Those columns are either not present in the table config or have empty names: %s", + source, invalidColumnList)); } } } @@ -191,14 +194,21 @@ protected void setConnectorDialect(StreamTableEnvironment tableEnv) { //to be overwritten if needed } - protected ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, - String tableName, - List columnList) { + private ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, + String tableName, List rawColumnList, List columnList) { if (tableList.contains(tableName)) { - System.out.printf("%s table [%s] already exists. Skipping its creation.%n", connectorName, tableName); - return tableEnv.from(tableName).getResolvedSchema(); + return handleExistingTable(tableEnv, tableName, columnList); + } + + if (CollectionUtils.isEmpty(rawColumnList)) { + throw new RuntimeException(String.format("%s table [%s] config is empty. Aborting...", connectorName, tableName)); } + return createTable(tableEnv, tableName, columnList); + } + + protected ResolvedSchema createTable(StreamTableEnvironment tableEnv, String tableName, + List columnList) { System.out.printf("Creating %s table %s...%n", connectorName, tableName); final ResolvedSchema resolvedSchema = FlinkSchemaUtil.getResolvedSchema(columnList); final Schema schema = FlinkSchemaUtil.buildSchema(resolvedSchema); @@ -214,6 +224,37 @@ protected ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, return resolvedSchema; } + protected ResolvedSchema handleExistingTable(StreamTableEnvironment tableEnv, String tableName, + List columnList) { + final ResolvedSchema existingTableSchema = tableEnv.from(tableName).getResolvedSchema(); + if (CollectionUtils.isEmpty(columnList)) { + System.out.printf("%s table [%s] already exists and no table config provided. 
Skipping its creation.%n", + connectorName, + tableName); + return existingTableSchema; + } + + final List existingColumns = existingTableSchema.getColumns(); + final List configColumns = FlinkSchemaUtil.getResolvedSchema(columnList).getColumns(); + for (Column configColumn : configColumns) { + final DataType dataType = configColumn.getDataType(); + final String columnName = configColumn.getName(); + final boolean matchingColumnExists = existingColumns.stream() + .anyMatch(c -> c.getName().equals(columnName) && c.getDataType().equals(dataType)); + if (!matchingColumnExists) { + throw new RuntimeException(String.format( + "%s table [%s] already exists, but table config was provided for it as well. " + + "In this case all columns from table config should be present in the existing table, " + + "but we didn't find the [%s] column of type [%s].", + connectorName, tableName, columnName, dataType)); + } + } + System.out.printf("%s table [%s] already exists and provided table config matches it. Skipping its creation.%n", + connectorName, + tableName); + return existingTableSchema; + } + private TableDescriptor buildTableDescriptor(Schema schema) { return fillTableOptions(TableDescriptor .forConnector(getTableConnector()) @@ -326,12 +367,12 @@ protected Map getTopicMapping() throws IOException { } /** - * Method provides table schemas from the config file. + * Method provides contents of the tables config file. * * @return Map with table name as a key, and a list of columns as a value. - * @throws IOException in case it can't read the config file + * @throws IOException in case it can't read the tables config file. */ - protected Map> getTablesConfig() throws IOException { + protected Map> getRawTablesConfig() throws IOException { TypeReference>> typeRef = new TypeReference>>() { }; @@ -339,7 +380,20 @@ protected Map> getTablesConfig() throws IOException if (filePath == null) { return Collections.emptyMap(); } - final HashMap> columnMap = Utils.readFile(filePath, typeRef); + return Utils.readFile(filePath, typeRef); + } + + /** + * Method appends default and partition columns to the raw tables config. + * + * @return Map with table name as a key, and a list of columns as a value. 
+ */ + protected Map> appendDefaultTablesConfig( + Map> rawTablesConfig) { + if (rawTablesConfig == null || rawTablesConfig.isEmpty()) { + return Collections.emptyMap(); + } + final Map> columnMap = new HashMap<>(rawTablesConfig); final List partitionColumns = Arrays.asList(TableColumnDto.builder() .name("dt") .type("string") diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java index 6a46427e6..7896717b9 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java @@ -26,79 +26,85 @@ public class TableApiKafkaJob extends TableApiAbstractJob { - private static final String BASE_TABLE_JSON = "base-hive-table.json"; - - public TableApiKafkaJob(ParameterTool params, StreamExecutionEnvironment env, DataStream source) throws IOException { - super(params, env, source, "Kafka", BASE_TABLE_JSON); - } - - @Override - protected ResolvedSchema createTableIfNotExists(StreamTableEnvironment tableEnv, Set tableList, String tableName, List columnList) { - return FlinkSchemaUtil.getResolvedSchema(columnList); - } - - @Override - protected void executeInsert(StreamTableEnvironment tableEnv, Map topicMapping, Map> tablesConfig) { - topicMapping.forEach((topic, mappingDto) -> { - final String insertSql = buildInsertSql(topic, mappingDto); - try { - //create view - tableEnv.executeSql(insertSql); - final KafkaSink kafkaSink = new FlinkUtils<>(GenericRecord.class).createKafkaSink(mappingDto.getTableName(), "indexing-job", params); - - //read from view and write to kafka sink - final Table table = tableEnv.from(mappingDto.getTableName()); - final String schemaString = AvroSchemaUtil.convertToAvro(tablesConfig.get(mappingDto.getTableName())).toString(); - - final DataStream stream = tableEnv.toDataStream(table).map(row -> { - final Schema schema = new Schema.Parser().parse(schemaString); - final GenericRecord record = new GenericData.Record(schema); - final Set fieldNames = row.getFieldNames(true); - if (fieldNames != null) { - for (String fieldName : fieldNames) { - AvroSchemaUtil.putRowIntoAvro(row, record, fieldName); - } - } - - return record; - }); - stream.sinkTo(kafkaSink); - System.out.printf("Insert SQL added to the queue for the table: %s%nSQL: %s%n", mappingDto.getTableName(), insertSql); - } catch (Exception e) { - System.err.printf("Error adding insert to the statement set: %s%n", insertSql); - throw e; + private static final String BASE_TABLE_JSON = "base-hive-table.json"; + + public TableApiKafkaJob(ParameterTool params, StreamExecutionEnvironment env, DataStream source) + throws IOException { + super(params, env, source, "Kafka", BASE_TABLE_JSON); + } + + @Override + protected ResolvedSchema createTable(StreamTableEnvironment tableEnv, String tableName, + List columnList) { + return FlinkSchemaUtil.getResolvedSchema(columnList); + } + + @Override + protected void executeInsert(StreamTableEnvironment tableEnv, Map topicMapping, + Map> tablesConfig) { + topicMapping.forEach((topic, mappingDto) -> { + final String insertSql = buildInsertSql(topic, mappingDto); + try { + //create view + tableEnv.executeSql(insertSql); + final KafkaSink kafkaSink = new 
FlinkUtils<>(GenericRecord.class).createKafkaSink( + mappingDto.getTableName(), "indexing-job", params); + + //read from view and write to kafka sink + final Table table = tableEnv.from(mappingDto.getTableName()); + final String schemaString = AvroSchemaUtil.convertToAvro(tablesConfig.get(mappingDto.getTableName())) + .toString(); + + final DataStream stream = tableEnv.toDataStream(table).map(row -> { + final Schema schema = new Schema.Parser().parse(schemaString); + final GenericRecord record = new GenericData.Record(schema); + final Set fieldNames = row.getFieldNames(true); + if (fieldNames != null) { + for (String fieldName : fieldNames) { + AvroSchemaUtil.putRowIntoAvro(row, record, fieldName); } + } + + return record; }); - } - - @Override - protected HashSet getExistingTableList(StreamTableEnvironment tableEnv) { - //Kafka tables are temporary, so no tables are present on the job creation - return new HashSet<>(); - } - - @Override - protected void registerCatalog(StreamTableEnvironment tableEnv) { - - } - - @Override - protected String getTableConnector() { - return "filesystem"; - } - - @Override - protected FormatDescriptor getFormatDescriptor() { - return null; - } - - @Override - protected String getInsertSqlPrefix() { - return "CREATE TEMPORARY VIEW "; - } - - @Override - protected String getInsertSqlSuffix() { - return " AS "; - } + stream.sinkTo(kafkaSink); + System.out.printf("Insert SQL added to the queue for the table: %s%nSQL: %s%n", mappingDto.getTableName(), + insertSql); + } catch (Exception e) { + System.err.printf("Error adding insert to the statement set: %s%n", insertSql); + throw e; + } + }); + } + + @Override + protected HashSet getExistingTableList(StreamTableEnvironment tableEnv) { + //Kafka tables are temporary, so no tables are present on the job creation + return new HashSet<>(); + } + + @Override + protected void registerCatalog(StreamTableEnvironment tableEnv) { + + } + + @Override + protected String getTableConnector() { + return "filesystem"; + } + + @Override + protected FormatDescriptor getFormatDescriptor() { + return null; + } + + @Override + protected String getInsertSqlPrefix() { + return "CREATE TEMPORARY VIEW "; + } + + @Override + protected String getInsertSqlSuffix() { + return " AS "; + } } diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJobTest.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJobTest.java new file mode 100644 index 000000000..ea4c730ff --- /dev/null +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJobTest.java @@ -0,0 +1,84 @@ +package com.cloudera.cyber.indexing.hive.tableapi; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.cloudera.cyber.indexing.MappingColumnDto; +import com.cloudera.cyber.indexing.MappingDto; +import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiHiveJob; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; + +class TableApiAbstractJobTest { + + public static final String GIVEN_SOURCE = "source"; + public static final String GIVEN_TABLE_NAME = "tableName"; + private final TableApiAbstractJob job = new TableApiHiveJob(null, null, null); + + TableApiAbstractJobTest() throws IOException { + } + + public static Stream mappingsData() { + return Stream.of( + Arguments.of(new HashMap<>(), new HashMap<>()), + + Arguments.of(Collections.singletonMap(GIVEN_TABLE_NAME, ResolvedSchema.of( + Column.physical("column1", DataTypes.STRING()), + Column.physical("column2", DataTypes.STRING()))), + Collections.singletonMap(GIVEN_SOURCE, + new MappingDto(GIVEN_TABLE_NAME, new ArrayList<>(), Arrays.asList( + new MappingColumnDto("column1", null, null, null, false), + new MappingColumnDto("column2", null, null, null, false)))))); + } + + public static Stream mappingsExceptionData() { + return Stream.of( + Arguments.of(new HashMap<>(), + Collections.singletonMap(GIVEN_SOURCE, + new MappingDto(" ", new ArrayList<>(), new ArrayList<>())), + RuntimeException.class, + String.format("Provided empty table name for the [%s] source!", GIVEN_SOURCE)), + + Arguments.of(new HashMap<>(), + Collections.singletonMap(GIVEN_SOURCE, + new MappingDto(GIVEN_TABLE_NAME, new ArrayList<>(), new ArrayList<>())), + RuntimeException.class, + String.format("Table [%s] is not found!", GIVEN_TABLE_NAME)), + + Arguments.of(Collections.singletonMap(GIVEN_TABLE_NAME, ResolvedSchema.of()), + Collections.singletonMap(GIVEN_SOURCE, + new MappingDto(GIVEN_TABLE_NAME, new ArrayList<>(), Arrays.asList( + new MappingColumnDto(" ", null, null, null, false), + new MappingColumnDto("someName", null, null, null, false)))), + RuntimeException.class, + String.format( + "Found invalid column mappings for source [%s]. Those columns are either not present in the table config or have empty names: %s", + GIVEN_SOURCE, "[ , someName]"))); + } + + @ParameterizedTest + @MethodSource("mappingsData") + void shouldValidateMappings(Map givenTableSchemaMap, + Map givenTopicMapping) { + job.validateMappings(givenTableSchemaMap, givenTopicMapping); + } + + @ParameterizedTest + @MethodSource("mappingsExceptionData") + void shouldThrowExceptionWhenValidateMappings(Map givenTableSchemaMap, + Map givenTopicMapping, Class expectedException, + String expectedExceptionMessage) { + assertThatThrownBy(() -> job.validateMappings(givenTableSchemaMap, givenTopicMapping)) + .isInstanceOf(expectedException) + .hasMessage(expectedExceptionMessage); + } +} \ No newline at end of file
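
The unit test above pins down the contract of validateMappings. As a quick illustration, here is a minimal, self-contained sketch of how a caller in the same package could exercise that contract directly; the table name "events", its columns, the source name "squid", and the ValidateMappingsSketch class itself are illustrative only, while the TableApiHiveJob constructor arguments and the MappingDto/MappingColumnDto argument order are assumed to match TableApiAbstractJobTest.

package com.cloudera.cyber.indexing.hive.tableapi;

import com.cloudera.cyber.indexing.MappingColumnDto;
import com.cloudera.cyber.indexing.MappingDto;
import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiHiveJob;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

class ValidateMappingsSketch {

    public static void main(String[] args) throws Exception {
        // Same package as TableApiAbstractJob, so the protected validateMappings is reachable;
        // the (null, null, null) constructor call mirrors TableApiAbstractJobTest above.
        TableApiAbstractJob job = new TableApiHiveJob(null, null, null);

        // Resolved schema of an output table, as createTables would report it
        // ("events", "id" and "ts" are illustrative names).
        Map<String, ResolvedSchema> tableSchemas = Collections.singletonMap("events",
                ResolvedSchema.of(
                        Column.physical("id", DataTypes.STRING()),
                        Column.physical("ts", DataTypes.STRING())));

        // Mapping that only references existing columns: validation passes silently.
        Map<String, MappingDto> goodMapping = Collections.singletonMap("squid",
                new MappingDto("events", new ArrayList<>(), Arrays.asList(
                        new MappingColumnDto("id", null, null, null, false),
                        new MappingColumnDto("ts", null, null, null, false))));
        job.validateMappings(tableSchemas, goodMapping);

        // Mapping onto a column the table does not expose: validation throws a
        // RuntimeException listing the offending column names.
        Map<String, MappingDto> badMapping = Collections.singletonMap("squid",
                new MappingDto("events", new ArrayList<>(), Collections.singletonList(
                        new MappingColumnDto("ip_dst_addr", null, null, null, false))));
        job.validateMappings(tableSchemas, badMapping);
    }
}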