In [8]:
import datetime
import tiledb
import numpy as np
import pandas as pd
import logging
from typing import Union
from bottletar import GtsmBottleTar
logger = logging.getLogger()

In [37]:
class StrainTiledbArray:
    """Handle for one TileDB array: its URI, the context used to reach it, and its schema.

    Attributes:
        uri: Location of the array; passed unchanged to every TileDB call.
        period: Period between samples in seconds (stored only; this class
            never reads it).
        ctx: ``tiledb.Ctx`` built by ``set_s3_ctx`` or ``set_local_ctx``.
        schema: Schema used by ``create``. May be given to the constructor or
            assigned afterwards (``array.schema = ...``).
    """

    def __init__(self, uri, period=None, location='S3', schema=None):
        """Store the array settings and build the matching TileDB context.

        Args:
            uri: Location of the array.
            period: Period between samples in seconds.
            location: ``'S3'`` or ``'local'``; selects which context is built.
            schema: Optional schema for ``create``; defaults to ``None`` so it
                can still be assigned on the instance later.

        Raises:
            ValueError: If ``location`` is neither ``'S3'`` nor ``'local'``.
        """
        if location == 'S3':
            self.set_s3_ctx()
        elif location == 'local':
            self.set_local_ctx()
        else:
            # Without a context every other method would fail later with an
            # opaque AttributeError on self.ctx, so reject the value up front.
            raise ValueError(
                f"Unknown location {location!r}; expected 'S3' or 'local'"
            )
        self.uri = uri
        self.period = period  # period between samples in seconds
        self.schema = schema

    def set_local_ctx(self):
        """Build, store and return the context used for local arrays."""
        config = tiledb.Config()
        try:
            # Initialise the process-wide default context; TileDB reports an
            # error here (logged, not raised) when that cannot be done.
            tiledb.default_ctx(config)
        except tiledb.TileDBError as e:
            logger.warning(e)
        config["sm.consolidation.mode"] = "fragment_meta"
        config["sm.vacuum.mode"] = "fragment_meta"
        self.ctx = tiledb.Ctx(config=config)
        return self.ctx

    def set_s3_ctx(self):
        """Build, store and return the context used for arrays on S3."""
        config = tiledb.Config()
        config["vfs.s3.region"] = "us-east-2"
        config["vfs.s3.scheme"] = "https"
        config["vfs.s3.endpoint_override"] = ""
        config["vfs.s3.use_virtual_addressing"] = "true"
        config["sm.consolidation.mode"] = "fragment_meta"
        config["sm.vacuum.mode"] = "fragment_meta"
        self.ctx = tiledb.Ctx(config=config)
        # Returned for parity with set_local_ctx.
        return self.ctx

    def create(self):
        """Create the array at ``self.uri`` and stamp its ``version`` metadata.

        TileDB errors (for example, the array already exists) are logged as
        warnings and not re-raised.

        Raises:
            ValueError: If no schema has been set on the instance.
        """
        if getattr(self, "schema", None) is None:
            raise ValueError(
                "No schema set; pass schema= to the constructor or assign "
                "array.schema before calling create()"
            )
        try:
            tiledb.Array.create(self.uri, self.schema, ctx=self.ctx)
            with tiledb.Array(self.uri, "w", ctx=self.ctx) as A:
                A.meta["version"] = '3.5'
            logger.info(f'Created array at {self.uri}')
        except tiledb.TileDBError as e:
            logger.warning(e)

    def delete(self):
        """Remove the array at ``self.uri``; TileDB errors are logged, not raised."""
        try:
            tiledb.remove(self.uri, ctx=self.ctx)
            logger.info(f'Deleted {self.uri}')
        except tiledb.TileDBError as e:
            logger.warning(e)

    @staticmethod
    def _fresh_ctx(key, value):
        """Return a new context whose config holds only ``key: value``."""
        # NOTE(review): this does not carry over the settings in self.ctx
        # (e.g. the S3 region); the original code did the same on purpose or
        # by accident - confirm before using the *_meta methods against S3.
        return tiledb.Ctx(tiledb.Config({key: value}))

    def _derived_ctx(self, key, value):
        """Return a new context built from ``self.ctx.config()`` with ``key`` overridden."""
        config = self.ctx.config()
        config[key] = value
        return tiledb.Ctx(config)

    def consolidate_meta(self):
        """Consolidate the array in ``fragment_meta`` mode."""
        ctx = self._fresh_ctx("sm.consolidation.mode", "fragment_meta")
        tiledb.consolidate(self.uri, ctx=ctx)
        logger.info("consolidated meta")

    def consolidate_fragments(self):
        """Consolidate the array in ``fragments`` mode."""
        ctx = self._derived_ctx("sm.consolidation.mode", "fragments")
        tiledb.consolidate(self.uri, ctx=ctx)
        logger.info("consolidated fragments")

    def vacuum_meta(self):
        """Vacuum the array in ``fragment_meta`` mode."""
        # The vacuum mode is controlled by "sm.vacuum.mode"; setting
        # "sm.consolidation.mode" here (as the code used to) has no effect on
        # tiledb.vacuum and leaves it running in its default mode.
        ctx = self._fresh_ctx("sm.vacuum.mode", "fragment_meta")
        tiledb.vacuum(self.uri, ctx=ctx)
        logger.info("vacuumed meta")

    def vacuum_fragments(self):
        """Vacuum the array in ``fragments`` mode."""
        ctx = self._derived_ctx("sm.vacuum.mode", "fragments")
        tiledb.vacuum(self.uri, ctx=ctx)
        logger.info("vacuumed fragments")

    def get_nonempty_domain(self):
        """Return ``(start, end)`` of the non-empty domain of the third dimension.

        Index 2 is the third dimension of the array; presumably the ``time``
        dimension - verify against the schema the array was created with.
        """
        with tiledb.open(self.uri, 'r', ctx=self.ctx) as A:
            bounds = A.nonempty_domain()[2]
        return bounds[0], bounds[1]


