In [None]:
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from dotenv import load_dotenv
import os
import functools

# Load variables from a local .env file into os.environ so that the
# os.getenv("ELK_HOST") call in the next cell can find them.
load_dotenv()

In [None]:
ELK_HOST = os.getenv("ELK_HOST")
# Fail fast with a clear message instead of handing hosts=[None] to the client.
if not ELK_HOST:
    raise RuntimeError("ELK_HOST is not set; define it in the environment or in a .env file")

# NOTE(review): TLS certificate verification is disabled (verify_certs=False)
# and its warning silenced; only acceptable against a trusted test cluster.
es = Elasticsearch(
    hosts=[ELK_HOST],
    timeout=300,
    max_retries=10,
    retry_on_timeout=True,
    verify_certs=False,
    ssl_show_warn=False,
)

In [None]:
# Elasticsearch indices to read participant-join events from, one per load-test run.
index_kurento = [
    "loadtest-webrtc-stats-1716202874536"
]

index_list = index_kurento

# Run names, paired BY POSITION with the indices above. They are the keys used
# to look up dfs_final/start-end-times.json and dfs_final/session-data.json,
# and the prefixes of the CSV files written to dfs_final/.
index_kurento_names = [
    "loadtest-webrtc-2024-kurento-8p"
]

index_list_names = index_kurento_names

# The processing cells index both lists with the same position, so a length
# mismatch would silently pair an index with the wrong run name.
assert len(index_list) == len(index_list_names), (
    "index_list and index_list_names must have the same length"
)

In [None]:
# Per-run time windows, indexed by run name; "from"/"to" are parsed as
# timezone-aware UTC timestamps.
start_end_times = pd.read_json("dfs_final/start-end-times.json", orient="index")
start_end_times = start_end_times.assign(
    **{
        bound: pd.to_datetime(start_end_times[bound], format="ISO8601", utc=True)
        for bound in ("from", "to")
    }
)


def get_max_time(index, table=None):
    """Return the end ("to") timestamp of a load-test run's time window.

    Args:
        index: Run name; must be a row label of ``table``.
        table: DataFrame with a ``"to"`` column, indexed by run name.
            Defaults to the module-level ``start_end_times``.

    Raises:
        KeyError: if ``index`` is not a row label of ``table``.
    """
    if table is None:
        table = start_end_times
    return table.loc[index, "to"]


def get_min_time(index, table=None):
    """Return the start ("from") timestamp of a load-test run's time window.

    Args:
        index: Run name; must be a row label of ``table``.
        table: DataFrame with a ``"from"`` column, indexed by run name.
            Defaults to the module-level ``start_end_times``.

    Raises:
        KeyError: if ``index`` is not a row label of ``table``.
    """
    if table is None:
        table = start_end_times
    return table.loc[index, "from"]

In [None]:
def generate_user_data_from_elastic(index, index_name):
    """Yield one record per participant-join document of a load-test run.

    Scans ``index`` for documents that have a ``new_participant_id`` field and
    whose ``@timestamp`` lies inside the run's [from, to] window, as returned
    by ``get_min_time`` / ``get_max_time`` for ``index_name``.

    Args:
        index: Elasticsearch index (or index pattern) to scan.
        index_name: Run name used to look up the time window.

    Yields:
        dict with keys ``"@timestamp"`` (value as stored in the document),
        ``"participant"`` and ``"session"``. Documents are scanned with
        ``preserve_order=False``, so the yield order is unspecified and
        callers must sort.

    Raises:
        KeyError: if a matching document lacks ``new_participant_session``.
    """
    # Named start/end (not min/max) to avoid shadowing the builtins; serialized
    # explicitly as ISO-8601, matching the metrics query's range bounds.
    start = get_min_time(index_name).isoformat()
    end = get_max_time(index_name).isoformat()
    query = {
        "query": {
            "bool": {
                "must": [
                    {"exists": {"field": "new_participant_id"}},
                    {"range": {"@timestamp": {"gte": start, "lte": end}}},
                ]
            }
        }
    }
    # scan() pages through every matching hit using the scroll API.
    hits = scan(
        client=es,
        query=query,
        scroll="1m",
        index=index,
        raise_on_error=True,
        preserve_order=False,
        clear_scroll=True,
    )
    # Only '_source' is needed; this drops the Elasticsearch metadata such as
    # _id, _type and _index.
    for hit in hits:
        source = hit["_source"]
        yield {
            "@timestamp": source["@timestamp"],
            "participant": source["new_participant_id"],
            "session": source["new_participant_session"],
        }


