# Arcpy demo - traffic simulation

## Import necessary packages

In [None]:
from arcgis.features import GeoAccessor
import os
import pandas as pd
import sqlite3 as sql
import arcpy

### Define Workspace

In [None]:
workspace_dir = "C:/arcgis/home/traffic_simulation/"
os.chdir(workspace_dir)
gdb = arcpy.management.CreateFileGDB(workspace_dir, "traffic")
arcpy.env.workspace = gdb[0]

## Define methods to read data from sqlite to feature class

In [None]:
def read_sqlite_as_sdf(db_filepath: str, select_statement: str, x_column: str='longitude', y_column: str='latitude'):
    """
    Reads the data from a sqlite database into main memory using a SQL statement.
    """
    with sql.connect(db_filepath) as connection:
        df = pd.read_sql_query(select_statement, connection)
        return GeoAccessor.from_xy(df, x_column, y_column)
    
def read_sqlite_to_featureclass(db_filepath: str, select_statement: str, x_column: str='longitude', y_column: str='latitude') -> GeoAccessor:
    """
    Reads the data from a sqlite database as an in memory feature class using a SQL statement.
    """
    filename_with_extension = os.path.basename(db_filepath)
    filename = os.path.splitext(filename_with_extension)[0]

    sdf = read_sqlite_as_sdf(db_filepath, select_statement, x_column, y_column)
    featureclass = sdf.spatial.to_featureclass("memory/" + filename)
    arcpy.management.ClearWorkspaceCache()
    return featureclass

## Define classes

In [None]:
class Trip(object):

    def __init__(self, name, trip_id, long, lat, trip_time, angle, distance, speed) -> None:
        self.name = name
        self.trip_id = trip_id
        self.long = long
        self.lat = lat
        self.trip_time = trip_time
        self.angle = angle
        self.distance = distance
        self.speed = speed
    
    @staticmethod
    def create_empty():
        
        return Trip("empty trip", None, None, None, None, None, None, None)

    def equals(self, other) -> bool:
        if other is None: 
            return False
        
        return self.trip_id == other.trip_id 

Define the MeasureTool to calculate speed, point_distance, and point_direction, and add fields to the feature class.

In [None]:
class MeasureTool(object):
    
    def change_timefield(self, feature_class: str):
        out_feature_class = arcpy.conversion.ExportFeatures(
            in_features=feature_class,
            out_features="memory/traffic_data_changed_timefield",
            use_field_alias_as_name="NOT_USE_ALIAS",
            field_mapping='id "id" true true false 0 Long 0 0,First,#,traffic_data,id,-1,-1;trip "trip" true true false 0 Long 0 0,First,#,traffic_data,trip,-1,-1;person "person" true true false 0 Long 0 0,First,#,traffic_data,person,-1,-1;vehicle_type "vehicle_type" true true false 8 Text 0 0,First,#,traffic_data,vehicle_type,0,8;distance_crossed "distance_crossed" true true false 0 Long 0 0,First,#,traffic_data,distance_crossed,-1,-1;latitude "latitude" true true false 0 Double 0 0,First,#,traffic_data,latitude,-1,-1;longitude "longitude" true true false 0 Double 0 0,First,#,traffic_data,longitude,-1,-1;trip_time_old "trip_time_old" true true false 38 Text 0 0,First,#,traffic_data,trip_time,0,38',)
        
        return out_feature_class[0]
        
    def create_geometry(self, longitude, latitude):

        point = arcpy.Point(longitude, latitude)
        point_geometry = arcpy.PointGeometry(point, arcpy.SpatialReference(4326))

        return point_geometry

    def measure(self, feature_class):
        
        trip_point_a = Trip.create_empty()

        feature_class_column_names = ["trip", "longitude", "latitude", "trip_time", "trip_time_old", "point_direction", "point_distance", "speed"]
        with arcpy.da.UpdateCursor(feature_class, feature_class_column_names) as cur:

            for row in cur:
                
                timestamp_string = row[4]
                
                format_string = "%Y-%m-%dT%H:%M:%S"
                datetime_object = datetime.datetime.strptime(timestamp_string, format_string)
                row[3] = datetime.datetime.fromtimestamp(datetime.datetime.timestamp(datetime_object) - 3600)
                
                trip_point_b = Trip("Point B", row[0], row[1], row[2], row[3], float, float ,float)
                if trip_point_b.equals(trip_point_a):

                    trip_point_b = self.calculate_distance_speed(trip_point_b, trip_point_a)
                    row[5]=trip_point_b.angle
                    row[6]=trip_point_b.distance
                    row[7]=trip_point_b.speed

                cur.updateRow(row)

                trip_point_a = Trip("Point A", trip_point_b.trip_id, trip_point_b.long, trip_point_b.lat,
                                    trip_point_b.trip_time, trip_point_b.angle, trip_point_b.distance, trip_point_b.speed)

        return feature_class

    def calculate_distance_speed(self, trip_point_b, trip_point_a):
        current_point = self.create_geometry(trip_point_b.lat, trip_point_b.long)
        last_point = self.create_geometry(trip_point_a.lat, trip_point_a.long)
        trip_point_b.angle, trip_point_b.distance = current_point.angleAndDistanceTo(last_point)
        
        differential_seconds = datetime.timedelta.total_seconds(trip_point_b.trip_time - trip_point_a.trip_time)

        if trip_point_b.distance == 0:
            trip_point_b.speed = 0
        else:
            trip_point_b.speed = trip_point_b.distance/differential_seconds * 3.6

        trip_point_b.distance = round(trip_point_b.distance, 2)

        return trip_point_b

    def run(self, feature_class: str):
            
        feature_class = self.change_timefield(feature_class)

        sorted_feature_class = "memory/traffic_data"
        arcpy.Sort_management(feature_class, sorted_feature_class, [["trip", "ASCENDING"], ["trip_time_old", "ASCENDING"]])
        feature_class = arcpy.AddFields_management(in_table=sorted_feature_class, field_description=[["trip_time", "DATE"],["point_direction", "DOUBLE", "", "", "0", ""], ["point_distance", "DOUBLE", "", "", "0", ""], ["speed", "DOUBLE", "", "", "0", ""]])

        self.measure(feature_class)
        arcpy.DeleteField_management(feature_class, drop_field=["trip_time_old", "ORIG_FID"])
        return feature_class

