Skip to content

Commit

Permalink
refactor:
Browse files Browse the repository at this point in the history
1) add TableFactoryDiscoverable trait
2) add util for handling rowtime/proctime for table schema and unittests
  • Loading branch information
Shuyi Chen committed Jul 7, 2018
1 parent d70d033 commit 4324efd
Show file tree
Hide file tree
Showing 46 changed files with 482 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connectors.TableConnectorFactory;
import org.apache.flink.table.connectors.TableFactoryDiscoverable;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.descriptors.TableDescriptorValidator;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -69,12 +69,8 @@
/**
* Factory for creating configured instances of {@link KafkaJsonTableSource}.
*/
abstract class KafkaTableSourceFactory implements TableConnectorFactory<TableSource<Row>> {

@Override
public String getType() {
return TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE();
}
abstract class KafkaTableSourceFactory
implements TableSourceFactory<Row>, TableFactoryDiscoverable {

@Override
public Map<String, String> requiredContext() {
Expand Down Expand Up @@ -125,7 +121,7 @@ public List<String> supportedProperties() {
}

@Override
public TableSource<Row> create(Map<String, String> properties) {
public TableSource<Row> createTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);

Expand Down Expand Up @@ -188,7 +184,7 @@ public TableSource<Row> create(Map<String, String> properties) {
});

// schema
final TableSchema schema = params.getTableSchema(SCHEMA());
final TableSchema schema = SchemaValidator.deriveTableSourceSchema(params);
builder.withSchema(schema);

// proctime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connectors.TableSourceFactoryService;
import org.apache.flink.table.connectors.TableFactoryService;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
Expand Down Expand Up @@ -116,8 +118,11 @@ private void testTableSource(FormatDescriptor format) {
.field("zip", Types.STRING)
.field("proctime", Types.SQL_TIMESTAMP).proctime());

DescriptorProperties properties = new DescriptorProperties(true);
testDesc.addProperties(properties);
final TableSource<?> factorySource =
(TableSource<?>) TableSourceFactoryService.findAndCreateTableConnector(testDesc);
((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc))
.createTableSource(properties.asMap());

assertEquals(builderSource, factorySource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.formats.json.JsonSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connectors.TableSourceFactoryService;
import org.apache.flink.table.connectors.TableFactoryService;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
Expand Down Expand Up @@ -146,8 +148,11 @@ private void testTableSource(FormatDescriptor format) {
new Rowtime().timestampsFromField("time").watermarksFromSource())
.field("proc-time", Types.SQL_TIMESTAMP).proctime());

DescriptorProperties properties = new DescriptorProperties(true);
testDesc.addProperties(properties);
final TableSource<?> factorySource =
(TableSource<?>) TableSourceFactoryService.findAndCreateTableConnector(testDesc);
((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc))
.createTableSource(properties.asMap());

assertEquals(builderSource, factorySource);
}
Expand Down
2 changes: 1 addition & 1 deletion flink-libraries/flink-sql-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ under the License.
<include>org.codehaus.commons.compiler.properties</include>
<include>org/codehaus/janino/**</include>
<include>org/codehaus/commons/**</include>
<include>META-INF/services/org.apache.flink.table.connectors.TableConnectorFactory</include>
<include>META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable</include>
<!-- flink-sql-client -->
<include>org/jline/**</include>
<include>com/fasterxml/jackson/**</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,6 @@ public void open() {
case SELECT:
callSelect(cmdCall);
break;
case INSERT:
callInsert(cmdCall);
break;
case SOURCE:
callSource(cmdCall);
break;
Expand Down Expand Up @@ -338,10 +335,6 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}

private void callInsert(SqlCommandCall cmdCall) {
executor.executeUpdate(context, cmdCall.operands[0]);
}

private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private static String[] splitOperands(SqlCommand cmd, String originalCall, Strin
return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
}
case SELECT:
case INSERT:
return new String[] {originalCall};
default:
return new String[] {operands};
Expand All @@ -90,7 +89,6 @@ enum SqlCommand {
DESCRIBE("describe"),
EXPLAIN("explain"),
SELECT("select"),
INSERT("insert"),
SET("set"),
RESET("reset"),
SOURCE("source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void setTables(List<Map<String, Object>> tables) {
TableDescriptor tableDescriptor = create(tableName, properties);
if (null == tableDescriptor) {
throw new SqlClientException(
"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
"Invalid table 'type' attribute value, only 'source', 'sink' and 'both' is supported");
}
if (this.tables.containsKey(tableName)) {
throw new SqlClientException("Duplicate table name '" + tableName + "'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.client.config;

import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableDescriptor;
import org.apache.flink.table.descriptors.TableDescriptorValidator;

Expand All @@ -44,6 +45,11 @@ public Map<String, String> getProperties() {
return properties;
}

@Override
public void addProperties(DescriptorProperties properties) {
this.properties.forEach(properties::putString);
}

public Source toSource() {
final Map<String, String> newProperties = new HashMap<>(properties);
newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ public interface Executor {
*/
ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException;

/**
* Submits a Flink SQL update job (detached).
*/
void executeUpdate(SessionContext session, String query) throws SqlExecutionException;

/**
* Asks for the next changelog results (non-blocking).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
import org.apache.flink.table.client.config.SourceSink;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.connectors.TableSinkFactoryService;
import org.apache.flink.table.connectors.TableSourceFactoryService;
import org.apache.flink.table.connectors.TableFactoryService;
import org.apache.flink.table.connectors.TableSinkFactory;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -102,20 +104,30 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
tableSinks = new HashMap<>();
mergedEnv.getTables().forEach((name, descriptor) -> {
if (descriptor instanceof Source) {
DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
tableSources.put(name,
(TableSource<?>) TableSourceFactoryService.findAndCreateTableConnector(
descriptor, classLoader));
((TableSourceFactory) TableFactoryService.find(
TableSourceFactory.class, descriptor, classLoader))
.createTableSource(properties.asMap()));
} else if (descriptor instanceof Sink) {
DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
tableSinks.put(name,
(TableSink<?>) TableSinkFactoryService.findAndCreateTableConnector(
descriptor, classLoader));
((TableSinkFactory) TableFactoryService.find(
TableSinkFactory.class, descriptor, classLoader))
.createTableSink(properties.asMap()));
} else if (descriptor instanceof SourceSink) {
DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
tableSources.put(name,
(TableSource<?>) TableSourceFactoryService.findAndCreateTableConnector(
((SourceSink) descriptor).toSource(), classLoader));
((TableSourceFactory) TableFactoryService.find(
TableSourceFactory.class, descriptor, classLoader))
.createTableSource(properties.asMap()));
tableSinks.put(name,
(TableSink<?>) TableSinkFactoryService.findAndCreateTableConnector(
((SourceSink) descriptor).toSink(), classLoader));
((TableSinkFactory) TableFactoryService.find(
TableSinkFactory.class, descriptor, classLoader))
.createTableSink(properties.asMap()));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,6 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw
return executeQueryInternal(context, query);
}

@Override
public void executeUpdate(SessionContext session, String query) throws SqlExecutionException {
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
executeUpdateInternal(context, query);
}

@Override
public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session,
String resultId) throws SqlExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.
<file>
<source>src/test/resources/test-factory-services-file</source>
<outputDirectory>META-INF/services</outputDirectory>
<destName>org.apache.flink.table.connectors.TableConnectorFactory</destName>
<destName>org.apache.flink.table.connectors.TableFactoryDiscoverable</destName>
<fileMode>0755</fileMode>
</file>
</files>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class DependencyTest {
"table-connector-factory-test-jar.jar";

@Test
public void testTableConnectorFactoryDiscovery() throws Exception {
public void testTableFactoryDiscovery() throws Exception {
// create environment
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_0", "test-connector");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void testMerging() throws Exception {
final Set<String> tables = new HashSet<>();
tables.add("TableNumber1");
tables.add("TableNumber2");
tables.add("TableNumber3");
tables.add("NewTable");

assertEquals(tables, merged.getTables().keySet());
Expand Down
Loading

0 comments on commit 4324efd

Please sign in to comment.