In [None]:
import json
import re
import tempfile
from collections import defaultdict
from dataclasses import dataclass, field
from io import BytesIO
from pathlib import Path
from typing import Literal
from urllib.request import urlopen

from graphviz import Digraph
from PIL import Image, ImageDraw, ImageFont, ImageFilter
from slpp import slpp as lua

# Configs

In [None]:
# TODO: Move to config file or dataclass

# --- Filesystem locations ---
OUTPUT_DIR_PATH = Path("output")
FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

# --- Factorio game settings ---
DIFFICULTY = "normal"

# Machines assumed for crafting and smelting recipes
CRAFTING_MACHINE = "assembling-machine-3"
SMELTING_MACHINE = "electric-furnace"

# Speed tables are hardcoded for now. A machine missing from its table
# falls back to the literal default passed to .get().
_SMELTING_SPEED_BY_MACHINE = {
    "stone-furnace": 1.0,
    "steel-furnace": 2.0,
    "electric-furnace": 2.0,
}
SMELTING_SPEED = _SMELTING_SPEED_BY_MACHINE.get(SMELTING_MACHINE, 2.0)

_CRAFTING_SPEED_BY_MACHINE = {
    "assembling-machine-1": 0.5,
    "assembling-machine-2": 0.75,
    "assembling-machine-3": 1.25,
}
CRAFTING_SPEED = _CRAFTING_SPEED_BY_MACHINE.get(CRAFTING_MACHINE, 1.25)

# Recipe category -> speed of the machine that handles it (hardcoded for now)
RECIPE_SPEEDS = {
    **dict.fromkeys(
        ("advanced-crafting", "crafting", "crafting-with-fluid"),
        CRAFTING_SPEED,
    ),
    "smelting": SMELTING_SPEED,
}

In [None]:
# FACTORIO DATA REPOSITORY
# Version tag interpolated into the GitHub raw URL below.
FACTORIO_VERSION = "2.0.71"

# Local data directory; read_factorio_file checks here before going to GitHub.
FACTORIO_DATA_PATH = Path(
    "/mnt/c/Program Files (x86)/Steam/steamapps/common/Factorio/data"
)
# Base URL of the wube/factorio-data repository at the tag above (remote fallback).
FACTORIO_GITHUB_URL = (
    f"https://raw.githubusercontent.com/wube/factorio-data/refs/tags/{FACTORIO_VERSION}"
)

# Prototype files, relative to the data root (local directory or GitHub base URL).
RECIPE_FILE_PATH = "base/prototypes/recipe.lua"
ITEM_FILE_PATH = "base/prototypes/item.lua"
FLUID_FILE_PATH = "base/prototypes/fluid.lua"
EQUIPMENT_FILE_PATH = "base/prototypes/equipment.lua"


def read_factorio_file(
    file_path: str, binary: bool = False, timeout: float | None = 30.0
) -> str | bytes:
    """
    Returns the contents of a Factorio data file from local installation or GitHub.

    Args:
        file_path: relative path inside Factorio folder; a ``__base__`` marker
            is rewritten to ``base`` before lookup
        binary: if True, returns bytes instead of a string
        timeout: seconds to wait on the GitHub fallback request before giving
            up; None waits indefinitely

    Returns:
        str (text, decoded as UTF-8) or bytes (binary)

    Raises:
        urllib.error.URLError: if the file is not available locally and the
            GitHub request fails (HTTPError, a subclass, for HTTP error codes).
    """
    file_path = file_path.replace("__base__", "base")

    # Try local installation first
    local_path = FACTORIO_DATA_PATH / file_path
    if local_path.exists():
        if binary:
            return local_path.read_bytes()
        return local_path.read_text(encoding="utf-8")

    # Fallback to GitHub (only has text files).
    # A timeout keeps a stalled connection from hanging the notebook forever.
    url = f"{FACTORIO_GITHUB_URL}/{file_path}"
    with urlopen(url, timeout=timeout) as response:
        data = response.read()

    return data if binary else data.decode("utf-8")


