Skip to content

Commit

Permalink
add new tsid intrinsic transform
Browse files Browse the repository at this point in the history
`tsid` - create a unique id as a long or string

using https://github.com/f4b6a3/tsid-creator
  • Loading branch information
cwensel committed Sep 20, 2023
1 parent 7bce281 commit 3f4d419
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 51 deletions.
50 changes: 33 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ Regex support is based on regex groups. Groups are matched by ordinal with the d
Provided named formats include:

- AWS S3 Access Logs
- named: `aws-s3-access-log`
- https://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html
- named: `aws-s3-access-log`
- https://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

Usage:

Expand All @@ -84,32 +84,48 @@ Usage:
### Supported path and filename patterns

- Path partitioning - data can be partitioned by intrinsic values in the data set.
- partitioning can be named, e.g. `year=2023/month=01/day=01`, or
- unnamed, e.g. `2023/01/01`
- partitioning can be named, e.g. `year=2023/month=01/day=01`, or
- unnamed, e.g. `2023/01/01`
- Filename metadata - `[prefix]-[field-hash]-[guid].parquet`
- `prefix` is `part` by default
- `field-hash` is a hash of the schema: field names, and field types
- `guid` is a random UUID or a provided value
- `prefix` is `part` by default
- `field-hash` is a hash of the schema: field names, and field types
- `guid` is a random UUID or a provided value

### Supported operations

#### Transforms

- insert - insert a literal value into a field
- `value=>intoField|type`
- `value => intoField|type`
- coerce - transform a field to a new type
- `field|newType`
- `field|newType`
- copy - copy a field value to a new field
- `fromField+>toField|type`
- `fromField +> toField|type`
- rename - rename a field, optionally coercing its type
- `fromField->toField|type`
- `fromField -> toField|type`
- discard - remove a field
- `field->`

#### Functions

- tsid - create a unique long id
- `!tsid{node:...,nodeCount:...,signed:true/false,epoch:...}+>intoField|type`
- `field ->`

#### Intrinsic Functions

