-
Notifications
You must be signed in to change notification settings - Fork 42
/
HttpTableLookupFunction.java
111 lines (88 loc) · 3.43 KB
/
HttpTableLookupFunction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.getindata.connectors.http.internal.table.lookup;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
/**
 * Flink {@link TableFunction} that resolves a set of lookup keys to at most one
 * {@link RowData} by delegating to a {@link PollingClient}.
 *
 * <p>The serializable configuration (client factory, decoder, column metadata, options) is
 * supplied through the Lombok-generated builder. The runtime-only state (the client and the
 * call counter) is {@code transient} and is created in {@link #open(FunctionContext)}.
 */
@Slf4j
public class HttpTableLookupFunction extends TableFunction<RowData> {

    /** Factory used in {@link #open(FunctionContext)} to create the polling client. */
    private final PollingClientFactory<RowData> pollingClientFactory;

    /** Decoder handed to the client factory together with {@link #options}. */
    private final DeserializationSchema<RowData> schemaDecoder;

    /** Holds the names of the lookup key columns, in the order keys are passed to eval. */
    @Getter
    private final ColumnData columnData;

    /** Lookup configuration handed to the client factory. */
    @Getter
    private final HttpLookupConfig options;

    /** Number of lookups issued by this function instance; exposed as a gauge. */
    private transient AtomicInteger localHttpCallCounter;

    /** Client that performs the actual lookup; created in {@link #open(FunctionContext)}. */
    private transient PollingClient<RowData> client;

    @Builder
    private HttpTableLookupFunction(
            PollingClientFactory<RowData> pollingClientFactory,
            DeserializationSchema<RowData> schemaDecoder,
            ColumnData columnData,
            HttpLookupConfig options) {

        this.pollingClientFactory = pollingClientFactory;
        this.schemaDecoder = schemaDecoder;
        this.columnData = columnData;
        this.options = options;
    }

    /**
     * Initializes the transient runtime state: resets the call counter, creates the polling
     * client and registers the {@code http-table-lookup-call-counter} gauge.
     *
     * @param context function context providing access to the metric group
     * @throws Exception if the parent {@code open} call fails
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);

        this.localHttpCallCounter = new AtomicInteger(0);
        this.client = pollingClientFactory.createPollClient(options, schemaDecoder);

        context
            .getMetricGroup()
            .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
    }

    /**
     * This is a lookup method which is called by Flink framework in a runtime.
     * Emits the looked-up row through {@code collect} when the lookup yields a result.
     *
     * @param keys lookup key values, positionally matching {@link ColumnData#getKeyNames()}
     */
    public void eval(Object... keys) {
        lookupByKeys(keys)
            .ifPresent(this::collect);
    }

    /**
     * Converts the key values into {@link LookupArg}s, increments the call counter and asks
     * the polling client for a matching row.
     *
     * @param keys lookup key values; the i-th value is paired with the i-th key name
     * @return the row returned by the client, or an empty {@link Optional} if there is none
     * @throws NullPointerException if any key value is {@code null}
     */
    public Optional<RowData> lookupByKeys(Object[] keys) {
        // Build the row wrapper only when it will actually be logged; this method runs
        // once per lookup, so an unconditional allocation here is pure overhead.
        if (log.isDebugEnabled()) {
            RowData keyRow = GenericRowData.of(keys);
            log.debug("Used Keys - {}", keyRow);
        }

        // TODO Implement transient Cache here
        String[] keyNames = columnData.getKeyNames();
        List<LookupArg> lookupArgs = new ArrayList<>(keys.length);
        for (int i = 0; i < keys.length; i++) {
            lookupArgs.add(processKey(keyNames[i], keys[i]));
        }

        localHttpCallCounter.incrementAndGet();
        return client.pull(lookupArgs);
    }

    /**
     * Turns a single key value into a {@link LookupArg} using its {@code toString()}
     * representation. Values that are not {@link BinaryStringData} are still converted,
     * but a warning is logged.
     *
     * @param keyName name of the key column
     * @param key key value, must not be {@code null}
     * @return lookup argument pairing the key name with the string form of the value
     * @throws NullPointerException if {@code key} is {@code null}
     */
    // TODO implement all Flink Types here.
    private LookupArg processKey(String keyName, Object key) {
        // Fail with a message naming the column instead of a bare NPE raised from inside
        // the logging call below.
        if (key == null) {
            throw new NullPointerException(
                "Lookup key value for column '" + keyName + "' is null.");
        }

        if (!(key instanceof BinaryStringData)) {
            log.warn(
                "Unsupported Key Type {}. Trying simple toString(), wish me luck...",
                key.getClass());
        }

        return new LookupArg(keyName, key.toString());
    }

    /**
     * Serializable holder for the names of the lookup key columns.
     */
    // TODO ESP-148: check whether this wrapper class is still needed.
    @Data
    @Builder
    @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
    public static class ColumnData implements Serializable {

        /** Key column names, in the order the key values are passed to the lookup. */
        private final String[] keyNames;
    }
}