In [1]:
import fio.database
import mip

# Shared database handle used by every later cell (db.fluid, db.item, db.recipe,
# db.assembling_machine and db.furnace are queried below).
# NOTE(review): "data" is presumably a path relative to the notebook's working
# directory -- confirm against fio.database.load_db.
db = fio.database.load_db("data")

In [2]:
import functools

def is_fluid(name):
    """Tell whether the fluid table of `db` has an entry named `name`.

    The answer is whatever `db.fluid.query` returns for an "any entry with this
    name" query; the notebook's sanity checks rely on it being truthy for
    known fluids and falsy otherwise.

    `name` is spliced into the query text verbatim, so it must not contain
    double quotes.
    """
    fluid_query = f"""map(select(.name == "{name}"))|any"""
    return db.fluid.query(fluid_query)

def is_item(name):
    """Tell whether the item table of `db` has an entry named `name`.

    The answer is whatever `db.item.query` returns for an "any entry with this
    name" query; the notebook's sanity checks rely on it being truthy for
    known items and falsy otherwise.

    `name` is spliced into the query text verbatim, so it must not contain
    double quotes.
    """
    item_query = f"""map(select(.name == "{name}"))|any"""
    return db.item.query(item_query)

# Sanity checks for the two lookups above: "iron-ore" must be reported as an
# item and not as a fluid, "water" as a fluid and not as an item.
assert is_item("iron-ore")
assert not is_fluid("iron-ore")

assert not is_item("water")
assert is_fluid("water")

@functools.lru_cache(16)
def get_assembly_machines(category, craftable_only=True):
    """List the machines able to craft recipes of the given category.

    Assembling machines are listed first, then furnaces; each entry is the
    machine record exactly as returned by the database.

    Args:
        category: Crafting category name looked up in each machine's
            `crafting_categories`.
        craftable_only: When true, keep only machines for which some recipe
            has the machine as its main product.

    Returns:
        A list of machine records. Results are memoized by `lru_cache`, so the
        same list object is handed out again on a cache hit; callers should
        treat it as read-only.
    """

    def wanted(machine):
        # Without the filter every machine is kept; with it, the machine must
        # be the main product of at least one recipe.
        if not craftable_only:
            return True
        return db.recipe.query(f"""map( select(.main_product.name == "{machine['name']}" )) | any""")

    machines = [
        assembler
        for assembler in db.assembling_machine.find(f""".[] | select(.crafting_categories."{category}")""")
        if wanted(assembler)
    ]
    machines.extend(
        furnace
        for furnace in db.furnace.find(f""".[] | select(.crafting_categories."{category}") """)
        if wanted(furnace)
    )
    return machines

# Disabled checks, kept for reference. They compare against machine *names*,
# whereas get_assembly_machines returns the records yielded by the database.
# assert get_assembly_machines("nuclear-fusion") == ['kr-fusion-reactor']
# assert set(get_assembly_machines("basic-crafting")) == {'assembling-machine-1', 'assembling-machine-2', 'assembling-machine-3', 'kr-advanced-assembling-machine'}

# Smoke tests on the loaded database.
# NOTE(review): the expected counts (750 hidden recipes, 6 "basic-crafting"
# assembling machines) are presumably specific to the dataset/mod set this
# notebook was written against -- they will need updating if the data changes.
assert len(db.recipe.find(".[] | select(.hidden) | .name")) == 750
assert len(db.assembling_machine.find(""".[] | select(.crafting_categories."basic-crafting") | .name""")) == 6
# The "any recipe has this main product" query must yield exactly True / False.
assert db.recipe.query("""map( select(.main_product.name == "laser-turret" ))|any""") == True
assert db.recipe.query("""map( select(.main_product.name == "rien" ))|any""") == False

# Directed Hypergraph

We use a custom data structure to represent all the recipes. This makes the linear optimization model easier to formulate.

In [3]:
class HyperDiGraph:
    """Minimal directed hypergraph kept as a flat list of hyperedges.

    A hyperedge is stored as the triple ``(A, B, data)`` given to `add_edge`:
    ``A`` is the collection of tail nodes, ``B`` the collection of head nodes
    and ``data`` an arbitrary payload. Every member of ``A`` and ``B`` becomes
    a node of the graph.

    The per-node lookups scan the whole edge list and assert that the node is
    known to the graph.
    """

    def __init__(self):
        self._edges = []
        self._nodes = set()

    def add_edge(self, A, B, data=None):
        """Record the hyperedge ``A -> B`` and register all its endpoints."""
        self._edges.append((A, B, data))
        self._nodes.update(A, B)

    def in_edges(self, v):
        """Return ``(A, B)`` for every hyperedge whose head set contains `v`."""
        return [(tail, head) for tail, head, _ in self.in_edges_data(v)]

    def in_edges_data(self, v):
        """Return ``(A, B, data)`` for every hyperedge whose head set contains `v`."""
        assert v in self._nodes
        return [edge for edge in self._edges if v in edge[1]]

    def out_edges(self, v):
        """Return ``(A, B)`` for every hyperedge whose tail set contains `v`."""
        return [(tail, head) for tail, head, _ in self.out_edges_data(v)]

    def out_edges_data(self, v):
        """Return ``(A, B, data)`` for every hyperedge whose tail set contains `v`."""
        assert v in self._nodes
        return [edge for edge in self._edges if v in edge[0]]

    def edges(self):
        """Return ``(A, B)`` for every hyperedge, in insertion order."""
        return [(tail, head) for tail, head, _ in self._edges]

    def edges_data(self):
        """Return a fresh list of all ``(A, B, data)`` triples, in insertion order."""
        return list(self._edges)

    def nodes(self):
        """Return a fresh list of all nodes (order is that of the underlying set)."""
        return list(self._nodes)

    def edge_by_name(self, name):
        """Return the payload of the first hyperedge whose ``data["name"]`` is `name`.

        Raises StopIteration when no hyperedge carries that name.
        """
        return next(payload for (_, _, payload) in self._edges if payload["name"] == name)



In [4]:
# Build the recipe hypergraph: one hyperedge per (visible recipe, machine able
# to craft it) pair, going from the recipe's ingredient names to its product
# names. The edge payload keeps the full recipe and machine records plus a
# "<recipe>#<machine>" name identifying the pair.
recipes = HyperDiGraph()

for r in db.recipe.find(""".[] | select(.hidden | not)"""):
    for m in get_assembly_machines(r["category"]):
        A = set(i["name"] for i in r["ingredients"])
        B = set(p["name"] for p in r["products"])

        recipes.add_edge(
            A,
            B,
            dict(recipe=r, machine=m, name=f"{r['name']}#{m['name']}"),
        )


In [5]:
import mip
import networkx as nx
import math
import fio.arith as arith

def compute_cost(data):
    """Objective cost of one unit of flow through the given hyperedge.

    Every edge currently costs the same; `data` is accepted but ignored.
    A possible alternative left by the author: data["machine"]["energy_usage"].
    """
    unit_cost = 1
    return unit_cost

def ingredient_coef(data, v):
    """Amount of ingredient `v` consumed per unit of flow on an edge.

    Computed as ``crafting_speed * (total amount of v among the recipe's
    ingredients) / energy``, using the edge payload's "machine" and "recipe"
    records.

    Args:
        data: Edge payload with "machine" and "recipe" entries.
        v: Ingredient name.

    Returns:
        The strictly positive coefficient (asserted).
    """
    speed = data["machine"]["crafting_speed"]
    recipe = data["recipe"]

    total = 0
    for ingredient in recipe["ingredients"]:
        if ingredient["name"] == v:
            total += ingredient["amount"]

    rate = speed * total / recipe["energy"]
    assert rate > 0
    return rate

def _expected_amount(product):
    """Expected number of units one craft yields for a single product entry.

    A product either states a fixed "amount" or an "amount_min"/"amount_max"
    range, in which case the midpoint of the range is used. The result is
    weighted by the entry's "probability".
    """
    amount = product.get("amount")
    if amount is None:
        amount = (product["amount_min"] + product["amount_max"]) / 2
    return amount * product["probability"]


def product_coef(data, v):
    """Amount of product `v` yielded per unit of flow on an edge.

    Computed as ``crafting_speed * (expected amount of v among the recipe's
    products) / energy``, using the edge payload's "machine" and "recipe"
    records.

    The fixed-amount / ranged-amount distinction is made per product entry, so
    a recipe may mix both forms for the same product name. Unlike a blanket
    ``except``, genuine errors (missing keys, zero "energy", ...) propagate
    unchanged instead of being masked by a fallback computation.

    Args:
        data: Edge payload with "machine" and "recipe" entries.
        v: Product name.

    Returns:
        The strictly positive coefficient (asserted).

    Raises:
        KeyError: If a matching product has neither "amount" nor the
            "amount_min"/"amount_max" pair, or lacks "probability".
        ZeroDivisionError: If the recipe's "energy" is zero.
    """
    recipe = data["recipe"]
    expected = sum(_expected_amount(p) for p in recipe["products"] if p["name"] == v)
    coef = data["machine"]["crafting_speed"] * expected / recipe["energy"]
    assert coef > 0
    return coef

def optimize_model(graph, sources, max_seconds=10):
    """Solve the recipe-flow mixed-integer program over a recipe hypergraph.

    For every hyperedge (recipe/machine pair) two variables are created: a
    continuous ``flow`` and an integer ``flow_int`` with ``flow <= flow_int``.
    Both carry `compute_cost` as objective coefficient. For every node (item
    or fluid) the net consumption ``used - produced`` is bounded above by
    ``sources[v]`` when the node is listed in `sources`, and by 0 otherwise;
    a negative bound therefore forces a net production of that much.

    Args:
        graph: Hypergraph exposing `edges_data`, `nodes`, `in_edges_data` and
            `out_edges_data`; each edge payload needs a "name" plus whatever
            `compute_cost`, `ingredient_coef` and `product_coef` read.
        sources: Mapping node -> upper bound on that node's net consumption.
        max_seconds: Time limit handed to the solver.

    Returns:
        When the solver ends FEASIBLE or OPTIMAL, a dict with "kind" ==
        "solution", the solver "status", and the non-negligible values of
        "diffs", "considered", "used", "produced", "flow" and "flow_int".
        Otherwise ``{"kind": "none", "status": ...}``.
    """
    model = mip.Model()
    model.verbose = 0

    # The integer variables get their own name prefix: they previously shared
    # the exact name of their continuous counterpart.
    flow_int = {
        data["name"]: model.add_var(
            f"flow_int_{data['name']}", lb=0, obj=compute_cost(data), var_type=mip.INTEGER
        )
        for _, _, data in graph.edges_data()
    }
    flow = {
        data["name"]: model.add_var(f"flow_{data['name']}", lb=0, obj=compute_cost(data))
        for _, _, data in graph.edges_data()
    }

    # The integer variable is an upper bound on the continuous flow of its edge.
    for _, _, data in graph.edges_data():
        model.add_constr(flow[data["name"]] <= flow_int[data["name"]])

    diffs = {}
    produced = {}
    used = {}
    considered = {}

    for v in graph.nodes():
        # One linear term per edge touching v, computed once and reused below.
        consumed_terms = [
            (data["name"], flow[data["name"]] * ingredient_coef(data, v))
            for _, _, data in graph.out_edges_data(v)
        ]
        made_terms = [
            (data["name"], flow[data["name"]] * product_coef(data, v))
            for _, _, data in graph.in_edges_data(v)
        ]

        consumed = mip.quicksum(term for _, term in consumed_terms)
        made = mip.quicksum(term for _, term in made_terms)

        diff = consumed - made
        diffs[v] = diff
        considered[v] = consumed + made

        for name, term in made_terms:
            produced[v, name] = term
        for name, term in consumed_terms:
            used[v, name] = term

        # Net consumption may not exceed what the node is allowed to draw.
        if v in sources:
            model.add_constr(diff <= sources[v])
        else:
            model.add_constr(diff <= 0)

    model.optimize(max_seconds=max_seconds)
    if model.status in [mip.OptimizationStatus.FEASIBLE, mip.OptimizationStatus.OPTIMAL]:
        return {
            "kind": "solution",
            "status": model.status,
            "diffs": {k: arith.float_to_frac(e.x) for k, e in diffs.items() if round(e.x, 3) != 0},
            "considered": {k for k, e in considered.items() if round(e.x, 3) != 0},
            "used": {k: arith.float_to_frac(e.x) for k, e in used.items() if round(e.x, 3) != 0},
            "produced": {k: arith.float_to_frac(e.x) for k, e in produced.items() if round(e.x, 3) != 0},
            "flow": {k: arith.float_to_frac(x.x) for k, x in flow.items() if x.x is not None and round(x.x, 3) > 0},
            # Solvers report integer variables up to a tolerance (e.g. 0.9999999);
            # round to the nearest integer rather than truncating toward zero.
            "flow_int": {k: round(x.x) for k, x in flow_int.items() if x.x is not None and round(x.x) > 0},
        }
    else:
        return {
            "kind": "none",
            "status": model.status
        }

In [6]:
# Transformation to graph of the LP solution

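# The resulting graph has the following interface:
#   - "exchange" nodes (one per considered item/fluid) carry `kind`, `size`,
#     `width`, `height`, `in__flow` and `out_flow`;
#   - "sub-factory" nodes (one per recipe/machine pair in use) carry `kind`,
#     `amount`, `size`, `width` and `height`;
#   - every edge carries a `flow` attribute.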



def build_detailed_graph(model):
    """Convert an `optimize_model` solution into a networkx layout graph.

    Nodes:
        * one "exchange" node per entry of ``model["considered"]``;
        * one "sub-factory" node per entry of ``model["flow_int"]``, with the
          integer value stored as ``amount``.

    Edges (each with a ``flow`` attribute):
        * for every ``(item, factory)`` key of ``model["used"]``, an edge from
          the factory node to the item node;
        * for every ``(item, factory)`` key of ``model["produced"]``, an edge
          from the item node to the factory node.

    Every node then receives equal ``size``/``width``/``height``; exchange
    nodes additionally get the ``in__flow`` and ``out_flow`` totals of their
    incoming and outgoing edges.

    Args:
        model: Result dict of `optimize_model`; must have kind "solution".

    Returns:
        The populated ``nx.DiGraph``.
    """
    solution = model
    assert solution["kind"] == "solution"

    layout = nx.DiGraph()

    for item in solution["considered"]:
        layout.add_node(item, kind="exchange")

    for factory, count in solution["flow_int"].items():
        layout.add_node(factory, kind="sub-factory", amount=count)

    for (item, factory), rate in solution["used"].items():
        layout.add_edge(factory, item, flow=rate)

    for (item, factory), rate in solution["produced"].items():
        layout.add_edge(item, factory, flow=rate)

    for node, attrs in layout.nodes(data=True):
        kind = attrs["kind"]
        target_attrs = layout.nodes[node]

        if kind == "sub-factory":
            # Square footprint: side grows with the square root of the count.
            side = math.ceil(math.sqrt(attrs["amount"])) * 5
            target_attrs["size"] = side
            target_attrs["width"] = side
            target_attrs["height"] = side
        elif kind == "exchange":
            outgoing = [f for _, _, f in layout.out_edges(node, data="flow")]
            incoming = [f for _, _, f in layout.in_edges(node, data="flow")]

            # Each edge counts for ceil(flow / 15) slots; the busier side
            # (outgoing vs incoming) dictates the footprint.
            out_slots = sum(math.ceil(f / 15) for f in outgoing)
            in_slots = sum(math.ceil(f / 15) for f in incoming)
            side = max(out_slots, in_slots) * 5

            target_attrs["size"] = side
            target_attrs["width"] = side
            target_attrs["height"] = side
            target_attrs["in__flow"] = sum(incoming)
            target_attrs["out_flow"] = sum(outgoing)
        else:
            assert False

    return layout

In [7]:
def build_exchange(inputs, outputs):
    """Placeholder: not implemented yet.

    Both arguments are currently ignored and the function returns None.
    """
    return None

In [8]:
# Raw resources: each may be drawn without limit (infinite upper bound on its
# net consumption).
source = dict.fromkeys(
    (
        "iron-ore",
        "crude-oil",
        "copper-ore",
        "stone",
        "coal",
        "water",
        "wood",
        "imersite-powder",
        "mineral-water",
        "raw-rare-metals",
    ),
    float("+inf"),
)

# Items to produce, each at a net rate of 1.
target = {
    pack: 1 for pack in {
        'advanced-tech-card',
        'automation-science-pack',
        'basic-tech-card',
        'biters-research-data',
        'chemical-science-pack',
        'logistic-science-pack',
        'matter-research-data',
        'matter-tech-card',
        'military-science-pack',
        'production-science-pack',
        'singularity-tech-card',
        'utility-science-pack'
    }
}

# Targets are passed as negative bounds: a net consumption of at most -n
# means a net production of at least n.
model = optimize_model(
    recipes,
    dict(**source, **{name: -rate for name, rate in target.items()}),
)


In [9]:
# Integer solution: value of the integer flow variable per "<recipe>#<machine>" edge.
model["flow_int"]

{'blank-tech-card#kr-advanced-assembling-machine': 2,
 'biters-research-data#kr-advanced-assembling-machine': 2,
 'matter-research-data#kr-quantum-computer': 3,
 'coke#kr-advanced-furnace': 4,
 'glass#kr-advanced-furnace': 6,
 'nitrogen#kr-atmospheric-condenser': 86,
 'kr-s-c-copper-cable#kr-advanced-assembling-machine': 14,
 'sulfuric-acid#kr-advanced-chemical-plant': 1,
 'advanced-oil-processing#oil-refinery': 88,
 'pipe#kr-advanced-assembling-machine': 1,
 'sand#kr-crusher': 1,
 'transport-belt#kr-advanced-assembling-machine': 1,
 'fast-transport-belt#kr-advanced-assembling-machine': 1,
 'basic-tech-card#kr-advanced-assembling-machine': 1,
 'automation-science-pack#kr-advanced-assembling-machine': 1,
 'logistic-science-pack#kr-advanced-assembling-machine': 1,
 'military-science-pack#kr-advanced-assembling-machine': 1,
 'chemical-science-pack#kr-advanced-assembling-machine': 1,
 'production-science-pack#kr-quantum-computer': 2,
 'utility-science-pack#kr-quantum-computer': 2,
 'matter