In [63]:
# Bottle archive to load into the array in this cell.
source_bottle = "bottles/TSM22130619_20.tar"
# session='Min' is handed to GtsmBottleTar as-is; presumably it selects the
# one-minute session - confirm against bottletar.
gtsm_file = GtsmBottleTar(source_bottle, session='Min')

def get_schema():
    """Build the sparse TileDB schema used for the strain array.

    Dimensions, in order:
        data_type  - ascii
        timeseries - ascii
        time       - datetime64[us], 1970-01-01 to 2069-12-07, one-day tiles

    Attributes:
        data    - float64
        quality - variable-length ascii
        level   - variable-length ascii
        version - int64

    Returns:
        tiledb.ArraySchema: sparse, row-major cell and tile order,
        capacity 100000.
    """
    # Only the filter pipelines that the schema actually uses are built. The
    # numbered names are kept because the notebook's notes refer to them.
    filters1 = tiledb.FilterList([tiledb.ZstdFilter(level=7)])
    filters4 = tiledb.FilterList([tiledb.DoubleDeltaFilter(), tiledb.ZstdFilter(level=7)])
    filters6 = tiledb.FilterList([tiledb.PositiveDeltaFilter(), tiledb.BitWidthReductionFilter(),
                                  tiledb.ZstdFilter(level=7)])

    stt = np.datetime64('1970-01-01T00:00:00.000000')
    end = np.datetime64('2069-12-07T00:00:00.000000')

    # Two string dimensions plus a time dimension with microsecond precision
    # and 24 hour tiles.
    d0 = tiledb.Dim(name="data_type", dtype="ascii", filters=filters1)
    d1 = tiledb.Dim(name="timeseries", dtype="ascii", filters=filters1)
    d2 = tiledb.Dim(name="time", domain=(stt, end), tile=np.timedelta64(1, 'D'), dtype='datetime64[us]',
                    filters=filters4)

    dom = tiledb.Domain(d0, d1, d2)

    attrs = [
        tiledb.Attr(name="data", dtype=np.float64, filters=filters1),
        tiledb.Attr(name="quality", dtype="ascii", var=True, filters=filters1),
        tiledb.Attr(name="level", dtype="ascii", var=True, filters=filters1),
        tiledb.Attr(name="version", dtype=np.int64, filters=filters1),
    ]

    return tiledb.ArraySchema(domain=dom,
                              sparse=True,
                              attrs=attrs,
                              cell_order='row-major',
                              tile_order='row-major',
                              capacity=100000,
                              offsets_filters=filters6)

