In [149]:
import base64
import io
import json


class AvroObject:
    """Pairs a schema dict with a data dict and rebuilds the entity they describe."""

    def __init__(self, schema: dict, data: dict):
        """Store the schema and the data as given (no copy, no validation).

        Args:
            schema: Schema dict; its ``"name"`` entry is read by ``entity``.
            data: Mapping of attribute name to value applied to the rebuilt object.
        """
        self.schema = schema
        self.data = data

    @property
    def entity(self):
        """Instantiate the class named by ``schema["name"]`` and fill it from ``data``.

        The name is resolved in this module's global namespace and the result is
        called with no arguments; every key/value pair of ``data`` is then set
        as an attribute on the new instance.

        The name is deliberately NOT passed to ``eval``: a schema is plain data
        and must never be able to execute arbitrary expressions. Only a bare
        identifier that is bound to a callable in the module globals is accepted
        (builtins are not searched).

        Returns:
            A new instance carrying the attributes found in ``data``.

        Raises:
            TypeError: If the schema has no string ``"name"`` or the resolved
                global is not callable.
            ValueError: If the name is not a plain Python identifier.
            NameError: If no global with that name exists.
        """
        class_name = self.schema.get("name")
        if not isinstance(class_name, str):
            raise TypeError(f"schema 'name' must be a string, got {type(class_name)}")
        if not class_name.isidentifier():
            raise ValueError(f"schema 'name' is not a valid identifier: {class_name!r}")

        try:
            factory = globals()[class_name]
        except KeyError:
            raise NameError(f"name '{class_name}' is not defined") from None
        if not callable(factory):
            raise TypeError(f"'{class_name}' is not callable")

        o = factory()
        for key, value in self.data.items():
            setattr(o, key, value)
        return o


class AvroSchema:
    """Derives Avro-style schema dicts from Python objects or attribute mappings."""

    def __init__(self, url: str):
        """Store ``url``; the methods visible in this class do not use it."""
        self.url = url

    @staticmethod
    def schema(
            schema_namespace: str = None,
            schema_name: str = None,
            attrs: dict = None,
            obj: object = None,
            schema_type="record",
            data_types: dict = None
    ) -> dict:
        """Build a schema dict from ``obj`` or from an explicit ``attrs`` mapping.

        Args:
            schema_namespace: Stored as-is under ``"namespace"`` (may be ``None``).
            schema_name: Schema name. Required whenever ``attrs`` is given;
                otherwise defaults to the class name of ``obj``.
            attrs: Mapping of field name to sample value. Defaults to
                ``obj.__dict__`` when omitted.
            obj: Instance whose attributes (and class name) describe the schema.
            schema_type: Stored as-is under ``"type"``.
            data_types: Optional per-field overrides; a field listed here uses
                the given type verbatim instead of being inferred.

        Returns:
            A dict with the keys ``type``, ``name``, ``namespace`` and ``fields``.

        Raises:
            ValueError: If neither ``obj`` nor ``attrs`` is given, or if
                ``attrs`` is given without ``schema_name``.
            TypeError: Propagated from ``get_type`` for values whose type
                cannot be inferred.
        """
        if obj is None and attrs is None:
            raise ValueError("attrs or obj must be specified!")

        # An explicit attrs mapping carries no class to borrow a name from.
        # (With obj missing, attrs is necessarily present, so this single
        # check also covers the "no obj, no name" case.)
        if schema_name is None and attrs is not None:
            raise ValueError("schema_name must be specified!")

        schema_name = schema_name or obj.__class__.__name__

        if attrs is None:
            attrs = obj.__dict__

        data_types = data_types or {}
        fields = []
        for key, value in attrs.items():
            if key in data_types:
                avro_type = data_types[key]
            else:
                avro_type = AvroSchema.get_type(value=value, data_type=type(value))
            fields.append({"name": key, "type": avro_type})

        return {
            "type": schema_type,
            "name": schema_name,
            "namespace": schema_namespace,
            "fields": fields
        }

    def schema_str(self, *args, **kwargs):
        """Return ``schema(...)`` serialised as a JSON string.

        All arguments are forwarded to ``schema``. Called without arguments it
        raises ``ValueError``, exactly like ``schema()`` does.
        """
        return json.dumps(self.schema(*args, **kwargs))

    @staticmethod
    def get_type(value: object, data_type: type = None):
        """Map a Python value (or an explicit Python type) to an Avro-style type.

        Args:
            value: Sample value. For lists and dicts the first element / first
                value is used to infer the item type; empty containers fall
                back to ``"string"`` items.
            data_type: Python type to use instead of ``type(value)``.

        Returns:
            A type name string for ``str``/``int``/``float``/``bool``, or a
            dict describing an ``array`` / ``map`` for ``list`` / ``dict``.

        Raises:
            TypeError: If ``value`` is ``None`` without a usable ``data_type``,
                or if the type is not one of the supported ones.
        """
        # schema() passes data_type=type(value), which is NoneType for None
        # values; treat that the same as "no type given" so the caller is
        # pointed at data_types rather than at an opaque "NoneType unknown".
        if value is None and (data_type is None or data_type is type(None)):
            raise TypeError("value is None but the type is not explicit in data_types")

        dt = type(value) if data_type is None else data_type

        if dt == str:
            return "string"
        if dt == int:
            return "int"
        if dt == float:
            return "float"
        if dt == bool:
            return "boolean"
        if dt == list:
            items = "string" if len(value) == 0 else AvroSchema.get_type(value[0])
            return {"type": "array", "items": items, "default": []}
        if dt == dict:
            if len(value) == 0:
                values = "string"
            else:
                values = AvroSchema.get_type(list(value.values())[0])
            return {"type": "map", "values": values, "default": {}}

        # Report the type that was actually tested (differs from type(value)
        # only when an explicit data_type was supplied).
        raise TypeError(f"{dt} unknown")


