/
logProcessor.java
121 lines (72 loc) · 3.52 KB
/
logProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.bigData;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import java.io.IOException;
import java.util.LinkedList;
/**
* Created by DELL on 16-Jan-19.
*/
public class logProcessor {

    /**
     * Entry point: builds and starts a Kafka Streams topology that
     * (1) reads JSON log events from the configured input topic,
     * (2) maps each event's {@code event_id} to an int and collects them
     *     per key into a {@code streamList} (tracking the latest offset),
     * (3) runs {@code Predict.prediction} over the accumulated values, and
     * (4) writes a JSON record (key, offset, prediction) to the output topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        configuration config = new configuration();
        StreamsBuilder builder = new StreamsBuilder();
        Utils utils = new Utils();

        // Source stream: String-keyed JSON events from the input topic.
        KStream<String, JsonNode> input = builder.stream(
                config.getInputTopicName(),
                Consumed.with(Serdes.String(), utils.jsonSerde));

        // Per key: wrap each event in a single-element streamList, then
        // reduce by concatenating lists and keeping the latest offset.
        KTable<String, streamList> table = input
                .map((k, v) -> {
                    streamList acc = new streamList();
                    // NOTE(review): assumes every record carries "event_id"
                    // and "offset" fields — a malformed record would NPE here.
                    String eventId = v.get("event_id").asText();
                    acc.l1 = new LinkedList<Integer>();
                    acc.l1.add(utils.map(eventId));
                    acc.offset = v.get("offset").asInt();
                    return new KeyValue<String, streamList>(k, acc);
                })
                .groupByKey(Serialized.with(Serdes.String(), utils.getstreamListSerde()))
                .reduce((x, y) -> {
                    x.l1.addAll(y.l1);
                    x.offset = y.offset; // keep the most recent offset
                    return x;
                });

        // For each aggregate update, build the output JSON: the collected
        // values as an array plus key, offset and the model's prediction.
        KStream<String, JsonNode> prediction = table.toStream()
                .map((k, v) -> {
                    ObjectNode node = JsonNodeFactory.instance.objectNode();
                    node = utils.createArray(node);
                    try {
                        node = utils.addToArray(node, v.l1);
                    } catch (IOException e) {
                        // Best effort: emit the record without the array on failure.
                        e.printStackTrace();
                    }
                    int[] buffer = utils.toIntArray(v.l1);
                    ObjectNode predictionJson = Predict.prediction(buffer, k);
                    node.put("key", k);
                    node.put("offset", v.offset);
                    // set(): ObjectNode.put(String, JsonNode) is deprecated.
                    node.set("prediction", predictionJson.get("predictions"));
                    return new KeyValue<String, JsonNode>(k, node);
                });

        prediction.to(config.getOutputTopicName(), Produced.with(Serdes.String(), utils.getJsonSerde()));
        // Debug visibility of every emitted record.
        prediction.peek((k, v) -> {
            System.out.println("key :" + k + " value: " + v);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), config.getConfig());
        // Close the Streams client cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}