# One alternation, scanned left to right: quoted strings are matched first so
# that a "--" inside a string literal is never mistaken for a comment.
_LUA_STRING_OR_COMMENT = re.compile(
    r'(?P<string>"(?:\\.|[^"\\\n])*"|\'(?:\\.|[^\'\\\n])*\')'  # quoted string
    r"|--\[(?P<eq>=*)\[.*?\](?P=eq)\]"  # block comment: --[[ ... ]] / --[==[ ... ]==]
    r"|--[^\n]*",  # line comment, up to (not including) the newline
    flags=re.DOTALL,
)


def clean_factorio_lua(text: str) -> str:
    """Clean Factorio Lua file by removing comments.

    Removes block comments (``--[[ ... ]]``, including levelled forms such as
    ``--[==[ ... ]==]``) and line comments (``-- ...``). Quoted string literals
    are copied through untouched, so a ``--`` inside a string is preserved.
    Lua long-bracket strings (``[[ ... ]]``) are not treated specially.

    Args:
        text: Lua source text.

    Returns:
        The text with comments replaced by the empty string; newlines that
        terminate line comments are kept.
    """
    # Keep the match when it is a string literal, drop it when it is a comment.
    return _LUA_STRING_OR_COMMENT.sub(lambda m: m.group("string") or "", text)


def format_number(value: float, max_decimals: int = 3) -> str:
    """Format number with up to max_decimals places, no trailing zeros.

    Args:
        value: number to format.
        max_decimals: maximum number of digits after the decimal point.

    Returns:
        Fixed-point text with trailing fractional zeros (and a dangling
        decimal point) removed, e.g. ``12.5`` -> ``"12.5"``, ``10`` -> ``"10"``.
    """
    text = f"{value:.{max_decimals}f}"
    # Only strip zeros that belong to a fractional part; with max_decimals=0
    # there is no ".", and stripping would turn "100" into "1".
    if "." in text:
        text = text.rstrip("0").rstrip(".")
    # Tiny negative values round to "-0"; show them as plain zero.
    return "0" if text == "-0" else text

# Clean recipes

In [None]:
recipe_file_text = read_factorio_file(RECIPE_FILE_PATH)

# Remove wrapper: keep what follows the opening "data:extend\n(" and drop a
# trailing ")" so only the Lua table is handed to the decoder.
text = recipe_file_text.split("data:extend\n(", 1)[1].removesuffix(")")
text = clean_factorio_lua(text)
raw_recipes = lua.decode(text)

# Make sure the output directory exists; write_text does not create it.
OUTPUT_DIR_PATH.mkdir(parents=True, exist_ok=True)
(OUTPUT_DIR_PATH / "recipe.json").write_text(json.dumps(raw_recipes, indent=2))

print(len(raw_recipes))


In [None]:
@dataclass
class Recipe:
    name: str
    category: str = "crafting"
    energy_required: float = 0.5
    ingredients: dict[str, int] = field(default_factory=dict)
    result_amount: float = 1

    @property
    def time_per_cycle(self) -> float:
        """Time per crafting cycle adjusted by machine speed."""
        return self.energy_required / RECIPE_SPEEDS.get(self.category, CRAFTING_SPEED)

    @property
    def machine(self) -> str:
        if "crafting" in self.category:
            return CRAFTING_MACHINE
        if self.category == "smelting":
            return SMELTING_MACHINE
        if self.category == "oil-processing":
            return "oil-refinery"
        if self.category == "chemistry":
            return "chemical-plant"
        if self.category == "rocket-building":
            return "rocket-silo"
        if self.category == "centrifuging":
            return "centrifuge"
        print(
            f"Couldn't determine name for recipe {self.name} with category {self.category}"
        )