In [150]:
class MyObject:
    """Sample entity with an int, a list and a dict-of-lists attribute."""

    def __init__(self):
        # Attribute order matters for the printed dict below.
        self.temp = 10
        self.historical = [1, 3, 5, 7, 9]
        self.sensors = dict(um=[1, 2, 3], dois=[4, 5, 6])


# Show the attribute dict that schema inference will work from.
print(vars(MyObject()))


{'temp': 10, 'historical': [1, 3, 5, 7, 9], 'sensors': {'um': [1, 2, 3], 'dois': [4, 5, 6]}}


In [151]:
# Infer the schema from a live instance, then bundle the schema with the
# instance's attribute dict.
o = MyObject()
schema = AvroSchema.schema(obj=o)
print(schema, vars(o), sep="\n")

avro_object = AvroObject(schema=schema, data=vars(o))


{'type': 'record', 'name': 'MyObject', 'namespace': None, 'fields': [{'name': 'temp', 'type': 'int'}, {'name': 'historical', 'type': {'type': 'array', 'items': 'int', 'default': []}}, {'name': 'sensors', 'type': {'type': 'map', 'values': {'type': 'array', 'items': 'int', 'default': []}, 'default': {}}}]}
{'temp': 10, 'historical': [1, 3, 5, 7, 9], 'sensors': {'um': [1, 2, 3], 'dois': [4, 5, 6]}}


In [81]:
# Rebuild an instance from the schema name and copy the stored data onto it.
# NOTE(review): the output recorded below (a 'local' key, string sensor values)
# does not match the current MyObject definition; it looks like it was
# produced by an earlier run (In [81]) - re-run the notebook top to bottom.
x = avro_object.entity
print(vars(x))

{'temp': 10, 'local': 'Outside', 'historical': [1, 3, 5, 7, 9], 'sensors': {'um': 'umv', 'dois': 'vdois'}}


In [51]:
# Scratch check: take the first value of a dict (insertion order).
x = dict(um=1, dois=2)
y = next(iter(x.values()))
print(y)

1


In [117]:
# Scratch check: the type of a str instance is exactly ``str``.
a = "olá"
t = type(a)
t is str

True

In [None]:
class AvroSerializer:
    """Accumulates Avro binary-encoded rows in a single in-memory buffer.

    NOTE(review): this class uses the third-party ``avro`` package
    (``avro.schema`` and ``avro.io``), which is not imported in the visible
    part of this notebook - confirm it is imported before this cell runs.
    """

    def __init__(self, schema: str):
        """Parse ``schema`` and set up the writer, buffer and encoder.

        Args:
            schema: The schema either as a JSON-serialisable object (e.g. the
                dict produced by ``AvroSchema.schema``) or as JSON text.
        """
        # Newer avro releases expose ``parse``; older avro-python3 releases
        # only expose ``Parse``. Prefer the former, fall back to the latter.
        parse = getattr(avro.schema, "parse", None) or avro.schema.Parse
        avro_schema = parse(self._schema_json(schema))
        self.writer = avro.io.DatumWriter(avro_schema)

        self.bytes_writer = io.BytesIO()
        self.encoder = avro.io.BinaryEncoder(self.bytes_writer)

    @staticmethod
    def _schema_json(schema) -> str:
        """Return the JSON text handed to the avro schema parser.

        Text that already is a JSON object or array is passed through
        unchanged, so a ready-made schema string is not JSON-encoded a second
        time. Everything else (dicts, lists, bare type names such as
        ``"string"``) is serialised with ``json.dumps``.
        """
        if isinstance(schema, str):
            try:
                decoded = json.loads(schema)
            except ValueError:
                decoded = None
            if isinstance(decoded, (dict, list)):
                return schema
        return json.dumps(schema)

    def append(self, row_dict: dict):
        """Encode ``row_dict`` with the writer and append it to the buffer."""
        self.writer.write(row_dict, self.encoder)

    def get_raw_value(self):
        """Return everything written to the buffer so far as ``bytes``."""
        return self.bytes_writer.getvalue()

    def get_b64_value(self):
        """Return the buffer content base64-encoded, as a ``str``."""
        return base64.b64encode(self.get_raw_value()).decode()


class AvroDeserializer:
    """Decodes Avro binary payloads with a reader built from ``read_update``'s result.

    NOTE(review): ``read_update`` and the ``avro`` package are not defined or
    imported in the visible part of this notebook - confirm both are available
    before this cell runs. ``read_update`` presumably returns a parsed avro
    schema; verify against its definition.
    """

    def __init__(self, schema_url: str, schema_file: str):
        """Create the datum reader from ``read_update(schema_url, schema_file)``."""
        self.reader = avro.io.DatumReader(read_update(schema_url, schema_file))

    def deserialize_raw_value(self, raw_bytes: bytes):
        """Wrap ``raw_bytes`` in a binary decoder and return what the reader reads."""
        decoder = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
        return self.reader.read(decoder)

    def deserialize_b64_value(self, b64string: str):
        """Base64-decode ``b64string`` and decode the resulting bytes."""
        payload = base64.b64decode(b64string)
        return self.deserialize_raw_value(payload)


In [153]:
# Scratch check: ``None`` is a valid dict key, and assigning to it again
# replaces the earlier value instead of adding a second entry.
d = {"a": "bla", None: "ble"}
d[None] = "bli"

print(d)

{'a': 'bla', None: 'bli'}


In [156]:
# Scratch check: rstrip("/") leaves a URL that has no trailing slash unchanged.
"http://localhost:8080".rstrip("/")

'http://localhost:8080'

False