Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@

import java.util.Map;

import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;

//import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;

/**
* Redis dynamic table source
Expand All @@ -52,8 +56,13 @@ public class RedisDynamicTableSource implements LookupTableSource {
private final RedisLookupOptions redisLookupOptions;
private final Map<String, String> properties;

private final String inlongMetric;
private final String auditHostAndPorts;
private final String auditKeys;

Comment on lines +59 to +62
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do these change for?

public RedisDynamicTableSource(Map<String, String> properties, ResolvedSchema tableSchema,
ReadableConfig config, RedisLookupOptions redisLookupOptions) {
ReadableConfig config, RedisLookupOptions redisLookupOptions, String inlongMetric, String auditHostAndPorts,
String auditKeys) {
this.properties = properties;
Preconditions.checkNotNull(properties, "properties should not be null");
this.tableSchema = tableSchema;
Expand All @@ -73,11 +82,15 @@ public RedisDynamicTableSource(Map<String, String> properties, ResolvedSchema ta
flinkJedisConfigBase = RedisHandlerServices
.findRedisHandler(InlongJedisConfigHandler.class, properties).createFlinkJedisConfig(config);
this.redisLookupOptions = redisLookupOptions;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.auditKeys = auditKeys;
}

@Override
public DynamicTableSource copy() {
return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions);
return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions, inlongMetric,
auditHostAndPorts, auditKeys);
}

@Override
Expand All @@ -88,6 +101,7 @@ public String asSummaryString() {
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
return TableFunctionProvider.of(new RedisRowDataLookupFunction(
redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions));
redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions, inlongMetric,
auditHostAndPorts, auditKeys));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.redis.source;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
Expand Down Expand Up @@ -55,14 +57,25 @@ public class RedisRowDataLookupFunction extends TableFunction<RowData> {
private transient Cache<RowData, RowData> cache;
private InlongRedisCommandsContainer redisCommandsContainer;

private SourceMetricData sourceMetricData;

RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription,
FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions) {
FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions, String inlongMetric,
String auditHostAndPorts, String auditKeys) {
this.flinkJedisConfigBase = flinkJedisConfigBase;
this.redisCommand = redisCommandDescription.getCommand();
this.additionalKey = redisCommandDescription.getAdditionalKey();
this.cacheMaxSize = redisLookupOptions.getCacheMaxSize();
this.cacheExpireMs = redisLookupOptions.getCacheExpireMs();
this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
.withAuditKeys(auditKeys)
.build();
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
}

/**
Expand Down Expand Up @@ -107,6 +120,10 @@ public void eval(Object... keys) {
throw new UnsupportedOperationException(
String.format("Unsupported for redisCommand: %s", redisCommand));
}
if (sourceMetricData != null) {
sourceMetricData.outputMetricsWithEstimate(rowData, System.currentTimeMillis());
}
LOG.info("Report audit: {} and time : {}", rowData, System.currentTimeMillis());
if (cache == null) {
collect(rowData);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,15 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig config = helper.getOptions();

helper.validate();
validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS);
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = config.get(INLONG_AUDIT);
String auditKeys = config.get(AUDIT_KEYS);
return new RedisDynamicTableSource(context.getCatalogTable().getOptions(),
context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config));
context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config), inlongMetric,
auditHostAndPorts, auditKeys);
}

@Override
Expand Down
16 changes: 8 additions & 8 deletions licenses/inlong-audit/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,14 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
org.mybatis.spring.boot:mybatis-spring-boot-autoconfigure:2.1.3 - mybatis-spring-boot-autoconfigure (https://github.com/mybatis/spring-boot-starter/tree/mybatis-spring-boot-2.1.3/mybatis-spring-boot-autoconfigure), (The Apache Software License, Version 2.0)
org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.3 - mybatis-spring-boot-starter (https://github.com/mybatis/spring-boot-starter/tree/mybatis-spring-boot-2.1.3/mybatis-spring-boot-starter), (The Apache Software License, Version 2.0)
org.apache.pulsar:pulsar-client:2.8.4 - Pulsar Client Java (https://github.com/apache/pulsar/tree/v2.8.4), (Apache License, Version 2.0)
org.springframework:spring-aop:5.3.32 - Spring AOP (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-beans:5.3.32 - Spring Beans (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context:5.3.32 - Spring Context (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-core:5.3.32 - Spring Core (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-expression:5.3.32 - Spring Expression Language (SpEL) (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jcl:5.3.32 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jdbc:5.3.32 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-tx:5.3.32 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-aop:5.3.34 - Spring AOP (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-beans:5.3.34 - Spring Beans (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context:5.3.34 - Spring Context (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-core:5.3.34 - Spring Core (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-expression:5.3.34 - Spring Expression Language (SpEL) (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jcl:5.3.34 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jdbc:5.3.34 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-tx:5.3.34 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0)


Expand Down
2 changes: 1 addition & 1 deletion licenses/inlong-audit/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ Spring JDBC NOTICE
Spring Transaction NOTICE
========================================================================

Spring Framework 5.3.32
Spring Framework 5.3.34
Copyright (c) 2002-2022 Pivotal, Inc.

This product is licensed to you under the Apache License, Version 2.0
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-aop.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-beans.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-context.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-expression.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-jcl.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-jdbc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-audit/licenses/LICENSE-spring-tx.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
22 changes: 11 additions & 11 deletions licenses/inlong-manager/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,19 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
org.apache.pulsar:pulsar-client:2.8.4 - Pulsar Client Java (https://github.com/apache/pulsar/tree/v2.8.4), (Apache License, Version 2.0)
org.roaringbitmap:RoaringBitmap:0.9.22 - org.roaringbitmap:RoaringBitmap (https://github.com/RoaringBitmap/RoaringBitmap/tree/0.9.22), (Apache 2)
org.roaringbitmap:shims:0.9.22 - org.roaringbitmap:shims (https://github.com/RoaringBitmap/RoaringBitmap/tree/0.9.22/shims), (Apache 2)
org.springframework:spring-aop:5.3.32 - Spring AOP (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-beans:5.3.32 - Spring Beans (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context:5.3.32 - Spring Context (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context-support:5.3.32 - Spring Context Support (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-core:5.3.32 - Spring Core (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-expression:5.3.32 - Spring Expression Language (SpEL) (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jcl:5.3.32 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jdbc:5.3.32 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-aop:5.3.34 - Spring AOP (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-beans:5.3.34 - Spring Beans (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context:5.3.34 - Spring Context (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-context-support:5.3.34 - Spring Context Support (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-core:5.3.34 - Spring Core (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-expression:5.3.34 - Spring Expression Language (SpEL) (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jcl:5.3.34 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jdbc:5.3.34 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework.plugin:spring-plugin-core:2.0.0.RELEASE - Spring Plugin - Core (https://github.com/spring-projects/spring-plugin/tree/2.0.0.RELEASE), (Apache License, Version 2.0)
org.springframework.plugin:spring-plugin-metadata:2.0.0.RELEASE - Spring Plugin - Metadata Extension (https://github.com/spring-projects/spring-plugin/tree/2.0.0.RELEASE), (Apache License, Version 2.0)
org.springframework:spring-tx:5.3.32 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-web:5.3.32 - Spring Web (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-webmvc:5.3.32 - Spring Web MVC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-tx:5.3.34 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-web:5.3.34 - Spring Web (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-webmvc:5.3.34 - Spring Web MVC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
com.typesafe:ssl-config-core_2.11:0.3.7 - ssl-config-core (https://github.com/lightbend/ssl-config/tree/v0.3.7), (Apache-2.0)
org.apache.tomcat.embed:tomcat-embed-core:9.0.60 - tomcat-embed-core (https://tomcat.apache.org/), (Apache License, Version 2.0)
com.tencentcloudapi:tencentcloud-sdk-java-cls:3.1.830 - tencentcloud-sdk-java-cls (https://github.com/TencentCloud/tencentcloud-sdk-java-cls/tree/v3.1.830), (Apache License, Version 2.0)
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-manager/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,7 @@ Spring Expression Language (SpEL) NOTICE
Spring Web MVC NOTICE
========================================================================

Spring Framework 5.3.32
Spring Framework 5.3.34
Copyright (c) 2002-2022 Pivotal, Inc.

This product is licensed to you under the Apache License, Version 2.0
Expand Down Expand Up @@ -2043,7 +2043,7 @@ Spring Object/Relational Mapping NOTICE
Spring Web NOTICE
========================================================================

Spring Framework 5.3.32
Spring Framework 5.3.34
Copyright (c) 2002-2022 Pivotal, Inc.

This product is licensed to you under the Apache License, Version 2.0
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-manager/licenses/LICENSE-spring-aop.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
4 changes: 2 additions & 2 deletions licenses/inlong-manager/licenses/LICENSE-spring-beans.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@

=======================================================================

SPRING FRAMEWORK 5.3.32 SUBCOMPONENTS:
SPRING FRAMEWORK 5.3.34 SUBCOMPONENTS:

Spring Framework 5.3.32 includes a number of subcomponents
Spring Framework 5.3.34 includes a number of subcomponents
with separate copyright notices and license terms. The product that
includes this file does not necessarily use all the open source
subcomponents referred to below. Your use of the source
Expand Down
Loading