In [None]:
# from dataclasses import dataclass
#
# @dataclass(kw_only=True)
# class Parent:
#     name: str
#     age: int
#     ugly: bool = False
#
# @dataclass(kw_only=True)
# class Child(Parent):
#     school: str
#
# ch = Child(name="Kevin", age=17, school="42")
# print(ch.ugly)
#
# ch

In [None]:
from IPython.core.interactiveshell import InteractiveShell

# Class-level IPython setting: with "all", every top-level expression statement in a
# cell is displayed, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

import os

In [None]:
from dataclasses import dataclass, field
from typing import Optional
from collections import UserDict


@dataclass(kw_only=True)
class RemoteCanException(Exception):
    """Base class for all remote CAN exceptions."""

    err_code: Optional[int] = 1001  # default exception is unknown connection error
    extra_msg: Optional[str] = None
    codes: UserDict = UserDict(  # class attribute, if not given use the default
        {
            0: "success",
            1: "client_cannot_connect_to_server",
            2: "ai_mode_shutdown",
            1000: "network_connection_error",
            1001: "network_unknown_error",
            1002: "network_timeout",
            -1: "tsp_internal_error",
            202: "tsp_no_API_exist",
            206: "tsp_parameter_wrong",
            301: "tsp_out_of_time",
            302: "tsp_command_execute_error",
            303: "tsp_car_not_registered",
            304: "tsp_car_offline",
            310: "tsp_internal_exception_error",
            311: "tsp_tbox_returned_error",
            3000: "tsp_return_result_is_not_dictionary",
            3001: "tsp_return_result_has_no_oss_link",
            2000: "uds_version_failed",
            2001: "ab_torque_switch_failed",
            2002: "oss_data_not_enough",
            2003: "torque_shape_error",
            2004: "torque_range_error",
            2005: "remote_can_unknown_format",
        }
    )

    def __post_init__(self):
        print(
            f"{{'header': 'err_code': '{self.err_code}', "
            f"'msg': '{self.codes[self.err_code]}', "
            f"'extra_msg': '{self.extra_msg}'}}"
        )

In [None]:
# Demonstrate the exception without aborting a "Run All": raise it, catch it and
# show its repr. Other constructions to try:
#   RemoteCanException(err_code=1000, extra_msg="test")
#   RemoteCanException(err_code=1000)
try:
    raise RemoteCanException()
except RemoteCanException as exc:
    print(repr(exc))

In [None]:
# Scratch check: tuple membership test; 1000 is one of the listed values, so this is True.
1000 in (1, 1000, 1002)

In [None]:
# def cross_product(seq1, seq2):
#
#     if not seq1 or not seq2:
#         raise ValueError('Sequence arguments must be non-empty')
#
#     return [(x1, x2) for x1 in seq1 for x2 in seq2]
#
#
# cross_product([1,2,3], seq2=None)

In [None]:
# # # descriptors.py
# class Verbose_attribute():
#     def __get__(self, obj, type=None) -> object:
#         print("accessing the attribute to get the value")
#         return 42
#     def __set__(self, obj, value) -> None:
#         print("accessing the attribute to set the value")
#         raise AttributeError("Cannot change the value")
#
# class Foo():
#     attribute1 = Verbose_attribute()
#
# my_foo_object = Foo()
# # print(my_foo_object.attribute1)
# # x = my_foo_object.attribute1
# # print(x)
# print(Foo.attribute1)

In [None]:
# Base class that reports every subclass at the moment it is created.
# (Documented with comments rather than docstrings so the printed __dict__
# contents stay the same.)
class Parent:
    def __init_subclass__(cls, **kwargs):
        # Forward class keyword arguments up the MRO so Parent can be combined
        # with other bases that define __init_subclass__ and accept keywords.
        super().__init_subclass__(**kwargs)
        print("Subclass of Parent Created!")
        print(cls.__base__.__dict__)  # namespace of the first listed base class
        print(cls.__name__)
        print(cls.__dict__)  # namespace of the new subclass itself


# Defining this subclass triggers Parent.__init_subclass__ and its prints.
class Child(Parent):
    pass

In [None]:
import dask.dataframe as dd
import pandas as pd

# Wrap an empty pandas DataFrame in a Dask DataFrame, requesting a single partition.
ddf = dd.from_pandas(pd.DataFrame([]), npartitions=1)
# Bare expression so the notebook displays the Dask DataFrame.
ddf

