In [5]:
import sys

sys.path.append("/Users/harshitsharma/Code/exp/factorio-learning-environment/")

from time import sleep
from typing import List, Set, Union, Optional
from fle.env.entities import Position, Entity, EntityGroup, Inventory, ResourcePatch, Ingredient
from fle.env.game_types import Prototype, Resource, Technology
from fle.env.tools.agent.connect_entities.groupable_entities import (
    agglomerate_groupable_entities,
)


class GetEntities:

    def __call__(
        self,
        response,
    ) -> List[Union[Entity, EntityGroup]]:
        """
        Get entities within a radius of a given position.
        :param entities: Set of entity prototypes to filter by. If empty, all entities are returned.
        :param position: Position to search around. Can be a Position object or "player" for player's position.
        :param radius: Radius to search within.
        :return: Found entities
        """
        try:
            if (not isinstance(response, dict) and not response) or isinstance(
                response, str
            ):  # or (isinstance(response, dict) and not response):
                raise Exception("Could not get entities", response)

            entities_list = []
            for raw_entity_data in response:
                if isinstance(raw_entity_data, list):
                    continue

                entity_data = self.clean_response(raw_entity_data)
                # Find the matching Prototype
                matching_prototype = None
                for prototype in Prototype:
                    if prototype.value[0] == entity_data["name"].replace("_", "-"):
                        matching_prototype = prototype
                        break

                if matching_prototype is None:
                    print(
                        f"Warning: No matching Prototype found for {entity_data['name']}"
                    )
                    continue

                # if matching_prototype not in entities and entities:
                #     continue
                metaclass = matching_prototype.value[1]
                while isinstance(metaclass, tuple):
                    metaclass = metaclass[1]

                # Process nested dictionaries (like inventories)
                for key, value in entity_data.items():
                    if isinstance(value, dict):
                        entity_data[key] = self.process_nested_dict(value)

                entity_data["prototype"] = prototype

                # remove all empty values from the entity_data dictionary
                entity_data = {
                    k: v for k, v in entity_data.items() if v or isinstance(v, int)
                }

                try:
                    entity = metaclass(**entity_data)
                    entities_list.append(entity)
                except Exception as e1:
                    print(f"Could not create {entity_data['name']} object: {e1}")

            # get all pipes into a list
            pipes = [
                entity
                for entity in entities_list
                if hasattr(entity, "prototype")
                and entity.prototype in (Prototype.Pipe, Prototype.UndergroundPipe)
            ]
            group = agglomerate_groupable_entities(pipes)
            [entities_list.remove(pipe) for pipe in pipes]
            entities_list.extend(group)

            poles = [
                entity
                for entity in entities_list
                if hasattr(entity, "prototype")
                and entity.prototype
                in (
                    Prototype.SmallElectricPole,
                    Prototype.BigElectricPole,
                    Prototype.MediumElectricPole,
                )
            ]
            group = agglomerate_groupable_entities(poles)
            [entities_list.remove(pole) for pole in poles]
            entities_list.extend(group)

            walls = [
                entity
                for entity in entities_list
                if hasattr(entity, "prototype")
                and entity.prototype == Prototype.StoneWall
            ]
            group = agglomerate_groupable_entities(walls)
            [entities_list.remove(wall) for wall in walls]
            entities_list.extend(group)

            belt_types = (
                Prototype.TransportBelt,
                Prototype.FastTransportBelt,
                Prototype.ExpressTransportBelt,
                Prototype.UndergroundBelt,
                Prototype.FastUndergroundBelt,
                Prototype.ExpressUndergroundBelt,
            )
            belts = [
                entity
                for entity in entities_list
                if hasattr(entity, "prototype") and entity.prototype in belt_types
            ]
            group = agglomerate_groupable_entities(belts)
            [entities_list.remove(belt) for belt in belts]
            entities_list.extend(group)

            return entities_list

        except Exception as e:
            raise Exception(f"Error in GetEntities: {e}")

    def process_nested_dict(self, nested_dict):
        """Helper method to process nested dictionaries"""
        if isinstance(nested_dict, dict):
            if all(isinstance(key, int) for key in nested_dict.keys()):
                return [
                    self.process_nested_dict(value) for value in nested_dict.values()
                ]
            else:
                return {
                    key: self.process_nested_dict(value)
                    for key, value in nested_dict.items()
                }
        return nested_dict

    def clean_response(self, response):
        def is_lua_list(d):
            """Check if dictionary represents a Lua-style list (keys are consecutive numbers from 1)"""
            if not isinstance(d, dict) or not d:
                return False
            keys = set(str(k) for k in d.keys())
            return all(str(i) in keys for i in range(1, len(d) + 1))

        def clean_value(value):
            """Recursively clean a value"""
            if isinstance(value, dict):
                # Handle Lua-style lists
                if is_lua_list(value):
                    # Sort by numeric key and take only the values
                    sorted_items = sorted(value.items(), key=lambda x: int(str(x[0])))
                    return [clean_value(v) for k, v in sorted_items]

                # Handle inventory special case
                if any(isinstance(k, int) for k in value.keys()) and all(
                    isinstance(v, dict) and "name" in v and "count" in v
                    for v in value.values()
                ):
                    cleaned_dict = {}
                    for v in value.values():
                        cleaned_dict[v["name"]] = v["count"]
                    return cleaned_dict

                # Regular dictionary
                return {k: clean_value(v) for k, v in value.items()}

            elif isinstance(value, list):
                return [clean_value(v) for v in value]

            return value

        cleaned_response = {}

        if not hasattr(response, "items"):
            pass

        for key, value in response.items():
            # if key == 'status' and isinstance(value, str):
            # cleaned_response[key] = EntityStatus.from_string(value)
            if key == "direction" and isinstance(value, str):
                cleaned_response[key] = Direction.from_string(value)
            elif not value and key in (
                "warnings",
                "input_connection_points",
                "output_connection_points",
            ):
                cleaned_response[key] = []
            else:
                cleaned_response[key] = clean_value(value)

        return cleaned_response


