## Truck Detection Indicator and COG calculation for ESA RACE Dashboard

For integrating analysis results into the Rapid Action on coronovirus and EO (RACE) Dashboard, some postprocessing steps are currently (July 2020) required:
1. Indicator calculation in AOI and provision in specific format
2. Point (GPKG) conversion to Cloud-Optimized-Geotiffs (COGs)

This script does both of the steps. 

**[1]** The indicators are calculated in administrative areas from the Database of Global Administrative Areas. You can determine for which countries you would like to calculate the indicators as well as the years used. Currently, the threshold for denoting a deviation from the baseline as High or Low is +5 and -5 % respectively. You may also determine the roads for which calculate the indicator. By default, trunks and primary roads are combined as trunks are only are small percentage of roads and comparable to primary roads.

**[2]** For the COG generation the points are aggregated in macropixels. Their observations-normalized counts are written to the macropixel covering a specific area. These rasters are written and converted to COGs.

In [1]:
import os, sys, subprocess, datetime
import numpy as np
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.merge import merge
from rasterio.mask import mask
from affine import Affine
from shapely.geometry import box
from glob import glob

def install_package(pkg):
    subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
install_package("geocube")
install_package("obspy")
from geocube.api.core import make_geocube
from obspy.geodetics import kilometers2degrees

In [2]:
indicator_fname = "indicatorCode_HenrikFisser_20200721.csv"
years = [2017, 2018, 2019, 2020] # all included years
deviation_threshold = 5 # % deviation to be denoted as High or Low (+5 or -5 % from reference)
fully_covered_month = "05" # a month in the middle of the period
road_types = ["Motorway", "Trunk", "Primary"]
combine_primary_and_trunk = True # Trunk and Primary will be combined if True
create_level1_indicators = ["Germany", "France", "Spain", "Italy", "Austria", "United Kingdom", "Poland", 
                            "Greece", "Sweden", "Finland"] # countries for which also indicators on GADM level 1 shall be calculated
include = ['Åland','Andorra', 'Austria', 'Belgium', 'Bulgaria', 'Switzerland', 'Cyprus', 
           'Czech Republic', 'Germany', 'Denmark', 'Spain', 'Estonia', 'Finland', 'France', 
           'Faroe Islands', 'United Kingdom', "Isle of Man", 'Guernsey', 'Gibraltar', 'Greece', 'Croatia', 
           'Hungary', 'Ireland', 'Italy', 'Jersey', 'Liechtenstein', 'Lithuania', 'Luxembourg',
           'Latvia', 'Monaco', 'Malta', 'Netherlands', 'Poland', 'Portugal', 
           'Romania', 'San Marino', 'Slovakia', 'Slovenia', 'Sweden'] # countries to be included in general, must be denoted as in GADM0 NAME_0
color_codes = {"High":"GREEN", "Normal":"BLUE", "Low":"RED"}
merged = "trucks_points_merged"
indicator_template_name="N3_Master_20200703T10-23.csv"
ext = ".gpkg"
crs = "EPSG:4326"
main_dir = os.getcwd()
dir_not_commit = os.path.join(main_dir, "not_commit")
dirs = {"processing":os.path.join(main_dir, "processing"), merged:os.path.join(dir_not_commit, "processed", "overall", merged),
        "indicators":os.path.join(dir_not_commit, "indicators"), "COG":os.path.join(dir_not_commit, "processed", "overall", "COG")}
def get_fname(year, merged): return merged + "_" + str(year)
def get_fname_gadm(fname, ext): return fname + "_" + "gadm" + ext
def get_country_code(points_gadm): 
    code = points_gadm.GID_0.unique()[0][0:2]
    code = "EE" if points_gadm.NAME_0.unique()[0]=="Estonia" else code # otherwise duplicate with Spain ('ES')
    return code