# Plain string: there is nothing to interpolate, so no f-prefix.
uri = 'arrays/TSM2.tdb'
array = StrainTiledbArray(uri)
array.schema = get_schema()
# Drop any array left over from a previous run, then recreate it, so that
# re-running this cell always writes into a fresh array. delete() reports a
# TileDB error instead of raising when there is nothing to remove.
array.delete()
array.create()
gtsm_file.to_tiledb(array)

2023-02-01 15:30:24,993 - INFO - bottles/TSM22130619_20.tar: unpacking tar file
2023-02-01 15:30:25,240 - INFO - Created array at arrays/TSM2.tdb
2023-02-01 15:30:25,241 - INFO - bottles/TSM22130619_20.tar: Min tar contains 240 bottles.
2023-02-01 15:30:25,242 - INFO - bottles/TSM22130619_20.tar: loading 240 bottles into dataframe


Deleted  arrays/TSM2.tdb


2023-02-01 15:30:26,508 - INFO - bottles/TSM22130619_20.tar: Writing to arrays/TSM2.tdb
2023-02-01 15:30:27,061 - INFO - bottles/TSM22130619_20.tar: Written to arrays/TSM2.tdb
2023-02-01 15:30:27,064 - INFO - consolidated meta
2023-02-01 15:30:27,066 - INFO - vacuumed meta


In [64]:
!tree --du -h $uri

[398K]  arrays/TSM2.tdb
├── [  96]  __commits
│   └── [   0]  __1675294226725_1675294226725_37ea9cfa6c7d44c98ea27a88b5bea25f_16.wrt
├── [  64]  __fragment_meta
├── [397K]  __fragments
│   └── [397K]  __1675294226725_1675294226725_37ea9cfa6c7d44c98ea27a88b5bea25f_16
│       ├── [7.8K]  __fragment_metadata.tdb
│       ├── [347K]  a0.tdb
│       ├── [7.8K]  a1.tdb
│       ├── [1.3K]  a1_var.tdb
│       ├── [7.8K]  a2.tdb
│       ├── [ 306]  a2_var.tdb
│       ├── [2.0K]  a3.tdb
│       ├── [8.7K]  d0.tdb
│       ├── [ 772]  d0_var.tdb
│       ├── [8.7K]  d1.tdb
│       ├── [ 758]  d1_var.tdb
│       └── [3.5K]  d2.tdb
├── [ 236]  __meta
│   └── [ 140]  __1675294225238_1675294225238_5bd0bfba7aed4b94a4bf26db3a31f08d
└── [ 382]  __schema
    └── [ 286]  __1675

Best performance: `filters1` (Zstd, level 7) on everything except the `time` dimension, which uses `filters4` (double-delta followed by Zstd, level 7).

[398K]  arrays/TSM2.tdb
├── [  96]  __commits
│   └── [   0]  __1675293793874_1675293793874_b8435ec4a5a34fdfab7ed1007f946c53_16.wrt
├── [  64]  __fragment_meta
├── [397K]  __fragments
│   └── [397K]  __1675293793874_1675293793874_b8435ec4a5a34fdfab7ed1007f946c53_16
│       ├── [7.8K]  __fragment_metadata.tdb
│       ├── [347K]  a0.tdb
│       ├── [7.8K]  a1.tdb
│       ├── [1.3K]  a1_var.tdb
│       ├── [7.8K]  a2.tdb
│       ├── [ 306]  a2_var.tdb
│       ├── [2.0K]  a3.tdb
│       ├── [8.7K]  d0.tdb
│       ├── [ 772]  d0_var.tdb
│       ├── [8.7K]  d1.tdb
│       ├── [ 758]  d1_var.tdb
│       └── [3.5K]  d2.tdb





