In [74]:
import json
import time
import numpy as np
from functools import wraps

class Platform:
    """Drive a configurable compression pipeline and collect benchmark data.

    The pipeline components (an algorithm and a pre/post-processing object)
    are selected by class name from a JSON configuration file. Results of
    timing and rate measurements are accumulated in ``self.benchmark``.
    """

    def __init__(self, config_json):
        """Load the configuration, build the components and run the chosen step.

        Args:
            config_json: Path to a JSON file with the keys ``"Algorithm"``,
                ``"Processing"``, ``"runType"`` and ``"logs"``.

        Raises:
            KeyError: If a required config key is missing or a configured
                class name is not defined in this module's globals.
        """
        with open(config_json, 'r') as f:
            config = json.load(f)
        self.raw_table = np.array([1,2,3]) # read_table(config["table"])
        # Class names are resolved in this module's namespace;
        # use getattr(mymodule, class_name)() if the class lives in a different module.
        self.algorithm = globals()[config["Algorithm"]]()
        self.processing = globals()[config["Processing"]]()
        self.run_type = config["runType"]
        self.logs = config["logs"]

        # Instances do not have ``__name__``; use the algorithm's declared
        # ``name`` and fall back to its class name when none is provided.
        self.benchmark = {
            "name": getattr(self.algorithm, "name", type(self.algorithm).__name__)
        }

        if self.run_type == 'test':
            self.test()
        elif self.run_type == 'compress':
            self.compress()
        elif self.run_type == 'decompress':
            # decompress() requires an input table; the loaded table is the
            # only data available at construction time.
            # NOTE(review): confirm that in 'decompress' mode the configured
            # table is expected to hold already-compressed data.
            self.decompress(self.raw_table)

    def measure_time(flag):
        """Build a decorator that times a method call.

        Args:
            flag: Name of an instance attribute. When that attribute is truthy
                after the call, the elapsed time is printed and stored in
                ``self.benchmark`` under ``"<method name>_time"``.

        Returns:
            A decorator for instance methods; the wrapped method's return
            value is passed through unchanged.
        """
        def decorator(func):
            @wraps(func)  # keep the wrapped method's __name__ / __doc__
            def wrapper(self, *args, **kwargs):
                # perf_counter is monotonic, so the measured duration cannot
                # be distorted by system clock adjustments.
                start_time = time.perf_counter()
                result = func(self, *args, **kwargs)
                elapsed = time.perf_counter() - start_time

                if getattr(self, flag):
                    print(f"Function {func.__name__} took {elapsed} seconds to execute.")
                    self.benchmark[func.__name__ + "_time"] = elapsed

                return result
            return wrapper
        return decorator

    @measure_time("logs")
    def compress(self):
        """Preprocess ``self.raw_table`` and compress it with the algorithm.

        Returns:
            Whatever the algorithm's ``compression`` method returns.
        """
        preprocessed_table = self.processing.preprocess(self.raw_table)
        # The algorithm interface names this method ``compression``.
        compressed_table = self.algorithm.compression(np.array(preprocessed_table))
        if self.logs:
            # Placeholder value until a real rate computation is wired in.
            compression_rate = 1000 # get_compression_rate(compressed_data, self.raw_table)
            self.benchmark["compression_rate"] = compression_rate
        return compressed_table

    @measure_time("logs")
    def decompress(self, compressed_table):
        """Decompress ``compressed_table`` and apply post-processing.

        Args:
            compressed_table: Data to hand to the algorithm's
                ``decompression`` method.

        Returns:
            The result of ``self.processing.postprocess`` on the decompressed data.
        """
        decompressed_table = self.algorithm.decompression(compressed_table)
        table = self.processing.postprocess(decompressed_table)

        if self.logs:
            # Placeholder value until a real loss computation is wired in.
            loss_rate = 0.0  # get_loss_rate(self.table, table)
            self.benchmark["loss_rate"] = loss_rate

        return table

    @measure_time("logs")
    def test(self):
        """Run a full compress/decompress round trip.

        Returns:
            The table produced by ``decompress`` for the compressed data.
        """
        compressed_data = self.compress()
        decompressed_data = self.decompress(compressed_data)
        return decompressed_data