d = dirs["processing"]
gadm0 = gpd.read_file(os.path.join(d, "gadm36_0.shp"))
gadm1 = gpd.read_file(os.path.join(d, "gadm36_1_subset.geojson"))
pop_places = gpd.read_file(os.path.join(d, "ne_10m_populated_places_simple.shp"))
capitals = pop_places[pop_places["featurecla"]=="Admin-0 capital"]
gadm0.crs = crs
gadm1.crs = crs
capitals.crs = crs

## Indicators

Join points with GADM information

In [None]:
for year in years:
    print(year)
    fname = get_fname_gadm(get_fname(year, merged), ".gpkg")
    fpath = os.path.join(dirs[merged], fname)
    if not os.path.exists(fpath):
        points = gpd.read_file(os.path.join(dirs[merged], fname))
        points.crs = crs
        points_gadm0 = gpd.sjoin(points, gadm0, "left", "within")
        points_gadm0 = points_gadm0.drop("index_right", 1)
        points_gadm0_gadm1 = gpd.sjoin(points_gadm0, gadm1, "left", "within")
        points_gadm0_gadm1 = points_gadm0_gadm1.drop("index_right", 1)
        points_gadm0_gadm1.to_file(fpath, driver="GPKG")
print("Done with GADM join")

Methods for indicator population and calculation

In [3]:
def populate_indicator(indicators, i, subset, description, year, month, region, aoi, sub_aoi):
    sum_mean_trucks = np.array(subset["truck_count_normalized"]).flatten().sum()
    indicators.at[i,"AOI"] = "'"+str(aoi)+"'"
    indicators.at[i,"Country"] = "'"+get_country_code(subset)+"'"
    indicators.at[i,"Region"] = "'"+region+"'" if region == "None" else "'"+region+" Region'"
    indicators.at[i,"City"] = "'None'"
    indicators.at[i,"Site Name"] = "'None'"
    indicators.at[i,"Description"] = "'"+description+"'"
    indicators.at[i,"Method"] = "'Inter-band parallax moving object detection on roads'"
    indicators.at[i,"EO Sensor"] = "'Sentinel 2'"
    indicators.at[i,"Input Data"] = "'Sentinel 2 L2A'"
    indicators.at[i,"Time"] = "'"+str(year)+"-"+month+"-"+"15'" # YYYY-MM-DD
    indicators.at[i,"Measurement Value"] = float(sum_mean_trucks)
    indicators.at[i,"Reference Description"] = "''" # update according to used year(s)
    indicators.at[i,"Rule"] = "'(counts_2020/median_reference)*100-100 < -5% outputs LOW [reference = median of reference years]'"
    indicators.loc[i,"Sub-AOI"] = "'"+sub_aoi+"'"
    indicators.at[i,"Y axis"] = "'Trucks per observation'"
    #indicators.at[i,"Indicator Name"] = ""
    indicators.at[i,"Data Provider"] = "'Henrik Fisser, University of Würzburg, Germany'"
    #indicators.at[i,"AOI_ID"] = ""
    indicators.at[i,"Update frequency"] = "'Monthly'"
    return indicators
    
def get_road_subset(subset, road, combine_primary_and_trunk):
    subset_road_type = subset[subset.osm_name==road]                      
    if road == "Primary" and combine_primary_and_trunk:
        subset_road_type = subset_road_type.append(subset[subset.osm_name=="Trunk"])
    return subset_road_type

def get_description(road):
    road = road if road == "Motorway" or road == "Trunk" else "Trunk/Primary"
    return "Regional " + road + " truck traffic"

def get_indicator_value(indicators, index, ref, threshold):
    m = indicators.at[index,"Measurement Value"]
    dev = np.float(m / ref * 100 - 100)
    if np.abs(dev) > threshold:
        return "Low" if dev < 0 else "High"
    else:
        return "Normal"

