In [1]:
%load_ext autoreload
# Re-import edited modules (e.g. obsproc) automatically before each cell runs,
# so library changes are picked up without restarting the kernel.
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import logging

logging.basicConfig(level = logging.INFO) # Use logging.DEBUG for more detailed debugging output; INFO (the current level) shows less.

# Load in the config files to create iot-ingester actions

In [3]:
from pathlib import Path
from obsproc.core.config_parser import parse_config
from dataclasses import asdict

config_file = Path("config/config.yaml")
config = parse_config(config_file)

# Map each stage named in `config.pipeline` (in config order) to the list of
# actions configured for that stage.
pipeline = {stage_name: getattr(config, stage_name) for stage_name in config.pipeline}

# Summarise the global settings; canonical_variables is elided to keep the output short.
print("Global Config:")
for key, value in asdict(config.global_config).items():
    shown = "..." if key == "canonical_variables" else value
    print(f"    {key} : {shown}")

# List every configured action under its stage heading.
for stage_name, stage_actions in pipeline.items():
    print(f"\nStage: {stage_name.capitalize()}")
    for stage_action in stage_actions:
        print(f"    {stage_action!s}")

Global Config:
    canonical_variables : ...
    config_path : /Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.community/config
    data_path : /Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.community/data
    code_source : {'repo_status': 'Dirty', 'git_hash': '084fd04df6c5f76f22e2cc95a7e078c315a9bd68'}

Stage: Sources
    MultiFileSource(['**/*.csv'], source = 'sensor.community')

Stage: Other_processors
    CSVChunker([Match(state = 'big_file', source = 'sensor.community')])

Stage: Parsers
    CSVParser([Match(state = 'raw', source = 'sensor.community')])

Stage: Aggregators
    TimeAggregator([Match(state = 'parsed', source = 'sensor_community')], 1min, youngest)

Stage: Encoders
    CSVEncoder([Match(state = 'time_aggregated', source = 'sensor.community')])
    ODCEncoder([Match(state = 'time_aggregated', source = 'sensor.community')])


### Explanation of the above

Here we have a `MultiFileSource` which reads in the CSV files from disk, a `CSVChunker` which splits them into manageable chunks, a `CSVParser` which formats them, renames the columns and adds column metadata, a `TimeAggregator` which splits them into fixed time windows (one minute in this config), and finally a `CSVEncoder` which outputs them again as a CSV file. There's also an `ODCEncoder`, which outputs ODB files.

Next we take the first (and only) `source` and call `source.generate()`, which gives a stream of messages; we extract the first one using `next`.

In [4]:
# The config defines a single source; take it and open its stream of messages.
source = pipeline["sources"][0]
incoming_message_stream = source.generate()
# Pull only the first message off the stream so we can inspect it.
msg = next(incoming_message_stream)
msg

0,1
state,raw
source,sensor.community
filepath,/Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.communit...


I've created an HTML representation of the messages so you can explore their content more quickly in a Jupyter notebook. To see their normal string representation, use `str(msg)` or `repr(msg)`.

In [5]:
# Show the plain-text repr instead of the rich HTML display.
print(f"{msg!r}")

FileMessage(metadata=MetaData(state='raw', source='sensor.community', observation_variable=None, time_slice=None, encoded_format=None, filepath=PosixPath('/Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.community/data/inputs/sensor_community/2023-08/2023-08_htu21d.csv')), history=[])


# Follow a message all the way to the end

Each stage of the process can generally generate multiple messages at the output. The `TimeAggregator` is currently the only one that can accept a message and not immediately give any output.

To see what's going on here, in the next cell I will create a file message manually (you can put any CSV in here). Then, in a loop, we pass that message to the next stage in the pipeline that it matches with and take only the first of what may be many output messages.

For the `TimeAggregator` we have to trigger it to give up its messages early by giving an input message followed by a `FinishMessage` that tells it we want the output right now. 

In [6]:
from obsproc.core.bases import FinishMessage, FileMessage, MetaData
from obsproc.aggregators import TimeAggregator

