# Indexing MDFs with Pony ORM.
## MySQL

SQLlite is a good tool for development, but it does not scale well.

In addition to SQLLite PonyORM supports PostgreSQL, MySQL, and Oracle.

This example picks up from the SQLLite example but adds MySQL support.

Using a MySQL (or PostreSQL/Oracle) backend will allow for distributed processing on any node that can access the s3 buckets.

In [1]:
import fsspec
from asammdf import MDF

import fsspec
import os
s3_cfg = {
    "key": "mdf_minio_access_key",
    "secret": "mdf_minio_secret_key",
    "client_kwargs": {
        "endpoint_url": "http://minio:9000",
    },
}
fs = fsspec.filesystem("s3", **s3_cfg)
buckets = fs.ls("")
print(f"Buckets: {fs.ls('')}")

Buckets: ['mdfbucket-0', 'mdfbucket-1', 'mdfbucket-2', 'mdfbucket-3', 'mdfbucket-4', 'mdfbucket-5', 'mdfbucket-6', 'mdfbucket-7', 'mdfbucket-8', 'mdfbucket-9', 'test']


# Walk Through All Files:

Walk through all S3 files and find the first one.

In [2]:
import os
mdf_paths=list()
for bucket in fs.ls(""):
    for root, dirs, files in fs.walk(bucket):
        for file in files:
            if file.lower().endswith(".mf4") or file.lower().endswith(".mdf"):
                mdf_paths.append(os.path.join(root, file))
                

mdf_size = sum([fs.info(mdf_path)["size"] for mdf_path in mdf_paths])
print(f"Indexing {mdf_size/1024**3:.2f} GB in {len(mdf_paths)} MDF files")

Indexing 2.14 GB in 975 MDF files


Display a random MDF path.

In [7]:
import random
random.choice(mdf_paths)

'mdfbucket-5/DäsCarGmbh/DumpTruck/27c181e6-74eb-4fd7-8476-0d751bc9b5fe.mf4'

In [8]:
import os

import pony.orm
from pony.orm.core import EntityMeta
from datetime import datetime

pony.orm.set_sql_debug(False)

db = pony.orm.Database()

db.bind(
    provider='mysql',
    host='mysql',
    user='mdf_indexer_user',
    passwd='mdf_indexer_pass',
    db='mdf_database',
)

In [9]:
try:
    db.drop_all_tables(with_all_data=True)
except pony.orm.ERDiagramError:
    pass

In [None]:
import asammdf
# For Local Indexing.
class MDF(db.Entity):
    """MDF ORM Entity Fancy"""
    # Filesystem Bits.
    key = pony.orm.Required(str, unique=True,)
    last_modified = pony.orm.Optional(datetime, volatile=True)
    etag = pony.orm.Optional(str,)
    size = pony.orm.Optional(int,)
    size_mb = pony.orm.Optional(float,)
    storage_class = pony.orm.Optional(str,)
    type = pony.orm.Optional(str,)
    name = pony.orm.Optional(str,)
    
    # Pre-calculated bits.
    basename = pony.orm.Optional(str,)
    product = pony.orm.Optional(str,)
    company = pony.orm.Optional(str,)

    # ASAM MDF Bits.
    version = pony.orm.Optional(str,)
    channels = pony.orm.Set("Channel",)
    
    
    # Basename.
    basename = pony.orm.Optional(str,)
    channels = pony.orm.Set("Channel",)
    
    # Metadata
    product = pony.orm.Optional(str,)
    company = pony.orm.Optional(str,)
    
    def __repr__(self):
        return f"MDF<{self.id},{self.product},{self.company},Ch:{len(self.channels)}>"

class Channel(db.Entity):
    """Channel entity to represent a 
    
    """
    name = pony.orm.Required(str, unique=True,)
    mdfs = pony.orm.Set("MDF",)
    
    def __repr__(self):
        return f"Channel<{self.id},{self.name}>"

def upsert(cls, get, set=None):
    """
    Interacting with Pony entities.

    :param cls: The actual entity class
    :param get: Identify the object (e.g. row) with this dictionary
    :param set: Additional fields to set if ```get``` returns nothing.
    :return:
    """
    # does the object exist
    assert isinstance(cls, EntityMeta), f"{cls} is not a database entity"

    # if no set dictionary has been specified
    set = set or {}
    db.flush()
    if not cls.exists(**get):
        # make new object
        return cls(**set, **get)
    else:
        # get the existing object
        obj = cls.get(**get)
        for key, value in set.items():
            obj.__setattr__(key, value)
        return obj
    

db.generate_mapping(create_tables=True)

In [37]:
def index_mdf(mdf_path):
    """ Index the mdf file itself. """
    info = fs.info(mdf_path)
    # Local File
    MDF_ = upsert(
    cls=MDF,
    get={"key": info["Key"]},
    set={
        "last_modified": info["LastModified"],
        "etag": info["ETag"],
        "size": info["size"],
        "size_mb": info["size"] / 1024 ** 2,
        "storage_class": info["StorageClass"],
        "type": info["type"],
        "name": info["name"],
        "basename": os.path.basename(info["name"])
        },
    )
    try:
        db.commit()
        return MDF_
    except:
        db.rollback()
        return None
    
def index_channels(mdf):
    """Given a MDF files, process the channels
    
    """
    # Open the MDF file.
    with fs.open(mdf.name, "rb") as fid:
        mdf_ = asammdf.MDF(fid)
    # 
    channels=list()
    # Loop through each of the channels in the database.
    for channel in mdf_.channels_db.keys():
        channel_ = upsert(Channel, {"name": channel})
        channels.append(channel_)
    MDF_ = upsert(
    cls=MDF,
    get={"name": mdf.name},
    set={
        "channels": channels
        },
    )
    try:
        db.commit()
        return channels
    except:
        db.rollback()
        return None
        
def index_mdf_info(mdf):
    if isinstance(mdf, str):
        name = mdf
    elif isinstance(mdf, MDF):
        name = mdf.name
    else:
        raise Exception
    
    
    """ Index company and product information in the database from the filename."""
    product = os.path.basename(os.path.dirname(name))
    company = os.path.basename(
        os.path.dirname(
            os.path.dirname(
                name
            )
        )
    )
    # Local File
    MDF_ = upsert(
    cls=MDF,
    get={"name": mdf.name},
    set={
        "product": product,
        "company": company,
        },
    )
    try:
        db.commit()
        return MDF_
    except:
        db.rollback()
        return None

In [38]:
import random

In [39]:
# Pick one out of a hat.
mdf_path = random.choice(mdf_paths)

Insert the MDF file into the database.

[Notice the __repr__ string isn't fully populated, the data isn't yet in the database]

In [40]:
mdf = index_mdf(mdf_path)
mdf

MDF<19,,,Ch:0>

Index the product and company name of the mdf:

In [41]:
index_mdf_info(mdf)

MDF<19,Bulldozer,HeavyEquipmentInc,Ch:0>

Note, channels is still '0', because the channels have not been read with ```asammdf```. 

Write a function to open the file, read the channels with asammdf and insert them into the database.

In [42]:
index_channels(mdf)

[Channel<14,time>,
 Channel<15,engine_speed>,
 Channel<16,engine_speed_desired>,
 Channel<17,vehicle_speed>,
 Channel<18,coolant_temp>,
 Channel<19,longitude>,
 Channel<20,latitude>,
 Channel<21,power>,
 Channel<22,efficiency>,
 Channel<26,ADAS5_failure>,
 Channel<23,X>,
 Channel<24,Y>,
 Channel<25,Z>]