In [1]:
from frictionless import transform, steps, Schema, Resource, extract, Package
from pprint import pprint
import json
import urllib

In [None]:
# Abbreviation of the counting station to look up in the locations table.
abkuerzung = "VZS_LUXG"

# Keep only rows whose "bis" column is empty, narrow them to the chosen
# station and rename the columns to the sensor_* names used further down.
sensor_steps = [
    steps.row_filter(formula="bis is None"),
    steps.field_remove(names=["bis"]),
    steps.row_filter(formula=f"abkuerzung == '{abkuerzung}'"),
    steps.field_update(name="bezeichnung", new_name="sensor_name"),
    steps.field_update(name="abkuerzung", new_name="sensor_ref"),
    steps.field_update(name="fk_zaehler", new_name="sensor_id"),
]
sensors = transform(
    "descriptors/standorte-automatische-fuss-und-velozaehlungen.resource.yaml",
    steps=sensor_steps,
)

# First matching row as a keyed dict; indexing raises IndexError when the
# filters leave no row.
sensor_rows = sensors.to_inline(dialect={"keyed": True})
data = sensor_rows[0]
data

In [None]:
# Traffic-count resource, restricted to the sensor selected in `data`.
counts_resource = Resource("descriptors/traffic-counts.resource.yaml")
counts_resource.dialect["filters"] = dict(FK_ZAEHLER=data["sensor_id"])

# Normalise the table and write it out as CSV.
export_steps = [
    steps.table_normalize(),
    steps.table_write(path="data/bike-counts.csv"),
]
res = transform(counts_resource, steps=export_steps)

res.to_petl()

In [None]:
from datetime import datetime, date, timedelta
from statistics import mean

# Reference dates; `thisyear` is the current year as a four-digit string.
today = date.today()
yesterday = today - timedelta(days=1)
thisyear = today.strftime("%Y")

# Traffic-count resource, restricted to the sensor selected in `data`.
counts_resource = Resource("descriptors/traffic-counts.resource.yaml")
counts_resource.dialect["filters"] = dict(FK_ZAEHLER=data["sensor_id"])


def _day_of_year(row):
    """Return the day of the year ("%j", zero-padded string) of the row's DATUM."""
    return datetime.strptime(row["DATUM"], "%Y-%m-%dT%H:%M").strftime("%j")


# Sum inbound and outbound bike counts per day of the year, newest day first,
# tag every row with the current year and write the result to CSV.
daily_steps = [
    steps.field_remove(names=["FK_ZAEHLER","FK_STANDORT","FUSS_IN","FUSS_OUT","OST","NORD"]),
    steps.field_add(name="count", formula="int(VELO_IN) + int(VELO_OUT)"),
    steps.field_add(name="yearday", type="integer", function=_day_of_year),
    steps.table_normalize(),
    steps.table_aggregate(
        group_name="yearday",
        aggregation={"day_sum": ("count", sum)},
    ),
    steps.row_sort(field_names=["yearday"], reverse=True),
    steps.field_add(name="year", type="integer", value=int(thisyear)),
    steps.table_normalize(),
    steps.table_write(path=f"data/bike-{data['sensor_id']}-agg.csv"),
]
daily_sum = transform(counts_resource, steps=daily_steps)

daily_sum.to_petl().display(limit=10)

In [None]:
from slugify import slugify
import locale
import math
from datetime import datetime, date, timedelta
from statistics import mean

# Switch the whole process to the en_US locale; locale-aware number
# formatting (the "n" format type) takes its separators from this setting.
locale.setlocale(locale.LC_ALL, "en_US")

# Reference dates; `thisyear` is the current year as a four-digit string.
today = date.today()
yesterday = today - timedelta(days=1)
thisyear = today.strftime("%Y")

# The resource is named "data" so the package-level steps can address it.
counts_resource = Resource("descriptors/traffic-counts.resource.yaml", name="data")
counts_resource.dialect["filters"] = dict(FK_ZAEHLER=data["sensor_id"])

# Slugified sensor id, used to build the package name.
sensor_slug = slugify(data["sensor_id"])

def format_value(value):
    """Round a numeric value up and format it for display.

    The value is converted with ``float`` and rounded up to the next integer.
    Results of 1000 or more are returned as a string with an apostrophe as
    thousands separator (e.g. ``"1'235"``); smaller results are returned as
    a plain ``int``.

    Args:
        value: A number or numeric string accepted by ``float``.

    Returns:
        ``str`` for results >= 1000, otherwise ``int``.

    Raises:
        ValueError / TypeError: If ``value`` cannot be converted by ``float``.
    """
    rounded = math.ceil(float(value))
    if rounded >= 1000:
        # The "," format type always groups with commas, whereas "n" depends
        # on the active process locale and emits no separator at all under
        # the default "C" locale. Under en_US both give the same text.
        return f"{rounded:,}".replace(",", "'")
    return rounded