def calc_indicators(indicators, indicator_road, deviation_threshold, color_codes):
    time_values = list(indicator_road.Time)
    reference_values = {}
    indices = {}
    ref_years = ["2017", "2018", "2019"]
    for i, t in enumerate(time_values):
        y = t[1:5]
        indices[y] = indicator_road.index[i]
        if y in ref_years: # reference
            reference_values[y] = indicator_road.iloc[i]["Measurement Value"]
    ref_vals = []
    if "2017" in reference_values.keys(): ref_vals.append(reference_values["2017"])
    if "2018" in reference_values.keys(): ref_vals.append(reference_values["2018"])
    if "2019" in reference_values.keys(): ref_vals.append(reference_values["2019"])
    # Reference value
    ref_vals_copy = ref_vals.copy()
    ref_vals = np.array(ref_vals)
    ref_vals.sort()
    if len(ref_vals) > 2:
        ref_vals = [ref_vals[1]] # take median value
    ref_years = [list(reference_values.keys())[ref_vals_copy.index(val)] for val in ref_vals]
    years = list(reference_values.keys())
    y = "2020"
    years.append(y)
    for year in years:
        index = indices[year]
        if year in ref_years or year == "2020":
            indicators.at[index,"Reference value"] = "'"+str(list(ref_vals))+"'"
            indicators.at[index,"Reference Description"] = "'Detected trucks in same period: "+str(ref_years)+"'"
            # Indicator Value
            val = get_indicator_value(indicators, index, np.array(ref_vals).mean(), deviation_threshold)        
            indicators.at[index,"Indicator Value"] = "'"+val+"'"
            # Color code
            indicators.at[index,"Color code"] = "'"+color_codes[val]+"'"
    return indicators

def get_indicator_code(road):
    return "E12c" if road.lower() == "motorway" else "E12d"

Prepare indicators

In [4]:
indicators = pd.read_csv(os.path.join(dirs["indicators"], indicator_template_name))
for i in range(len(indicators)): indicators = indicators.drop(i)
if "Trunk" in road_types and "Primary" in road_types and combine_primary_and_trunk:
    road_types.remove("Trunk")
i = -1 # for populating rows
print("Countries")
for year in years:
    print(year)
    fname_gadm = get_fname_gadm(get_fname(year, merged), ext)
    points = gpd.read_file(os.path.join(dirs[merged], fname_gadm)) # read points with gadm information
    points.crs = crs
    points.rename(columns={"NAME_1_right":"NAME_1"}, inplace=True)
    gadm0_unique, gadm1_unique = list(points.NAME_0.unique()), list(points.NAME_1.unique())
    for country in gadm0_unique:
        # calc indicator stats for countries
        subset = points[points["NAME_0"]==country]
        region = "None"
        sub_aoi = gadm0[gadm0.NAME_0 == country].geometry.iloc[0].wkt    
        if len(subset) > 0:
            # get capital point from pop places
            country_name = subset.NAME_0.unique()[0]
            country_capital = capitals[capitals["adm0name"]==country_name].geometry
            # if not available use centroid
            if len(country_capital) == 0:
                country_capital = gadm0[gadm0.NAME_0 == country].geometry.iloc[0].centroid
            aoi = str(float(country_capital.y))+","+str(float(country_capital.x))
            # different road types
            for road in road_types:
                subset_road = get_road_subset(subset, road, combine_primary_and_trunk)
                if len(subset_road) > 0:
                    i += 1
                    description = get_description(road)
                    indicator_code = get_indicator_code(road)
                    indicators.at[i,"Indicator code"] = "'"+indicator_code+"'"
                    indicators = populate_indicator(indicators, i, subset_road, description, year, fully_covered_month, region, aoi, sub_aoi)