In [None]:
from dask.bag import Bag
import dask.bag as db

In [None]:
import numpy as np

from numpy.random import choice

# Number of synthetic sequences; each one has 20 time steps with a single feature.
N_train = 1200
X_train = np.zeros((N_train, 20, 1))

# Draw half of the sequence indices at random, without repeats, and set the first
# time step of those sequences to 1 -- the only non-zero entry in a sequence
# ("very long term memory").
one_indexes = choice(N_train, N_train // 2, replace=False)
X_train[one_indexes, 0, 0] = 1

In [None]:
def prepare_sequences(x_train, y_train, window_length):
    """Cut every sequence into overlapping sliding windows.

    Each sequence ``x_train[i]`` yields one window per start offset (stride 1,
    ``len(sequence) - window_length + 1`` windows in total). Every window is
    paired with ``y_train[i]``, the label of the sequence it was cut from.
    Sequences shorter than ``window_length`` contribute no windows.

    Args:
        x_train: Iterable of sized, sliceable sequences.
        y_train: Labels, indexable by the position of the sequence in ``x_train``.
        window_length: Number of time steps per window; must be at least 1.

    Returns:
        Tuple ``(windows, labels)`` of NumPy arrays built from the collected
        windows and their labels.

    Raises:
        ValueError: If ``window_length`` is smaller than 1.
    """
    # A zero or negative length would slice empty or uneven windows instead of
    # failing, so reject it up front with a clear message.
    if window_length < 1:
        raise ValueError("window_length must be at least 1")

    windows = []
    windows_y = []
    for i, sequence in enumerate(x_train):
        last_start = len(sequence) - window_length
        for window_start in range(last_start + 1):
            windows.append(sequence[window_start : window_start + window_length])
            windows_y.append(y_train[i])
    return np.array(windows), np.array(windows_y)


# Smoke test input: a single 5-step sequence labelled 1, cut into windows of length 2.
X = [[1, 0, 0, 0, 0]]
Y = [1]
window_length = 2

# Yields 5 - 2 + 1 = 4 windows, each paired with the label 1.
prepare_sequences(X, Y, window_length)

In [None]:
# Cut the synthetic sequences into windows of length 10; each window is labelled
# with the first time step of the sequence it came from (X_train[:, 0]).
# NOTE(review): X_train is read here and reassigned below, so re-running this cell
# on its own feeds the already-windowed array back in -- re-run the data cell first.
X_out, Y_out = prepare_sequences(X_train.tolist(), X_train[:, 0].tolist(), 10)

# Split by window index: the first 11000 windows train, the remainder tests.
# NOTE(review): 11000 presumably corresponds to 1000 sequences x 11 windows each
# (20 - 10 + 1) -- confirm, and consider deriving it instead of hard-coding.
X_train = X_out[:11000, :]
X_test = X_out[11000:, :]
y_train = Y_out[:11000]
y_test = Y_out[11000:]

In [None]:
# from tensorflow.python.keras.layers import Input, Dense, LSTM
# from tensorflow.python.keras.models import Sequential
from keras import Input, Sequential
from keras.layers import LSTM, Dense

# Stateless baseline: one LSTM layer (10 units) followed by a sigmoid output unit,
# trained on windows of max_len time steps with one feature each.
# NOTE(review): `Input` is imported above but only referenced in commented-out code.
print("Building STATELESS model...")
max_len = 10
batch_size = 64
# Earlier variant of the same model, kept for reference:
# model = Sequential()
# # model.add(Input(shape=(max_len, 1)))
# model.add(LSTM(10, input_shape=(max_len, 1), return_sequences=False, stateful=False))
# model.add(Dense(1, activation='sigmoid'))
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
# model.fit(X_train, y_train, batch_size=batch_size, epochs=15,
#             validation_data=(X_test, y_test), shuffle=False)
# # score, acc = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)
model = Sequential()
model.add(LSTM(10, input_shape=(max_len, 1), return_sequences=False, stateful=False))
model.add(Dense(1, activation="sigmoid"))
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
# Relies on X_train / y_train / X_test / y_test from the windowing cell; sample
# order is kept (shuffle=False) and the test split doubles as validation data.
model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=15,
    validation_data=(X_test, y_test),
    shuffle=False,
)
# The evaluation results are stored but not printed in this cell.
score, acc = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)