# Build a FileMessage by hand pointing at a CSV on disk. With state='big_file' it is
# picked up by the CSVChunker (see the output below).
message = FileMessage(
    metadata=MetaData(state='big_file',
        source='sensor_community',
        filepath=Path('data/inputs/sensor_community/2023-08/2023-08_sds011.csv'))) # May need to download this file manually

# All processing actions, in stage order. Sources only generate messages, so they are
# excluded by stage name rather than by assuming they are the first stage in the config.
actions = [
    stage_action
    for stage_name, stage in pipeline.items()
    if stage_name != "sources"
    for stage_action in stage
]

message_history = []
while True:
    display(message)
    message_history.append(message)
    matching = [action for action in actions if action.matches(message)]

    if not matching:
        print("No more matches, the message is fully processed!")
        break

    print("That messages matches with this/these action(s): \n\n", "\n".join(str(a) for a in matching))
    action = matching[0]
    print("\nHere's the first output message from passing the previous message to the first matching action: \n")

    # Special case for the TimeAggregator: it may swallow a message without emitting
    # anything, so a FinishMessage is sent afterwards to force it to flush its output.
    if action.__class__.__name__ == "TimeAggregator":
        # list() pumps the generator to completion; keep anything it did emit
        # rather than throwing it away.
        emitted = list(action.process(message))
        flushed = next(action.process(FinishMessage("We're done!")), None)
        next_message = emitted[0] if emitted else flushed
    else:
        next_message = next(action.process(message), None)

    # An action can legitimately yield nothing; stop cleanly instead of letting
    # a bare next() raise StopIteration out of the cell.
    if next_message is None:
        print(f"{action} produced no output for this message, stopping here.")
        break
    message = next_message
        

0,1
state,big_file
source,sensor_community
filepath,data/inputs/sensor_community/2023-08/2023-08_sds011.csv


That messages matches with this/these action(s): 

 CSVChunker([Match(state = 'big_file', source = 'sensor.community')])

Here's the first output message from passing the previous message to the first matching action: 



0,1
state,raw
source,sensor_community

0,1
name,FileMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVChunker

sensor_id,sensor_type,location,lat,lon,timestamp,P1,durP1,ratioP1,P2,durP2,ratioP2
78798,SDS011,68146,51.578000,4.778000,2023-08-01T00:00:00,,,,,,
13649,SDS011,6898,50.918000,4.688000,2023-08-01T00:00:00,0.00,,,0.00,,
24225,SDS011,51462,51.422000,5.514000,2023-08-01T00:00:00,0.00,,,0.00,,
32599,SDS011,19277,51.778000,7.912000,2023-08-01T00:00:00,0.00,,,0.00,,
37227,SDS011,23123,43.125047,25.682165,2023-08-01T00:00:00,0.00,,,0.00,,
44648,SDS011,30354,51.130280,5.607592,2023-08-01T00:00:00,0.00,,,0.00,,
51606,SDS011,37691,53.264591,6.483177,2023-08-01T00:00:00,0.00,,,0.00,,
62884,SDS011,48971,47.539732,19.113040,2023-08-01T00:00:00,0.00,,,0.00,,
65226,SDS011,51805,49.012000,12.104000,2023-08-01T00:00:00,0.00,,,0.00,,
70729,SDS011,58315,53.336000,55.932000,2023-08-01T00:00:00,0.00,,,0.00,,


That messages matches with this/these action(s): 

 CSVParser([Match(state = 'raw', source = 'sensor.community')])

Here's the first output message from passing the previous message to the first matching action: 



0,1
state,parsed
source,sensor_community
observation_variable,P1

0,1,2,3
time,"datetime64[ns, UTC]",,The time that the observation was made.
station_id,object,,A unique identifer for a stationary sensor.
sensor_type,object,,
location,int,,
lat,float64,°,"The lattitude of the observation, referenced to WGS84 (EPSG: 4326)"
lon,float64,°,"The longitude of the observation, referenced to WGS84 (EPSG: 4326)"
P1,float64,,

