This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Merge pull request #134 from CatalystCode/terms-only-once
Dedupe locations/keywords/etc before aggregation
c-w committed Sep 11, 2017
2 parents cfc6736 + a2aeb7b commit e015063
Showing 6 changed files with 39 additions and 8 deletions.
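The thread connecting the six files: KeywordExtractor emits one match per occurrence of a term, so a single event could carry the same keyword, location, or entity several times and the offline aggregators would tally it more than once. The commit gives FortisEvent a copy contract, has CassandraEventsSink dedupe each event's terms before aggregation, and adds a test documenting the extractor's one-match-per-occurrence behavior.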
@@ -17,7 +17,13 @@ object CassandraTest {
   case class TestFortisEvent(
     details: Details,
     analysis: Analysis
-  ) extends FortisEvent
+  ) extends FortisEvent {
+    override def copy(analysis: Analysis = null): FortisEvent = {
+      TestFortisEvent(
+        details = details,
+        analysis = Option(analysis).getOrElse(this.analysis))
+    }
+  }
 
   case class TestFortisDetails(
     sourceeventid: String,
@@ -5,7 +5,13 @@ import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, Details, FortisEvent}
 case class ExtendedFortisEvent[T](
   details: ExtendedDetails[T],
   analysis: Analysis
-) extends FortisEvent
+) extends FortisEvent {
+  override def copy(analysis: Analysis) = {
+    ExtendedFortisEvent[T](
+      details = details,
+      analysis = Option(analysis).getOrElse(this.analysis))
+  }
+}
 
 case class ExtendedDetails[T](
   eventid: String,
@@ -5,6 +5,8 @@ import java.util.Objects
 trait FortisEvent {
   val details: Details
   val analysis: Analysis
+
+  def copy(analysis: Analysis = null): FortisEvent
 }
 
 trait Details {
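A null default on an overridable method is unusual in Scala (an Option[Analysis] parameter would be more typical), but it lets call sites mirror a case-class copy: event.copy() keeps the existing analysis, event.copy(analysis = ...) replaces it. A minimal sketch of the contract and one implementation; Details, Analysis, and MyEvent here are simplified stand-ins, not the project's real types:

    case class Details(eventid: String)
    case class Analysis(keywords: Seq[String])

    trait FortisEvent {
      val details: Details
      val analysis: Analysis
      def copy(analysis: Analysis = null): FortisEvent
    }

    case class MyEvent(details: Details, analysis: Analysis) extends FortisEvent {
      // Defining copy by hand suppresses the compiler-generated case-class copy;
      // Option(null) is None, so the null default falls back to this.analysis.
      override def copy(analysis: Analysis = null): FortisEvent =
        MyEvent(details, Option(analysis).getOrElse(this.analysis))
    }

    val event: FortisEvent = MyEvent(Details("e1"), Analysis(Seq("storm", "storm")))
    event.copy()                                                       // keeps the existing analysis
    event.copy(analysis = Analysis(event.analysis.keywords.distinct)) // replaces it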
@@ -188,7 +188,7 @@ object Utils {
         Ordering.comparatorToOrdering(Collator.getInstance(new Locale(langcode.getOrElse(langcode.getOrElse(DefaultPrimaryLanguage))))).compare(a,b) < 0 && a != ""
       }
 
-      (sortedCombo(0), sortedCombo(1), sortedCombo(2))
+      (sortedCombo.head, sortedCombo(1), sortedCombo(2))
     })
   case None => Seq()
 }
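A small cleanup: for a non-empty sequence the two accessors return the same element (and both throw on an empty one); .head simply states the intent directly instead of going through the positional apply:

    Seq("a", "b", "c").head == Seq("a", "b", "c")(0) // true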
@@ -27,7 +27,13 @@ object CassandraEventsSink extends Loggable {
   def apply(dstream: DStream[FortisEvent], sparkSession: SparkSession, configurationManager: ConfigurationManager): Unit = {
     implicit lazy val connector: CassandraConnector = CassandraConnector(sparkSession.sparkContext)
 
-    dstream.foreachRDD { (eventsRDD, time: Time) => {
+    dstream
+      .map(event => event.copy(analysis = event.analysis.copy(
+        keywords = event.analysis.keywords.distinct,
+        locations = event.analysis.locations.distinct,
+        entities = event.analysis.entities.distinct
+      )))
+      .foreachRDD { (eventsRDD, _: Time) => {
       Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) {
         Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
           eventsRDD.cache()
@@ -62,13 +68,12 @@
 
         offlineAggregators.foreach(aggregator => {
           val aggregatorName = aggregator.getClass.getSimpleName
-          Timer.time(Telemetry.logSinkPhase(s"offlineAggregators.${aggregatorName}", _, _, -1)) {
+          Timer.time(Telemetry.logSinkPhase(s"offlineAggregators.$aggregatorName", _, _, -1)) {
            try {
              aggregator.aggregateAndSave(fortisEventsRDD, KeyspaceName)
            } catch {
-             case e: Exception => {
-               logError(s"Failed performing offline aggregation ${aggregatorName}", e)
-             }
+             case e: Exception =>
+               logError(s"Failed performing offline aggregation $aggregatorName", e)
            }
          }
        })
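This is the heart of the change: each event's terms are deduplicated once, up front, so every downstream aggregator sees a given keyword, location, or entity at most once per event. (The unused time parameter also becomes _, and the string interpolations drop their redundant braces.) A minimal sketch of the per-event rewrite, using a pared-down Analysis that carries only the three deduplicated fields:

    case class Analysis(keywords: Seq[String], locations: Seq[String], entities: Seq[String])

    val analysis = Analysis(
      keywords  = Seq("flood", "flood", "storm"),
      locations = Seq("paris", "paris"),
      entities  = Seq("acme"))

    // Essentially the rewrite the sink maps over each event, via the
    // case-class-generated copy on Analysis:
    val deduped = analysis.copy(
      keywords  = analysis.keywords.distinct,  // Seq("flood", "storm")
      locations = analysis.locations.distinct, // Seq("paris")
      entities  = analysis.entities.distinct)  // Seq("acme")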
@@ -63,4 +63,16 @@ class KeywordExtractorSpec extends FlatSpec {
     val matches = extractor.extractKeywords("Testing{testing}123").map(_.name)
     assert(matches.head == keywords.head)
   }
+
+  it should "find keywords many times" in {
+    val keywords = List(
+      "{testing}"
+    )
+
+    val extractor = new KeywordExtractor(keywords)
+
+    val matches = extractor.extractKeywords("Testing{testing}12{testing}3").map(_.name)
+    assert(matches.length == 2)
+    assert(matches.forall(term => term == "{testing}"))
+  }
 }
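The new test pins down the behavior that motivates the commit: KeywordExtractor reports one match per occurrence, so a body mentioning "{testing}" twice yields two matches. Collapsing those repeats to one term per event is now explicitly the sink's job, via the .distinct calls above, rather than something the extractor is assumed to do.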
