In [7]:
DEVICES = "2,3"

import os
os.environ["CUDA_VISIBLE_DEVICES"] = DEVICES

import numpy as np
import pandas as pd
from tqdm import tqdm
import cudf

tqdm.pandas()

# Import raw data
First, read data in `.jsonl` file format as a pandas data frame
Then store the dataframe in `.parquet` format for easy access later

In [8]:
%%script false --no-raise-error

df = pd.read_json("../fulldata/kiva_activity_2023-08-28T11-09-39.jsonl", lines=True)
df = pd.json_normalize(df["loan"], sep='_')

In [9]:
%%script false --no-raise-error

df["loanAmount"] = df["loanAmount"].astype(float)
df["loanFundraisingInfo_fundedAmount"] = df["loanFundraisingInfo_fundedAmount"].astype(float)
df["raisedDate"] = pd.to_datetime(df["raisedDate"])
df["fundraisingDate"] = pd.to_datetime(df["fundraisingDate"])
df["geocode_country_name"] = df["geocode_country_name"].astype("category")
df["sector_id"] = df["sector_id"].astype(int)
df["sector_name"] = df["sector_name"].astype("category")
df["activity_id"] = df["activity_id"].astype(int)
df["activity_name"] = df["activity_name"].astype("category")

Future exception was never retrieved
future: <Future finished exception=BrokenPipeError(32, 'Broken pipe')>
Traceback (most recent call last):
  File "/home/datnt527/setups/.miniconda3/envs/cudf/lib/python3.10/asyncio/unix_events.py", line 676, in write
    n = os.write(self._fileno, data)
BrokenPipeError: [Errno 32] Broken pipe


In [10]:
%%script false --no-raise-error
df.to_parquet("../fulldata/kiva_activity_2023-08-28T11-09-39.parquet")

In [11]:
ds = cudf.read_parquet("../fulldata/kiva_activity_2023-08-28T11-09-39.parquet")

In [12]:
ds.dropna(axis=0, how="all", inplace=True)
ds.tail()