class InspectInventory:

    def __call__(
        self, response, entity=None, all_players: bool = False
    ) -> Union[Inventory, List[Inventory]]:
        """
        Inspects the inventory of the given entity. If no entity is given, inspect your own inventory.
        If all_players is True, returns a list of inventories for all players.
        :param entity: Entity to inspect
        :param all_players: If True, returns inventories for all players
        :return: Inventory of the given entity or list of inventories for all players
        """

        if entity:
            if isinstance(entity, Entity):
                x, y = self.get_position(entity.position)
            elif isinstance(entity, Position):
                x, y = entity.x, entity.y
            else:
                raise ValueError(
                    f"The first argument must be an Entity or Position object, you passed in a {type(entity)} object."
                )
        else:
            x, y = 0, 0

        if not isinstance(response, dict):
            if entity:
                raise Exception(f"Could not inspect inventory of {entity}.", response)
            else:
                # raise Exception("Could not inspect None inventory.", response)
                return Inventory()

        return Inventory(**response)

        from typing import Union


class Nearest:

    def __call__(
        self,
        type: Union[Prototype, Resource],
        # relative: bool = False,
        # **kwargs
    ) -> Position:
        """
        Find the nearest entity or resource to your position.
        :param type: Entity or resource type to find
        :return: Position of nearest entity or resource
        """
        try:
            if not isinstance(type, tuple) and isinstance(type.value, tuple):
                type = type.value

            name, metaclass = type

            if not isinstance(name, str):
                raise Exception(
                    "'Nearest' must be called with an entity name as the first argument."
                )

            response, time_elapsed = self.execute(self.player_index, name)

            if response is None or response == {}:
                if metaclass == ResourcePatch:
                    raise Exception(
                        f"No {type} found on the map within 500 tiles of the player. Move around to explore the map more."
                    )
                else:
                    raise Exception(f"No {type} found within 500 tiles of the player")

            #    self.game_state.last_observed_player_location = self.game_state.player_location

            # if relative:
            #    x = -response['x'] + self.game_state.last_observed_player_location[0]
            #    y = -response['y'] + self.game_state.last_observed_player_location[1]
            # else:
            x = response["x"]
            y = response["y"]

            position = Position(x=x, y=y)

            return position
        except TypeError:
            raise Exception(f"Could not find nearest {type[0]} on the surface")
        except Exception as e:
            raise Exception(f"Could not find nearest {type[0]}", e)


