improved MVEL support via templates and intrinsics. README.md updates.
cwensel committed Jul 21, 2023
1 parent aed193c commit e00a94e
Showing 10 changed files with 368 additions and 130 deletions.
112 changes: 74 additions & 38 deletions README.md
@@ -48,37 +48,11 @@ Overriding command line options include
- `--output-manifest`
- `--output-manifest-lot`

In order to embed system properties, environment variables, or other provided intrinsic values, [MVEL
templates](http://mvel.documentnode.com) are supported.

Current context values supported are:

- Environment variables
- System properties
- Pipeline source properties
- Pipeline sink properties

For example:

- `@{env['USER']}` - resolve an environment variable
- `@{sys['user.name']}` - resolve a system property
- `@{sink.manifestLot}` - resolve a sink property from the pipeline JSON definition

Used in a transform to embed the current `lot` value into the output:

```json
{
"transform": [
"@{source.manifestLot}=>lot|string"
]
}
```

### Supported data formats

- `text/regex` - lines of text parsed by regex
- `csv` - with or without headers
- `tsv` - with or without headers
- text/regex - lines of text parsed by regex
- csv - with or without headers
- tsv - with or without headers
- [Apache Parquet](https://parquet.apache.org)

Regex support is based on regex groups. Groups are matched by ordinal with the declared fields in the schema.
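
Ordinal matching can be sketched with plain `java.util.regex`; the pattern and field names below are illustrative, not taken from the project:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexOrdinalDemo {
    // Pair group(1) with fields[0], group(2) with fields[1], and so on.
    static List<String> parse(String pattern, String[] fields, String line) {
        List<String> result = new ArrayList<>();
        Matcher m = Pattern.compile(pattern).matcher(line);
        if (m.matches()) {
            for (int i = 0; i < fields.length; i++) {
                result.add(fields[i] + "=" + m.group(i + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // three groups map onto three declared fields, by position
        System.out.println(parse("(\\S+) (\\S+) (\\S+)",
                new String[]{"ip", "user", "path"},
                "127.0.0.1 frank /index.html"));
    }
}
```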
@@ -148,27 +148,89 @@ Usage:
instant format, e.g. `2011-12-03T10:15:30Z`
- `json` - canonical type is `com.fasterxml.jackson.databind.JsonNode`, supports nested objects and arrays

## To Build
## Pipeline Template expressions

To embed system properties, environment variables, or other provided intrinsic values, [MVEL
templates](http://mvel.documentnode.com) are supported in the pipeline JSON file.

Provided intrinsic values include:

- `env[...]` - Environment variables
- `sys[...]` - System properties
- `source.*` - Pipeline source properties
- `sink.*` - Pipeline sink properties
- `pid` - `ProcessHandle.current().pid()`
- `rnd32` - `Math.abs(random.nextInt())`, evaluated once, so every reference returns the same value
- `rnd64` - `Math.abs(random.nextLong())`, evaluated once, so every reference returns the same value
- `rnd32Next` - `Math.abs(random.nextInt())`, evaluated on every reference, so it never returns the same value
- `rnd64Next` - `Math.abs(random.nextLong())`, evaluated on every reference, so it never returns the same value
- `hostAddress` - `localHost.getHostAddress()`
- `hostName` - `localHost.getCanonicalHostName()`
- `currentTimeMillis` - `now.toEpochMilli()`
- `currentTimeISO8601` - `now.toString()` at millis precision
- `currentTimeYear` - `utc.getYear()`
- `currentTimeMonth` - `utc.getMonthValue()` zero padded
- `currentTimeDay` - `utc.getDayOfMonth()` zero padded
- `currentTimeHour` - `utc.getHour()` zero padded
- `currentTimeMinute` - `utc.getMinute()` zero padded
- `currentTimeSecond` - `utc.getSecond()` zero padded

Where:

- `Random random = new Random()`
- `InetAddress localHost = InetAddress.getLocalHost()`
- `Instant now = Instant.now()`
- `ZonedDateTime utc = now.atZone(ZoneId.of("UTC"))`
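
The zero-padded time intrinsics above can be derived from one shared `Instant` with the JDK's `java.time` API; this is a minimal sketch mirroring the listed definitions, an assumption about (not a copy of) the project's actual code:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimeIntrinsicsDemo {
    static String currentTimeYear(Instant now) {
        ZonedDateTime utc = now.atZone(ZoneId.of("UTC"));
        return String.valueOf(utc.getYear());
    }

    static String currentTimeMonth(Instant now) {
        ZonedDateTime utc = now.atZone(ZoneId.of("UTC"));
        return String.format("%02d", utc.getMonthValue()); // zero padded
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-07-21T03:04:05Z");
        System.out.println(currentTimeYear(now));  // 2023
        System.out.println(currentTimeMonth(now)); // 07
    }
}
```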

For example:

- `@{env['USER']}` - resolve an environment variable
- `@{sys['user.name']}` - resolve a system property
- `@{sink.manifestLot}` - resolve a sink property from the pipeline JSON definition

Used in a transform to embed the current `lot` value into the output:

```json
{
"transform": [
"@{source.manifestLot}=>lot|string"
]
}
```

Or create a filename that prevents collisions but simplifies duplicate removal:

```json
{
"filename": {
"prefix": "access",
"includeGuid": true,
"providedGuid": "@{sink.manifestLot}-@{currentTimeMillis}",
"includeFieldsHash": true
}
}
```

This will result in a filename similar to `access-1717f2ea-20230717PT5M250-1689896792672-00000-00000-m-00000.gz`.

## Building

To retrieve the Cascading WIP releases, add the following to `gradle.properties`:

```properties
githubUsername=[your github username]
githubPassword=[your github password]
githubPassword=[your github personal access token]
```

> ./gradlew installDist
## To Run
See [creating a personal access token](https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token).

```shell
./tessellate-main/build/install/tess/bin/tess --help
./gradlew installDist
```

To print a project file template:

```shell
tess --print-pipeline
./tessellate-main/build/install/tessellate/bin/tess --help
```

Documentation coming soon, but see the tests for usage.
2 changes: 1 addition & 1 deletion tessellate-main/build.gradle.kts
@@ -67,7 +67,7 @@ dependencies {
implementation("ch.qos.logback:logback-classic:1.4.8")
implementation("ch.qos.logback:logback-core:1.4.8")

val cascading = "4.6.0-wip-8"
val cascading = "4.6.0-wip-9"
implementation("net.wensel:cascading-core:$cascading")
implementation("net.wensel:cascading-nested-json:$cascading")
implementation("net.wensel:cascading-local:$cascading")
@@ -94,7 +94,11 @@ public void writeSuccess(Properties conf) throws IOException {
manifest.put("uriType", "identifier");
manifest.put("uris", Observed.INSTANCE.writes(uriPrefix));

try (TupleEntryCollector tupleEntryCollector = openForWrite(conf, complete)) {
Properties properties = new Properties(conf);

properties.put("cascading.tapcollector.partname", "manifest-data.json");

try (TupleEntryCollector tupleEntryCollector = openForWrite(properties, complete)) {
tupleEntryCollector.add(new Tuple(JSONUtil.valueToTree(manifest)));
}
}
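
The `new Properties(conf)` layering above relies on `java.util.Properties` defaults semantics: the part-name override lands in a local table while lookups still fall back to `conf`, which is never mutated. A minimal stdlib sketch with hypothetical keys:

```java
import java.util.Properties;

public class PropertiesDefaultsDemo {
    static Properties withPartName(Properties conf) {
        Properties properties = new Properties(conf); // conf becomes the defaults layer
        properties.put("cascading.tapcollector.partname", "manifest-data.json");
        return properties;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("existing", "fromConf");

        Properties layered = withPartName(conf);
        System.out.println(layered.getProperty("existing"));   // falls back to defaults
        System.out.println(layered.getProperty("cascading.tapcollector.partname"));
        System.out.println(conf.getProperty("cascading.tapcollector.partname")); // null: conf untouched
    }
}
```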
@@ -56,6 +56,11 @@ public abstract class FSFactory extends FilesFactory {
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(System.getProperty("user.name", "nobody")));
}

@NotNull
private static String logPrefix(boolean isSink) {
return isSink ? "writing" : "reading";
}

@Override
public int openWritesThreshold() {
return 10;
@@ -112,8 +117,57 @@ public boolean commitResource(Properties conf) throws IOException {
}

@NotNull
private static String logPrefix(boolean isSink) {
return isSink ? "writing" : "reading";
private static Hfs createSourceTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris) {
Observed.INSTANCE.reads(commonURI);

String[] identifiers = uris.stream()
.map(URI::toString)
.toArray(String[]::new);

return new Hfs(scheme, commonURI.toString(), SinkMode.UPDATE) {
@Override
public boolean isSink() {
return false;
}

@Override
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);

applySourceConfInitIdentifiers(process, conf, identifiers);

verifyNoDuplicates(conf);
}

@Override
public Fields retrieveSourceFields(FlowProcess<? extends Configuration> flowProcess) {
HadoopUtil.copyConfiguration(local, flowProcess.getConfig());
return super.retrieveSourceFields(flowProcess);
}
};
}

@NotNull
private static Hfs createSinkTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris, URI manifest) {
if (uris.size() > 1) {
throw new IllegalArgumentException("cannot write to multiple uris, got: " + uris.subList(0, Math.min(10, uris.size())));
}

Observed.INSTANCE.writes(commonURI);

return new Hfs(scheme, commonURI.toString(), SinkMode.UPDATE) {
@Override
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);
super.sourceConfInit(process, conf);
}

@Override
public void sinkConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);
super.sinkConfInit(process, conf);
}
};
}

@Override
@@ -143,14 +197,16 @@ protected Properties getProperties(PipelineOptions pipelineOptions, Dataset data
}

protected Properties applySinkProperties(Dataset dataset, Fields declaredFields, Properties local) {
String prefix = PART_NAME_DEFAULT;
if (!isSink(dataset)) {
return local;
}

// hdfs always treat paths as directories, so we need to provide a prefix for the part files
if (isSink(dataset)) {
prefix = getPartFileName((Sink) dataset, declaredFields);
}
String prefix = getPartFileName((Sink) dataset, declaredFields);
String format = String.format("%%s%%s%s-%%05d-%%05d", prefix);
local.setProperty("cascading.tapcollector.partname", format);

local.setProperty("cascading.tapcollector.partname", String.format("%%s%%s%s-%%05d-%%05d", prefix));
LOG.info("sinking to prefix: {}", prefix);

return local;
}
@@ -192,60 +248,6 @@ protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Propert
return properties;
}

@NotNull
private static Hfs createSourceTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris) {
Observed.INSTANCE.reads(commonURI);

String[] identifiers = uris.stream()
.map(URI::toString)
.toArray(String[]::new);

return new Hfs(scheme, commonURI.toString(), SinkMode.UPDATE) {
@Override
public boolean isSink() {
return false;
}

@Override
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);

applySourceConfInitIdentifiers(process, conf, identifiers);

verifyNoDuplicates(conf);
}

@Override
public Fields retrieveSourceFields(FlowProcess<? extends Configuration> flowProcess) {
HadoopUtil.copyConfiguration(local, flowProcess.getConfig());
return super.retrieveSourceFields(flowProcess);
}
};
}

@NotNull
private static Hfs createSinkTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris, URI manifest) {
if (uris.size() > 1) {
throw new IllegalArgumentException("cannot write to multiple uris, got: " + uris.subList(0, Math.min(10, uris.size())));
}

Observed.INSTANCE.writes(commonURI);

return new Hfs(scheme, commonURI.toString(), SinkMode.UPDATE) {
@Override
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);
super.sourceConfInit(process, conf);
}

@Override
public void sinkConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
HadoopUtil.copyConfiguration(local, conf);
super.sinkConfInit(process, conf);
}
};
}

private String getAWSCredentialProviders() {
LinkedList<Class> list = new LinkedList<>(S3AUtils.STANDARD_AWS_PROVIDERS);

@@ -17,6 +17,7 @@
import io.clusterless.tessellate.model.PipelineDef;
import io.clusterless.tessellate.util.JSONUtil;
import io.clusterless.tessellate.util.LiteralResolver;
import io.clusterless.tessellate.util.MVELContext;
import org.jetbrains.annotations.NotNull;
import org.mvel2.templates.TemplateRuntime;
import org.slf4j.Logger;
@@ -134,20 +135,18 @@ public PipelineDef merge(JsonNode pipelineDef) {
loadAndMerge(pipelineDef, "/sink");

String mergedPipelineDef = JSONUtil.writeAsStringSafe(pipelineDef);
Map<String, Object> context = getContext(mergedPipelineDef);
MVELContext context = getContext(mergedPipelineDef);
String resolved = TemplateRuntime.eval(mergedPipelineDef, context).toString();
LOG.info("pipeline: {}", resolved);

return JSONUtil.stringToValue(resolved, PipelineDef.class);
}

@NotNull
private static Map<String, Object> getContext(String mergedPipelineDef) {
Map<String, Object> context = LiteralResolver.context();
private static MVELContext getContext(String mergedPipelineDef) {
Map map = JSONUtil.stringToValue(mergedPipelineDef, Map.class);
MVELContext context = LiteralResolver.context((Map<String, Object>) map.get("source"), (Map<String, Object>) map.get("sink"));

context.put("source", map.get("source"));
context.put("sink", map.get("sink"));
return context;
}

@@ -31,14 +31,11 @@
import io.clusterless.tessellate.model.*;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Format;
import io.clusterless.tessellate.util.LiteralResolver;
import io.clusterless.tessellate.util.Models;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -202,14 +199,6 @@ public void build() throws IOException {
.addTail(pipe));
}

@NotNull
protected Map<String, Object> getContext() {
Map<String, Object> context = LiteralResolver.context();
context.put("source", pipelineDef.source());
context.put("sink", pipelineDef.sink());
return context;
}

private static void logCurrentFields(Fields currentFields) {
LOG.info("current fields: {}", currentFields);
}
