Skip to content

Commit

Permalink
support MVEL templates within the pipeline JSON file
Browse files Browse the repository at this point in the history
also remove the evalInsert transform, as it's redundant and less useful than the template support
  • Loading branch information
cwensel committed Jul 20, 2023
1 parent 02627a2 commit aed193c
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 79 deletions.
70 changes: 62 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ This project is under active development and many features are considered alpha.
Please do play around with this project in order to provide early feedback, but do expect things to change until we hit
1.0 release.

All final and WIP releases can be found here:

- https://github.com/ClusterlessHQ/tessellate/releases

## About

A primary activity of any data-engineering effort is to format and organize data for different access patterns.
Expand All @@ -22,14 +26,62 @@ Tessellate may be used from the command line, but also natively supports the

## Features

### Supported formats
### Pipeline definition

Tessellate pipelines are defined in JSON files.

For a copy of a template pipeline JSON file, run:

```shell
tess --print-pipeline > pipeline.json
```

Some command line options are merged at runtime with the pipeline JSON file. Command line options take precedence over
the pipeline JSON file.

Overriding command line options include

- `--inputs`
- `--input-manifest`
- `--input-manifest-lot`
- `--output`
- `--output-manifest`
- `--output-manifest-lot`

In order to embed system properties, environment variables, or other provided intrinsic values, [MVEL
templates](http://mvel.documentnode.com) are supported.

Current context values supported are:

- Environment variables
- System properties
- Pipeline source properties
- Pipeline sink properties

For example:

- `@{env['USER']}` - resolve an environment variable
- `@{sys['user.name']}` - resolve a system property
- `@{sink.manifestLot}` - resolve a sink property from the pipeline JSON definition

Used in a transform to embed the current `lot` value into the output:

```json
{
"transform": [
"@{source.manifestLot}=>lot|string"
]
}
```

### Supported data formats

- `text/regex` - lines of text parsed by regex
- `csv` - with or without headers
- `tsv` - with or without headers
- [Apache Parquet](https://parquet.apache.org)

The regex support is based on regex groups. Groups are matched by ordinal with the declared fields in the schema.
Regex support is based on regex groups. Groups are matched by ordinal with the declared fields in the schema.

Provided named formats include:

Expand All @@ -49,7 +101,7 @@ Usage:
}
```

### Supported locations/protocols
### Supported data locations/protocols

- `file://`
- `s3://`
Expand All @@ -69,8 +121,6 @@ Usage:

- insert - insert a literal value into a field
- `value=>intoField|type`
- eval - evaluate an expression locally and insert into a field (relies on [MVEL](http://mvel.documentnode.com))
- `expression!>intoField|type`
- coerce - transform a field to a new type
- `field|newType`
- copy - copy a field value to a new field
Expand Down Expand Up @@ -102,7 +152,7 @@ Usage:

So that the Cascading WIP releases can be retrieved, to `gradle.properties` add:

```
```properties
githubUsername=[your github username]
githubPassword=[your github password]
```
Expand All @@ -111,10 +161,14 @@ githubPassword=[your github password]
## To Run

> ./tessellate-main/build/install/tess/bin/tess --help
```shell
./tessellate-main/build/install/tess/bin/tess --help
```

To print a project file template:

> tess --print-project
```shell
tess --print-pipeline
```

Documentation coming soon, but see the tests for usage.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ enum Show {
@CommandLine.Mixin
protected PipelineOptions pipelineOptions = new PipelineOptions();

@CommandLine.Option(names = "--print-project", description = "show project template, will not run pipeline")
protected boolean printProject = false;
@CommandLine.Option(names = "--print-pipeline", description = "show pipeline template, will not run pipeline")
protected boolean printPipeline = false;

@CommandLine.Option(names = "--show-source", description = "show protocols, formats, or compression options")
protected Show showSource;
Expand Down Expand Up @@ -137,7 +137,7 @@ public Integer call() throws IOException {

PipelineDef pipelineDef = merge.merge();

if (printProject) {
if (printPipeline) {
System.out.println(JSONUtil.writeAsStringSafePretty(pipelineDef));
return 0;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class InputOptions implements AWSOptions {
private List<URI> inputs = new LinkedList<>();
@CommandLine.Option(names = {"-m", "--input-manifest"}, description = "input manifest uri")
private URI inputManifest;
@CommandLine.Option(names = {"--input-lot"}, description = "input lot")
@CommandLine.Option(names = {"--input-manifest-lot"}, description = "input lot")
private String inputLot;
@CommandLine.Option(names = {"--input-aws-endpoint"}, description = "aws endpoint")
protected String awsEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class OutputOptions implements AWSOptions {
private URI output;
@CommandLine.Option(names = {"-t", "--output-manifest"}, description = "output manifest uri template")
private String outputManifest;
@CommandLine.Option(names = {"-l", "--output-lot"}, description = "output lot")
@CommandLine.Option(names = {"-l", "--output-manifest-lot"}, description = "output lot")
private String outputLot;
@CommandLine.Option(names = {"--output-aws-endpoint"}, description = "aws endpoint")
protected String awsEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import heretical.pointer.path.NestedPointer;
import io.clusterless.tessellate.model.PipelineDef;
import io.clusterless.tessellate.util.JSONUtil;
import io.clusterless.tessellate.util.LiteralResolver;
import org.jetbrains.annotations.NotNull;
import org.mvel2.templates.TemplateRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -130,9 +133,22 @@ public PipelineDef merge(JsonNode pipelineDef) {
loadAndMerge(pipelineDef, "/source");
loadAndMerge(pipelineDef, "/sink");

LOG.info("pipeline: {}", JSONUtil.writeAsStringSafe(pipelineDef));
String mergedPipelineDef = JSONUtil.writeAsStringSafe(pipelineDef);
Map<String, Object> context = getContext(mergedPipelineDef);
String resolved = TemplateRuntime.eval(mergedPipelineDef, context).toString();
LOG.info("pipeline: {}", resolved);

return JSONUtil.treeToValueSafe(pipelineDef, PipelineDef.class);
return JSONUtil.stringToValue(resolved, PipelineDef.class);
}

@NotNull
private static Map<String, Object> getContext(String mergedPipelineDef) {
Map<String, Object> context = LiteralResolver.context();
Map map = JSONUtil.stringToValue(mergedPipelineDef, Map.class);

context.put("source", map.get("source"));
context.put("sink", map.get("sink"));
return context;
}

private void loadAndMerge(JsonNode jsonNode, String target) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ public void build() throws IOException {
// todo: group like transforms together if there are no interdependencies
for (TransformOp transformOp : pipelineDef.transform().transformOps()) {
switch (transformOp.transform()) {
case eval:
EvalInsertOp evalOp = (EvalInsertOp) transformOp;
Fields evalFields = evalOp.field().fields();
Object eval = evalOp.evaluate(getContext());
LOG.info("transform eval: fields: {}, value: {}", evalFields, eval);
pipe = new Each(pipe, new Insert(evalFields, eval), Fields.ALL);
currentFields = currentFields.append(evalFields);
logCurrentFields(currentFields);
break;
case insert:
InsertOp insertOp = (InsertOp) transformOp;
Fields insertFields = insertOp.field().fields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

public enum Transforms {
insert("=>", "^.+[=]>.+$", InsertOp::new),
eval("!>", "^.+[!]>.+$", EvalInsertOp::new),
copy("+>", "^.+[+]>.+$", CopyOp::new),
rename("->", "^.+[-]>.+$", RenameOp::new),
discard("->", "^.+[-]>$", DiscardOp::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ public static JsonNode stringToTree(String value) {
}
}

public static <T> T stringToValue(String value, Class<T> type) {
try {
return CONFIG_READER.readValue(value, type);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static <T> T treeToValue(JsonNode n, Class<T> type) throws JsonProcessingException {
return CONFIG_READER.treeToValue(n, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.adelean.inject.resources.junit.jupiter.GivenTextResource;
import com.adelean.inject.resources.junit.jupiter.TestWithResources;
import io.clusterless.tessellate.model.InsertOp;
import io.clusterless.tessellate.model.PipelineDef;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.options.PipelineOptionsMerge;
Expand Down Expand Up @@ -39,6 +40,8 @@ void usingOptions(@GivenTextResource("/config/pipeline.json") String pipelineJso

assertEquals(inputs, merged.source().inputs());
assertEquals(output, merged.sink().output());

assertEquals("1689820455", ((InsertOp) merged.transform().transformOps().get(5)).value());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ void name(@GivenTextResource("config/pipeline.json") String pipelineJson) throws
assertInstanceOf(CopyOp.class, transform.transformOps().get(2));
assertInstanceOf(DiscardOp.class, transform.transformOps().get(3));
assertInstanceOf(InsertOp.class, transform.transformOps().get(4));
assertInstanceOf(EvalInsertOp.class, transform.transformOps().get(5));
assertInstanceOf(InsertOp.class, transform.transformOps().get(5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class LiteralResolverTest {
@Test
void resolveEnv() {
Assertions.assertEquals(System.getenv("USER"), LiteralResolver.resolve("env.USER", String.class));
Assertions.assertEquals(System.getenv("USER"), LiteralResolver.resolve("env['USER']", String.class));
Assertions.assertEquals(System.getProperty("user.name"), LiteralResolver.resolve("sys['user.name']", String.class));
}
}
2 changes: 1 addition & 1 deletion tessellate-main/src/test/resources/config/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
"three+>@three|DateTime|yyyyMMdd",
"four->",
"five=>_five",
"1689820455!>six|DateTime|yyyyMMdd"
"@{1689820455}=>six|DateTime|yyyyMMdd"
]
}

0 comments on commit aed193c

Please sign in to comment.