In [1]:
import influxdb_client
import os
import time
from datetime import datetime
import pandas as pd
from influxdb_client.client.write_api import SYNCHRONOUS,ASYNCHRONOUS
from influxdb_client.client.query_api import TableList
from influxdb_client.client.write_api import Point
import s2cell

In [2]:
token = "MyInitialAdminToken0=="
# token = os.environ.get("INFLUXDB_TOKEN")
# export your token into the environment variable INFLUXDB_TOKEN first
url = "http://10.10.10.250:8086"
# replace url with your server address
# if your server is on the same machine, use "http://localhost:8086"

# set time 30s, the default is 10s may not be enough in this tutorial
timeout = 60 * 1000
client = influxdb_client.InfluxDBClient(url=url, token=token, timeout=timeout)

In [3]:
# In this tutoriakl we will investigate the data from Hartford Police Department
# This historical dataset reflects reported incidents of crime (with the execption of sexual assaults)
# that occurred in the City of Hartford from January 1, 2005 to May 18, 2021.

# download link
# - https://data.hartford.gov/datasets/hartfordgis::police-incidents-01012005-to-05182021/about
data = pd.read_csv("./Police_Incidents_01012005_to_05182021.csv", index_col=False)

In [4]:
data.head()

Unnamed: 0,X,Y,OBJECTID,Case_Number,Date,Time_24HR,Address,UCR_1_Category,UCR_1_Description,UCR_1_Code,UCR_2_Category,UCR_2_Description,UCR_2_Code,Neighborhood,PRIMARY_KEY
0,1019841.0,829628.619921,591636,21008791,2021/03/25 00:00:00+00,1317,32 WINSHIP ST,32* - PROPERTY DAMAGE ACCIDENT,PROP DAM ACC,3224,23* - DRIVING LAWS,IMPROPER BACKING,2340,SOUTHEND,CIRS-21008791-0
1,1013029.0,845714.939782,591637,21008798,2021/03/25 00:00:00+00,1438,410 HOMESTEAD AV,32* - PROPERTY DAMAGE ACCIDENT,PROP DAM ACC,3224,23* - DRIVING LAWS,FTGRW/DRIVEWAY,2336,UPPER ALBANY,CIRS-21008798-0
2,1010687.0,830429.727899,591638,21009417,2021/03/31 00:00:00+00,1738,65 WILLIAM SHORTY CAMPBELL ST,06* - LARCENY,LARC3 SHOPLIFT,624,,,0,BEHIND THE ROCKS,CIRS-21009417-0
3,1019481.0,840695.5699,591640,21008889,2021/03/26 00:00:00+00,30,111 ALLYN ST,05* - BURGLARY,BURG3-COM-NITE,522,,,0,DOWNTOWN,CIRS-21008889-0
4,1014458.0,849925.620009,591641,21009143,2021/03/29 00:00:00+00,0,320 BLUE HILLS AV,06* - LARCENY,LARC3-FROM M/V,634,,,0,BLUE HILLS,CIRS-21009143-0


In [5]:
# if your machine can not hold the data, you can use the following code to select data in a specific time range
data = data[(data["Date"] >= "2019-01-01")]
print(data.shape)

(50411, 15)


In [6]:
# preprocess the data heuristically
# we skip the data investigation step and directly give the conclusion and preprocess

# 1. lowercase all columns
data.columns = data.columns.str.lower()


# 2. X(longitutde), Y(altitue) -> combine by s2cell as location feature
S2_LEVEL = 10
combine_xy = lambda x: s2cell.lat_lon_to_token(x["x"], x["y"], S2_LEVEL)
data["location"] = data[data["x"].notna() & data["y"].notna()].apply(combine_xy, axis=1)



# 3. combine the Date and Time_24HR columns into a single datetime column
candidates = zip(pd.to_datetime(data["date"]).to_list(), data["time_24hr"].to_list())
datetime_values = []
for (dts, hms) in candidates:
    if hms < 60 :
        hr = 0
        m = hms
    else:
        hms_ = str(hms)
        hr, m = int(hms_[:-2]), int(hms_[-2:])
    datetime_values.append(datetime.fromtimestamp(dts.timestamp() + hr * 3600 + m * 60))
data["datetime"] = pd.to_datetime(datetime_values)


# 4. remove the duplicated columns OBJECTID, Case_Number
# these columns have high cardinality and are duplicated with PrimaryKey
data = data.drop(columns=["objectid", "case_number","date", "time_24hr"]) 


In [7]:
for column in data.columns:
    if data[column].isna().any():
        print(f"column {column} has NaN values")

