Skip to content

Commit

Permalink
[FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka…
Browse files Browse the repository at this point in the history
… API
  • Loading branch information
rmetzger authored and StephanEwen committed May 12, 2015
1 parent 3b2ee23 commit 54e9576
Show file tree
Hide file tree
Showing 40 changed files with 711 additions and 2,112 deletions.
Expand Up @@ -1445,12 +1445,11 @@ private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo

public static <X> TypeInformation<X> getForObject(X value) {
return new TypeExtractor().privateGetForObject(value);

}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <X> TypeInformation<X> privateGetForObject(X value) {
Validate.notNull(value);

// check if we can extract the types from tuples, otherwise work with the class
if (value instanceof Tuple) {
Tuple t = (Tuple) value;
Expand Down
Expand Up @@ -147,7 +147,7 @@ public void shutdown() {
return;
}
shutdown = true;
LOG.info("Stopping checkpoint coordinator jor job " + job);
LOG.info("Stopping checkpoint coordinator for job " + job);

// shut down the thread that handles the timeouts
timer.cancel();
Expand Down
Expand Up @@ -507,7 +507,7 @@ protected void run() throws Exception {
// Collect the accumulators of all involved UDFs and send them to the
// JobManager. close() has been called earlier for all involved UDFs
// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
// modify accumulators.ll;
// modify accumulators;
if (this.stub != null) {
// collect the counters from the stub
if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -19,13 +19,10 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
GenericSourceFunction<OUT> {
public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{

private static final long serialVersionUID = 1L;
protected DeserializationSchema<OUT> schema;
Expand All @@ -35,12 +32,7 @@ public ConnectorSource(DeserializationSchema<OUT> schema) {
}

@Override
public TypeInformation<OUT> getType() {
if(schema instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) schema).getProducedType();
}
return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
null, null);
public TypeInformation<OUT> getProducedType() {
return schema.getProducedType();
}

}
Expand Up @@ -38,7 +38,6 @@ public static void main(String[] args) throws Exception {

DataStream<String> kafkaStream = env
.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));

kafkaStream.print();

env.execute();
Expand Down

This file was deleted.

Expand Up @@ -19,7 +19,6 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.runtime.util.DataOutputSerializer;
Expand All @@ -30,7 +29,7 @@

public class Utils {
public static class TypeInformationSerializationSchema<T>
implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
private final TypeSerializer<T> serializer;
private final TypeInformation<T> ti;

Expand Down

0 comments on commit 54e9576

Please sign in to comment.