Unnamed: 0,id,name,fundraisingDate,raisedDate,loanAmount,tags,loanFundraisingInfo_fundedAmount,geocode_country_name,sector_id,sector_name,activity_id,activity_name,lendingActions_totalCount,lendingActions_values
2547758,3800,Anonymous,2007-01-16 23:10:03,2007-01-17 10:26:47,600.0,[],600.0,Kenya,1,Agriculture,61,Dairy,18,[{'latestSharePurchaseDate': '2007-01-17T03:14...
2547759,3799,Anonymous,2007-01-16 23:04:56,2007-01-17 18:59:15,125.0,[],125.0,Kenya,1,Agriculture,31,Farming,5,[{'latestSharePurchaseDate': '2007-01-17T02:01...
2547760,3797,Anonymous,2007-01-16 23:00:56,2007-01-16 23:06:31,150.0,[],150.0,Kenya,1,Agriculture,31,Farming,1,[{'latestSharePurchaseDate': '2007-01-16T23:06...
2547761,3796,Anonymous,2007-01-16 23:00:32,2007-01-17 00:47:14,300.0,[],300.0,Kenya,1,Agriculture,31,Farming,6,[{'latestSharePurchaseDate': '2007-01-16T23:09...
2547762,3795,Anonymous,2007-01-16 23:00:16,2007-01-17 18:41:08,750.0,[],750.0,Kenya,14,Construction,97,Cement,6,[{'latestSharePurchaseDate': '2007-01-16T23:25...


# Filter

Filtering, only take `Vietnam` into account
Why? Because there are a lot of rows and we try to localize the task

In [13]:
ds['geocode_country_name'].value_counts()['Vietnam']

39327

In [14]:
ds = ds[ds['geocode_country_name'] == 'Vietnam']

In [15]:
ds.head()

Unnamed: 0,id,name,fundraisingDate,raisedDate,loanAmount,tags,loanFundraisingInfo_fundedAmount,geocode_country_name,sector_id,sector_name,activity_id,activity_name,lendingActions_totalCount,lendingActions_values
142,2634972,Nhi,2023-08-28 04:30:05,,2100.0,"[#Eco-friendly, #Parent]",220.0,Vietnam,10,Housing,134,Personal Housing Expenses,7,[{'latestSharePurchaseDate': '2023-08-28T09:36...
179,2634943,Nguyệt,2023-08-28 03:10:05,,1675.0,"[#Animals, #Parent]",0.0,Vietnam,1,Agriculture,56,Cattle,0,[]
196,2634926,Thọ,2023-08-28 02:50:05,,2100.0,"[#Elderly, #Health and Sanitation]",90.0,Vietnam,10,Housing,134,Personal Housing Expenses,3,[{'latestSharePurchaseDate': '2023-08-28T07:28...
197,2634927,Lương,2023-08-28 02:50:05,,1475.0,"[#Schooling, #Repeat Borrower]",5.0,Vietnam,15,Education,158,Primary/secondary school costs,1,[{'latestSharePurchaseDate': '2023-08-28T10:07...
198,2634935,Nguyệt,2023-08-28 02:50:05,,2100.0,"[#Parent, #Health and Sanitation]",30.0,Vietnam,10,Housing,134,Personal Housing Expenses,2,[{'latestSharePurchaseDate': '2023-08-28T05:27...


# Contruct a Graph

The idea is construct a graph with following node type
- `Lender`
- `Loan`
- `Tag`

With following relationships
- `Lender`s can `LEND` to `Loan`s
- `Loan`s can be `TAGGED_WITH` `Tag`s

Lenders have properties
- `id`
- `name`
- `publicId`

Loan have properties
- `id`
- `name`
- `loanAmount`
- `fundedAmount`
- `postDate`
- `raisedDate`

`Tag` have properties:
- `name`

LEND's properties
- `shareAmount`
- `date`

TAGGED_WITH have no properties

## Remove duplicated `loan`

There are loans which have a same `id` but different `fundedAmount`
It might because the query time is different
Here, only keep records which have the highest `fundedAmount`

In [16]:
ds.loc[[9628, 1366545]]

Unnamed: 0,id,name,fundraisingDate,raisedDate,loanAmount,tags,loanFundraisingInfo_fundedAmount,geocode_country_name,sector_id,sector_name,activity_id,activity_name,lendingActions_totalCount,lendingActions_values
9628,2619844,Thể,2023-07-30 01:10:06,,2125.0,"[user_favorite, #Health and Sanitation, #Repea...",1295.0,Vietnam,10,Housing,134,Personal Housing Expenses,51,[{'latestSharePurchaseDate': '2023-08-01T19:10...
1366545,2619844,Thể,2023-07-30 01:10:06,,2125.0,"[user_favorite, #Health and Sanitation, #Repea...",1720.0,Vietnam,10,Housing,134,Personal Housing Expenses,62,[{'latestSharePurchaseDate': '2023-08-22T13:03...


In [17]:
temp = ds.groupby('id', group_keys=False)[['loanFundraisingInfo_fundedAmount']].idxmax()
iloc = temp['loanFundraisingInfo_fundedAmount'].values # NOTE: just iloc, not loc
ds = ds.iloc[iloc]
del iloc
del temp
ds.loc[[9628, 1366545]] # see, only keep the one with higher fundedAmount

Unnamed: 0,id,name,fundraisingDate,raisedDate,loanAmount,tags,loanFundraisingInfo_fundedAmount,geocode_country_name,sector_id,sector_name,activity_id,activity_name,lendingActions_totalCount,lendingActions_values
1366545,2619844,Thể,2023-07-30 01:10:06,,2125.0,"[user_favorite, #Health and Sanitation, #Repea...",1720.0,Vietnam,10,Housing,134,Personal Housing Expenses,62,[{'latestSharePurchaseDate': '2023-08-22T13:03...


In [18]:
ds[ds.duplicated(subset=['id'], keep=False)].sort_values(by=['id']) # no duplicated

Unnamed: 0,id,name,fundraisingDate,raisedDate,loanAmount,tags,loanFundraisingInfo_fundedAmount,geocode_country_name,sector_id,sector_name,activity_id,activity_name,lendingActions_totalCount,lendingActions_values


## create `lender-loan-tag` df

In [19]:
ads = ds.explode('tags').explode('lendingActions_values')
del ds
len(ads)

3350706

In [20]:
ads['tags'] = ads['tags'].astype('category')

In [21]:
# drop some loans that has no lender
ads.dropna(subset=['lendingActions_values'], inplace=True)

In [22]:
# dict processing cant be done in cuDF, so convert to pandas
adf = ads.to_pandas()

In [23]:
adf['lender_id'] = adf.progress_apply(lambda x: x['lendingActions_values']['lender']['id'], axis=1).astype(int)
adf['lender_name'] = adf.progress_apply(lambda x: x['lendingActions_values']['lender']['name'], axis=1)
adf['lender_publicId'] = adf.progress_apply(lambda x: x['lendingActions_values']['lender']['publicId'], axis=1)
adf['shareAmount'] = adf.progress_apply(lambda x: x['lendingActions_values']['shareAmount'], axis=1).astype(float)
adf['date'] = pd.to_datetime(adf.progress_apply(lambda x: x['lendingActions_values']['latestSharePurchaseDate'], axis=1))

100%|██████████| 3350684/3350684 [00:35<00:00, 93815.04it/s] 
100%|██████████| 3350684/3350684 [00:38<00:00, 87110.03it/s] 
100%|██████████| 3350684/3350684 [00:31<00:00, 107552.38it/s]
100%|██████████| 3350684/3350684 [00:29<00:00, 112786.64it/s]
100%|██████████| 3350684/3350684 [00:35<00:00, 95231.29it/s] 


In [24]:
# cuDF do not work with timezone yet
adf['date'] = adf['date'].dt.tz_localize(None)

In [25]:
ads = cudf.from_pandas(adf)
del adf

In [26]:
ads.drop(['lendingActions_values'], axis=1, inplace=True)

In [27]:
ads.drop_duplicates(inplace=True)

In [28]:
ads.to_feather("ads.feather")



In [29]:
DEVICES = "2,3"

import os
os.environ["CUDA_VISIBLE_DEVICES"] = DEVICES

import numpy as np
import pandas as pd
from tqdm import tqdm
import cudf

tqdm.pandas()

ads = cudf.read_feather("ads.feather")



## Remove some tags 
The folowing tags should be remove:  
- `tag_`
- `user_favorite`
- `user_like`
- `volunteer_like`
- `volunteer_pick`

In [30]:
if 'user_like' not in ads['tags'].cat.categories:
    ads['tags'] = ads['tags'].cat.add_categories(['user_like'])

if 'user_favorite' not in ads['tags'].cat.categories:
    ads['tags'] = ads['tags'].cat.add_categories(['user_favorite'])

if 'volunteer_like' not in ads['tags'].cat.categories:
    ads['tags'] = ads['tags'].cat.add_categories(['volunteer_like'])

if 'volunteer_pick' not in ads['tags'].cat.categories:
    ads['tags'] = ads['tags'].cat.add_categories(['volunteer_pick'])

In [31]:
(ads['tags'] == 'user_favorite').sum(), (ads['tags'] == 'user_like').sum(), (ads['tags'] == 'volunteer_like').sum(), (ads['tags'] == 'volunteer_pick').sum()

(676592, 0, 16746, 31433)

In [32]:
ads = ads[~ads['tags'].isin(['user_favorite', 'user_like', 'volunteer_like', 'volunteer_pick'])]

## create `Tag` nodes

In [33]:
# create those df
ds_tags = ads[['tags']].drop_duplicates().dropna()
ds_tags[':LABEL'] = 'Tag'
ds_tags.rename(columns={'tags': 'name:ID'}, inplace=True)
ds_tags.to_csv('../data/neo4jtry/tags.csv',index=False)
del ds_tags

## create `Loan` nodes

In [34]:
ds_loan = ads.drop(['tags', 'lendingActions_totalCount', 'lender_id', 'lender_name', 'lender_publicId', 'shareAmount', 'date'], axis=1).drop_duplicates()

In [35]:
ds_loan[':LABEL'] = 'Loan'
ds_loan.rename(columns={'id': 'id:ID(Loan-ID)'}, inplace=True)
ds_loan.to_csv('../data/neo4jtry/loans.csv',index=False)
del ds_loan

## create `Lender` nodes

In [36]:
ds_lender = ads[['lender_id', 'lender_name', 'lender_publicId']].drop_duplicates()
ds_lender.tail(2)

Unnamed: 0,lender_id,lender_name,lender_publicId
3350544,2765817,Laura,laura43424517
3350549,1156482,Brian,brian5212


In [37]:
ds_lender[ds_lender.duplicated(subset=['lender_id'], keep=False)].sort_values(by=['lender_id'])

Unnamed: 0,lender_id,lender_name,lender_publicId
1655875,80732,Anonymous,
2124314,80732,Gerard,gerard9070
827667,197089,Anonymous,
1915779,197089,Francisco M,franciscomendes
58688,224248,Anonymous,
623854,224248,James Klich,james1061
2299454,514603,Charmaine,charmaine6891
2603559,514603,Anonymous,
585825,516504,Mary,mary6316
780383,516504,Anonymous,


In [38]:
duplicated_lender_id = ds_lender[ds_lender.duplicated(subset=['lender_id'])]['lender_id']
should_remove = ds_lender[(ds_lender['lender_id'].isin(duplicated_lender_id)) & (ds_lender['lender_publicId'].isna())]
ds_lender.drop(should_remove.index, axis=0, inplace=True)

In [39]:
# drop duplicated_lender who publicId is None
duplicated_lender_id = ds_lender[ds_lender.duplicated(subset=['lender_id'])]['lender_id']
should_remove = ds_lender[(ds_lender['lender_id'].isin(duplicated_lender_id)) & (ds_lender['lender_publicId'].isna())]
ds_lender.drop(should_remove.index, axis=0, inplace=True)
# still duplicate, might be because user change name and publicId. Just remove duplicates here.
ds_lender.drop_duplicates(subset='lender_id', inplace=True)
del duplicated_lender_id
del should_remove
# display the duplicated
ds_lender[ds_lender.duplicated(subset=['lender_id'], keep=False)]

Unnamed: 0,lender_id,lender_name,lender_publicId


In [40]:
ds_lender.rename(columns={'id': 'id:ID(Lender-ID)'}, inplace=True)
ds_lender[':LABEL'] = 'Lender'
ds_lender.to_csv('../data/neo4jtry/lenders.csv',index=False)
del ds_lender

## Create `TAGGED_WITH` relationship between `Loan` and `Tags`

In [41]:
ds_loan_tags = ads[['id', 'tags']].dropna() # dropna helps to avoid inplace here, because we're process in a slide of the `ads`
ds_loan_tags.isna().sum()

id      0
tags    0
dtype: int64

In [42]:
'the number of loans is ', len(ads[['id']].drop_duplicates())

('the number of loans is ', 29376)

In [43]:
ds_loan_tags.drop_duplicates(inplace=True)
ds_loan_tags.duplicated().sum()

0

In [44]:
'the number of loan-tag relationships is', len(ds_loan_tags)

('the number of loan-tag relationships is', 61806)

In [45]:
ds_loan_tags['tags'].value_counts()

#Parent                    10697
#Health and Sanitation      7195
#Woman-Owned Business       6914
#Repeat Borrower            6794
#Eco-friendly               5320
#Elderly                    5176
#Animals                    4991
#First Loan                 4072
#Schooling                  2798
#Repair Renew Replace       1421
#Widowed                    1265
#Biz Durable Asset           790
#Vegan                       699
#Single Parent               664
#Supporting Family           599
#Trees                       561
#Job Creator                 434
#Low-profit FP               352
#Single                      203
#Fabrics                     162
#Female Education            152
#Technology                  125
#Sustainable Ag              124
#Unique                      115
#Interesting Photo            79
#Inspiring Story              43
#Post-disbursed               31
#Hidden Gem                   12
#Orphan                        6
#Tourism                       5
#Refugee  

In [46]:
ds_loan_tags.rename(columns={'id': ':START_ID(Loan-ID)', 'tags': ':END_ID'}, inplace=True)
ds_loan_tags[':TYPE'] = 'TAGGED_WITH'
ds_loan_tags.to_csv('../data/neo4jtry/loan_tags.csv', index=False)
del ds_loan_tags

## create `LEND` relationship between `Lender` and `Loan`

In [47]:
ds_lender_loan = ads[['id', 'lender_id', 'shareAmount', 'date']]
ds_lender_loan.dropna(inplace=True)
ds_lender_loan.tail(5)

Unnamed: 0,id,lender_id,shareAmount,date
3350679,1498137,2806486,125.0,2018-04-29 00:56:02
3350680,1498137,585876,25.0,2018-04-29 15:57:47
3350681,1498137,1156482,25.0,2018-04-29 17:40:33
3350682,1498137,608087,50.0,2018-04-29 19:25:30
3350683,1498137,1157597,25.0,2018-04-28 08:38:49


In [48]:
ds_lender_loan.drop_duplicates(inplace=True) # duplicated cause by exploding the tags

In [49]:
ds_lender_loan[':TYPE'] = 'LEND'
ds_lender_loan.rename(columns={'lender_id': ':START_ID(Lender-ID)', 'id':':END_ID(Loan-ID)'}, inplace=True)
ds_lender_loan.to_csv('../data/neo4jtry/lender_loan.csv', index=False)
del ds_lender_loan

## Manually create the `SHARES_LOANS` relationship

In [50]:
ds_lender_loan = ads[['id', 'lender_id', 'shareAmount', 'date']]
ds_lender_loan.dropna(inplace=True)
ds_lender_loan.drop_duplicates(inplace=True) # duplicated cause by exploding the tags
ds_lender_loan.drop(['shareAmount', 'date'], axis=1, inplace=True)
ds_lender_loan.tail(2)

Unnamed: 0,id,lender_id
3350550,1498137,608087
3350551,1498137,1157597


In [51]:
ds_lender_loan.to_feather("ds_lender_loan.feather")



In [52]:
ds_lender_loan = cudf.read_feather("ds_lender_loan.feather")



In [53]:
ds_lender_loan.info()

<class 'cudf.core.dataframe.DataFrame'>
RangeIndex: 1009513 entries, 0 to 1009512
Data columns (total 2 columns):
 #   Column     Non-Null Count    Dtype
---  ------     --------------    -----
 0   id         1009513 non-null  int64
 1   lender_id  1009513 non-null  int64
dtypes: int64(2)
memory usage: 15.4 MB


In [54]:
ds_lender_loan.lender_id.nunique()

280275

In [56]:
ds_lender_loan.id.unique()

0        1960253
1        1179933
2        1676985
3        1765208
4        1866440
          ...   
29371     814801
29372    1439111
29373    1987395
29374     235812
29375    1498137
Name: id, Length: 29376, dtype: int64

In [71]:
ads[ads['tags'] == '#Married'].id.unique()

0    570685
Name: id, dtype: int64

In [None]:
ds_lender_loan.id.nunique()

In [None]:
"expected number of row of self_merge is", pow(ds_lender_loan.lender_id.nunique()/ds_lender_loan.id.nunique(), 2) * ds_lender_loan.id.nunique()

In [None]:
self_merged = ds_lender_loan.merge(ds_lender_loan, on='id')
del ds_lender_loan
self_merged.head(3)

In [None]:
self_merged = self_merged[self_merged['lender_id_x'] > self_merged['lender_id_y']]

In [None]:
shares_loan = self_merged.groupby(['lender_id_x', 'lender_id_y']).nunique().reset_index()
shares_loan.rename(columns={'id': "number_common_loans"}, inplace=True)
del self_merged
shares_loan.head()

In [None]:
len(shares_loan)

In [None]:
shares_loan.rename(
    columns={
        'lender_id_x': ':START_ID(Lender-ID)', 
        'lender_id_y': ':END_ID(Lender-ID)',
        'id': 'weight'
        }, inplace=True)
shares_loan[':TYPE'] = 'SHARES_LOAN'
shares_loan.to_csv('../data/neo4jtry/lender_lender_share_loan.csv', index=False)
del shares_loan

## Manually creat the `INTEREST` relationship

In [None]:
lender_tag_ds = ads[['id', 'tags', 'lender_id', 'shareAmount', 'date']]
lender_tag_ds.tail(3)

In [None]:
lender_tag_ds.duplicated().sum()

In [None]:
lender_tag_ds = lender_tag_ds.groupby(['lender_id', 'tags']).count()
lender_tag_ds.reset_index(inplace=True)
lender_tag_ds

In [None]:
lender_tag_ds.drop(['shareAmount', 'date'], axis=1, inplace=True)
lender_tag_ds.rename(columns={'id': 'weight'}, inplace=True)
lender_tag_ds.sort_values(by=['weight'], ascending=False).head()

In [None]:
lender_tag_ds[':TYPE'] = 'INTEREST'
lender_tag_ds.rename(columns={'lender_id': ':START_ID(Lender-ID)', 'tags':':END_ID(Tag-ID)'})\
    .to_csv('../data/neo4jtry/lender_tag.csv', index=False)

## Manually create the `SHARE_TAGS` relationship

In [None]:
lender_tag_ds.drop(['weight', ':TYPE'], axis=1, inplace=True)
lender_tag_ds.head()

In [None]:
lender_tag_ds.info()

In [None]:
import gc
gc.collect()

In [None]:
ads.memory_usage().sum() / pow(2, 30)

In [None]:
lender_tag_ds.memory_usage().sum() / pow(2, 30)

In [None]:
lender_tag_ds.to_csv('temp.csv', index=False)

### Re-load data

In [1]:
DEVICES = "1,2"
n_devices = len(DEVICES.split(','))

import os
os.environ["CUDA_VISIBLE_DEVICES"] = DEVICES

import numpy as np
import pandas as pd
from tqdm import tqdm
import cudf
import dask_cudf

tqdm.pandas()

lender_tag_ds : cudf.DataFrame = cudf.read_csv('temp.csv')
lender_tag_ds['lender_id'] = lender_tag_ds['lender_id'].astype('uint32')
lender_tag_ds['tags'] = lender_tag_ds['tags'].astype('category')
lender_tag_ds.info()

<class 'cudf.core.dataframe.DataFrame'>
RangeIndex: 1036684 entries, 0 to 1036683
Data columns (total 2 columns):
 #   Column     Non-Null Count    Dtype
---  ------     --------------    -----
 0   lender_id  1036684 non-null  uint32
 1   tags       1036684 non-null  category
dtypes: category(1), uint32(1)
memory usage: 4.9 MB


In [2]:
# label encoding the tags, for easy integration with `parquet` and later programming. But, remember to store the label encoder
from cuml.preprocessing.LabelEncoder import LabelEncoder
le = LabelEncoder()
lender_tag_ds['tags'] = le.fit_transform(lender_tag_ds['tags'])

In [3]:
lender_tag_ds = lender_tag_ds.sort_values(by=['tags'])

In [4]:
lender_tag_ds.info()

<class 'cudf.core.dataframe.DataFrame'>
Int64Index: 1036684 entries, 9 to 1036675
Data columns (total 2 columns):
 #   Column     Non-Null Count    Dtype
---  ------     --------------    -----
 0   lender_id  1036684 non-null  uint32
 1   tags       1036684 non-null  uint8
dtypes: uint32(1), uint8(1)
memory usage: 12.9 MB


In [None]:
lender_tag_ds.lender_id.nunique()

In [None]:
lender_tag_ds.tags.nunique()

In [None]:
exp_row = pow(lender_tag_ds.lender_id.nunique()/lender_tag_ds.tags.nunique(), 2) * lender_tag_ds.tags.nunique()
"expected number of row of self_merged is", exp_row

### Using SQL

In [None]:
from sqlalchemy import create_engine
engine = create_engine('sqlite:///lender_tag.sqlite', echo=False)
engine = create_engine('postgresql+psycopg2://postgres:postgres@localhost:32772/datdb', echo=False)

In [None]:
lender_tag_ds.to_pandas().to_sql(name='lender_tag', con=engine, index=False)

### Using `Dask-cuDF`

In [5]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=DEVICES, memory_limit="auto", device_memory_limit="auto", n_workers=None)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 125.53 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36011,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 125.53 GiB

0,1
Comm: tcp://127.0.0.1:38989,Total threads: 1
Dashboard: http://127.0.0.1:39055/status,Memory: 62.77 GiB
Nanny: tcp://127.0.0.1:46287,
Local directory: /tmp/dask-scratch-space/worker-1xm2x8sq,Local directory: /tmp/dask-scratch-space/worker-1xm2x8sq
GPU: NVIDIA RTX A5000,GPU memory: 23.99 GiB

0,1
Comm: tcp://127.0.0.1:35215,Total threads: 1
Dashboard: http://127.0.0.1:38175/status,Memory: 62.77 GiB
Nanny: tcp://127.0.0.1:39417,
Local directory: /tmp/dask-scratch-space/worker-f3q485d4,Local directory: /tmp/dask-scratch-space/worker-f3q485d4
GPU: NVIDIA RTX A5000,GPU memory: 23.99 GiB


defined the computation graph, and execute it in background

In [6]:
# lds = lender_tag_ds.set_index('tags')
lds = lender_tag_ds
ddf : dask_cudf.DataFrame = dask_cudf.from_cudf(lds, npartitions=1024) # here we use tags as index, so it would split into 32 partitions
# ddf : dask_cudf.DataFrame = dask_cudf.from_cudf(lds, chunksize=1024)
print("number of divisior", len(ddf.divisions))

merged : dask_cudf.DataFrame = ddf.merge(ddf, on='tags', npartitions=8192)
print("number of divisior", len(merged.divisions))
# merged = merged.repartition(npartitions=8192)
# print("number of divisior", len(merged.divisions))

filtered : dask_cudf.DataFrame = merged[merged['lender_id_x'] > merged['lender_id_y']]
# filtered = filtered.repartition(npartitions=1024)
# filtered = filtered.reset_index() # should avoid this
print("number of divisior", len(filtered.divisions))
filtered = client.persist(filtered)


def nunique(series):
    return series.nunique()

share_tags_ds_dask_collection : dask_cudf.DataFrame = filtered.groupby(['lender_id_x', 'lender_id_y'], sort=False).tags.apply(nunique, meta=('tags', 'int64'))
print("number of divisior", len(share_tags_ds_dask_collection.divisions))

share_tags_ds_dask_collection_persist = client.persist(share_tags_ds_dask_collection)
share_tags_ds_dask_collection_persist.dask

number of divisior 1025
number of divisior 8193
number of divisior 8193


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


number of divisior 8193


0,1
layer_type  MaterializedLayer  is_materialized  True  number of outputs  8192,

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,8192


Key:       ('getitem-4cbd3f29ec34d2afc25760b606726a0d', 1162)
Function:  subgraph_callable-19cf9c65-fc3d-482e-8538-d0bdef9b
args:      ('lender_id_y', 'lender_id_x',         lender_id  tags
67229     4077329    18
85998     1438769    18
67236      357851    18
86097     1823305    18
67239     1392308    18
...           ...   ...
325018     954441    18
339063     360736    18
342399    1020822    18
324974    5165882    18
342417    2661305    18

[30290 rows x 2 columns], 'repartition-get-88c4232fd448d88e92828bfcd75bd13c')
kwargs:    {}
Exception: "MemoryError('std::bad_alloc: out_of_memory: CUDA error at: /home/datnt527/setups/.miniconda3/envs/cudf/include/rmm/mr/device/cuda_memory_resource.hpp')"

2023-10-02 19:22:53,871 - distributed.worker - ERROR - Exception during execution of task ('split-shuffle-0-83a11518216b059cd0c9861725854d10', 10, (8, 3, 4)).
Traceback (most recent call last):
  File "/home/datnt527/setups/.miniconda3/envs/cudf/lib/python3.10/site-packages/distributed/

In [None]:
%%script false --no-raise-error

lender_tag_ds = lender_tag_ds.sort_values(by=['tags', 'lender_id'])
lender_tag_ds.reset_index(drop=True, inplace=True) # make sure the input index is monotonically-increasing
ddf : dask_cudf.DataFrame = dask_cudf.from_cudf(lender_tag_ds, sort=False, chunksize=1024, npartitions=1024) # nparition < number of unique tags
# ddf : dask_cudf.DataFrame = dask_cudf.from_cudf(lender_tag_ds, npartitions=32) # nparition < number of unique tags
merged : dask_cudf.DataFrame = ddf.merge(ddf, on='tags')
merged = merged.repartition(npartitions=8192)
# merged_meta = cudf.DataFrame(columns=['lender_id_x', 'lender_id_y', 'tags'], dtype={'lender_id_x': 'uint32', 'lender_id_y': 'uint32', 'tags': 'category'})
filtered : dask_cudf.DataFrame = merged[merged['lender_id_x'] > merged['lender_id_y']]
# filtered : dask_cudf.DataFrame = filtered.repartition(npartitions=8192)


def nunique(series):
    return series.nunique()

share_tags_ds_dask_collection : dask_cudf.DataFrame = filtered.groupby(['lender_id_x', 'lender_id_y'], sort=False).tags.apply(nunique, meta=('tags', 'int64'))


excute the computatation in background

In [None]:
dict(share_tags_ds_dask_collection_persist.dask)

In [None]:
fut = share_tags_ds_dask_collection_persist.dask[('nunique-af4acf0676342f0939f34bea6059690e', 999)]
fut.result()

In [None]:
# filtered_gathered = client.gather(filtered_persist)

In [None]:
share_tags_ds_gathered = client.gather(share_tags_ds_dask_collection_persist)

In [None]:
share_tags_ds = share_tags_ds_gathered.compute()

In [None]:
share_tags_ds = share_tags_ds.rename('common_tags_count')

In [None]:
share_tags_ds = share_tags_ds.to_frame()

In [None]:
share_tags_ds.to_parquet("share_tags_ds_1.parquet")

In [None]:
lazada = cudf.read_parquet("share_tags_ds_1.parquet")

In [None]:
lazada.sort_values('common_tags_count', ascending=False).reset_index().info()

# `cuGRAPH`

In [None]:
import cugraph as cnx

G = cnx.Graph()