In [6]:
%pip install igraph niquests pandas

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
Collecting niquests
  Using cached niquests-3.7.2-py3-none-any.whl.metadata (16 kB)
Collecting kiss-headers<4,>=2 (from niquests)
  Using cached kiss_headers-2.4.3-py3-none-any.whl.metadata (13 kB)
Collecting urllib3-future<3,>=2.8.902 (from niquests)
  Using cached urllib3_future-2.8.907-py3-none-any.whl.metadata (13 kB)
Collecting wassima<2,>=1.0.1 (from niquests)
  Using cached wassima-1.1.2-cp37-abi3-macosx_10_12_x86_64.whl.metadata (3.9 kB)
Collecting jh2<6.0.0,>=5.0.3 (from urllib3-future<3,>=2.8.902->niquests)
  Using cached jh2-5.0.3-cp37-abi3-macosx_10_12_x86_64.whl.metadata (3.9 kB)
Collecting qh3<2.0.0,>=1.0.3 (from urllib3-future<3,>=2.8.902->niquests)
  Using cached qh3-1.0.9-cp37-abi3-macosx_10_12_x86_6

In [7]:
import pandas as pd
import numpy as np
import igraph as ig
import time
import itertools
import requests
import random
import asyncio
from niquests import AsyncSession, Response

pd.set_option("mode.copy_on_write", True)

In [8]:
import IPython
IPython.version_info

(7, 27, 0, '')

In [9]:
USE_PANDAS_PERF=True # if multiple CPU cores are available

In [33]:
async def _get_direct_edges_df(
  fids,
  df,
  max_neighbors,
):
    # WARNING we are operating on a shared dataframe...
    # ...inplace=False by default, explicitly setting here for emphasis
    out_df = df[df['i'].isin(fids)].sort_values(by=['v'], ascending=False, inplace=False)[:max_neighbors]
    return out_df

In [65]:
async def find_vertex_idx(ig, fid):
    try:
        return ig.vs.find(name=fid).index
    except:
        return None

async def _fetch_korder_neighbors(
  fids,graph,max_degree,max_neighbors,min_degree = 1
):

    # vids = [find_vertex_idx(graph.graph, fid) for fid in fids]
    # vids = list(filter(None, vids)) # WARNING - this filters vertex id 0 also
    vids = [vid for fid in fids for vid in [await find_vertex_idx(graph, fid)] if vid is not None ]
    if len(vids) <= 0:
        raise Exception(f"{fids}:Invalid fids")
    try:
        klists = []
        mindist_and_order = min_degree
        limit = max_neighbors
        while mindist_and_order <= max_degree:
            neighbors = graph.neighborhood(
                vids, order=mindist_and_order, mode="out", mindist=mindist_and_order
            )
            # TODO prune the graph after sorting by edge weight
            klists.append(graph.vs[neighbors[0][:limit]]["name"])
            limit = limit - len(neighbors[0])
            if limit <= 0:
                break # we have reached limit of neighbors
            mindist_and_order += 1
        # end of while
        return set(itertools.chain(*klists))
    except ValueError:
        raise Exception(f"{fids}:Neighbors not found")

In [60]:
async def _get_neighbors_edges(
  fids, df, graph, max_degree, max_neighbors,
):

    start_time = time.perf_counter()
    neighbors_df = await _get_direct_edges_df(fids, df, max_neighbors)
    print(f"{fids}:direct_edges_df took {time.perf_counter() - start_time} secs"
          f" for {len(neighbors_df)} first degree edges")
    max_neighbors = max_neighbors - len(neighbors_df)
    if max_neighbors > 0 and max_degree > 1:

        start_time = time.perf_counter()
        k_neighbors_list = await _fetch_korder_neighbors(fids, graph, max_degree, max_neighbors, min_degree=2)
        print(f"{fids}:{time.perf_counter() - start_time} secs for {len(k_neighbors_list)} neighbors")

        start_time  = time.perf_counter()
        if USE_PANDAS_PERF:
            # if multiple CPU cores are available
            k_df = df.query('i in @k_neighbors_list').query('j in @k_neighbors_list')
        else:
            # filter with an '&' is slower because of the size of the dataframe
            # split the filtering so that indexes can be used if present
            # k_df = graph.df[graph.df['i'].isin(k_neighbors_list) & graph.df['j'].isin(k_neighbors_list)]
            k_df = df[df['i'].isin(k_neighbors_list)]
            k_df = k_df[k_df['j'].isin(k_neighbors_list)]
        # .loc will throw KeyError when fids have no outgoing actions
        ### in other words, some neighbor fids may not be present in 'i'
        # k_df = graph.df.loc[(k_neighbors_list, k_neighbors_list)]
        print(f"{fids}:k_df took {time.perf_counter() - start_time} secs for {len(k_df)} edges")

        start_time  = time.perf_counter()
        neighbors_df = pd.concat([neighbors_df, k_df])
        print(f"{fids}:neighbors_df concat took {time.perf_counter() - start_time} secs"
              f" for {len(neighbors_df)} edges")

    return neighbors_df

