**Note**: this script uses a slightly different method of assigning labels to data since the labels were collected differently than in the pilot study

In [1]:
import collections
import datetime
import geopandas
import glob
import pandas
import shapely
import tqdm.auto as tqdm
from matplotlib import pyplot

In [2]:
import lib

In [3]:
# map projections
WGS84 = {"init": "epsg:4326"}
WEB_MERCATOR = {"init": "epsg:3857"}
NAD83_MA_FEET = {"init": "epsg:2249"}

# basemap constants
ESRI_WORLD_IMAGERY = "https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}"
BASEMAP_ZOOM = 13

# time adjustments
PM_TIME_ADJUSTMENTS = collections.defaultdict(lambda: pandas.Timedelta(0), {
    "2019-10-27": - pandas.Timedelta("5 minutes") # there was a 5 minute drift in the PM sensor clock on this day
})
TZ_ADJUST_EASTERN_TIME = - pandas.Timedelta("5 hours") # GMT -5:00:00

### load the geometries that we have created earlier

data will be joined against these

In [4]:
gdf_stations = geopandas.read_file("./analysis/geometries/stations.geojson")
gdf_segments = geopandas.read_file("./analysis/geometries/segments.geojson")

In [5]:
# given a sampling date in yyyy-mm-dd format,
# 1) load the relevant timestamps and particulate readings (may have to merge multiple files)
# 2) perform necessary time adjustments:
#     * label timestamps are loaded as EST
#     * Dylos timestamps are loaded as GMT and must be converted to EST
# 3) return the relevant data
def read_data(sampling_date):
    subdir = "./data/%s" % sampling_date

    timestamps = pandas.read_csv("%s/timestamps.csv" % subdir)
    timestamps["TIME"] = pandas.to_datetime(timestamps["TIME"]) # tz-aware

    pm_readings = pandas.concat([
        pandas.read_csv(path)
        for path in glob.glob("%s/dylos*.csv" % subdir)
    ])
    pm_readings["TIME"] = pandas.to_datetime(pm_readings["TIME"], unit = "s")\
        + PM_TIME_ADJUSTMENTS[sampling_date] + TZ_ADJUST_EASTERN_TIME # not tz-aware: timezone must be applied
    
    return (timestamps, pm_readings)

In [6]:
all_timestamps = []
all_pm_readings = []

for data_dir in tqdm.tqdm(glob.glob("./data/2019*")):
    day = data_dir.split("/")[-1]
    try:
        (timestamps, pm_readings) = read_data(day)
        all_timestamps.append(timestamps)
        all_pm_readings.append(pm_readings)
    except Exception as error:
        print("ERROR: could not load %s" % data_dir)
        print(error)

timestamps = pandas.concat(all_timestamps)
pm_readings = pandas.concat(all_pm_readings)

# * SMALLPARTICLES = all particles >= 0.5 microns
# * LARGEPARTICLES = all particles >= 2.5 microns
# so, we can get a rough proxy of PM2.5 by subtracting LARGEPARTICLES from SMALLPARTICLES
pm_readings["SMALLPARTICLES"] = pm_readings["SMALLPARTICLES"] - pm_readings["LARGEPARTICLES"]

HBox(children=(FloatProgress(value=0.0, max=21.0), HTML(value='')))




# label data with track geometries

In [7]:
# TODO: if an arrival or departure is dropped, label the next thing correctly
# e.g. missed departure from station A: should not label A->B
# possible fix: check STATUS column (Arrived/Left/Passed)

