# MDF Analysis from S3 with Local Caching

Example scenario:

- MDF data sitting on a local hard drive, e.g. CANape recordings from a vehicle at any point in the development cycle.

### ```minio``` Setup.

>  MinIO is a cloud storage server compatible with Amazon S3, released under Apache License v2.

> As an object store, MinIO can store unstructured data such as photos, videos, log files, backups and container images. The maximum size of an object is 5TB. 

- Install the [minio binary for your machine](https://docs.min.io/docs/minio-quickstart-guide.html).
- Start a MinIO instance pointing at your data files.  
  ```minio.exe server C:\CANapeRecordings```
  
### Pony ORM

> Pony is a Python ORM with beautiful query syntax  
> Write your database queries using Python generators & lambdas  

> - Supports major relational databases: SQLite, PostgreSQL, MySQL, Oracle 

https://ponyorm.org/

In [3]:
import s3fs
from asammdf import MDF

### ```minio``` configs.

There are several ways to point `s3fs` at a custom S3 endpoint URL; this notebook passes it through `client_kwargs`.

This example uses a large synthetic dataset generated with asammdf and scipy signal generators.

The data is inspected first through the native file system and then through ```fsspec``` and minio.
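The later cells use an `fs` filesystem object and an `s3_cfg` dictionary without showing where they come from. A minimal sketch of one way to build them is given here, assuming the default MinIO credentials and local endpoint that appear in the `%env` cell further down. The helper names `make_s3_cfg` and `make_fs` are hypothetical and not part of `s3fs`.

```python
def make_s3_cfg(endpoint_url="http://127.0.0.1:9000",
                key="minioadmin", secret="minioadmin"):
    """Build keyword arguments for s3fs.S3FileSystem aimed at a local MinIO server.

    The defaults are assumptions taken from this notebook's %env cell.
    """
    return {
        "key": key,
        "secret": secret,
        # client_kwargs is forwarded to the underlying botocore client.
        "client_kwargs": {"endpoint_url": endpoint_url},
    }


def make_fs(cfg):
    """Create the filesystem; s3fs is imported lazily so the config can be built without it."""
    import s3fs
    return s3fs.S3FileSystem(**cfg)


s3_cfg = make_s3_cfg()
print(s3_cfg["client_kwargs"]["endpoint_url"])
# fs = make_fs(s3_cfg)  # needs s3fs installed and a running MinIO server
```

The same dictionary can then be passed as `target_options` when opening files through `filecache`, so the credentials and endpoint are defined in one place.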

In [None]:
!du -skh /projects/MDF_Data_Pipeline/Data/

In [None]:
!du -skh /projects/MDF_Data_Pipeline/Data/*

In [None]:
fs.ls("")

In [None]:
fs.ls("AerospaceStartup")

# Walk Through All Files:

Walk through every bucket in S3 and collect the paths of all MDF files.

In [None]:
?timeit

In [None]:
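import posixpath
import time

# Names assigned inside a %%timeit cell do not persist, so time the walk manually.
t0 = time.perf_counter()
mdf_paths = []
for company in fs.ls(""):
    for root, dirs, files in fs.walk(company):
        for file in files:
            # str.endswith accepts a tuple of suffixes.
            if file.lower().endswith((".mf4", ".mdf")):
                # S3 keys always use "/" as the separator, whatever the local OS.
                mdf_paths.append(posixpath.join(root, file))
print(f"Walk took {time.perf_counter() - t0:.2f}s")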

In [None]:
len(mdf_paths)

Take the first MDF file found (the loop exits immediately, leaving `mdf_path` bound to the first entry):

In [None]:
for mdf_path in mdf_paths:
    break

Show the metadata that ```fs.info``` returns for this file:

In [None]:
fs.info(mdf_path)

Goal 1: Find the largest MDF file indexed through minio.

Get all the MDF file sizes:

In [None]:
sizes = [fs.info(mdf_path)["size"] for mdf_path in mdf_paths]

In [None]:
# https://stackoverflow.com/a/6423325
myList = [1, 2, 3, 100, 5]    
sorted(range(len(myList)),key=myList.__getitem__)

In [None]:
biggest_idx = sorted(range(len(sizes)),key=sizes.__getitem__)[-1]
biggest_idx

In [None]:
mdf_paths[biggest_idx]
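Sorting every index only to take the last one does more work than needed. As a sketch of an alternative, `max` with the same key function returns the index of the largest value in a single pass. The function name `index_of_largest` and the sample list are illustrative only.

```python
def index_of_largest(values):
    """Return the index of the largest element in a non-empty sequence."""
    if not values:
        raise ValueError("values must not be empty")
    # Compare indices by the value stored at each index.
    return max(range(len(values)), key=values.__getitem__)


sizes_example = [1, 2, 3, 100, 5]
biggest_idx_example = index_of_largest(sizes_example)
print(biggest_idx_example)  # 3
```

With tied maxima, `max` returns the first matching index, whereas the `sorted(...)[-1]` form returns the last one. For file sizes that difference rarely matters.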

In [None]:
fs.info(mdf_paths[biggest_idx])

In [None]:
biggest_mdf = fs.info(mdf_paths[biggest_idx])

Size in GiB (bytes / 1024**3):

In [None]:
biggest_mdf["size"]/1024**3

Size in MiB (bytes / 1024**2):

In [None]:
biggest_mdf["size"]/1024**2
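The two divisions above can be folded into one helper that picks a suitable binary unit. This is a minimal sketch; `format_size` is a hypothetical name, not something provided by `fsspec` or `asammdf`.

```python
def format_size(num_bytes):
    """Format a byte count using binary units (1 KiB = 1024 bytes)."""
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    size = float(num_bytes)
    for unit in units:
        # Stop at the first unit where the value drops below 1024,
        # or at the largest unit available.
        if size < 1024 or unit == units[-1]:
            return f"{size:.2f} {unit}"
        size /= 1024


print(format_size(1536))          # 1.50 KiB
print(format_size(5 * 1024**3))   # 5.00 GiB
```

In the notebook it could be called as `format_size(biggest_mdf["size"])`.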

In [None]:
# https://github.com/dask/s3fs/issues/273
# https://github.com/pandas-dev/pandas/pull/29050
# %env stores everything after "=" verbatim, so the values must not be quoted.
%env AWS_ACCESS_KEY_ID=minioadmin
%env AWS_SECRET_ACCESS_KEY=minioadmin
%env S3_ENDPOINT=http://127.0.0.1:9000

fs2=s3fs.S3FileSystem(
    client_kwargs={
        "endpoint_url": "http://127.0.0.1:9000",
    })
fs2.ls("")

In [None]:
import asammdf

In [None]:
%%timeit
of = fs.open(mdf_paths[biggest_idx])
mdf = asammdf.MDF(of)

In [None]:
of = fs.open(mdf_paths[biggest_idx])

mdf = asammdf.MDF(of)

In [None]:
mdf

In [None]:
mdf.channels_db

In [None]:
x = mdf.get_group(0)

# Convert to fsspec filecache

The following example is taken directly from the Anaconda article [Introducing Remote Content Caching with FSSpec.](https://www.anaconda.com/fsspec-remote-caching/)

In [None]:
import fsspec
of = fsspec.open("filecache://anaconda-public-datasets/iris/iris.csv", mode='rt', 
                 cache_storage='/tmp/cache1',
                 target_protocol='s3', target_options={'anon': True})
with of as f:
    print(f.readline())

In [None]:
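cache_dir = "/tmp/mdf_cache"
# s3_cfg must hold the s3fs.S3FileSystem keyword arguments
# (credentials and client_kwargs with endpoint_url) for the MinIO server.
fsspec_kwargs = {
    "urlpath": f"filecache://{mdf_path}",  # wrap the S3 path in the caching layer
    "mode": 'rb',
    "cache_storage": cache_dir,            # local directory that stores cached copies
    "target_protocol": 's3',
    "target_options": s3_cfg,
}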

In [None]:
import shutil
import time

In [None]:
shutil.rmtree(
    path=cache_dir,
    ignore_errors=True
)
t1=time.time()
with fsspec.open(**fsspec_kwargs) as of:
    mdf = MDF(of)
t2=time.time()
with fsspec.open(**fsspec_kwargs) as of:
    mdf2 = MDF(of)
t3=time.time()

print(f"Uncached Read: {t2-t1}s")
print(f"Cached Read: {t3-t2}s")
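The paired `time.time()` calls can be wrapped in a small context manager so each measurement is labelled and collected in one place. This is a sketch built only on the standard library. The name `timed` and the `timings` dictionary are hypothetical, and `time.sleep` stands in for the actual read.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Store the elapsed time even if the block raises.
        results[label] = time.perf_counter() - start


timings = {}
with timed("placeholder work", timings):
    time.sleep(0.01)  # stand-in for opening and parsing an MDF file

for label, seconds in timings.items():
    print(f"{label}: {seconds:.3f}s")
```

`time.perf_counter()` is a monotonic, high-resolution clock, which makes it better suited to short measurements than `time.time()`.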

# All S3 Files Cached/Uncached.

In [None]:
import posixpath
mdf_paths = []
for root, dirs, files in fs.walk("canedge-live-demo-2"):
    for file in files:
        if file.lower().endswith(".mf4"):
            # S3 keys always use "/" as the separator, whatever the local OS.
            mdf_paths.append(posixpath.join(root, file))
len(mdf_paths)

In [None]:
mdfs_uncached = list()
mdfs_cached = list()

cache_dir = "/tmp/mdf_cache"



shutil.rmtree(
    path=cache_dir,
    ignore_errors=True
)
t1=time.time()
for mdf_path in mdf_paths:
    fsspec_kwargs = {
        "urlpath": f"filecache://{mdf_path}",
        "mode": 'rb', 
        "cache_storage": cache_dir,
        "target_protocol": 's3',
        "target_options": s3_cfg,
    }
    with fsspec.open(**fsspec_kwargs) as of:
        mdfs_uncached.append(MDF(of))  
        
t2=time.time()

for mdf_path in mdf_paths:
    fsspec_kwargs = {
        "urlpath": f"filecache://{mdf_path}",
        "mode": 'rb', 
        "cache_storage": cache_dir,
        "target_protocol": 's3',
        "target_options": s3_cfg,
    }
    with fsspec.open(**fsspec_kwargs) as of:
        mdfs_cached.append(MDF(of))  
t3=time.time()

print(f"Uncached Read: {t2-t1}s")
print(f"Cached Read: {t3-t2}s")
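Both loops above build the same keyword dictionary for every file. One possible tidy-up is to move that into a helper, sketched below. `filecache_kwargs` is a hypothetical name, and the path and options in the demonstration are placeholders rather than real objects.

```python
def filecache_kwargs(path, cache_dir, target_options, mode="rb"):
    """Build the fsspec.open keyword arguments for a cached read of one S3 object."""
    return {
        "urlpath": f"filecache://{path}",
        "mode": mode,
        "cache_storage": cache_dir,
        "target_protocol": "s3",
        "target_options": target_options,
    }


# Placeholder values for illustration only.
example_kwargs = filecache_kwargs(
    path="some-bucket/recording.mf4",
    cache_dir="/tmp/mdf_cache",
    target_options={"client_kwargs": {"endpoint_url": "http://127.0.0.1:9000"}},
)
print(example_kwargs["urlpath"])  # filecache://some-bucket/recording.mf4
```

Each loop body then reduces to `with fsspec.open(**filecache_kwargs(mdf_path, cache_dir, s3_cfg)) as of:`, which keeps the cached and uncached passes identical by construction.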