In [61]:
async def get_neighbors_list(
  fids, df, graph, max_degree = 2, max_neighbors = 100,
):
    df = await _get_neighbors_edges(fids, df, graph, max_degree, max_neighbors)
    # WARNING we are operating on a shared dataframe...
    # ...inplace=False by default, explicitly setting here for emphasis
    out_df = df.groupby(by='j')[['v']].sum().sort_values(by=['v'], ascending=False, inplace=False)
    return out_df.index.to_list()

In [73]:
async def go_eigentrust(
    pretrust, max_pt_id, localtrust, max_lt_id,
):
    start_time = time.perf_counter()

    lt_len_before = len(localtrust)
    localtrust[:] = (x for x in localtrust if x["i"] != x["j"])
    lt_len_after = len(localtrust)
    if lt_len_before != lt_len_after:
        print(f"dropped {lt_len_before-lt_len_after} records with i == j")

    req = {
        "pretrust": {
            "scheme": "inline",
            # "size": int(max_pt_id)+1, #np.int64 doesn't serialize; cast to int
            "size": max_pt_id,
            "entries": pretrust,
        },
        "localTrust": {
            "scheme": "inline",
            # "size": int(max_lt_id)+1, #np.int64 doesn't serialize; cast to int
            "size": max_lt_id,
            "entries": localtrust,
        },
        "alpha": 0.5,
    }

    async with AsyncSession() as s:
        response = await s.post(
            "http://localhost:8080/basic/v1/compute",
            json=req,
            headers={"Accept": "application/json", "Content-Type": "application/json"},
            timeout=3000,
        )

        if response.status_code != 200:
            print(f"Server error: {response.status_code}:{response.reason}")
            raise Exception("Unknown error")
        trustscores = response.json()["entries"]
        print(
            f"{pretrust}:eigentrust took {time.perf_counter() - start_time} secs for {len(trustscores)} scores"
        )
        return trustscores

In [63]:
async def get_neighbors_scores(
    fids, df, graph, max_degree, max_neighbors
):
    start_time = time.perf_counter()
    try:
        df = await _get_neighbors_edges(fids, df, graph, max_degree, max_neighbors)
        print(
            f"{fids}:dataframe took {time.perf_counter() - start_time} secs for {len(df)} edges"
        )
    except Exception as e:
        print(e)
        return []

    if df.shape[0] < 1:
        return []
    
    stacked = df.loc[:, ("i", "j")].stack()
    pseudo_id, orig_id = stacked.factorize()

    # pseudo_df is a new dataframe to avoid modifying existing shared global df
    pseudo_df = pd.Series(pseudo_id, index=stacked.index).unstack()
    pseudo_df.loc[:, ("v")] = df.loc[:, ("v")]

    if len(fids) > 1:
        # when more than 1 fid in input list, the neighbor edges may not have some input fids.
        pt_fids = orig_id.where(orig_id.isin(fids))
    else:
        pt_fids = fids
    pt_len = len(fids)
    # pretrust = [{'i': fid, 'v': 1/pt_len} for fid in pt_fids]
    pretrust = [
        {"i": orig_id.get_loc(fid), "v": 1 / pt_len}
        for fid in pt_fids
        if not np.isnan(fid)
    ]
    # max_pt_id = max(pt_fids)
    max_pt_id = len(orig_id)

    localtrust = pseudo_df.to_dict(orient="records")
    # max_lt_id = max(df['i'].max(), df['j'].max())
    max_lt_id = len(orig_id)

    print(
        f"{fids}:max_lt_id:{max_lt_id}, localtrust size:{len(localtrust)},"
        f" max_pt_id:{max_pt_id}, pretrust size:{len(pretrust)}"
    )

    i_scores = await go_eigentrust(
        pretrust=pretrust,
        max_pt_id=max_pt_id,
        localtrust=localtrust,
        max_lt_id=max_lt_id,
    )

    # rename i and v to fid and score respectively
    # also, filter out input fids
    fid_scores = [
        {"fid": int(orig_id[score["i"]]), "score": score["v"]}
        for score in i_scores
        if score["i"] not in fids
    ]
    print(
        f"{fids}:sample fid_scores:{random.sample(fid_scores, min(10, len(fid_scores)))}"
    )
    return fid_scores