# given a timestamps dataframe and pm_readings, group the data by station/segment
# data is first labeled by joining pm timestamps to label time ranges
# data is then joined to the geometries created earlier by using those labels
def label_data(timestamps, pm_readings):
    last_location = None
    last_timestamp = None

    pm_stations = []
    pm_segments = []

    for (index, row) in tqdm.tqdm(timestamps.iterrows(), total = timestamps.shape[0]):#timestamps.iterrows():
        this_location = row["LOCATION"]
        this_timestamp = row["TIME"]

        if ((row["TIME"] == None) or (row["LOCATION"] == "END")):
            last_location = None
            last_timestamp = None
            continue

        if (last_timestamp is not None):

            # pull out the relevant pm readings by subsetting on timestamps
            pm_subset = pm_readings[
                (pm_readings["TIME"] >= last_timestamp)
                & (pm_readings["TIME"] < this_timestamp)
            ].copy()
            pm_subset["DIRECTION"] = int(row["DIRECTION"])

            # this row is a station if the last two labels are the same
            if (last_location == this_location):
                station = gdf_stations[
                    gdf_stations["ROUTE"].apply(lambda route: row["ROUTE"] in route)
                    & (gdf_stations["STATION"] == this_location)
                ]

                # append to pm_stations array; this will be used to build a dataframe
                if ((station is None) or (len(station) == 0)):
                    print("could not match station to shapefile: %s" % this_location)
                else:
                    pm_subset["station_idx"] = station.index[0]
                    pm_stations.append(pm_subset)

            # otherwise this row is a track segment
            else:
                gdf_possible_segments = gdf_segments[
                    gdf_segments["ROUTE"].apply(lambda route: row["ROUTE"] in route)
                ]
                assert (gdf_possible_segments is not None)
                segment = gdf_possible_segments[
                    (gdf_possible_segments["START_STATION"] == last_location)
                    & (gdf_possible_segments["END_STATION"] == this_location)
                ]
                #if (len(segment) > 0):
                #    direction = 1
                #else:
                if len(segment) == 0:
                    segment = gdf_possible_segments[
                        (gdf_possible_segments["END_STATION"] == last_location)
                        & (gdf_possible_segments["START_STATION"] == this_location)
                    ]
                    #direction = -1

                # append to pm_segments array; this will be used to build a dataframe
                if ((segment is None) or (len(segment) == 0)):
                    print("could not match segment to shapefile: %s -> %s" % (last_location, this_location))
                else:
                    pm_subset["segment_idx"] = segment.index[0]
                    #pm_subset["direction"] = direction
                    pm_segments.append(pm_subset)

        last_location = this_location
        last_timestamp = this_timestamp

    gdf_pm_segments = pandas.merge(
        pandas.concat(pm_segments),
        gdf_segments,
        left_on = "segment_idx",
        right_index = True
    )
    gdf_pm_segments.crs = gdf_segments.crs
    gdf_pm_stations = pandas.merge(
        pandas.concat(pm_stations),
        gdf_stations,
        left_on = "station_idx",
        right_index = True
    )
    gdf_pm_stations.crs = gdf_stations.crs
    return (gdf_pm_segments, gdf_pm_stations)

In [8]:
(gdf_pm_segments, gdf_pm_stations) = label_data(timestamps, pm_readings)

HBox(children=(FloatProgress(value=0.0, max=2125.0), HTML(value='')))

could not match segment to shapefile: Pleasant Street -> Boston University Central
could not match segment to shapefile: Wellington -> Sullivan Square
could not match segment to shapefile: Wellington -> Sullivan Square



In [9]:
gdf_pm_stations.head(3)

Unnamed: 0,TIME,SMALLPARTICLES,LARGEPARTICLES,DIRECTION,station_idx,STATION,LINE,ROUTE,TERMINUS,geometry
4,2019-11-05 22:02:00,1903,109,0,48,Government Center,Blue,Blue,N,POINT (236291.822 901071.152)
41,2019-11-05 22:47:00,6395,255,1,48,Government Center,Blue,Blue,N,POINT (236291.822 901071.152)
82,2019-11-05 23:39:00,4309,74,1,48,Government Center,Blue,Blue,N,POINT (236291.822 901071.152)


