From 5fa7f8208f8a1d69ddd4498e9bb414ba41a208bc Mon Sep 17 00:00:00 2001 From: deonhuang Date: Mon, 3 Jul 2017 14:00:22 +0800 Subject: [PATCH] NIFI-2829 - Add Date and Time Format Support for PutSQL Fix unit test for Date and Time type time zone problem Enhance Time type to record milliseconds --- .../nifi/processors/standard/PutSQL.java | 62 +++++- .../nifi/processors/standard/TestPutSQL.java | 193 ++++++++++++++++++ 2 files changed, 247 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index cb3b19841a80..5f2f093bd1cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -72,6 +72,10 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.util.ArrayList; @@ -110,11 +114,15 @@ + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " - + "a format option needs to be specified, currently this is only applicable for binary data types and timestamps. For binary data types " - + "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " - + "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " - + "letters in upper case and no '0x' at the beginning. For timestamps, the format can be specified according to java.time.format.DateTimeFormatter." - + "Customer and named patterns are accepted i.e. ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')") + + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - " + + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + + "Dates/Times/Timestamps - " + + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + + "as specified according to java.time.format.DateTimeFormatter." + + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1) and " + + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (Derby and MySQL will truncate milliseconds), 'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.") }) @WritesAttributes({ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @@ -828,10 +836,48 @@ private void setParameter(final PreparedStatement stmt, final String attrName, f stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); break; case Types.DATE: - stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + Date date; + + if (valueFormat.equals("")) { + if (LONG_PATTERN.matcher(parameterValue).matches()) { + date = new Date(Long.parseLong(parameterValue)); + } else { + String dateFormatString = "yyyy-MM-dd"; + + SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + date = new Date(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter); + date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime()); + } + + stmt.setDate(parameterIndex, date); break; case Types.TIME: - stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + Time time; + + if (valueFormat.equals("")) { + if (LONG_PATTERN.matcher(parameterValue).matches()) { + time = new Time(Long.parseLong(parameterValue)); + } else { + String timeFormatString = "HH:mm:ss.SSS"; + + SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + time = new Time(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter); + LocalDateTime localDateTime = parsedTime.atDate(LocalDate.ofEpochDay(0)); + Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant(); + time = new Time(instant.toEpochMilli()); + } + + stmt.setTime(parameterIndex, time); break; case Types.TIMESTAMP: long lTimestamp=0L; @@ -845,7 +891,7 @@ private void setParameter(final PreparedStatement stmt, final String attrName, f java.util.Date parsedDate = dateFormat.parse(parameterValue); lTimestamp = parsedDate.getTime(); } - }else { + } else { final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); TemporalAccessor accessor = dtFormatter.parse(parameterValue); java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 2d0491b9b6c1..17ba50b25888 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -29,12 +29,20 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Time; import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import org.apache.commons.lang3.RandomUtils; import org.apache.nifi.controller.AbstractControllerService; @@ -451,6 +459,104 @@ public void testUsingTimestampValuesWithFormatAttribute() throws InitializationE } } + @Test + public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String timeStr = "12:02:02"; + final String dateStr = "2002-02-02"; + final String timeFormatString = "HH:mm:ss"; + final String dateFormatString ="yyyy-MM-dd"; + + + final DateTimeFormatter timeFormatter= DateTimeFormatter.ISO_LOCAL_TIME; + LocalTime parsedTime = LocalTime.parse(timeStr, timeFormatter); + Time expectedTime = Time.valueOf(parsedTime); + + final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE; + LocalDate parsedDate = LocalDate.parse(dateStr, dateFormatter); + Date expectedDate = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime()); + + final long expectedTimeInLong = expectedTime.getTime(); + final long expectedDateInLong = expectedDate.getTime(); + + //test with time zone GMT to avoid negative value unmatched with long pattern problem. + SimpleDateFormat timeFormat = new SimpleDateFormat(timeFormatString); + timeFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + java.util.Date parsedTimeGMT = timeFormat.parse(timeStr); + + SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + java.util.Date parsedDateGMT = dateFormat.parse(dateStr); + + Calendar gmtCalendar = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + + + //test with ISO LOCAL format attribute + Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", timeStr); + attributes.put("sql.args.1.format", "ISO_LOCAL_TIME"); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", dateStr); + attributes.put("sql.args.2.format", "ISO_LOCAL_DATE"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + + //test Long pattern without format attribute + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", Long.toString(parsedTimeGMT.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", Long.toString(parsedDateGMT.getTime())); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes); + + //test with format attribute + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", "120202000"); + attributes.put("sql.args.1.format", "HHmmssSSS"); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", "20020202"); + attributes.put("sql.args.2.format", "yyyyMMdd"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(expectedTimeInLong, rs.getTime(2).getTime()); + assertEquals(expectedDateInLong, rs.getDate(3).getTime()); + + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals(parsedTimeGMT.getTime(), rs.getTime(2).getTime()); + assertEquals(parsedDateGMT.getTime(), rs.getDate(3,gmtCalendar).getTime()); + + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals(expectedTimeInLong, rs.getTime(2).getTime()); + assertEquals(expectedDateInLong, rs.getDate(3).getTime()); + + assertFalse(rs.next()); + } + } + } + @Test public void testBitType() throws SQLException, InitializationException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); @@ -568,6 +674,93 @@ public void testBitType() throws SQLException, InitializationException { } + @Test + public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "00:01:01"; + final String art3TS = "12:02:02"; + final String timeFormatString = "HH:mm:ss"; + SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.2.value", art3TS); + attributes.put("sql.args.2.format", timeFormatString); + + runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS"); + + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, dateFormat.format(rs.getTime(2))); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "2001-01-01"; + final String art3TS = "2002-02-02"; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", art3TS); + + runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, rs.getString(2)); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + @Test public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);