In [74]:
from bw_graph_tools.graph_traversal import Edge as GraphEdge
from bw_graph_tools.graph_traversal import NewNodeEachVisitGraphTraversal
from bw_graph_tools.graph_traversal import Node as GraphNode

In [75]:
import json
from copy import deepcopy
from abc import abstractmethod

def identify_activity_type(activity):
    """Return the activity type based on its naming."""
    name = activity["name"]
    if "treatment of" in name:
        return "treatment"
    elif "market for" in name:
        # if not "to generic" in name:  # these are not markets, but also transferring activities
        return "market"
    elif "market group" in name:
        # if not "to generic" in name:
        return "marketgroup"
    else:
        return "production"
    
class BaseGraph(object):
    def __init__(self):
        self.json_data = None
        # stores previous graphs, if any, and enables back/forward buttons
        self.stack = []
        # stores graphs that can be returned to after having used the "back" button
        self.forward_stack = []

    def update(self, delete_unstacked: bool = True) -> None:
        self.store_previous()
        if delete_unstacked:
            self.forward_stack = []

    def forward(self) -> bool:
        """Go forward, if previously gone back."""
        if not self.forward_stack:
            return False
        self.retrieve_future()
        self.update(delete_unstacked=False)
        return True

    def back(self) -> bool:
        """Go back to previous graph, if any."""
        if len(self.stack) <= 1:
            return False
        self.store_future()
        self.update(delete_unstacked=False)
        return True

    def store_previous(self) -> None:
        """Store the current graph in the"""
        self.stack.append((deepcopy(self.json_data)))

    def store_future(self) -> None:
        """When going back, store current data in a queue."""
        self.forward_stack.append(self.stack.pop())
        self.json_data = self.stack.pop()

    def retrieve_future(self) -> None:
        """Extract the last graph from the queue."""
        self.json_data = self.forward_stack.pop()

    @abstractmethod
    def new_graph(self, *args, **kwargs) -> None:
        pass

    def save_json_to_file(self, filename: str = "graph_data.json") -> None:
        """Writes the current model´s JSON representation to the specifies file."""
        if self.json_data:
            filepath = os.path.join(os.path.dirname(__file__), filename)
            with open(filepath, "w") as outfile:
                json.dump(self.json_data, outfile)
                
