-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathClickJsonDeserializer.java
46 lines (40 loc) · 1.18 KB
/
ClickJsonDeserializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package in.ankushs.sample.flink.deserializers;
import in.ankushs.sample.flink.domain.Click;
import in.ankushs.sample.flink.utils.Json;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.io.IOException;
import java.util.Objects;
/**
* Created by Ankush on 02/03/17.
*/
@Slf4j
public class ClickJsonDeserializer implements DeserializationSchema<Click> {
@Override
public Click deserialize(final byte[] bytes) throws IOException {
if(Objects.isNull(bytes)){
return null;
}
Click click = null;
try{
val json = new String(bytes);
click = Json.toObject(json, Click.class);
}
catch(final Exception ex){
log.error("", ex);
}
return click;
}
@Override
public boolean isEndOfStream(final Click nextElement) {
return false;
}
//NOTE THIS.
@Override
public TypeInformation<Click> getProducedType() {
return PojoTypeInfo.of(Click.class);
}
}