In [13]:
import sqlite3
import cbor2
from monad_std import Option
from typing import *
from dataclasses import dataclass
import math

In [14]:
db = sqlite3.connect("./blueprints.db")
cursor = db.cursor()

In [15]:
cursor.execute("""create table if not exists blp_recursive (
    blp_id integer primary key not null unique,
    blp_type integer not null default 0,
    material blob,
    product blob,
    `time` integer not null default 1
);""")
db.commit()

In [16]:
@dataclass(eq=False, order=False, unsafe_hash=False, repr=True, frozen=True, init=True)
class BlpData:
    type_id: int
    flag: int
    material: Dict[int, int]
    product: Option[Tuple[int, int]] # id, amo
    time: int

    @staticmethod
    def search(blp_id: int) -> Option["BlpData"]:
        cursor.execute("select flag, material, product, time from material where type_id == ?", (blp_id,))
        val = cursor.fetchone()
        if val is None:
            return Option.none()

        return Option.some(BlpData(
            type_id=blp_id,
            flag=int(val[0]),
            material=cbor2.loads(val[1]),
            product=Option.from_nullable(val[2]).map(lambda x: tuple(cbor2.loads(x))),
            time=int(val[3]),
        ))

In [17]:
def get_blp_from_prod(prod_id: int) -> Option[Tuple[int, int]]:
    """
    int @ 1: blp id
    int @ 2: product amount
    """
    cursor.execute("select blp_id, amount from product where product_id == ?", (prod_id,))
    val = cursor.fetchone()
    if val is None:
        return Option.none()

    return Option.some(val)

In [18]:
@dataclass(eq=False, order=False, unsafe_hash=False, repr=True, frozen=False, init=True)
class BlpRecMaterial:
    raw_quantity: float = 0.0
    manu_level: int = 0
    reaction_level: int = 0

    def dict(self) -> Dict[str, Union[int, float]]:
        return {
            "raw_quantity": self.raw_quantity,
            "manu_level": self.manu_level,
            "reaction_level": self.reaction_level,
        }

In [20]:
# Tuple[int, int]: type id, quantity
@dataclass(eq=False, order=False, unsafe_hash=False, repr=True, frozen=True, init=True)
class BlpRecursive:
    blp_id: int
    time: int
    blp_type: int
    material: Dict[int, List[BlpRecMaterial]]
    product: Option[Tuple[int, int]] # id, amount

    @staticmethod
    def __fetch_with_blp(blp: BlpData, factor: float, stack: List[Tuple[int, int]]) -> Dict[int, List[BlpRecMaterial]]:
        val_to_collect: List[Dict[int, List[BlpRecMaterial]]] = []
        for material_id, material_amount in blp.material.items():
            fetched = BlpRecursive.__fetch_type(
                material_id,
                factor * material_amount,
                stack
            )
            if blp.flag == 1:
                for key in fetched.keys():
                    for index in range(len(fetched[key])):
                        fetched[key][index].manu_level += 1
            elif blp.flag == 2:
                for key in fetched.keys():
                    for index in range(len(fetched[key])):
                        fetched[key][index].reaction_level += 1
            val_to_collect.append(fetched)
        val_to_return: Dict[int, Dict[Tuple[int, int], float]] = {}
        for val in val_to_collect:
            for t_id, t_amounts in val.items():
                if t_id not in val_to_return:
                    val_to_return[t_id] = {}
                for t_amount in t_amounts:
                    if (t_amount.manu_level, t_amount.reaction_level) not in val_to_return[t_id].keys():
                        val_to_return[t_id][(t_amount.manu_level, t_amount.reaction_level)] = t_amount.raw_quantity
                    else:
                        val_to_return[t_id][(t_amount.manu_level, t_amount.reaction_level)] += t_amount.raw_quantity
        return {k: [BlpRecMaterial(
                        raw_quantity=iv,
                        manu_level=ik[0],
                        reaction_level=ik[1]
                    ) for ik, iv in v.items()
                    ] for k, v in val_to_return.items()}

    @staticmethod
    def __fetch_type(type_id: int, quantity: int, stack: List[Tuple[int, int]]) -> Dict[int, List[BlpRecMaterial]]:
        # if type_id == 17888:
        #     print(quantity, stack)
        if type_id in map(lambda x: x[0], stack):
            return {type_id: [BlpRecMaterial(raw_quantity=quantity)]}
        blp_data = get_blp_from_prod(type_id)
        if blp_data.is_none():
            return {type_id: [BlpRecMaterial(raw_quantity=quantity)]}
        blp_id, product_amount = blp_data.unwrap_unchecked()
        raw_blp: Option[BlpData] = BlpData.search(blp_id)
        if raw_blp.is_none():
            return {type_id: [BlpRecMaterial(raw_quantity=quantity)]}
        blp: BlpData = raw_blp.unwrap_unchecked()
        factor = quantity / product_amount
        return BlpRecursive.__fetch_with_blp(blp, factor, [*stack, (type_id, quantity)])

    @staticmethod
    def get_recursive(blp_id: int) -> Option["BlpRecursive"]:
        raw_val: Option[BlpData] = BlpData.search(blp_id)
        if raw_val.is_none():
            return Option.none()

        val = raw_val.unwrap_unchecked()
        blp_time = val.time
        blp_product = val.product
        dat = BlpRecursive.__fetch_with_blp(
            val,
            1,
            [] if blp_product.is_none() else [tuple(blp_product.unwrap_unchecked())]
        )
        return Option.some(BlpRecursive(
            blp_id=blp_id,
            time=blp_time,
            product=blp_product,
            material=dat,
            blp_type=val.flag,
        ))

In [21]:
BlpRecursive.get_recursive(28660).unwrap_unchecked().material[17888]

[BlpRecMaterial(raw_quantity=140.625, manu_level=4, reaction_level=2),
 BlpRecMaterial(raw_quantity=12139.4775, manu_level=3, reaction_level=1),
 BlpRecMaterial(raw_quantity=16197.7040625, manu_level=3, reaction_level=2)]

In [22]:
def insert_blp(blp_id: int, blp_flag: int, blp: Option[BlpRecursive]):
    if blp.is_none():
        cursor.execute("insert into blp_recursive values (?, ?, null, null, 1)", (blp_id, blp_flag))
    blp = blp.unwrap_unchecked()
    mat = cbor2.dumps({x: [z.dict() for z in y] for x, y in blp.material.items()})
    if blp.product.is_none():
        cursor.execute("insert into blp_recursive values (?, ?, ?, null, ?)", (blp_id, blp_flag, mat, blp.time))
    else:
        cursor.execute(
            "insert into blp_recursive values (?, ?, ?, ?, ?)",
            (blp_id, blp_flag, mat, cbor2.dumps(blp.product.unwrap_unchecked()), blp.time)
        )
    db.commit()

In [25]:
cursor.execute("select type_id, flag from material;")
id_list = cursor.fetchall()
for i, f in id_list:
    insert_blp(i, f, BlpRecursive.get_recursive(i))

In [24]:
cursor.execute("delete from blp_recursive;")
db.commit()