Permalink
Browse files

Delete the pipeline package

  • Loading branch information...
thomas-kielbus committed Jun 12, 2014
1 parent 8e856c3 commit 9a7e2223050e0d925e8c3b47ac5a94da56b18271
@@ -1,70 +0,0 @@
-/**
- * Copyright 2014 LiveRamp
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.liveramp.megadesk.recipes.pipeline;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-import com.liveramp.megadesk.base.transaction.BaseDependency;
-import com.liveramp.megadesk.core.transaction.Context;
-import com.liveramp.megadesk.recipes.gear.Outcome;
-import com.liveramp.megadesk.recipes.queue.Batch;
-
-public abstract class BatchConsumerOperator<VALUE> extends Operator {
-
- private final Batch batch;
-
- public BatchConsumerOperator(Batch batch, BaseDependency dependency, Pipeline pipeline) {
- super(BaseDependency.builder()
- .reads((List)dependency.reads())
- .writes((List)dependency.writes())
- .writes(batch.getInput(), batch.getOutput())
- .build(),
- pipeline);
- this.batch = batch;
- }
-
- @Override
- public Outcome check(Context context) {
- Outcome check = super.check(context);
- if (check == Outcome.SUCCESS) {
- ImmutableList currentBatch = (ImmutableList)batch.read(context);
- if (!currentBatch.isEmpty()) {
- return Outcome.SUCCESS;
- } else {
- batch.pop(context);
- return Outcome.STANDBY;
- }
- } else {
- return check;
- }
- }
-
- @Override
- public Outcome execute(Context context) throws Exception {
- ImmutableList currentBatch = (ImmutableList)batch.read(context);
- Outcome outcome = this.consume(context, currentBatch);
- if (outcome == Outcome.SUCCESS) {
- batch.pop(context);
- return outcome;
- } else {
- return outcome;
- }
- }
-
- public abstract Outcome consume(Context context, ImmutableList<VALUE> batch);
-}
@@ -1,41 +0,0 @@
-/**
- * Copyright 2014 LiveRamp
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.liveramp.megadesk.recipes.pipeline;
-
-import com.liveramp.megadesk.base.transaction.BaseDependency;
-import com.liveramp.megadesk.core.transaction.Context;
-import com.liveramp.megadesk.recipes.gear.ConditionalGear;
-import com.liveramp.megadesk.recipes.gear.Outcome;
-
-public abstract class Operator extends ConditionalGear {
-
- private Pipeline pipeline;
-
- protected Operator(BaseDependency dependency, Pipeline pipeline) {
- super(dependency);
- this.pipeline = pipeline;
- }
-
- @Override
- public Outcome check(Context context) {
- if (pipeline.shouldShutdown()) {
- return Outcome.ABANDON;
- } else {
- return Outcome.SUCCESS;
- }
- }
-}
@@ -1,22 +0,0 @@
-/**
- * Copyright 2014 LiveRamp
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.liveramp.megadesk.recipes.pipeline;
-
-public interface Pipeline {
-
- public boolean shouldShutdown();
-}
@@ -1,54 +0,0 @@
-/**
- * Copyright 2014 LiveRamp
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.liveramp.megadesk.recipes.pipeline;
-
-import java.util.List;
-
-import com.liveramp.megadesk.base.transaction.BaseDependency;
-import com.liveramp.megadesk.core.state.Variable;
-import com.liveramp.megadesk.core.transaction.Context;
-import com.liveramp.megadesk.recipes.gear.Gear;
-import com.liveramp.megadesk.recipes.gear.Outcome;
-
-public abstract class TimeBasedOperator extends Operator implements Gear {
-
- protected TimeBasedOperator(List<Variable<? extends TimestampedValue>> reads, List<Variable<? extends TimestampedValue>> writes, Pipeline pipeline) {
- super(BaseDependency.builder().reads((List)reads).writes((List)writes).build(), pipeline);
- }
-
- @Override
- public Outcome check(Context context) {
- Outcome check = super.check(context);
- if (check == Outcome.SUCCESS) {
- long oldestRead = Long.MAX_VALUE;
- for (Variable<TimestampedValue> driver : this.dependency().reads()) {
- oldestRead = Math.min(context.read(driver.reference()).timestamp(), oldestRead);
- }
- long youngestWrite = 0;
- for (Variable<TimestampedValue> driver : this.dependency().writes()) {
- youngestWrite = Math.max(context.read(driver.reference()).timestamp(), youngestWrite);
- }
- if (oldestRead >= youngestWrite) {
- return Outcome.SUCCESS;
- } else {
- return Outcome.STANDBY;
- }
- } else {
- return check;
- }
- }
-}
@@ -1,5 +0,0 @@
-package com.liveramp.megadesk.recipes.pipeline;
-
-public interface TimestampedValue {
- public long timestamp();
-}

0 comments on commit 9a7e222

Please sign in to comment.