<a href="https://colab.research.google.com/github/gradoj/iot_poc/blob/main/iot_poc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Helium IoT PoC Oracle Data

This data comes from the IoT oracle data, which contains protobuf information split into messages, as described at https://docs.helium.com/oracles/oracle-data, and has been saved as partitioned Parquet files in an S3 bucket.

The data has been split into sent and received beacons, each partitioned by `date` so that only the required files need to be processed. To keep file sizes manageable, the received beacons are also partitioned by hour.
```
sent_beacons.parquet
|
├───date=2023-05-16
│       4a8554c5f1d849ceb177a27c91329a6f-0.parquet
│
├───date=2023-05-17
│       308fa6a011714a46bfc5d08d7f54c19f-0.parquet
│       4a8554c5f1d849ceb177a27c91329a6f-0.parquet


rx_beacons_hourly.parquet
|
├───date=2023-01-27
│   ├───hour=0
│   │       9edefc8181f740778aa2fcfa5a549bba-0.parquet
│   │
│   ├───hour=1
│   │       9edefc8181f740778aa2fcfa5a549bba-0.parquet
```
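
Because the directory names encode the partition values, a single day (or a single hour) can be targeted by building the matching glob. The helper below is a minimal sketch of that idea: `partition_glob` is a hypothetical name, and the `s3://iotpoc` prefix is the one used by the queries in this notebook.

```python
from typing import Optional

BUCKET = "s3://iotpoc"  # bucket prefix as used by the queries in this notebook


def partition_glob(dataset: str, date: str, hour: Optional[int] = None) -> str:
    """Build a glob matching the Parquet files of one date (and optionally one hour).

    `dataset` is a top-level directory such as "sent_beacons.parquet".
    """
    parts = [BUCKET, dataset, f"date={date}"]
    if hour is not None:
        if not 0 <= hour <= 23:
            raise ValueError("hour must be between 0 and 23")
        parts.append(f"hour={hour}")
    parts.append("*.parquet")
    return "/".join(parts)


print(partition_glob("sent_beacons.parquet", "2023-05-17"))
print(partition_glob("rx_beacons_hourly.parquet", "2023-01-27", hour=0))
```

Only `rx_beacons_hourly.parquet` has the extra `hour=` level, so `hour` should be left as `None` for the sent beacons.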




# Exploring the schema
Since the dataset is too large to fit entirely in memory, or possibly even on your local machine, it will be accessed directly from the bucket. There are several options, such as pyarrow, dask, and pandas, but the focus here is DuckDB (https://duckdb.org/): it supports S3 buckets and has geospatial and H3 extensions, which makes it a very good match for this dataset.

## Installation

Install the required packages and mount Google Drive for convenience.

In [None]:
!pip install --quiet duckdb
!pip install --quiet duckdb-engine
!pip install --quiet pandas
!pip install --quiet matplotlib
!pip install --quiet colab-env --upgrade
!pip install --quiet s3fs


In [None]:
# Mount Google Drive to retrieve secret credentials or to store output files such as CSVs
from google.colab import drive
drive.mount('/gdrive')#, force_remount=True)

import colab_env

Mounted at /gdrive
Mounted at /content/gdrive


## Make duckdb connection to bucket
Install and load the `httpfs` extension, then set the variables DuckDB needs to access the bucket.

In [None]:
import os
import duckdb

db = duckdb.connect() # No need to pass a file name, we will use a VIEW

query = """
INSTALL httpfs;
LOAD httpfs;
SET s3_endpoint='{s3_endpoint}';
SET s3_access_key_id='{s3_access_key_id}';
SET s3_secret_access_key='{s3_secret_access_key}';
""".format(
    s3_endpoint=os.getenv("s3_endpoint"),
    s3_access_key_id=os.getenv("s3_access_key_id"),
    s3_secret_access_key=os.getenv("s3_secret_access_key"),
)

%time db.execute(query)

CPU times: user 210 ms, sys: 0 ns, total: 210 ms
Wall time: 218 ms


<duckdb.DuckDBPyConnection at 0x7f999ff8dc30>
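
One thing to watch with the cell above: `os.getenv` returns `None` for an unset variable, and `str.format` will then write the literal text `None` into the `SET` statement without raising an error. The sketch below builds the same statements but fails early when a variable is missing. The function name `build_s3_settings` and the placeholder values are illustrative assumptions, not part of the original notebook.

```python
import os

REQUIRED_VARS = ("s3_endpoint", "s3_access_key_id", "s3_secret_access_key")


def build_s3_settings(env=os.environ) -> str:
    """Return the INSTALL/LOAD/SET statements, failing early on missing variables."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    lines = ["INSTALL httpfs;", "LOAD httpfs;"]
    for name in REQUIRED_VARS:
        value = env[name].replace("'", "''")  # escape single quotes for SQL
        lines.append(f"SET {name}='{value}';")
    return "\n".join(lines)


# Placeholder values for illustration only (not real credentials)
example_env = {
    "s3_endpoint": "s3.example.com",
    "s3_access_key_id": "EXAMPLE_KEY",
    "s3_secret_access_key": "EXAMPLE_SECRET",
}
print(build_s3_settings(example_env))
```

In the notebook, the returned string could be passed to `db.execute(...)` in place of `query`, with `env` left at its default so the real environment variables are used.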

## Query the Parquet file schema using DESCRIBE
Since the data is partitioned by date, the best way to open the dataset is with wildcards, as shown below. The `hive_partitioning=1` option tells DuckDB to treat the partition directories (such as `date=2023-05-16`) as data columns.

In [None]:
# Inspect the column names and types of the sent beacons
%time txbeacon_schema = db.execute("DESCRIBE SELECT * FROM read_parquet('s3://iotpoc/sent_beacons.parquet/**/*.parquet', hive_partitioning=1)").df()

#%time txbeacon_count = db.execute("SELECT COUNT(*),MIN(date) as firstTime,MAX(date) as lastTime \
#                                  FROM read_parquet('s3://iotpoc/sent_beacons.parquet/**/*.parquet', hive_partitioning=1) \
#                                  WHERE (date = '2023-02-23' )").df()
display(txbeacon_schema)




CPU times: user 216 ms, sys: 18.1 ms, total: 235 ms
Wall time: 1.58 s


Unnamed: 0,column_name,column_type,null,key,default,extra
0,pocId,VARCHAR,YES,,,
1,receivedTimestamp,TIMESTAMP,YES,,,
2,location,BIGINT,YES,,,
3,lat,FLOAT,YES,,,
4,lon,FLOAT,YES,,,
5,hexScale,BIGINT,YES,,,
6,rewardUnit,BIGINT,YES,,,
7,pubKey,VARCHAR,YES,,,
8,frequency,UBIGINT,YES,,,
9,datarate,VARCHAR,YES,,,
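
To make the effect of `hive_partitioning` more concrete, the sketch below pulls the `key=value` directory segments out of a file path, which is the information that ends up as extra columns. It is a rough illustration of the idea only, not DuckDB's actual implementation, and `parse_hive_partitions` is a hypothetical helper name.

```python
def parse_hive_partitions(path: str) -> dict:
    """Extract key=value directory segments from a Hive-style path."""
    columns = {}
    for segment in path.split("/")[:-1]:  # skip the file name itself
        if "=" in segment:
            key, value = segment.split("=", 1)
            columns[key] = value
    return columns


path = "rx_beacons_hourly.parquet/date=2023-01-27/hour=0/9edefc8181f740778aa2fcfa5a549bba-0.parquet"
print(parse_hive_partitions(path))
```

The values come out as plain strings here; a query engine may assign them more specific types.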


## SUMMARIZE

In [None]:
%time txbeacon_count = db.execute("SUMMARIZE SELECT * \
                                   FROM read_parquet('s3://iotpoc/sent_beacons.parquet/**/*.parquet', hive_partitioning=1) \
                                   WHERE (date = '2023-02-23' ) ").df()

display(txbeacon_count)



CPU times: user 4.86 s, sys: 66.8 ms, total: 4.92 s
Wall time: 11.8 s


Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,pocId,VARCHAR,+++OJCLW/KYELlwYk1Rf66R9jAx97xPTQJ9DzE27M9A=,zzzf9fb+pgzF8hLk67RbUWLIblAPwLbel1weB29qBuk=,1227805,,,,,,1240487,0.0%
1,receivedTimestamp,TIMESTAMP,2023-02-23 07:00:00.023,2023-02-24 06:59:59.961,1210587,,,,,,1240487,0.0%
2,location,BIGINT,630524015154703359,634351374012531199,401633,6.313013912606953e+17,475286752107816.5,6.310493235446947e+17,6.3121206770825e+17,6.312791947505699e+17,1240487,0.0%
3,lat,FLOAT,-46.434513,70.377785,377147,39.93165830466295,14.440351631856393,35.723223173891824,41.62162023631424,48.777144602977806,1240487,0.0%
4,lon,FLOAT,-170.66478,178.03728,400833,-36.94049656593446,63.93491282166989,-89.46784605653907,-44.39574252817715,11.76724910123056,1240487,0.0%
5,hexScale,BIGINT,3,10000,7659,6894.1837141380765,3114.068375596404,4323.0,7491.0,10000.0,1240487,0.0%
6,rewardUnit,BIGINT,0,18926,15,14363.843339752855,6603.208683568792,10772.0,18658.0,18926.0,1240487,0.0%
7,pubKey,VARCHAR,AA++5L+F0qRox1uyhm8zZ+dv4tTeEenB9QUZlT1Onafw,APzzv8fWOcK8JHGzWd+xoZtXc1lLRUtJMJR5Nun/BL9e,398252,,,,,,1240487,0.0%
8,frequency,UBIGINT,486300000,924600000,67,886854397.7405648,31268003.392004333,867899976.7547817,903900000.0,904700000.0,1240487,0.0%
9,datarate,VARCHAR,SF12BW125,SF9BW125,3,,,,,,1240487,0.0%


In [None]:
%time rxbeacon_count = db.execute("SUMMARIZE SELECT * \
                                   FROM read_parquet('s3://iotpoc/rx_beacons_hourly.parquet/**/*.parquet', hive_partitioning=1) \
                                   WHERE (date = '2023-02-23' ) \
                                   AND (hour = 0) ").df()

display(rxbeacon_count)


CPU times: user 16.2 s, sys: 225 ms, total: 16.4 s
Wall time: 30.5 s


Unnamed: 0,column_name,column_type,min,max,approx_unique,avg,std,q25,q50,q75,count,null_percentage
0,pocId,VARCHAR,+++j3tc7O43KI41Ov1YzMHBTtyOYYGVBPqWzbjgnSVY=,zzy5o8guv39l/+n7/YfWCeWhV7RCktoNs2kcON7joOY=,53463,,,,,,4425958,0.0%
1,receivedTimestamp,TIMESTAMP,2023-02-23 07:00:00.5,2023-02-23 08:09:39.651,2357688,,,,,,4425958,0.0%
2,location,BIGINT,0,634351374005476863,320145,5.585859741487421e+17,2.0143279229619296e+17,6.310339576567077e+17,6.310599102796897e+17,6.312781964552997e+17,4425958,0.0%
3,lat,FLOAT,-45.853218,70.06249,304777,37.60852354202554,16.276953873819217,35.8468484808033,41.4652450563109,48.04991452025459,4425958,0.0%
4,lon,FLOAT,-159.38126,178.02933,316570,-12.343681253039708,53.04109005481873,-9.185695100794067,4.862861558840258,24.61303885958575,4425958,0.0%
5,hexScale,BIGINT,0,10000,5611,5575.792431830578,3685.1687561513263,2104.0,5881.0,9704.0,4425958,0.0%
6,rewardUnit,BIGINT,0,10000,12,362.2360176034205,1150.4087740456664,0.0,0.0,0.0,4425958,0.0%
7,pubKey,VARCHAR,AA++3jev7QqHuANS7uAXMX/TEAA4gTf3nNYvWDWZoak3,AeOyyBYc9wJh667S9OkUdqKUFKU3EqV6NziO4dUyUakm,326226,,,,,,4425958,0.0%
8,frequency,UBIGINT,486300000,975099968,117,877245888.1674485,17395988.385984655,867500032.0,868099964.0704175,896424356.8965634,4425958,0.0%
9,datarate,VARCHAR,SF12BW125,SF9BW125,3,,,,,,4425958,0.0%


# Accessing the data

In [24]:
%time rxbeacon_count = db.execute("SELECT * \
                                   FROM read_parquet('s3://iotpoc/rx_beacons_hourly.parquet/**/*.parquet', hive_partitioning=1) \
                                   WHERE (date = '2023-03-23' ) \
                                   AND (hour = 0) \
                                   LIMIT 2").df()

display(rxbeacon_count)

CPU times: user 96.2 ms, sys: 16 ms, total: 112 ms
Wall time: 5.85 s


Unnamed: 0,pocId,receivedTimestamp,location,lat,lon,hexScale,rewardUnit,pubKey,frequency,datarate,...,timestamp,tmst,elevation,gain,status,invalidReason,participantSide,selected,date,hour
0,5WREMalstBKW+cU0qTeeKdrWdCpW5koX9q8zNj+Fl0Q=,2023-03-23 06:00:03.793,630950130547433983,53.164478,-2.197567,10000,2918,AIXXZXnm7A+1L2wLBwpZyRgqL8OZRUingMhZFMKOnwR2,867700032,SF12BW125,...,1679551203467954308,2013472588,0,0,valid,reason_none,side_none,1,2023-03-23,0
1,5WREMalstBKW+cU0qTeeKdrWdCpW5koX9q8zNj+Fl0Q=,2023-03-23 06:00:03.807,630950126756440063,53.073185,-2.19101,10000,2918,APBIdLGvO4JtKO30K5DdRrzQPePvjYXpiParqILJHQbo,867700032,SF12BW125,...,1679551203454019885,1651927641,0,0,valid,reason_none,side_none,1,2023-03-23,0
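
The queries in this notebook all follow the same pattern: read one dataset with `hive_partitioning=1`, then filter on `date` and optionally `hour`. The sketch below generates that SQL from validated inputs. `partition_query` and `DATASETS` are hypothetical names introduced for illustration, and only the two dataset directories shown earlier are assumed to exist.

```python
from datetime import date as Date
from typing import Optional

DATASETS = ("sent_beacons.parquet", "rx_beacons_hourly.parquet")


def partition_query(dataset: str, day: str, hour: Optional[int] = None,
                    limit: Optional[int] = None) -> str:
    """Build a SELECT restricted to one date partition (and optionally one hour)."""
    if dataset not in DATASETS:
        raise ValueError(f"unknown dataset: {dataset}")
    # Validate the date and normalise it to the YYYY-MM-DD form used by the partitions
    day = Date.fromisoformat(day).isoformat()
    sql = (
        "SELECT * "
        f"FROM read_parquet('s3://iotpoc/{dataset}/**/*.parquet', hive_partitioning=1) "
        f"WHERE date = '{day}'"
    )
    if hour is not None:
        sql += f" AND hour = {int(hour)}"
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return sql


print(partition_query("rx_beacons_hourly.parquet", "2023-03-23", hour=0, limit=2))
```

With the `db` connection created earlier, `db.execute(partition_query(...)).df()` should behave like the hand-written query above, and prefixing the string with `SUMMARIZE ` or `DESCRIBE ` gives the other forms used in this notebook.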