class GetResearchProgress:

    def __call__(
        self, response, technology: Optional[Technology] = None
    ) -> List[Ingredient]:
        """
        Get the progress of research for a specific technology or the current research.
        :param technology: Optional technology to check. If None, checks current research.
        :return The remaining ingredients to complete the research
        """
        if technology is not None:
            if hasattr(technology, "value"):
                name = technology.value
            else:
                name = technology
        else:
            name = None

        success, elapsed = self.execute(self.player_index, name)

        if success != {} and isinstance(success, str):
            if success is None:
                raise Exception(
                    "No research in progress"
                    if name is None
                    else f"Cannot get progress for {name}"
                )
            else:
                result = ":".join(success.split(":")[2:]).replace('"', "").strip()
                if result:
                    raise Exception(result)
                else:
                    raise Exception(success)

        return [
            Ingredient(
                name=ingredient["name"],
                count=ingredient["count"],
                type=ingredient.get("type"),
            )
            for ingredient in success
        ]

In [6]:
from pathlib import Path

list(Path("../factorio-data-collector/factorio_replays/replay-logs/").glob("*"))

[PosixPath('../factorio-data-collector/factorio_replays/replay-logs/core-meta.jsonl'),
 PosixPath('../factorio-data-collector/factorio_replays/replay-logs/get_entities.jsonl'),
 PosixPath('../factorio-data-collector/factorio_replays/replay-logs/nearest.jsonl'),
 PosixPath('../factorio-data-collector/factorio_replays/replay-logs/get_research_progress.jsonl'),
 PosixPath('../factorio-data-collector/factorio_replays/replay-logs/inspect_inventory.jsonl')]

In [16]:
import pandas as pd

df = pd.read_json(
    "/Users/harshitsharma/Code/exp/factorio-data-collector/factorio_replays/replay-logs/get_entities.jsonl",
    lines=True,
)
df_i = pd.read_json(
    "/Users/harshitsharma/Code/exp/factorio-data-collector/factorio_replays/replay-logs/inspect_inventory.jsonl",
    lines=True,
)

In [17]:
df.head()

Unnamed: 0,t,ev,p,x,y,entities_raw
0,0,get_entities,1,-62.1,14.5,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""..."
1,60,get_entities,1,3.7,-6.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""..."
2,120,get_entities,1,-0.9,-13.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""..."
3,180,get_entities,1,-7.3,-19.4,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""..."
4,240,get_entities,1,-7.8,-21.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""..."


In [18]:
from slpp import slpp as lua
from fle.env.utils.rcon import _lua2python


# Process the dataframe to extract the actual entity data
def process_dataframe_responses(df, column="entities_raw"):
    """Process dataframe responses to extract entity data from pcall outputs"""
    processed_responses = []

    for idx, row in df.iterrows():
        # Parse the raw Lua response
        parsed_data, elapsed = _lua2python("", row[column])

        if parsed_data is None:
            processed_responses.append([])
            continue

        # Extract the actual entity data from the pcall response
        # The structure is typically: {1: entity1_data, 2: entity2_data, ...}
        if isinstance(parsed_data, dict):
            # Convert the numbered dictionary to a list of entity data
            entity_list = []
            for key in sorted(parsed_data.keys()):
                if isinstance(key, int):  # Only process numeric keys
                    entity_data = parsed_data[key]
                    if isinstance(entity_data, dict):
                        entity_list.append(entity_data)

            processed_responses.append(entity_list)
        else:
            processed_responses.append([])

    return processed_responses


df["processed_entities"] = process_dataframe_responses(df)

In [19]:
res = df.iloc[1].processed_entities
res

