Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,26 @@
package org.apache.iotdb.pipe.api.customizer.parameter;

import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Used in {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} and
* {@link PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
* Used in {@link PipeExtractor#customize(PipeParameters, PipeExtractorRuntimeConfiguration)} ,
* {@link PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} and {@link
* PipeConnector#customize(PipeParameters, PipeConnectorRuntimeConfiguration)}.
*
* <p>This class is used to parse the parameters in WITH PROCESSOR and WITH CONNECTOR when creating
* a pipe.
* <p>This class is used to parse the parameters in WITH SOURCE, WITH PROCESSOR and WITH SINK when
* creating a pipe.
*
* <p>The input parameters is the key-value pair attributes for customization.
* <p>The input parameters are the key-value pair attributes for customization.
*/
public class PipeParameters {

Expand All @@ -49,50 +54,51 @@ public Map<String, String> getAttribute() {
}

public boolean hasAttribute(String key) {
return attributes.containsKey(key);
return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key));
}

public boolean hasAnyAttributes(String... keys) {
for (final String key : keys) {
if (attributes.containsKey(key)) {
if (hasAttribute(key)) {
return true;
}
}
return false;
}

public String getString(String key) {
return attributes.get(key);
final String value = attributes.get(key);
return value != null ? value : attributes.get(KeyReducer.reduce(key));
}

public Boolean getBoolean(String key) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? null : Boolean.parseBoolean(value);
}

public Integer getInt(String key) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? null : Integer.parseInt(value);
}

public Long getLong(String key) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? null : Long.parseLong(value);
}

public Float getFloat(String key) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? null : Float.parseFloat(value);
}

public Double getDouble(String key) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? null : Double.parseDouble(value);
}

public String getStringByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return value;
}
Expand All @@ -102,87 +108,87 @@ public String getStringByKeys(String... keys) {

public Boolean getBooleanByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final Boolean value = getBoolean(key);
if (value != null) {
return Boolean.parseBoolean(value);
return value;
}
}
return null;
}

public Integer getIntByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final Integer value = getInt(key);
if (value != null) {
return Integer.parseInt(value);
return value;
}
}
return null;
}

public Long getLongByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final Long value = getLong(key);
if (value != null) {
return Long.parseLong(value);
return value;
}
}
return null;
}

public Float getFloatByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final Float value = getFloat(key);
if (value != null) {
return Float.parseFloat(value);
return value;
}
}
return null;
}

public Double getDoubleByKeys(String... keys) {
for (final String key : keys) {
final String value = attributes.get(key);
final Double value = getDouble(key);
if (value != null) {
return Double.parseDouble(value);
return value;
}
}
return null;
}

public String getStringOrDefault(String key, String defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : value;
}

public boolean getBooleanOrDefault(String key, boolean defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : Boolean.parseBoolean(value);
}

public int getIntOrDefault(String key, int defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : Integer.parseInt(value);
}

public long getLongOrDefault(String key, long defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : Long.parseLong(value);
}

public float getFloatOrDefault(String key, float defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : Float.parseFloat(value);
}

public double getDoubleOrDefault(String key, double defaultValue) {
String value = attributes.get(key);
final String value = getString(key);
return value == null ? defaultValue : Double.parseDouble(value);
}

public String getStringOrDefault(List<String> keys, String defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return value;
}
Expand All @@ -192,7 +198,7 @@ public String getStringOrDefault(List<String> keys, String defaultValue) {

public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return Boolean.parseBoolean(value);
}
Expand All @@ -202,7 +208,7 @@ public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {

public int getIntOrDefault(List<String> keys, int defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return Integer.parseInt(value);
}
Expand All @@ -212,7 +218,7 @@ public int getIntOrDefault(List<String> keys, int defaultValue) {

public long getLongOrDefault(List<String> keys, long defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return Long.parseLong(value);
}
Expand All @@ -222,7 +228,7 @@ public long getLongOrDefault(List<String> keys, long defaultValue) {

public float getFloatOrDefault(List<String> keys, float defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return Float.parseFloat(value);
}
Expand All @@ -232,7 +238,7 @@ public float getFloatOrDefault(List<String> keys, float defaultValue) {

public double getDoubleOrDefault(List<String> keys, double defaultValue) {
for (final String key : keys) {
final String value = attributes.get(key);
final String value = getString(key);
if (value != null) {
return Double.parseDouble(value);
}
Expand Down Expand Up @@ -261,4 +267,30 @@ public int hashCode() {
public String toString() {
return attributes.toString();
}

private static class KeyReducer {

private static final Set<String> PREFIXES = new HashSet<>();

static {
PREFIXES.add("extractor.");
PREFIXES.add("source.");
PREFIXES.add("processor.");
PREFIXES.add("connector.");
PREFIXES.add("sink.");
}

static String reduce(String key) {
if (key == null) {
return null;
}
final String lowerCaseKey = key.toLowerCase();
for (final String prefix : PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
return key.substring(prefix.length());
}
}
return key;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR;
import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;

public class PipePluginInfo implements SnapshotProcessor {

Expand Down Expand Up @@ -153,16 +154,10 @@ public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {

final PipeParameters connectorParameters =
new PipeParameters(createPipeRequest.getConnectorAttributes());
if (!connectorParameters.hasAnyAttributes(
PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
final String exceptionMessage =
"Failed to create pipe, the pipe connector plugin is not specified";
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
final String connectorPluginName =
connectorParameters.getStringByKeys(
PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY);
connectorParameters.getStringOrDefault(
Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY),
IOTDB_THRIFT_CONNECTOR.getPipePluginName());
if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
final String exceptionMessage =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.iotdb.db.pipe.connector.protocol.writeback.WriteBackConnector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;

import java.util.Arrays;

Expand All @@ -47,6 +46,9 @@ protected void initConstructors() {
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
IoTDBThriftAsyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName(),
IoTDBThriftSyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
IoTDBThriftSyncConnector::new);
Expand All @@ -69,6 +71,8 @@ protected void initConstructors() {

PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), IoTDBThriftAsyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName(), IoTDBThriftSyncConnector::new);
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
IoTDBThriftSyncConnector::new);
Expand All @@ -91,13 +95,6 @@ protected void initConstructors() {

@Override
PipeConnector reflectPlugin(PipeParameters connectorParameters) {
if (!connectorParameters.hasAnyAttributes(
PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
throw new PipeException(
"Failed to reflect PipeConnector instance because "
+ "'connector' is not specified in the parameters.");
}

return (PipeConnector)
reflectPluginByKey(
connectorParameters
Expand Down
Loading