# Build the name -> Recipe lookup from the decoded recipe tables.
# Only recipes with exactly one result, named like the recipe itself, are kept.
recipes: dict[str, Recipe] = {}
for raw_recipe in raw_recipes:
    name = raw_recipe["name"]
    # Use the DIFFICULTY sub-table when the recipe has one; otherwise read the
    # fields from the recipe table itself.
    amount_dict = raw_recipe.get(DIFFICULTY, raw_recipe)

    # Parse out the results (a missing "results" field counts as no results)
    raw_results = amount_dict.get("results") or []
    if not raw_results:
        print(f"Skipping {name} with no results")
        continue
    if len(raw_results) != 1:
        # Skip oil and uranium recipes for now
        print(f"Skipping {name} with multiple results")
        continue

    raw_result_dict = next(iter(raw_results))
    result_name = raw_result_dict["name"]
    result_amount = raw_result_dict["amount"]
    if result_name != name:
        # Skip oil and uranium recipes for now
        print(f"Skipping {name} with different result name {result_name}")
        continue

    # Parse out the ingredients (a missing "ingredients" field counts as none)
    raw_ingredients = amount_dict.get("ingredients") or []
    ingredients: dict[str, int] = {
        raw_ingredient["name"]: raw_ingredient["amount"]
        for raw_ingredient in raw_ingredients
    }

    recipes[name] = Recipe(
        name=name,
        category=raw_recipe.get("category", "crafting"),
        ingredients=ingredients,
        energy_required=amount_dict.get("energy_required", 0.5),
        result_amount=result_amount,
    )


# Clean item, fluid, and equipment icon paths

In [None]:
RESOLUTIONS = Literal[64, 32, 16, 8]