column ucr_2_category has NaN values
column ucr_2_description has NaN values


In [8]:
data.head()

Unnamed: 0,x,y,address,ucr_1_category,ucr_1_description,ucr_1_code,ucr_2_category,ucr_2_description,ucr_2_code,neighborhood,primary_key,location,datetime
0,1019841.0,829628.619921,32 WINSHIP ST,32* - PROPERTY DAMAGE ACCIDENT,PROP DAM ACC,3224,23* - DRIVING LAWS,IMPROPER BACKING,2340,SOUTHEND,CIRS-21008791-0,72e771,2021-03-25 13:17:00
1,1013029.0,845714.939782,410 HOMESTEAD AV,32* - PROPERTY DAMAGE ACCIDENT,PROP DAM ACC,3224,23* - DRIVING LAWS,FTGRW/DRIVEWAY,2336,UPPER ALBANY,CIRS-21008798-0,245ecb,2021-03-25 14:38:00
2,1010687.0,830429.727899,65 WILLIAM SHORTY CAMPBELL ST,06* - LARCENY,LARC3 SHOPLIFT,624,,,0,BEHIND THE ROCKS,CIRS-21009417-0,3a786b,2021-03-31 17:38:00
3,1019481.0,840695.5699,111 ALLYN ST,05* - BURGLARY,BURG3-COM-NITE,522,,,0,DOWNTOWN,CIRS-21008889-0,2841ad,2021-03-26 00:30:00
4,1014458.0,849925.620009,320 BLUE HILLS AV,06* - LARCENY,LARC3-FROM M/V,634,,,0,BLUE HILLS,CIRS-21009143-0,01278f,2021-03-29 00:00:00


In [9]:
# After normalizations, we inserted these data into the influxDB
# InfluxDB data schema design like this:
# - Bucket: Hartford
# - Measurement: police_incidents
# - Tags: 
#   - location: S2 cell token
#   - neighborhood: neighborhood name
#   - address: address
#   - ucr_1_code
#   - ucr_2_code
#   - ucr_1_category
#   - ucr_2_category
#   - ucr_1_description
#   - ucr_2_description
# - Fields:
#   - primary_key
#   - lon
#   - lat
# - Time: datetime

# In order to compare the difference of field and tag in query performance.
# We create another bucket

In [10]:
BucketName = "Hartford"
DEFAULT_ORG = "docs"
bucket = client.buckets_api().find_bucket_by_name(bucket_name=BucketName)
if bucket:
    # bucket exist , reset it
    print("bucket exist, delete it")
    client.buckets_api().delete_bucket(bucket)

# create bucket
bucket = client.buckets_api().create_bucket(bucket_name=BucketName, org_id=DEFAULT_ORG)
if bucket:
    print(f"bucket {BucketName} created")

bucket exist, delete it
bucket Hartford created


In [11]:
# insert these data into InfluxDB
# [WARNING] before inserting through pd.Dataframe, we should guarantee there is no NaN value in the dataframe
# OtherWise, the client will raise an error
# @lingze: plz check above statement.
 
# write_api = client.write_api(write_options=SYNCHRONOUS)
# measurement = "police_incidents"
# start = time.time()
# batch_size = 2**15

# for i in range(0, len(data), batch_size):
#     data_batch = data.iloc[i:i+batch_size]
#     write_api.write(
#         bucket = BucketName,
#         org = DEFAULT_ORG,
#         record = data_batch,
#         data_frame_measurement_name = measurement,
#         data_frame_tag_columns = [
#             "location", 
#             "neighborhood", 
#             "address", 
#             "ucr_1_code", 
#             "ucr_2_code", 
#             "ucr_1_category",
#             "ucr_2_category", 
#             "ucr_1_description", 
#             "ucr_2_description"
#         ],
#         data_frame_field_columns = ["primary_key"],
#         data_frame_time_index = "datetime"
#     )
#     print(f"{i}/{len(data)} inserted")
# print("==> finished")

In [12]:
# first we divide the data into two parts, with NaN and w/o Nan
nan_mask = data.isna().any(axis = 1)
data_with_nan = data[nan_mask]
data_without_nan = data[~nan_mask]

In [13]:
write_api = client.write_api(write_options=SYNCHRONOUS)
measurement = "police_incidents"

