From a17c7464d0dea11bebd1a013a3b6fa04a0e57181 Mon Sep 17 00:00:00 2001 From: patricker Date: Mon, 18 Sep 2017 12:43:57 +0800 Subject: [PATCH 1/7] NIFI-4521 MS SQL CDC Processor --- nifi-assembly/pom.xml | 5 + .../nifi-cdc-mssql-nar/pom.xml | 46 + .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-cdc-mssql-processors/pom.xml | 87 ++ .../apache/nifi/cdc/mssql/MSSQLCDCUtils.java | 264 ++++++ .../mssql/event/MSSQLColumnDefinition.java | 67 ++ .../nifi/cdc/mssql/event/MSSQLTableInfo.java | 48 ++ .../cdc/mssql/event/TableCapturePlan.java | 136 +++ .../mssql/processors/CaptureChangeMSSQL.java | 387 +++++++++ .../org.apache.nifi.processor.Processor | 15 + .../cdc/mssql/CaptureChangeMSSQLTest.java | 792 ++++++++++++++++++ .../nifi-cdc/nifi-cdc-mssql-bundle/pom.xml | 43 + nifi-nar-bundles/nifi-cdc/pom.xml | 6 +- pom.xml | 6 + 14 files changed, 1904 insertions(+), 3 deletions(-) create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLColumnDefinition.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/MSSQLTableInfo.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java create mode 100644 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index b9cd707f528d..ec233fd44a82 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -501,6 +501,11 @@ nifi-cdc-mysql-nar nar + + org.apache.nifi + nifi-cdc-mssql-nar + nar + org.apache.nifi nifi-parquet-nar diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml new file mode 100644 index 000000000000..673b6ef03669 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.5.0-SNAPSHOT + + + nifi-cdc-mssql-nar + 1.5.0-SNAPSHOT + nar + NiFi Microsoft SQL Change Data Capture (CDC) NAR + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-cdc-mssql-processors + 1.5.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..fa0bbade0629 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-cdc-mssql-nar +Copyright 2014-2017 The Apache Software Foundation + +This product 
includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml new file mode 100644 index 000000000000..de84036a7cc3 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/pom.xml @@ -0,0 +1,87 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.5.0-SNAPSHOT + + + nifi-cdc-mssql-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-dbcp-service-api + provided + + + org.apache.nifi + nifi-cdc-api + 1.5.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-record + compile + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + org.apache.nifi + nifi-dbcp-service + test + + + org.apache.httpcomponents + httpclient + test + + + org.apache.nifi + nifi-mock-record-utils + test + + + diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java new file mode 100644 index 000000000000..258f31cba268 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cdc.mssql;

import org.apache.nifi.cdc.CDCException;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;

import java.sql.Connection;
import java.sql.Timestamp;
import java.sql.Types;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * Helper for the MS SQL Change Data Capture (CDC) processor.
 * <p>
 * Responsibilities:
 * <ul>
 *   <li>Discover CDC-enabled tables via {@code [cdc].[change_tables]} and their
 *       captured columns via {@code cdc.captured_columns}/{@code cdc.index_columns}.</li>
 *   <li>Generate the SELECT statements used for full-table snapshots, CDC change-set
 *       extraction, and CDC row counting. The snapshot statement deliberately mirrors
 *       the CDC column layout (tran times, trans_id, lsn, seqval, operation, mask) so
 *       both paths produce the same record schema.</li>
 *   <li>Map MS SQL column type names to {@link java.sql.Types} constants.</li>
 * </ul>
 * Overridable getters ({@code getCURRENT_TIMESTAMP()} etc.) exist so tests can swap in
 * database-specific SQL fragments.
 */
public class MSSQLCDCUtils {
    /** Separator inserted between generated SELECT-list entries. */
    private static final String COLUMN_SPLIT = "\n,";

    final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" +
            " DB_NAME() AS [databaseName], \n" +
            " SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS [schemaName], \n" +
            " OBJECT_NAME(object_id) AS [tableName], \n" +
            " SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) AS [sourceSchemaName],\n" +
            " OBJECT_NAME(source_object_id) AS [sourceTableName] \n" +
            "FROM [cdc].[change_tables]";

    final String LIST_TABLE_COLUMNS = "select cc.object_id\n" +
            ",cc.column_name\n" +
            ",cc.column_id\n" +
            ",cc.column_type\n" +
            ",cc.column_ordinal\n" +
            ",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" +
            "FROM cdc.captured_columns cc\n" +
            "LEFT OUTER JOIN cdc.index_columns ic ON \n" +
            "(ic.object_id = cc.object_id AND ic.column_name = cc.column_name)\n" +
            "where cc.object_id=?\n" +
            "ORDER BY cc.column_ordinal";

    public String getLIST_CHANGE_TRACKING_TABLES_SQL(){
        return LIST_CHANGE_TRACKING_TABLES_SQL;
    }

    public String getLIST_TABLE_COLUMNS(){
        return LIST_TABLE_COLUMNS;
    }

    public String getCURRENT_TIMESTAMP(){
        return "CURRENT_TIMESTAMP";
    }