# Per-day aggregate CSV: written by the first transform of "data" below and
# read back by every derived gauge resource.
agg_path = f"data/bike-{data['sensor_id']}-agg.csv"


def _agg_schema():
    """Return a fresh schema descriptor for the per-day aggregate CSV."""
    return dict(
        fields = [
            dict(name="yearday", type="integer"),
            dict(name="day_sum", type="integer"),
            dict(name="year", type="integer"),
        ]
    )


def _gauge_steps(name, label, aggregate, path, head=None):
    """Build the package steps that derive one single-row gauge resource.

    Args:
        name: Name of the resource to add to the package.
        label: Text stored in the gauge's "label" column.
        aggregate: Callable applied to the "day_sum" values (e.g. sum, mean).
        path: Path of the per-day aggregate CSV to read.
        head: When given, only the first ``head`` rows are aggregated.

    Returns:
        A two-item list: the resource_add step followed by the
        resource_transform step for the same resource.
    """
    row_steps = [steps.table_normalize()]
    if head is not None:
        row_steps.append(steps.row_slice(head=head))
    row_steps.extend([
        # Every row carries the same "year", so grouping on it collapses the
        # table into a single aggregated row.
        steps.table_aggregate(group_name="year", aggregation = {"day_mean": ("day_sum", aggregate)}),
        steps.field_remove(names=["year"]),
        steps.field_add(name="label", type="string", value=label),
        steps.field_add(name="unit", type="string", value="Velos"),
        steps.field_update(name="day_mean", new_name="value", function=format_value),
        steps.table_normalize(),
    ])
    return [
        steps.resource_add(name = name, path = path, schema = _agg_schema()),
        steps.resource_transform(name=name, steps=row_steps),
    ]


# (resource name, label, aggregate function, optional row limit); the order
# here is also the order in which the gauges are merged into "data".
derived_gauges = [
    ("dailymeanyear", "Tagesdurchschnitt seit Jahresbeginn", mean, None),
    ("sumyear", "Seit Jahresbeginn", sum, None),
    ("rollingmean", "Tagesdurchschnitt letzte 7 Tage", mean, 7),
]

gauges = transform(
    Package(
        name = f"bike-{sensor_slug}",
        resources = [
            counts_resource
        ]
    ),
    steps = [
        # 1) Aggregate the raw counts per day of the year (newest first) and
        #    persist the result so the derived gauges can read it back.
        steps.resource_transform(
            name = "data",
            steps = [
                steps.field_remove(names=["FK_ZAEHLER","FK_STANDORT","FUSS_IN","FUSS_OUT","OST","NORD"]),
                steps.field_add(name = "count", formula = "int(VELO_IN) + int(VELO_OUT)"),
                steps.field_add(
                    name = "yearday",
                    type = "integer",
                    function=lambda x: datetime.strftime(datetime.strptime(x["DATUM"], "%Y-%m-%dT%H:%M"), "%j")
                ),
                steps.table_normalize(),
                steps.table_aggregate(
                    group_name = "yearday",
                    aggregation = {"day_sum": ("count", sum)}
                ),
                steps.row_sort(field_names = ["yearday"], reverse = True),
                steps.field_add(name = "year", type = "integer", value = int(thisyear)),
                steps.table_normalize(),
                steps.table_write(path=agg_path),
            ]
        ),

        # 2) Reduce "data" to its first row (the highest yearday after the
        #    descending sort) and turn it into the "Gestern" gauge.
        steps.resource_transform(
            name="data",
            steps=[
                steps.row_slice(head=1),
                steps.field_remove(names=["yearday","year"]),
                steps.field_add(name="label", type="string", value="Gestern"),
                steps.field_add(name="unit", type="string", value="Velos"),
                steps.field_update(name="day_sum", new_name="value", function=format_value),
                steps.table_normalize(),
            ]
        ),

        # 3) Add and transform one resource per derived gauge.
        *[
            step
            for gauge_name, gauge_label, gauge_aggregate, gauge_head in derived_gauges
            for step in _gauge_steps(gauge_name, gauge_label, gauge_aggregate, agg_path, head=gauge_head)
        ],

        # 4) Append the derived gauge rows to "data", then drop the helpers.
        steps.resource_transform(
            name="data",
            steps=[
                steps.table_merge(resource=gauge_name)
                for gauge_name, *_ in derived_gauges
            ]
        ),
        *[steps.resource_remove(name=gauge_name) for gauge_name, *_ in derived_gauges],
    ]
)

