wip #1

Open · wants to merge 6 commits into main
38 changes: 30 additions & 8 deletions README.md
@@ -6,17 +6,27 @@ Multi-hop declarative data pipelines

Hoptimator is an SQL-based control plane for complex data pipelines.

Hoptimator turns high-level SQL _subscriptions_ into low-level SQL
_pipelines_. Pipelines may involve an auto-generated Flink job (or
similar) and any arbitrary resources required for the job to run.
Hoptimator turns high-level SQL _subscriptions_ into multi-hop data pipelines. Pipelines may involve an auto-generated Flink job (or similar) and any arbitrary resources required for the job to run.

## How does it work?

Hoptimator has a pluggable _adapter_ framework, which lets you wire-up
arbtitary data sources. Adapters loosely correspond to "connectors"
in the underlying compute engine (e.g. Flink Connectors), but they may
bring along additional _baggage_. For example, an adapter may bring
along a cache or a CDC stream as part of the resulting pipeline.
Hoptimator has a pluggable _adapter_ framework, which lets you wire up arbitrary data sources. Adapters loosely correspond to connectors in the underlying compute engine (e.g. Flink Connectors), but they may include custom control plane logic. For example, an adapter may create a cache or a CDC stream as part of a pipeline. This enables a single pipeline to span multiple "hops" across different systems (as opposed to, say, a single Flink job).

Hoptimator's pipelines tend to have the following general shape:

```
                                _________
topic1 ----------------------> |         |
table2 --> CDC ---> topic2 --> | SQL job | --> topic4
table3 --> rETL --> topic3 --> |_________|
```


The three data sources on the left correspond to three different adapters:

1. `topic1` can be read directly from a Flink job, so the first adapter simply configures a Flink connector.
2. `table2` is inefficient for bulk access, so the second adapter creates a CDC stream (`topic2`) and configures a Flink connector to read from _that_.
3. `table3` is in cold storage, so the third adapter creates a reverse-ETL job to re-ingest the data into Kafka.

In order to deploy such a pipeline, you only need to write one SQL query, called a _subscription_. Pipelines are constructed automatically based on subscriptions.
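
For illustration, a subscription for the pipeline sketched above could be a single query along these lines (the catalog, table, and column names here are hypothetical, not taken from this repo):

```sql
-- Hypothetical subscription covering all three sources from the diagram above.
-- From this one query, Hoptimator plans the CDC stream, the rETL job, and the
-- Flink SQL job, and wires the result into a sink (topic4 in the diagram).
SELECT t1."key", t2."status", t3."amount"
FROM KAFKA."topic1" AS t1
JOIN MYSQL."table2" AS t2 ON t1."key" = t2."key"
JOIN COLDSTORE."table3" AS t3 ON t1."key" = t3."key";
```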

## Quick Start

@@ -54,4 +64,16 @@ You can verify the job is running by inspecting the output:
> !q
```

## The Operator

Hoptimator-operator is a Kubernetes operator that orchestrates multi-hop data pipelines based on Subscriptions (a custom resource). When a Subscription is deployed, the operator:

1. creates a _plan_ based on the Subscription SQL. The plan includes a set of _resources_ that make up a _pipeline_.
2. deploys each resource in the pipeline. This may involve creating Kafka topics, Flink jobs, etc.
3. reports Subscription status, which depends on the status of each resource in the pipeline.

The operator is extensible via _adapters_. Among other responsibilities, adapters can implement custom control plane logic (see `ControllerProvider`), or they can depend on external operators. For example, the Kafka adapter actively manages Kafka topics using a custom controller. The Flink adapter defers to [flink-kubernetes-operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/) to manage Flink jobs.

## The CLI

Hoptimator includes a SQL CLI based on [sqlline](https://github.com/julianhyde/sqlline). This is primarily for testing and debugging purposes, but it can also be useful for running ad-hoc queries. The CLI leverages the same adapters as the operator, but it doesn't deploy anything. Instead, queries run as local, in-process Flink jobs.
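
For example, an ad-hoc session might look like the following (the query is taken from `etc/integration-tests.sql`):

```sql
-- Runs as a local, in-process Flink job; then quit the CLI.
SELECT * FROM RAWKAFKA."products" LIMIT 1;
!q
```
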
3 changes: 3 additions & 0 deletions etc/integration-tests.sql
@@ -10,6 +10,9 @@ SELECT * FROM DATAGEN.COMPANY;
-- MySQL CDC tables
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;

-- Test check command
!check not empty SELECT * FROM INVENTORY."products_on_hand";
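-- (Illustrative, not part of this change.) The check command also supports:
--   !check empty <query>           -- passes when the query returns no rows
--   !check value <value> <query>   -- passes when <value> appears in the first column of the result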

-- MySQL CDC -> Kafka
SELECT * FROM RAWKAFKA."products" LIMIT 1;

2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -28,3 +28,5 @@ kubernetesExtendedClient = "io.kubernetes:client-java-extended:16.0.2"
slf4jSimple = "org.slf4j:slf4j-simple:1.7.30"
slf4jApi = "org.slf4j:slf4j-api:1.7.30"
sqlline = "sqlline:sqlline:1.12.0"
commonsCli = "commons-cli:commons-cli:1.4"

@@ -59,6 +59,23 @@ public Iterator<Row> rowIterator() {
}
}

/** Iterates over the selected field/column only, with a limit on the number of collected elements. */
public <T> Iterable<T> field(int pos, Integer limit) {
if (limit == null) {
return this.field(pos);
}
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
try {
return datastream().map(r -> r.<T>getFieldAs(pos)).executeAndCollect(limit).iterator();
} catch (Exception e) {
return new ExceptionalIterator<>(e);
}
}
};
}

/** Iterates over the selected field/column only. */
public <T> Iterable<T> field(int pos) {
return new Iterable<T>() {
@@ -1,5 +1,7 @@
package com.linkedin.hoptimator;

import com.linkedin.hoptimator.catalog.ScriptImplementor;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import sqlline.SqlLine;
import sqlline.CommandHandler;
import sqlline.DispatchCallback;
@@ -17,12 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Properties;
import java.util.*;
import java.io.IOException;

public class HoptimatorCliApp {
@@ -49,6 +46,7 @@ protected int run(String[] args) throws IOException {
commandHandlers.add(new PipelineCommandHandler());
commandHandlers.add(new IntroCommandHandler());
commandHandlers.add(new InsertCommandHandler());
commandHandlers.add(new TestCommandHandler());
sqlline.updateCommandHandlers(commandHandlers);
return sqlline.begin(args, null, true).ordinal();
}
@@ -246,7 +244,133 @@ public void execute(String line, DispatchCallback dispatchCallback) {
sqlline.output("SQL:");
HopTable outputTable = new HopTable("PIPELINE", "SINK", plan.getRowType(),
Collections.singletonMap("connector", "dummy"));
sqlline.output(impl.insertInto(outputTable));
sqlline.output(impl.insertInto(outputTable).sql(MysqlSqlDialect.DEFAULT));
dispatchCallback.setToSuccess();
} catch (Exception e) {
sqlline.error(e.toString());
e.printStackTrace();
dispatchCallback.setToFailure();
}
}

@Override
public List<Completer> getParameterCompleters() {
return Collections.emptyList();
}

@Override
public boolean echoToFile() {
return false;
}
}

private class TestCommandHandler implements CommandHandler {

@Override
public String getName() {
return "check";
}

@Override
public List<String> getNames() {
return Collections.singletonList(getName());
}

@Override
public String getHelpText() {
return "Usage: !check <value> <query>, !check empty <query>, !check not empty <query>";
}

@Override
public String matches(String line) {
String sql = line;
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
sql = sql.substring(1);
}

if (sql.startsWith("check")) {
sql = sql.substring("check".length() + 1);
return sql;
}

return null;
}

@Override
public void execute(String line, DispatchCallback dispatchCallback) {
String sql = line;
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
sql = sql.substring(1);
}

if (sql.startsWith("check")) {
sql = sql.substring("check".length() + 1);
}

//remove semicolon from query if present
if (sql.length() > 0 && sql.charAt(sql.length() - 1) == ';') {
sql = sql.substring(0, sql.length() - 1);
}

String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
String[] type = sql.split(" ", 2);
if (type.length < 2) {
throw new IllegalArgumentException("Invalid usage"); //TODO: expand
}

String value = null;
String query = null;

String checkType = type[0];
// Supported forms: "not empty <query>", "empty <query>", "value <expected> <query>".
switch (checkType) {
case "not":
query = type[1].split(" ", 2)[1].trim();
break;
case "empty":
query = type[1].trim();
break;
case "value":
String[] valueQuery = type[1].split(" ", 2);
value = valueQuery[0].trim();
query = valueQuery[1].trim();
break;
default:
throw new IllegalArgumentException("Expected one of 'not', 'empty', or 'value'");
}

HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
PipelineRel plan = planner.pipeline(query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
ScriptImplementor scriptImplementor = impl.query();
String pipelineSql = scriptImplementor.sql();
FlinkIterable iterable = new FlinkIterable(pipelineSql);
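// Collect at most one value from the first column of the query result.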
Iterator<String> iter = iterable.<String>field(0, 1).iterator();
switch (checkType) {
case "not":
if (!iter.hasNext()) {
throw new IllegalArgumentException("Expected >0 rows from query result");
}
break;
case "empty":
if (iter.hasNext()) {
throw new IllegalArgumentException("Expected 0 rows from query result");
}
break;
case "value":
boolean varFound = false;
while (iter.hasNext()) {
if (String.valueOf(iter.next()).contains(value)) {
varFound = true;
break;
}
}
if (varFound) {
break;
}
throw new IllegalArgumentException("Query result did not contain expected value");
}
sqlline.output("PASS");
dispatchCallback.setToSuccess();
} catch (Exception e) {
sqlline.error(e.toString());
@@ -328,7 +452,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
PipelineRel plan = planner.pipeline("SELECT " + query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
HopTable sink = planner.database(database).makeTable(table, impl.rowType());
String pipelineSql = impl.insertInto(sink) + "\nSELECT 'SUCCESS';";
String pipelineSql = impl.insertInto(sink).sql(MysqlSqlDialect.DEFAULT) + "\nSELECT 'SUCCESS';";
FlinkIterable iterable = new FlinkIterable(pipelineSql);
Iterator<String> iter = iterable.<String>field(0).iterator();
if (iter.hasNext()) {
@@ -1,6 +1,7 @@
package com.linkedin.hoptimator;

import com.linkedin.hoptimator.catalog.RuleProvider;
import com.linkedin.hoptimator.catalog.ScriptImplementor;
import com.linkedin.hoptimator.planner.PipelineRel;
import com.linkedin.hoptimator.planner.HoptimatorHook;

@@ -21,6 +22,7 @@
import org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.util.BuiltInMethod;

import java.util.Collections;
@@ -64,7 +66,9 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
BlockBuilder builder = new BlockBuilder();
PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType, pref.preferCustom());
PipelineRel.Implementor impl = new PipelineRel.Implementor(getInput());
String sql = impl.query();

ScriptImplementor scriptImplementor = impl.query();
String sql = scriptImplementor.sql(MysqlSqlDialect.DEFAULT); // TODO
Hook.QUERY_PLAN.run(sql); // for script validation in tests
HoptimatorHook.QUERY_PLAN.run(sql); // ditto
Expression iter = builder.append("iter", Expressions.new_(FlinkIterable.class, Expressions.constant(sql),
3 changes: 2 additions & 1 deletion hoptimator-operator/build.gradle
@@ -25,7 +25,8 @@ dependencies {
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
implementation libs.slf4jApi

implementation libs.commonsCli

testImplementation libs.junit
testImplementation libs.assertj
}
@@ -13,6 +13,14 @@
import com.linkedin.hoptimator.operator.subscription.SubscriptionReconciler;
import com.linkedin.hoptimator.planner.HoptimatorPlanner;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,10 +45,34 @@ public HoptimatorOperatorApp(String modelPath, String namespace, Properties prop
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
if (args.length < 1) {
throw new IllegalArgumentException("Missing model file argument.");
}
new HoptimatorOperatorApp(args[0], "default", new Properties()).run();

Options options = new Options();
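// The model file is the first positional argument; -n/--namespace optionally overrides the default namespace.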

Option namespace = new Option("n", "namespace", true, "Kubernetes namespace (default: default)");
namespace.setRequired(false);
options.addOption(namespace);

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd;

try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
formatter.printHelp("hoptimator-operator", options);

System.exit(1);
return;
}

String modelFileInput = cmd.getArgs()[0];
String namespaceInput = cmd.getOptionValue("namespace", "default");

new HoptimatorOperatorApp(modelFileInput, namespaceInput, new Properties()).run();
}

public void run() throws Exception {