Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix checkstyle violation #49

Merged
merged 1 commit into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
Expand Down Expand Up @@ -125,9 +126,9 @@ private void addHeader(Map<String, Object> map, Header singleHeader) {
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
map.put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (Map)singleHeader.value());
map.put(singleHeader.key(), (Map)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (List)singleHeader.value());
map.put(singleHeader.key(), (List)singleHeader.value());
}
}

Expand All @@ -152,9 +153,9 @@ private void addProperty(Exchange exchange, Header singleHeader) {
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -49,8 +50,8 @@ public class CamelSourceConnectorConfig extends AbstractConfig {

public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey";
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key. If this is not specified, then the Kafka message will not have a key";

public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key."
+ " If this is not specified, then the Kafka message will not have a key";

private static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source";
private static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.converters;

import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,37 @@

import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

import com.amazonaws.services.s3.model.S3ObjectInputStream;

public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> {

private final S3ObjectSerializer serializer = new S3ObjectSerializer();

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM,
"Transform the content of a bucket into a string ");

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public R apply(R record) {
byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
String finalValue = new String(v);
return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}
.define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Transform the content of a bucket into a string ");

private final S3ObjectSerializer serializer = new S3ObjectSerializer();

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public R apply(R record) {
byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
String finalValue = new String(v);
return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {

public static final String FIELD_TARGET_TYPE_CONFIG = "target.type";

private static TypeConverter typeConverter;
private Class<?> fieldTargetType;

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_TARGET_TYPE_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
"The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map");

private static TypeConverter typeConverter;
private Class<?> fieldTargetType;

@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
Expand All @@ -33,7 +34,6 @@
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.support.PropertyBindingSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

public final class SchemaHelper {

private SchemaHelper() {
}

/**
* Try to build a {@link SchemaBuilder} for a value of type {@link Object}
* However, this will only build the schema only for known types, in case it can not return the precise SchemaBuilder type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.camel.component.hl7.HL7DataFormat;
import org.apache.camel.impl.DefaultCamelContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.test;

import java.util.concurrent.BlockingQueue;

import org.apache.camel.component.seda.ArrayBlockingQueueFactory;
import org.apache.camel.component.seda.BlockingQueueFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ public void testBuildSchemaBuilderForAllSpecialTypes() {
// finally how to handle if we have no idea about the value
final S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(System.in, new HttpDelete());
assertEquals(Schema.Type.BYTES, SchemaHelper.buildSchemaBuilderForType(s3ObjectInputStream).type());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Properties;

import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static junit.framework.TestCase.fail;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
Expand All @@ -44,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static junit.framework.TestCase.fail;



/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import java.util.Properties;

import static junit.framework.TestCase.fail;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static junit.framework.TestCase.fail;


/**
* Common test constants and utilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A very simple test message consumer that can consume messages of different types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ public JMSService(Future<String> image) {
* Gets the default endpoint for the JMS service (ie.: amqp://host:port, or tcp://host:port, etc)
* @return the endpoint URL as a string in the specific format used by the service
*/
abstract public String getDefaultEndpoint();
public abstract String getDefaultEndpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSServiceFactory {
public final class JMSServiceFactory {
private static final Logger LOG = LoggerFactory.getLogger(JMSServiceFactory.class);

private JMSServiceFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector.sink.aws.sqs;

import java.util.Properties;

import org.apache.camel.kafkaconnector.AWSConfigs;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.kafka.connect.runtime.ConnectorConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.model.Message;
import org.apache.camel.kafkaconnector.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector.source.timer;

import java.util.Properties;

import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.kafka.connect.runtime.ConnectorConfig;

Expand Down