class IconManager:
    """Manages loading, caching, and rendering of Factorio icons."""

    def __init__(self, font_path: str = FONT_PATH):
        """Load the overlay font and index icon paths.

        Args:
            font_path: TrueType font file used for the text overlays.
        """
        # Cropped icons keyed by (prototype name, resolution).
        self._cache: dict[tuple[str, int], Image.Image] = {}
        self.font = ImageFont.truetype(font_path, size=24)

        # Load icon paths once
        self.icon_paths = self._load_all_icon_paths()

    def _load_all_icon_paths(self) -> dict[str, str]:
        """Load all icon paths from item, fluid, and equipment files.

        Returns:
            Mapping of prototype name to its ``icon`` path. Prototypes without
            a name or without an ``icon`` field are left out, so an entry that
            lacks an icon never overwrites a path found in an earlier file.
        """
        # (file, wrapper marker, which "data:extend" occurrence holds the table)
        sources = (
            (ITEM_FILE_PATH, "data:extend\n(", 1),
            (FLUID_FILE_PATH, "data:extend(", 2),
            (EQUIPMENT_FILE_PATH, "data:extend(", 1),
        )

        paths: dict[str, str] = {}
        for file_path, marker, occurrence in sources:
            text = read_factorio_file(file_path)
            text = text.split(marker, occurrence)[occurrence].removesuffix(")\n")
            for prototype in lua.decode(clean_factorio_lua(text)):
                name = prototype.get("name")
                icon = prototype.get("icon")
                if name and icon:
                    paths[name] = icon

        return paths

    def load_icon(self, name: str, resolution: RESOLUTIONS = 64) -> Image.Image | None:
        """Load an icon at specified resolution with caching.

        Args:
            name: prototype name to look up in ``icon_paths``.
            resolution: edge length of the square icon to crop out.

        Returns:
            The cropped RGBA icon, or None if no icon path is known for ``name``.

        Raises:
            ValueError: if the icon file is not 120x64 pixels.
        """
        cache_key = (name, resolution)
        if cache_key in self._cache:
            return self._cache[cache_key]

        icon_path = self.icon_paths.get(name)
        if not icon_path:
            print(f"No icon path found for: {name}")
            return None

        icon_bytes = read_factorio_file(icon_path, binary=True)
        img = Image.open(BytesIO(icon_bytes))

        if img.size != (120, 64):
            raise ValueError(f"Unexpected icon size for {name}: {img.size}")

        # Crop to correct resolution: the 120x64 sheet holds the 64, 32, 16
        # and 8 px versions side by side, each anchored to the top edge.
        crops = {
            64: (0, 0, 64, 64),
            32: (64, 0, 96, 32),
            16: (96, 0, 112, 16),
            8: (112, 0, 120, 8),
        }
        icon = img.crop(crops[resolution]).convert("RGBA")
        self._cache[cache_key] = icon
        return icon

    def create_shadow(
        self,
        product_img: Image.Image,
        canvas_size: tuple[int, int],
        position: tuple[int, int],
        intensity: float = 2.5,
        blur_radius: int = 6,
        expand_size: int = 5,
    ) -> Image.Image:
        """Create a drop shadow for an icon.

        Args:
            product_img: RGBA icon whose alpha channel shapes the shadow.
            canvas_size: size of the returned shadow layer.
            position: where the icon's shadow is pasted on the canvas.
            intensity: multiplier applied to the blurred alpha (capped at 255).
            blur_radius: Gaussian blur radius.
            expand_size: MaxFilter window size used to grow the silhouette.

        Returns:
            A transparent RGBA layer of ``canvas_size`` containing the shadow.
        """
        # Black image carrying the icon's alpha channel.
        shadow = Image.new("RGBA", product_img.size, (0, 0, 0, 0))
        shadow.putalpha(product_img.split()[3])

        shadow_padded = Image.new("RGBA", canvas_size, (0, 0, 0, 0))
        shadow_padded.paste(shadow, position)
        shadow_padded = shadow_padded.filter(ImageFilter.MaxFilter(size=expand_size))
        shadow_padded = shadow_padded.filter(ImageFilter.GaussianBlur(radius=blur_radius))

        # Boost the blurred alpha so the shadow stays visible.
        alpha = shadow_padded.split()[3].point(lambda p: min(int(p * intensity), 255))
        shadow_padded.putalpha(alpha)

        return shadow_padded

    def add_text_overlay(
        self,
        image: Image.Image,
        top_right_text: str,
        bottom_right_text: str,
        stroke_width: int = 3,
    ) -> Image.Image:
        """Add text overlays to top-right and bottom-right corners.

        Draws directly onto ``image`` and returns the same object. Empty
        strings are skipped.
        """
        draw = ImageDraw.Draw(image)
        w, h = image.size

        # Top-right text (production rate)
        if top_right_text:
            bbox = draw.textbbox((0, 0), top_right_text, font=self.font)
            text_w = bbox[2] - bbox[0]
            draw.text(
                (w - stroke_width - text_w, 0),
                top_right_text,
                fill="white",
                font=self.font,
                stroke_width=stroke_width,
                stroke_fill="black",
            )

        # Bottom-right text (machine count)
        if bottom_right_text:
            bbox = draw.textbbox((0, 0), bottom_right_text, font=self.font)
            text_w = bbox[2] - bbox[0]
            text_h = bbox[3] - bbox[1]
            draw.text(
                (w - stroke_width - text_w, h - 10 - text_h),
                bottom_right_text,
                fill="white",
                font=self.font,
                stroke_width=stroke_width,
                stroke_fill="black",
            )

        return image

    def create_product_image(
        self,
        product: str,
        machine: str | None,
        production_rate: float,
        machine_count: float,
        save_dir: Path,
        filename: str | None = None
    ) -> str | None:
        """Create a complete product image with machine, shadow, and text overlays.

        Args:
            product: prototype name of the product icon.
            machine: prototype name of the machine icon, or None for no machine.
            production_rate: rate shown in the top-right corner as ``<rate>/s``.
            machine_count: count shown in the bottom-right corner; when zero
                (or ``machine`` is empty) only the product icon is drawn.
            save_dir: directory the PNG is written to (created if missing).
            filename: output file name; derived from the arguments when None.

        Returns:
            Path of the saved image as a string, or None if a required icon
            could not be loaded.
        """
        product_img = self.load_icon(product, resolution=64)
        if product_img is None:
            return None

        if machine_count and machine:
            machine_img = self.load_icon(machine, resolution=64)
            if machine_img is None:
                return None

            # Create base image
            image = machine_img.resize((128, 128), resample=Image.BICUBIC)

            # Add shadow
            shadow = self.create_shadow(
                product_img,
                canvas_size=image.size,
                position=(32, 32),
            )
            image = Image.alpha_composite(image, shadow)

            # Add product on top (centred: 64 px icon on a 128 px canvas)
            image.paste(product_img, (32, 32), product_img)

            machine_text = f"{format_number(machine_count)}"
        else:
            image = product_img.resize((128, 128), resample=Image.BICUBIC)
            machine_text = ""

        # Add text overlays
        production_text = f"{format_number(production_rate)}/s"
        image = self.add_text_overlay(image, production_text, machine_text)

        if filename is None:
            filename = f"{product}_with_{machine}_{format_number(production_rate)}_{format_number(machine_count)}.png"

        # Image.save does not create missing directories.
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)
        save_path = save_dir / filename

        # Save
        image.save(save_path)
        return str(save_path)


