## Geo-located Clustering and Sequence Mining with Spark, Cassandra and Scala

Today, geo-located data is available in a number of domains, ranging from healthcare to financial markets, to social services. In all these domains, extracting patterns and detecting clusters and sequences of events from data has very concrete business outcomes. 

Cluster analysis on geo-located events is a key step for understanding users' behavior. It is essential for a more personalized customer experience, and also for preventing fraud and cyber attacks. By extracting sequences of events, we can also determine how various venues, products and services are connected, and provide a very detailed analysis of the popularity of certain items, which in turn yields better user recommendations and richer, more meaningful UI interactions.

As more data gets ingested and produced via digital services, it is key to perform this sort of analytics at scale. In the open source space, technologies such as Spark and Cassandra are instrumental in implementing and executing modern data pipelines at scale.


#### Synopsis

In this Oriole, I will explore with you a number of techniques for understanding users' behavior. We will look at process mining to understand the _sequence_ of events registered by users, and apply graph analytics to detect the most popular venues in town. We will also sketch a geo-fencing, location-based alert service by applying the DBSCAN clustering algorithm to the geo-located events.

We will go through the following steps:

  - Data extraction from Cassandra
  - Process Mining using Spark RDDs
  - Graph Analytics with Spark GraphFrames
  - Clustering for Geo-Located Data with DBSCAN
  
If you are interested in a general overview of descriptive analytics, histograms, and pattern detection, please have a look at this companion Oriole notebook "Anomaly detection and pattern extraction with Spark, Cassandra and Scala", also available on Safari.


#### References and datasets

For this analysis, we are going to use the Gowalla Dataset [1]. The Gowalla dataset consists of a table of events, registered by anonymized users. Each event registers a user checking into a geolocated venue at a specific timestamp. The dataset is available at https://snap.stanford.edu/data/loc-gowalla.html

A number of venues in this demo have been tagged with an actual name, thanks to the https://code.google.com/archive/p/locrec/ project (now archived). That project was developed in the context of the SInteliGIS project, financed by the Portuguese Foundation for Science and Technology (FCT) through project grant PTDC/EIA-EIA/109840/2009.

[1] E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.



#### Setup

This notebook runs Scala code and interfaces to a Spark cluster using the [Apache Toree](https://toree.incubator.apache.org/) project. Furthermore, Spark reads the data from Cassandra tables. Spark interfaces to Cassandra via the [Cassandra-Spark connector](https://github.com/datastax/spark-cassandra-connector).

At the time of compiling this notebook, Spark 1.6.1 and Cassandra 3.5 were used. Note that the cells captured below come from a session where `sc.version` reports 2.1.0, and they rely on `SparkSession`, which is only available from Spark 2.0 onwards. Below is the command to install the Spark - Scala kernel on Jupyter. More instructions on this topic are available on the Apache Toree [website](https://toree.incubator.apache.org/) and [GitHub pages](https://github.com/apache/incubator-toree).

This particular Oriole is special in many ways. For instance, as you can see below from the Toree kernel configuration, next to the ability to connect to Cassandra, it adds [GraphFrames](https://graphframes.github.io/) as an extra custom Spark package to the mix.

```
sudo jupyter-toree install --spark_home=${SPARK_HOME} \
  --spark_opts='--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0,graphframes:graphframes:0.1.0-spark1.6 --conf spark.cassandra.connection.host=localhost'
```

_Please allow a few seconds for the next cell to complete.  
It will start a single-node Spark context on the Oriole container, and connect it to the Oriole Scala code cells._

In [1]:
// Spark version
sc.version

2.1.0

##### Quick Scala Spark Tests

In [2]:
val NUM_SAMPLES = 1000
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")

Pi is roughly 3.188


### Connecting to Cassandra

In [3]:
// spark-cassandra connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

In [4]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

#### SQL queries with Spark and Cassandra

Cassandra is exposed via a SQL context, so there is no need to learn a separate syntax: Spark maps the query to the available features of the underlying storage system. See below a simple query accessing the name and the id of venues from a Cassandra table. Also remember that SQL statements are _staged_ but not _executed_ until an actual [action](http://spark.apache.org/docs/latest/programming-guide.html#actions) needs to be computed. Examples of actions are **count**(), **first**() and **collect**().
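
Staging work and running it only on demand is not specific to Spark. As a rough analogy only (plain Scala, no Spark involved), a Scala `Iterator` also stages its `map` and runs it when a terminal operation asks for the results. The names `evaluated`, `staged` and `result` are made up for this illustration.

```scala
// Analogy in plain Scala, not Spark code: Iterator.map is lazy, like a Spark transformation.
var evaluated = 0                        // counts how many elements were actually processed

val staged = Iterator(1, 2, 3).map { x =>
  evaluated += 1                         // side effect, only to observe when the work happens
  x * 10
}

val evaluatedBeforeAction = evaluated    // still 0: nothing has been computed yet

val result = staged.toList               // plays the role of an action: forces the computation
val evaluatedAfterAction = evaluated     // 3: every element has now been processed
```

In Spark the same idea applies to DataFrames: building a query costs almost nothing, while `count()`, `first()` or `collect()` trigger the actual work.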

#### Dataset: Venues

Let's first have a quick look at the venues. They are stored in the Cassandra table `lbsn.venues`.

In [5]:
val venues = spark.
               read.
               format("org.apache.spark.sql.cassandra").
               options(Map( "table" -> "venues", "keyspace" -> "lbsn" )).
               load()

Let's start by counting the number of venues and selecting vid `12525`, to get a feeling for how Spark works and to learn some facts about the dataset.
Feel free to modify the cells below to gain insight into the venue data. Try for instance the `take()` and `show()` Spark DataFrame methods.

In [6]:
venues.count()

17291

In [7]:
venues.where("vid = 12525").first()

[12525,40.7612551699,-73.977579698,The Museum of Modern Art (MoMA)]

#### Executing queries

Spark and Cassandra work together when it comes to executing queries. If the query can be executed directly on the database, Spark will offload the query to Cassandra. However, not all queries can be fully performed in Cassandra, and that's where the Spark-Cassandra combination gets really handy. For instance, when executing joins, Spark will partition and plan the query, _pushing down_ what can be done in Cassandra and performing the rest of the query in Spark.

More information can be found in the DataStax documentation about [using Spark SQL to query data](http://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/spark/sparkSqlOverview.html) or on the [Cassandra Spark Connector](https://github.com/datastax/spark-cassandra-connector) pages.

#### Joining Cassandra tables with Spark

The query below selects the events which were registered in the New York City area. As this filter cannot be applied directly in Cassandra (the lat/lon columns are not indexed in this example), this specific query will first move the data from Cassandra to Spark, and then perform the filtering in Spark.

In general, it's good practice to push down filters and discard as much data as early as possible. This minimizes the amount of data transferred from one system to the other.

#### Dataset: Events

Let's first have a quick look at the events. They are stored in the Cassandra table `lbsn.events`.

In [8]:
spark.sql("DROP VIEW IF EXISTS events")

val createDDL = """
     CREATE TEMPORARY VIEW events
     USING org.apache.spark.sql.cassandra
     OPTIONS (
     table "events",
     keyspace "lbsn",
     pushdown "true")"""

// Creates Catalog Entry registering an existing Cassandra Table
spark.sql(createDDL);

[]

In [9]:
val events   = spark.sql("""select ts, uid, lat, lon, vid from events where
                            lon>-74.2589 and lon<-73.7004 and 
                            lat> 40.4774 and lat< 40.9176
                      """).as("events").orderBy("uid", "ts").repartition(8, $"uid").cache()

In [10]:
events.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- uid: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- vid: long (nullable = true)



As you can see above, the Spark DataFrame extracts its schema from the Cassandra/Spark SQL query. In this particular case, the mapping of Cassandra types to Spark DataFrame types is performed by the [Cassandra-Spark connector](https://github.com/datastax/spark-cassandra-connector). Each event consists of a timestamp, the user id, the venue id, and the location of the event (latitude and longitude).


#### Explore the events data

Let's start by looking at some recorded events. A quick way to do so is with the `show()` Spark DataFrame method.
Feel free to modify the cells below to gain insight into the events data. Try for instance the `take()`, `show()` and `count()` Spark DataFrame methods.

In [11]:
events.show(5)

+--------------------+---+-------------+------------------+------+
|                  ts|uid|          lat|               lon|   vid|
+--------------------+---+-------------+------------------+------+
|2010-04-18 21:23:...| 88| 40.645919692|    -73.7779355049| 17417|
|2010-04-18 22:33:...| 88|40.7628367545|    -73.9825987816|150676|
|2010-04-18 23:38:...| 88|    40.763479|-73.97908000000002|972311|
|2010-04-19 12:04:...| 88|40.7417466987|     -73.993421425|105068|
|2010-04-19 13:53:...| 88|40.7422661063|-73.98375749590001|851661|
+--------------------+---+-------------+------------------+------+
only showing top 5 rows



Before diving into anomaly detection on geo-located data, let's perform some more basic queries.  
The cell below shows how to count the events registered by user `uid=0`.

In [12]:
// User 0: how many check-ins?

events.where("uid=0").count()

25

### Process mining with Spark RDDs

The first step in process mining is to collect sequences of events. In particular, the following code takes chronologically consecutive events and bundles them in pairs for each user. Each pair consists of two venue ids, namely source and destination, defining where the user is coming from and going to, respectively.

The steps in the following code are:

  - Convert the DataFrame to an RDD
  - Select uid as the key for the PairRDD
  - Reshape the PairRDD from "tall" to "wide"
  - Sort chronologically all the checked-in venues for each user
  - Extract pairs from each sequence of checked-in venues per user
  - Reshape the PairRDD from "wide" to "tall" again
  - Convert back the PairRDD to a DataFrame
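
The steps above can be tried out on a tiny in-memory sample before running them on the cluster. The following is a minimal sketch in plain Scala collections, not the Spark code of this notebook: `checkins` and its values are invented, and `groupBy` stands in for the key-based reshaping done with the PairRDD.

```scala
// Made-up sample of (uid, timestamp, vid) check-ins.
val checkins = List(
  (1L, 30L, 103L),
  (1L, 10L, 101L),
  (1L, 20L, 102L),
  (2L, 15L, 201L),   // user 2 has a single check-in, so it yields no pair
  (3L, 12L, 301L),
  (3L, 11L, 302L)
)

val pairs: List[(Long, Long)] = checkins
  .groupBy(_._1)                 // "tall" to "wide": one entry per user
  .toList
  .sortBy(_._1)                  // deterministic order, for this example only
  .flatMap { case (_, events) =>
    events
      .sortBy(_._2)              // chronological order
      .map(_._3)                 // keep the venue ids only
      .sliding(2)                // consecutive windows of two venues
      .collect { case List(src, dst) => (src, dst) }  // drops windows shorter than two
      .toList
  }
// pairs == List((101,102), (102,103), (302,301))
```

Note how `sliding(2)` returns a single one-element window for a user with only one check-in, which is why the pattern match on `List(src, dst)` is needed.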

In [13]:
// process mining
val g_df = events.
  select($"ts", $"uid", $"vid").
  rdd.
  map(row => (row.getLong(1), List( (row.getTimestamp(0), row.getLong(2)) ))).
  reduceByKey(_ ++ _).
  mapValues( x =>
    x.sortWith(_._1.getTime < _._1.getTime).
      map(_._2)
  ).
  mapValues(_.sliding(2).toList).
  flatMap(_._2).
  map(
    _ match {
      case List(a, b) => Some((a, b))
      case _ => None
  }).
  flatMap(x => x).
  toDF("src", "dst").
  repartition(8,$"src").
  cache()

This newly created DataFrame is used to create a graph, where the nodes are the venues and the edges are connections made by users checking in at one venue and then at the next. Let's do a bit of pruning on this graph by removing self-referencing edges (where source and destination are the same venue), and by keeping only the nodes which appear in at least one of the remaining edges, as shown in the following two cells.
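
To make the pruning concrete, here is a small sketch on plain Scala lists rather than DataFrames. The pairs in `rawPairs` are invented, and `uniqueEdges` and `nodeIds` are illustrative names only.

```scala
// Made-up (src, dst) pairs, including a duplicate and a self-referencing pair.
val rawPairs = List((1L, 2L), (1L, 2L), (2L, 2L), (2L, 3L), (3L, 1L))

// Keep each connection once and drop pairs that start and end at the same venue.
val uniqueEdges = rawPairs.distinct.filter { case (src, dst) => src != dst }
// uniqueEdges == List((1,2), (2,3), (3,1))

// The nodes are all venue ids that still appear as source or destination.
val nodeIds = (uniqueEdges.map(_._1) ++ uniqueEdges.map(_._2)).distinct.sorted
// nodeIds == List(1, 2, 3)
```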

In [14]:
val edges_df = g_df.
  groupBy($"src",$"dst").
  count().
  select($"src",$"dst").
  filter($"src" =!= $"dst")  // =!= replaces !==, which is deprecated since Spark 2.0

edges_df.show(5)

+------+------+
|   src|   dst|
+------+------+
|620588|636683|
|226768|664731|
|664731|101538|
|167249|874897|
|226768|176668|
+------+------+
only showing top 5 rows



In [15]:
val nodes_df = edges_df.
  select($"src").
  unionAll(edges_df.select($"dst")).
  distinct().
  toDF("id")

nodes_df.show(5)

+-------+
|     id|
+-------+
|1046872|
| 578977|
| 216302|
|  11745|
| 740603|
+-------+
only showing top 5 rows



### Venues graph

Congratulations! You have just constructed a graph, where venues are connected to other venues according to the sequences of events recorded by users. Let's have a quick look at the size of this graph. By the way, for those of you new to Scala, you can embed code in a Scala string using the `${ ... }` construct of `s"..."` interpolated strings, as shown below.

In [16]:
println(s"# nodes (venues)      = ${nodes_df.count()}")
println(s"# edges (connections) = ${edges_df.count()}")

# nodes (venues)      = 17220
# edges (connections) = 87279


You can combine two Spark DataFrames, one for the nodes and the other for the edges, into a GraphFrame `g`, as shown below.

In [17]:
import org.graphframes.GraphFrame

In [18]:
val v = nodes_df.orderBy("id")
val e = edges_df.orderBy("src", "dst")

// build the graph from the vertex and edge DataFrames defined just above
val g = GraphFrame(v, e)

#### Graph: Node degree analysis

Let's start with a basic graph analysis, looking first at the in-degree. The in-degree counts the number of incoming edges for each node of the graph. We can use it to rank venues by the number of incoming connections they receive from other venues. This is one of the easiest ways to see which venues are popular. Let's have a look at which nodes/venues in the graph have more than 50 incoming connections, and let's display this list.
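
As a sanity check of the definition, the in-degree can be computed by hand on a toy edge list. This is a sketch in plain Scala, not the GraphFrames implementation. The edges in `sampleEdges` are invented for the example.

```scala
// Hypothetical edge list (src -> dst); the venue ids are invented.
val sampleEdges = List((1L, 3L), (2L, 3L), (4L, 3L), (3L, 1L), (2L, 1L), (1L, 2L))

// In-degree: number of edges whose destination is the given node.
val inDegree: Map[Long, Int] = sampleEdges
  .groupBy(_._2)
  .map { case (dst, incoming) => dst -> incoming.size }

// Nodes sorted by decreasing in-degree (ties broken by id).
val ranked: List[(Long, Int)] = inDegree.toList.sortBy { case (id, deg) => (-deg, id) }
// ranked == List((3,3), (1,2), (2,1))
// node 4 has no incoming edge, so it does not appear in this sketch
```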

In [19]:
import org.apache.spark.sql.functions.desc
val vertices_indeg = g.inDegrees.filter($"inDegree">50).sort(desc("inDegree"))

In [20]:
val popular_venues = vertices_indeg.
    join(venues, vertices_indeg("id") === venues("vid"), "inner").
    sort($"inDegree".desc).
    select("vid", "inDegree", "name")
    
println(s"Top in-degree nodes: ${ popular_venues.count()}")
popular_venues.show(10, false)

Top in-degree nodes: 111
+------+--------+---------------------------------+
|vid   |inDegree|name                             |
+------+--------+---------------------------------+
|12505 |657     |LGA LaGuardia Airport            |
|23261 |600     |JFK John F. Kennedy International|
|11844 |580     |Times Square                     |
|13022 |442     |Grand Central Terminal           |
|24963 |352     |EWR Newark Liberty International |
|11875 |278     |Madison Square Garden            |
|12525 |273     |The Museum of Modern Art (MoMA)  |
|106840|247     |Union Square                     |
|11834 |234     |Bryant Park                      |
|11720 |232     |Yankee Stadium                   |
+------+--------+---------------------------------+
only showing top 10 rows



Transportation hubs are probably going to be the most popular venues according to this metric. But what about other ways of measuring the popularity of a venue?

#### Graph filtering

Before moving on to PageRank, let's make the graph smaller. For the sake of performance and fast execution of the next cells, we limit the analysis to only those venues with more than 50 incoming connections (in-degree).

In [21]:
val vertices  = vertices_indeg.select("id")

val edges_src = edges_df.join(vertices, vertices("id")  === edges_df("src"), "inner").select("src", "dst")
val edges     = edges_src.join(vertices, vertices("id") === edges_src("dst"), "inner").select("src", "dst")

println(s"graph: vertices=${vertices.count()} edges=${edges.count()}")

graph: vertices=111 edges=2027


#### Graph and Page Rank

The above node and edge DataFrames describe how users move from venue to venue. We can now calculate which venues attract more users. This can be done using the PageRank algorithm. The PageRank algorithm outputs a probability distribution representing the likelihood that a person randomly walking in the city will arrive at any particular venue. This analysis can be executed in Spark using [GraphFrames](http://graphframes.github.io/). GraphFrames is a package for Apache Spark which provides DataFrame-based graph analytics, including the PageRank algorithm.
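
To give an intuition of what the algorithm computes, here is a minimal power-iteration sketch in plain Scala. It is not the GraphFrames implementation: the function `pageRank`, its default parameters and the toy graph are assumptions made for this illustration, and the sketch keeps the ranks normalized so that they sum to one.

```scala
// Minimal PageRank by power iteration on a small directed graph (illustrative only).
def pageRank(nodes: Seq[Long], edges: Seq[(Long, Long)],
             resetProb: Double = 0.15, iterations: Int = 50): Map[Long, Double] = {
  val n = nodes.size
  val outLinks: Map[Long, Seq[Long]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }
  var rank: Map[Long, Double] = nodes.map(id => id -> (1.0 / n)).toMap

  for (_ <- 1 to iterations) {
    // Rank held by nodes without outgoing edges is spread uniformly over all nodes.
    val dangling = nodes.filterNot(outLinks.contains).map(rank).sum
    // Each node sends its rank in equal shares along its outgoing edges.
    val received: Map[Long, Double] = edges
      .map { case (src, dst) => dst -> (rank(src) / outLinks(src).size) }
      .groupBy(_._1)
      .map { case (dst, shares) => dst -> shares.map(_._2).sum }
    // With probability resetProb the walker jumps to a random node, otherwise it follows an edge.
    rank = nodes.map { id =>
      id -> (resetProb / n + (1 - resetProb) * (received.getOrElse(id, 0.0) + dangling / n))
    }.toMap
  }
  rank
}

// Toy graph: node 3 receives links from every other node.
val ranks = pageRank(Seq(1L, 2L, 3L, 4L),
                     Seq((1L, 3L), (2L, 3L), (4L, 3L), (3L, 1L), (1L, 2L)))
val best = ranks.maxBy(_._2)._1   // node 3 gets the highest rank
```

Unlike the plain in-degree, the rank of a node here also depends on the rank of the nodes linking to it, which is why node 1 (linked only from the highly ranked node 3) ends up close behind.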

In [22]:
val v = vertices.orderBy("id").repartition(8, $"id").cache()
val e = edges.orderBy("src", "dst").repartition(8, $"src", $"dst").cache()

In [23]:
val g = GraphFrame(v, e)
val results = g.pageRank.resetProbability(0.05).maxIter(10).run()

In [24]:
val result_pr = results.vertices.select("id", "pagerank")
val popular_venues = result_pr.
    join(venues, result_pr("id") === venues("vid"), "inner").
    select("vid", "pagerank", "name")

popular_venues.sort($"pagerank".desc).show(13, false)

+------+------------------+---------------------------------+
|vid   |pagerank          |name                             |
+------+------------------+---------------------------------+
|12505 |47.025529046618324|LGA LaGuardia Airport            |
|23261 |43.786106629521555|JFK John F. Kennedy International|
|11844 |38.66027839728225 |Times Square                     |
|13022 |30.332699976352583|Grand Central Terminal           |
|24963 |27.040978475705277|EWR Newark Liberty International |
|11875 |19.71505524245435 |Madison Square Garden            |
|12525 |18.03550609957437 |The Museum of Modern Art (MoMA)  |
|11720 |17.15123666057204 |Yankee Stadium                   |
|106840|17.08807261065496 |Union Square                     |
|11834 |16.021149068906222|Bryant Park                      |
|12313 |14.530252724392344|Empire State Building            |
|17710 |12.700090614679826|Madison Square Park              |
|14151 |12.570035992405163|Rockefeller Center               |
+------+------------------+---------------------------------+

As shown above, this algorithm provides a "popularity" factor for each checked-in venue. This feature can be used to further discriminate anomalies based on the rank of the venue, for instance by combining it with the probability of checking in at a specific time of day.

### Geo-Location: density based

We will now cluster events using the [DBSCAN algorithm](https://en.wikipedia.org/wiki/DBSCAN). DBSCAN clusters events according to their local density. Since the clusters emerge locally by looking for neighboring points, clusters of various shapes can be detected. Points that are isolated and too far from any other point are assigned to a special cluster of outliers. These properties make the DBSCAN algorithm a good candidate for clustering geolocated events.
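
For readers who want to see the mechanics, below is a compact, unoptimized DBSCAN sketch in plain Scala. It is an illustration under stated assumptions, not the library used later in this notebook: `dbscanLabels`, `distance` and the sample points are hypothetical, label 0 marks outliers in this sketch, and a point counts as its own neighbour.

```scala
import scala.collection.mutable

type Point = (Double, Double)

def distance(a: Point, b: Point): Double = math.hypot(a._1 - b._1, a._2 - b._2)

// Returns one label per point: 0 for outliers, 1, 2, ... for clusters.
def dbscanLabels(points: Vector[Point], eps: Double, minPoints: Int): Vector[Int] = {
  val Unvisited = -1
  val Noise = 0
  val labels = Array.fill(points.size)(Unvisited)

  // All points within eps of point i (including i itself).
  def neighbours(i: Int): Seq[Int] =
    points.indices.filter(j => distance(points(i), points(j)) <= eps)

  var clusterId = 0
  for (i <- points.indices) {
    if (labels(i) == Unvisited) {
      val seeds = neighbours(i)
      if (seeds.size < minPoints) labels(i) = Noise   // may still become a border point later
      else {
        clusterId += 1
        labels(i) = clusterId
        val queue = mutable.Queue.empty[Int]
        queue ++= seeds
        while (queue.nonEmpty) {
          val j = queue.dequeue()
          if (labels(j) == Noise) labels(j) = clusterId          // border point of this cluster
          if (labels(j) == Unvisited) {
            labels(j) = clusterId
            val reachable = neighbours(j)
            if (reachable.size >= minPoints) queue ++= reachable // j is a core point: keep expanding
          }
        }
      }
    }
  }
  labels.toVector
}

val sample = Vector(
  (40.7610, -73.9770), (40.7612, -73.9775), (40.7615, -73.9772),  // dense group
  (40.7420, -73.9930), (40.7422, -73.9935), (40.7418, -73.9932),  // second dense group
  (40.6440, -73.7830)                                             // isolated point
)
val labels = dbscanLabels(sample, eps = 0.002, minPoints = 3)
// labels == Vector(1, 1, 1, 2, 2, 2, 0)
```

The neighbour search here is quadratic in the number of points, which is acceptable for a toy sample but is one reason to rely on a library implementation for real data.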

Let's prepare the data by transforming the events DataFrame into a PairRDD. In particular, for geolocated data, we choose the key to be the user identifier, and the value to be the aggregated list of all check-ins posted by that user. The geolocated data is arranged in an n-by-2 matrix, where the first column represents the latitude and the second column the longitude.

In [25]:
val top_users = events.
    groupBy($"uid").
    count().
    filter($"count" > 1000).
    alias("top_users")
    
top_users.sort(desc("count")).show()

+----+-----+
| uid|count|
+----+-----+
| 578| 1641|
|  22| 1290|
| 842| 1145|
|4985| 1063|
+----+-----+



In [26]:
val top_events = events.
    join(top_users, top_users("uid")  === events("uid"), "inner").
    drop(events("uid"))

top_events.show(5, false)

+---------------------+-------------+------------------+------+----+-----+
|ts                   |lat          |lon               |vid   |uid |count|
+---------------------+-------------+------------------+------+----+-----+
|2009-12-30 20:23:39.0|40.7663583   |-73.9833973667    |218064|4985|1063 |
|2009-12-30 20:23:53.0|40.7665252686|-73.9829705584    |37819 |4985|1063 |
|2009-12-30 20:24:02.0|40.76665848  |-73.98390340000002|269570|4985|1063 |
|2009-12-30 20:24:26.0|40.7671166667|-73.9823264       |127312|4985|1063 |
|2009-12-30 20:24:35.0|40.7673913454|-73.9814347029    |110749|4985|1063 |
+---------------------+-------------+------------------+------+----+-----+
only showing top 5 rows



#### From long to wide

What we would like to do now, as a preparation for the DBSCAN clustering algorithm, is to take all events registered by each user and store them as a single array of coordinates. One way to do that is to create a key-value RDD (see http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs), where the key is the user id and the value is an array of coordinate pairs. How do we go wide? We concatenate the arrays using the `reduceByKey` RDD function; in this way we redistribute the data in a format which works well for our DBSCAN algorithm.
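
The reshaping can be pictured with plain Scala collections. In this sketch, `groupBy` followed by `reduce(_ ++ _)` is a local stand-in for `reduceByKey(_ ++ _)`. The rows and the helper `row` are made up for the illustration.

```scala
// Made-up rows in "long" format: one (uid, Array(lat, lon)) entry per check-in.
val rows = List(
  (578L, Array(40.7444, -73.9872)),
  (22L,  Array(40.7559, -73.9824)),
  (578L, Array(40.7457, -73.9850))
)

// Local stand-in for reduceByKey(_ ++ _): concatenate the arrays of each user.
val flat: Map[Long, Array[Double]] = rows
  .groupBy(_._1)
  .map { case (uid, rs) => uid -> rs.map(_._2).reduce(_ ++ _) }

// The flat array is row-major: row i holds (lat, lon) at positions 2*i and 2*i + 1.
def row(data: Array[Double], i: Int): (Double, Double) = (data(2 * i), data(2 * i + 1))

val nRows578 = flat(578L).length / 2   // 2 check-ins for user 578
val second578 = row(flat(578L), 1)     // (40.7457, -73.985)
```

This row-major layout is why the number of matrix rows is the array length divided by two.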


In [27]:
import breeze.linalg._
import breeze.linalg.DenseMatrix

In [28]:
val e_df = top_events.
  select("uid","lat","lon").
  rdd.map(row => (row.getLong(0), Array(row.getDouble(1), row.getDouble(2))) ).
  reduceByKey( _ ++ _).
  mapValues(v => new DenseMatrix(v.length/2,2,v, 0, 2, true))


What we have just created is a complex data structure. For debugging and visualization, let's create a well-formatted printing utility.

In [29]:
def formatUserEvents(x: Tuple2[Long, DenseMatrix[Double]]) : Unit = {
    val arr = x._2
    val n = math.min( 5 , arr.rows) - 1
    val slice = arr(0 to n, ::)
    println(s"uid = ${x._1}")
    println(s"events count = ${arr.rows}")
    println("lat,lon = ")
    println(slice)
    if (arr.rows > 5) println(s"... ${arr.rows- 5} more rows")
    println("-"*30)
}

Now we can use the formatting function, with spark and scala foreach statements.  
See below a formatted output describing the events related to three users:

In [30]:
e_df.take(3).foreach(e => formatUserEvents(e))

uid = 4985
events count = 1063
lat,lon = 
40.7663583     -73.9833973667      
40.7665252686  -73.9829705584      
40.76665848    -73.98390340000002  
40.7671166667  -73.9823264         
40.7673913454  -73.9814347029      
... 1058 more rows
------------------------------
uid = 578
events count = 1641
lat,lon = 
40.7444201864  -73.98721218109999  
40.7457482667  -73.9850020333      
40.7443825333  -73.9783781667      
40.7428595014  -73.9768588543      
40.7428595014  -73.9768588543      
... 1636 more rows
------------------------------
uid = 842
events count = 1145
lat,lon = 
40.7164383   -74.0125343         
40.71685842  -74.01399805        
40.71575957  -74.01116800000001  
40.717054    -74.013316          
40.71685483  -74.01230673        
... 1140 more rows
------------------------------


We will now cluster the events for each user according to the DBSCAN algorithm. This algorithm will cluster each user's events into groups. The rest of the code below reduces those groups to bounding boxes. Next, we will use the extracted bounding boxes to score events.
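
Reducing a cluster to a bounding box only requires the minimum and maximum of each coordinate. A minimal sketch in plain Scala, where the case class `BBox`, the function `boundingBox` and the sample points are hypothetical names and values chosen for this illustration:

```scala
// Bounding box stored as (latMin, lonMin, latMax, lonMax).
case class BBox(latMin: Double, lonMin: Double, latMax: Double, lonMax: Double)

def boundingBox(points: Seq[(Double, Double)]): BBox = {
  require(points.nonEmpty, "a cluster has at least one point")
  BBox(
    points.map(_._1).min, points.map(_._2).min,
    points.map(_._1).max, points.map(_._2).max
  )
}

val box = boundingBox(Seq((40.7610, -73.9770), (40.7612, -73.9775), (40.7615, -73.9772)))
// box == BBox(40.761, -73.9775, 40.7615, -73.977)
```

A bounding box is a coarse summary: it can cover areas where the user never checked in, which is the price paid for a very cheap containment test.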

In [31]:
import breeze.numerics._
import breeze.linalg._

def euclideanDistance (a: DenseVector[Double], b: DenseVector[Double]) = norm(a-b, 2)

// 1 degree at 40 degrees latitude is 111034.61 meters
// set the radius to 0.002 decimal degrees (https://en.wikipedia.org/wiki/Decimal_degrees),
// which is about 200 m (0.002 * 111034.61)

val eps = 0.002
val min_points = 3
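
The conversion quoted in the comments can be checked with a few lines of arithmetic. The figure of 111034.61 meters per degree comes from the cell above. The cosine scaling for longitude is an approximation added here (spherical Earth), not something stated in the original notebook.

```scala
// Figure quoted in the notebook comment: meters per degree at about 40 degrees latitude.
val metersPerDegree = 111034.61
val epsDegrees = 0.002
val epsMeters = epsDegrees * metersPerDegree            // about 222 m in the north-south direction

// Assumption (spherical approximation): a degree of longitude shrinks with cos(latitude).
val lonMetersPerDegree = metersPerDegree * math.cos(math.toRadians(40.0))  // about 85 km
val epsMetersEastWest = epsDegrees * lonMetersPerDegree                    // about 170 m
```

Because the Euclidean distance is computed directly on degrees, the neighbourhood is slightly narrower east-west than north-south at this latitude. For a sketch of a geo-fencing service around New York City this distortion is small.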

In [32]:
import nak.cluster._
import nak.cluster.GDBSCAN._

def dbscan(v : breeze.linalg.DenseMatrix[Double]) = {

  val gdbscan = new GDBSCAN(
    DBSCAN.getNeighbours(eps, distance=euclideanDistance),
    DBSCAN.isCorePoint(min_points)
  )

  // core DBSCAN algorithm
  val clusters = gdbscan cluster v
  
  // reducing the clusters to bounding boxes
  // for simplicity: each user could 
  clusters.map(
    cluster => (
      cluster.id.toInt, 
      cluster.points.size, 
      cluster.points.map(_.value(0)).min,
      cluster.points.map(_.value(1)).min,
      cluster.points.map(_.value(0)).max,
      cluster.points.map(_.value(1)).max
    )
  )
}

The next is probably the most powerful one-liner in this tutorial.   
It will stage the DBSCAN clustering algorithm for all users and their respective events.

In [33]:
val bboxRdd = e_df.mapValues(dbscan(_)).cache()

Let's convert the RDD back to a DataFrame. Now we have a table describing clusters. Each row defines a cluster in terms of user id, cluster id, the number of events in the cluster, and the bounding box of the cluster. Each user can have multiple clusters, and some users might have no cluster at all.

In [34]:
val bbox_df = bboxRdd.
  flatMapValues(x => x).
  map(x => (x._1, x._2._1, x._2._2,x._2._3,x._2._4,x._2._5,x._2._6)).
  toDF("uid", "cid", "csize", "lat_min", "lon_min", "lat_max", "lon_max").
  filter($"cid" > 0)

bbox_df.show(10)

+----+---+-----+------------------+------------------+------------------+--------------+
| uid|cid|csize|           lat_min|           lon_min|           lat_max|       lon_max|
+----+---+-----+------------------+------------------+------------------+--------------+
|4985|  4|   14|     40.7665739833|    -73.9829850197|40.769262733299996|  -73.98122905|
|4985| 10|   25|       40.76349341|      -73.98234528|     40.7652683667|-73.9789733887|
|4985| 15|    6|        40.7631676|       -73.9792362|     40.7649154663|-73.9781703833|
|4985| 24|   12|     40.7626658695|    -73.9765036134|     40.7647866916|-73.9739406109|
|4985| 29|    7|     40.7604105667|      -73.97418069|40.762592967399996|  -73.97242375|
|4985| 38|   19|     40.7620566327|    -73.9716011167|40.764846633299996|-73.9698984337|
|4985| 40|   35|40.759771316700004|    -73.9695874833|       40.76248225|  -73.96590763|
|4985| 44|   21|       40.75770225|-73.96918773649999|40.760391691500004|-73.9659552814|
|4985| 45|   12|     

#### Scoring Events: looking for outliers

We will now score events and check whether some of them are located outside the computed clusters' bounding boxes. First, we join the table of events with the table of clusters. Users without enough points have no clusters associated with them, so there is not sufficient data to determine outliers for them, and we filter them out. In the code above, a user needs at least 3 events within a radius of 0.002 degrees (`min_points = 3`, `eps = 0.002`) in order to have a DBSCAN cluster.

In [35]:
val bbox_events = events.
  join(bbox_df, Seq("uid"), "full").
  filter($"lat_min".isNotNull).
  filter($"lat_max".isNotNull).
  filter($"lon_min".isNotNull).
  filter($"lon_max".isNotNull).
  select($"events.ts",$"uid",$"lat",$"lon",$"lat_min",$"lon_min",$"lat_max",$"lon_max")

bbox_events.show(5,false)

+---------------------+----+-------------+------------------+-------------+--------------+------------------+------------+
|ts                   |uid |lat          |lon               |lat_min      |lon_min       |lat_max           |lon_max     |
+---------------------+----+-------------+------------------+-------------+--------------+------------------+------------+
|2010-10-16 04:29:09.0|4985|40.7727768667|-73.95842066670002|40.7665739833|-73.9829850197|40.769262733299996|-73.98122905|
|2010-10-16 04:28:54.0|4985|40.7726443798|-73.9579036832    |40.7665739833|-73.9829850197|40.769262733299996|-73.98122905|
|2010-10-16 04:28:37.0|4985|40.776251689 |-73.955940897     |40.7665739833|-73.9829850197|40.769262733299996|-73.98122905|
|2010-10-16 04:28:28.0|4985|40.77383416  |-73.95962762      |40.7665739833|-73.9829850197|40.769262733299996|-73.98122905|
|2010-10-16 04:28:19.0|4985|40.7728809121|-73.9556687823    |40.7665739833|-73.9829850197|40.769262733299996|-73.98122905|
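+---------------------+----+-------------+------------------+-------------+--------------+------------------+------------+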

The following code adds a boolean `detect` column to each row of the joined table, stating whether the event falls within the bounding box of the cluster on that row. Since each user might have more than one cluster, each event appears once for every cluster of that user. An event is considered an outlier only if none of these checks returns a positive outcome.
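
The outlier rule can be stated compactly on plain Scala values. This is a sketch of the logic only, not the DataFrame code of this notebook: the case class `Box`, the function `isOutlier` and the coordinates below are hypothetical.

```scala
case class Box(latMin: Double, lonMin: Double, latMax: Double, lonMax: Double) {
  // True when the point lies within the box, borders included.
  def contains(lat: Double, lon: Double): Boolean =
    lat >= latMin && lat <= latMax && lon >= lonMin && lon <= lonMax
}

// An event is an outlier when none of the user's boxes contains it.
def isOutlier(lat: Double, lon: Double, boxes: Seq[Box]): Boolean =
  !boxes.exists(_.contains(lat, lon))

// Two made-up bounding boxes for one user.
val userBoxes = Seq(
  Box(40.7665, -73.9830, 40.7693, -73.9812),
  Box(40.7635, -73.9823, 40.7653, -73.9790)
)

val insideIsOutlier  = isOutlier(40.7670, -73.9820, userBoxes)  // false: inside the first box
val outsideIsOutlier = isOutlier(40.7728, -73.9584, userBoxes)  // true: outside both boxes
```

On the joined table, the same rule amounts to grouping the rows of one event and checking that no row has a positive `detect` value.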

In [36]:
val scored_events = bbox_events.
                      withColumn("detect", 
                                 $"lon" >= $"lon_min" &&
                                 $"lon" <= $"lon_max" &&
                                 $"lat" >= $"lat_min" &&
                                 $"lat" <= $"lat_max").
                      select($"ts",$"uid",$"lat",$"lon", $"detect")
scored_events.show(5)

+--------------------+----+-------------+------------------+------+
|                  ts| uid|          lat|               lon|detect|
+--------------------+----+-------------+------------------+------+
|2010-10-16 04:29:...|4985|40.7727768667|-73.95842066670002| false|
|2010-10-16 04:28:...|4985|40.7726443798|    -73.9579036832| false|
|2010-10-16 04:28:...|4985| 40.776251689|     -73.955940897| false|
|2010-10-16 04:28:...|4985|  40.77383416|      -73.95962762| false|
|2010-10-16 04:28:...|4985|40.7728809121|    -73.9556687823| false|
+--------------------+----+-------------+------------------+------+
only showing top 5 rows



Below is the outlier scoring for `uid=22`.

In [37]:
scored_events.
  filter("uid == 22").
  show(20)

+--------------------+---+------------------+------------------+------+
|                  ts|uid|               lat|               lon|detect|
+--------------------+---+------------------+------------------+------+
|2010-10-17 21:43:...| 22|40.755876281599996|    -73.9823949337| false|
|2010-10-14 00:55:...| 22|40.748029583800005|    -74.0071158496| false|
|2010-10-13 04:49:...| 22|     40.7258684255|     -73.983424902| false|
|2010-10-12 23:58:...| 22|     40.7277736384|     -73.985715066| false|
|2010-10-12 04:23:...| 22|       40.76773025|    -73.8994259833| false|
|2010-10-12 02:45:...| 22|     40.6438845363|-73.78280639649999| false|
|2010-10-06 11:31:...| 22|     40.6438845363|-73.78280639649999| false|
|2010-09-21 12:47:...| 22|40.723510469299995|    -73.9884352684| false|
|2010-09-17 23:57:...| 22|     40.6438845363|-73.78280639649999| false|
|2010-09-17 00:43:...| 22|40.748029583800005|    -74.0071158496| false|
|2010-09-14 01:57:...| 22|     40.7331512078|    -74.0058752721|

As you can see, three items are found outside those bounding boxes. Although this is not yet a strong indicator of an anomaly per se, it can constitute a very relevant signal if combined with other signals as seen above. Many improvements can be made to the above core idea, for instance by including relations and interactions between users, or by refining the analysis of clusters, using convex hulls instead of bounding boxes.

I hope you enjoyed this notebook, and thanks for staying with me this far. Best wishes for your data science projects!