0,1
name,FileMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVChunker

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVParser

time,station_id,sensor_type,location,lat,lon,P1
2023-08-01 00:00:00+00:00,78798,SDS011,68146,51.578000,4.778000,
2023-08-01 00:00:00+00:00,13649,SDS011,6898,50.918000,4.688000,0.00
2023-08-01 00:00:00+00:00,24225,SDS011,51462,51.422000,5.514000,0.00
2023-08-01 00:00:00+00:00,32599,SDS011,19277,51.778000,7.912000,0.00
2023-08-01 00:00:00+00:00,37227,SDS011,23123,43.125047,25.682165,0.00
2023-08-01 00:00:00+00:00,44648,SDS011,30354,51.130280,5.607592,0.00
2023-08-01 00:00:00+00:00,51606,SDS011,37691,53.264591,6.483177,0.00
2023-08-01 00:00:00+00:00,62884,SDS011,48971,47.539732,19.113040,0.00
2023-08-01 00:00:00+00:00,65226,SDS011,51805,49.012000,12.104000,0.00
2023-08-01 00:00:00+00:00,70729,SDS011,58315,53.336000,55.932000,0.00


That messages matches with this/these action(s): 

 TimeAggregator([Match(state = 'parsed', source = 'sensor_community')], 1min, youngest)

Here's the first output message from passing the previous message to the first matching action: 



0,1
state,time_aggregated
source,sensor_community
observation_variable,P1
time_slice,2023-08-01 00:00

0,1,2,3
time,"datetime64[ns, UTC]",,The time that the observation was made.
station_id,object,,A unique identifer for a stationary sensor.
sensor_type,object,,
location,int,,
lat,float64,°,"The lattitude of the observation, referenced to WGS84 (EPSG: 4326)"
lon,float64,°,"The longitude of the observation, referenced to WGS84 (EPSG: 4326)"
P1,float64,,

0,1
name,FileMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVChunker

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVParser

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = P1)"

0,1
name,TimeAggregator

time,station_id,sensor_type,location,lat,lon,P1
2023-08-01 00:00:00+00:00,78798,SDS011,68146,51.578000,4.778000,
2023-08-01 00:00:00+00:00,13649,SDS011,6898,50.918000,4.688000,0.00
2023-08-01 00:00:00+00:00,24225,SDS011,51462,51.422000,5.514000,0.00
2023-08-01 00:00:00+00:00,32599,SDS011,19277,51.778000,7.912000,0.00
2023-08-01 00:00:00+00:00,37227,SDS011,23123,43.125047,25.682165,0.00
2023-08-01 00:00:00+00:00,44648,SDS011,30354,51.130280,5.607592,0.00
2023-08-01 00:00:00+00:00,51606,SDS011,37691,53.264591,6.483177,0.00
2023-08-01 00:00:00+00:00,62884,SDS011,48971,47.539732,19.113040,0.00
2023-08-01 00:00:00+00:00,65226,SDS011,51805,49.012000,12.104000,0.00
2023-08-01 00:00:00+00:00,70729,SDS011,58315,53.336000,55.932000,0.00


That messages matches with this/these action(s): 

 CSVEncoder([Match(state = 'time_aggregated', source = 'sensor.community')])
ODCEncoder([Match(state = 'time_aggregated', source = 'sensor.community')])

Here's the first output message from passing the previous message to the first matching action: 



0,1
state,encoded
source,sensor_community
observation_variable,P1
time_slice,2023-08-01 00:00
encoded_format,csv
filepath,/Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.communit...

0,1
name,FileMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVChunker

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVParser

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = P1)"

0,1
name,TimeAggregator

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = P1)"

0,1
name,CSVEncoder


No more matches, the message is fully processed!


A few things to note here:

