# Kernel Density Estimation

Kernel Density Estimate (KDE) is a statistical technique to estimate the probability density function of a random variable.
In this notebook, we employ KDE to visualize the distribution of tweets that contain a certan term over time.

## Import Dependencies

In [1]:
%AddDeps com.lucidworks.spark spark-solr 3.6.0 --transitive

Marking com.lucidworks.spark:spark-solr:3.6.0 for download
-> Failed to resolve org.restlet.jee:org.restlet:2.3.0
    -> not found: /tmp/toree-tmp-dir1787466831314273765/toree_add_deps/cache/org.restlet.jee/org.restlet/ivy-2.3.0.xml
    -> not found: https://repo1.maven.org/maven2/org/restlet/jee/org.restlet/2.3.0/org.restlet-2.3.0.pom
-> Failed to resolve org.restlet.jee:org.restlet.ext.servlet:2.3.0
    -> not found: /tmp/toree-tmp-dir1787466831314273765/toree_add_deps/cache/org.restlet.jee/org.restlet.ext.servlet/ivy-2.3.0.xml
    -> not found: https://repo1.maven.org/maven2/org/restlet/jee/org.restlet.ext.servlet/2.3.0/org.restlet.ext.servlet-2.3.0.pom
Obtained 388 files


Waiting for a Spark session to start...

Waiting for a Spark session to start...

## Query Solr

In [2]:
def updateTime(hour:Int, createdAt:String):Int = {
    var adjusted = hour
    createdAt match {
      case "Pacific Time (US & Canada)" => adjusted = shiftHours(hour, -8)
      case "Eastern Time (US & Canada)" => adjusted = shiftHours(hour, -5)
      case "Central Time (US & Canada)" => adjusted = shiftHours(hour, -5)
      case "Mountain Time (US & Canada)" => adjusted = shiftHours(hour, -6)
      case "Atlantic Time (Canada)" => adjusted = shiftHours(hour, -4)
    }
    adjusted
}

def timeZoneToInt(timeZone:String):Int = {
    var out = 6 // sunday

    if (timeZone contains "Mon") {
      out = 0
    } else if (timeZone contains "Tue") {
      out = 1
    } else if (timeZone contains "Wed") {
      out = 2
    } else if (timeZone contains "Thu") {
      out = 3
    } else if (timeZone contains "Fri") {
      out = 4
    } else if (timeZone contains "Sat") {
      out = 5
    }
    out
}

def shiftHours(hour:Int, shift:Int):Int = {
    var adjusted = hour + shift
    if (adjusted >= 24) {
      adjusted %= 24
    } else if (adjusted < 0) {
      adjusted += 24
    }
    adjusted
}

updateTime: (hour: Int, createdAt: String)Int
timeZoneToInt: (timeZone: String)Int
shiftHours: (hour: Int, shift: Int)Int


First we find the tweets that contain the term TERM, which are created in Canada or USA. We accumulate the tweets over a certain time period, MODE (e.g: day or hour).

In [5]:
import com.lucidworks.spark.rdd.SelectSolrRDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.stat.KernelDensity
import play.api.libs.json._

// Solr's ZooKeeper URL
val SOLR = "192.168.1.111:9983"

// The Solr collection
val INDEX = "mb13"

// The Solr query
val MODE = "day"  // day OR hour
val TERM = "church"
val QUERY = "contents:$TERM"

val timeRegex = raw"([0-9]+):([0-9]+):([0-9]+)".r

val rdd = new SelectSolrRDD(SOLR, INDEX, sc)
.rows(10000)
.query(QUERY)
.flatMap(doc => {
    val parsedJson = Json.parse(doc.get("raw").toString)
    var out:List[Tuple3[Int, Double, Int]] = List()

    try {
        val timeZone:String = (parsedJson \ "user" \ "time_zone").as[String]
        if ((timeZone contains "Canada") || (timeZone contains "US")) {
            val time = (parsedJson \ "created_at").as[String]
            val matches = timeRegex.findFirstMatchIn(time)
            val hour = updateTime(matches.get.group(1).toInt, timeZone)
            val week = timeZoneToInt(time)
            val min = matches.get.group(2).toDouble
            out = if (MODE == "day") List((week, hour/24, 1)) else List((hour, min/60, 1))        
            }
        } catch {
          case e : Exception => println("unable to parse the tweet", e)
        }
        out
      }).persist()

SOLR = 192.168.1.111:9983
INDEX = mb13
MODE = day
TERM = church
QUERY = contents:$TERM
timeRegex = ([0-9]+):([0-9]+):([0-9]+)
rdd = MapPartitionsRDD[13] at flatMap at <console>:74


MapPartitionsRDD[13] at flatMap at <console>:74

## Compute KDE

In [6]:
val counts = rdd.map(item => (item._1, item._3)).reduceByKey(_+_).sortByKey().collect().toMap

val kdeData = rdd.map(item => item._1.toInt.toDouble + item._2)

val kd = if (MODE == "day") new KernelDensity().setSample(kdeData).setBandwidth(1.0) else new KernelDensity().setSample(kdeData).setBandwidth(2.0)
val domain = if (MODE ==  "day") (0 to 6).toArray else (0 to 23).toArray

val densities = kd.estimate(domain.map(_.toDouble))

println(s"counts / density per $MODE for $TERM")
domain.foreach(x => {
    println(s"$x ( ${counts(x)} ) -- ${densities(x)}")
})

counts / density per day for church
0 ( 3629101 ) -- 0.09827479434429007
1 ( 3586485 ) -- 0.13149417467469857
2 ( 3553899 ) -- 0.1386817583600064
3 ( 3517636 ) -- 0.14163367121194861
4 ( 3921588 ) -- 0.14477533633158732
5 ( 3673112 ) -- 0.13782699122788605
6 ( 3797445 ) -- 0.10247753456187696


counts = Map(0 -> 3629101, 5 -> 3673112, 1 -> 3586485, 6 -> 3797445, 2 -> 3553899, 3 -> 3517636, 4 -> 3921588)
kdeData = MapPartitionsRDD[19] at map at <console>:60
kd = org.apache.spark.mllib.stat.KernelDensity@18288e06
domain = Array(0, 1, 2, 3, 4, 5, 6)
densities = Array(0.09827479434429007, 0.13149417467469857, 0.1386817583600064, 0.14163367121194861, 0.14477533633158732, 0.13782699122788605, 0.10247753456187696)


Array(0.09827479434429007, 0.13149417467469857, 0.1386817583600064, 0.14163367121194861, 0.14477533633158732, 0.13782699122788605, 0.10247753456187696)

## Generate Graph