/
TwitterAdapter.scala
87 lines (77 loc) · 3.04 KB
/
TwitterAdapter.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.cds.spark.samples
import java.io.ObjectInputStream
import java.io.ByteArrayInputStream
import scala.util.parsing.json.JSON
import org.apache.kafka.common.serialization.Deserializer
import twitter4j.Status
/**
* @author dtaieb
* Deserialization adapters for Twitter4J Status
*/
case class StatusAdapter(userName:String, userId: String, userLang: String,createdAt:String,text:String, long:Double, lat:Double);
object StatusAdapter{
implicit def statusAdapterWrapper(status: Status) =
StatusAdapter(
status.getUser.getName,
status.getUser.getScreenName,
status.getUser.getLang,
status.getCreatedAt.toString,
status.getText,
Option(status.getGeoLocation).map{ _.getLongitude}.getOrElse(0.0),
Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0)
)
}
class StatusDeserializer extends Deserializer[StatusAdapter]{
def configure( props: java.util.Map[String, _], isKey: Boolean) = {
}
def close(){
}
def deserialize(topic: String, data: Array[Byte] ): StatusAdapter = {
try{
val bais = new ByteArrayInputStream( data )
var ois:ObjectInputStream = null
try{
ois = new ObjectInputStream( bais )
ois.readObject().asInstanceOf[Status]
}finally{
if (bais != null ){
bais.close
}
if ( ois != null ){
ois.close
}
}
}catch{
case e:Throwable=>{
val jsonObject = JSON.parseFull( new String(data) ).getOrElse(Map.empty).asInstanceOf[Map[String, Any]]
val user=jsonObject.get("user").getOrElse( Map.empty ).asInstanceOf[Map[String,Any]]
val geo = Option(jsonObject.get("geo").orNull).getOrElse(Map.empty).asInstanceOf[Map[String,Any]]
StatusAdapter(
user.get("name").getOrElse("").asInstanceOf[String],
user.get("userid").getOrElse("").asInstanceOf[String],
user.get("lang").getOrElse("").asInstanceOf[String],
jsonObject.get("created_at").getOrElse("").asInstanceOf[String],
jsonObject.get("text").getOrElse("").asInstanceOf[String],
geo.get("long").getOrElse(0.0).asInstanceOf[Double],
geo.get("lat").getOrElse(0.0).asInstanceOf[Double]
)
}
}
}
}