In [1]:
import pandas as pd
import plotly.graph_objects as go
import colorlover as cl
import pandas as pd
import dotenv, os, json
from dune_client.types import QueryParameter
from dune_client.client import DuneClient
from dune_client.query import QueryBase

In [2]:
# Directory that holds the .env file. The original author's path is kept as the
# default; set the DUNE_NOTEBOOK_DIR environment variable to run this notebook
# on another machine without editing the cell.
project_dir = os.environ.get(
    "DUNE_NOTEBOOK_DIR", "/Users/zokum/local-Workspace/python-notebook-examples"
)
# change the current working directory to where the .env file lives
os.chdir(project_dir)
# load the variables defined in .env into the process environment
dotenv.load_dotenv(".env")
# set up the Dune Python client
# (presumably picks up its API key from the environment loaded above - verify
# against the dune_client documentation)
dune = DuneClient.from_env()

In [3]:
# Option 1: execute the saved Dune query now and wait for a fresh result.
# The query parameters are assembled first so the QueryBase call stays short.
swap_flow_params = [
    QueryParameter.text_type(name="include_direct_swap", value="false"),
    QueryParameter.number_type(name="threshold", value=50),
    QueryParameter.text_type(
        name="token_A_address",
        value="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
    ),
    QueryParameter.text_type(
        name="token_B_address",
        value="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
    ),
]
query = QueryBase(
    query_id=3062941,
    name="Flow of tokens when swapping on Uniswap (Ethereum chain)",
    params=swap_flow_params,
)
# Request the "large" cluster for a faster runtime.
query_result = dune.run_query_dataframe(query=query, performance="large")

2023-09-29 14:34:20,184 INFO dune_client.api.base executing 3062941 on large cluster
2023-09-29 14:34:21,622 INFO dune_client.api.base waiting for query execution 01HBH3GRXQZ7ZKRA5Q0H5QASTV to complete: ExecutionState.EXECUTING
2023-09-29 14:34:23,317 INFO dune_client.api.base waiting for query execution 01HBH3GRXQZ7ZKRA5Q0H5QASTV to complete: ExecutionState.EXECUTING
2023-09-29 14:34:25,262 INFO dune_client.api.base waiting for query execution 01HBH3GRXQZ7ZKRA5Q0H5QASTV to complete: ExecutionState.EXECUTING


In [4]:
# Option 2: reuse the latest stored result of the query (at most 72 hours old)
# and turn its rows into a DataFrame. This assigns the same `query_result`
# name as option 1, so running both cells keeps only this result.
query_result = pd.DataFrame(
    dune.get_latest_result(3062941, max_age_hours=72).result.rows
)

In [5]:
# peek at the first rows of the result
query_result.head()

Unnamed: 0,source,target,value
0,USDC,USDT,4675
1,USDC,DAI,4290
2,MKR,WETH,352
3,USDT,WBTC,103
4,DAI,MKR,85


In [6]:
# Custom colors per token symbol, as CSS "rgb(r, g, b)" strings.
# Insertion order is kept on purpose: it is the order in which entries are
# visited when this mapping is iterated.
predefined_colors = dict(
    USDC="rgb(38, 112, 196)",
    USDT="rgb(0, 143, 142)",
    WETH="rgb(144, 144, 144)",
    WBTC="rgb(247, 150, 38)",
    COMP="rgb(32, 217, 152)",
    DAI="rgb(254, 175, 48)",
    MKR="rgb(38, 173, 158)",
    UNI="rgb(255, 21, 126)",
)

In [7]:
# function to create Sankey diagram
def create_sankey(
    query_result: pd.DataFrame,
    predefined_colors: dict,
    columns: dict,
    viz_config: dict,
    title: str = "unnamed",
):
    """
    Build a Plotly Sankey figure from a table of (source, target, value) flows.

    The input dataframe is only read; no helper columns are added to it, so
    the function can be called repeatedly on the same frame.

    Parameters
    ----------
    query_result : pd.DataFrame
        One row per flow. Must contain the three columns named in `columns`.
    predefined_colors : dict
        Maps a name fragment to a color string. A node whose label contains
        the fragment (case-insensitive) gets that color; the first matching
        entry in iteration order wins. Nodes without a match receive a color
        from the colorlover "Set3" qualitative palette.
    columns : dict
        Keys "source", "target" and "value", each giving the name of the
        corresponding column in `query_result`.
    viz_config : dict
        Must provide "node_pad", "node_thickness", "node_line_width",
        "font_size", "figure_height" and "figure_width".
    title : str
        Figure title.

    Returns
    -------
    The figure created by `go.Figure`, with the layout already applied.

    Raises
    ------
    ValueError
        If a required column is missing or the value column is not numeric.
    """
    source_col = columns["source"]
    target_col = columns["target"]
    value_col = columns["value"]

    # Check if the dataframe contains required columns
    for col in (source_col, target_col, value_col):
        if col not in query_result.columns:
            raise ValueError(f"Error: The dataframe is missing the '{col}' column")

    # Check if 'value' column is numeric
    if not pd.api.types.is_numeric_dtype(query_result[value_col]):
        raise ValueError("Error: The 'value' column must be numeric")

    # Unique node labels in order of first appearance (all sources, then targets).
    all_nodes = list(
        pd.concat([query_result[source_col], query_result[target_col]]).unique()
    )

    # Sankey links refer to nodes by position, so map every label to its index.
    # A dict gives constant-time lookups, and the indices live in local lists
    # instead of being written back into the caller's dataframe.
    node_index = {node: position for position, node in enumerate(all_nodes)}
    source_idx = [node_index[label] for label in query_result[source_col]]
    target_idx = [node_index[label] for label in query_result[target_col]]

    # create color map for Sankey
    default_palette = cl.scales["12"]["qual"]["Set3"]  # fallback colors
    color_map = {}
    for node in all_nodes:
        # str() lets non-string labels (e.g. numeric ids) take part in matching.
        node_lower = str(node).lower()
        chosen = next(
            (
                color
                for name, color in predefined_colors.items()
                if name.lower() in node_lower
            ),
            None,
        )
        if chosen is None:
            # Cycle through the palette based on how many nodes are colored so far.
            chosen = default_palette[len(color_map) % len(default_palette)]
        color_map[node] = chosen

    fig = go.Figure(
        go.Sankey(
            node=dict(
                pad=viz_config["node_pad"],
                thickness=viz_config["node_thickness"],
                line=dict(color="black", width=viz_config["node_line_width"]),
                label=all_nodes,
                color=[color_map[node] for node in all_nodes],
            ),
            link=dict(
                source=source_idx,
                target=target_idx,
                value=query_result[value_col],
                # each link takes the color of the node it starts from
                color=[color_map[label] for label in query_result[source_col]],
            ),
        )
    )
    fig.update_layout(
        title_text=title,
        font_size=viz_config["font_size"],
        height=viz_config["figure_height"],
        width=viz_config["figure_width"],
    )

    return fig

In [9]:
# Build the Sankey figure from the query result and display it.
create_sankey(
    title="Flow of tokens when swapping on Uniswap (Ethereum chain)",
    query_result=query_result,
    predefined_colors=predefined_colors,
    # the result frame already uses the canonical column names
    columns=dict(source="source", target="target", value="value"),
    viz_config=dict(
        node_pad=15,
        node_thickness=20,
        node_line_width=0.5,
        font_size=12,
        figure_height=800,
        figure_width=1000,
    ),
).show()