In [72]:
# NOTE(review): Platform.__init__ resolves the configured "Algorithm" and
# "Processing" class names through globals(), so those classes (defined in
# later cells of this notebook) must already exist when this cell runs.
plug_test = Platform("plug_config.json")

DummyCompressionAlgorithm
<__main__.DummyCompressionAlgorithm object at 0x00000274046EE260>
I am dummy compression


[1, 1]

In [64]:
import abc

class CompressionAlgorithm(abc.ABC):
    """Abstract interface every compression algorithm must implement.

    Concrete subclasses have to provide ``name``, ``compression`` and
    ``decompression``; otherwise they cannot be instantiated.
    """

    def __init__(self):
        """Do nothing; present so subclasses share a no-argument constructor."""

    @property
    @abc.abstractmethod
    def name(self):
        """Abstract: the algorithm's name, to be supplied by subclasses."""

    @abc.abstractmethod
    def compression(self, table):
        """Abstract: compress ``table``; the result type is up to the subclass."""

    @abc.abstractmethod
    def decompression(self, compressed_data):
        """Abstract: decompress ``compressed_data``; the result type is up to the subclass."""

class LearningCompressionAlgorithm(CompressionAlgorithm):
    """Compression algorithm interface that additionally requires ``fit``."""

    @abc.abstractmethod
    def fit(self, training_data):
        """Abstract: fit the algorithm on ``training_data``; defined by subclasses."""

class NonLearningCompressionAlgorithm(CompressionAlgorithm):
    """Category subclass of ``CompressionAlgorithm``; adds no members of its own."""

class LosslessCompressionAlgorithm(CompressionAlgorithm):
    """Category subclass of ``CompressionAlgorithm``; adds no members of its own."""

class LossyCompressionAlgorithm(CompressionAlgorithm):
    """Category subclass of ``CompressionAlgorithm``; adds no members of its own."""

In [69]:
class DummyCompressionAlgorithm(LosslessCompressionAlgorithm):
    """Stand-in algorithm: sleeps, prints a message and returns its input unchanged."""

    name = 'Dummy Compression Algorithm'

    @staticmethod
    def _simulate(seconds, message):
        """Sleep for ``seconds`` and then print ``message``."""
        time.sleep(seconds)
        print(message)

    def compression(self, table):
        """Sleep two seconds, announce the step and hand ``table`` back as is."""
        self._simulate(2, "I am dummy compression")
        return table

    def decompression(self, compressed_data):
        """Sleep one second, announce the step and hand ``compressed_data`` back as is."""
        self._simulate(1, "I am dummy decompression")
        return compressed_data

In [66]:
class Processing(abc.ABC):
    """Abstract interface for the pre/post-processing step of the pipeline.

    Concrete subclasses have to provide ``name``, ``preprocess`` and
    ``postprocess``; otherwise they cannot be instantiated.
    """

    def __init__(self):
        """Do nothing; present so subclasses share a no-argument constructor."""

    @property
    @abc.abstractmethod
    def name(self):
        """Abstract: the processing step's name, to be supplied by subclasses."""

    @abc.abstractmethod
    def preprocess(self, raw_table):
        """Abstract: transform ``raw_table`` before compression; defined by subclasses."""

    @abc.abstractmethod
    def postprocess(self, processed_table):
        """Abstract: transform ``processed_table`` after decompression; defined by subclasses."""

In [67]:
class DummyProcessing(Processing):
    """Pass-through processing step: both directions return their input unchanged."""

    name = "Dummy Pre/Post-Processing"

    def preprocess(self, raw_table):
        """Return ``raw_table`` untouched."""
        return raw_table

    def postprocess(self, processed_table):
        """Return ``processed_table`` untouched."""
        return processed_table


In [None]:
import numpy as np
# Platform takes a single argument, the path to its JSON config; constructing
# it already executes the step selected by the config's "runType".
platform = Platform("plug_config.json")
# Benchmark entries collected during the run (algorithm name plus, when the
# config enables "logs", the recorded timings and rates).
platform.benchmark