diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java index 7fea3960e4..0f506ee797 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java @@ -45,6 +45,9 @@ public class ConfigurationUtils { */ @SuppressWarnings("unchecked") public static T convertValue(Object rawValue, Class clazz) { + if (rawValue == null) { + return null; + } if (Integer.class.equals(clazz)) { return (T) convertToInt(rawValue); } else if (Long.class.equals(clazz)) { @@ -70,7 +73,9 @@ public static T convertValue(Object rawValue, Class clazz) { @SuppressWarnings("unchecked") static T convertToList(Object rawValue, Class atomicClass) { - if (rawValue instanceof List) { + if (rawValue == null) { + return null; + } else if (rawValue instanceof List) { return (T) rawValue; } else { return (T) @@ -82,7 +87,9 @@ static T convertToList(Object rawValue, Class atomicClass) { @SuppressWarnings("unchecked") static Map convertToProperties(Object o) { - if (o instanceof Map) { + if (o == null) { + return null; + } else if (o instanceof Map) { return (Map) o; } else { List listOfRawProperties = @@ -102,7 +109,9 @@ static Map convertToProperties(Object o) { @SuppressWarnings("unchecked") public static > E convertToEnum(Object o, Class clazz) { - if (o.getClass().equals(clazz)) { + if (o == null) { + return null; + } else if (o.getClass().equals(clazz)) { return (E) o; } @@ -122,7 +131,9 @@ public static > E convertToEnum(Object o, Class clazz) { } static Duration convertToDuration(Object o) { - if (o.getClass() == Duration.class) { + if (o == null) { + return null; + } else if (o.getClass() == Duration.class) { return (Duration) o; } @@ -130,7 +141,9 @@ static Duration convertToDuration(Object o) { } static String convertToString(Object o) { - if (o.getClass() == String.class) { + if (o == null) { + return null; + } else if (o.getClass() == String.class) { return (String) o; } else if (o.getClass() == Duration.class) { Duration duration = (Duration) o; @@ -165,7 +178,9 @@ static String convertToString(Object o) { } static Integer convertToInt(Object o) { - if (o.getClass() == Integer.class) { + if (o == null) { + return null; + } else if (o.getClass() == Integer.class) { return (Integer) o; } else if (o.getClass() == Long.class) { long value = (Long) o; @@ -183,7 +198,9 @@ static Integer convertToInt(Object o) { } static Long convertToLong(Object o) { - if (o.getClass() == Long.class) { + if (o == null) { + return null; + } else if (o.getClass() == Long.class) { return (Long) o; } else if (o.getClass() == Integer.class) { return ((Integer) o).longValue(); @@ -193,7 +210,9 @@ static Long convertToLong(Object o) { } static Boolean convertToBoolean(Object o) { - if (o.getClass() == Boolean.class) { + if (o == null) { + return null; + } else if (o.getClass() == Boolean.class) { return (Boolean) o; } @@ -211,6 +230,9 @@ static Boolean convertToBoolean(Object o) { } static Float convertToFloat(Object o) { + if (o == null) { + return null; + } if (o.getClass() == Float.class) { return (Float) o; } else if (o.getClass() == Double.class) { @@ -231,6 +253,9 @@ static Float convertToFloat(Object o) { } static Double convertToDouble(Object o) { + if (o == null) { + return null; + } if (o.getClass() == Double.class) { return (Double) o; } else if (o.getClass() == Float.class) {