# Usage: render one sample tile into the output directory.
# Arguments, in order: product, machine, production rate, machine count, save dir.
icon_manager = IconManager()
icon_path = icon_manager.create_product_image(
    "automation-science-pack",
    "assembling-machine-3",
    12.5,
    10,
    OUTPUT_DIR_PATH,
)

# Create graph

In [None]:
@dataclass
class ProductionNode:
    """Represents a product in the production graph.

    One node is created per product reached from the target; it carries the
    totals accumulated over all edges that consume the product.
    """
    product: str  # product name; also the node's key in the graph
    recipe: Recipe | None  # None when no recipe is known for the product
    item_demand: float  # items per second
    machine_count: float  # number of machines needed


@dataclass
class ProductionEdge:
    """Represents a connection between two products.

    The edge points from an ingredient to the product that consumes it.
    """
    ingredient: str  # name of the consumed product (edge source)
    product: str  # name of the consuming product (edge destination)
    item_flow: float  # items per second flowing along this edge
    machine_count: float  # machines needed to produce this flow


class GraphBuilder:
    """Builds and analyzes Factorio production graphs."""

    def __init__(self, recipes: dict[str, Recipe]):
        """Store the recipe lookup (product name -> Recipe)."""
        self.recipes = recipes

    def calculate_production_graph(
        self,
        target: str,
        target_demand: float = 2.5,
        raw_materials: set[str] | None = None,
    ) -> tuple[dict[str, ProductionNode], list[ProductionEdge]]:
        """
        Calculate production requirements for a target product.

        Args:
            target: name of the product to produce; must have a recipe.
            target_demand: required output of the target in items per second.
            raw_materials: products whose ingredients are not expanded.

        Returns:
            nodes: dict mapping product name to ProductionNode
            edges: list of ProductionEdges showing material flow

        Raises:
            KeyError: if ``target`` has no entry in the recipe lookup.
        """
        raw_materials = raw_materials or set()

        # Fail early with a readable message instead of a bare KeyError.
        if target not in self.recipes:
            raise KeyError(f"No recipe found for target product: {target!r}")

        # Get topological ordering (ingredients before the products using them)
        topo_order = self._get_topological_order(target, raw_materials)

        # Initialize target node
        target_recipe = self.recipes[target]
        item_demands = defaultdict(float, {target: target_demand})
        machine_counts = defaultdict(
            float,
            {
                target: target_demand
                / target_recipe.result_amount
                * target_recipe.time_per_cycle
            },
        )

        # Calculate demands backwards through graph: walking the order in
        # reverse visits each product before its ingredients, so a product's
        # totals are complete by the time its own inputs are computed.
        edges: list[ProductionEdge] = []
        for product in reversed(topo_order):
            recipe = self.recipes.get(product)
            if product in raw_materials or recipe is None:
                continue  # raw material, no inputs

            for ingredient, amount_per_cycle in recipe.ingredients.items():
                # Calculate flow along this edge
                item_flow = (
                    machine_counts[product]
                    * amount_per_cycle
                    / recipe.time_per_cycle
                )
                item_demands[ingredient] += item_flow

                # Calculate machines needed for ingredient
                machine_count_edge = 0.0
                ing_recipe = self.recipes.get(ingredient)
                if ing_recipe:
                    machine_count_edge = (
                        item_flow
                        / ing_recipe.result_amount
                        * ing_recipe.time_per_cycle
                    )
                machine_counts[ingredient] += machine_count_edge

                edges.append(ProductionEdge(
                    ingredient=ingredient,
                    product=product,
                    item_flow=item_flow,
                    machine_count=machine_count_edge,
                ))

        # Build nodes dict
        nodes = {
            product: ProductionNode(
                product=product,
                recipe=self.recipes.get(product),
                item_demand=item_demands[product],
                machine_count=machine_counts.get(product, 0.0),
            )
            for product in topo_order
        }

        return nodes, edges

    def _get_topological_order(
        self, target: str, raw_materials: set[str]
    ) -> list[str]:
        """Get a topological ordering of products via DFS.

        Products are appended after all of their ingredients (post-order), so
        the target is the last element. Products in ``raw_materials`` or
        without a recipe are not expanded further.
        """
        visited = set()
        topo_order = []

        def dfs(product: str):
            if product in visited:
                return
            visited.add(product)
            recipe = self.recipes.get(product)
            is_raw_material = product in raw_materials or recipe is None
            if not is_raw_material:
                for ingredient in recipe.ingredients:
                    dfs(ingredient)
            topo_order.append(product)

        dfs(target)
        return topo_order

    def visualize(
        self,
        nodes: dict[str, ProductionNode],
        edges: list[ProductionEdge],
        icon_manager: IconManager,
        output_path: Path,
    ) -> None:
        """Create a Graphviz visualization of the production graph.

        Args:
            nodes: product name -> ProductionNode, as returned by
                calculate_production_graph.
            edges: material flows between the nodes.
            icon_manager: renders the per-node images.
            output_path: path passed to Graphviz ``render`` with PNG format.
        """
        dot = Digraph()

        # Node images live in a temporary directory, so rendering must happen
        # before the directory is cleaned up.
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create node images
            for product, node in nodes.items():
                icon_path = icon_manager.create_product_image(
                    product=node.product,
                    machine=node.recipe.machine if node.recipe else None,
                    production_rate=node.item_demand,
                    machine_count=node.machine_count,
                    save_dir=Path(tmpdir),
                )

                if icon_path and Path(icon_path).exists():
                    dot.node(
                        product,
                        label="",
                        image=str(icon_path),
                        shape="none",
                        imagescale="true",
                    )
                else:
                    # No image available: fall back to a plain text node.
                    dot.node(
                        product,
                        label=f"{product} - {format_number(node.item_demand)}/s - {format_number(node.machine_count)} machines",
                    )

            # Create edges
            for edge in edges:
                label = f"{format_number(edge.item_flow)}/s"
                dot.edge(edge.ingredient, edge.product, label=label)

            dot.render(str(output_path), format="png", cleanup=True)

    def build_and_visualize(
        self,
        target: str,
        icon_manager: IconManager,
        target_demand: float = 2.5,
        raw_materials: set[str] | None = None,
        output_path: Path | None = None,
    ) -> tuple[dict[str, ProductionNode], list[ProductionEdge]]:
        """Convenience method to calculate and visualize in one call.

        When ``output_path`` is None the graph is rendered to
        ``OUTPUT_DIR_PATH / target``.

        Returns:
            The (nodes, edges) pair from calculate_production_graph.
        """
        nodes, edges = self.calculate_production_graph(
            target, target_demand, raw_materials
        )

        if output_path is None:
            output_path = OUTPUT_DIR_PATH / target

        self.visualize(nodes, edges, icon_manager, output_path)
        return nodes, edges


# Usage: build and render the graph for one target product.
icon_manager = IconManager()
graph_builder = GraphBuilder(recipes)

# Products listed in raw_materials are not expanded into their own ingredients.
graph_builder.build_and_visualize(
    target="chemical-science-pack",
    icon_manager=icon_manager,
    raw_materials={"advanced-circuit"},
)
