Skip to content

Commit

Permalink
Merge pull request #49 from apache/fix-cs-code
Browse files Browse the repository at this point in the history
fix checkstyle violation
  • Loading branch information
omarsmak authored Dec 13, 2019
2 parents 7dd49f0 + f2efb39 commit c44523c
Show file tree
Hide file tree
Showing 29 changed files with 64 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
Expand Down Expand Up @@ -125,9 +126,9 @@ private void addHeader(Map<String, Object> map, Header singleHeader) {
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
map.put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (Map)singleHeader.value());
map.put(singleHeader.key(), (Map)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (List)singleHeader.value());
map.put(singleHeader.key(), (List)singleHeader.value());
}
}

Expand All @@ -152,9 +153,9 @@ private void addProperty(Exchange exchange, Header singleHeader) {
} else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) {
exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
exchange.getProperties().put(singleHeader.key(), (Map)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
exchange.getProperties().put(singleHeader.key(), (List)singleHeader.value());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -49,8 +50,8 @@ public class CamelSourceConnectorConfig extends AbstractConfig {

public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey";
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key. If this is not specified, then the Kafka message will not have a key";

public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The camel message header key that contain an unique key for the message which can be used a Kafka message key."
+ " If this is not specified, then the Kafka message will not have a key";

private static final String CAMEL_SOURCE_URL_DOC = "The camel url to configure the source";
private static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.converters;

import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,37 @@

import java.util.Map;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

import com.amazonaws.services.s3.model.S3ObjectInputStream;

public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> {

private final S3ObjectSerializer serializer = new S3ObjectSerializer();

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM,
"Transform the content of a bucket into a string ");

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public R apply(R record) {
byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
String finalValue = new String(v);
return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}
.define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Transform the content of a bucket into a string ");

private final S3ObjectSerializer serializer = new S3ObjectSerializer();

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public R apply(R record) {
byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value());
String finalValue = new String(v);
return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp());
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {

public static final String FIELD_TARGET_TYPE_CONFIG = "target.type";

private static TypeConverter typeConverter;
private Class<?> fieldTargetType;

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_TARGET_TYPE_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
"The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map");

private static TypeConverter typeConverter;
private Class<?> fieldTargetType;

@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
Expand All @@ -33,7 +34,6 @@
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.support.PropertyBindingSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

public final class SchemaHelper {

private SchemaHelper() {
}

/**
* Try to build a {@link SchemaBuilder} for a value of type {@link Object}
* However, this will only build the schema only for known types, in case it can not return the precise SchemaBuilder type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.camel.component.hl7.HL7DataFormat;
import org.apache.camel.impl.DefaultCamelContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.test;

import java.util.concurrent.BlockingQueue;

import org.apache.camel.component.seda.ArrayBlockingQueueFactory;
import org.apache.camel.component.seda.BlockingQueueFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ public void testBuildSchemaBuilderForAllSpecialTypes() {
// finally how to handle if we have no idea about the value
final S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(System.in, new HttpDelete());
assertEquals(Schema.Type.BYTES, SchemaHelper.buildSchemaBuilderForType(s3ObjectInputStream).type());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector;

import java.util.Properties;

import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static junit.framework.TestCase.fail;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
Expand All @@ -44,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static junit.framework.TestCase.fail;



/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import java.util.Properties;

import static junit.framework.TestCase.fail;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static junit.framework.TestCase.fail;


/**
* Common test constants and utilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A very simple test message consumer that can consume messages of different types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ public JMSService(Future<String> image) {
* Gets the default endpoint for the JMS service (ie.: amqp://host:port, or tcp://host:port, etc)
* @return the endpoint URL as a string in the specific format used by the service
*/
abstract public String getDefaultEndpoint();
public abstract String getDefaultEndpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JMSServiceFactory {
public final class JMSServiceFactory {
private static final Logger LOG = LoggerFactory.getLogger(JMSServiceFactory.class);

private JMSServiceFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector.sink.aws.sqs;

import java.util.Properties;

import org.apache.camel.kafkaconnector.AWSConfigs;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.kafka.connect.runtime.ConnectorConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.sqs.model.Message;
import org.apache.camel.kafkaconnector.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector.source.timer;

import java.util.Properties;

import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.kafka.connect.runtime.ConnectorConfig;

Expand Down

0 comments on commit c44523c

Please sign in to comment.