-
Notifications
You must be signed in to change notification settings - Fork 10
/
GeodeKafkaSinkTask.java
150 lines (131 loc) · 5.43 KB
/
GeodeKafkaSinkTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.sink;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.GeodeContext;
import org.apache.geode.kafka.Version;
/**
 * Kafka Connect {@link SinkTask} that writes batches of {@link SinkRecord}s into one or more
 * Apache Geode regions.
 *
 * <p>Records are grouped per destination region into {@link BatchRecords} and then applied as
 * bulk put/remove operations. A record with a non-null key and a null value is treated as a
 * removal when {@code null-values-mean-remove} is enabled (the default); records without a key
 * are logged and skipped.
 *
 * <p>Currently each task forces one client region handle per configured region name.
 */
public class GeodeKafkaSinkTask extends SinkTask {

  private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);

  private GeodeContext geodeContext;
  // topic name -> list of region names that records from that topic are written to
  private Map<String, List<String>> topicToRegions;
  // region name -> connected client region handle
  private Map<String, Region<Object, Object>> regionNameToRegion;
  private boolean nullValuesMeansRemove = true;

  /**
   * {@inheritDoc}
   */
  @Override
  public String version() {
    return Version.getVersion();
  }

  /**
   * Configures this task and connects a Geode client to the configured locators.
   *
   * @param props connector configuration properties supplied by Kafka Connect
   * @throws ConnectException if configuration or the Geode client connection fails; the original
   *         exception is preserved as the cause
   */
  @Override
  public void start(Map<String, String> props) {
    logger.info("Starting Apache Geode sink task");
    try {
      GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
      configure(geodeConnectorConfig);
      geodeContext = new GeodeContext();
      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
          geodeConnectorConfig.getSecurityClientAuthInit(),
          geodeConnectorConfig.getSecurityUserName(),
          geodeConnectorConfig.getSecurityPassword(),
          geodeConnectorConfig.usesSecurity());
      regionNameToRegion = createProxyRegions(topicToRegions.values());
    } catch (Exception e) {
      throw new ConnectException("Unable to start sink task", e);
    }
  }

  /** Captures the topic-to-region mapping and null-value behavior from the connector config. */
  void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
    // Fixed copy-paste bug: this previously logged "GeodeKafkaSourceTask".
    logger.debug("GeodeKafkaSinkTask id:{} starting", geodeConnectorConfig.getTaskId());
    topicToRegions = geodeConnectorConfig.getTopicToRegions();
    nullValuesMeansRemove = geodeConnectorConfig.getNullValueBehavior();
  }

  // For tests only
  void setRegionNameToRegion(Map<String, Region<Object, Object>> regionNameToRegion) {
    this.regionNameToRegion = regionNameToRegion;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Groups the records per region and applies each group as a batch.
   */
  @Override
  public void put(Collection<SinkRecord> records) {
    put(records, new HashMap<>());
  }

  /**
   * Visible-for-testing overload: accumulates the records into {@code batchRecordsMap} keyed by
   * region name, then executes each batch against its region.
   */
  void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
    // spin off a new thread to handle this operation? Downside is ordering and retries...
    for (SinkRecord record : records) {
      updateBatchForRegionByTopic(record, batchRecordsMap);
    }
    batchRecordsMap.forEach(
        (region, batchRecords) -> batchRecords.executeOperations(regionNameToRegion.get(region)));
  }

  /**
   * Adds the record to the batch of every region mapped to its topic.
   *
   * @throws ConnectException if no region is configured for the record's topic (previously this
   *         surfaced as an opaque NullPointerException)
   */
  private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
      Map<String, BatchRecords> batchRecordsMap) {
    Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
    if (regionsToUpdate == null) {
      throw new ConnectException(
          "No Geode region is configured for topic: " + sinkRecord.topic());
    }
    for (String region : regionsToUpdate) {
      updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
    }
  }

  /**
   * Adds the record to the batch for {@code region} as either an update or, when the value is
   * null and null-values-mean-remove is enabled, a removal. Keyless records cannot be stored in
   * Geode and are logged and skipped.
   */
  private void updateBatchRecordsForRecord(SinkRecord record,
      Map<String, BatchRecords> batchRecordsMap, String region) {
    BatchRecords batchRecords =
        batchRecordsMap.computeIfAbsent(region, unused -> new BatchRecords());
    if (record.key() != null) {
      if (record.value() == null && nullValuesMeansRemove) {
        batchRecords.addRemoveOperation(record);
      } else {
        batchRecords.addUpdateOperation(record, nullValuesMeansRemove);
      }
    } else {
      // Invest in configurable auto key generator?
      logger.warn("Unable to push to Geode, missing key in record : {}", record.value());
    }
  }

  /**
   * Creates (or looks up) a PROXY client region for every configured region name.
   *
   * @param regionNames the per-topic lists of region names; flattened before region creation
   * @return a map from region name to its connected region handle
   */
  private Map<String, Region<Object, Object>> createProxyRegions(
      Collection<List<String>> regionNames) {
    List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
    return flat.stream().map(this::createProxyRegion)
        .collect(Collectors.toMap(Region::getName, region -> region));
  }

  private Region<Object, Object> createProxyRegion(String regionName) {
    try {
      return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
          .create(regionName);
    } catch (RegionExistsException e) {
      // Each task is a separate parallel task controlled by kafka; another task in this JVM may
      // already have created the region, so fall back to looking it up.
      return geodeContext.getClientCache().getRegion(regionName);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void stop() {
    logger.info("Stopping task");
    // geodeContext may be null if start() failed before the client connected.
    if (geodeContext != null) {
      geodeContext.close(false);
    }
  }
}