In [10]:
def label_time_category(time):
    day = time.dayofweek
    hour = time.hour
    if (
        (day == 0) # monday
        and (hour >= 6) # after 6am
        and (hour <= 9) # before 9am
    ):
        return "rush_hour"
    elif (
        (
            (day in {6, 1, 2}) # sunday / tuesday / wednesday
            and (hour >= 12+10) # after 10pm
        )
        or ( # OR
            (day == 2) # wednesday
            and (hour >= 10) # after 10am
            and (hour <= 12+2) # before 2pm
        )
    ):
        return "off_peak"

In [11]:
gdf_pm_segments["load"] = gdf_pm_segments["TIME"].apply(label_time_category)
gdf_pm_stations["load"] = gdf_pm_stations["TIME"].apply(label_time_category)

In [12]:
gdf_pm_segments.head(3)

Unnamed: 0,TIME,SMALLPARTICLES,LARGEPARTICLES,DIRECTION,segment_idx,LINE,ROUTE,START_STATION,END_STATION,length,GRADE,geometry,load
5,2019-11-05 22:03:00,1373,31,0,4,Blue,Blue,Government Center,State,147.582994,7,"LINESTRING (236291.822 901071.152, 236298.589 ...",off_peak
40,2019-11-05 22:46:00,6432,176,1,4,Blue,Blue,Government Center,State,147.582994,7,"LINESTRING (236291.822 901071.152, 236298.589 ...",off_peak
44,2019-11-05 22:54:00,1052,24,0,4,Blue,Blue,Government Center,State,147.582994,7,"LINESTRING (236291.822 901071.152, 236298.589 ...",off_peak


# Aggregations and file exports

In [13]:
PM_AGGREGATES = {
    "SMALLPARTICLES": ["median", "mean", "max", "std", "count"],
    "LARGEPARTICLES": ["median", "mean", "max", "std", "count"]
}

In [14]:
lib.export_all_aggregates(gdf_pm_stations, gdf_pm_segments, PM_AGGREGATES, "particulates")
for load in ["rush_hour", "off_peak"]:
    lib.export_all_aggregates(
        gdf_pm_stations[gdf_pm_stations["load"] == load],
        gdf_pm_segments[gdf_pm_segments["load"] == load],
        PM_AGGREGATES,
        "particulates_{}".format(load)
    )

for (particle_size, extra_info) in [
    ("small", " (PM2.5 Proxy)"),
    ("large", "")
]:
    quantity = "{}PARTICLES".format(particle_size.upper())
    center = "{}.mean".format(quantity)
    variability = "{}.std".format(quantity)
    for (load, label_text) in [
        ("rush_hour", "rush hour"),
        ("off_peak", "late night")
    ]:
        stations = gdf_pm_stations[gdf_pm_stations["load"] == load]
        segments = gdf_pm_segments[gdf_pm_segments["load"] == load]
        quantity_name = "particulates_{}_{}".format(load, particle_size)
        lib.export_webapp_input(
            stations, segments,
            aggregates=PM_AGGREGATES,
            value_generators={
                "center": lambda row: row[center],
                "upper": lambda row: (row[center] + row[variability])
                    if pandas.notnull(row[variability])
                    else row[center],
                "lower": lambda row: (row[center] - row[variability])
                    if pandas.notnull(row[variability])
                    else row[center],
            },
            metadata={
                "axis": "{} Particles{} [count/ft³]"\
                    .format(particle_size.capitalize(), extra_info),
                "selection": "{} particles ({})"\
                    .format(particle_size, load.replace("_", " ")),
                "center_desc": "Mean",
                "variability_desc": "Mean ± 1SD",
                "colormap": "viridis"
            },
            quantity=quantity_name
        )
        lib.export_webapp_histograms(
            stations, segments, quantity, quantity_name, 30
        )

In [15]:
gdf_pm_segments.drop("geometry", axis=1).to_csv("output/segments_particulates_new_rawdata.csv", index=False)
gdf_pm_stations.drop("geometry", axis=1).to_csv("output/stations_particulates_new_rawdata", index=False)