Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Dedupe locations/keywords/etc before aggregation
Browse files Browse the repository at this point in the history
In the dashboard we want to show the number of events that match a
particular location or keyword, not the number of times that the term
was matched in the event.
  • Loading branch information
c-w committed Sep 11, 2017
1 parent 0754ffd commit a2aeb7b
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 3 deletions.
Expand Up @@ -17,7 +17,13 @@ object CassandraTest {
case class TestFortisEvent(
details: Details,
analysis: Analysis
) extends FortisEvent
) extends FortisEvent {
// Builds a new TestFortisEvent that reuses this event's details.
// A null argument (the declared default) means "keep the current analysis".
override def copy(analysis: Analysis = null): FortisEvent = {
  val effectiveAnalysis = if (analysis == null) this.analysis else analysis
  TestFortisEvent(details = details, analysis = effectiveAnalysis)
}
}

case class TestFortisDetails(
sourceeventid: String,
Expand Down
Expand Up @@ -5,7 +5,13 @@ import com.microsoft.partnercatalyst.fortis.spark.dto.{Analysis, Details, Fortis
case class ExtendedFortisEvent[T](
details: ExtendedDetails[T],
analysis: Analysis
) extends FortisEvent
) extends FortisEvent {
/**
 * Returns a copy of this event with its analysis replaced.
 *
 * The default value and the explicit result type are spelled out so that this
 * override reads the same as the `FortisEvent.copy` declaration it implements.
 *
 * @param analysis the replacement analysis; when null (the default) the
 *                 current analysis is kept
 * @return a new ExtendedFortisEvent sharing this event's details
 */
override def copy(analysis: Analysis = null): ExtendedFortisEvent[T] = {
  ExtendedFortisEvent[T](
    details = details,
    // Option(null) is None, so a null argument falls back to the existing value.
    analysis = Option(analysis).getOrElse(this.analysis))
}
}

case class ExtendedDetails[T](
eventid: String,
Expand Down
Expand Up @@ -5,6 +5,8 @@ import java.util.Objects
/**
 * Common shape of an event: a `details` part and an `analysis` part.
 */
trait FortisEvent {
val details: Details
val analysis: Analysis

/**
 * Produces a FortisEvent carrying the given analysis.
 *
 * @param analysis the analysis for the returned event; defaults to null.
 *                 NOTE(review): implementers are presumably expected to treat
 *                 null as "keep the current analysis" -- confirm against each
 *                 implementation.
 * @return the resulting event
 */
def copy(analysis: Analysis = null): FortisEvent
}

trait Details {
Expand Down
Expand Up @@ -27,7 +27,13 @@ object CassandraEventsSink extends Loggable {
def apply(dstream: DStream[FortisEvent], sparkSession: SparkSession, configurationManager: ConfigurationManager): Unit = {
implicit lazy val connector: CassandraConnector = CassandraConnector(sparkSession.sparkContext)

dstream.foreachRDD { (eventsRDD, _: Time) => {
dstream
.map(event => event.copy(analysis = event.analysis.copy(
keywords = event.analysis.keywords.distinct,
locations = event.analysis.locations.distinct,
entities = event.analysis.entities.distinct
)))
.foreachRDD { (eventsRDD, _: Time) => {
Timer.time(Telemetry.logSinkPhase("all", _, _, -1)) {
Timer.time(Telemetry.logSinkPhase("eventsRDD.cache", _, _, -1)) {
eventsRDD.cache()
Expand Down
Expand Up @@ -63,4 +63,16 @@ class KeywordExtractorSpec extends FlatSpec {
val matches = extractor.extractKeywords("Testing{testing}123").map(_.name)
assert(matches.head == keywords.head)
}

// A keyword that occurs twice in the input must yield two matches,
// each reported under that keyword's name.
it should "find keywords many times" in {
  val keyword = "{testing}"
  val extractor = new KeywordExtractor(List(keyword))

  val matchedNames = extractor.extractKeywords("Testing{testing}12{testing}3").map(_.name)

  assert(matchedNames.length == 2)
  assert(matchedNames.forall(_ == keyword))
}
}

0 comments on commit a2aeb7b

Please sign in to comment.