Skip to content

Commit

Permalink
#13 building GCS paginator
Browse files Browse the repository at this point in the history
  • Loading branch information
ananasjoe committed Jun 17, 2019
1 parent 3709b81 commit 26f270d
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 3 deletions.
4 changes: 4 additions & 0 deletions runner/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ repositories {
dependencies {
def withoutX = { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core:2.9.5' }

compile 'com.google.cloud:google-cloud-storage:1.74.0'

compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.9.0'

compile 'org.freemarker:freemarker:2.3.28'
compile 'org.apache.beam:beam-sdks-java-io-tika:2.9.0'
compile 'org.apache.beam:beam-sdks-java-io-kafka:2.9.0'
Expand Down
5 changes: 5 additions & 0 deletions runner/src/main/java/org/ananas/runner/api/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import org.ananas.runner.kernel.ExtensionRegistry;
import org.ananas.runner.paginator.files.CSVPaginator;
import org.ananas.runner.paginator.files.JdbcPaginator;
import org.ananas.runner.paginator.files.GCSPaginator;
import org.ananas.runner.steprunner.DefaultDataViewer;
import org.ananas.runner.steprunner.files.FileLoader;
import org.ananas.runner.steprunner.files.GCSConnector;
import org.ananas.runner.steprunner.files.csv.CSVConnector;
import org.ananas.runner.steprunner.jdbc.JdbcLoader;
import org.ananas.runner.steprunner.sql.SQLTransformer;
Expand All @@ -25,6 +27,9 @@ public static void registerExtensions() {
ExtensionRegistry.registerConnector(
"org.ananas.source.file.csv", CSVConnector.class, CSVPaginator.class);

ExtensionRegistry.registerConnector(
"org.ananas.source.file.gcs", GCSConnector.class, GCSPaginator.class);

ExtensionRegistry.registerTransformer("org.ananas.transform.sql", SQLTransformer.class);

ExtensionRegistry.registerLoader(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.ananas.runner.kernel.pipeline;

import org.ananas.runner.kernel.model.Engine;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.spark.SparkPipelineOptions;
Expand All @@ -14,19 +16,31 @@ public static PipelineOptions create(boolean isTest, Engine engine) {
return createFlinkOptions(null);
}

switch (engine.type) {
case "Flink":
switch (engine.type.toLowerCase()) {
case "flink":
return createFlinkOptions(engine);
case "Spark":
case "spark":
return createSparkOptions(engine);
case "dataflow":
return createDataflowOptions(engine);
default:
return createFlinkOptions(null);
}
}

public static PipelineOptions createDataflowOptions(Engine engine) {
DataflowPipelineOptions options =
org.apache.beam.sdk.options.PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setJobName("ananas");
options.setRunner(DataflowRunner.class);
options.setTempLocation(engine.getProperty("tempLocation", "/tmp/"));
return options;
}

public static PipelineOptions createFlinkOptions(Engine engine) {
FlinkPipelineOptions options =
org.apache.beam.sdk.options.PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
options.setJobName("ananas");
if (engine == null) {
options.setParallelism(10);
options.setMaxBundleSize(1000 * 1000L);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.ananas.runner.paginator.files;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.*;
import org.ananas.runner.kernel.paginate.AutoDetectedSchemaPaginator;
import org.ananas.runner.kernel.paginate.Paginator;
import org.ananas.runner.model.steps.files.JsonPaginator;
import org.ananas.runner.model.steps.files.TextPaginator;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

import java.util.Map;

public class GCSPaginator extends AutoDetectedSchemaPaginator {

private String format;
private Paginator paginator;

public GCSPaginator(String id,
String type,
Map<String, Object> config,
Schema schema) {
super(id, type, config, schema);
}

@Override
public Iterable<Row> iterateRows(Integer page, Integer pageSize) {

return paginator.paginateRows(
page == null ? 0 : Integer.valueOf(page),
pageSize == null ? 1000 : Integer.valueOf(pageSize)).getRight();
}

@Override
public Schema autodetect() {
if (format.toLowerCase().equals("csv")) {
return ((CSVPaginator)paginator).autodetect();
}
return this.schema;//already autodetected
}

@Override
public void parseConfig(Map<String, Object> config) {
System.out.println("Hey ho " + config);
this.format = (String)config.get("format");
String bucketName = (String)config.get("bucket");
String prefix = (String)config.get("path");
System.out.println("Hey ho " + prefix);

//get bucket
BucketInfo.Builder builder = Bucket.newBuilder(bucketName);

// Instantiates a client
Storage storage = StorageOptions.getDefaultInstance().getService();

// The name for the new bucket
//String bucketName = "id5";
//String prefix = "appnexus/20180802";

// Creates the new bucket
Page<Blob> l = storage.list(bucketName, Storage.BlobListOption.prefix(prefix));

String url = null;

if (l.hasNextPage()) {
throw new IllegalArgumentException("There is no files in this bucket");
}

for (Blob b : l.iterateAll()) {
url = b.getMediaLink();
this.config.put("path", url);
break;
}

this.config.put("path", url);

switch (format.toLowerCase()) {
case "csv":
paginator = new CSVPaginator(this.id, this.type, this.config, schema);
break;
case "json":
paginator = new JsonPaginator(this.id, url);
break;
case "text":
paginator = new TextPaginator(this.id, url);
break;
default:
throw new IllegalArgumentException("Unsupported format " + format);
}
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.ananas.runner.steprunner.files;

import org.ananas.runner.kernel.ConnectorStepRunner;
import org.ananas.runner.kernel.model.Step;
import org.ananas.runner.kernel.model.StepType;
import org.ananas.runner.kernel.paginate.AutoDetectedSchemaPaginator;
import org.ananas.runner.kernel.paginate.PaginatorFactory;
import org.ananas.runner.steprunner.files.csv.BeamTextCSVCustomTable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.tuple.MutablePair;

public class GCSConnector extends ConnectorStepRunner {

private static final long serialVersionUID = -6637097842918012864L;

public GCSConnector(Pipeline pipeline, Step step, boolean doSampling, boolean isTest) {
super(pipeline, step, doSampling, isTest);
}

public void build() {
System.out.println(step.config);
//GCSStepConfig config = new GCSStepConfig(StepType.Connector, step.config);

Schema schema = step.getBeamSchema();

AutoDetectedSchemaPaginator paginator =
PaginatorFactory.of(stepId, step.metadataId, step.type, step.config, schema)
.buildPaginator();
schema = paginator.getSchema();

if (schema == null || step.forceAutoDetectSchema()) {
// find the paginator bind to it

}

this.stepId = stepId;

Integer page = 1;
Integer pageSize = 100;

MutablePair<Schema, Iterable<Row>> c = paginator.paginateRows(page, pageSize);


Create.Values<Row> o = Create.of(c.getRight());
this.output = this.pipeline.apply(o);
this.output.setRowSchema(schema);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ public CSVStepConfig(StepType type, Map<String, Object> config) {
this.url = StepFileConfigToUrl.url(type, config, FileLoader.SupportedFormat.CSV);
this.hasHeader = (Boolean) config.getOrDefault("header", false);
}



}

0 comments on commit 26f270d

Please sign in to comment.