Skip to content

Commit

Permalink
Fix json extract error on empty columns
Browse files Browse the repository at this point in the history
  • Loading branch information
c-w committed Feb 8, 2018
1 parent 695f076 commit a2fa8ac
Showing 1 changed file with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import net.liftweb.json
import org.apache.spark.SparkContext

import scala.compat.java8.FunctionConverters._
import scala.util.{Failure, Success, Try}

@SerialVersionUID(100L)
class CassandraConfigurationManager extends ConfigurationManager with Serializable with Loggable {
Expand All @@ -34,17 +35,15 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializab
implicit val formats = json.DefaultFormats

val trustedSources = connectorToTrustedSources.computeIfAbsent(pipeline, (fetchTrustedSources _).asJava)
val params = json.parse(stream.params_json).extract[Map[String, String]]

val params = Try(json.parse(stream.params_json).extract[Map[String, String]]) match {
case Failure(_) => Map[String, String]()
case Success(map) => map
}

ConnectorConfig(
stream.streamfactory,
params +
(
"trustedSources" -> trustedSources,
"streamId" -> stream.streamid
)
)

params + ("trustedSources" -> trustedSources, "streamId" -> stream.streamid))
}).toList
}

Expand All @@ -67,8 +66,15 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializab
.flatMap(row => {
implicit val formats = json.DefaultFormats

(row.getString("lang_code"),
row.getString("topic")) :: json.parse(row.getString("translations_json")).extract[Map[String, String]].toList
val translations = (row.getStringOption("translations_json") match {
case None => Map[String, String]()
case Some(jsonString) if jsonString.equals("") => Map[String, String]()
case Some(jsonString) => Try(json.parse(jsonString).extract[Map[String, String]]) match {
case Failure(_) => Map[String, String]()
case Success(map) => map
}}).toList

(row.getString("lang_code"), row.getString("topic")) :: translations
})
.mapValues(List(_))
.reduceByKey(_ ::: _)
Expand All @@ -82,8 +88,16 @@ class CassandraConfigurationManager extends ConfigurationManager with Serializab
.map(row => {
implicit val formats = json.DefaultFormats

val conjunctivefilter = (row.getStringOption("conjunctivefilter_json") match {
case None => List[String]()
case Some(jsonString) if jsonString.equals("") => List[String]()
case Some(jsonString) => Try(json.parse(jsonString).extract[List[String]]) match {
case Failure(_) => List[String]()
case Success(list) => list
}}).toSet

BlacklistedItem(
json.parse(row.getString("conjunctivefilter_json")).extract[List[String]].toSet,
conjunctivefilter,
row.getBooleanOption("islocation").getOrElse(false))
})

Expand Down

0 comments on commit a2fa8ac

Please sign in to comment.