print("Regions")
name1 = "NAME_1_right"
for year in years:
    print(year)
    fname_gadm = get_fname_gadm(get_fname(year, merged), ext)
    points = gpd.read_file(os.path.join(dirs[merged], fname_gadm)) # read points with gadm information
    for region in gadm1_unique:
        gadm1_region = gadm1[gadm1.NAME_1 == region]
        if gadm1_region.NAME_0.iloc[0] in create_level1_indicators:      
            # calc indicator stats for regions
            subset = points[points[name1]==region]
            if len(subset) > 0:
                region = subset[name1].unique()[0]
                sub_aoi = gadm1_region.geometry
                center = sub_aoi.centroid # use the center of the region as aoi
                aoi = str(float(center.y))+","+str(float(center.x))
                # different road types
                for road in road_types:
                    subset_road = get_road_subset(subset, road, combine_primary_and_trunk)
                    if len(subset_road) > 0:
                        i += 1
                        description = get_description(road)
                        indicator_code = get_indicator_code(road)
                        indicators.at[i,"Indicator code"] = indicator_code
                        indicators = populate_indicator(indicators, i, subset_road, description, year, fully_covered_month, region, aoi, sub_aoi.iloc[0].wkt)
print("Done with indicator populating")
indicators_copy = indicators.copy()


Countries
2017
2018
2019
2020
Regions
2017
2018
2019
2020
Done with indicator populating


In [5]:
# "Reference value"
# "Color code"
# "Indicator Value"
indicators_copy.to_csv("tmp_indicators.csv", sep=",", index=False)
indicators = indicators_copy.copy()
print("Calculating indicator values")
for road in road_types:
    for country in indicators.Country.unique():
        country = "EE" if country == "Estonia" else country
        indicator_country = indicators[indicators.Country==country]
        indicator_country = indicator_country[indicator_country.Region == "'None'"]
        if len(indicator_country) > 0:
            indicator_road = indicator_country[indicator_country.Description=="'"+get_description(road)+"'"]
            if len(indicator_road) > 0:
                indicators = calc_indicators(indicators, indicator_road, deviation_threshold, color_codes)
    for region in gadm1_unique:
        gadm1_region = gadm1[gadm1.NAME_1 == region]
        if gadm1_region.NAME_0.iloc[0] in create_level1_indicators:        
            indicator_region = indicators[indicators.Region=="'"+region+" Region'"]
            if len(indicator_region) > 0:
                indicator_road = indicator_region[indicator_region.Description=="'"+get_description(road)+"'"]
                if len(indicator_road) > 0:
                    indicators = calc_indicators(indicators, indicator_road, deviation_threshold, color_codes)
indicators_complete = indicators[indicators["Indicator Value"].notna()]
print(indicators_complete[indicators_complete["Indicator Value"].isna()])
print("Done.. Writing")
code_motorway = get_indicator_code("motorway")
code_primary = get_indicator_code("primary")
fpath_motorway = os.path.join(dirs["indicators"], indicator_fname.replace("indicatorCode", code_motorway))
fpath_primary = os.path.join(dirs["indicators"], indicator_fname.replace("indicatorCode", code_primary))
code = "Indicator code"
print("Motorway")
indicators_motorway = indicators_complete[indicators_complete[code]=="'"+code_motorway+"'"]
indicators_motorway = indicators_motorway.sort_values("Country")
indicators_motorway.to_csv(fpath_motorway, sep=",", index=False)
print("Primary")
indicators_primary = indicators_complete[indicators_complete[code]=="'"+code_primary+"'"]
indicators_primary = indicators_primary.sort_values("Country")
indicators_primary.to_csv(fpath_primary, sep=",", index=False)
print("Done")

Calculating indicator values


  ret = ret.dtype.type(ret / rcount)


Empty DataFrame
Columns: [AOI, Country, Region, City, Site Name, Description, Method, EO Sensor, Input Data, Indicator code, Time, Measurement Value, Reference Description, Reference time, Reference value, Rule, Indicator Value, Sub-AOI, Y axis, Indicator Name, Color code, Data Provider, AOI_ID, Update frequency]
Index: []

[0 rows x 24 columns]
Done.. Writing
Motorway
Primary
Done


## Cloud-Optimized Geotiffs (COGs)

