Skip to content

Commit

Permalink
[GOBBLIN-236] Add a ControlMessage injector as a RecordStreamProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
htran1 committed Sep 5, 2017
1 parent a5fe062 commit 6957bd7
Show file tree
Hide file tree
Showing 17 changed files with 592 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class ConfigurationKeys {
public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir";
public static final String SOURCE_CLASS_KEY = "source.class";
public static final String CONVERTER_CLASSES_KEY = "converter.classes";
public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = "recordStreamProcessor.classes";
public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class";
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.NoopConverterInitializer;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.StreamEntity;
Expand All @@ -55,6 +57,9 @@
* @param <DO> output data type
*/
public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState, RecordStreamProcessor<SI, SO, DI, DO> {
// Metadata containing the output schema. This may be changed when a MetadataUpdateControlMessage is received.
private GlobalMetadata<SO> outputGlobalMetadata;

/**
* Initialize this {@link Converter}.
*
Expand Down Expand Up @@ -120,16 +125,27 @@ public State getFinalState() {
public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream,
WorkUnitState workUnitState) throws SchemaConversionException {
init(workUnitState);
SO outputSchema = convertSchema(inputStream.getSchema(), workUnitState);
this.outputGlobalMetadata =
new GlobalMetadata<SO>(convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState));
Flowable<StreamEntity<DO>> outputStream =
inputStream.getRecordStream()
.flatMap(in -> {
if (in instanceof ControlMessage) {
ControlMessage out = (ControlMessage) in;
// update the output schema with the new input schema from the MetadataUpdateControlMessage
if (in instanceof MetadataUpdateControlMessage) {
this.outputGlobalMetadata =
new GlobalMetadata<SO>(convertSchema((SI)((MetadataUpdateControlMessage) in).getGlobalMetadata()
.getSchema(), workUnitState));
out = new MetadataUpdateControlMessage<SO>(this.outputGlobalMetadata);
}

getMessageHandler().handleMessage((ControlMessage) in);
return Flowable.just(((ControlMessage<DO>) in));
return Flowable.just(((ControlMessage<DO>) out));
} else if (in instanceof RecordEnvelope) {
RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
Iterator<DO> convertedIterable = convertRecord(outputSchema, recordEnvelope.getRecord(), workUnitState).iterator();
Iterator<DO> convertedIterable = convertRecord(this.outputGlobalMetadata.getSchema(),
recordEnvelope.getRecord(), workUnitState).iterator();

if (!convertedIterable.hasNext()) {
// if the iterable is empty, ack the record, return an empty flowable
Expand All @@ -153,7 +169,7 @@ public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<D
}
}, 1);
outputStream = outputStream.doOnComplete(this::close);
return inputStream.withRecordStream(outputStream, outputSchema);
return inputStream.withRecordStream(outputStream, this.outputGlobalMetadata);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions gobblin-api/src/main/java/org/apache/gobblin/fork/Forker.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class Forker {
workUnitState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, branches);

forkOperator.init(workUnitState);
List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getSchema());
List<Boolean> forkedSchemas = forkOperator.forkSchema(workUnitState, inputStream.getGlobalMetadata().getSchema());
int activeForks = (int) forkedSchemas.stream().filter(b -> b).count();

Preconditions.checkState(forkedSchemas.size() == branches, String
Expand Down Expand Up @@ -90,7 +91,8 @@ public class Forker {
Flowable<StreamEntity<D>> thisStream =
forkedStream.filter(new ForkFilter<>(idx)).map(RecordWithForkMap::getRecordCopyIfNecessary);
forkStreams.add(inputStream.withRecordStream(thisStream,
mustCopy ? (S) CopyHelper.copy(inputStream.getSchema()) : inputStream.getSchema()));
mustCopy ? (GlobalMetadata<S>) CopyHelper.copy(inputStream.getGlobalMetadata()) :
inputStream.getGlobalMetadata()));
} else {
forkStreams.add(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.metadata;

import org.apache.gobblin.fork.CopyHelper;
import org.apache.gobblin.fork.CopyNotSupportedException;
import org.apache.gobblin.fork.Copyable;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;


/**
* Global metadata
* @param <S> schema type
*/
@AllArgsConstructor
@EqualsAndHashCode
public class GlobalMetadata<S> implements Copyable<GlobalMetadata<S>> {
@Getter
private S schema;

@Override
public GlobalMetadata<S> copy() throws CopyNotSupportedException {
if (CopyHelper.isCopyable(schema)) {
return new GlobalMetadata((S)CopyHelper.copy(schema));
}

throw new CopyNotSupportedException("Type is not copyable: " + schema.getClass().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.function.Function;

import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;

Expand All @@ -34,20 +35,29 @@
@Data
public class RecordStreamWithMetadata<D, S> {
private final Flowable<StreamEntity<D>> recordStream;
private final S schema;
private final GlobalMetadata<S> globalMetadata;

/**
* @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} but same schema.
*/
public <DO> RecordStreamWithMetadata<DO, S> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream) {
return withRecordStream(newRecordStream, this.schema);
return withRecordStream(newRecordStream, this.globalMetadata);
}

/**
* @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #schema}.
* @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}.
*/
@Deprecated
public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream, SO newSchema) {
return new RecordStreamWithMetadata<>(newRecordStream, newSchema);
return new RecordStreamWithMetadata<>(newRecordStream, new GlobalMetadata<SO>(newSchema));
}

/**
* @return a new {@link RecordStreamWithMetadata} with a different {@link #recordStream} and {@link #globalMetadata}.
*/
public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<StreamEntity<DO>> newRecordStream,
GlobalMetadata<SO> newGlobalMetadata) {
return new RecordStreamWithMetadata<>(newRecordStream, newGlobalMetadata);
}

/**
Expand All @@ -56,7 +66,7 @@ public <DO, SO> RecordStreamWithMetadata<DO, SO> withRecordStream(Flowable<Strea
*/
public <DO> RecordStreamWithMetadata<DO, S>
mapStream(Function<? super Flowable<StreamEntity<D>>, ? extends Flowable<StreamEntity<DO>>> transform) {
return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.schema);
return new RecordStreamWithMetadata<>(transform.apply(this.recordStream), this.globalMetadata);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
Expand Down Expand Up @@ -128,7 +129,7 @@ default RecordStreamWithMetadata<D, S> recordStream(AtomicBoolean shutdownReques
}
});
recordStream = recordStream.doFinally(this::close);
return new RecordStreamWithMetadata<>(recordStream, schema);
return new RecordStreamWithMetadata<>(recordStream, new GlobalMetadata<>(schema));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.stream;

import java.io.Closeable;
import java.io.IOException;

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;

import io.reactivex.Flowable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

/**
* A {@link RecordStreamProcessor} that inspects an input record and outputs control messages before, after, or around
* the input record
* @param <SI>
* @param <DI>
*/
public abstract class ControlMessageInjector<SI, DI> implements Closeable,
RecordStreamProcessor<SI, SI, DI, DI> {

@Setter(AccessLevel.PROTECTED)
@Getter(AccessLevel.PROTECTED)
private GlobalMetadata<SI> inputGlobalMetadata;

/**
* Initialize this {@link ControlMessageInjector}.
*
* @param workUnit a {@link WorkUnitState} object carrying configuration properties
* @return an initialized {@link ControlMessageInjector} instance
*/
public ControlMessageInjector<SI, DI> init(WorkUnitState workUnit) {
return this;
}

@Override
public void close() throws IOException {
}

/**
* Set the global metadata of the input messages
* @param inputGlobalMetadata the global metadata for input messages
* @param workUnit
*/
public void setInputGlobalMetadata(GlobalMetadata<SI> inputGlobalMetadata, WorkUnitState workUnit) {
this.inputGlobalMetadata = inputGlobalMetadata;
}

/**
* Inject {@link ControlMessage}s before the record
* @param inputRecordEnvelope
* @param workUnit
* @return The {@link ControlMessage}s to inject before the record
*/
public abstract Iterable<ControlMessage<DI>> injectControlMessagesBefore(RecordEnvelope<DI> inputRecordEnvelope,
WorkUnitState workUnit);

/**
* Inject {@link ControlMessage}s after the record
* @param inputRecordEnvelope
* @param workUnit
* @return The {@link ControlMessage}s to inject after the record
*/
public abstract Iterable<ControlMessage<DI>> injectControlMessagesAfter(RecordEnvelope<DI> inputRecordEnvelope,
WorkUnitState workUnit);

/**
* Apply injections to the input {@link RecordStreamWithMetadata}.
* {@link ControlMessage}s may be injected before, after, or around the input record.
* A {@link MetadataUpdateControlMessage} will update the current input {@link GlobalMetadata} and pass the
* updated input {@link GlobalMetadata} to the next processor to propagate the metadata update down the pipeline.
*/
@Override
public RecordStreamWithMetadata<DI, SI> processStream(RecordStreamWithMetadata<DI, SI> inputStream,
WorkUnitState workUnitState) throws StreamProcessingException {
init(workUnitState);

setInputGlobalMetadata(inputStream.getGlobalMetadata(), workUnitState);

Flowable<StreamEntity<DI>> outputStream =
inputStream.getRecordStream()
.flatMap(in -> {
if (in instanceof ControlMessage) {
ControlMessage out = (ControlMessage) in;
if (in instanceof MetadataUpdateControlMessage) {
setInputGlobalMetadata(((MetadataUpdateControlMessage) in).getGlobalMetadata(),
workUnitState);
out = new MetadataUpdateControlMessage<SI>(this.inputGlobalMetadata);
}

getMessageHandler().handleMessage((ControlMessage) in);
return Flowable.just(((ControlMessage<DI>) out));

} else if (in instanceof RecordEnvelope) {
RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
Iterable<ControlMessage<DI>> injectedBeforeIterable =
injectControlMessagesBefore(recordEnvelope, workUnitState);
Iterable<ControlMessage<DI>> injectedAfterIterable =
injectControlMessagesAfter(recordEnvelope, workUnitState);

if (injectedBeforeIterable == null && injectedAfterIterable == null) {
// nothing injected so return the record envelope
return Flowable.just(recordEnvelope);
} else {
Flowable<StreamEntity<DI>> flowable;

if (injectedBeforeIterable != null) {
flowable = Flowable.<StreamEntity<DI>>fromIterable(injectedBeforeIterable)
.concatWith(Flowable.just(recordEnvelope));
} else {
flowable = Flowable.just(recordEnvelope);
}

if (injectedAfterIterable != null) {
flowable.concatWith(Flowable.fromIterable(injectedAfterIterable));
}
return flowable;
}
} else {
throw new UnsupportedOperationException();
}
}, 1);
outputStream = outputStream.doOnComplete(this::close);
return inputStream.withRecordStream(outputStream, this.inputGlobalMetadata);
}

/**
* @return {@link ControlMessageHandler} to call for each {@link ControlMessage} received.
*/
public ControlMessageHandler getMessageHandler() {
return ControlMessageHandler.NOOP;
}
}

0 comments on commit 6957bd7

Please sign in to comment.