diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java index a5ee68787..b4b87c81b 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java @@ -16,6 +16,7 @@ package io.cdap.plugin.cloudsql.mysql; +import com.google.common.collect.Maps; import io.cdap.cdap.api.annotation.Category; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -75,7 +76,7 @@ public StructuredRecord transform(LongWritable longWritable, MysqlDBRecord mysql @Override protected SchemaReader getSchemaReader(String sessionID) { - return new MysqlSchemaReader(sessionID); + return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties())); } @Override diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java index b8b6fbf27..b0bea9e7a 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java @@ -31,9 +31,11 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig; import io.cdap.plugin.db.source.AbstractDBSource; import io.cdap.plugin.mysql.MysqlDBRecord; +import io.cdap.plugin.mysql.MysqlSchemaReader; import io.cdap.plugin.util.CloudSQLUtil; import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.mapreduce.lib.db.DBWritable; @@ -120,6 +122,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) { return new LineageRecorder(context, assetBuilder.build()); } + @Override + protected SchemaReader getSchemaReader() { + return new MysqlSchemaReader(null, cloudsqlMysqlSourceConfig.getConnectionArguments()); + } + /** CloudSQL MySQL source config. */ public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig { diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java index 3dede5d49..e7e935135 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java @@ -16,6 +16,7 @@ package io.cdap.plugin.mysql; +import com.google.common.collect.Maps; import io.cdap.cdap.api.annotation.Category; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -62,7 +63,7 @@ public boolean supportSchema() { @Override protected SchemaReader getSchemaReader(String sessionID) { - return new MysqlSchemaReader(sessionID); + return new MysqlSchemaReader(sessionID, Maps.fromProperties(config.getConnectionArgumentsProperties())); } @Override diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConstants.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConstants.java index 39c0b8d08..54593f580 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConstants.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConstants.java @@ -39,6 +39,7 @@ private MysqlConstants() { public static final String TRUST_CERT_KEYSTORE_PASSWORD = "trustCertificateKeyStorePassword"; public static final String MYSQL_CONNECTION_STRING_FORMAT = "jdbc:mysql://%s:%s/%s"; public static final String USE_CURSOR_FETCH = "useCursorFetch"; + public static final String ZERO_DATE_TIME_BEHAVIOR = "zeroDateTimeBehavior"; /** * Query to set SQL_MODE system variable. diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSchemaReader.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSchemaReader.java index a842ba568..50907c063 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSchemaReader.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSchemaReader.java @@ -16,12 +16,16 @@ package io.cdap.plugin.mysql; +import com.google.common.collect.Lists; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.db.CommonSchemaReader; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.List; +import java.util.Map; /** * Schema reader for mapping Mysql DB type @@ -31,12 +35,42 @@ public class MysqlSchemaReader extends CommonSchemaReader { public static final String YEAR_TYPE_NAME = "YEAR"; public static final String MEDIUMINT_UNSIGNED_TYPE_NAME = "MEDIUMINT UNSIGNED"; private final String sessionID; + private boolean zeroDateTimeToNull; public MysqlSchemaReader(String sessionID) { super(); this.sessionID = sessionID; } + public MysqlSchemaReader(String sessionID, Map connectionArguments) { + super(); + this.sessionID = sessionID; + this.zeroDateTimeToNull = MysqlUtil.isZeroDateTimeToNull(connectionArguments); + } + + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + List schemaFields = Lists.newArrayList(); + ResultSetMetaData metadata = resultSet.getMetaData(); + // ResultSetMetadata columns are numbered starting with 1 + for (int i = 1; i <= metadata.getColumnCount(); i++) { + if (shouldIgnoreColumn(metadata, i)) { + continue; + } + + String columnName = metadata.getColumnName(i); + Schema columnSchema = getSchema(metadata, i); + + if (ResultSetMetaData.columnNullable == metadata.isNullable(i) + || (zeroDateTimeToNull && MysqlUtil.isDateTimeLikeType(metadata.getColumnType(i)))) { + columnSchema = Schema.nullableOf(columnSchema); + } + Schema.Field field = Schema.Field.of(columnName, columnSchema); + schemaFields.add(field); + } + return schemaFields; + } + @Override public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { return metadata.getColumnName(index).equals("c_" + sessionID) || diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java index 71f113436..a91139196 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java @@ -81,7 +81,7 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) { @Override protected SchemaReader getSchemaReader() { - return new MysqlSchemaReader(null); + return new MysqlSchemaReader(null, mysqlSourceConfig.getConnectionArguments()); } /** diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java index c1c770c06..abb4aa27b 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlUtil.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; +import java.sql.Types; import java.util.Map; /** @@ -91,4 +92,20 @@ public static Map composeDbSpecificArgumentsMap(Boolean autoReco public static String getConnectionString(String host, Integer port, String database) { return String.format(MysqlConstants.MYSQL_CONNECTION_STRING_FORMAT, host, port, database); } + + public static boolean isDateTimeLikeType(int columnType) { + int[] dateTimeLikeTypes = new int[]{Types.TIMESTAMP, Types.TIMESTAMP_WITH_TIMEZONE, Types.DATE}; + + for (int dttType : dateTimeLikeTypes) { + if (dttType == columnType) { + return true; + } + } + return false; + } + + public static boolean isZeroDateTimeToNull(Map connectionArguments) { + String argValue = connectionArguments.getOrDefault(MysqlConstants.ZERO_DATE_TIME_BEHAVIOR, ""); + return argValue.equals("CONVERT_TO_NULL") || argValue.equals("convertToNull"); + } } diff --git a/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSchemaReaderUnitTest.java b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSchemaReaderUnitTest.java index 28582bc3b..fa7029c8f 100644 --- a/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSchemaReaderUnitTest.java +++ b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSchemaReaderUnitTest.java @@ -21,9 +21,13 @@ import org.junit.Test; import org.mockito.Mockito; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class MysqlSchemaReaderUnitTest { @@ -37,4 +41,33 @@ public void validateYearTypeToStringTypeConversion() throws SQLException { Schema schema = schemaReader.getSchema(metadata, 1); Assert.assertTrue(Schema.of(Schema.Type.INT).equals(schema)); } + + @Test + public void validateZeroDateTimeBehavior() throws SQLException { + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnName(Mockito.eq(1))).thenReturn("some_date"); + + Mockito.when(metadata.getColumnType(Mockito.eq(1))).thenReturn(Types.DATE); + Mockito.when(metadata.getColumnTypeName(Mockito.eq(1))).thenReturn(MysqlSchemaReader.YEAR_TYPE_NAME); + + // non-nullable column + Mockito.when(metadata.isNullable(Mockito.eq(1))).thenReturn(0); + + // test that non-nullable date remains non-nullable when no conn arg is present + MysqlSchemaReader schemaReader = new MysqlSchemaReader(null); + List schemaFields = schemaReader.getSchemaFields(resultSet); + Assert.assertFalse(schemaFields.get(0).getSchema().isNullable()); + + // test that it converts non-nullable date column to nullable when zeroDateTimeBehavior is convert to null + Map connectionArguments = new HashMap<>(); + connectionArguments.put("zeroDateTimeBehavior", "CONVERT_TO_NULL"); + + schemaReader = new MysqlSchemaReader(null, connectionArguments); + schemaFields = schemaReader.getSchemaFields(resultSet); + Assert.assertTrue(schemaFields.get(0).getSchema().isNullable()); + } } diff --git a/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlUtilUnitTest.java b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlUtilUnitTest.java new file mode 100644 index 000000000..9481068f1 --- /dev/null +++ b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlUtilUnitTest.java @@ -0,0 +1,62 @@ + +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.mysql; + +import org.junit.Test; + +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MysqlUtilUnitTest { + + @Test + public void testIsZeroDateTimeToNull() { + Map connArgsMap = new HashMap<>(1); + + connArgsMap.put("zeroDateTimeBehavior", ""); + assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap)); + + connArgsMap.put("zeroDateTimeBehavior", "ROUND"); + assertFalse(MysqlUtil.isZeroDateTimeToNull(connArgsMap)); + + connArgsMap.put("zeroDateTimeBehavior", "CONVERT_TO_NULL"); + assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap)); + + connArgsMap.put("zeroDateTimeBehavior", "convertToNull"); + assertTrue(MysqlUtil.isZeroDateTimeToNull(connArgsMap)); + } + + @Test + public void testIsDateTimeLikeType() { + int dateType = Types.DATE; + int timestampType = Types.TIMESTAMP; + int timestampWithTimezoneType = Types.TIMESTAMP_WITH_TIMEZONE; + int timeType = Types.TIME; + int stringType = Types.VARCHAR; + + assertTrue(MysqlUtil.isDateTimeLikeType(dateType)); + assertTrue(MysqlUtil.isDateTimeLikeType(timestampType)); + assertTrue(MysqlUtil.isDateTimeLikeType(timestampWithTimezoneType)); + assertFalse(MysqlUtil.isDateTimeLikeType(timeType)); + assertFalse(MysqlUtil.isDateTimeLikeType(stringType)); + } +}