In [None]:
# Same architecture as the stateless model, but with stateful=True and a fixed
# batch dimension (batch_size, max_len, 1). Reuses batch_size / max_len and the
# train/test arrays defined in earlier cells.
# NOTE(review): stateful layers with a fixed batch dimension typically need the
# number of samples to be a multiple of batch_size; 11000 training windows is not
# a multiple of 64 -- confirm this cell runs as intended.
model = Sequential()
model.add(
    LSTM(
        10,
        batch_input_shape=(batch_size, max_len, 1),
        return_sequences=False,
        stateful=True,
    )
)
model.add(Dense(1, activation="sigmoid"))
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=15,
    validation_data=(X_test, y_test),
    shuffle=False,
)
# The evaluation results are stored but not printed in this cell.
score, acc = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)

In [None]:
import numpy as np

from numpy.random import choice

# 1200 synthetic sequences of max_len time steps with a single feature.
N_train = 1200
max_len = 10
X_in = np.zeros((N_train, max_len, 1))

# Half of the sequences (chosen at random, without repeats) get a 1 at their first
# time step; every other entry stays 0 ("very long term memory").
one_indexes = choice(N_train, N_train // 2, replace=False)
X_in[one_indexes, 0, 0] = 1

# The label of a sequence is the value of its first time step, as a column vector.
Y_in = X_in[:, 0, 0].reshape(-1, 1)

# First 1000 sequences train, the remaining ones test; stored as nested lists.
X_train, X_test = X_in[:1000].tolist(), X_in[1000:].tolist()
Y_train, Y_test = Y_in[:1000].tolist(), Y_in[1000:].tolist()

In [None]:
# Stateful variant fed one time step of one sequence per call: the fixed input
# shape is (batch=1, time steps=1, features=1).
print("Build STATEFUL model...")
model = Sequential()
model.add(LSTM(10, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True))
model.add(Dense(1, activation="sigmoid"))
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

In [None]:
# Manual training loop for the stateful model. Relies on model, X_train / Y_train,
# X_test / Y_test and max_len from the preceding cells.
# Each sequence is fed one time step at a time (each step reshaped into a
# (1, 1, 1) batch), always with the sequence-level label as the target; the LSTM
# state is reset after every sequence.
print("Train...")
for epoch in range(15):
    mean_tr_acc = []
    mean_tr_loss = []
    for i in range(len(X_train)):
        Y_true = Y_train[i]
        for j in range(max_len):
            tr_loss, tr_acc = model.train_on_batch(
                np.expand_dims(np.expand_dims(X_train[i][j], axis=1), axis=1),
                np.array([Y_true]),
            )
            mean_tr_acc.append(tr_acc)
            mean_tr_loss.append(tr_loss)
        # End of the sequence: clear the state before the next sequence starts.
        model.reset_states()
        if i % 100 == 0:
            # Running means over all steps seen so far in this epoch.
            print(
                "epoch {}, sample {}, tr_acc {}, tr_loss {}".format(
                    epoch, i, np.mean(mean_tr_acc), np.mean(mean_tr_loss)
                )
            )

    print("accuracy training = {}".format(np.mean(mean_tr_acc)))
    print("loss training = {}".format(np.mean(mean_tr_loss)))
    print("___________________________________")

    # Evaluate on the held-out sequences, step by step like the training pass.
    mean_te_acc = []
    mean_te_loss = []
    for i in range(len(X_test)):
        for j in range(max_len):
            te_loss, te_acc = model.test_on_batch(
                np.expand_dims(np.expand_dims(X_test[i][j], axis=1), axis=1),
                np.array([Y_test[i]]),
            )
            mean_te_acc.append(te_acc)
            mean_te_loss.append(te_loss)
        model.reset_states()

        # Second pass over the same sequence to obtain predictions.
        # NOTE(review): Y_pred is overwritten on every step and not used in this
        # cell; after the loop it holds the prediction for the last step only.
        for j in range(max_len):
            Y_pred = model.predict_on_batch(
                np.expand_dims(np.expand_dims(X_test[i][j], axis=1), axis=1)
            )
        model.reset_states()
        if i % 100 == 0:
            # These are test metrics, so label them te_* (was mislabelled tr_*).
            print(
                "test, sample {}, te_acc {}, te_loss {}".format(
                    i, np.mean(mean_te_acc), np.mean(mean_te_loss)
                )
            )

    print("accuracy testing = {}".format(np.mean(mean_te_acc)))
    print("loss testing = {}".format(np.mean(mean_te_loss)))
    print("___________________________________")

# pydantic with custom serialization

In [None]:
from datetime import datetime
import ujson
from pydantic import BaseModel


class User(BaseModel):
    """User record validated by pydantic.

    JSON decoding with ``ujson`` is done explicitly at the call site. The
    ``Config.json_loads`` key is a pydantic v1 setting that v2 removed, so
    declaring it on the model does not change how ``model_validate_json``
    parses its input.
    """

    id: int
    name: str = 'John Doe'
    signup_ts: datetime | None = None  # optional; may be absent or null


# Decode with ujson first, then let pydantic validate the resulting dict.
user = User.model_validate(ujson.loads('{"id": 123,"signup_ts":1234567890,"name":"John Doe"}'))
print(user)
#> Prints the validated model: id=123, name='John Doe' and signup_ts as the
#> UTC datetime 2009-02-13 23:31:30 (Unix timestamp 1234567890).

In [None]:
from datetime import datetime
import orjson
from pydantic import BaseModel


def orjson_dumps(v, *, default):
    """Serialize ``v`` with orjson and return the result as ``str``.

    ``orjson.dumps`` produces ``bytes``; decoding makes the return type match
    the standard ``json.dumps``. ``default`` is passed through to orjson.
    """
    encoded = orjson.dumps(v, default=default)
    return encoded.decode()


class User(BaseModel):
    """User record validated by pydantic.

    JSON encoding and decoding with ``orjson`` are done explicitly at the call
    site. The ``Config.json_loads`` / ``Config.json_dumps`` keys are pydantic v1
    settings that v2 removed, so declaring them on the model does not affect
    ``model_validate_json`` or ``model_dump_json``.
    """

    id: int
    name: str = 'John Doe'
    signup_ts: datetime | None = None  # optional; may be absent or null


# Decode with orjson, then let pydantic validate the resulting dict.
user = User.model_validate(orjson.loads('{"id":123,"signup_ts":1234567890,"name":"John Doe"}'))
# Dump to JSON-compatible Python values first, then encode them with orjson.
print(orjson_dumps(user.model_dump(mode="json"), default=None))
#> Prints a JSON object with the keys id, name and signup_ts, where signup_ts
#> is rendered as an ISO-8601 string.

In [None]:
from zoneinfo import ZoneInfo
from pydantic import BaseModel, ConfigDict
from eos.data_io.eos_struct.eos_location import locations_by_abbr

from pydantic_core import SchemaSerializer

class OrJsonSerializer(SchemaSerializer):
    """Work-in-progress attempt to route pydantic serialization through orjson.

    Both methods are placeholders: they pass the literal ``...`` (Ellipsis) to
    orjson instead of real data and ignore their arguments.
    NOTE(review): ``orjson`` is not imported in this cell; the name comes from
    an earlier cell. Whether ``SchemaSerializer`` can be subclassed and what
    the real method signatures are still has to be confirmed.
    """

    def to_json(self, *args, **kwargs):
        # Placeholder: the object to serialize is not taken from args/kwargs yet.
        return orjson.dumps(...)  # implementation not complete, need to understand args/kwargs

    def to_python(self, *args, **kwargs): # real signature unknown
        # NOTE(review): this parses JSON; presumably to_python is meant to produce
        # Python objects from a model rather than decode JSON text -- confirm.
        return orjson.loads(...)  # implementation not complete, need to understand args/kwargs


class EosLocation(BaseModel):
    """Location record with an abbreviation, two names and a time zone."""

    # Assigns the OrJsonSerializer class itself, not an instance.
    # NOTE(review): pydantic presumably sets this attribute itself when it builds
    # the model, which would replace this value -- confirm it has any effect.
    __pydantic_serializer__ = OrJsonSerializer

    abbr: str
    name: str
    cname: str  # presumably the Chinese name -- TODO confirm
    tz: ZoneInfo
    


In [None]:
# Parse a JSON document into an EosLocation; "tz" is given as the string "Asia/Shanghai".
loc = EosLocation.model_validate_json('{"abbr":"CHN","name":"China","cname":"中国","tz":"Asia/Shanghai"}')