## Run MeasureTool()

When you run MeasureTool.run(), it calculates speed, point_distance, and point_direction and adds fields to the feature class.

In [None]:
feature_class = read_sqlite_to_featureclass("traffic_data.sqlite", "SELECT * from agent_pos LIMIT 300000;")

In [None]:
MT = MeasureTool()
fc = MT.run(feature_class)

## Geoanalytics and Geoprocessing Tools

### Defining classes to create a space-time cube and visualize it

In [None]:
class SpaceTimeCube(object):
    
    def create_space_time_cube(self, feature_class: str, space_time_cube_path: str, time_interval: int, distance_interval: int):

        geodatabase_feature_class = arcpy.ExportFeatures_conversion(feature_class, "trafficFeatureClass")

        projected_feature_class = arcpy.Project_management(in_dataset = geodatabase_feature_class,
                                                           out_dataset = f"{geodatabase_feature_class}_projected_25832",
                                                           out_coor_system = arcpy.SpatialReference(25832))

        space_time_cube = arcpy.CreateSpaceTimeCube_stpm(projected_feature_class, 
                                       output_cube = f"{space_time_cube_path}/SpaceTimeTrafficCube.nc", 
                                       time_field ="trip_time",
                                       time_step_interval = f"{time_interval} Minutes",
                                       distance_interval =  f"{distance_interval} Meters",
                                       aggregation_shape_type="HEXAGON_GRID")

        return space_time_cube

    def visualize_space_time_cube(self, space_time_cube_path:str):

        arcpy.VisualizeSpaceTimeCube3D_stpm(in_cube = space_time_cube_path,
                                            cube_variable = "COUNT",
                                            display_theme = "VALUE",
                                            output_features = "SpaceTimeCubeVisualize")

### Creating a space-time cube for traffic data and visualizing it

In [None]:
stc = SpaceTimeCube()
space_time_cube = stc.create_space_time_cube(feature_class, workspace_dir, 1, 200)
space_time_cube_visualize = stc.visualize_space_time_cube(space_time_cube[0])

### Creating class for HotColdSpotsTool

In [None]:
class HotColdSpotsTool(object):

    def create_hot_cold_spots_space_time(self, space_time_cube_path: str, distance_interval: int):
    
        arcpy.stpm.EmergingHotSpotAnalysis(in_cube = space_time_cube_path,
                                           analysis_variable = "COUNT",
                                           output_features = "SpaceTimeCube_EmergingHotSpotAnalysis",
                                           neighborhood_distance =  f"{distance_interval} Meters")

        arcpy.stpm.LocalOutlierAnalysis(in_cube = space_time_cube_path,
                                        analysis_variable = "COUNT",
                                        output_features = "SpaceTimeCube_LocalOutlierAnalysis",
                                        neighborhood_distance =  f"{distance_interval} Meters")
        
    def create_hot_cold_spots_feature_class(self, feature_class: str, distance_interval: int, time_interval: int):

        arcpy.CalculateDensity_gapro(input_layer = feature_class,
                                     out_feature_class = "CalculateDensity",
                                     bin_type = "HEXAGON",
                                     bin_size = f"{distance_interval} Meters",
                                     weight = "UNIFORM",
                                     neighborhood_size = f"{distance_interval * 1.5} Meters",
                                     area_unit_scale_factor = "SQUARE_METERS",
                                     time_step_interval = f"{time_interval} Minutes")
        
        arcpy.FindHotSpots_gapro(point_layer = feature_class,
                                 out_feature_class = "FindHotSpots",
                                 bin_size = f"{distance_interval} Meters",
                                 neighborhood_size = f"{distance_interval * 1.5} Meters",
                                 time_step_interval = f"{time_interval} Minutes")

### Creating hot and cold spots for traffic simulation data

In [None]:
hot_cold_spots = HotColdSpotsTool()
hot_cold_spots.create_hot_cold_spots_space_time(space_time_cube[0], 200)

In [None]:
project = arcpy.mp.ArcGISProject("CURRENT")
for focus_map in project.listMaps("Map"):
    for ais_layer in focus_map.listLayers("trafficFeatureClass_projected_25832"):
        ais_layer.enableTime("trip_time")

In [None]:
hot_cold_spots.create_hot_cold_spots_feature_class("trafficFeatureClass_projected_25832", 200, 1)