Skip to content
This repository has been archived by the owner on Oct 1, 2020. It is now read-only.

Commit

Permalink
New release 1.0 -> 1.1:
Browse files Browse the repository at this point in the history
- Extractor inferface no longer throws BackingStoreException.
- Extractor, Transformer and Loader can all throw
UnrecoverableStreamException which aborts stream.
- Speed optimizations for simple transformations and loaders bypassing
introspective mapping functions.
- Support for java.time and Optional objects on the stream.
- S3FastLoader will abort the stream if it fails to flush its buffer to
S3.
- RedshiftBulkLoader will abort the stream if it fails to write to S3 or
there is a problem executing the COPY SQL on Redshift.
- All stages will actively report a count of 0 on the metrics object
even if they do not process any records.
  • Loading branch information
bmaizels committed Mar 11, 2019
1 parent c03b2df commit 9935a76
Show file tree
Hide file tree
Showing 46 changed files with 947 additions and 388 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.amazonaws</groupId>
<artifactId>pocket-etl</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>

<licenses>
<license>
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/amazon/pocketEtl/EtlCombineStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -15,18 +15,18 @@

package com.amazon.pocketEtl;

import java.util.Collection;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import com.amazon.pocketEtl.core.producer.EtlProducer;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.stream.Collectors;

