Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-2829 Date and Time Format Support for PutSQL #1524

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
Expand Down Expand Up @@ -99,11 +102,13 @@
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types and timestamps. For binary data types "
+ "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format "
+ "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all "
+ "letters in upper case and no '0x' at the beginning. For timestamps, the format can be specified according to java.time.format.DateTimeFormatter."
+ "Customer and named patterns are accepted i.e. ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')")
+ "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter.")
})
@WritesAttributes({
@WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
Expand Down Expand Up @@ -790,10 +795,50 @@ private void setParameter(final PreparedStatement stmt, final String attrName, f
stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
break;
case Types.DATE:
stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
Date date;

if (valueFormat.equals("")) {
if(LONG_PATTERN.matcher(parameterValue).matches()){
date = new Date(Long.parseLong(parameterValue));
}else {
String dateFormatString = "yyyy-MM-dd";
if (!valueFormat.isEmpty()) {
dateFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue);
date = new Date(parsedDate.getTime());
}
} else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter);
date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
}

stmt.setDate(parameterIndex, date);
break;
case Types.TIME:
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
Time time;

if (valueFormat.equals("")) {
if (LONG_PATTERN.matcher(parameterValue).matches()) {
time = new Time(Long.parseLong(parameterValue));
} else {
String timeFormatString = "HH:mm:ss.SSS";
if (!valueFormat.isEmpty()) {
timeFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue);
time = new Time(parsedDate.getTime());
}
} else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter);
time = Time.valueOf(parsedTime);
}

stmt.setTime(parameterIndex, time);
break;
case Types.TIMESTAMP:
long lTimestamp=0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,77 @@ public void testUsingTimestampValuesWithFormatAttribute() throws InitializationE
}
}

@Test
public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
}
}

runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");

final String dateStr = "2002-02-02";
final String timeStr = "12:02:02";

Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", timeStr);
attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", dateStr);
attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");

runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);

attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "68522000");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "1012633200000");

runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);

attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "120202000");
attributes.put("sql.args.1.format", "HHmmssSSS");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "20020202");
attributes.put("sql.args.2.format", "yyyyMMdd");

runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes);

runner.run();

runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);

try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is failing for me, presumably because the format was ISO_LOCAL_TIME and I am at -0500 UTC. Perhaps change the constant to something that takes timezone into account?

assertEquals(1012633200000L, rs.getDate(3).getTime());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for this guy. Are the second and third flow files supposed to put in UTC-based dates/times? Or will they be local as well? If the latter then the same change of constants will be needed for the asserts below.


assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime());

assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime());

assertFalse(rs.next());
}
}
}

@Test
public void testBitType() throws SQLException, InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
Expand Down Expand Up @@ -468,6 +539,91 @@ public void testBitType() throws SQLException, InitializationException {

}

@Test
public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
}
}

runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");

final String arg2TS = "00:01:01";
final String art3TS = "12:02:02";
final String timeFormatString = "HH:mm:ss";
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(arg2TS);

final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));

attributes.put("sql.args.2.type", String.valueOf(Types.TIME));
attributes.put("sql.args.2.value", art3TS);
attributes.put("sql.args.2.format", timeFormatString);

runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();

runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}

@Test
public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
}
}

runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");

final String arg2TS = "2001-01-01";
final String art3TS = "2002-02-02";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
java.util.Date parsedDate = dateFormat.parse(arg2TS);

final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.DATE));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", art3TS);

runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();

runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);

try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}

@Test
public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
Expand Down