- The input file is 13GB!
- I've started trying to keep some kind of record of the message history including the git hash of the code that performed each action.
- The `state` of the message changes through each step and extra metadata gets attached. I've tried to make this system very flexible.
- The time aggregator does not currently propagate column metadata through, working on a fix for that.
- I'm not sure what it would mean to keep track of history through the time aggregator but maybe there's a way to improve that.

For this demo I set the time window for the time aggregator to one minute:

In [7]:
# Find the time-aggregated (pre-encoding) message by its state rather than by a fixed
# position in the history, so this still works if the loop stopped early or the
# pipeline gains/loses stages.
aggregated_message = next(
    (m for m in reversed(message_history) if m.metadata.state == "time_aggregated"), None
)
assert aggregated_message is not None, "no 'time_aggregated' message in message_history"
data = aggregated_message.data
data.time.min(), data.time.max()

(Timestamp('2023-08-01 00:00:00+0000', tz='UTC'),
 Timestamp('2023-08-01 00:00:59+0000', tz='UTC'))

# ODC Encoding

In [8]:
# The walk-through above only used the first matching encoder (the CSVEncoder);
# pick out the ODCEncoder explicitly and feed it the same aggregated message.
odb_encoder = next(e for e in pipeline["encoders"] if e.__class__.__name__ == "ODCEncoder")

# Look the aggregated message up by state instead of relying on its index in the history.
aggregated_message = next(
    (m for m in reversed(message_history) if m.metadata.state == "time_aggregated"), None
)
assert aggregated_message is not None, "no 'time_aggregated' message in message_history"
msg = next(odb_encoder.process(aggregated_message))
msg

0,1
state,encoded
source,sensor_community
observation_variable,P1
time_slice,2023-08-01 00:00
encoded_format,odb
filepath,/Users/math/git/iot-ingester-deployment/dockerfiles/worker/iot-ingester/notebooks/sensor.communit...

0,1
name,FileMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVChunker

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = None)"

0,1
name,CSVParser

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = P1)"

0,1
name,TimeAggregator

0,1
name,TabularMessage
metadata,"MetaData(source = sensor_community, variable = P1)"

0,1
name,ODCEncoder

0,1
encoded_by,obsproc
source,sensor_community
observation_variable,P1
timeslice,2023-08-01T00:00:00

Unnamed: 0,dtype,First Entry,Unique Entries,ODB codec
entryno@body,int64,1,1,constant
varno@body,int64,-1,1,constant
statid@hdr,object,++++xxxx,1,constant_string
obstype@hdr,int64,-1,1,constant
codetype@hdr,int64,-1,1,constant
source@hdr,object,sensor_c,1,constant_string
project@hdr,object,iCHANGE,1,constant_string
dataset@hdr,object,sensor_c,1,constant_string
groupid@hdr,int64,17,1,constant
reportype@hdr,int64,16022,1,constant

Unnamed: 0,entryno@body,varno@body,statid@hdr,obstype@hdr,codetype@hdr,source@hdr,project@hdr,dataset@hdr,groupid@hdr,reportype@hdr,class@desc,type@desc,stream@desc,expver@desc,levtype@desc,date@hdr,time@hdr,andate@desc,antime@desc,stalt@hdr,min@body,obsvalue@body,lon@hdr,stationid@hdr,lat@hdr
0,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,,4.778000,954ca366,51.577999
1,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,4.688000,84431d56,50.917999
2,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,5.514000,fc703914,51.422001
3,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,7.912000,5ff96127,51.778000
4,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,25.682165,1397542b,43.125046
5,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,5.607593,542ca9e8,51.130280
6,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,6.483177,8b110e04,53.264591
7,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,19.113041,9a359664,47.539730
8,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,12.104000,28e86ca9,49.012001
9,1,-1,++++xxxx,-1,-1,sensor_c,iCHANGE,sensor_c,17,16022,2,264,1247,xxxx,sfc,20230801,0,20230801,0,,0.000000,0.00,55.931999,cd57a41e,53.335999


Open the `ODB File Data` tab in the output above and have a look at the ODB properties.