In [14]:
# for data without Nan, we insert through DataFrame easily
start = time.time()
batch_size = 2**15
for i in range(0, len(data_without_nan), batch_size):
    batch_data = data_without_nan.iloc[i:i+batch_size]
    write_api.write(
            bucket = BucketName,
            org = DEFAULT_ORG,
            record = batch_data,
            data_frame_measurement_name = measurement,
            data_frame_tag_columns = [
                "location", 
                "neighborhood", 
                "address", 
                "ucr_1_code", 
                "ucr_2_code", 
                "ucr_1_category",
                "ucr_2_category", 
                "ucr_1_description", 
                "ucr_2_description"
            ],
            data_frame_field_columns = [
                "primary_key",
                "x",
                "y"
            ],
            data_frame_timestamp_column = "datetime"
        )
    print(f"{i}/{len(data_without_nan)} inserted")
end = time.time()
print(f"==> finished in {end - start} seconds")

0/25251 inserted
==> finished in 0.8032379150390625 seconds


In [15]:
# for data with Nan, we insert by constructing Point
nan_mask = data_with_nan.isna()
records = data_with_nan.to_dict(orient="records")
record_key_mask = data.isna().to_dict(orient="records")

tags_columns = [
    "location", 
    "neighborhood", 
    "address", 
    "ucr_1_code", 
    "ucr_2_code", 
    "ucr_1_category",
    "ucr_2_category", 
    "ucr_1_description", 
    "ucr_2_description"
]

fields_columns = [
    "primary_key",
    "x",
    "y"
]

time_column = "datetime"

In [16]:
points = []
batch_size = 2**15
start = time.time()
for idx,(record, key_mask) in enumerate(zip(records, record_key_mask)):
    point_dict = {
        "measurement": measurement
    }
    record_without_nan = {k: v for k, v in record.items() if not key_mask[k]}
    point_dict['tags']={k: v for k, v in record_without_nan.items() if k in tags_columns}
    point_dict['fields'] = {k: v for k, v in record_without_nan.items() if k in fields_columns}
    point_dict['time'] = record_without_nan[time_column]
    points.append(Point.from_dict(dictionary=point_dict))
    
    if (idx + 1) % batch_size == 0 or (idx + 1) == len(records):
        write_api.write(bucket=BucketName, org=DEFAULT_ORG, record=points)
        print(f"{idx}/{len(records)} inserted")
        points = []

end = time.time()
print(f"==> finished in {end - start} seconds")


25159/25160 inserted
==> finished in 1.334134578704834 seconds


In [17]:
query_api = client.query_api()

In [18]:
# First Query
# Question: Top-10 most common types of polices cases in all time
query = """
import "influxdata/influxdb/v1"
option v = {timeRangeStart:1970-01-01T00:00:00Z , timeRangeStop: now()}

from(bucket: "Hartford")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "police_incidents" and r["_field"] == "primary_key")
    |> group(columns: ["ucr_1_code", "ucr_1_category", "ucr_1_description"])
    |> count(column: "_value") // count the number of records per group
    |> group() // Ungroup to allow sorting across all groups
    |> sort(columns: ["_value"], desc: true) // Sort by count in descending order
    |> limit(n:10)
    |> keep(columns: ["ucr_1_code", "_value", "ucr_1_category", "ucr_1_description"]) // Specify columns to retain
"""
start = time.time()
tables = query_api.query(query=query, org=DEFAULT_ORG)
end = time.time()
print(f"==> query finished in {end - start} seconds")
tables.to_values()


==> query finished in 8.623098611831665 seconds