In [68]:
lt_df = pd.read_pickle("data/fc_v3engagement_fid_2024-06-18_filtered_df.pkl")

In [69]:
lt_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 716438 entries, 7 to 3772376
Data columns (total 3 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   i       716438 non-null  int64
 1   j       716438 non-null  int64
 2   v       716438 non-null  int64
dtypes: int64(3)
memory usage: 21.9 MB


In [20]:
lt_df.head()

Unnamed: 0,i,j,v
7,410568,525376,114
21,218142,516028,197
24,451888,349675,28
25,440212,3887,8
26,2282,2433,17


In [70]:
%%time
g = ig.Graph.Read_Pickle('data/fc_v3engagement_fid_2024-06-18_filtered_ig.pkl')

CPU times: user 613 ms, sys: 126 ms, total: 739 ms
Wall time: 1.54 s


In [71]:
g.summary()

'IGRAPH DN-- 17969 716438 -- \n+ attr: name (v), v (e)'

In [22]:
rainbow_fids = pd.read_csv('/tmp/rainbow_fids_18sep2024.csv')

In [23]:
rainbow_fids.head()

Unnamed: 0,fid
0,623997
1,20353
2,337715
3,327341
4,344026


In [24]:
rainbow_fids.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22869 entries, 0 to 22868
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   fid     22869 non-null  int64
dtypes: int64(1)
memory usage: 178.8 KB


In [41]:
n_df = await _get_neighbors_edges(fids=[189237], df=lt_df, graph=g, max_degree=5, max_neighbors=1000)

direct_edges_df took 0.014816754999969817 secs for 28 first degree edges
0.010226145000046927 secs for 972 neighbors
k_df took 0.0468426620000173 secs for 125092 edges
neighbors_df concat took 0.0014779929999804153 secs for 125120 edges


In [42]:
n_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 125120 entries, 312353 to 3771458
Data columns (total 3 columns):
 #   Column  Non-Null Count   Dtype
---  ------  --------------   -----
 0   i       125120 non-null  int64
 1   j       125120 non-null  int64
 2   v       125120 non-null  int64
dtypes: int64(3)
memory usage: 3.8 MB


In [43]:
n_df.head(10)

Unnamed: 0,i,j,v
312353,189237,9135,34
25066,189237,9856,31
136889,189237,2802,28
88834,189237,18537,20
339261,189237,3,14
1042557,189237,680,9
908159,189237,19832,7
68240,189237,2252,7
438472,189237,2282,7
1118171,189237,190968,5


In [44]:
n_list = await get_neighbors_list(fids=[189237], df=lt_df, graph=g, max_degree=5, max_neighbors=1000)

direct_edges_df took 0.03150221700002476 secs for 28 first degree edges
0.004706373999965763 secs for 972 neighbors
k_df took 0.05387483800006976 secs for 125092 edges
neighbors_df concat took 0.003971549000084451 secs for 125120 edges


In [45]:
len(n_list)

994

In [46]:
n_list

[602,
 403619,
 234616,
 436577,
 263574,
 20909,
 539,
 408746,
 8447,
 283144,
 516028,
 422233,
 403020,
 253127,
 326040,
 12626,
 320215,
 243300,
 397392,
 403090,
 379750,
 406815,
 407704,
 354795,
 237884,
 7418,
 236581,
 6596,
 285998,
 477292,
 448920,
 129,
 396484,
 248216,
 366713,
 308536,
 13874,
 230238,
 4923,
 3642,
 484256,
 308410,
 472,
 420540,
 15850,
 11528,
 500235,
 270138,
 4407,
 429539,
 444929,
 466111,
 444067,
 277952,
 412843,
 419741,
 284679,
 460229,
 337018,
 4163,
 270504,
 481476,
 190218,
 281676,
 528,
 508409,
 411778,
 509624,
 1325,
 446697,
 268455,
 16098,
 510364,
 323144,
 256500,
 2433,
 631749,
 323070,
 327165,
 422334,
 4905,
 1689,
 5431,
 354775,
 389456,
 315697,
 445359,
 195117,
 16148,
 234796,
 243818,
 288204,
 330471,
 440747,
 7143,
 221578,
 500144,
 394357,
 294370,
 1918,
 533,
 377111,
 420657,
 230147,
 420493,
 459170,
 734,
 2745,
 20919,
 276562,
 426045,
 461302,
 2211,
 473,
 437433,
 597941,
 14364,
 378275,
 27

In [47]:
n_scores = await get_neighbors_scores(fids=[189237], df=lt_df, graph=g, max_degree=5, max_neighbors=1000)

direct_edges_df took 0.012849111000036828 secs for 28 first degree edges
0.005008043999964684 secs for 972 neighbors
k_df took 0.03175515999998879 secs for 125092 edges
neighbors_df concat took 0.0013878689999273774 secs for 125120 edges
dataframe took 0.05720739900004901 secs for 125120 edges
max_lt_id:999, localtrust size:125120, max_pt_id:999, pretrust size:1
eigentrust took 1.0355587310000374 secs for 29 scores
sample fid_scores:[{'fid': 534, 'score': 0.008333333325572312}, {'fid': 9856, 'score': 0.05166666661854833}, {'fid': 20670, 'score': 0.00666666666045785}, {'fid': 19832, 'score': 0.011666666655801237}, {'fid': 8413, 'score': 0.0016666666651144624}, {'fid': 477997, 'score': 0.008333333325572312}, {'fid': 475330, 'score': 0.0016666666651144624}, {'fid': 507587, 'score': 0.0016666666651144624}, {'fid': 13833, 'score': 0.0016666666651144624}, {'fid': 8004, 'score': 0.0016666666651144624}]


In [48]:
n_scores

[{'fid': 189237, 'score': 0.6666666669771075},
 {'fid': 9135, 'score': 0.05666666661389173},
 {'fid': 9856, 'score': 0.05166666661854833},
 {'fid': 2802, 'score': 0.04666666662320495},
 {'fid': 18537, 'score': 0.03333333330228925},
 {'fid': 3, 'score': 0.023333333311602474},
 {'fid': 680, 'score': 0.01499999998603016},
 {'fid': 19832, 'score': 0.011666666655801237},
 {'fid': 2252, 'score': 0.011666666655801237},
 {'fid': 2282, 'score': 0.011666666655801237},
 {'fid': 190968, 'score': 0.008333333325572312},
 {'fid': 477997, 'score': 0.008333333325572312},
 {'fid': 534, 'score': 0.008333333325572312},
 {'fid': 133, 'score': 0.008333333325572312},
 {'fid': 24128, 'score': 0.00666666666045785},
 {'fid': 20670, 'score': 0.00666666666045785},
 {'fid': 627459, 'score': 0.003333333330228925},
 {'fid': 2904, 'score': 0.003333333330228925},
 {'fid': 11835, 'score': 0.0016666666651144624},
 {'fid': 7263, 'score': 0.0016666666651144624},
 {'fid': 13833, 'score': 0.0016666666651144624},
 {'fid': 50

In [74]:
async def main() -> None:
    tasks = []
    for fid in rainbow_fids['fid'].sample(5):
        print(fid)
        tasks.append(
            asyncio.create_task(
                get_neighbors_scores(
                    fids=[fid], df=lt_df, graph=g, max_degree=5, max_neighbors=1000)))
    responses = await asyncio.gather(*tasks)

In [75]:
import timeit
start_time = timeit.default_timer()
await main()
elapsed = timeit.default_timer() - start_time
print(f"Overall time taken: {elapsed} secs")

381376
401911
378112
249346
447671
[381376]:direct_edges_df took 0.02269602499973189 secs for 3 first degree edges
[381376]:0.021266510999794264 secs for 997 neighbors
[381376]:k_df took 0.08118097099986699 secs for 170780 edges
[381376]:neighbors_df concat took 0.01537540200024523 secs for 170783 edges
[381376]:dataframe took 0.1554080440000689 secs for 170783 edges
[381376]:max_lt_id:1001, localtrust size:170783, max_pt_id:1001, pretrust size:1
[401911]:direct_edges_df took 0.009839229999670351 secs for 0 first degree edges
[401911]:0.0016770669999459642 secs for 0 neighbors
[401911]:k_df took 0.035437240000192105 secs for 0 edges
[401911]:neighbors_df concat took 0.0011912800000573043 secs for 0 edges
[401911]:dataframe took 0.04958241900021676 secs for 0 edges
[378112]:direct_edges_df took 0.010593187999802467 secs for 0 first degree edges
[378112]:Invalid fids
[249346]:direct_edges_df took 0.006716549999964627 secs for 3 first degree edges
[249346]:0.007718137999745522 secs for 99