    /**
     * Queries the database for every CDC-enabled table and loads the captured
     * column definitions for each one.
     *
     * @param con an open connection to the monitored database
     * @return one {@link MSSQLTableInfo} per row of {@code [cdc].[change_tables]}
     * @throws SQLException if either metadata query fails
     * @throws CDCException if a captured column has an unrecognized MS SQL type
     */
    public List<MSSQLTableInfo> getCDCTableList(Connection con) throws SQLException, CDCException {
        final List<MSSQLTableInfo> cdcTables = new ArrayList<>();

        try (final Statement st = con.createStatement();
             final ResultSet resultSet = st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL())) {

            while (resultSet.next()) {
                int objectId = resultSet.getInt("object_id");
                String databaseName = resultSet.getString("databaseName");
                String schemaName = resultSet.getString("schemaName");
                String tableName = resultSet.getString("tableName");
                String sourceSchemaName = resultSet.getString("sourceSchemaName");
                String sourceTableName = resultSet.getString("sourceTableName");

                // object_id is an unsigned value on the SQL Server side; widen without sign extension.
                MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, schemaName, tableName,
                        sourceSchemaName, sourceTableName, Integer.toUnsignedLong(objectId), null);
                cdcTables.add(ti);
            }
        }

        // Second pass so the column lookups don't interleave with the open table result set.
        for (MSSQLTableInfo ti : cdcTables) {
            ti.setColumns(getCaptureColumns(con, ti.getTableId()));
        }

        return cdcTables;
    }

    /**
     * Loads the captured-column metadata for a single CDC change table.
     *
     * @param con      an open connection to the monitored database
     * @param objectId the change table's object_id
     * @return column definitions ordered by {@code column_ordinal}
     * @throws SQLException if the metadata query fails
     * @throws CDCException if a column's MS SQL type cannot be mapped to a JDBC type
     */
    public List<ColumnDefinition> getCaptureColumns(Connection con, long objectId) throws SQLException, CDCException {
        final List<ColumnDefinition> tableColumns = new ArrayList<>();
        try (final PreparedStatement st = con.prepareStatement(getLIST_TABLE_COLUMNS())) {
            st.setLong(1, objectId);

            try (final ResultSet resultSet = st.executeQuery()) {
                while (resultSet.next()) {
                    String columnName = resultSet.getString("column_name");
                    String columnType = resultSet.getString("column_type");
                    int columnOrdinal = resultSet.getInt("column_ordinal");
                    int isColumnKey = resultSet.getInt("key");

                    int jdbcType = TranslateMSSQLTypeToJDBCTypes(columnType);

                    tableColumns.add(new MSSQLColumnDefinition(jdbcType, columnName, columnOrdinal, isColumnKey == 1));
                }
            }
        }

        return tableColumns;
    }

    /**
     * Builds a snapshot SELECT over the source table whose column list matches the
     * CDC extract exactly: transaction metadata columns are emitted as constants
     * (operation = 2, i.e. INSERT) so a snapshot record set is schema-compatible
     * with a CDC record set.
     */
    public String getSnapshotSelectStatement(MSSQLTableInfo tableInfo){
        final StringBuilder sbQuery = new StringBuilder();

        sbQuery.append("SELECT " + getCURRENT_TIMESTAMP() + " tran_begin_time");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append(getCURRENT_TIMESTAMP() + " \"tran_end_time\"");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append("0 trans_id");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append("0 start_lsn");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append("0 seqval");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append("2 operation");

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append("0 update_mask");

        for(ColumnDefinition col : tableInfo.getColumns()){
            MSSQLColumnDefinition mssqlColumnDefinition = (MSSQLColumnDefinition)col;

            sbQuery.append(COLUMN_SPLIT);
            sbQuery.append("\"" + mssqlColumnDefinition.getName() + "\"");
        }

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append(getCURRENT_TIMESTAMP() + " EXTRACT_TIME");
        sbQuery.append("\n");
        sbQuery.append("FROM " + tableInfo.getSourceSchemaName() + ".\""+ tableInfo.getSourceTableName() + "\"");

        return sbQuery.toString();
    }

    /**
     * Builds the CDC extract SELECT for a change table, joined to
     * {@code cdc.lsn_time_mapping} for transaction times. When {@code maxTime} is
     * non-null a {@code tran_end_time > ?} predicate is added (bind the timestamp
     * as parameter 1). Results are ordered by lsn/seqval/operation so events
     * replay in commit order.
     */
    public String getCDCSelectStatement(MSSQLTableInfo tableInfo, Timestamp maxTime){
        final StringBuilder sbQuery = new StringBuilder();

        sbQuery.append("SELECT t.tran_begin_time\n" +
                ",t.tran_end_time \"tran_end_time\"\n" +
                ",CAST(t.tran_id AS bigint) trans_id\n" +
                ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" +
                ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" +
                ",\"o\".\"__$operation\" operation\n" +
                ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask");

        for(ColumnDefinition col : tableInfo.getColumns()){
            MSSQLColumnDefinition mssqlColumnDefinition = (MSSQLColumnDefinition)col;

            sbQuery.append(COLUMN_SPLIT);
            sbQuery.append("\"o\".\"" + mssqlColumnDefinition.getName() + "\"");
        }

        sbQuery.append(COLUMN_SPLIT);
        sbQuery.append(getCURRENT_TIMESTAMP() + " EXTRACT_TIME");
        sbQuery.append("\n");
        sbQuery.append("FROM cdc.\""+ tableInfo.getTableName() + "\" \"o\"\nINNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")");

        if(maxTime != null){
            sbQuery.append("\nWHERE t.tran_end_time > ?");
        }

        sbQuery.append("\nORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"");

        return sbQuery.toString();
    }

    /**
     * Builds a COUNT(*) over the pending CDC rows for a change table; same optional
     * {@code tran_end_time > ?} predicate as {@link #getCDCSelectStatement}.
     */
    public String getCDCRowCountStatement(MSSQLTableInfo tableInfo, Timestamp maxTime){
        final StringBuilder sbQuery = new StringBuilder();

        sbQuery.append("SELECT COUNT(*) rowcnt \n");
        sbQuery.append("FROM cdc.\""+ tableInfo.getTableName() + "\" \"o\"\nINNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")");

        if(maxTime != null){
            sbQuery.append("\nWHERE t.tran_end_time > ?");
        }

        return sbQuery.toString();
    }

    /**
     * Maps an MS SQL type name to its {@link java.sql.Types} constant.
     * List from https://docs.microsoft.com/en-us/sql/connect/jdbc/using-basic-data-types
     *
     * @throws CDCException if the type name is not recognized
     */
    public static int TranslateMSSQLTypeToJDBCTypes(String mssqltype) throws CDCException {
        switch (mssqltype){
            case "bigint":
                return Types.BIGINT;
            case "binary":
                return Types.BINARY;
            case "bit":
                return Types.BIT;
            case "char":
            case "uniqueidentifier":
                return Types.CHAR;
            case "date":
                return Types.DATE;
            case "datetime":
            case "datetime2":
            case "smalldatetime":
                return Types.TIMESTAMP;
            case "decimal":
                return Types.DECIMAL;
            case "float":
                return Types.DOUBLE;
            case "image":
                return Types.LONGVARBINARY;
            case "int":
                return Types.INTEGER;
            case "money":
            case "smallmoney":
                return Types.DECIMAL;
            case "nchar":
                return Types.NCHAR;
            case "ntext":
            case "text":
            case "xml":
                return Types.LONGVARCHAR;
            case "numeric":
                return Types.NUMERIC;
            case "nvarchar":
            case "varchar":
                return Types.VARCHAR;
            case "real":
                return Types.REAL;
            case "smallint":
                return Types.SMALLINT;
            case "time":
                return Types.TIME;
            case "timestamp":
                // SQL Server "timestamp" is a row-version binary, not a temporal type.
                return Types.BINARY;
            case "tinyint":
                return Types.TINYINT;
            case "udt":
            case "varbinary":
                return Types.VARBINARY;
        }

        throw new CDCException("Unrecognized MS SQL data type " + mssqltype);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cdc.mssql.event;

import org.apache.nifi.cdc.event.ColumnDefinition;

import java.sql.JDBCType;

/**
 * A captured column in an MS SQL CDC change table. Extends the generic
 * {@link ColumnDefinition} with the column's ordinal position, primary-key
 * membership, and a (defaulted) display size.
 */
public class MSSQLColumnDefinition extends ColumnDefinition {

    private boolean isKey = false;
    private int columnOrdinal;
    private int columnSize = 255;

    /**
     * @param type          the {@link java.sql.Types} constant for this column
     * @param name          the column name
     * @param columnOrdinal the column's ordinal position within the change table
     * @param isKey         whether the column participates in the table's key index
     */
    public MSSQLColumnDefinition(int type, String name, int columnOrdinal, boolean isKey) {
        super(type, name);
        this.isKey = isKey;
        this.columnOrdinal = columnOrdinal;
    }

    public boolean getIsKey() {
        return isKey;
    }

    public void setIsKey(boolean isKey) {
        this.isKey = isKey;
    }

    public int getColumnOrdinal() {
        return columnOrdinal;
    }

    public int getColumnSize() {
        return columnSize;
    }

    public void setColumnSize(int size) {
        columnSize = size;
    }

    /** No vendor-specific type name is tracked; always the empty string. */
    public String getCustomTypeName() {
        return "";
    }

    /** Key columns are treated as required; all others are optional. */
    public boolean isRequired() {
        return isKey;
    }

    /** The {@link JDBCType} corresponding to this column's numeric type code. */
    public JDBCType getDataType() {
        return JDBCType.valueOf(this.getType());
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cdc.mssql.event;

import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.TableInfo;

import java.util.List;

/**
 * Describes an MS SQL CDC change table together with the source (tracked) table
 * it captures. The inherited table name refers to the change table in the
 * {@code cdc} schema; {@code sourceSchemaName}/{@code sourceTableName} identify
 * the user table being tracked.
 */
public class MSSQLTableInfo extends TableInfo {

    private String schemaName;
    private String sourceSchemaName;
    private String sourceTableName;

    /**
     * @param databaseName     database containing both tables
     * @param schemaName       schema of the change table
     * @param tableName        name of the change table
     * @param sourceSchemaName schema of the tracked source table
     * @param sourceTableName  name of the tracked source table
     * @param tableId          the change table's object_id
     * @param columns          captured column definitions (may be set later)
     */
    public MSSQLTableInfo(String databaseName, String schemaName, String tableName,
                          String sourceSchemaName, String sourceTableName,
                          Long tableId, List<ColumnDefinition> columns) {
        super(databaseName, tableName, tableId, columns);
        this.schemaName = schemaName;
        this.sourceSchemaName = sourceSchemaName;
        this.sourceTableName = sourceTableName;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public String getSourceSchemaName() {
        return sourceSchemaName;
    }

    public String getSourceTableName() {
        return sourceTableName;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cdc.mssql.event;

import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
import org.apache.nifi.util.StringUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

/**
 * Decides, per table, whether the next extraction should read the CDC change
 * table ({@link PlanTypes#CDC}) or take a full snapshot of the source table
 * ({@link PlanTypes#SNAPSHOT}). A snapshot is chosen when a baseline was
 * requested with no prior extraction point, or when the pending change-set row
 * count reaches the configured row limit.
 */
public class TableCapturePlan {

    public enum PlanTypes {
        CDC,
        SNAPSHOT
    }

    private MSSQLTableInfo table;
    private int rowLimit;
    private boolean captureBaseline;
    private Timestamp maxTime;
    private PlanTypes planType;

    /**
     * @param table           the CDC table this plan covers
     * @param rowLimit        change-set row count at which a snapshot is taken instead
     *                        of a CDC extract; 0 disables the limit
     * @param captureBaseline whether an initial full snapshot of the source table was requested
     * @param sTime           the last extracted {@code tran_end_time} as a string
     *                        ({@code Timestamp.valueOf} format), or null/empty when
     *                        there is no prior extraction point
     */
    public TableCapturePlan(MSSQLTableInfo table, int rowLimit, boolean captureBaseline, String sTime) {
        this.table = table;
        this.rowLimit = rowLimit;
        this.captureBaseline = captureBaseline;

        if (!StringUtils.isEmpty(sTime)) {
            this.maxTime = Timestamp.valueOf(sTime);
        }
    }

    public MSSQLTableInfo getTable() {
        return table;
    }

    public int getRowLimit() {
        return rowLimit;
    }

    public boolean getCaptureBaseline() {
        return captureBaseline;
    }

    public Timestamp getMaxTime() {
        return maxTime;
    }

    public PlanTypes getPlanType() {
        return planType;
    }

    /**
     * Computes and stores the plan type (readable via {@link #getPlanType()}).
     * Only queries the database (for a pending-row count) when a row limit is set
     * and a snapshot wasn't already mandated by a requested baseline.
     *
     * @throws SQLException if the row-count query fails
     */
    public void ComputeCapturePlan(Connection con, MSSQLCDCUtils mssqlcdcUtils) throws SQLException {
        final boolean noStartingPoint = getMaxTime() == null;

        if (getRowLimit() == 0) {
            // No row-limit concerns: snapshot only when a baseline is needed and
            // there is no prior extraction point; otherwise plain CDC.
            this.planType = (noStartingPoint && getCaptureBaseline()) ? PlanTypes.SNAPSHOT : PlanTypes.CDC;
            return;
        }

        if (noStartingPoint && getCaptureBaseline()) {
            // A baseline was requested and nothing has been extracted yet —
            // no need to count rows, a snapshot is required regardless.
            this.planType = PlanTypes.SNAPSHOT;
            return;
        }

        // Row limit is in effect: count the pending CDC rows (from maxTime forward
        // when we have a starting point, otherwise the whole change table) and fall
        // back to a full snapshot once the limit is reached.
        final String sqlQuery = mssqlcdcUtils.getCDCRowCountStatement(getTable(), getMaxTime());
        final long rowCount = getRowCount(con, sqlQuery);

        this.planType = rowCount < getRowLimit() ? PlanTypes.CDC : PlanTypes.SNAPSHOT;
    }

    /**
     * Executes a single-value COUNT query, binding {@link #getMaxTime()} as
     * parameter 1 when present (the count statement only has the parameter
     * placeholder in that case).
     */
    private long getRowCount(Connection con, String sqlQuery) throws SQLException {
        try (final PreparedStatement st = con.prepareStatement(sqlQuery)) {
            if (getMaxTime() != null) {
                st.setTimestamp(1, getMaxTime());
            }

            try (final ResultSet resultSet = st.executeQuery()) {
                if (!resultSet.next()) {
                    // COUNT(*) always yields one row; an empty result means something is badly wrong.
                    throw new SQLException("Row count query returned no rows for table " + getTable().getTableName());
                }
                return resultSet.getLong(1);
            }
        }
    }
}
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java new file mode 100644 index 000000000000..e0f7dce3a9fd --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql.processors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.event.TableCapturePlan; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import 
org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.ResultSetRecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "jdbc", "cdc", "mssql"}) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.") +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so " + + "that it can continue from the same point in time if restarted.") +@WritesAttributes({ + @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."), + @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"), + @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not..")}) +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + + "timestamp for reading CDC data from MS SQL. 
Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. " + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.") +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor { + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp."; + + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("cdcmssql-dbcp-service") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder() + .name("cdcmssql-cdc-table-list") + .displayName("CDC Table List") + .description("The comma delimited list of tables in the source database to monitor for changes. If no tables " + + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder() + .name("cdcmssql-initial-snapshot") + .displayName("Generate an Initial Source Table Snapshot") + .description("Usually CDC only includes recent historic changes. 
Setting this property to true will cause a snapshot of the " + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point " + + "for extracting CDC changes.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor + .Builder().name("cdcmssql-full-snapshot-row-limit") + .displayName("Change Set Row Limit") + .description("If a very large change occurs on the source table, " + + "the generated change set may be too large too quickly merge into a destination system. " + + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. " + + "The fullsnapshot attribute will be set to true when this happens.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + + protected List descriptors; + protected Set relationships; + + protected final Map schemaCache = new ConcurrentHashMap(1000); + + // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time + protected Map maxValueProperties; + protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils(); + + public MSSQLCDCUtils getMssqlcdcUtils(){ + return mssqlcdcUtils; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList(); + descriptors.add(RECORD_WRITER); + descriptors.add(DBCP_SERVICE); + descriptors.add(CDC_TABLES); + descriptors.add(TAKE_INITIAL_SNAPSHOT); + descriptors.add(FULL_SNAPSHOT_ROW_LIMIT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new 
HashSet(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if(!propertyDescriptorName.startsWith("initial.timestamp.")){ + return null; + } + + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException { + ProcessSession session = processSessionFactory.createSession(); + + final ComponentLog logger = getLogger(); + final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + + final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean(); + final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).asInteger(); + + final String[] allTables = schemaCache.keySet().toArray(new String[schemaCache.size()]); + + String[] tables = StringUtils + .split(processContext.getProperty(CDC_TABLES).evaluateAttributeExpressions().getValue(), ","); + + if(tables == null || tables.length == 0){ + tables = allTables; + } + + final StateManager stateManager = processContext.getStateManager(); + final StateMap stateMap; + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + logger.error("Failed to retrieve 
observed current timestamp values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + processContext.yield(); + return; + } + + // Make a mutable copy of the current state property map. This will be updated and eventually + // set as the current state map (after the session has been committed) + final Map statePropertyMap = new HashMap<>(stateMap.toMap()); + + // If an initial max value for the table has been specified using properties, and this table is not in the state manager, sync it to the state property map + for (final Map.Entry maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String tableName = getStateKey(INITIAL_TIMESTAMP_PROP_START, maxPropKey); + if (!statePropertyMap.containsKey(tableName)) { + String newMaxPropValue = maxProp.getValue(); + + statePropertyMap.put(tableName, newMaxPropValue); + } + } + + //Build a capture plan for each table + ArrayList tableCapturePlans = new ArrayList<>(); + try (final Connection con = dbcpService.getConnection()){ + for (String t : tables) { + String tableKey = t.toLowerCase(); + if (!schemaCache.containsKey(tableKey)) { + throw new ProcessException("Unknown CDC enabled table named " + t + ". 
Known table names: " + String.join(", ", allTables)); + } + + MSSQLTableInfo tableInfo = schemaCache.get(tableKey); + + //Get Max Timestamp from state (if it exists) + String sTime = null; + if (statePropertyMap.containsKey(tableKey)) { + sTime = statePropertyMap.get(tableKey); + } + + TableCapturePlan tableCapturePlan = new TableCapturePlan(tableInfo, fullSnapshotRowLimit, takeInitialSnapshot, sTime); + + //Determine Plan Type + tableCapturePlan.ComputeCapturePlan(con, getMssqlcdcUtils()); + + tableCapturePlans.add(tableCapturePlan); + } + + for (TableCapturePlan capturePlan : tableCapturePlans) { + final String selectQuery; + + if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC){ + selectQuery = getMssqlcdcUtils().getCDCSelectStatement(capturePlan.getTable(), capturePlan.getMaxTime()); + } else if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT){ + selectQuery = getMssqlcdcUtils().getSnapshotSelectStatement(capturePlan.getTable()); + } else { + throw new ProcessException("Unknown Capture Plan type, '" + capturePlan.getPlanType() + "'."); + } + + FlowFile cdcFlowFile = session.create(); + try(final PreparedStatement st = con.prepareStatement(selectQuery)) { + if(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.CDC && capturePlan.getMaxTime() != null){ + st.setTimestamp(1, capturePlan.getMaxTime()); + } + + final ResultSet resultSet = st.executeQuery(); + ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet); + + final AtomicReference maxTimestamp = new AtomicReference<>(); + final AtomicLong rowCount = new AtomicLong(); + + cdcFlowFile = session.write(cdcFlowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + Long rows=0L; + final RecordSchema writeSchema = resultSetRecordSet.getSchema(); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + writer.beginRecordSet(); + + Record record; 
+ while ((record = resultSetRecordSet.next()) != null) { + writer.write(record); + + rows++; + maxTimestamp.set((Timestamp)record.getValue("tran_end_time")); + } + + final WriteResult writeResult = writer.finishRecordSet(); + } catch (SchemaNotFoundException e) { + e.printStackTrace(); + } + + rowCount.set(rows); + } + }); + + if(rowCount.get() == 0){ + session.remove(cdcFlowFile); + continue; + } + + Map attributes = new HashMap<>(); + attributes.put("tablename", capturePlan.getTable().getSourceTableName()); + attributes.put("mssqlcdc.row.count", rowCount.toString()); + attributes.put("maxvalue.tran_end_time", maxTimestamp.toString()); + attributes.put("fullsnapshot", Boolean.toString(capturePlan.getPlanType() == TableCapturePlan.PlanTypes.SNAPSHOT)); + + cdcFlowFile = session.putAllAttributes(cdcFlowFile, attributes); + session.transfer(cdcFlowFile, REL_SUCCESS); + + statePropertyMap.put(capturePlan.getTable().getSourceTableName().toLowerCase(), maxTimestamp.toString()); + stateManager.setState(statePropertyMap, Scope.CLUSTER); + + session.commit(); + } catch (IOException e) { + session.remove(cdcFlowFile); + throw new ProcessException("Failed to update cluster state with new timestamps.", e); + } + } + } catch (SQLException e) { + throw new ProcessException("Error working with MS SQL CDC Database.", e); + } + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + //Prefetch list of all CDC tables and their schemas. 
+ final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + try (final Connection con = dbcpService.getConnection()) { + List tableSchemas = getMssqlcdcUtils().getCDCTableList(con); + + for (MSSQLTableInfo ti:tableSchemas) { + schemaCache.put(ti.getSourceTableName().toLowerCase(), ti); + } + } catch (SQLException e) { + throw new ProcessException("Unable to communicate with database in order to determine CDC tables", e); + } catch (CDCException e) { + throw new ProcessException(e.getMessage(), e); + } + + maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); + } + + protected static String getStateKey(String prefix, String tableName) { + return tableName.toLowerCase().replace(prefix,""); + } + + protected Map getDefaultMaxValueProperties(final Map properties){ + final Map defaultMaxValues = new HashMap<>(); + + for (final Map.Entry entry : properties.entrySet()) { + final String key = entry.getKey().getName(); + + if(!key.startsWith(INITIAL_TIMESTAMP_PROP_START)) { + continue; + } + + defaultMaxValues.put(key.substring(INITIAL_TIMESTAMP_PROP_START.length()), entry.getValue()); + } + + return defaultMaxValues; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..8b592e17e723 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java new file mode 100644 index 000000000000..8613a2468f85 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CaptureChangeMSSQLTest { + + private TestRunner runner; + private MockCaptureChangeMSSQL processor; + private final static String DB_LOCATION = "target/db_qdt"; + + + @BeforeClass + public static void setupBeforeClass() throws IOException, SQLException { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + 
FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + + // load CDC schema to database + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Connection con = dbcp.getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("CREATE TABLE cdc.change_tables(\n" + + "object_id int,\n" + + //These four columns are computed from object_id/source_object_id in MS SQL, but for testing we put them as strings + "schemaName varchar(128),\n" + + "tableName varchar(128),\n" + + "sourceSchemaName varchar(128),\n" + + "sourceTableName varchar(128),\n" + + + "version int,\n" + + "capture_instance varchar(128),\n" + + "start_lsn int,\n" + + "end_lsn int,\n" + + "supports_net_changes BOOLEAN,\n" + + "has_drop_pending BOOLEAN,\n" + + "role_name varchar(128),\n" + + "index_name varchar(128),\n" + + "filegroup_name varchar(128),\n" + + "create_date TIMESTAMP,\n" + + "partition_switch BOOLEAN)"); + + stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" + + "start_lsn int,\n" + + "tran_begin_time TIMESTAMP,\n" + + "tran_end_time TIMESTAMP,\n" + + "tran_id int,\n" + + "tran_begin_lsn int)"); + + stmt.execute("CREATE TABLE cdc.index_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "index_ordinal int,\n" + + "column_id int)"); + + stmt.execute("CREATE TABLE cdc.captured_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "column_id int,\n" + + "column_type varchar(128),\n" + + "column_ordinal int,\n" + + "is_computed BOOLEAN)"); + } + + @AfterClass + public static void cleanUpAfterClass() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (SQLNonTransientConnectionException e) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException 
ioe) { + // Do nothing, may not have existed + } + } + + @Before + public void setup() throws InitializationException, IOException, SQLException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + processor = new MockCaptureChangeMSSQL(); + runner = TestRunners.newTestRunner(processor); + + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp"); + + final MockRecordWriter writerService = new MockRecordWriter(null, false); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer"); + + runner.getStateManager().clear(Scope.CLUSTER); + } + + @After + public void teardown() throws IOException { + runner.getStateManager().clear(Scope.CLUSTER); + runner = null; + } + + @Test + public void testSelectGenerator(){ + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils(); + + List columns = new ArrayList<>(); + columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1, true)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName", 2, false)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName", 3, false)); + + MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names", + "dbo", "dbo_Names_CT", 1000L, columns); + + String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, null); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + 
"INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime); + + String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, new Timestamp(0)); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "WHERE t.tran_end_time > ?\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime); + } + + @Test + public void testRetrieveAllChanges() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("4", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 02:03:06.567", attributes.get("maxvalue.tran_end_time")); + 
Assert.assertEquals("false", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 02:03:06.567", stateMap.get("names")); + + //Add rows, check again + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 03:04:06.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 03:04:06.123", stateMap.get("names")); + } + + @Test + public void testInitialTimestamp() throws SQLException, IOException { 
+ setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty("initial.timestamp.names", "2017-01-01 02:03:04.123"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("3", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 02:03:06.567", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 02:03:06.567", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + //Add rows, check again + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 
'Chris', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("2017-01-01 03:04:06.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-01-01 03:04:06.123", stateMap.get("names")); + } + + @Test + public void testRowLimit() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.FULL_SNAPSHOT_ROW_LIMIT, "2"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP 
from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + //These rows should be skipped. The timestamp coming back from the full table snapshot is greater then the CDC update timestamp + // resulting in no rows returned. + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12350, '2017-01-01 03:04:05.123', " + + "'2017-01-01 03:04:06.123', 10030, 12350)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2000, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12350, 12350, 1, 1, 0, 2000, 'Chris', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 0); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + 
stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + + + //Add two rows so we go over the row limit and get a full snapshot + // This snapshot will include the row we inserted above but which got skipped previously + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-02-28 23:59:01.123', " + + "'2017-03-02 00:01:01.123', 10040, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2100, 'James', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2100, 'James', 'Stone')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", 
\"FirstName\", \"LastName\") VALUES (2200, 'Clark', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2200, 'Clark', 'Stone')"); + + //Update the "CURRENT_TIMESTAMP" value, which gets used + // as the current tran_end_time marker for full snapshots + processor.db_timestamp = "2017-03-03 01:01:01.123"; + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("10", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + Assert.assertEquals("2017-03-03 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-03 01:01:01.123", stateMap.get("names")); + } + + @Test + public void testBaseline() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.TAKE_INITIAL_SNAPSHOT, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename 
attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", 
attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + } + + @Test + public void testBaselineRowCount() throws SQLException, IOException { + setupNamesTable(); + + runner.setIncomingConnection(false); + + runner.setProperty(CaptureChangeMSSQL.CDC_TABLES, "Names"); + runner.setProperty(CaptureChangeMSSQL.TAKE_INITIAL_SNAPSHOT, "true"); + runner.setProperty(CaptureChangeMSSQL.FULL_SNAPSHOT_ROW_LIMIT, "2"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + Map attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("6", attributes.get("mssqlcdc.row.count")); + + //since a full data snapshot was taken, the CURRENT_TIMESTAMP from the database is used + Assert.assertEquals("2017-03-01 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + + StateMap stateMap = 
runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-01 01:01:01.123", stateMap.get("names")); + + runner.clearTransferState(); + runner.run(); + + runner.assertTransferCount(CaptureChangeMSSQL.REL_SUCCESS, 0); + + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + //Add one row, past current max timestamp, make sure we get a CDC only output + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-03-01 23:59:58.123', " + + "'2017-03-02 00:00:00.001', 10440, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1900, 'Chris', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 1900, 'Juan', 'Stone')"); + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("1", attributes.get("mssqlcdc.row.count")); + + Assert.assertEquals("2017-03-02 00:00:00.001", attributes.get("maxvalue.tran_end_time")); + Assert.assertEquals("false", attributes.get("fullsnapshot")); + + stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-02 00:00:00.001", stateMap.get("names")); + + 
+ //Add two rows so we go over the row limit and get a full snapshot + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12360, '2017-02-28 23:59:01.123', " + + "'2017-03-02 00:01:01.123', 10040, 12360)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2100, 'James', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2100, 'James', 'Stone')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (2200, 'Clark', 'Stone')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12360, 12360, 1, 1, 0, 2200, 'Clark', 'Stone')"); + + //Update the "CURRENT_TIMESTAMP" value, which gets used + // as the current tran_end_time marker for full snapshots + processor.db_timestamp = "2017-03-03 01:01:01.123"; + + runner.clearTransferState(); + runner.run(); + + runner.assertAllFlowFilesTransferred(CaptureChangeMSSQL.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CaptureChangeMSSQL.REL_SUCCESS).get(0); + + attributes = flowFile.getAttributes(); + + Assert.assertTrue("Tablename attribute", attributes.containsKey("tablename")); + Assert.assertTrue("CDC row count attribute", attributes.containsKey("mssqlcdc.row.count")); + Assert.assertTrue("Maximum Transacation End Time attribute", attributes.containsKey("maxvalue.tran_end_time")); + + Assert.assertEquals("Names", attributes.get("tablename")); + Assert.assertEquals("9", attributes.get("mssqlcdc.row.count")); + Assert.assertEquals("true", attributes.get("fullsnapshot")); + Assert.assertEquals("2017-03-03 01:01:01.123", attributes.get("maxvalue.tran_end_time")); + + 
stateMap = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertEquals("2017-03-03 01:01:01.123", stateMap.get("names")); + } + + + private void setupNamesTable() throws SQLException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try{ + stmt.execute("DROP TABLE cdc.\"dbo_Names_CT\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE dbo.\"Names\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE cdc.\"dbo_Names2_CT\""); + } catch(SQLException e){ + + } + + try{ + stmt.execute("DROP TABLE dbo.\"Names2\""); + } catch(SQLException e){ + + } + + stmt.execute("CREATE TABLE cdc.\"dbo_Names_CT\"(\n" + + "\"__$start_lsn\" int,\n" + + "\"__$end_lsn\" int,\n" + + "\"__$seqval\" int,\n" + + "\"__$operation\" int,\n" + + "\"__$update_mask\" int," + + "\"ID\" int,\n" + + "\"FirstName\" varchar(128),\n" + + "\"LastName\" varchar(128))"); + + stmt.execute("CREATE TABLE dbo.\"Names\"(\n" + + "\"ID\" int,\n" + + "\"FirstName\" varchar(128),\n" + + "\"LastName\" varchar(128))"); + + stmt.execute("CREATE TABLE cdc.\"dbo_Names2_CT\"(\n" + + "\"__$start_lsn\" int,\n" + + "\"__$end_lsn\" int,\n" + + "\"__$seqval\" int,\n" + + "\"__$operation\" int,\n" + + "\"__$update_mask\" int," + + "\"ID\" int,\n" + + "\"Kanji\" varchar(128))"); + + stmt.execute("CREATE TABLE dbo.\"Names2\"(\n" + + "\"ID\" int,\n" + + "\"Kanji\" varchar(128))"); + + //Empty CDC tables + stmt.execute("DELETE FROM cdc.change_tables"); + stmt.execute("DELETE FROM cdc.lsn_time_mapping"); + stmt.execute("DELETE FROM cdc.index_columns"); + stmt.execute("DELETE FROM cdc.captured_columns"); + + stmt.execute("insert into cdc.change_tables (object_id, schemaName, tableName, sourceSchemaName, sourceTableName) VALUES (1, 'cdc', 'dbo_Names_CT', 'dbo', 'Names')"); + stmt.execute("insert into cdc.change_tables (object_id, schemaName, tableName, 
sourceSchemaName, sourceTableName) VALUES (2, 'cdc', 'dbo_Names2_CT', 'dbo', 'Names2')"); + + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'ID', 1, 'int', 1)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'FirstName', 2, 'nvarchar', 2)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (1, 'LastName', 3, 'nvarchar', 3)"); + + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (2, 'ID', 1, 'int', 1)"); + stmt.execute("insert into cdc.captured_columns (object_id, column_name, column_id, column_type, column_ordinal) VALUES (2, 'Kanji', 2, 'nvarchar', 2)"); + + stmt.execute("insert into cdc.index_columns (object_id, column_name, column_id, index_ordinal) VALUES (1, 'ID', 1, 1)"); + + stmt.execute("insert into cdc.index_columns (object_id, column_name, column_id, index_ordinal) VALUES (2, 'ID', 1, 1)"); + + + + + //Load in sample data, both into "data tables" and into "cdc tables" + + //Load in some data that predates CDC data + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (900, 'Jim', 'Chen')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (901, 'Audrey', 'Evans')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (902, 'Hao', 'Chen')"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (903, 'Greg', 'Phillips')"); + + //Load in data that is in both data tables and in CDC + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12345, '2017-01-01 02:03:03.123', " + + "'2017-01-01 02:03:04.123', 10000, 12345)"); + stmt.execute("insert into dbo.\"Names\" 
(\"ID\", \"FirstName\", \"LastName\") VALUES (1000, 'John', 'Smith')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12345, 12345, 1, 1, 0, 1000, 'John', 'Smith')"); + + stmt.execute("insert into cdc.lsn_time_mapping (start_lsn, tran_begin_time, tran_end_time, tran_id, tran_begin_lsn) VALUES (12346, '2017-01-01 02:01:05.123', " + + "'2017-01-01 02:03:06.567', 10001, 12346)"); + stmt.execute("insert into dbo.\"Names\" (\"ID\", \"FirstName\", \"LastName\") VALUES (1010, 'Ami', 'Smith')"); + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 1, 1, 0, 1010, 'Ami', 'Smith')"); + + stmt.execute("insert into dbo.\"Names2\" (\"ID\", \"Kanji\") VALUES (1010, '亜美')"); + stmt.execute("insert into cdc.\"dbo_Names2_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"Kanji\") VALUES (12346, 12346, 1, 1, 0, 1010, '亜美')"); + + stmt.execute("update dbo.\"Names\" SET \"LastName\"='Smithson' where ID=1010"); + //Pre-update values + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 2, 3, 0, 1010, 'Ami', 'Smith')"); + //Post-update values + stmt.execute("insert into cdc.\"dbo_Names_CT\" (\"__$start_lsn\", \"__$end_lsn\", \"__$seqval\", \"__$operation\", \"__$update_mask\", " + + "\"ID\", \"FirstName\", \"LastName\") VALUES (12346, 12346, 2, 4, 0, 1010, 'Ami', 'Smithson')"); + } + + /** + * Simple implementation only for CaptureChangeMSSQL processor testing. 
+ */ + private static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } + + @Stateful(scopes = Scope.CLUSTER, description = "Mock for CaptureChangeMSSQL processor") + public static class MockCaptureChangeMSSQL extends CaptureChangeMSSQL { + public String db_timestamp = "2017-03-01 01:01:01.123"; + + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils() { + @Override + public String getLIST_CHANGE_TRACKING_TABLES_SQL() { + return "SELECT object_id,\n" + + " 'NiFi_TEST' AS databaseName, \n" + + " schemaName, \n" + + " tableName, \n" + + " sourceSchemaName,\n" + + " sourceTableName\n" + + "FROM cdc.change_tables"; + } + + @Override + public String getLIST_TABLE_COLUMNS() { + return super.getLIST_TABLE_COLUMNS(); + } + + @Override + public String getCURRENT_TIMESTAMP() { + return "TIMESTAMP('" + db_timestamp + "')"; + } + }; + + @Override + public MSSQLCDCUtils getMssqlcdcUtils() { + return mssqlcdcUtils; + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml new file mode 100644 index 000000000000..4d0e9a9186ec --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-cdc + 1.5.0-SNAPSHOT + + + org.apache.nifi + nifi-cdc-mssql-bundle + 1.5.0-SNAPSHOT + pom + + + nifi-cdc-mssql-processors + nifi-cdc-mssql-nar + + + + + org.apache.nifi + nifi-cdc-mssql-processors + 1.5.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-cdc/pom.xml b/nifi-nar-bundles/nifi-cdc/pom.xml index 
3c6fea351a36..80919436a1a9 100644 --- a/nifi-nar-bundles/nifi-cdc/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/pom.xml @@ -12,8 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ---> - +--> 4.0.0 org.apache.nifi @@ -25,5 +24,6 @@ nifi-cdc-api nifi-cdc-mysql-bundle - + nifi-cdc-mssql-bundle + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0104a45e5304..f9c8bf9eee3a 100644 --- a/pom.xml +++ b/pom.xml @@ -1413,6 +1413,12 @@ 1.5.0-SNAPSHOT nar + + org.apache.nifi + nifi-cdc-mssql-nar + 1.5.0-SNAPSHOT + nar + org.apache.nifi nifi-hwx-schema-registry-nar From 51ac727051dfac7e1988032eb75c097473868288 Mon Sep 17 00:00:00 2001 From: Mark Owens Date: Fri, 20 Oct 2017 11:54:54 -0400 Subject: [PATCH 2/7] NIFI-2979 PriorityAttributePrioritizer violates Comparator contract. This closes #2220. Modified the return value when both objects priority values are null to zero to match the expected return value based upon the Comparator contract. 
--- .../apache/nifi/prioritizer/PriorityAttributePrioritizer.java | 2 +- .../nifi/prioritizer/PriorityAttributePrioritizerTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java index 3d27930553d2..fdd963e65ba6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java @@ -49,7 +49,7 @@ public int compare(FlowFile o1, FlowFile o2) { String o1Priority = o1.getAttribute(CoreAttributes.PRIORITY.key()); String o2Priority = o2.getAttribute(CoreAttributes.PRIORITY.key()); if (o1Priority == null && o2Priority == null) { - return -1; // this is not 0 to match FirstInFirstOut + return 0; } else if (o2Priority == null) { return -1; } else if (o1Priority == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java index 709855102b0b..d6ed2dc98cd9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java @@ -83,7 
+83,7 @@ public void testPrioritizer() throws InstantiationException, IllegalAccessExcept assertEquals(-1, prioritizer.compare(ffNoPriority, null)); assertEquals(1, prioritizer.compare(null, ffNoPriority)); - assertEquals(-1, prioritizer.compare(ffNoPriority, ffNoPriority)); + assertEquals(0, prioritizer.compare(ffNoPriority, ffNoPriority)); assertEquals(-1, prioritizer.compare(ffPri1, ffNoPriority)); assertEquals(1, prioritizer.compare(ffNoPriority, ffPri1)); From 10293324cd61f26c3e1d021d843a89f347a2cd1e Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Thu, 26 Oct 2017 10:27:57 -0400 Subject: [PATCH 3/7] NIFI-4518: - When the URI is too long, invoking the bulletin board multiple times for all specified component ids. --- .../webapp/js/nf/canvas/nf-canvas-utils.js | 89 ++++++++++++++++--- .../js/nf/canvas/nf-variable-registry.js | 14 ++- 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js index 4ead97b8dc83..f9178e2ec62e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js @@ -65,6 +65,8 @@ } }; + var MAX_URL_LENGTH = 2000; // the maximum (suggested) safe string length of a URL supported by all browsers and application servers + var TWO_PI = 2 * Math.PI; var binarySearch = function (length, comparator) { @@ -245,16 +247,83 @@ * @returns {deferred} */ queryBulletins: function (componentIds) { - var ids = componentIds.join('|'); + var queries = []; - return $.ajax({ - type: 'GET', - url: '../nifi-api/flow/bulletin-board', - data: { - sourceId: ids - }, - dataType: 'json' - 
}).fail(nfErrorHandler.handleAjaxError); + var query = function (ids) { + var url = new URL(window.location); + var endpoint = url.origin + '/nifi-api/flow/bulletin-board?' + $.param({ + sourceId: ids.join('|') + }); + + if (endpoint.length > MAX_URL_LENGTH) { + // split into two arrays and recurse with both halves + var mid = Math.ceil(ids.length / 2); + + // left half + var left = ids.slice(0, mid); + if (left.length > 0) { + query(left); + } + + // right half + var right = ids.slice(mid); + if (right.length > 0) { + query(right); + } + } else { + queries.push($.ajax({ + type: 'GET', + url: endpoint, + dataType: 'json' + })); + } + }; + + // initiate the queries + query(componentIds); + + if (queries.length === 1) { + // if there was only one query, return it + return queries[0].fail(nfErrorHandler.handleAjaxError); + } else { + // if there were multiple queries, wait for each to complete + return $.Deferred(function (deferred) { + $.when.apply(window, queries).done(function () { + var results = $.makeArray(arguments); + + var generated = null; + var bulletins = []; + + $.each(results, function (_, result) { + var response = result[0]; + var bulletinBoard = response.bulletinBoard; + + // use the first generated timestamp + if (generated === null) { + generated = bulletinBoard.generated; + } + + // build up all the bulletins + Array.prototype.push.apply(bulletins, bulletinBoard.bulletins); + }); + + // sort all the bulletins + bulletins.sort(function (a, b) { + return b.id - a.id; + }); + + // resolve with a aggregated result + deferred.resolve({ + bulletinBoard: { + generated: generated, + bulletins: bulletins + } + }); + }).fail(function () { + deferred.reject(); + }).fail(nfErrorHandler.handleAjaxError); + }).promise(); + } }, /** @@ -418,8 +487,6 @@ } }, - MAX_URL_LENGTH: 2000, // the maximum (suggested) safe string length of a URL supported by all browsers and application servers - /** * Set the parameters of the URL. 
* diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js index 1614632a1308..668d1d2088a9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js @@ -71,6 +71,8 @@ }(this, function ($, d3, Slick, nfCanvas, nfCanvasUtils, nfErrorHandler, nfDialog, nfClient, nfCommon, nfNgBridge, nfProcessor, nfProcessGroup, nfProcessGroupConfiguration) { 'use strict'; + var lastSelectedId = null; + // text editor var textEditor = function (args) { var scope = this; @@ -568,9 +570,15 @@ var variableIndex = args.rows[0]; var variable = variablesGrid.getDataItem(variableIndex); - // update the details for this variable - $('#affected-components-context').removeClass('unset').text(variable.name); - populateAffectedComponents(variable.affectedComponents); + // only populate affected components if this variable is different than the last selected + if (lastSelectedId === null || lastSelectedId !== variable.id) { + // update the details for this variable + $('#affected-components-context').removeClass('unset').text(variable.name); + populateAffectedComponents(variable.affectedComponents); + + // update the last selected id + lastSelectedId = variable.id; + } } } }); From 4a11b009d9b64b6ede079aed349f0ece5b7e0105 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Thu, 26 Oct 2017 12:43:02 -0400 Subject: [PATCH 4/7] NIFI-4518: - Addressing issues when handling the done callback for controller service references. 
--- .../src/main/webapp/js/nf/canvas/nf-canvas-utils.js | 8 +++++++- .../src/main/webapp/js/nf/canvas/nf-controller-service.js | 3 +-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js index f9178e2ec62e..68a918aa4a7b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js @@ -284,7 +284,13 @@ if (queries.length === 1) { // if there was only one query, return it - return queries[0].fail(nfErrorHandler.handleAjaxError); + return $.Deferred(function (deferred) { + queries[0].done(function (response) { + deferred.resolve(response); + }).fail(function () { + deferred.reject(); + }).fail(nfErrorHandler.handleAjaxError); + }).promise(); } else { // if there were multiple queries, wait for each to complete return $.Deferred(function (deferred) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js index c7d9c435dc27..c97c861d18b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js @@ -864,8 +864,7 @@ dataType: 'json' }); - $.when(bulletins, service).done(function (bulletinResult, serviceResult) { - var bulletinResponse = bulletinResult[0]; + $.when(bulletins, 
service).done(function (bulletinResponse, serviceResult) { var serviceResponse = serviceResult[0]; conditionMet(serviceResponse.component, bulletinResponse.bulletinBoard.bulletins); }).fail(function (xhr, status, error) { From a7177daf902e8bf9d348e8ecefb36a85293cdce1 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Thu, 26 Oct 2017 16:35:14 -0400 Subject: [PATCH 5/7] NIFI-4518: - Ensuring the last selected variable id is reset when closing the dialog. This closes #2227 Signed-off-by: Scott Aslan --- .../src/main/webapp/js/nf/canvas/nf-variable-registry.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js index 668d1d2088a9..24c00ab0a109 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-variable-registry.js @@ -1536,6 +1536,9 @@ affectedControllerServicesContainer.empty(); $('#variable-registry-affected-unauthorized-components').empty(); + + // reset the last selected variable + lastSelectedId = null; }; return { From fe3bd99dcfdd307dad97af0331289eac26e1e57b Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 25 Oct 2017 09:53:16 -0400 Subject: [PATCH 6/7] NIFI-4522: Add SQL Statement property to PutSQL Signed-off-by: Pierre Villard This closes #2225. 
--- .../nifi/processors/standard/PutSQL.java | 26 +++++++++-- .../nifi/processors/standard/TestPutSQL.java | 46 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index f75ccaa02138..b50dcd067177 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -139,6 +139,19 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .identifiesControllerService(DBCPService.class) .required(true) .build(); + + static final PropertyDescriptor SQL_STATEMENT = new PropertyDescriptor.Builder() + .name("putsql-sql-statement") + .displayName("SQL Statement") + .description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes " + + "using Expression Language. If this property is specified, it will be used regardless of the content of " + + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "to contain a valid SQL statement, to be issued by the processor to the database.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. 
" @@ -197,6 +210,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(CONNECTION_POOL); + properties.add(SQL_STATEMENT); properties.add(SUPPORT_TRANSACTIONS); properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); @@ -269,7 +283,9 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun groups.add(fragmentedEnclosure); for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); final StatementFlowFileEnclosure enclosure = sqlToEnclosure .computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql)); @@ -280,7 +296,9 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); // Get or create the appropriate PreparedStatement to use. final StatementFlowFileEnclosure enclosure = sqlToEnclosure @@ -304,7 +322,9 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? 
context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); // Get or create the appropriate PreparedStatement to use. final StatementFlowFileEnclosure enclosure = sqlToEnclosure diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 5a2909b1b229..b804447bbb5b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -1412,6 +1412,52 @@ public void testNullFragmentCountRollbackOnFailure() throws InitializationExcept runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); } + @Test + public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)"); + + recreateTable("PERSONS", createPersons); + + runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + put("row.id", "1"); + }}); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, 
rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}"); + runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + put("row.id", "1"); + }}); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + /** * Simple implementation only for testing purposes */ From 478fc5d06cb515563dc8a260421e0ad1932cd2e0 Mon Sep 17 00:00:00 2001 From: patricker Date: Fri, 27 Oct 2017 13:42:28 +0800 Subject: [PATCH 7/7] NIFI-4534 Choose Character Set for CSV Record Read/Write streams Signed-off-by: Pierre Villard This closes #2229. --- .../java/org/apache/nifi/csv/CSVUtils.java | 9 +++++++ .../java/org/apache/nifi/csv/CSVReader.java | 6 +++-- .../org/apache/nifi/csv/CSVRecordReader.java | 4 +-- .../apache/nifi/csv/CSVRecordSetWriter.java | 5 +++- .../org/apache/nifi/csv/WriteCSVResult.java | 4 +-- .../apache/nifi/csv/TestCSVRecordReader.java | 27 ++++++++++++++++--- .../apache/nifi/csv/TestWriteCSVResult.java | 18 ++++++------- 7 files changed, 53 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java index bc074b329ba3..eecf29037b5d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ 
-117,6 +117,15 @@ public class CSVUtils { .defaultValue("true") .required(true) .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("csvutils-character-set") + .displayName("Character Set") + .description("The Character Encoding that is used to encode/decode the CSV file") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(true) + .build(); // CSV Format fields for writers only public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character."); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index f15f85d60a81..a9b98f572181 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -60,7 +60,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact private volatile String timestampFormat; private volatile boolean firstLineIsHeader; private volatile boolean ignoreHeader; - + private volatile String charSet; @Override protected List getSupportedPropertyDescriptors() { @@ -77,6 +77,7 @@ protected List getSupportedPropertyDescriptors() { properties.add(CSVUtils.COMMENT_MARKER); properties.add(CSVUtils.NULL_STRING); properties.add(CSVUtils.TRIM_FIELDS); + properties.add(CSVUtils.CHARSET); return properties; } @@ -88,6 +89,7 @@ public void 
storeCsvFormat(final ConfigurationContext context) { this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean(); this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean(); + this.charSet = context.getProperty(CSVUtils.CHARSET).getValue(); // Ensure that if we are deriving schema from header that we always treat the first line as a header, // regardless of the 'First Line is Header' property @@ -106,7 +108,7 @@ public RecordReader createRecordReader(final Map variables, fina final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null); bufferedIn.reset(); - return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat); + return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet); } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index 18bea6b88bb5..70aaba9e5182 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -56,7 +56,7 @@ public class CSVRecordReader implements RecordReader { private List rawFieldNames; public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, 
final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, - final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException { + final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException { this.schema = schema; final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); @@ -67,7 +67,7 @@ public CSVRecordReader(final InputStream in, final ComponentLog logger, final Re LAZY_TIME_FORMAT = () -> tf; LAZY_TIMESTAMP_FORMAT = () -> tsf; - final Reader reader = new InputStreamReader(new BOMInputStream(in)); + final Reader reader = new InputStreamReader(new BOMInputStream(in), encoding); CSVFormat withHeader; if (hasHeader) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index bd2e60042f28..7aab5a36e656 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -43,6 +43,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R private volatile CSVFormat csvFormat; private volatile boolean includeHeader; + private volatile String charSet; @Override protected List getSupportedPropertyDescriptors() { @@ -58,6 +59,7 @@ protected List getSupportedPropertyDescriptors() { properties.add(CSVUtils.QUOTE_MODE); properties.add(CSVUtils.RECORD_SEPARATOR); 
properties.add(CSVUtils.TRAILING_DELIMITER); + properties.add(CSVUtils.CHARSET); return properties; } @@ -65,11 +67,12 @@ protected List getSupportedPropertyDescriptors() { public void storeCsvFormat(final ConfigurationContext context) { this.csvFormat = CSVUtils.createCSVFormat(context); this.includeHeader = context.getProperty(CSVUtils.INCLUDE_HEADER_LINE).asBoolean(); + this.charSet = context.getProperty(CSVUtils.CHARSET).getValue(); } @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, - getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader); + getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index 82d687a42a19..849b1ca6829e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -50,7 +50,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet private String[] fieldNames; public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out, - final String dateFormat, final String 
timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException { + final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException { super(out); this.recordSchema = recordSchema; @@ -61,7 +61,7 @@ public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema this.includeHeaderLine = includeHeaderLine; final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true); - final OutputStreamWriter streamWriter = new OutputStreamWriter(out); + final OutputStreamWriter streamWriter = new OutputStreamWriter(out, charSet); printer = new CSVPrinter(streamWriter, formatWithHeader); fieldValues = new Object[recordSchema.getFieldCount()]; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java index 576132fb9f76..23eb95bca39d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java @@ -60,7 +60,26 @@ private List getDefaultFields() { private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException { return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII"); + } + + @Test + public void testUTF8() throws IOException, MalformedRecordException { + final String text = "name\n黃凱揚"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + final Record record = reader.nextRecord(); + final String name = (String)record.getValue("name"); + + assertEquals("黃凱揚", name); + } } @Test @@ -72,8 +91,8 @@ public void testDate() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); - final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, - "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { final Record record = reader.nextRecord(); final java.sql.Date date = (Date) record.getValue("date"); @@ -268,7 +287,7 @@ public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordE // our schema to be the definitive list of what fields exist. 
try (final InputStream bais = new ByteArrayInputStream(inputData); final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { final Record record = reader.nextRecord(); assertNotNull(record); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index 0285796d61f7..6f4269a923e4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -76,10 +76,10 @@ public void testDataTypes() throws IOException { final long now = System.currentTimeMillis(); try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) { final Map valueMap = new HashMap<>(); - valueMap.put("string", "string"); + valueMap.put("string", "a孟bc李12儒3"); valueMap.put("boolean", true); valueMap.put("byte", (byte) 1); valueMap.put("char", 
'c'); @@ -113,7 +113,7 @@ public void testDataTypes() throws IOException { final String values = splits[1]; final StringBuilder expectedBuilder = new StringBuilder(); - expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); + expectedBuilder.append("\"a孟bc李12儒3\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); @@ -143,7 +143,7 @@ public void testExtraFieldInWriteRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.write(record); @@ -170,7 +170,7 @@ public void testExtraFieldInWriteRawRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record); @@ -197,7 +197,7 @@ public void testMissingFieldWriteRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new 
WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRecord(record); @@ -224,7 +224,7 @@ public void testMissingFieldWriteRawRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record); @@ -253,7 +253,7 @@ public void testMissingAndExtraFieldWriteRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRecord(record); @@ -281,7 +281,7 @@ public void testMissingAndExtraFieldWriteRawRecord() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record);