class Graph(BaseGraph):
    """
    Python side representation of the graph.
    Functionality for graph navigation (e.g. adding and removing nodes).
    A JSON representation of the graph (edges and nodes) enables its use in javascript/html/css.
    """

    def new_graph(self, data):
        self.json_data = Graph.get_json_data(data)
        self.update()

    @staticmethod
    def get_json_data(data) -> str:
        """Transform graph traversal output to JSON data.

        We use the [dagre](https://github.com/dagrejs/dagre) javascript library for rendering directed graphs. We need to provide the following:

        ```python
        {
            'max_impact': float,  # Total LCA score,
            'title': str,  # Graph title
            'edges': [{
                'source_id': int,  # Unique ID of producer of material or energy in graph
                'target_id': int,  # Unique ID of consumer of material or energy in graph
                'weight': float,  #  In graph units, relative to `max_edge_width`
                'label': str,  # HTML label
                'product': str,  # The label of the flowing material or energy
                'class': str,  # "benefit" or "impact"; controls styling
                'label': str,  # HTML label
                'toottip': str,  # HTML tooltip
            }],
            'nodes': [{
                'direct_emissions_score_normalized': float,  # Fraction of total LCA score from direct emissions
                'product': str,  # Reference product label, if any
                'location': str,  # Location, if any
                'id': int,  # Graph traversal ID
                'database_id': int,  # Node ID in SQLite database
                'database': str,  # Database name
                'class': str,  # Enumerated set of class label strings
                'label': str,  # HTML label including name and location
                'toottip': str,  # HTML tooltip
            }],
        }

        ```

        """
        lca_score = data["metadata"]["lca"].score
        lcia_unit = data["metadata"]["unit"]
        demand = data["metadata"]["lca"].demand

        def convert_edge_to_json(
            edge: GraphEdge,
            nodes: dict[int, GraphNode],
            total_score: float,
            lcia_unit: str,
            max_edge_width: int = 40,
        ) -> dict:
            cum_score = nodes[edge.producer_unique_id].cumulative_score
            unit = bd.get_node(
                id=nodes[edge.producer_unique_id].reference_product_datapackage_id
            ).get("unit", "(unknown)")
            return {
                "source_id": edge.producer_unique_id,
                "target_id": edge.consumer_unique_id,
                "amount": edge.amount,
                "weight": abs(cum_score / total_score) * max_edge_width,
                "label": f"{round(cum_score, 3)} {lcia_unit}",
                "class": "benefit" if cum_score < 0 else "impact",
                "tooltip": f"<b>{round(cum_score, 3)} {lcia_unit}</b> ({edge.amount:.2g} {unit})",
            }

        def convert_node_to_json(
            graph_node: GraphNode,
            total_score: float,
            fu: dict,
            lcia_unit: str,
            max_name_length: int = 20,
        ) -> dict:
            db_node = bd.get_node(id=graph_node.activity_datapackage_id)
            data = {
                "direct_emissions_score_normalized": graph_node.direct_emissions_score
                / (total_score or 1),
                "direct_emissions_score": graph_node.direct_emissions_score,
                "cumulative_score": graph_node.cumulative_score,
                "cumulative_score_normalized": graph_node.cumulative_score
                / (total_score or 1),
                "product": db_node.get("reference product", ""),
                "location": db_node.get("location", "(unknown)"),
                "id": graph_node.unique_id,
                "database_id": graph_node.activity_datapackage_id,
                "database": db_node["database"],
                "class": (
                    "demand"
                    if graph_node.activity_datapackage_id in fu
                    else identify_activity_type(db_node)
                ),
                "name": db_node.get("name", "(unnamed)"),
            }
            frac_dir_score = round(data["direct_emissions_score_normalized"] * 100, 2)
            dir_score = round(data["direct_emissions_score"], 3)
            frac_cum_score = round(data["cumulative_score_normalized"] * 100, 2)
            cum_score = round(data["cumulative_score"], 3)
            data[
                "label"
            ] = f"""{db_node['name'][:max_name_length]}
{data['location']}
{frac_dir_score}%"""
            data[
                "tooltip"
            ] = f"""
                <b>{data['name']}</b>
                <br>Individual impact: {dir_score} {lcia_unit} ({frac_dir_score }%)
                <br>Cumulative impact: {cum_score} {lcia_unit} ({frac_cum_score}%)
            """
            return data

        json_data = {
            "nodes": [
                convert_node_to_json(node, lca_score, demand, lcia_unit)
                for idx, node in data["nodes"].items()
                if idx != -1
            ],
            "edges": [
                convert_edge_to_json(edge, data["nodes"], lca_score, lcia_unit)
                for edge in data["edges"]
                if edge.producer_index != -1 and edge.consumer_index != -1
            ],
            "title": "Sankey graph result",
            # "title": self.build_title(demand, lca_score, lcia_unit),
        }

        return json.dumps(json_data)

    def build_title(self, demand: tuple, lca_score: float, lcia_unit: str) -> str:
        act, amount = demand[0], demand[1]
        if type(act) is tuple or type(act) is int:
            act = bd.get_activity(act)
        format_str = (
            "Reference flow: {:.2g} {} {} | {} | {} <br>" "Total impact: {:.2g} {}"
        )
        return format_str.format(
            amount,
            act.get("unit"),
            act.get("reference product") or act.get("name"),
            act.get("name"),
            act.get("location"),
            lca_score,
            lcia_unit,
        )

In [76]:
import bw2data as bd

bd.projects.set_current("timex")


