# Proximity Tracing

This notebook shows how, starting from an initial user, potential interactions can be traced over time and space. It uses GeoAnalytics Server and the `run_python_script` tool to identify all proximity events between users. In this example, we analyze the tracks of four users near Portland, Maine.

#### The algorithm

1. Find all "near events" using a spatiotemporal join (the Join Features tool). A near event is a location and time where a point from one track is near a point from another track, within the user-defined contact parameters (`search_distance` and `search_duration`).
2. Fold consecutive near events between two tracks into a single contact event. Near events are
   considered part of the same contact event if they are within the user-defined break tolerance.
   * At this point we have all contact events between all tracks in the dataset.
3. Create a queue containing the user-defined search target (`target_id` and `start_time`). The queue is
   appended to as more contact events are found.
4. For each target in the queue (until it is empty):
   1. Find all contact events that include the target after the start time, then filter so that we
      keep only the earliest contact event for each new track.
   2. Queue all first contact events for further processing. Also add each new contact to the result.
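
Steps 3 and 4 can be pictured with a small, Spark-free sketch. The `contact_events` dictionary, the `trace_contacts` function, and the sample times are hypothetical and only illustrate the queue logic; they are not the data structures used by the script.

```python
from datetime import datetime
from heapq import heappush, heappop
from itertools import count

# Hypothetical contact events: from_id -> list of (to_id, contact_time).
contact_events = {
    "a": [("b", datetime(2020, 4, 25, 9)), ("c", datetime(2020, 4, 25, 12))],
    "b": [("a", datetime(2020, 4, 25, 9)), ("c", datetime(2020, 4, 25, 10))],
    "c": [("b", datetime(2020, 4, 25, 10)), ("a", datetime(2020, 4, 25, 12))],
}


def trace_contacts(events, target_id, start_time):
    """Return (from_id, to_id, depth, time) rows, ordered by time of first exposure."""
    tie_breaker = count()  # keeps heap entries comparable when times are equal
    queue = [(start_time, next(tie_breaker), target_id, 0, None)]
    visited = set()
    result = []

    while queue:
        time, _, entity, depth, source = heappop(queue)
        if entity in visited:
            continue  # already reached through an earlier contact
        visited.add(entity)
        result.append((source, entity, depth, time))

        # keep only the earliest contact with each track not yet visited
        earliest = {}
        for to_id, contact_time in events.get(entity, []):
            if to_id in visited or contact_time <= time:
                continue
            if to_id not in earliest or contact_time < earliest[to_id]:
                earliest[to_id] = contact_time

        for to_id, contact_time in earliest.items():
            heappush(queue, (contact_time, next(tie_breaker), to_id, depth + 1, entity))

    return result


rows = trace_contacts(contact_events, "a", datetime(2020, 4, 24))
for row in rows:
    print(row)
```

In this made-up data, `c` meets `a` directly at 12:00 but is first exposed through `b` at 10:00, so it enters the result at depth 2. Processing the queue in time order is what makes the earlier exposure win.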
  
A function that implements this is defined below. GeoAnalytics Server executes it using Spark, and it creates several output layers that represent the results.

In [1]:
def proximity_tracing_rps():
    import json
    from heapq import heappush, heappop
    from pyspark.sql import functions as F
    from pyspark.sql import Window, SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType
    from datetime import datetime, timedelta

    class ProximityTracing:

        def __init__(self, input_layer, entity_id_field, search_distance, search_duration, break_tolerance):

            self.search_distance = search_distance
            self.search_duration = search_duration
            self.break_tolerance = break_tolerance

            self._init_observations(input_layer, entity_id_field)
            self._init_proximity_events()
            self._init_graph()

            self.spark = SparkSession.builder.getOrCreate()

        def trace(self, root_entities, max_depth):

            # as new proximity events are visited, they are added to the search queue with a priority ordering
            # based on the contact time. The ordering is necessary as multiple entities may come
            # in contact with the same entity at different times and we want to always process using
            # the earliest occurrence
            search_queue = []

            # maintains all entity ids that have been seen so that we only visit them once
            visited = set()

            # all new contacts are added here and returned as the result
            trace_result = []

            # initialize search queue and trace result with our root entities
            for (entity_id, start_time) in self._normalize_root_entities(root_entities):
                heappush(search_queue, (start_time, entity_id, 0, None))  # add to search queue with depth 0
                trace_result.append([None, entity_id, 0, {"start": start_time, "end": start_time}, None])

            # keep searching until search queue is empty
            while search_queue:

                # get next track to process
                (start, entity_id, depth, result_row) = heappop(search_queue)

                # we've already processed this track so we can skip it
                if entity_id in visited:
                    continue

                # set as visited so we don't revisit and create a cycle
                visited.add(entity_id)

                # add to result (root entities were already added up front, so their result_row is None)
                if result_row:
                    trace_result.append(result_row)

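                # do not search for contacts past the max depth: contacts found from this entity are
                # at depth + 1, so only expand entities that are strictly below max_depth
                if depth < max_depth: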

                    print("Processing %s (start-from=%s, depth=%s, queue-size=%s)"
                                     % (entity_id, start, depth, len(search_queue)))

                    next_proximity_events = self.graph.find_next_events(entity_id, start)

                    for (id, time, point) in next_proximity_events:
                        result_row = (entity_id, id, depth + 1, time, point)
                        heappush(search_queue, (time.start, id, depth + 1, result_row))

            new_schema = StructType([
                self._copy_field_schema(self.observations, "entity_id", "from_id"),
                self._copy_field_schema(self.observations, "entity_id", "to_id"),
                StructField("depth", IntegerType()),
                self._copy_field_schema(self.all_proximity_events, "$time", "$time"),
                self._copy_field_schema(self.observations, "$geometry", "$geometry")
            ])

            return self.spark.createDataFrame(trace_result, new_schema)

        def build_trace_tracks(self, trace):

            joined = self.observations.join(trace, [self.observations.entity_id == trace.to_id,
                                                    self.observations["$time"].start >= trace["$time"].start])

            return joined.select(self.observations.entity_id, self.observations["$time"],
                                 self.observations["$geometry"],
                                 trace.depth)

        def _init_observations(self, input_layer, entity_id_field):

            # validate that the entity ID field exists in the input data
            if entity_id_field not in input_layer.schema.names:
                raise ValueError(
                    "Entity ID field '%s' not in input data (note that it is case sensitive)" % entity_id_field)

            # select standardized field names
            self.observations = input_layer.select(F.col(entity_id_field).alias("entity_id"), "$geometry", "$time")

        def _init_proximity_events(self):

            # add sequence number to observations grouped by entity ID and sorted by time
            window = Window.partitionBy("entity_id").orderBy(F.col("$time.start"))
            observations_sorted = self.observations.withColumn("series_index", F.row_number().over(window))

            # Use Join Features to find all proximity events in which an observation from one entity is near an observation
            # from another entity within the user defined parameters for proximity in both space and time.
            single_proximity_events = geoanalytics.join_features(observations_sorted, observations_sorted,
                                                                 "JoinOneToMany",
                                                                 spatial_relationship="NearGeodesic",
                                                                 spatial_near_distance=self.search_distance[0],
                                                                 spatial_near_distance_unit=self.search_distance[1],
                                                                 temporal_relationship="Near",
                                                                 # total_seconds() rather than .seconds, which drops whole days
                                                                 temporal_near_distance=int(self.search_duration.total_seconds()),
                                                                 temporal_near_distance_unit="seconds",
                                                                 join_condition="$target['entity_id'] != $join['entity_id']") \
                .withColumnRenamed("entity_id", "from_id") \
                .withColumnRenamed("join_entity_id", "to_id")

            # Note that we now have a `from_id` and a `to_id`. The result of a self-join in the Join Features tool will
            # contain two records for each proximity event. Consider a situation where (A) and (B) are near each other:
            # (A) will be returned as being near (B) and (B) will be returned as being near (A). The records would be
            # [(A,B),(B,A)].

            # We're using detect incidents to identify sustained proximity between two entities. The "track id" used to do
            # the calculation is [from_id, to_id]. This creates a "track" of all proximity events between two entities in
            # sorted order. Consecutive proximity events that occur within <break tolerance> of each other are merged
            # together into a single, sustained proximity event.
            # See: https://pro.arcgis.com/en/pro-app/tool-reference/big-data-analytics/detect-incidents.htm

            # If the start condition is 'true', every observation is a candidate for starting a new incident but only once
            # the end condition of the previous incident has been met (if applicable).
            start_condition = "true"

            # The sustained proximity "incident" ends when 1) we reach the last record of the track, or 2) the current
            # and next observations are not consecutive in the series and the time between them is at least
            # our break tolerance (or no break tolerance was given).
            break_tolerance_millis = 0
            if self.break_tolerance:
                # odd but effective way to get the total milliseconds of a timedelta (there is no timedelta.milliseconds)
                break_tolerance_millis = self.break_tolerance / timedelta(milliseconds=1)

            end_condition = """
            var window = trackWindow(0, 2)
            var break_tolerance_millis = %s
            console(break_tolerance_millis)
            if (count(window) == 1) {
                true // if our window length is 1 then we are at the end of the track
            } else if (window[0].series_index + 1 >= window[1].series_index) {
                false // these two events are consecutive
            } else if (break_tolerance_millis > 0) {
                var currTime = startTime(window[0])
                var nextTime = startTime(window[1])
                // end event when the difference between this feature and the next is greater than our threshold
                datediff(nextTime, currTime) >= break_tolerance_millis
            } else {
                true // the two events are not consecutive and no break tolerance was given
            }
            """ % break_tolerance_millis
            contact_incidents = geoanalytics.detect_incidents(single_proximity_events, ["from_id", "to_id"],
                                                              start_condition, end_condition)

            # The result of detect incidents includes all observations, each tagged with a unique ID that identifies its
            # proximity event. We want a single summary record for each event.
            # Each record contains [from_id, to_id, start_location, start_time, end_time].
            contact_events = contact_incidents \
                .withColumn("first_geometry",
                            F.first("$geometry").over(Window.partitionBy("IncidentID").orderBy("$time.start"))) \
                .groupby("from_id", "to_id", "IncidentID") \
                .agg(F.min("$time.start").alias("contact_start"), F.max("$time.start").alias("contact_end"),
                     F.first("first_geometry").alias("$geometry",
                                                     metadata=self.observations.schema["$geometry"].metadata)) \
                .drop("IncidentID")

            self.all_proximity_events = self._with_time_as_interval(contact_events, "contact_start", "contact_end")

        def _init_graph(self):
            self.graph = LocalGraph(self.all_proximity_events)

        # normalize the data type for the root entity ID provided by the user
        def _normalize_root_entities(self, root_entities):
            df = self.spark.createDataFrame(root_entities, ["entity_id", "start_time"])
            source_entity_id_datatype = self.observations.schema["entity_id"].dataType
            return df.select(F.col("entity_id").cast(source_entity_id_datatype), F.col("start_time")).collect()

        # copy schema of existing field with a new name (useful for complex types like $geometry)
        def _copy_field_schema(self, df, from_field, with_name):
            field = df.schema[from_field]
            return StructField(with_name, field.dataType, field.nullable, field.metadata)

        # convert start/end date fields into a time field that GA will recognize
        def _with_time_as_interval(self, df, start_time_field, end_time_field):
            projection = F.struct([F.col(start_time_field).alias("start"), F.col(end_time_field).alias("end")])
            return df.withColumn("$time", projection.alias("$time", metadata={"time": {"type": "interval"}}))

    class DataFrameGraph:

        def __init__(self, all_contact_events):
            self.all_contact_events = all_contact_events

        def find_next_events(self, track_id, time):
            # collect all contacts with our target id that ended after our target contact time
            contact_with_id = self.all_contact_events.where(F.col("from_id") == track_id).where(
                F.col("$time.end") > time)
            window = Window.partitionBy("to_id").orderBy(F.col("$time.end"))
            first_contacts = contact_with_id.withColumn("rn", F.row_number().over(window)).where(F.col("rn") == 1)
            return first_contacts.select("to_id", "$time", "$geometry").collect()

    class LocalGraph:
        """

        Builds a local graph using dictionaries. This approach is considerably faster than the data frame alternative
        but has the potential of overflowing memory on the Spark driver if there are lots of proximity events.

        The local graph is a directed multigraph. Each node represents a track (person,vehicle,...) and each edge between
        two nodes represents a proximity event (there can be multiple proximity events between nodes).

        Format:

        {
          "<from_id>" : {
            "<to_id>" : [(event_time, event_location), (event_time, event_location), ... ],
            "<to_id>" : [ ... ]
          },
          "<from_id>" : { ... },
          ...
        }

        """

        def __init__(self, all_proximity_events):
            self.all_proximity_events = all_proximity_events
            self._build_lookup()

        def _build_lookup(self):
            events_collected = self.all_proximity_events.collect()

            self.edges = {}

            for event in events_collected:

                from_track = self.edges.get(event.from_id)
                if not from_track:
                    from_track = {}
                    self.edges[event.from_id] = from_track

                to_track = from_track.get(event.to_id)
                if not to_track:
                    to_track = []
                    from_track[event.to_id] = to_track

                to_track.append((event["$time"], event["$geometry"]))

        def find_next_events(self, track_id, start):

            edge_from = self.edges.get(track_id)
            if not edge_from:
                return []

            next_events = []

            for edge_to in edge_from:

                min_event = None
                for event in edge_from.get(edge_to):

                    if event[0].end > start:
                        if not min_event or event[0].end < min_event[0].end:
                            min_event = event

                if min_event:
                    next_events.append((edge_to, min_event[0], min_event[1]))

            return next_events


    if __name__ == "__main__":

        input_layer = layers[0]
        parameters_layer = layers[1]

        # parse parameters from parameters layer
        parameters = [json.loads(row['params']) for row in parameters_layer.collect()][0]
        entity_id_field = parameters["entity_id_field"]
        search_distance = parameters["search_distance"]
        search_duration = timedelta(minutes=parameters["search_duration_minutes"])
        break_tolerance = timedelta(minutes=parameters["break_tolerance_minutes"])
        root_entities = [(i[0], datetime(i[1], i[2], i[3])) for i in parameters["root_entities"]]
        output_trace_events = parameters["output_trace_events"]
        output_all_proximity_events = parameters["output_all_proximity_events"]
        output_trace_tracks = parameters["output_trace_tracks"]

        max_depth = parameters.get("max_depth", 10000)

        # get earliest start time of all trace targets to filter out unnecessary records
        initial_start_time = min(map(lambda t: t[1], root_entities))
        input_layer_filtered = input_layer.where(F.col("$time.start") >= initial_start_time)

        tracing = ProximityTracing(input_layer_filtered, entity_id_field, search_distance, search_duration,
                                   break_tolerance)

        # proximity events are always calculated so we can write them first if requested
        if output_all_proximity_events:
            print(f"Writing all proximity events to {output_all_proximity_events}...")
            tracing.all_proximity_events.write.format("webgis").save(output_all_proximity_events)

        print("Performing trace...")
        trace_events = tracing.trace(root_entities, max_depth)

        print(f"Writing trace events to {output_trace_events}...")
        trace_events.write.format("webgis").save(output_trace_events)

        if output_trace_tracks:
            trace_tracks = tracing.build_trace_tracks(trace_events)
            print(f"Writing trace tracks to {output_trace_tracks}...")
            trace_tracks.write.format("webgis").save(output_trace_tracks)


def run_tracing_script(entity_id_field, search_distance, search_duration_minutes, break_tolerance_minutes,
                       root_entities, output_trace_events, output_all_proximity_events=None, output_trace_tracks=None,
                       max_depth=100):

    import json

    # RunPythonScript (at 10.8) only accepts layers as parameters - we can encode the script parameters as a local
    # feature collection to work around this limitation
    parameters = {
        "entity_id_field": entity_id_field,
        "search_distance": search_distance,
        "search_duration_minutes": search_duration_minutes,
        "break_tolerance_minutes": break_tolerance_minutes,
        "root_entities": root_entities,
        "output_trace_events": output_trace_events,
        "output_all_proximity_events": output_all_proximity_events,
        "output_trace_tracks": output_trace_tracks,
        "max_depth": max_depth
    }

    parameters_layer = {"featureSet": {"features": [{"attributes": {"params": json.dumps(parameters)}}]},
                        "layerDefinition": {"fields": [{"name": "params", "type": "esriFieldTypeString"}]}}

    # send script and parameters to server for processing
    run_python_script(code=proximity_tracing_rps, layers=[layer, parameters_layer])
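
The folding step that the script delegates to `detect_incidents` can be illustrated without Spark. This is a simplified sketch under stated assumptions, not the server's implementation: `fold_near_events` is a hypothetical helper, it only looks at time gaps (the script's end condition also treats observations with consecutive `series_index` values as continuous), and the timestamps are made up.

```python
from datetime import datetime, timedelta


def fold_near_events(times, break_tolerance):
    """Merge near-event timestamps for one (from_id, to_id) pair into
    (contact_start, contact_end) intervals.

    A gap of break_tolerance or more between two near events closes the
    current contact event and starts a new one.
    """
    intervals = []
    for t in sorted(times):
        if intervals and t - intervals[-1][1] < break_tolerance:
            intervals[-1][1] = t  # same contact event: extend its end
        else:
            intervals.append([t, t])  # first event or gap too large: new event
    return [tuple(interval) for interval in intervals]


base = datetime(2020, 4, 25, 9, 0, 0)
# near events at 0 s, 30 s, 60 s, then again at 300 s and 320 s
near_event_times = [base + timedelta(seconds=s) for s in (0, 30, 60, 300, 320)]

contact_events = fold_near_events(near_event_times, timedelta(minutes=1))
for start, end in contact_events:
    print(start.time(), "to", end.time())
```

With a one-minute break tolerance, the five near events collapse into two contact events, because the four-minute gap between the third and fourth observation exceeds the tolerance.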

#### Connect to our track data

In [2]:
import uuid
from arcgis.apps.tracker import TrackView
from arcgis.gis import GIS
from arcgis.geoanalytics.manage_data import run_python_script

gis = GIS("https://gageocode.esri.com/portal", "admin", verify_cert=False)

# Provide a track view item id
layer = TrackView(gis.content.get("9a1503810b77473e8264d1c93266e295")).tracks_layer
layer.filter = "horizontal_accuracy <= 100 and location_timestamp > date '2020-04-24'"

Enter password: ········


#### Let's look at the tracks
You can see there are four users' tracks and that the users come into contact with each other at various locations. First, Jane Doe (green tracks), who is the initial trace target, drives to Portland and interacts with John Doe (blue tracks). Then John Doe drives to Bug Light Park and interacts with John Smith (purple tracks). Then John Smith drives to Fort Williams and interacts with Maria Garcia (red tracks). Finally, Maria Garcia drives to Westbrook.

In [4]:
m = gis.map("Portland, ME", zoomlevel=12)
def circle_symbol(color):
    # simple marker: a size-6 circle in the given RGBA color with a thin gray outline
    return {"color": color, "size": 6, "angle": 0, "xoffset": 0, "yoffset": 0,
            "type": "esriSMS", "style": "esriSMSCircle",
            "outline": {"color": [153, 153, 153, 64], "width": 0.75,
                        "type": "esriSLS", "style": "esriSLSSolid"}}

# one RGBA color per user: red, blue, green, purple
user_colors = {"mgarcia": [237, 81, 81, 255], "johndoe": [20, 158, 206, 255],
               "janedoe": [167, 198, 54, 255], "johnsmith": [158, 85, 156, 255]}
uvr = {"type": "uniqueValue", "field1": "created_user",
       "uniqueValueInfos": [{"value": user, "label": user, "symbol": circle_symbol(color)}
                            for user, color in user_colors.items()]}
m.add_layer(layer, {"renderer": uvr})
m

#### Execute the script
We'll now run the script using GeoAnalytics Server. In this scenario, we specify `janedoe` as the initial target to start the trace with.

In [5]:
unique_id = str(uuid.uuid4())[:4]
output_trace_events_name = f"trace_events_{unique_id}"
output_all_proximity_events = f"all_proximity_events_{unique_id}"
output_trace_tracks = f"trace_tracks_{unique_id}"

run_tracing_script(
    entity_id_field="created_user",
    search_distance=[25, "feet"],
    search_duration_minutes=1,
    break_tolerance_minutes=1,
    root_entities=[["janedoe", 2020, 4, 24]],
    output_trace_events=output_trace_events_name,
    output_all_proximity_events=output_all_proximity_events,
    output_trace_tracks=output_trace_tracks
)
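
Because the script receives its arguments as a JSON string inside a one-row layer, everything passed to `run_tracing_script` has to be JSON-serializable. That is why `root_entities` is given as `[id, year, month, day]` lists rather than `datetime` objects. The sketch below mimics the encode and decode steps locally; `encode_parameters` and `decode_parameters` are hypothetical names, and the plain dictionary stands in for the layer that the server would hand to the script as a DataFrame.

```python
import json
from datetime import datetime, timedelta


def encode_parameters(parameters):
    # same feature-collection shape that run_tracing_script builds
    return {"featureSet": {"features": [{"attributes": {"params": json.dumps(parameters)}}]},
            "layerDefinition": {"fields": [{"name": "params", "type": "esriFieldTypeString"}]}}


def decode_parameters(parameters_layer):
    # read the single "params" attribute back and rebuild the Python types
    raw = parameters_layer["featureSet"]["features"][0]["attributes"]["params"]
    parameters = json.loads(raw)
    parameters["search_duration"] = timedelta(minutes=parameters.pop("search_duration_minutes"))
    parameters["break_tolerance"] = timedelta(minutes=parameters.pop("break_tolerance_minutes"))
    parameters["root_entities"] = [(i[0], datetime(i[1], i[2], i[3]))
                                   for i in parameters["root_entities"]]
    return parameters


encoded = encode_parameters({
    "entity_id_field": "created_user",
    "search_distance": [25, "feet"],
    "search_duration_minutes": 1,
    "break_tolerance_minutes": 1,
    "root_entities": [["janedoe", 2020, 4, 24]],
})
decoded = decode_parameters(encoded)
print(decoded["root_entities"])
print(decoded["search_duration"].total_seconds())
```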

#### View the results

The following table shows that there were 3 proximity events. Jane Doe was in proximity to John Doe, who was later in proximity to John Smith, who was in turn later in proximity to Maria Garcia. The depth indicates how many degrees of separation lie between the initial target and the user.

In [6]:
trace_events_df = gis.content.search(output_trace_events_name, sort_field="modified", sort_order="desc")[0].layers[0].query(as_df=True, order_by_fields="START_DATETIME")
trace_events_df

| | from_id | to_id | depth | globalid | OBJECTID | END_DATETIME | START_DATETIME | SHAPE |
|---|---|---|---|---|---|---|---|---|
| 0 | | janedoe | 0 | {FEB54866-5CBF-5F66-57DD-474F4EF012A0} | 7 | 2020-04-24 07:00:00 | 2020-04-24 07:00:00 | |
| 1 | janedoe | johndoe | 1 | {A69DD866-C6AA-098E-E64B-D44E88277837} | 14 | 2020-04-25 09:59:09 | 2020-04-25 09:28:40 | {"x": -70.25112661914547, "y": 43.656707875851... |
| 2 | johndoe | johnsmith | 2 | {FC21DA0F-F34A-AD49-F53E-6DCEC9AAECC2} | 21 | 2020-04-25 10:35:45 | 2020-04-25 10:05:16 | {"x": -70.23408392747679, "y": 43.652439950707... |
| 3 | johnsmith | mgarcia | 3 | {E54D9F22-1FF0-B023-6F39-9A6D659A0F7D} | 28 | 2020-04-25 11:44:05 | 2020-04-25 11:13:05 | {"x": -70.21216558728193, "y": 43.624929435825... |
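
Since every row points back to the entity that exposed it, the chain of contacts leading to any user can be recovered by following the `from_id` links. The sketch below uses the ids and depths from the table above as plain tuples; `exposure_chain` is a hypothetical helper, not part of the notebook.

```python
# (from_id, to_id, depth) values copied from the trace events table
trace_rows = [
    (None, "janedoe", 0),
    ("janedoe", "johndoe", 1),
    ("johndoe", "johnsmith", 2),
    ("johnsmith", "mgarcia", 3),
]


def exposure_chain(rows, entity_id):
    """Walk from_id links back from entity_id to the root of the trace."""
    parent = {to_id: from_id for from_id, to_id, _ in rows}
    chain = []
    current = entity_id
    while current is not None:
        chain.append(current)
        current = parent.get(current)  # None once we reach a root entity
    return list(reversed(chain))


chain = exposure_chain(trace_rows, "mgarcia")
print(" -> ".join(chain))  # janedoe -> johndoe -> johnsmith -> mgarcia
```

The length of the chain minus one equals the `depth` value of the last entity, which is the degrees of separation described above.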


#### Let's look at these events on the map

You can see that the 3 proximity events occurred in downtown Portland, at Bug Light Park, and at Fort Williams.

In [9]:
m = gis.map("Portland, ME", zoomlevel=12)
m.add_layer(layer, {"renderer": uvr })
m.add_layer(trace_events_df)
m

#### Let's overlay the traced tracks
The traced tracks represent where a person went after coming into contact with the initial target or with someone further down the contact chain. For example, only the blue tracks from Portland to Bug Light Park are displayed, since that was the route John Doe traveled after he was in proximity to Jane Doe.

In [11]:
trace_tracks_layer = gis.content.search(output_trace_tracks, sort_field="modified", sort_order="desc")[0].layers[0]
m = gis.map("Portland, ME", zoomlevel=12)
m.add_layer(trace_events_df)
uvr["field1"] = "entity_id"
m.add_layer(trace_tracks_layer, {"renderer": uvr})
m
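
The traced tracks are produced by joining each entity's observations to its trace event and keeping only the observations at or after the start of the contact, which is what `build_trace_tracks` does in the script. Here is a minimal sketch with plain Python stand-ins. The contact start time for `johndoe` is taken from the trace events table, while the observation times and the helper `traced_observations` are made up for illustration.

```python
from datetime import datetime

# Hypothetical observations: (entity_id, observation_time)
observations = [
    ("johndoe", datetime(2020, 4, 25, 9, 0)),
    ("johndoe", datetime(2020, 4, 25, 9, 30)),
    ("johndoe", datetime(2020, 4, 25, 10, 0)),
    ("bystander", datetime(2020, 4, 25, 9, 45)),  # never part of the trace
]

# to_id -> (contact start, depth), as found in the trace events
first_contact = {"johndoe": (datetime(2020, 4, 25, 9, 28, 40), 1)}


def traced_observations(observations, first_contact):
    """Keep observations recorded at or after the entity's first contact."""
    kept = []
    for entity_id, time in observations:
        contact = first_contact.get(entity_id)
        if contact is not None and time >= contact[0]:
            kept.append((entity_id, time, contact[1]))
    return kept


tracks = traced_observations(observations, first_contact)
for row in tracks:
    print(row)
```

Only the two `johndoe` observations recorded after 09:28:40 survive the filter, which mirrors why the map shows just the portion of the blue track that follows the first contact.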