In [None]:
def create_truck_density(points_country, gadm0, spacing):
    # create a grid and extract the counts in each pixel (represents a box of spacing dimensions)
    country_geom = gadm0[gadm0.NAME_0==country].geometry
    country_extent = country_geom.bounds
    resolution = kilometers2degrees(spacing)
    lon_len = country_extent.maxx-country_extent.minx
    lat_len = country_extent.maxy-country_extent.miny
    shape = (int(lat_len / resolution), int(lon_len / resolution))
    arr = np.zeros(shape)
    half_res = resolution / 2
    first_lon = float(country_extent.minx + half_res)
    first_lat = float(country_extent.maxy - half_res)
    # working with pixel center
    lon = np.arange(start=first_lon, stop=float(country_extent.maxx - half_res), step=resolution)
    lat = np.flip(np.arange(start=float(country_extent.miny + half_res), stop=first_lat, step=resolution))
    # get or each pixel (box) the number of trucks
    for y in range(arr.shape[0]):
        for x in range(arr.shape[1]):
            lon_val = lon[x]
            lat_val = lat[y]
            pixel_box = box(lon_val-half_res, lat_val-half_res, lon_val+half_res, lat_val+half_res)
            pixel_box_gpd = gpd.GeoDataFrame({"geometry":[pixel_box], "index":1})
            
            # set to nan if outside country borders (e.g. ocean)
            country_polygon = country_geom.iloc[0]
            if not pixel_box_gpd.geometry.iloc[0].intersects(country_polygon):
                arr[y,x]= np.nan
                continue
            
            # join with country points
            points_country.crs = crs
            pixel_box_gpd.crs = crs
            points_pixel_box = gpd.sjoin(points_country, pixel_box_gpd, how="left", op="within")
            points_pixel_box = points_pixel_box[points_pixel_box.index_right==0.]
            if len(points_pixel_box) == 0:
                arr[y,x] = 0.
            else:
                count = np.array(points_pixel_box["mean_trucks"]).sum()
                arr[y,x] = count
    transform = Affine(resolution, 0, first_lon-half_res, 0, -resolution, first_lat+half_res)
    return arr.astype(np.float32), transform

In [None]:
grid_gadm = gpd.read_file(os.path.join(dirs["processing"], "processing_grid_gadm.geojson"))
spacing = 10 #km
if "Trunk" in road_types and "Primary" in road_types and combine_primary_and_trunk:
    road_types.remove("Trunk")
for road_type in road_types:
    indicator_code = get_indicator_code(road_type)
    print("----- Road type: %s" %(road_type))
    for year in years:
        print(year)
        points = gpd.read_file(os.path.join(dirs[merged], get_fname_gadm(get_fname(year, merged), ".gpkg")))
        points_road = points[points.osm_name==road_type]
        if road_type == "Primary" and combine_primary_and_trunk:
            points_trunk = points[points.osm_name=="Trunk"]
            points_road = pd.concat([points_road, points_trunk])
        for country in points.NAME_0.unique():
            print(country)
            points_country = points_road[points_road.NAME_0_right==country]
            file_str = indicator_code+"_"+country+"_"+str(year)+"_"+road_type+"_"
            fpath = os.path.join(dirs["COG"], file_str+".tiff")
            if os.path.exists(fpath):
                print("Already exists %s %s %s" %(country, str(year), road_type))
            else:
                if len(points_country) > 0:
                    try:
                        s1=datetime.datetime.now()
                        point_density, transform = create_truck_density(points_country, gadm0, spacing)
                        s2=datetime.datetime.now()-s1
                        print(s2)
                        print("Writing raster with shape: %s / %s" %(str(point_density.shape[0]), str(point_density.shape[1])))
                        meta = {"driver":"GTiff", 
                                "height":point_density.shape[0], "width":point_density.shape[1], 
                                "transform":transform, "nodata":"nan", "count":1, "dtype":point_density.dtype}
                        with rasterio.open(fpath, "w", **meta) as target:
                            target.write(np.array([point_density]))
                        print("Done")
                    except:
                        print("Creating point density raster failed %s %s %s" %(country, road_type, str(year)))