In [77]:
bd.databases

Databases dictionary with 6 object(s):
	ecoinvent-3.10-biosphere
	ecoinvent-3.10-cutoff
	ei310_IMAGE_SSP2_RCP19_2020_electricity
	ei310_IMAGE_SSP2_RCP19_2030_electricity
	ei310_IMAGE_SSP2_RCP19_2040_electricity
	foreground

In [90]:
import bw2calc as bc


lca = bc.LCA(demand={bd.Database("ecoinvent-3.10-cutoff").random(): 1}, method=bd.methods.random())
lca.lci(factorize=True)
lca.lcia()

In [91]:
data = NewNodeEachVisitGraphTraversal.calculate(
                    lca_object=lca, max_depth=2,
                )

  data = NewNodeEachVisitGraphTraversal.calculate(


In [92]:
# from bw_graph_tools.graph_traversal.settings import GraphTraversalSettings

# gt = NewNodeEachVisitGraphTraversal(lca, GraphTraversalSettings())
# gt.traverse()
# gt.nodes

In [93]:
# store the metadata from this calculation
data["metadata"] = {
    "lca": lca,
    "unit": "kg CO2-eq",
}

In [94]:
g = Graph()
g.new_graph(data)

In [100]:
res = json.loads(g.json_data)

In [104]:
id_to_database_id = {}
for node in res['nodes']:
    id_to_database_id[node['id']] = node['database_id']

In [105]:
id_to_database_id

{0: 20930, 1: 12450, 2: 23728}

In [120]:
res["nodes"]

[{'direct_emissions_score_normalized': 0.0,
  'direct_emissions_score': 0.0,
  'cumulative_score': 0.004121816381659224,
  'cumulative_score_normalized': 1.0000000000000004,
  'product': 'electricity, medium voltage',
  'location': 'US-HICC',
  'id': 0,
  'database_id': 20930,
  'database': 'ecoinvent-3.10-cutoff',
  'class': 'demand',
  'name': 'market for electricity, medium voltage',
  'label': 'market for electrici\nUS-HICC\n0.0%',
  'tooltip': '\n                <b>market for electricity, medium voltage</b>\n                <br>Individual impact: 0.0 kg CO2-eq (0.0%)\n                <br>Cumulative impact: 0.004 kg CO2-eq (100.0%)\n            '},
 {'direct_emissions_score_normalized': 0.0,
  'direct_emissions_score': 0.0,
  'cumulative_score': 8.551829659782503e-05,
  'cumulative_score_normalized': 0.020747721072281233,
  'product': 'transmission network, electricity, medium voltage',
  'location': 'GLO',
  'id': 1,
  'database_id': 12450,
  'database': 'ecoinvent-3.10-cutoff',
 

In [122]:
df_data = {"source": [], "target": [], "amount": []}
for edge in res["edges"]:
    df_data["source"].append(id_to_database_id[edge["source_id"]])
    df_data["target"].append(id_to_database_id[edge["target_id"]])
    df_data["amount"].append(edge["amount"])

In [128]:
import pandas as pd
df = pd.DataFrame(df_data)
df.sort_values(by="amount", ascending=False)

Unnamed: 0,source,target,amount
1,23728,20930,0.9560936
0,12450,20930,1.867658e-08


In [102]:
res["edges"]

[{'source_id': 1,
  'target_id': 0,
  'amount': 1.8676577612333276e-08,
  'weight': 0.8299088428912493,
  'label': '0.0 kg CO2-eq',
  'class': 'impact',
  'tooltip': '<b>0.0 kg CO2-eq</b> (1.9e-08 kilometer)'},
 {'source_id': 2,
  'target_id': 0,
  'amount': 0.9560936021853338,
  'weight': 38.980582169785585,
  'label': '0.004 kg CO2-eq',
  'class': 'impact',
  'tooltip': '<b>0.004 kg CO2-eq</b> (0.96 kilowatt hour)'}]