[{'name': 'crash-site-spaceship',
  'position': {'y': -6, 'x': -5},
  'direction': 0,
  'health': 600,
  'energy': 0,
  'type': 'container',
  'status': 'normal',
  'fuel': {'firearm-magazine': 8},
  'inventory': {'firearm-magazine': 8},
  'turret_ammo': {'firearm-magazine': 8},
  'dimensions': {'width': 15.59765625, 'height': 7.796875},
  'neighbours': {},
  'id': 2,
  'tile_dimensions': {'tile_width': 16, 'tile_height': 8}}]

In [20]:
ge = GetEntities()

In [25]:
ents = []

for res in df.processed_entities:
    try:
        ents.append(ge(res))
    except:
        ents.append(None)



In [26]:
df["final_entities"] = ents

In [27]:
df.head()

Unnamed: 0,t,ev,p,x,y,entities_raw,processed_entities,final_entities
0,0,get_entities,1,-62.1,14.5,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]
1,60,get_entities,1,3.7,-6.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]
2,120,get_entities,1,-0.9,-13.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]
3,180,get_entities,1,-7.3,-19.4,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]
4,240,get_entities,1,-7.8,-21.1,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]


In [28]:
import json

df[["t", "final_entities"]].head(20).to_dict("records")

[{'t': 0, 'final_entities': []},
 {'t': 60, 'final_entities': []},
 {'t': 120, 'final_entities': []},
 {'t': 180, 'final_entities': []},
 {'t': 240, 'final_entities': []},
 {'t': 300, 'final_entities': []},
 {'t': 360, 'final_entities': []},
 {'t': 420, 'final_entities': []},
 {'t': 480, 'final_entities': []},
 {'t': 540, 'final_entities': []},
 {'t': 600, 'final_entities': []},
 {'t': 660, 'final_entities': []},
 {'t': 720, 'final_entities': []},
 {'t': 780, 'final_entities': []},
 {'t': 840, 'final_entities': []},
 {'t': 900, 'final_entities': []},
 {'t': 960,
  'final_entities': [
 {'t': 1020,
  'final_entities': [
   
   
   
   
 {'t': 1080,
  'final_entities': [
   
   
   
   
   
 {'t': 1140,
  'final_entities': [
   
   
   
   
   
   

In [33]:
df[(df['t'] > 5300) & (df['t'] < 5500)][["processed_entities", "final_entities"]].iloc[1].to_dict()

{'processed_entities': [{'name': 'crash-site-spaceship',
   'position': {'y': -6, 'x': -5},
   'direction': 0,
   'health': 600,
   'energy': 0,
   'type': 'container',
   'status': 'normal',
   'fuel': {'firearm-magazine': 8},
   'inventory': {'firearm-magazine': 8},
   'turret_ammo': {'firearm-magazine': 8},
   'dimensions': {'width': 15.59765625, 'height': 7.796875},
   'neighbours': {},
   'id': 2,
   'tile_dimensions': {'tile_width': 16, 'tile_height': 8}},
  {'name': 'stone-furnace',
   'position': {'y': 29, 'x': -25},
   'direction': 0,
   'health': 200,
   'energy': 1600,
   'type': 'furnace',
   'status': 'no_ingredients',
   'fuel': {'coal': 2},
   'burnt_result': {},
   'inventory': {'coal': 2},
   'furnace_source': {},
   'furnace_result': {'copper-plate': 1},
   'furnace_modules': {},
   'assembling_machine_input': {},
   'assembling_machine_output': {'copper-plate': 1},
   'assembling_machine_modules': {},
   'lab_input': {},
   'lab_modules': {'copper-plate': 1},
   'tur

In [65]:
df_i.head()

Unnamed: 0,t,ev,p,inventory_error,inventory_raw
0,0,inspect_inventory,1,...rently-playing/script/observations/inspect_...,
1,600,inspect_inventory,1,,"{ [""burner-mining-drill""] = 1,[""stone-furnace""..."
2,1200,inspect_inventory,1,,"{ [""stone-furnace""] = 4,[""wood""] = 1,[""coal""] ..."
3,1800,inspect_inventory,1,,"{ [""stone-furnace""] = 3,[""wood""] = 1,[""stone""]..."
4,2400,inspect_inventory,1,,"{ [""stone-furnace""] = 3,[""wood""] = 1,[""stone""]..."


In [66]:
df_i.iloc[1].inventory_raw

'{ ["burner-mining-drill"] = 1,["stone-furnace"] = 1,["wood"] = 1,["coal"] = 27,["stone"] = 47,} '

In [64]:
_lua2python("", df_i.iloc[1].inventory_raw)

({'burner-mining-drill': 1,
  'stone-furnace': 1,
  'wood': 1,
  'coal': 27,
  'stone': 47},
 889589.88554225)

In [67]:
ii = InspectInventory()

ii(_lua2python("", df_i.iloc[1].inventory_raw))

Inventory({})

In [None]:
def process_inventory_responses(df, column_name="inventory_raw"):
    """Process inventory responses from dataframe"""
    processed_inventories = []

    for idx, row in df.iterrows():
        # Parse the raw Lua response
        parsed_data, elapsed = _lua2python("", row[column_name])

        if parsed_data is None:
            processed_inventories.append(Inventory())
            continue

        # Create Inventory object from the parsed data
        try:
            inventory = Inventory(**parsed_data)
            processed_inventories.append(inventory)
        except Exception as e:
            print(f"Error processing inventory at row {idx}: {e}")
            processed_inventories.append(Inventory())

    return processed_inventories


# Usage:
processed_inventories = process_inventory_responses(df_i, "inventory_raw")

# Test with the first non-empty inventory
for i, inventory in enumerate(processed_inventories):
    if len(inventory) > 0:
        print(f"Row {i} inventory: {inventory}")
        break

Row 1 inventory: {'burner-mining-drill': 1, 'stone-furnace': 1, 'wood': 1, 'coal': 27, 'stone': 47}


In [70]:
processed_inventories[1]

Inventory({'burner-mining-drill': 1, 'stone-furnace': 1, 'wood': 1, 'coal': 27, 'stone': 47})

In [None]:
from fle.env.gym_env.observation_formatter import BasicObservationFormatter, Observation

In [92]:
df

Unnamed: 0,t,ev,p,x,y,entities_raw,processed_entities,final_entities
0,0,get_entities,1,-62.1,14.5,{ },[],
1,600,get_entities,1,-12.7,-16.2,"{ [1] = { [""name""] = ""crash-site-spaceship"",[""...","[{'name': 'crash-site-spaceship', 'position': ...",[]
2,1200,get_entities,1,-24.0,32.0,"{ [1] = { [""name""] = ""stone-furnace"",[""positio...","[{'name': 'stone-furnace', 'position': {'y': 2...",[fuel=Inventory({'coal': 3}) name='stone-furna...
3,1800,get_entities,1,-24.0,32.0,"{ [1] = { [""name""] = ""stone-furnace"",[""positio...","[{'name': 'stone-furnace', 'position': {'y': 2...",[fuel=Inventory({'coal': 3}) name='stone-furna...
4,2400,get_entities,1,-0.7,-29.7,{ },[],
...,...,...,...,...,...,...,...,...
397,238200,get_entities,1,148.5,218.5,"{ [1] = { [""name""] = ""steel-furnace"",[""positio...","[{'name': 'steel-furnace', 'position': {'y': 2...",[fuel=Inventory({'coal': 4}) name='steel-furna...
398,238800,get_entities,1,150.3,218.5,"{ [1] = { [""name""] = ""steel-furnace"",[""positio...","[{'name': 'steel-furnace', 'position': {'y': 2...",[fuel=Inventory({'coal': 5}) name='steel-furna...
399,239400,get_entities,1,146.7,218.5,"{ [1] = { [""name""] = ""construction-robot"",[""po...","[{'name': 'construction-robot', 'position': {'...",[fuel=Inventory({'coal': 5}) name='steel-furna...
400,240000,get_entities,1,127.8,219.9,"{ [1] = { [""name""] = ""transport-belt"",[""positi...","[{'name': 'transport-belt', 'position': {'x': ...",[\n\tBeltGroup(inputs=[Belt((x=137.5 y=216.5)-...


In [None]:
df.formatted_entities = df.final_entities.apply(
    lambda x: BasicObservationFormatter.format_entities([str(e) for e in x])
)

TypeError: 'NoneType' object is not iterable