forked from linkedin/Hoptimator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PipelineRel.java
103 lines (85 loc) · 3.36 KB
/
PipelineRel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.linkedin.hoptimator.planner;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.HopTable;
import com.linkedin.hoptimator.catalog.ScriptImplementor;
import java.util.ArrayList;
import java.util.List;
/**
* Calling convention which implements an SQL-based streaming data pipeline.
*
* Pipelines tend to have the following general shape:
* <pre>
* _________
* topic1 ----------------------> | |
* table2 --> CDC ---> topic2 --> | SQL job | --> topic4
* table3 --> rETL --> topic3 --> |_________|
*
* </pre>
* The SQL job may consume multiple sources but writes to a single sink. The
* CDC and rETL "hops" are made possible by Resources, which describe any
* additional infra required by the Pipeline. As Resources are essentially
* YAML, anything can be represented there, including additional SQL jobs.
*
*/
public interface PipelineRel extends RelNode {
  Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class);

  // SqlDialect OUTPUT_DIALECT = MysqlSqlDialect.DEFAULT; // closely resembles Flink SQL
  // TODO support alternative output dialects

  void implement(Implementor implementor);

  /**
   * Walks a tree of PipelineRel nodes and assembles the pipeline's SQL script
   * along with any supporting Resources each node contributes.
   */
  class Implementor {
    private final RelNode relNode;
    private final List<Resource> resources = new ArrayList<>();
    private ScriptImplementor script = ScriptImplementor.empty().database("PIPELINE");

    public Implementor(RelNode relNode) {
      this.relNode = relNode;
      visit(relNode);
    }

    /** Row type of the pipeline's output. */
    public RelDataType rowType() {
      return relNode.getRowType();
    }

    /** Proto form of {@link #rowType()}. */
    public RelProtoDataType rowProtoType() {
      return RelDataTypeImpl.proto(rowType());
    }

    /** Depend on an arbitrary Resource, e.g. a Kafka topic */
    public void resource(Resource resource) {
      resources.add(resource);
    }

    // Post-order traversal: children are implemented before their parent,
    // so each node's SQL and resources are collected bottom-up.
    private void visit(RelNode node) {
      node.getInputs().forEach(this::visit);
      ((PipelineRel) node).implement(this);
    }

    /** Script ending in SELECT... */
    public ScriptImplementor query() {
      return script.query(relNode);
    }

    /** Script ending in INSERT INTO ... */
    public ScriptImplementor insertInto(HopTable sink) {
      ScriptImplementor withSink = script.database(sink.database()).with(sink);
      return withSink.insert(sink.database(), sink.name(), relNode);
    }

    /** Add any resources, SQL, DDL etc required to access the table. */
    public void implement(HopTable table) {
      script = script.database(table.database()).with(table);
      table.resources().forEach(this::resource);
    }

    /** Pipeline rendered in the default (ANSI) SQL dialect. */
    public Pipeline pipeline(HopTable sink) {
      return pipeline(sink, AnsiSqlDialect.DEFAULT);
    }

    /** Combine SQL and any Resources into a Pipeline */
    public Pipeline pipeline(HopTable sink, SqlDialect sqlDialect) {
      List<Resource> combined = new ArrayList<>(resources);
      combined.add(new SqlJob(insertInto(sink).sql(sqlDialect)));
      return new Pipeline(combined, rowType());
    }
  }
}