@Getter(AccessLevel.PACKAGE)
@EqualsAndHashCode(callSuper = true)
class EtlCombineStage extends EtlProducerStage {
private final static String DEFAULT_COMBINE_STAGE_NAME = "EtlStream.Combine";

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/amazon/pocketEtl/EtlConsumerStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@

import com.amazon.pocketEtl.core.DefaultLoggingStrategy;
import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import javax.annotation.Nonnull;
Expand All @@ -29,7 +28,6 @@
* @param <T> The type of the data being operated on by this stage in the stream.
*/
@Getter
@EqualsAndHashCode
public abstract class EtlConsumerStage<T> {
/**
* Static constructor for an EtlConsumerStage that transforms data in the stream. Used as a component in an
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/amazon/pocketEtl/EtlExtractStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -15,21 +15,21 @@

package com.amazon.pocketEtl;

import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import com.amazon.pocketEtl.core.executor.EtlExecutorFactory;
import com.amazon.pocketEtl.core.producer.EtlProducer;
import com.amazon.pocketEtl.core.producer.EtlProducerFactory;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;

@Getter(AccessLevel.PACKAGE)
@EqualsAndHashCode(callSuper = true)
class EtlExtractStage extends EtlProducerStage {
private final static String DEFAULT_EXTRACT_STAGE_NAME = "EtlStream.Extract";
private final static EtlExecutorFactory defaultExecutorFactory = new EtlExecutorFactory();
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/amazon/pocketEtl/EtlLoadStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -15,21 +15,21 @@

package com.amazon.pocketEtl;

import static org.apache.logging.log4j.LogManager.getLogger;

import java.util.function.Function;

import javax.annotation.Nonnull;

import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import com.amazon.pocketEtl.core.consumer.EtlConsumerFactory;
import com.amazon.pocketEtl.core.executor.EtlExecutor;
import com.amazon.pocketEtl.core.executor.EtlExecutorFactory;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.util.function.Function;

import static org.apache.logging.log4j.LogManager.getLogger;

@Getter(AccessLevel.PACKAGE)
@EqualsAndHashCode(callSuper = true)
class EtlLoadStage<T> extends EtlConsumerStage<T> {
private final static String DEFAULT_LOAD_STAGE_NAME = "EtlStream.Load";

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/amazon/pocketEtl/EtlProducerStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -15,24 +15,24 @@

package com.amazon.pocketEtl;

import java.util.Arrays;
import java.util.Collection;

import javax.annotation.Nonnull;

import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import com.amazon.pocketEtl.core.producer.EtlProducer;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collection;

/**
* Abstract class that represents the producer stage in an EtlStream. An EtlStream consists of a producer stage chained
* to one or more consumer stages.
*/
@Getter(AccessLevel.PACKAGE)
@RequiredArgsConstructor
@EqualsAndHashCode
public abstract class EtlProducerStage {
/**
* Static constructor for an EtlProducerStage that extracts data and puts it on the stream. Used as a component in an
Expand Down
15 changes: 1 addition & 14 deletions src/main/java/com/amazon/pocketEtl/EtlProfilingScope.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,10 +46,8 @@
*/
public class EtlProfilingScope implements AutoCloseable {
private final static Log logger = LogFactory.getLog(EtlProfilingScope.class);
private final static ThreadLocal<EtlMetrics> rootMetrics = new ThreadLocal<>();
private final EtlMetrics ourMetrics;
private final long metricsStart;
private final boolean isRoot;
private final String scopeName;
private boolean closed = false;

Expand All @@ -70,13 +68,6 @@ private EtlProfilingScope(final EtlMetrics metrics, final String scopeName, fina
ourMetrics = metrics;
}

if (rootMetrics.get() == null) {
rootMetrics.set(ourMetrics);
isRoot = true;
} else {
isRoot = false;
}

this.scopeName = scopeName;
metricsStart = System.currentTimeMillis();
}
Expand Down Expand Up @@ -117,10 +108,6 @@ public void close() {
}
closed = true;

if (isRoot) {
rootMetrics.remove();
}

if (ourMetrics != null) {
final long metricsEnd = System.currentTimeMillis();
ourMetrics.addTime(scopeName, metricsEnd - metricsStart);
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/amazon/pocketEtl/EtlTransformStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -16,21 +16,21 @@
package com.amazon.pocketEtl;


import static org.apache.logging.log4j.LogManager.getLogger;

import java.util.function.Function;

import javax.annotation.Nonnull;

import com.amazon.pocketEtl.core.consumer.EtlConsumer;
import com.amazon.pocketEtl.core.consumer.EtlConsumerFactory;
import com.amazon.pocketEtl.core.executor.EtlExecutor;
import com.amazon.pocketEtl.core.executor.EtlExecutorFactory;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.util.function.Function;

import static org.apache.logging.log4j.LogManager.getLogger;

@Getter(AccessLevel.PACKAGE)
@EqualsAndHashCode(callSuper = true)
class EtlTransformStage<T> extends EtlConsumerStage<T> {
private final static String DEFAULT_TRANSFORM_STAGE_NAME = "EtlStream.Transform";

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/amazon/pocketEtl/Extractor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,8 @@

import javax.annotation.Nullable;
import java.util.Optional;
import java.util.prefs.BackingStoreException;

import com.amazon.pocketEtl.exception.UnrecoverableStreamFailureException;

/**
* Interface for an extractor object that either extracts objects from some kind of storage.
Expand All @@ -31,9 +32,10 @@ public interface Extractor<T> extends AutoCloseable {
*
* @return An optional object that contains the next extracted object or empty if there are no objects left to be
* extracted.
* @throws BackingStoreException If there was a problem extracting the object from the backing store.
* @throws UnrecoverableStreamFailureException An unrecoverable problem that affects the entire stream has been
* detected and the stream needs to be aborted.
*/
Optional<T> next() throws BackingStoreException;
Optional<T> next() throws UnrecoverableStreamFailureException;

/**
* Signal the extractor to prepare to extract objects.
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/amazon/pocketEtl/Loader.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@

import javax.annotation.Nullable;

import com.amazon.pocketEtl.exception.UnrecoverableStreamFailureException;

/**
* Interface for a Loader that loads (writes) objects into final its final destination.
*
Expand All @@ -28,8 +30,10 @@ public interface Loader<T> extends AutoCloseable {
* Load a single object to to the destination store/service.
*
* @param objectToLoad The object to be loaded.
* @throws UnrecoverableStreamFailureException An unrecoverable problem that affects the entire stream has been
* detected and the stream needs to be aborted.
*/
void load(T objectToLoad);
void load(T objectToLoad) throws UnrecoverableStreamFailureException;

/**
* Signal the loader to prepare to load objects.
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/amazon/pocketEtl/Transformer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
import javax.annotation.Nullable;
import java.util.List;

import com.amazon.pocketEtl.exception.UnrecoverableStreamFailureException;

/**
* Interface for an object that transforms an object of one type into a stream of objects of another type. This allows
* for an expansion or contraction of the stream where one object can become many objects or no objects.
Expand All @@ -32,8 +34,10 @@ public interface Transformer<UpstreamType, DownstreamType> extends AutoCloseable
*
* @param objectToTransform The object to be transformed.
* @return The transformed object.
* @throws UnrecoverableStreamFailureException An unrecoverable problem that affects the entire stream has been
* detected and the stream needs to be aborted.
*/
List<DownstreamType> transform(UpstreamType objectToTransform);
List<DownstreamType> transform(UpstreamType objectToTransform) throws UnrecoverableStreamFailureException;

/**
* Signal the transformer to prepare to transform objects.
Expand Down
Loading

0 comments on commit 9935a76

Please sign in to comment.