In [None]:
def _compute_user_join_streams(df_users, index_name):
    """Return a copy of ``df_users`` with running stream/participant totals.

    ``df_users`` must already be sorted by join time and have ``participant``
    (e.g. ``"User3"``) and ``session`` columns. Four integer columns are
    appended; each holds the total across ALL sessions right after that row's
    user joined:

    * ``streams_in``  - incoming streams, 2 per publisher
      (presumably audio + video - TODO confirm);
    * ``streams_out`` - outgoing streams, 2 per (publisher, receiver) pair;
    * ``publishers`` / ``subscribers`` - number of joined users of each role.
    """
    # In "-2p"/"-8p" runs every participant publishes; otherwise only users
    # numbered <= 3 publish and the rest only subscribe.
    everyone_publishes = ("-2p" in index_name) or ("-8p" in index_name)

    per_session = {}
    total_publishers = total_subscribers = 0
    streams_in = streams_out = 0
    columns = {"streams_in": [], "streams_out": [], "publishers": [], "subscribers": []}

    for participant, session in zip(df_users["participant"], df_users["session"]):
        counts = per_session.setdefault(session, {"publishers": 0, "subscribers": 0})
        is_publisher = everyone_publishes or int(participant.replace("User", "")) <= 3
        if is_publisher:
            counts["publishers"] += 1
            total_publishers += 1
            publishers = counts["publishers"]
            subscribers = counts["subscribers"]
            streams_in += 2
            # The newcomer receives 2 streams from each of the other (P-1)
            # publishers and sends 2 to each of them and to each of the S
            # subscribers: 4(P-1) + 2S. This keeps the running total equal to
            # the per-session sum of 2P(P-1) + 2PS.
            streams_out += 4 * (publishers - 1) + 2 * subscribers
        else:
            counts["subscribers"] += 1
            total_subscribers += 1
            # A new subscriber receives 2 streams from every publisher.
            streams_out += 2 * counts["publishers"]
        columns["streams_in"].append(streams_in)
        columns["streams_out"].append(streams_out)
        columns["publishers"].append(total_publishers)
        columns["subscribers"].append(total_subscribers)

    return df_users.assign(**columns)


session_data = pd.read_json("dfs_final/session-data.json", orient="index")
for index, index_name in zip(index_list, index_list_names):
    participants = session_data.loc[session_data.index == index_name, "participants"]
    if participants.empty:
        raise KeyError(f"No entry for {index_name!r} in dfs_final/session-data.json")
    n_participants = int(participants.iloc[0])

    df_users = pd.DataFrame(generate_user_data_from_elastic(index, index_name))
    if df_users.empty:
        print(f"No join events found for {index_name}; nothing saved")
        continue
    df_users["@timestamp"] = pd.to_datetime(df_users["@timestamp"], format="ISO8601")
    # scan() returns hits in no particular order, so sort BEFORE keeping the
    # first N joins; truncating first would keep an arbitrary subset.
    df_users = df_users.sort_values(by="@timestamp").head(n_participants)
    df_users = _compute_user_join_streams(df_users, index_name)
    df_users.to_csv(f"dfs_final/{index_name}-user-join.csv", index=False)

In [None]:
node_types = ["browseremulator", "masternode", "medianode"]
# Print a progress line every this many hits; printing on every hit is slow.
PROGRESS_EVERY = 10_000


def _extract_system_metrics(source):
    """Flatten one metric document's '_source' into a CSV row, or return None.

    Reads ``system.cpu.total.norm.pct`` into ``"cpu"`` and
    ``system.memory.used.pct`` into ``"memory"`` when the ``system`` object
    has them. Returns None when neither was extracted, so timestamp-only
    rows are never produced.
    """
    row = {"@timestamp": source["@timestamp"]}
    system = source.get("system", {})
    if "cpu" in system:
        row["cpu"] = system["cpu"]["total"]["norm"]["pct"]
    if "memory" in system:
        row["memory"] = system["memory"]["used"]["pct"]
    return row if ("cpu" in row or "memory" in row) else None


def _generate_metric_rows(hits):
    """Yield the flattened CPU/memory rows of ``hits``, printing progress."""
    count = 0
    for count, hit in enumerate(hits, start=1):
        if count % PROGRESS_EVERY == 0:
            print("Data read: ", count, end="\r")
        row = _extract_system_metrics(hit["_source"])
        if row is not None:
            yield row
    print("Data read: ", count)


for index_name in index_list_names:
    # Named start/end (not min/max) so the builtins stay usable in later cells.
    start = get_min_time(index_name)
    end = get_max_time(index_name)
    for node_type in node_types:
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"fields.node_role": node_type}},
                        {
                            "range": {
                                "@timestamp": {
                                    "lte": end.isoformat(),
                                    "gte": start.isoformat(),
                                }
                            }
                        },
                    ]
                }
            }
        }
        hits = scan(
            client=es,
            query=query,
            scroll="8h",
            index="metric*",
            raise_on_error=True,
            preserve_order=False,
            clear_scroll=True,
            request_timeout=300,
        )

        print(f"{pd.Timestamp.now().isoformat()} - Processing {index_name}-{node_type}")
        df = pd.DataFrame(_generate_metric_rows(hits))
        if df.empty:
            print(f"{pd.Timestamp.now().isoformat()} - No data for {index_name}-{node_type}; nothing saved")
            continue
        # Several documents can share a timestamp; average them into one row.
        df = df.groupby("@timestamp", as_index=False).mean()
        df["@timestamp"] = pd.to_datetime(df["@timestamp"], format="ISO8601")
        df.to_csv(f"dfs_final/{index_name}-{node_type}.csv", index=False)
        print(f"{pd.Timestamp.now().isoformat()} - Saved {index_name}-{node_type}.csv")