- `tsid` - create a unique id as a long or string (using https://github.com/f4b6a3/tsid-creator)
- `^tsid{node:...,nodeCount:...,epoch:...,format:...,counterToZero:...} +> intoField|type`
- `type` must be `string` or `long`, defaults to `long`. When `string`, the `format` is honored.
- Params:
- `node` - the node id, defaults to a random int.
- if a string is provided, it is hashed to an int
- SIP_HASHER.hashString(s, StandardCharsets.UTF_8).asInt() % nodeCount;
- `nodeCount` - the number of nodes, defaults to `1024`
- `epoch` - the epoch, defaults to `Instant.parse("2020-01-01T00:00:00.000Z").toEpochMilli()`
- `format` - the format, defaults to `null`. Example: `K%S` where `%S` is a placeholder.
- Placeholders:
- `%S`: canonical string in upper case
- `%s`: canonical string in lower case
- `%X`: hexadecimal in upper case
- `%x`: hexadecimal in lower case
- `%d`: base-10
- `%z`: base-62
- `counterToZero` - resets the counter portion when the millisecond changes, defaults to `false`

### Supported types

Expand Down
2 changes: 2 additions & 0 deletions tessellate-main/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ dependencies {

implementation("org.jparsec:jparsec:3.1")

implementation("com.github.f4b6a3:tsid-creator:5.2.5")

testImplementation("net.wensel:cascading-core:$cascading:tests")

// https://github.com/hosuaby/inject-resources
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.operation;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import com.github.f4b6a3.tsid.Tsid;
import com.github.f4b6a3.tsid.TsidFactory;

import java.lang.reflect.Type;
import java.time.Instant;
import java.util.function.Supplier;

public class TsidFunction extends BaseOperation<TsidFunction.Context> implements Function<TsidFunction.Context> {
private final Integer node;
private final Integer nodeCount;
private final Long epoch;
private final String format;
private final Boolean counterToZero;

public static class Context {

protected final Tuple tuple = Tuple.size(1);
protected final TsidFactory factory;
protected final Supplier<?> supplier;

public Context(Type type, Integer node, Integer nodeCount, Long epoch, String format, Boolean counterToZero) {
// pass nulls instead of defaults so that the builder can pick up env/property defaults
TsidFactory.Builder builder = TsidFactory.builder()
.withNode(node)
.withNodeBits(nodeCount != null ? (int) Math.ceil(Math.log(nodeCount) / Math.log(2)) : null)
.withCustomEpoch(Instant.ofEpochMilli(epoch == null ? Tsid.TSID_EPOCH : epoch));

// zeros the counter every millisecond
if (counterToZero != null && counterToZero) {
builder.withRandomFunction(byte[]::new);
}

this.factory = builder
.build();

if (type == String.class) {
if (format == null) {
supplier = () -> factory.create().toString();
} else {
supplier = () -> factory.create().format(format);
}
} else if (type == Long.class || type == Long.TYPE) {
supplier = () -> factory.create().toLong();
} else {
throw new IllegalArgumentException("unsupported type: " + type);
}
}

public Tuple next() {
tuple.set(0, supplier.get());
return tuple;
}
}

public TsidFunction(Fields fieldDeclaration, Integer node, Integer nodeCount, Long epoch, String format, Boolean counterToZero) {
super(0, fieldDeclaration);
this.node = node;
this.nodeCount = nodeCount;
this.epoch = epoch;
this.format = format;
this.counterToZero = counterToZero;

if (fieldDeclaration.size() != 1)
throw new IllegalArgumentException("fieldDeclaration may only declare one field, was " + fieldDeclaration.print());
}

@Override
public void prepare(FlowProcess flowProcess, OperationCall<Context> operationCall) {
operationCall.setContext(new Context(fieldDeclaration.getType(0), node, nodeCount, epoch, format, counterToZero));
}

@Override
public void operate(FlowProcess flowProcess, FunctionCall<Context> functionCall) {
functionCall.getOutputCollector().add(functionCall.getContext().next());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,12 @@ public Fields parseSingleFields(String value, Type defaultType) {
}

public Fields asFields(List<Field> fields) {
return asFields(fields, null);
}

public Fields asFields(List<Field> fields, Type defaultType) {
return fields.stream()
.map(this::asFields)
.map(f -> asFields(f, defaultType))
.reduce(Fields.NONE, Fields::append);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.clusterless.tessellate.parser.ast.Field;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Printer {
Expand All @@ -33,4 +34,10 @@ public static String literal(String literal) {

return literal;
}

public static String params(Map<String, String> params) {
return params.entrySet().stream()
.map(e -> e.getKey() + ":" + literal(e.getValue()))
.collect(Collectors.joining(", "));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ public IntrinsicParams params() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Intrinsic{");
sb.append("name=").append(name);
sb.append(", params=").append(params);
sb.append('}');
return sb.toString();
return "^" + name + "{" + params() + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ public IntrinsicName(String name) {
this.name = name;
}

public String name() {
return name;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("IntrinsicName{");
sb.append("name='").append(name).append('\'');
sb.append('}');
return sb.toString();
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

package io.clusterless.tessellate.parser.ast;

import io.clusterless.tessellate.parser.Printer;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class IntrinsicParams {
Map<String, String> params;
Expand All @@ -22,11 +25,50 @@ public Map<String, String> params() {
return params;
}

public String getString(String param) {
return params.get(param);
}

public Boolean getBoolean(String param) {
String value = params.get(param);

if (value == null) {
return null;
}

return Boolean.parseBoolean(value);
}

public Integer getInteger(String param, Function<String, Integer> otherwise) {
try {
return getInteger(param);
} catch (NumberFormatException e) {
return otherwise.apply(getString(param));
}
}

public Integer getInteger(String param) {
String value = params.get(param);

if (value == null) {
return null;
}

return Integer.parseInt(value);
}

public Long getLong(String param) {
String value = params.get(param);

if (value == null) {
return null;
}

return Long.parseLong(value);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("IntrinsicParams{");
sb.append("params=").append(params);
sb.append('}');
return sb.toString();
return Printer.params(params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.pipeline;

import io.clusterless.tessellate.pipeline.intrinsic.IntrinsicBuilder;
import io.clusterless.tessellate.pipeline.intrinsic.TsidIntrinsic;

import java.util.HashMap;
import java.util.Map;

public class Intrinsics {

static Map<String, IntrinsicBuilder> builders = new HashMap<>();

private static void add(IntrinsicBuilder intrinsicBuilder) {
builders.put(intrinsicBuilder.name(), intrinsicBuilder);
}

static {
add(new TsidIntrinsic());
}

public static Map<String, IntrinsicBuilder> builders() {
return builders;
}
}
Loading

0 comments on commit 3f4d419

Please sign in to comment.