gauges.get_resource("data").to_petl()

In [None]:
# Bare last expression of the cell: shows the whole transformed package.
gauges

In [7]:
# Station identifier; used as the "Standort" filter value of the CKAN resource below.
sensor = "Zch_Schimmelstrasse"

# NOTE(review): disabled draft of a hand-built datastore_search request. It refers to
# `fields` and `filters`, which are not defined in the visible cells — presumably
# superseded by the CKAN resource defined further down; confirm before deleting.
# load_params = {'resource_id': '4466ec4a-b215-4134-8973-2f360e53c33d',
#                    "sort": 'Datum desc',
#                    'fields': ', '.join(fields),
#                    "limit": 9,
#                    "filters": json.dumps(filters),
#                    'include_total': False
#                    }

# path = f"https://data.stadt-zuerich.ch/api/3/action/datastore_search?{urllib.parse.urlencode(load_params)}"

def label_mapping(field_value):
    """Translate a pollutant code into its display label.

    Known codes are "NO2", "O3" and "PM10"; any other value raises KeyError.
    """
    labels = {
        "NO2": "Stickstoffdioxid (NO2)",
        "O3": "Ozon (O3)",
        "PM10": "Schwebestaub (PM10)",
    }
    return labels[field_value]

# CKAN dialect: the nine most recent records ("Datum desc") of the chosen
# station from the hourly air-quality dataset.
ckan_dialect = {
    "dataset": "ugz_luftschadstoffmessung_stundenwerte",
    "resource": "ugz_ogd_air_h1_2021.csv",
    "filters": {"Standort": sensor},
    "limit": 9,
    "sort": "Datum desc",
}
airquality_resource = Resource(
    path="https://data.stadt-zuerich.ch",
    name="data",
    format="ckan",
    dialect=ckan_dialect,
)

# Keep the first three NO2 / PM10 / O3 rows and replace the pollutant code
# with its display label.
airquality_steps = [
    steps.table_normalize(),
    # steps.field_remove(names=["Standort","Intervall","Status"]),
    steps.table_normalize(),
    steps.row_filter(formula="Parameter == 'NO2' or Parameter == 'PM10' or Parameter == 'O3'"),
    steps.row_slice(head=3),
    steps.field_update(name="Parameter", function=label_mapping),
]
res = transform(airquality_resource, steps=airquality_steps)

# Persist the result, then show it as a list of keyed rows.
res.write(path="data/air.csv")
res.to_inline(dialect={"keyed": True})

[{'Datum': '2021-09-21T15:00+0100',
  'Standort': 'Zch_Schimmelstrasse',
  'Parameter': 'Schwebestaub (PM10)',
  'Intervall': 'h1',
  'Einheit': 'µg/m3',
  'Wert': '9.76',
  'Status': 'provisorisch'},
 {'Datum': '2021-09-21T15:00+0100',
  'Standort': 'Zch_Schimmelstrasse',
  'Parameter': 'Stickstoffdioxid (NO2)',
  'Intervall': 'h1',
  'Einheit': 'µg/m3',
  'Wert': '26.34',
  'Status': 'provisorisch'},
 {'Datum': '2021-09-21T15:00+0100',
  'Standort': 'Zch_Schimmelstrasse',
  'Parameter': 'Ozon (O3)',
  'Intervall': 'h1',
  'Einheit': 'µg/m3',
  'Wert': '81.6',
  'Status': 'provisorisch'}]

In [None]:
from frictionless import Pipeline
from jinja2 import Environment, FileSystemLoader
import yaml

In [None]:
# Parameters handed to the pipeline template. Note that this rebinds the
# notebook-wide `data` that earlier cells used for the bike sensor.
data = dict(
    sensor_name="Schimmelstrasse",
    sensor_id="Zch_Schimmelstrasse",
)

# Render the pipeline template found in the "descriptors" folder.
env = Environment(loader=FileSystemLoader("descriptors"))
template = env.get_template("air.pipeline.yaml")
output = template.render(params=data)

# Parse the rendered YAML and run it as a frictionless pipeline.
pipeline_descriptor = yaml.safe_load(output)
target = Pipeline(pipeline_descriptor).run()

pkg = target.task.target

# Inline the rows of the "data" resource so the package can be serialized.
data_resource = pkg.get_resource("data")
data_resource.data = data_resource.to_inline(dialect={"keyed": True})

pkg