# Protobag Demo

This notebook shows:
  * [How to write Protobuf messages to, and read them from, a `protobag` zip archive](#write_read)
  * [How to convert protobag zip archives to and from Pandas Dataframes and Parquet tables](#dataframes)
  
If you only have 30 seconds, hit "run all cells" and try scrolling to these sections:
 * [Writing Plain Protobuf Messages](#write_plain)
 * [Reading Plain Protobuf Messages](#read_plain)
 * [Converting a Protobag to a Pandas Dataframe](#dataframes) (longer, but hopefully easy to skim)

Protobag archives are just zip archives that contain serialized Protobuf messages as files. We hope this notebook shows how `protobag` may prove more useful than working with Protobuf + Zip archives directly.

## Environment Set-up

To run this notebook locally, try using the protobag-demo dockerized environment:

`docker run -it --rm --net=host protobag:0.0.3 jupyter notebook --allow-root`
TODO: make this image available on DockerHub

**Google Colab** You can also [run this notebook in Google Colab](https://colab.sandbox.google.com/github/StandardCyborg/protobag/blob/master/examples/protobag-parquet/protobag-demo-full.ipynb) TODO FIX LINK. In the Colab environment, you'll need to install `protobag` and some other dependencies. Running the cell below will take care of that for you. You might need to restart the runtime (Use the menu option: Runtime > Restart runtime ...) in order for Colab to recognize the new modules.

In [1]:
import os
import sys
if os.path.exists('/opt/protobag'):
    print("We're running in the dockerized environment! We can simply add protobag to the PYTHONPATH")
    if not os.path.exists('/opt/protobag/python/protobag/protobag_native.cpython-36m-x86_64-linux-gnu.so'):
        # We need to build the `protobag_native` module.  The easiest way is to:
        !cd /opt/protobag/python && python3 setup.py test
    sys.path.append('/opt/protobag/python/')
    print("Protobag added to PYTHONPATH")
elif 'google.colab' in sys.modules:
    try:
      import protobag
    except ImportError:
      print("Installing protobag")
      !pip3 install protobag-0.0.3-cp36-cp36m-linux_x86_64.whl
    print("Installing libarchive")
    !ls /usr/local/lib/libarchive.so.17 || (cd /tmp && \
      wget https://github.com/libarchive/libarchive/archive/v3.4.2.tar.gz && \
      tar xfz v3.4.2.tar.gz && \
      (rm -rf /opt/libarchive || true) && \
      mv libarchive-3.4.2 /opt/libarchive && \
      cd /opt/libarchive && \
      mkdir -p build && cd build && \
      cmake .. && \
      make -j `nproc` && \
      make install)
    print("Installing libc++")
    !apt-get install -y libc++-8-dev libc++abi-8-dev || echo "ignoring 'libmkldnn.so.0 is not a symbolic link' issue"

import protobag
print("Using protobag version %s at %s" % (protobag.__version__, protobag.__file__))

We're running in the dockerized environment! We can simply add protobag to the PYTHONPATH
Protobag added to PYTHONPATH
Using protobag version 0.0.3 at /opt/protobag/python/protobag/__init__.py


## Our Protobuf Messages

Suppose we've developed a cool game called Dino Hunters where people run around on a deserted island and try to capture wild dinosaurs.  We're using Protobuf to persist data about the `DinoHunter` characters in our game.  Furthermore, we want to use Protobuf to log the 2D `Position`s of our hunters as they run around and hunt dinos.  Our Protobuf message schema is as follows:

```protobuf
syntax = "proto3";

package my_messages;

message DinoHunter {
  string first_name = 1;
  int32 id = 2;
  map<string, string> attribs = 3;

  enum DinoType {
    IDK = 0;
    VEGGIESAURUS = 1;
    MEATIESAURUS = 2;
    PEOPLEEATINGSAURUS = 3;
  }

  message Dino {
    string name = 1;
    DinoType type = 2;
  }

  repeated Dino dinos = 4;
}

message Position {
  float x = 1;
  float y = 2;
}
```

We now need the `protoc`-generated Python code in order to use these messages.  For convenience, we'll just download a copy posted in the `protobag` repository:

In [2]:
if not os.path.exists('MyMessages_pb2.py'):
    !wget http://fixme/this/path

from MyMessages_pb2 import DinoHunter
from MyMessages_pb2 import Position

## Note: you can prove to yourself that the downloaded file matches the schema above using the code below:
# import MyMessages_pb2
# from google.protobuf.descriptor_pb2 import FileDescriptorProto
# fd = FileDescriptorProto()
# MyMessages_pb2.DESCRIPTOR.CopyToProto(fd)
# print(fd)

OK! Let's create some dino hunters:

In [3]:
max_hunter = DinoHunter(
      first_name='py_max',
      id=1,
      dinos=[
        {'name': 'py_nibbles', 'type': DinoHunter.PEOPLEEATINGSAURUS},
      ])
print(max_hunter)

lara_hunter = DinoHunter(
      first_name='py_lara',
      id=2,
      dinos=[
        {'name': 'py_bites', 'type': DinoHunter.PEOPLEEATINGSAURUS},
        {'name': 'py_stinky', 'type': DinoHunter.VEGGIESAURUS},
      ])

print(lara_hunter)

first_name: "py_max"
id: 1
dinos {
  name: "py_nibbles"
  type: PEOPLEEATINGSAURUS
}

first_name: "py_lara"
id: 2
dinos {
  name: "py_bites"
  type: PEOPLEEATINGSAURUS
}
dinos {
  name: "py_stinky"
  type: VEGGIESAURUS
}



<a id='write_read'></a>
## Writing and Reading Protobuf messages to a `protobag` archive
<a id='write_plain'></a>
### Plain Messages (`protobag.MessageEntry`)

`protobag` archives are just zip archives that contain serialized Protobuf messages as files. (Tar and other formats are also supported via [libarchive](https://github.com/libarchive/libarchive)).  So `protobag` offers a simple API for **writing** messages to an archive:

In [4]:
bag = protobag.Protobag(path='example.zip')
writer = bag.create_writer()
writer.write_msg("hunters/py_max", max_hunter)
writer.write_msg("hunters/py_lara", lara_hunter)
writer.close()

You can verify that the above just wrote a zip archive for you:

In [5]:
!which unzip > /dev/null || (apt-get update && apt-get install unzip)
!unzip -l example.zip

Archive:  example.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
       72  1980-01-01 00:00   hunters/py_max
       86  1980-01-01 00:00   hunters/py_lara
     8691  1980-01-01 00:00   /_protobag_index/bag_index/1595460996.0.stampedmsg.protobin
---------                     -------
     8849                     3 files


Hmm, what's that `/_protobag_index/bag_index/xxxxx.stampedmsg.protobin` file?

By default, `protobag` not only saves those messages but also **indexes Protobuf message descriptors** so that your `protobag` readers don't need your proto schemas to decode your messages.  (You can also disable this indexing if you wish.  For further discussion, see [the root README.md](todo fixme link now at README.md#protobag-indexes-protobuf-message-descriptors).)

<a id='read_plain'></a>
To **read** specific messages from a `protobag` archive, you can use this simple API:

In [6]:
bag = protobag.Protobag(
        path='example.zip',
        
        # Tell protobag to use our protoc-generated python code:
        msg_classes=(DinoHunter, Position))
entry = bag.get_entry("hunters/py_max")
print(entry)

MessageEntry:
  entryname: hunters/py_max
  type_url: type.googleapis.com/my_messages.DinoHunter
  has serdes: True
  has descriptor_data: False
  msg:
first_name: "py_max"
id: 1
dinos {
  name: "py_nibbles"
  type: PEOPLEEATINGSAURUS
}



### Time-Series Data (`protobag.StampedEntry`)
`protobag` features a **topic-timestamp-message** API for recording time-series data.  This API is modeled after [`rosbag`](http://wiki.ros.org/rosbag), [LCM log files](https://lcm-proj.github.io/log_file_format.html) (where topics are called "channels"), and your favorite message bus systems like Kafka or AWS SQS.  Each **topic** contains Protobuf messages of a single type, and each message has a nanosecond-precision timestamp (using the `google.protobuf.Timestamp` object, which has built-in conversion to other timestamp data structures like Python's `datetime`).
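
As a small illustration of the timestamp type mentioned above, the sketch below uses only the `google.protobuf` package (it makes no `protobag` calls) to convert a `Timestamp` to a Python `datetime`. One caveat it shows: `datetime` has microsecond resolution, so any sub-microsecond digits are dropped by `ToDatetime()`. The value `1_500_000_123` is an arbitrary example.

```python
from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp

# Build a timestamp 1.500000123 seconds after the Unix epoch.
ts = Timestamp()
ts.FromNanoseconds(1_500_000_123)

# The message stores whole seconds and the nanosecond remainder separately.
print(ts.seconds, ts.nanos)

# ToDatetime() returns a naive UTC datetime; the trailing 123 ns are dropped.
as_datetime = ts.ToDatetime()
print(as_datetime)

# The full-precision value is still recoverable from the message itself.
print(ts.ToNanoseconds())
```

If you need the full nanosecond value downstream, it is probably safer to carry `seconds` and `nanos` (or the integer from `ToNanoseconds()`) than a `datetime`.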

Protobag has special handling for these timestamped entries:
  * For writing:
      * Topics are organized into archive "folders", and filenames are chosen automatically.
      * Protobag indexes Protobuf Message Descriptors as described above.
      * Protobag indexes the message timestamps for efficient time-ordered playback.
  * For reading:
     * Protobag offers a simple [Selection](fixme link to c%2B%2B/protobag/protobag_msg/ProtobagMsg.proto) API for reading specific sets of topics, time ranges, or even just individual events.
     * Protobag offers a [`TimeSync`](fixme link to c%2B%2B/protobag/protobag/Utils/TimeSync.hpp) (in C++ FIXME add python API) for synchronizing topics that have messages recorded at different rates.  
  

Using our Dino Hunters example, we'll log (**write**) the 2D positions of a dino and a hunter during a chase scene:

In [7]:
bag = protobag.Protobag(path='example.zip')
writer = bag.create_writer()
for t in range(5):
    lara_pos = Position(x=t, y=t+1)
    writer.write_stamped_msg("positions/lara", lara_pos, t_sec=t)

    trex_pos = Position(x=t+2, y=t+3)
    writer.write_stamped_msg("positions/trex", trex_pos, t_sec=t)
writer.close()
!unzip -l example.zip

Archive:  example.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
      100  1980-01-01 00:00   positions/lara/0.0.stampedmsg.protobin
      105  1980-01-01 00:00   positions/trex/0.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/1.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/1.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/2.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/2.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/3.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/3.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/4.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/4.0.stampedmsg.protobin
    10623  1980-01-01 00:00   /_protobag_index/bag_index/1595460997.0.stampedmsg.protobin
---------                     -------
    11684                     11 files
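
Judging only from the listing above, stamped entries appear to be named `<topic>/<number>.<number>.stampedmsg.protobin`. The helper below is a hypothetical sketch that splits such a name back into its parts using plain string handling. Both `parse_stamped_name` and the guess that the two numbers are seconds and nanoseconds are our assumptions, not part of the `protobag` API, so prefer `entry.topic` and `entry.timestamp` when you have a real entry in hand.

```python
from typing import NamedTuple, Optional

STAMPED_SUFFIX = ".stampedmsg.protobin"


class StampedName(NamedTuple):
    topic: str
    sec: int
    nanos: int  # assumed meaning of the second number in the filename


def parse_stamped_name(entryname: str) -> Optional[StampedName]:
    """Split '<topic>/<sec>.<nanos>.stampedmsg.protobin' into its parts.

    Returns None for names that do not follow this (assumed) pattern.
    """
    if not entryname.endswith(STAMPED_SUFFIX):
        return None
    stem = entryname[: -len(STAMPED_SUFFIX)]
    # The topic is everything before the last slash; the rest is the stamp.
    topic, sep, stamp = stem.rpartition("/")
    sec_text, dot, nanos_text = stamp.partition(".")
    if not (sep and dot and sec_text.isdigit() and nanos_text.isdigit()):
        return None
    return StampedName(topic, int(sec_text), int(nanos_text))


print(parse_stamped_name("positions/lara/3.0.stampedmsg.protobin"))
print(parse_stamped_name("hunters/py_max"))
```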


We can now **read** them using the `protobag.SelectionBuilder` helper tool:

In [8]:
bag = protobag.Protobag(
        path='example.zip',
        
        # Tell protobag to use our protoc-generated python code:
        msg_classes=(DinoHunter, Position))

print("Read just the positions of trex:")
sel_trex = protobag.SelectionBuilder.select_window(topics=["positions/trex"])
for entry in bag.iter_entries(selection=sel_trex):
    print("Time: %s Position: %s %s" % (entry.timestamp.ToDatetime(), entry.msg.x, entry.msg.y))
print()
print()
    
print("Read *all* timeseries data:")
sel_all_time_series = protobag.SelectionBuilder.select_window()
for entry in bag.iter_entries(selection=sel_all_time_series):
    print("Topic: %s Time: %s Position: %s %s" % (entry.topic, entry.timestamp.ToDatetime(), entry.msg.x, entry.msg.y))

Read just the positions of trex:
Time: 1970-01-01 00:00:00 Position: 2.0 3.0
Time: 1970-01-01 00:00:01 Position: 3.0 4.0
Time: 1970-01-01 00:00:02 Position: 4.0 5.0
Time: 1970-01-01 00:00:03 Position: 5.0 6.0
Time: 1970-01-01 00:00:04 Position: 6.0 7.0


Read *all* timeseries data:
Topic: positions/lara Time: 1970-01-01 00:00:00 Position: 0.0 1.0
Topic: positions/trex Time: 1970-01-01 00:00:00 Position: 2.0 3.0
Topic: positions/lara Time: 1970-01-01 00:00:01 Position: 1.0 2.0
Topic: positions/trex Time: 1970-01-01 00:00:01 Position: 3.0 4.0
Topic: positions/lara Time: 1970-01-01 00:00:02 Position: 2.0 3.0
Topic: positions/trex Time: 1970-01-01 00:00:02 Position: 4.0 5.0
Topic: positions/lara Time: 1970-01-01 00:00:03 Position: 3.0 4.0
Topic: positions/trex Time: 1970-01-01 00:00:03 Position: 5.0 6.0
Topic: positions/lara Time: 1970-01-01 00:00:04 Position: 4.0 5.0
Topic: positions/trex Time: 1970-01-01 00:00:04 Position: 6.0 7.0
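
The interleaved order above is what time-ordered playback across topics looks like. To make the idea concrete without relying on `protobag` internals, here is a minimal sketch that merges two per-topic streams by timestamp using the standard library's `heapq.merge`. It illustrates the concept only and is not how `protobag` implements its index; the tuple layout is our own choice.

```python
import heapq

# Per-topic streams of (t_sec, topic, (x, y)), each already sorted by time.
# The values mirror the positions written in the cell above.
lara = [(t, "positions/lara", (float(t), float(t + 1))) for t in range(5)]
trex = [(t, "positions/trex", (float(t + 2), float(t + 3))) for t in range(5)]

# heapq.merge lazily merges sorted inputs; keying on the timestamp alone keeps
# entries with equal timestamps in the order the streams were passed in.
playback = list(heapq.merge(lara, trex, key=lambda item: item[0]))

for t_sec, topic, (x, y) in playback[:4]:
    print("Topic: %s t_sec: %s Position: %s %s" % (topic, t_sec, x, y))
```

Because the merge is lazy, the same pattern should work for streams too large to hold in memory, as long as each stream is itself sorted by time.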


### Raw Data (`protobag.RawEntry`)
We can also just put raw data like text files or images into our protobag archive because it's just a zip file.  The C++ `protobag` API even includes an [`ArchiveUtil.hpp` module](fixme) TODO FIXME LINK that has helper functions for common archive operations like zipping a directory, unarchiving a tar file, etc.  (In python, one would probably use the excellent `zipfile` or `tarfile` libraries, but those don't exist in C++).  The raw write API makes `protobag` skip all indexing and type-tracking activity.

To **write** data, just use the `write_raw` API:

In [9]:
bag = protobag.Protobag(path='example.zip')
writer = bag.create_writer()

s = b"i am a raw string"
writer.write_raw("raw_data", s)

writer.close()

!unzip -l example.zip
!echo "Reading the raw data using unzip:"
!unzip -p example.zip raw_data

Archive:  example.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
       19  1980-01-01 00:00   raw_data
      129  1980-01-01 00:00   /_protobag_index/bag_index/1595460998.0.stampedmsg.protobin
---------                     -------
      148                     2 files
Reading the raw data using unzip:
i am a raw string

To **read** raw data, you can use the same API we used for regular messages (but `protobag` will do no deserialization):

In [10]:
bag = protobag.Protobag(path='example.zip')
entry = bag.get_entry("raw_data")
print(entry)

RawEntry:
  entryname: raw_data
  raw_bytes: i am a raw string ... (17 bytes)
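
Since raw entries are ordinary zip members, Python's standard `zipfile` module can read them as well. The sketch below builds a small stand-in archive in memory so that it runs on its own; we have not verified it against an archive written by `protobag`, although the same calls should apply to `example.zip`.

```python
import io
import zipfile

# Build a small stand-in archive in memory so the sketch runs on its own.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w") as archive:
    archive.writestr("raw_data", b"i am a raw string")

# Read the entry back by name, much as `unzip -p` does on the command line.
with zipfile.ZipFile(buffer) as archive:
    names = archive.namelist()
    raw_bytes = archive.read("raw_data")

print(names)
print(raw_bytes.decode("utf-8"))
```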


<a id='dataframes'></a>
## Converting `protobag` archives to DataFrames

For various data mining tasks, dealing with files and archives can be cumbersome.  Suppose we wanted to:
 * Load our `protobag` data into a `DataFrame` (a database table)
 * Compute some quick statistics of our data using python code
 * Query our `protobag` contents using SQL

In each of these cases, we might think of each entry of our `protobag` archive as a row in a table.  (You might also want to skip some entries, or have a single entry map to several rows.)  Protobag provides a `DictRowEntry` utility to support this usage.

Protobuf 3.x introduced mature `json` support, and in python, `google.protobuf.json_format` also provides a means to convert Protobuf messages to and from *python dicts*.  `DictRowEntry` leverages this feature to help you translate between `protobag` archive entries and table rows using whatever database or DataFrame tool you have at hand.
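
To see the dict conversion on its own, here is a minimal sketch that calls `google.protobuf.json_format` directly. It uses `FieldDescriptorProto`, a message type that ships with the protobuf package, as a stand-in for our own messages; the field values are made up. Note that the default conversion renames fields to lowerCamelCase, which would explain keys such as `firstName` in `msg_dict` output.

```python
from google.protobuf import json_format
from google.protobuf.descriptor_pb2 import FieldDescriptorProto

# Any message type works; this one ships with protobuf and has snake_case fields.
field = FieldDescriptorProto(name="first_name", number=1, type_name=".demo.Name")

# Default conversion: field names become lowerCamelCase JSON names.
camel = json_format.MessageToDict(field)

# Optional flag: keep the names exactly as written in the .proto file.
snake = json_format.MessageToDict(field, preserving_proto_field_name=True)

print(camel)
print(snake)

# ParseDict accepts either spelling and rebuilds an equal message.
restored = json_format.ParseDict(camel, FieldDescriptorProto())
print(restored == field)
```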


### From `protobag` to `pandas` and back
In this section, we'll show how to use `DictRowEntry` to create a `pandas.DataFrame` from our `protobag` archive.  We can also convert from `DataFrame` to `protobag` if desired.

First we need to have `pandas` available, as well as an example `protobag` archive:


In [11]:
!pip3 install pandas
import pandas as pd



In [12]:
bag = protobag.Protobag(path='pandas_example.zip')
writer = bag.create_writer()

# Include some protobuf messages in the example
writer.write_msg("hunters/py_max", max_hunter)
writer.write_msg("hunters/py_lara", lara_hunter)

# Include some time series data in the example
for t in range(5):
    writer.write_stamped_msg("positions/lara", Position(x=t, y=t+1), t_sec=t)
    writer.write_stamped_msg("positions/trex", Position(x=t+2, y=t+3), t_sec=t)

# Include some raw data
s = b"i am a raw string"
writer.write_raw("raw_data", s)

writer.close()

!unzip -l pandas_example.zip

Archive:  pandas_example.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
       72  1980-01-01 00:00   hunters/py_max
       86  1980-01-01 00:00   hunters/py_lara
      100  1980-01-01 00:00   positions/lara/0.0.stampedmsg.protobin
      105  1980-01-01 00:00   positions/trex/0.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/1.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/1.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/2.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/2.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/3.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/3.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/lara/4.0.stampedmsg.protobin
      107  1980-01-01 00:00   positions/trex/4.0.stampedmsg.protobin
       19  1980-01-01 00:00   raw_data
    10675  1980-01-01 00:00   /_protobag_index/bag_index/15954610

Now we'll read back that `protobag` data and load it into a `pandas.DataFrame`.  First, let's take a close look at what `DictRowEntry` actually provides for a single entry:

In [13]:
bag = protobag.Protobag(path='pandas_example.zip', msg_classes=(DinoHunter, Position))

entry = bag.get_entry("hunters/py_lara")
row = protobag.DictRowEntry.from_entry(entry)
print(row)

protobag.DictRowEntry:
  entryname: hunters/py_lara
  topic:  timestamp 
  type_url: type.googleapis.com/my_messages.DinoHunter
  has serdes: True
  descriptor_data: (protobuf message) google.protobuf.FileDescriptorSet file {
  name: "MyMe (422 bytes)
  msg_dict:
 {'dinos': [{'name': 'py_bites', 'type': 'PEOPLEEATINGSAURUS'},
           {'name': 'py_stinky', 'type': 'VEGGIESAURUS'}],
 'firstName': 'py_lara',
 'id': 2}
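
Because `msg_dict` is a plain python dict, mapping a single entry to several rows takes only a few lines. As a sketch, the hypothetical helper `hunter_to_dino_rows` below (our own name, not part of `protobag`) emits one flat row per dino; the column names are likewise our own choice.

```python
# The dict printed above for `hunters/py_lara`, copied here as plain data.
msg_dict = {
    "dinos": [
        {"name": "py_bites", "type": "PEOPLEEATINGSAURUS"},
        {"name": "py_stinky", "type": "VEGGIESAURUS"},
    ],
    "firstName": "py_lara",
    "id": 2,
}


def hunter_to_dino_rows(entryname, msg_dict):
    """Return one flat row per dino, repeating the hunter's own fields."""
    return [
        {
            "entryname": entryname,
            "hunter_id": msg_dict.get("id"),
            "hunter_name": msg_dict.get("firstName"),
            "dino_name": dino.get("name"),
            "dino_type": dino.get("type"),
        }
        # If the "dinos" key is missing, no rows are produced.
        for dino in msg_dict.get("dinos", [])
    ]


dino_rows = hunter_to_dino_rows("hunters/py_lara", msg_dict)
for row in dino_rows:
    print(row)
```

A list of flat dicts like `dino_rows` can be passed straight to `pd.DataFrame(...)`.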


Now let's create the `DataFrame`.  For simplicity, we just create one `DataFrame` table row per archive file:

In [14]:
# Collect "rows" here, which will be just python dicts.  Pandas is happy to convert dicts to rows.
rows = []
for entry in bag.iter_entries():
    # ignore the index
    if '_protobag_index' in entry.entryname:
      continue

    rows.append(protobag.DictRowEntry.from_entry(entry))

print("Read %s rows" % len(rows))

df = pd.DataFrame([
    # Convert to pyarrow-friendly types
    dict(
      entryname=row.entryname,
      type_url=row.type_url,
      msg_dict=row.msg_dict,
      topic=row.topic,
      timestamp=
        row.timestamp.ToDatetime() if row.timestamp else None,
      descriptor_data=
        row.descriptor_data.SerializeToString() if row.descriptor_data else None,
    )
    for row in rows
])
df.info()
df

Read 13 rows
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13 entries, 0 to 12
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   entryname        13 non-null     object        
 1   type_url         13 non-null     object        
 2   msg_dict         13 non-null     object        
 3   topic            13 non-null     object        
 4   timestamp        10 non-null     datetime64[ns]
 5   descriptor_data  12 non-null     object        
dtypes: datetime64[ns](1), object(5)
memory usage: 752.0+ bytes


|   | entryname | type_url | msg_dict | topic | timestamp | descriptor_data |
|---|---|---|---|---|---|---|
| 0 | hunters/py_max | type.googleapis.com/my_messages.DinoHunter | `{'firstName': 'py_max', 'id': 1, 'dinos': [{'n...` |  | NaT | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 1 | hunters/py_lara | type.googleapis.com/my_messages.DinoHunter | `{'firstName': 'py_lara', 'id': 2, 'dinos': [{'...` |  | NaT | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 2 | positions/lara/0.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'y': 1.0}` | positions/lara | 1970-01-01 00:00:00 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 3 | positions/trex/0.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 2.0, 'y': 3.0}` | positions/trex | 1970-01-01 00:00:00 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 4 | positions/lara/1.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 1.0, 'y': 2.0}` | positions/lara | 1970-01-01 00:00:01 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 5 | positions/trex/1.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 3.0, 'y': 4.0}` | positions/trex | 1970-01-01 00:00:01 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 6 | positions/lara/2.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 2.0, 'y': 3.0}` | positions/lara | 1970-01-01 00:00:02 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 7 | positions/trex/2.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 4.0, 'y': 5.0}` | positions/trex | 1970-01-01 00:00:02 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 8 | positions/lara/3.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 3.0, 'y': 4.0}` | positions/lara | 1970-01-01 00:00:03 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
| 9 | positions/trex/3.0.stampedmsg.protobin | type.googleapis.com/my_messages.Position | `{'x': 5.0, 'y': 6.0}` | positions/trex | 1970-01-01 00:00:03 | `b'\n\xa3\x03\n\x10MyMessages.proto\x12\x0bmy_m...` |
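
Once entries are table rows, the SQL use case from the start of this section is within reach. As a sketch using only the standard library's `sqlite3`, the code below loads a few hand-copied stand-in rows (not read from the archive) into an in-memory table and runs a query. The table name `entries` and the columns `t_sec` and `msg_json` are our own choices for illustration.

```python
import json
import sqlite3

# Stand-in rows shaped like the table above (a few columns only).
rows = [
    ("positions/lara/0.0.stampedmsg.protobin", "positions/lara", 0, {"y": 1.0}),
    ("positions/trex/0.0.stampedmsg.protobin", "positions/trex", 0, {"x": 2.0, "y": 3.0}),
    ("positions/lara/1.0.stampedmsg.protobin", "positions/lara", 1, {"x": 1.0, "y": 2.0}),
    ("positions/trex/1.0.stampedmsg.protobin", "positions/trex", 1, {"x": 3.0, "y": 4.0}),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entries (entryname TEXT, topic TEXT, t_sec INTEGER, msg_json TEXT)"
)
# Nested dicts are stored as JSON text because SQLite has no dict column type.
conn.executemany(
    "INSERT INTO entries VALUES (?, ?, ?, ?)",
    [(name, topic, t_sec, json.dumps(msg)) for name, topic, t_sec, msg in rows],
)

# Count entries per topic and find the latest timestamp seen on each.
query = """
    SELECT topic, COUNT(*) AS n, MAX(t_sec) AS last_t
    FROM entries
    GROUP BY topic
    ORDER BY topic
"""
summary = conn.execute(query).fetchall()
print(summary)
conn.close()
```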


Now suppose we want to convert back to `protobag` format.  We need a few key things from our table:
 * A `msg_dict` message dictionary that we want to convert back to a Protobuf message.
 * Some `descriptor_data`, which Protobuf's underlying `json_format.ParseDict()` needs in order to convert `msg_dict` into a protobuf message.
 * Some other context like `entryname` or a `topic`-`timestamp` that defines where to put the message in the `protobag` archive.
 
Note that `msg_dict` and `descriptor_data` are only required if you want a lossless / bijective transform between your Protobuf messages and table row data.  You may very well have a more complex mapping between your Protobuf and table data; `protobag` simply provides tooling to support as much of a basic bijective mapping as possible.
 
Our table above has all the required data, so conversion is simple:

In [15]:
# Get row dicts from pandas
rows = df.to_dict(orient='records')

# We'll write to this archive:
bag = protobag.Protobag(path='pandas_to_protobag_example.zip')
writer = bag.create_writer()

# Loop through rows, converting them back to `protobag` Entries using `DictRowEntry`
for row in rows:
    dict_row_entry = protobag.DictRowEntry(**row)
    entry = dict_row_entry.to_entry()
    writer.write_entry(entry)
writer.close()

!unzip -l pandas_to_protobag_example.zip


Archive:  pandas_to_protobag_example.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
       72  1980-01-01 00:00   hunters/py_max
       86  1980-01-01 00:00   hunters/py_lara
       49  1980-01-01 00:00   positions/lara/0.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/trex/0.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/lara/1.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/trex/1.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/lara/2.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/trex/2.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/lara/3.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/trex/3.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/lara/4.0.stampedmsg.protobin
       54  1980-01-01 00:00   positions/trex/4.0.stampedmsg.protobin
       19  1980-01-01 00:00   raw_data
     9141  1980-01-01 00:00   /_protobag_index/bag_in