From cda6874b207573f6d2528a827af1e816c3175325 Mon Sep 17 00:00:00 2001 From: patricker Date: Mon, 18 Sep 2017 04:43:57 +0000 Subject: [PATCH 01/11] NIFI-4521 MS SQL CDC Processor --- nifi-assembly/pom.xml | 6 + .../nifi-cdc-mssql-nar/pom.xml | 47 + .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-cdc-mssql-processors/pom.xml | 90 ++ .../apache/nifi/cdc/mssql/MSSQLCDCUtils.java | 286 ++++++ .../mssql/event/MSSQLColumnDefinition.java | 67 ++ .../nifi/cdc/mssql/event/MSSQLTableInfo.java | 48 + .../cdc/mssql/event/TableCapturePlan.java | 142 +++ .../mssql/processors/CaptureChangeMSSQL.java | 409 +++++++++ .../org.apache.nifi.processor.Processor | 15 + .../cdc/mssql/CaptureChangeMSSQLTest.java | 828 ++++++++++++++++++ .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 34 + nifi-nar-bundles/nifi-cdc/pom.xml | 1 + 13 files changed, 1978 insertions(+) create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLColumnDefinition.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLTableInfo.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 6871b03940ef..8d1b7b7b02ab 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -665,6 +665,12 @@ language governing permissions and limitations under the License. --> 1.14.0-SNAPSHOT nar + + org.apache.nifi + nifi-cdc-mssql-nar + 1.8.0-SNAPSHOT + nar + org.apache.nifi nifi-parquet-nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml new file mode 100644 index 000000000000..49220bbae3eb --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.9.0-SNAPSHOT + + + nifi-cdc-mssql-nar + 1.9.0-SNAPSHOT + nar + NiFi Microsoft SQL Change Data Capture (CDC) NAR + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.9.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-cdc-mssql-processors + 1.9.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..fa0bbade0629 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-cdc-mssql-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml new file mode 100644 index 000000000000..759f13804278 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.9.0-SNAPSHOT + + + nifi-cdc-mssql-processors + jar + + + + org.apache.nifi + nifi-api + 1.9.0-SNAPSHOT + + + org.apache.nifi + nifi-utils + 1.9.0-SNAPSHOT + + + org.apache.nifi + nifi-dbcp-service-api + 1.9.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-cdc-api + 1.9.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + 1.9.0-SNAPSHOT + + + org.apache.nifi + nifi-record + 1.9.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-mock + 1.9.0-SNAPSHOT + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + org.apache.nifi + nifi-dbcp-service + 1.9.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.9.0-SNAPSHOT + test + + + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java new file mode 100644 index 000000000000..89b932535852 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; + +import java.sql.Connection; +import java.sql.Timestamp; +import java.sql.Types; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class MSSQLCDCUtils { + private static final String _columnSplit = "\n,"; + + final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" + + " DB_NAME() AS [databaseName], \n" + + " SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS [schemaName], \n" + + " OBJECT_NAME(object_id) AS [tableName], \n" + + " SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) AS [sourceSchemaName],\n" + + " OBJECT_NAME(source_object_id) AS [sourceTableName] \n" + + "FROM [cdc].[change_tables]"; + + final String LIST_TABLE_COLUMNS = "select cc.object_id\n" + + ",cc.column_name\n" + + ",cc.column_id\n" + + ",cc.column_type\n" + + ",cc.column_ordinal\n" + + ",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" + + "FROM cdc.captured_columns cc\n" + + "LEFT OUTER JOIN cdc.index_columns ic ON \n" + + "(ic.object_id = cc.object_id AND ic.column_name = cc.column_name)\n" + + "where cc.object_id=?\n" + + "ORDER BY cc.column_ordinal"; + + public String getLIST_CHANGE_TRACKING_TABLES_SQL(){ + return LIST_CHANGE_TRACKING_TABLES_SQL; + } + + public String getLIST_TABLE_COLUMNS(){ + return LIST_TABLE_COLUMNS; + } + + public String getCURRENT_TIMESTAMP(){ + return "CURRENT_TIMESTAMP"; + } + + public List getCDCTableList(Connection con) throws SQLException, CDCException { + ArrayList cdcTables = new ArrayList<>(); + + try(final Statement st = con.createStatement()){ + final ResultSet resultSet = st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL()); + + while (resultSet.next()) { + int objectId = resultSet.getInt("object_id"); + String databaseName = resultSet.getString("databaseName"); + String schemaName = resultSet.getString("schemaName"); + String tableName = resultSet.getString("tableName"); + String sourceSchemaName = resultSet.getString("sourceSchemaName"); + String sourceTableName = resultSet.getString("sourceTableName"); + + MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, schemaName, tableName, sourceSchemaName, sourceTableName, Integer.toUnsignedLong(objectId), null); + cdcTables.add(ti); + } + + for (MSSQLTableInfo ti:cdcTables) { + List tableColums = getCaptureColumns(con, ti.getTableId()); + + ti.setColumns(tableColums); + } + } + + return cdcTables; + } + + public List getCaptureColumns(Connection con, long objectId) throws SQLException, CDCException { + ArrayList tableColumns = new ArrayList<>(); + try(final PreparedStatement st = con.prepareStatement(getLIST_TABLE_COLUMNS())){ + st.setLong(1, objectId); + + final ResultSet resultSet = st.executeQuery(); + while (resultSet.next()) { + String columnName = resultSet.getString("column_name"); + int columnId = resultSet.getInt("column_id"); + String columnType = resultSet.getString("column_type"); + int columnOrdinal = resultSet.getInt("column_ordinal"); + int isColumnKey = resultSet.getInt("key"); + + int jdbcType = TranslateMSSQLTypeToJDBCTypes(columnType); + + //get column list + MSSQLColumnDefinition col = new MSSQLColumnDefinition(jdbcType, columnName, columnOrdinal, isColumnKey==1); + tableColumns.add(col); + } + } + + return tableColumns; + } + + public String getSnapshotSelectStatement(MSSQLTableInfo tableInfo){ + final StringBuilder sbQuery = new StringBuilder(); + + sbQuery.append("SELECT " + getCURRENT_TIMESTAMP() + " tran_begin_time"); + + sbQuery.append(_columnSplit); + sbQuery.append(getCURRENT_TIMESTAMP() + " \"tran_end_time\""); + + sbQuery.append(_columnSplit); + sbQuery.append("0 trans_id"); + + sbQuery.append(_columnSplit); + sbQuery.append("0 start_lsn"); + + sbQuery.append(_columnSplit); + sbQuery.append("0 seqval"); + + sbQuery.append(_columnSplit); + sbQuery.append("2 operation"); + + sbQuery.append(_columnSplit); + sbQuery.append("0 update_mask"); + + for(ColumnDefinition col : tableInfo.getColumns()){ + MSSQLColumnDefinition mssqlColumnDefinition = (MSSQLColumnDefinition)col; + + sbQuery.append(_columnSplit); + sbQuery.append("\"" + mssqlColumnDefinition.getName() + "\""); + } + + sbQuery.append(_columnSplit); + sbQuery.append(getCURRENT_TIMESTAMP() + " EXTRACT_TIME"); + sbQuery.append("\n"); + sbQuery.append("FROM " + tableInfo.getSourceSchemaName() + ".\""+ tableInfo.getSourceTableName() + "\""); + + return sbQuery.toString(); + } + + public String getCDCSelectStatement(MSSQLTableInfo tableInfo, boolean includePreupdateValues, Timestamp maxTime){ + final StringBuilder sbQuery = new StringBuilder(); + + sbQuery.append("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask"); + + for(ColumnDefinition col : tableInfo.getColumns()){ + MSSQLColumnDefinition mssqlColumnDefinition = (MSSQLColumnDefinition)col; + + sbQuery.append(_columnSplit); + sbQuery.append("\"o\".\"" + mssqlColumnDefinition.getName() + "\""); + } + + sbQuery.append(_columnSplit); + sbQuery.append(getCURRENT_TIMESTAMP() + " EXTRACT_TIME"); + sbQuery.append("\n"); + sbQuery.append("FROM cdc.\""+ tableInfo.getTableName() + "\" \"o\"\nINNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")"); + + if(maxTime != null || !includePreupdateValues){ + sbQuery.append("\nWHERE "); + } + + if(maxTime != null){ + sbQuery.append("t.tran_end_time > ?"); + + if(!includePreupdateValues){ + sbQuery.append(" AND "); + } + } + + if(!includePreupdateValues){ + sbQuery.append("\"o\".\"__$operation\" <> 3"); + } + + sbQuery.append("\nORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\""); + + return sbQuery.toString(); + } + + public String getCDCRowCountStatement(MSSQLTableInfo tableInfo, boolean includePreupdateValues, Timestamp maxTime){ + final StringBuilder sbQuery = new StringBuilder(); + + sbQuery.append("SELECT COUNT(*) rowcnt \n"); + sbQuery.append("FROM cdc.\""+ tableInfo.getTableName() + "\" \"o\"\nINNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")"); + + if(maxTime != null || !includePreupdateValues){ + sbQuery.append("WHERE "); + } + + if(maxTime != null){ + sbQuery.append("\nt.tran_end_time > ?"); + + if(!includePreupdateValues){ + sbQuery.append(" AND "); + } + } + + if(!includePreupdateValues){ + sbQuery.append("\"o\".\"__$operation\" <> 3"); + } + + return sbQuery.toString(); + } + + //List from https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types + public static int TranslateMSSQLTypeToJDBCTypes(String mssqltype) throws CDCException { + switch (mssqltype){ + case "bigint": + return Types.BIGINT; + case "binary": + return Types.BINARY; + case "bit": + return Types.BIT; + case "char": + case "uniqueidentifier": + return Types.CHAR; + case "date": + return Types.DATE; + case "datetime": + case "datetime2": + case "smalldatetime": + return Types.TIMESTAMP; + case "decimal": + return Types.DECIMAL; + case "float": + return Types.DOUBLE; + case "image": + return Types.LONGVARBINARY; + case "int": + return Types.INTEGER; + case "money": + case "smallmoney": + return Types.DECIMAL; + case "nchar": + return Types.NCHAR; + case "ntext": + case "text": + case "xml": + return Types.LONGVARCHAR; + case "numeric": + return Types.NUMERIC; + case "nvarchar": + case "varchar": + return Types.VARCHAR; + case "real": + return Types.REAL; + case "smallint": + return Types.SMALLINT; + case "time": + return Types.TIME; + case "timestamp": + return Types.BINARY; + case "tinyint": + return Types.TINYINT; + case "udt": + case "varbinary": + return Types.VARBINARY; + } + + throw new CDCException("Unrecognized MS SQL data type " + mssqltype); + } +} + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLColumnDefinition.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLColumnDefinition.java new file mode 100644 index 000000000000..5e56674b01ea --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLColumnDefinition.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql.event; + +import org.apache.nifi.cdc.event.ColumnDefinition; + +import java.sql.JDBCType; + +public class MSSQLColumnDefinition extends ColumnDefinition { + + private boolean isKey=false; + private int columnOrdinal; + private int columnSize=255; + + public boolean getIsKey(){ + return isKey; + } + public int getColumnOrdinal(){ + return columnOrdinal; + } + + public int getColumnSize(){ + return columnSize; + } + + public void setColumnSize(int size){ + columnSize = size; + } + + public String getCustomTypeName(){ + return ""; + } + + public boolean isRequired(){ + return isKey; + } + + public void setIsKey(boolean isKey){ + this.isKey = isKey; + } + + public JDBCType getDataType(){ + return JDBCType.valueOf(this.getType()); + + } + + public MSSQLColumnDefinition(int type, String name, int columnOrdinal, boolean isKey) { + super(type, name); + + this.isKey = isKey; + this.columnOrdinal = columnOrdinal; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLTableInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLTableInfo.java new file mode 100644 index 000000000000..b1295ea6bda0 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLTableInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql.event; + +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.event.TableInfo; + +import java.util.List; + +public class MSSQLTableInfo extends TableInfo { + + private String schemaName; + + private String sourceSchemaName; + private String sourceTableName; + + public String getSchemaName() { + return schemaName; + } + public String getSourceSchemaName() { + return sourceSchemaName; + } + public String getSourceTableName() { + return sourceTableName; + } + + public MSSQLTableInfo(String databaseName, String schemaName, String tableName, String sourceSchemaName, String sourceTableName, Long tableId, List columns) { + super(databaseName, tableName, tableId, columns); + + this.schemaName = schemaName; + this.sourceSchemaName = sourceSchemaName; + this.sourceTableName = sourceTableName; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java new file mode 100644 index 000000000000..aa067aab68e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql.event; + +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.util.StringUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +public class TableCapturePlan { + + public enum PlanTypes{ + CDC, + SNAPSHOT + } + + public MSSQLTableInfo getTable() { + return table; + } + + public int getRowLimit() { + return rowLimit; + } + + public boolean getCaptureBaseline() { + return captureBaseline; + } + + public boolean getIncludePreupdateValues() { + return includePreupdateValues; + } + + public Timestamp getMaxTime() { + return maxTime; + } + + public PlanTypes getPlanType() { + return planType; + } + + private MSSQLTableInfo table; + private int rowLimit; + private boolean captureBaseline; + private boolean includePreupdateValues; + private Timestamp maxTime; + + private PlanTypes planType; + + public TableCapturePlan(MSSQLTableInfo table, int rowLimit, boolean captureBaseline, boolean includePreupdateValues, String sTime){ + this.table = table; + + this.rowLimit = rowLimit; + this.captureBaseline = captureBaseline; + this.includePreupdateValues = includePreupdateValues; + + if(!StringUtils.isEmpty(sTime)){ + this.maxTime = Timestamp.valueOf(sTime); + } + } + + public void computeCapturePlan(Connection con, MSSQLCDCUtils mssqlcdcUtils) throws SQLException { + //No Row Limit Options + if(getRowLimit() == 0){ + //No starting point for CDC data extract + if(getMaxTime() == null){ + //If we need a data baseline then capture a snapshot, otherwise CDC + this.planType = getCaptureBaseline()?PlanTypes.SNAPSHOT:PlanTypes.CDC; + } else { + //We have a starting point for data extraction + this.planType = PlanTypes.CDC; + } + } else { + //We may need to capture a data baseline depending on CDC Row Count + + //No starting point for CDC data extract + if(getMaxTime() == null) { + //There is no previous starting point, and we need a data baseline anyways + // do a full snapshot + if(getCaptureBaseline()){ + this.planType = PlanTypes.SNAPSHOT; + } else { + //There is no previous starting point, and depending on row count + // we may choose to pull a full snapshot anyways, even though we + // didn't ask to capture a baseline + String sqlQuery = mssqlcdcUtils.getCDCRowCountStatement(getTable(), includePreupdateValues, null); + long rowCount = getRowCount(con, sqlQuery); + + //If we didn't pass the Row Limit threshold, then use CDC, else get a Snapshot + this.planType = rowCount < getRowLimit()?PlanTypes.CDC:PlanTypes.SNAPSHOT; + } + } else { + //We have a starting point for data extraction + + //Assume CDC unless we have a reason to change it + this.planType = PlanTypes.CDC; + + String sqlQuery = mssqlcdcUtils.getCDCRowCountStatement(getTable(), includePreupdateValues, getMaxTime()); + long rowCount = getRowCount(con, sqlQuery); + + if(rowCount >= getRowLimit()){ + this.planType = PlanTypes.SNAPSHOT; + } + } + } + + return; + } + + private long getRowCount(Connection con, String sqlQuery) throws SQLException { + try(final PreparedStatement st = con.prepareStatement(sqlQuery)) { + if(getMaxTime() != null){ + st.setTimestamp(1, getMaxTime()); + } + + final ResultSet resultSet = st.executeQuery(); + resultSet.next(); + long rowCount = resultSet.getLong(1); + + resultSet.close(); + + return rowCount; + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java new file mode 100644 index 000000000000..c738c02024e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql.processors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.event.TableCapturePlan; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "jdbc", "cdc", "mssql"}) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred. In a cluster, it is recommended to run " + + "this processor on primary only.") +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so " + + "that it can continue from the same point in time if restarted.") +@WritesAttributes({ + @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."), + @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"), + @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not.")}) +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. " + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.") +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor { + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp."; + + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("cdcmssql-dbcp-service") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder() + .name("cdcmssql-cdc-table-list") + .displayName("CDC Table List") + .description("The comma delimited list of tables in the source database to monitor for changes. If no tables " + + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder() + .name("cdcmssql-initial-snapshot") + .displayName("Generate an Initial Source Table Snapshot") + .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the " + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point " + + "for extracting CDC changes.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor.Builder() + .name("cdcmssql-full-snapshot-row-limit") + .displayName("Changeset Row Limit") + .description("If a very large change occurs on the source table, " + + "the generated changeset may be too large to quickly merge into a destination system. " + + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. " + + "The fullsnapshot attribute will be set to true when this happens.") + .required(true) + .defaultValue("0") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor INCLUDE_PREUPDATE_VALUES = new PropertyDescriptor.Builder() + .name("cdcmssql-include-preupdate-values") + .displayName("Include Pre-Update Values") + .description("When an update transaction occurs, should both the old values (operation=3) and the new values (operation=4) be returned. " + + "If true, both rows will always be in the same FlowFile and be sequential records, old values followed by new values. " + + "If you have unchangeable primary key(s) you do not need the old values to match-up the updated record to an old version, " + + "and you could reduce the number of rows to process by setting this property to false.") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + + protected List descriptors; + protected Set relationships; + + protected final Map schemaCache = new ConcurrentHashMap(); + + // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time + protected Map maxValueProperties; + protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils(); + + public MSSQLCDCUtils getMssqlcdcUtils(){ + return mssqlcdcUtils; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList(); + descriptors.add(RECORD_WRITER); + descriptors.add(DBCP_SERVICE); + descriptors.add(CDC_TABLES); + descriptors.add(TAKE_INITIAL_SNAPSHOT); + descriptors.add(FULL_SNAPSHOT_ROW_LIMIT); + descriptors.add(INCLUDE_PREUPDATE_VALUES); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if(!propertyDescriptorName.startsWith("initial.timestamp.")){ + return null; + } + + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException { + ProcessSession session = processSessionFactory.createSession(); + + final ComponentLog logger = getLogger(); + final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + + final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean(); + final boolean includePreupdateValues = processContext.getProperty(INCLUDE_PREUPDATE_VALUES).asBoolean(); + final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).evaluateAttributeExpressions().asInteger(); + + final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]); + + String[] tables = StringUtils + .split(processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue(), ","); + + if(tables == null || tables.length == 0){ + tables = allTables; + } + + final StateManager stateManager = processContext.getStateManager(); + final StateMap stateMap; + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + logger.error("Failed to retrieve observed current timestamp values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + processContext.yield(); + return; + } + + // Make a mutable copy of the current state property map. This will be updated and eventually + // set as the current state map (after the session has been committed) + final Map statePropertyMap = new HashMap<>(stateMap.toMap()); + + // If an initial max value for the table has been specified using properties, and this table is not in the state manager, sync it to the state property map + for (final Map.Entry maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String tableName = getStateKey(INITIAL_TIMESTAMP_PROP_START, maxPropKey); + if (!statePropertyMap.containsKey(tableName)) { + String newMaxPropValue = maxProp.getValue(); + + statePropertyMap.put(tableName, newMaxPropValue); + } + } + + //Build a capture plan for each table + ArrayList tableCapturePlans = new ArrayList<>(); + try (final Connection con = dbcpService.getConnection()){ + for (String t : tables) { + String tableKey = t.toLowerCase(); + if (!schemaCache.containsKey(tableKey)) { + throw new ProcessException("Unknown CDC enabled table named " + t + ". Known table names: " + String.join(", ", allTables)); + } + + MSSQLTableInfo tableInfo = schemaCache.get(tableKey); + + //Get Max Timestamp from state (if it exists) + String sTime = null; + if (statePropertyMap.containsKey(tableKey)) { + sTime = statePropertyMap.get(tableKey); + } + + TableCapturePlan tableCapturePlan = new TableCapturePlan(tableInfo, fullSnapshotRowLimit, takeInitialSnapshot, includePreupdateValues, sTime); + + //Determine Plan Type + tableCapturePlan.computeCapturePlan(con, getMssqlcdcUtils()); + + tableCapturePlans.add(tableCapturePlan); + } + + for (TableCapturePlan capturePlan : tableCapturePlans) { + final String selectQuery; + + if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC){ + selectQuery = getMssqlcdcUtils().getCDCSelectStatement(capturePlan.getTable(), capturePlan.getIncludePreupdateValues(), capturePlan.getMaxTime()); + } else if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT){ + selectQuery = getMssqlcdcUtils().getSnapshotSelectStatement(capturePlan.getTable()); + } else { + throw new ProcessException("Unknown Capture Plan type, '" + capturePlan.getPlanType() + "'."); + } + + FlowFile cdcFlowFile = session.create(); + try(final PreparedStatement st = con.prepareStatement(selectQuery)) { + if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC && capturePlan.getMaxTime() != null){ + st.setTimestamp(1, capturePlan.getMaxTime()); + } + + final ResultSet resultSet = st.executeQuery(); + ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null); + + final Map attributes = new HashMap<>(); + final AtomicReference maxTimestamp = new AtomicReference<>(); + final AtomicLong rowCount = new AtomicLong(); + final AtomicInteger fieldCount = new AtomicInteger(); + cdcFlowFile = session.write(cdcFlowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + Long rows=0L; + final RecordSchema writeSchema = resultSetRecordSet.getSchema(); + fieldCount.set(writeSchema.getFieldCount()); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + writer.beginRecordSet(); + + Record record; + while ((record = resultSetRecordSet.next()) != null) { + writer.write(record); + + rows++; + maxTimestamp.set((Timestamp)record.getValue("tran_end_time")); + } + + final WriteResult writeResult = writer.finishRecordSet(); + + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + } catch (SchemaNotFoundException e) { + e.printStackTrace(); + } + + rowCount.set(rows); + } + }); + + if(rowCount.get() == 0){ + session.remove(cdcFlowFile); + continue; + } + + attributes.put("tablename", capturePlan.getTable().getSourceTableName()); + attributes.put("mssqlcdc.row.count", rowCount.toString()); + attributes.put("maxvalue.tran_end_time", maxTimestamp.toString()); + attributes.put("fullsnapshot", Boolean.toString(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT)); + + cdcFlowFile = session.putAllAttributes(cdcFlowFile, attributes); + session.transfer(cdcFlowFile, REL_SUCCESS); + + statePropertyMap.put(capturePlan.getTable().getSourceTableName().toLowerCase(), maxTimestamp.toString()); + stateManager.setState(statePropertyMap, Scope.CLUSTER); + + session.commit(); + } catch (IOException e) { + session.remove(cdcFlowFile); + throw new ProcessException("Failed to update cluster state with new timestamps.", e); + } + } + } catch (SQLException e) { + throw new ProcessException("Error working with MS SQL CDC Database.", e); + } + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + //Prefetch list of all CDC tables and their schemas. + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + try (final Connection con = dbcpService.getConnection()) { + List tableSchemas = getMssqlcdcUtils().getCDCTableList(con); + + for (MSSQLTableInfo ti:tableSchemas) { + schemaCache.put(ti.getSourceTableName().toLowerCase(), ti); + } + } catch (SQLException e) { + throw new ProcessException("Unable to communicate with database in order to determine CDC tables", e); + } catch (CDCException e) { + throw new ProcessException(e.getMessage(), e); + } + + maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); + } + + protected static String getStateKey(String prefix, String tableName) { + return tableName.toLowerCase().replace(prefix,""); + } + + protected Map getDefaultMaxValueProperties(final Map properties){ + final Map defaultMaxValues = new HashMap<>(); + + for (final Map.Entry entry : properties.entrySet()) { + final String key = entry.getKey().getName(); + + if(!key.startsWith(INITIAL_TIMESTAMP_PROP_START)) { + continue; + } + + defaultMaxValues.put(key.substring(INITIAL_TIMESTAMP_PROP_START.length()), entry.getValue()); + } + + return defaultMaxValues; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..8b592e17e723 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java new file mode 100644 index 000000000000..9d956ee78a7b --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java @@ -0,0 +1,828 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CaptureChangeMSSQLTest { + + private TestRunner runner; + private MockCaptureChangeMSSQL processor; + private final static String DB_LOCATION = "target/db_qdt"; + + + @BeforeClass + public static void setupBeforeClass() throws IOException, SQLException { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + + // load CDC schema to database + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Connection con = dbcp.getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("CREATE TABLE cdc.change_tables(\n" + + "object_id int,\n" + + //These four columns are computed from object_id/source_object_id in MS SQL, but for testing we put them as strings + "schemaName varchar(128),\n" + + "tableName varchar(128),\n" + + "sourceSchemaName varchar(128),\n" + + "sourceTableName varchar(128),\n" + + + "version int,\n" + + "capture_instance varchar(128),\n" + + "start_lsn int,\n" + + "end_lsn int,\n" + + "supports_net_changes BOOLEAN,\n" + + "has_drop_pending BOOLEAN,\n" + + "role_name varchar(128),\n" + + "index_name varchar(128),\n" + + "filegroup_name varchar(128),\n" + + "create_date TIMESTAMP,\n" + + "partition_switch BOOLEAN)"); + + stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" + + "start_lsn int,\n" + + "tran_begin_time TIMESTAMP,\n" + + "tran_end_time TIMESTAMP,\n" + + "tran_id int,\n" + + "tran_begin_lsn int)"); + + stmt.execute("CREATE TABLE cdc.index_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "index_ordinal int,\n" + + "column_id int)"); + + stmt.execute("CREATE TABLE cdc.captured_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "column_id int,\n" + + "column_type varchar(128),\n" + + "column_ordinal int,\n" + + "is_computed BOOLEAN)"); + } + + @AfterClass + public static void cleanUpAfterClass() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (SQLNonTransientConnectionException e) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @Before + public void setup() throws InitializationException, IOException, SQLException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + processor = new MockCaptureChangeMSSQL(); + runner = TestRunners.newTestRunner(processor); + + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp"); + + final MockRecordWriter writerService = new MockRecordWriter(null, false); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer"); + + runner.getStateManager().clear(Scope.CLUSTER); + } + + @After + public void teardown() throws IOException { + runner.getStateManager().clear(Scope.CLUSTER); + runner = null; + } + + @Test + public void testSelectGenerator(){ + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils(); + + List columns = new ArrayList<>(); + columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1, true)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName", 2, false)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName", 3, false)); + + MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names", + "dbo", "dbo_Names_CT", 1000L, columns); + + String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, true, null); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime); + + String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, true, new Timestamp(0)); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "WHERE t.tran_end_time > ?\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime); + + String noMaxTimeNoPreUpdateValues = mssqlcdcUtils.getCDCSelectStatement(ti, false, null); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "WHERE \"o\".\"__$operation\" <> 3\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTimeNoPreUpdateValues); + + String withMaxTimeNoPreUpdateValues = mssqlcdcUtils.getCDCSelectStatement(ti, false, new Timestamp(0)); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "WHERE t.tran_end_time > ? AND \"o\".\"__$operation\" <> 3\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTimeNoPreUpdateValues); + } + + @Test + public void testRetrieveAllChanges() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("4", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 02:03:06.567", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 02:03:06.567", stateMap.get("names")); + + //Add rows, check again + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 03:04:06.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 03:04:06.123", stateMap.get("names")); + } + + @Test + public void testInitialTimestamp() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty("initial.timestamp.names", "2017-01-01 02:03:04.123"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("3", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 02:03:06.567", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 02:03:06.567", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + //Add rows, check again + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 03:04:06.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 03:04:06.123", stateMap.get("names")); + } + + @Test + public void testRowLimit() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.FULL_SNAPSHOT_ROW_LIMIT, "2"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + //These rows should be skipped. The timestamp coming back from the full table snapshot is greater then the CDC update timestamp + // resulting in no rows returned. + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 0); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + + + //Add two rows so we go over the row limit and get a full snapshot + // This snapshot will include the row we inserted above but which got skipped previously + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-02-28 23:59:01.123', " + + "'2017-03-02 00:01:01.123', 10040, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2100, 'James', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2100, 'James', 'Stone')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2200, 'Clark', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2200, 'Clark', 'Stone')"); + + //Update the "CURRENT_TIMESTAMP" value, which gets used + // as the current tran_end_time marker for full snapshots + processor.db_timestamp = "2017-03-03 01:01:01.123"; + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("10", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + Assert.assertEquals("2017-03-03 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-03 01:01:01.123", stateMap.get("names")); + } + + @Test + public void testBaseline() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.TAKE_INITIAL_SNAPSHOT, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + } + + @Test + public void testBaselineRowCount() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.TAKE_INITIAL_SNAPSHOT, "true"); + runner.setProperty(CaptureChangeMSSQL.FULL_SNAPSHOT_ROW_LIMIT, "2"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + + + //Add two rows so we go over the row limit and get a full snapshot + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-02-28 23:59:01.123', " + + "'2017-03-02 00:01:01.123', 10040, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2100, 'James', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2100, 'James', 'Stone')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2200, 'Clark', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2200, 'Clark', 'Stone')"); + + //Update the "CURRENT_TIMESTAMP" value, which gets used + // as the current tran_end_time marker for full snapshots + processor.db_timestamp = "2017-03-03 01:01:01.123"; + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("9", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + Assert.assertEquals("2017-03-03 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-03 01:01:01.123", stateMap.get("names")); + } + + + private void setupNamesTable() throws SQLException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try{ + stmt.execute("DROP TABLE cdc.\"dbo_Names_CT\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE dbo.\"Names\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE cdc.\"dbo_Names2_CT\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE dbo.\"Names2\""); + } catch(SQLException e){ + + } + + stmt.execute("CREATE TABLE cdc.\"dbo_Names_CT\"(\n" + + "\"__$start_lsn\" int,\n" + + "\"__$end_lsn\" int,\n" + + "\"__$seqval\" int,\n" + + "\"__$operation\" int,\n" + + "\"__$update_mask\" int," + + "\"ID\" int,\n" + + "\"FirstName\" varchar(128),\n" + + "\"LastName\" varchar(128))"); + + stmt.execute("CREATE TABLE dbo.\"Names\"(\n" + + "\"ID\" int,\n" + + "\"FirstName\" varchar(128),\n" + + "\"LastName\" varchar(128))"); + + stmt.execute("CREATE TABLE cdc.\"dbo_Names2_CT\"(\n" + + "\"__$start_lsn\" int,\n" + + "\"__$end_lsn\" int,\n" + + "\"__$seqval\" int,\n" + + "\"__$operation\" int,\n" + + "\"__$update_mask\" int," + + "\"ID\" int,\n" + + "\"Kanji\" varchar(128))"); + + stmt.execute("CREATE TABLE dbo.\"Names2\"(\n" + + "\"ID\" int,\n" + + "\"Kanji\" varchar(128))"); + + //Empty CDC tables + stmt.execute("DELETE FROM cdc.change_tables"); + stmt.execute("DELETE FROM cdc.lsn_time_mapping"); + stmt.execute("DELETE FROM cdc.index_columns"); + stmt.execute("DELETE FROM cdc.captured_columns"); + + stmt.execute("insert into cdc.change_tables (object_id, schemaName, tableName, sourceSchemaName, sourceTableName) VALUES (1, 'cdc', 'dbo_Names_CT', 'dbo', 'Names')"); + stmt.execute("insert into cdc.change_tables (object_id, schemaName, tableName, sourceSchemaName, sourceTableName) VALUES (2, 'cdc', 'dbo_Names2_CT', 'dbo', 'Names2')"); + + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'ID', 1, 'int', 1)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'FirstName', 2, 'nvarchar', 2)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'LastName', 3, 'nvarchar', 3)"); + + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (2, 'ID', 1, 'int', 1)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (2, 'Kanji', 2, 'nvarchar', 2)"); + + stmt.execute("insert into cdc.index_columns (object_id, column_name, column_id, index_ordinal) VALUES (1, 'ID', 1, 1)"); + + stmt.execute("insert into cdc.index_columns (object_id, column_name, column_id, index_ordinal) VALUES (2, 'ID', 1, 1)"); + + + + + //Load in sample data, both into "data tables" and into "cdc tables" + + //Load in some data that predates CDC data + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (900, 'Jim', 'Chen')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (901, 'Audrey', 'Evans')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (902, 'Hao', 'Chen')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (903, 'Greg', 'Phillips')"); + + //Load in data that is in both data tables and in CDC + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12345, '2017-01-01 02:03:03.123', " + + "'2017-01-01 02:03:04.123', 10000, 12345)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1000, 'John', 'Smith')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12345, 12345, 1, 1, 0, 1000, 'John', 'Smith')"); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12346, '2017-01-01 02:01:05.123', " + + "'2017-01-01 02:03:06.567', 10001, 12346)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1010, 'Ami', 'Smith')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 1, 1, 0, 1010, 'Ami', 'Smith')"); + + stmt.execute("insert into dbo.\"Names2\" (\"ID\", \"Kanji\") VALUES (1010, '亜美')"); + stmt.execute("insert into cdc.\"dbo_Names2_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"Kanji\") VALUES (12346, 12346, 1, 1, 0, 1010, '亜美')"); + + stmt.execute("update dbo.\"Names\" SET \"LastName\"='Smithson' where ID=1010"); + //Pre-update values + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 2, 3, 0, 1010, 'Ami', 'Smith')"); + //Post-update values + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 2, 4, 0, 1010, 'Ami', 'Smithson')"); + } + + /** + * Simple implementation only for CaptureChangeMSSQL processor testing. + */ + private static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } + + @Stateful(scopes = Scope.CLUSTER, description = "Mock for CaptureChangeMSSQL processor") + public static class MockCaptureChangeMSSQL extends CaptureChangeMSSQL { + public String db_timestamp = "2017-03-01 01:01:01.123"; + + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils() { + @Override + public String getLIST_CHANGE_TRACKING_TABLES_SQL() { + return "SELECT object_id,\n" + + " 'NiFi_TEST' AS databaseName, \n" + + " schemaName, \n" + + " tableName, \n" + + " sourceSchemaName,\n" + + " sourceTableName\n" + + "FROM cdc.change_tables"; + } + + @Override + public String getLIST_TABLE_COLUMNS() { + return super.getLIST_TABLE_COLUMNS(); + } + + @Override + public String getCURRENT_TIMESTAMP() { + return "TIMESTAMP('" + db_timestamp + "')"; + } + }; + + @Override + public MSSQLCDCUtils getMssqlcdcUtils() { + return mssqlcdcUtils; + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml new file mode 100644 index 000000000000..5e12c45519f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -0,0 +1,34 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc + 1.9.0-SNAPSHOT + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.9.0-SNAPSHOT + pom + + + nifi-cdc-mssql-processors + nifi-cdc-mssql-nar + + diff --git a/nifi-nar-bundles/nifi-cdc/pom.xml b/nifi-nar-bundles/nifi-cdc/pom.xml index 029733a695cd..97eb153ba5db 100644 --- a/nifi-nar-bundles/nifi-cdc/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/pom.xml @@ -25,5 +25,6 @@ nifi-cdc-api nifi-cdc-mysql-bundle + nifi-cdc-mssql-bundle \ No newline at end of file From 4613af92fa04d56d3971fdd7915dbf466a6b992d Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Tue, 20 Aug 2019 16:04:44 +0000 Subject: [PATCH 02/11] NIFI-4521 MS SQL CDC Processor --- nifi-assembly/pom.xml | 2 +- .../nifi-cdc-mssql-nar/pom.xml | 8 ++++---- .../nifi-cdc-mssql-processors/pom.xml | 20 +++++++++---------- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 8d1b7b7b02ab..4731cbea0235 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -668,7 +668,7 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-cdc-mssql-nar - 1.8.0-SNAPSHOT + 1.10.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index 49220bbae3eb..419e7b8734e3 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT nifi-cdc-mssql-nar - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 759f13804278..6d0e1c0829fb 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT org.apache.nifi nifi-utils - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT org.apache.nifi nifi-record - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 5e12c45519f2..7c842f0d7a91 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.10.0-SNAPSHOT pom From de2fa7917a4bfd073e67fa42e94160b54d81021f Mon Sep 17 00:00:00 2001 From: patricker Date: Mon, 18 Sep 2017 04:43:57 +0000 Subject: [PATCH 03/11] NIFI-4521 MS SQL CDC Processor --- .../nifi-cdc-mssql-nar/pom.xml | 8 ++++---- .../nifi-cdc-mssql-processors/pom.xml | 20 +++++++++---------- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 ++-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index 419e7b8734e3..49220bbae3eb 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT nifi-cdc-mssql-nar - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 6d0e1c0829fb..759f13804278 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT org.apache.nifi nifi-utils - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT org.apache.nifi nifi-record - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 7c842f0d7a91..5e12c45519f2 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.10.0-SNAPSHOT + 1.9.0-SNAPSHOT pom From a53f6e42a5ccb419c35a290fedabec7f7c2383d6 Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Wed, 22 Jan 2020 03:17:52 +0000 Subject: [PATCH 04/11] NIFI-4521 MS SQL CDC Processor Version Update --- nifi-assembly/pom.xml | 2 +- .../nifi-cdc-mssql-nar/pom.xml | 8 ++++---- .../nifi-cdc-mssql-processors/pom.xml | 20 +++++++++---------- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 4731cbea0235..cb70a8c46f79 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -668,7 +668,7 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-cdc-mssql-nar - 1.10.0-SNAPSHOT + 1.11.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index 49220bbae3eb..e170c344c750 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT nifi-cdc-mssql-nar - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 759f13804278..38925b91fe61 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT org.apache.nifi nifi-utils - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT org.apache.nifi nifi-record - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 5e12c45519f2..2ea1274d1e35 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.9.0-SNAPSHOT + 1.11.0-SNAPSHOT pom From 36a6d336eab8f476b9f299fe0c625471822a37fd Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Wed, 22 Jan 2020 15:23:00 +0000 Subject: [PATCH 05/11] NIFI-4521 MS SQL CDC Table Name Trimming --- .../nifi/cdc/mssql/processors/CaptureChangeMSSQL.java | 6 +++--- .../org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java index c738c02024e2..a16d800f21e8 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.cdc.mssql.processors; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -226,8 +225,9 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]); - String[] tables = StringUtils - .split(processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue(), ","); + String[] tables = processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue() + .trim() + .split("\\s*,\\s*"); if(tables == null || tables.length == 0){ tables = allTables; diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java index 9d956ee78a7b..1be4862706cf 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java @@ -248,7 +248,7 @@ public void testSelectGenerator(){ @Test public void testRetrieveAllChanges() throws SQLException, IOException { setupNamesTable(); - + runner.setIncomingConnection(false); runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); @@ -560,13 +560,13 @@ public void testBaselineRowCount() throws SQLException, IOException { runner.setIncomingConnection(false); - runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, " Names , Names2 "); runner.setProperty(CaptureChangeMSSQL.TAKE_INITIAL_SNAPSHOT, "true"); runner.setProperty(CaptureChangeMSSQL.FULL_SNAPSHOT_ROW_LIMIT, "2"); runner.run(); - runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 2); MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); Map attributes = flowFile.getAttributes(); From ffcf0a8e18f00f8afb108c004096b8ec84230dbc Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Fri, 24 Jan 2020 15:46:55 +0000 Subject: [PATCH 06/11] NIFI-4521 NiFi Version Update --- nifi-assembly/pom.xml | 2 +- .../nifi-cdc-mssql-nar/pom.xml | 8 ++++---- .../nifi-cdc-mssql-processors/pom.xml | 20 +++++++++---------- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index cb70a8c46f79..40fa6934c89f 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -668,7 +668,7 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-cdc-mssql-nar - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index e170c344c750..6cd12a4ba50e 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT nifi-cdc-mssql-nar - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 38925b91fe61..26843a0c8499 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT org.apache.nifi nifi-utils - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT org.apache.nifi nifi-record - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 2ea1274d1e35..72bc5500eb52 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.11.0-SNAPSHOT + 1.12.0-SNAPSHOT pom From e7d5fb1b1a53aa2516f991851f9be05a4780f338 Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Fri, 28 Aug 2020 09:20:23 -0600 Subject: [PATCH 07/11] NIFI-4521 Logging and cleanup --- .../mssql/processors/CaptureChangeMSSQL.java | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java index a16d800f21e8..cf2a3bbeb6ef 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -263,20 +263,23 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce ArrayList tableCapturePlans = new ArrayList<>(); try (final Connection con = dbcpService.getConnection()){ for (String t : tables) { - String tableKey = t.toLowerCase(); + final String tableKey = t.toLowerCase(); if (!schemaCache.containsKey(tableKey)) { throw new ProcessException("Unknown CDC enabled table named " + t + ". Known table names: " + String.join(", ", allTables)); } - MSSQLTableInfo tableInfo = schemaCache.get(tableKey); + final MSSQLTableInfo tableInfo = schemaCache.get(tableKey); //Get Max Timestamp from state (if it exists) String sTime = null; if (statePropertyMap.containsKey(tableKey)) { sTime = statePropertyMap.get(tableKey); + logger.info("Table `{}` using Timestamp '{}'", new Object[] { tableKey, sTime }); + } else { + logger.info("Table `{}` has no saved Timestamp", new Object[] { tableKey }); } - TableCapturePlan tableCapturePlan = new TableCapturePlan(tableInfo, fullSnapshotRowLimit, takeInitialSnapshot, includePreupdateValues, sTime); + final TableCapturePlan tableCapturePlan = new TableCapturePlan(tableInfo, fullSnapshotRowLimit, takeInitialSnapshot, includePreupdateValues, sTime); //Determine Plan Type tableCapturePlan.computeCapturePlan(con, getMssqlcdcUtils()); @@ -295,6 +298,8 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce throw new ProcessException("Unknown Capture Plan type, '" + capturePlan.getPlanType() + "'."); } + logger.debug("SQL Statement for `{}`: {}", new Object[] { capturePlan.getTable(), selectQuery }); + FlowFile cdcFlowFile = session.create(); try(final PreparedStatement st = con.prepareStatement(selectQuery)) { if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC && capturePlan.getMaxTime() != null){ @@ -302,47 +307,42 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce } final ResultSet resultSet = st.executeQuery(); - ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null); - + final Map attributes = new HashMap<>(); - final AtomicReference maxTimestamp = new AtomicReference<>(); - final AtomicLong rowCount = new AtomicLong(); - final AtomicInteger fieldCount = new AtomicInteger(); - cdcFlowFile = session.write(cdcFlowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - Long rows=0L; - final RecordSchema writeSchema = resultSetRecordSet.getSchema(); - fieldCount.set(writeSchema.getFieldCount()); - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { - writer.beginRecordSet(); - - Record record; - while ((record = resultSetRecordSet.next()) != null) { - writer.write(record); - - rows++; - maxTimestamp.set((Timestamp)record.getValue("tran_end_time")); - } - - final WriteResult writeResult = writer.finishRecordSet(); - - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - } catch (SchemaNotFoundException e) { - e.printStackTrace(); + Timestamp maxTimestamp=null; + long rows=0L; + int fieldCount; + try (final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null); + final OutputStream out = session.write(cdcFlowFile)) { + + final RecordSchema writeSchema = resultSetRecordSet.getSchema(); + fieldCount = writeSchema.getFieldCount(); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, cdcFlowFile)) { + writer.beginRecordSet(); + + Record record; + while ((record = resultSetRecordSet.next()) != null) { + writer.write(record); + + rows++; + maxTimestamp = (Timestamp)record.getValue("tran_end_time"); } - rowCount.set(rows); + writer.finishRecordSet(); + + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + } catch (SchemaNotFoundException e) { + e.printStackTrace(); } - }); + } - if(rowCount.get() == 0){ + if(rows == 0){ session.remove(cdcFlowFile); continue; } attributes.put("tablename", capturePlan.getTable().getSourceTableName()); - attributes.put("mssqlcdc.row.count", rowCount.toString()); + attributes.put("mssqlcdc.row.count", Long.toString(rows)); attributes.put("maxvalue.tran_end_time", maxTimestamp.toString()); attributes.put("fullsnapshot", Boolean.toString(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT)); From b2aa70b12abf8cf1e973360a7c95057bfa69b07e Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Fri, 28 Aug 2020 09:50:35 -0600 Subject: [PATCH 08/11] NIFI-4521 POM Version Update --- nifi-assembly/pom.xml | 2 +- .../nifi-cdc-mssql-nar/pom.xml | 8 ++++---- .../nifi-cdc-mssql-processors/pom.xml | 20 +++++++++---------- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 40fa6934c89f..6983311e80d0 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -668,7 +668,7 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-cdc-mssql-nar - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index 6cd12a4ba50e..42f09e2f8e91 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT nifi-cdc-mssql-nar - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 26843a0c8499..3c3383160dc5 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT org.apache.nifi nifi-utils - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT org.apache.nifi nifi-record - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 72bc5500eb52..8b71b5db3ad6 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.12.0-SNAPSHOT + 1.13.0-SNAPSHOT pom From b9f78b827268267c6bd6c6d4adce35658fdf1b3e Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Thu, 3 Sep 2020 12:47:19 -0600 Subject: [PATCH 09/11] NIFI-4521 Fixing Checkstyle Issues --- .../nifi/cdc/mssql/processors/CaptureChangeMSSQL.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java index cf2a3bbeb6ef..6f6df0911698 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -45,18 +45,15 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.ResultSetRecordSet; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.sql.Connection; import java.sql.PreparedStatement; @@ -71,9 +68,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; @TriggerSerially @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -307,14 +301,13 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce } final ResultSet resultSet = st.executeQuery(); - + final Map attributes = new HashMap<>(); Timestamp maxTimestamp=null; long rows=0L; int fieldCount; try (final ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null); - final OutputStream out = session.write(cdcFlowFile)) { - + final OutputStream out = session.write(cdcFlowFile)) { final RecordSchema writeSchema = resultSetRecordSet.getSchema(); fieldCount = writeSchema.getFieldCount(); try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, cdcFlowFile)) { From b8b44d121f748800fc320c4907fda9a76e65c105 Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Fri, 11 Sep 2020 15:11:13 -0600 Subject: [PATCH 10/11] NIFI-4521 Fix Tablename Parsing --- .../cdc/mssql/processors/CaptureChangeMSSQL.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java index 6f6df0911698..228a116d5d21 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -219,12 +219,14 @@ public void onTrigger(ProcessContext processContext, ProcessSessionFactory proce final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]); - String[] tables = processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue() - .trim() - .split("\\s*,\\s*"); - - if(tables == null || tables.length == 0){ + final String tablesProp = processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue(); + String[] tables; + if(tablesProp == null || tablesProp.length() == 0){ tables = allTables; + } else { + tables = tablesProp + .trim() + .split("\\s*,\\s*"); } final StateManager stateManager = processContext.getStateManager(); From 15684f1f93b63a5bbe5c9ad72362e985e744bead Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Fri, 26 Feb 2021 09:44:19 -0700 Subject: [PATCH 11/11] NIFI-4521 Update versions, prepare tests to match real binary(10) --- nifi-assembly/pom.xml | 2 +- .../nifi-cdc-mssql-nar/pom.xml | 8 +-- .../nifi-cdc-mssql-processors/pom.xml | 20 +++--- .../cdc/mssql/CaptureChangeMSSQLTest.java | 72 +++++++++++++++++-- .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 4 +- 5 files changed, 82 insertions(+), 24 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 6983311e80d0..c6a520ea2a2f 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -668,7 +668,7 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-cdc-mssql-nar - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml index 42f09e2f8e91..405d9bbf613f 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -19,11 +19,11 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT nifi-cdc-mssql-nar - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT nar NiFi Microsoft SQL Change Data Capture (CDC) NAR @@ -35,13 +35,13 @@ org.apache.nifi nifi-standard-services-api-nar - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT nar org.apache.nifi nifi-cdc-mssql-processors - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml index 3c3383160dc5..846ce4752ff2 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-cdc-mssql-bundle - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT nifi-cdc-mssql-processors @@ -29,39 +29,39 @@ org.apache.nifi nifi-api - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT org.apache.nifi nifi-utils - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT org.apache.nifi nifi-dbcp-service-api - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT provided org.apache.nifi nifi-cdc-api - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT org.apache.nifi nifi-record-serialization-service-api - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT org.apache.nifi nifi-record - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT compile org.apache.nifi nifi-mock - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT test @@ -77,13 +77,13 @@ org.apache.nifi nifi-dbcp-service - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT test org.apache.nifi nifi-mock-record-utils - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT test diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java index 1be4862706cf..6e0740235665 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java @@ -87,8 +87,8 @@ public static void setupBeforeClass() throws IOException, SQLException { "version int,\n" + "capture_instance varchar(128),\n" + - "start_lsn int,\n" + - "end_lsn int,\n" + + "start_lsn char(10) FOR BIT DATA,\n" + + "end_lsn char(10) FOR BIT DATA,\n" + "supports_net_changes BOOLEAN,\n" + "has_drop_pending BOOLEAN,\n" + "role_name varchar(128),\n" + @@ -98,10 +98,10 @@ public static void setupBeforeClass() throws IOException, SQLException { "partition_switch BOOLEAN)"); stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" + - "start_lsn int,\n" + + "start_lsn char(10) FOR BIT DATA,\n" + "tran_begin_time TIMESTAMP,\n" + "tran_end_time TIMESTAMP,\n" + - "tran_id int,\n" + + "tran_id char(10) FOR BIT DATA,\n" + "tran_begin_lsn int)"); stmt.execute("CREATE TABLE cdc.index_columns(\n" + @@ -657,6 +657,64 @@ public void testBaselineRowCount() throws SQLException, IOException { Assert.assertEquals("2017-03-03 01:01:01.123", stateMap.get("names")); } + @Test + public void testVeryLargeLSN() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("4", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 02:03:06.567", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 02:03:06.567", stateMap.get("names")); + + //Add rows, check again + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 03:04:06.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 03:04:06.123", stateMap.get("names")); + } private void setupNamesTable() throws SQLException { // load test data to database @@ -688,9 +746,9 @@ private void setupNamesTable() throws SQLException { } stmt.execute("CREATE TABLE cdc.\"dbo_Names_CT\"(\n" + - "\"__$start_lsn\" int,\n" + - "\"__$end_lsn\" int,\n" + - "\"__$seqval\" int,\n" + + "\"__$start_lsn\" CHAR(10) FOR BIT DATA,\n" + + "\"__$end_lsn\" CHAR(10) FOR BIT DATA,\n" + + "\"__$seqval\" CHAR(10) FOR BIT DATA,\n" + "\"__$operation\" int,\n" + "\"__$update_mask\" int," + "\"ID\" int,\n" + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml index 8b71b5db3ad6..f0428b336b89 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -19,12 +19,12 @@ org.apache.nifi nifi-cdc - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT org.apache.nifi nifi-cdc-mssql-bundle - 1.13.0-SNAPSHOT + 1.14.0-SNAPSHOT pom