[dict_values(['_result', 0, 4508, '1901', '19* - CRIMES AGAINST THE PUBLIC', 'BREACH-PEACE             ']),
 dict_values(['_result', 0, 2459, '5104', '51* - MISC. MANAGEMENT INFO.', 'COMM TENSION;COMM-SERVICE']),
 dict_values(['_result', 0, 2038, '3503', '35* - MISC. CRIMES AGAINST PROPERTY', 'CR MISCHIEF 3            ']),
 dict_values(['_result', 0, 2026, '2090', '20* - RADIO SIGNAL', 'RADIO SIGNAL             ']),
 dict_values(['_result', 0, 1940, '3221', '32* - PROPERTY DAMAGE ACCIDENT', 'PROP DAM ACC             ']),
 dict_values(['_result', 0, 1886, '2903', '29* - FOUND PERSON/PROPERTY', 'ABANDONED M/V            ']),
 dict_values(['_result', 0, 1863, '5211', '52* - SHOTS FIRED', 'SHOTS FIRED - UNCONFIRMED']),
 dict_values(['_result', 0, 1813, '2331', '23* - DRIVING LAWS', 'PARKING VIOLATION        ']),
 dict_values(['_result', 0, 1681, '3224', '32* - PROPERTY DAMAGE ACCIDENT', 'PROP DAM ACC             ']),
 dict_values(['_result', 0, 1414, '801', '08* - SIMPLE ASSAULT', 'ASSAULT

In [19]:
# Second Query
# Question: Number of cases with code "1901" over time grouped by week
query = """
import "influxdata/influxdb/v1"
option v = {timeRangeStart:1970-01-01T00:00:00Z , timeRangeStop: now()}

from(bucket: "Hartford")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "police_incidents" and r["_field"] == "primary_key")
    |> filter(fn: (r) => r["ucr_1_code"] == "1901")
    |> truncateTimeColumn(unit: 1w) // Truncate time to week
    |> group(columns: ["_time"])
    |> count(column: "_value")
    |> group()
"""
start = time.time()
tables = query_api.query(query=query, org=DEFAULT_ORG)
end = time.time()
print(f"==> query finished in {end - start} seconds")
tables.to_values()[:5]
# better to execute above result in the InfluxDB UI
# it will visualize the result in timeline graph, more intuitive

==> query finished in 2.013685464859009 seconds


[dict_values(['_result', 0, datetime.datetime(2019, 12, 26, 0, 0, tzinfo=tzlocal()), 14]),
 dict_values(['_result', 0, datetime.datetime(2020, 1, 2, 0, 0, tzinfo=tzlocal()), 81]),
 dict_values(['_result', 0, datetime.datetime(2020, 1, 9, 0, 0, tzinfo=tzlocal()), 71]),
 dict_values(['_result', 0, datetime.datetime(2020, 1, 16, 0, 0, tzinfo=tzlocal()), 79]),
 dict_values(['_result', 0, datetime.datetime(2020, 1, 23, 0, 0, tzinfo=tzlocal()), 73])]

In [20]:
# Third Query

# Question: the latest incident (most recent time) for the top ten ucr_1_code from police_incidents measurement 

# we need to utilize the result of Query 1
query = """
import "influxdata/influxdb/v1"
option v = {timeRangeStart:1970-01-01T00:00:00Z , timeRangeStop: now()}
codeset = ["1901", "5104", "3224", "2903", "3221", "2090", "3503", "2331", "5211", "801"]
from(bucket: "Hartford")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "police_incidents")
    |> filter(fn: (r) => r["_field"] == "primary_key")
    |> filter(fn: (r) => contains(value: r.ucr_1_code, set:codeset) ) 
    |> group(columns: ["ucr_1_code"])  // Group by ucr_1_code
    |> sort(columns: ["_time"], desc: true)  // Sort by time, most recent first
    |> limit(n: 10)  // Limit to the top 10 most recent incidents
    |> last()  // Get the most recent incident for each ucr_1_code
    |> keep(columns: ["ucr_1_code", "_value", "ucr_1_category", "ucr_1_description", "_time", "address"])
    |> yield(name: "Latest Incident by ucr_1_code")
"""
start = time.time()
tables = query_api.query(query=query, org=DEFAULT_ORG)
end = time.time()
print(f"==> query finished in {end - start} seconds")
tables.to_values()[:5]

==> query finished in 10.817521333694458 seconds


[dict_values(['Latest Incident by ucr_1_code', 0, datetime.datetime(2021, 5, 17, 2, 31, tzinfo=tzlocal()), 'CIRS-21014480-0    ', '60 CAMPFIELD AV', '19* - CRIMES AGAINST THE PUBLIC', '1901', 'BREACH-PEACE             ']),
 dict_values(['Latest Incident by ucr_1_code', 1, datetime.datetime(2021, 5, 16, 17, 45, tzinfo=tzlocal()), 'CIRS-21014436-0    ', '24 MERRILL ST', '20* - RADIO SIGNAL', '2090', 'RADIO SIGNAL             ']),
 dict_values(['Latest Incident by ucr_1_code', 2, datetime.datetime(2021, 5, 14, 13, 33, tzinfo=tzlocal()), 'CIRS-21014211-0    ', '203 TRUMBULL ST', '23* - DRIVING LAWS', '2331', 'PARKING VIOLATION        ']),
 dict_values(['Latest Incident by ucr_1_code', 3, datetime.datetime(2021, 5, 12, 17, 50, tzinfo=tzlocal()), 'CIRS-21014004-0    ', '69 CURTISS ST', '29* - FOUND PERSON/PROPERTY', '2903', 'ABANDONED M/V            ']),
 dict_values(['Latest Incident by ucr_1_code', 4, datetime.datetime(2021, 5, 14, 20, 15, tzinfo=tzlocal()), 'CIRS-21014250-0    ', 'COGSWEL