In [1]:
import pandas as pd
import numpy as np
import networkx as nx
from itertools import combinations 
import time
import os
import sys
from tqdm import tqdm
from math import comb

In [2]:
# Load the discharge and note tables, attach every note to its discharge
# record, and derive the helper columns used by the later cells.
start_time = time.perf_counter()

discharges_test_file = '../data/discharges_test.csv'
notes_test_file = '../data/notes_test.csv'

# discharges_test_file = '../data/discharges_w_disposition_abdul_test.csv'
# notes_test_file = '../data/notes_w_disposition_abdul_test.csv'

# discharges_test_file = '../data/discharges_w_datetime.csv'
# notes_test_file = '../data/notes_w_datetime.csv'

# authors who wrote notes within 48 hours of arrival date are in index team for that patient
INDEX_DELTA = np.timedelta64(2, 'D')
# notes written within previous 90 days are considered when calculating collaborative experience
TEAMWORK_DELTA = np.timedelta64(90, 'D')

discharge_table = pd.read_csv(discharges_test_file, parse_dates=['arrive_date', 'discharge_date'])

notes_table = (
    pd.read_csv(notes_test_file, parse_dates=['date'])
    .rename(columns={'discharge_id': 'id'})
    # how='right' keeps every row of discharge_table, matched on the visit id
    .merge(discharge_table, on='id', how='right')
)

# Normalized (midnight) copies of the dates, ignoring time of day.
# .dt.normalize() is used instead of astype('datetime64[D]') because newer
# pandas releases reject the day unit in astype for datetime columns.
notes_table['norm_arrive_date'] = notes_table['arrive_date'].dt.normalize()
notes_table['norm_note_date'] = notes_table['date'].dt.normalize()

# keep one note per author, visit and calendar day, ordered by arrival time
notes_table = (
    notes_table
    .drop_duplicates(['norm_note_date', 'dr', 'id'], keep="first")
    .sort_values('arrive_date')
)
# earliest arrival in the data (first row after the sort above)
FIRST_DATE = notes_table['arrive_date'].iloc[0]

# Indicator for whether the note author is in the index team: the note is dated
# no later than INDEX_DELTA after arrival (notes dated before arrival also pass).
notes_table['is_in_team'] = notes_table["date"] - notes_table["arrive_date"] <= INDEX_DELTA

stop_time = time.perf_counter()
print(f"It took {stop_time - start_time} seconds or {(stop_time - start_time) / 60} minutes to read in data and modify table")

It took 0.0387992000032682 seconds or 0.0006466533333878033 minutes to read in data and modify table


In [3]:
# Preview the first rows of the merged and annotated notes table.
notes_table.head()

Unnamed: 0,id,dr,date,patient,arrive_date,discharge_date,disposition,age,norm_arrive_date,norm_note_date,is_in_team
0,0,Brad Palmer,2019-01-01 19:15:00,patient1,2019-01-01,2019-01-01,1,75,2019-01-01,2019-01-01,True
1,1,Albert Romero,2019-01-24 10:19:00,patient2,2019-01-24,2019-01-24,0,68,2019-01-24,2019-01-24,True
2,1,Margie Meyer,2019-01-24 17:09:00,patient2,2019-01-24,2019-01-24,0,68,2019-01-24,2019-01-24,True
3,1,Evan Frazier,2019-01-24 16:48:00,patient2,2019-01-24,2019-01-24,0,68,2019-01-24,2019-01-24,True
4,1,Myrtle George,2019-01-24 13:41:00,patient2,2019-01-24,2019-01-24,0,68,2019-01-24,2019-01-24,True


In [4]:
start_time = time.perf_counter()

# Pair up authors who wrote a note for the same visit on the same calendar day.
edges_table = (
    notes_table
    .merge(
        notes_table[['dr', 'id', 'is_in_team', 'norm_note_date']],
        how='inner',
        on=['id', 'norm_note_date'],
    )
    # keep each unordered pair once: drops self-pairs and reversed duplicates
    .loc[lambda pairs: pairs['dr_x'] < pairs['dr_y']]
    .assign(
        # NOTE(review): the key is the two names concatenated with no separator,
        # so different pairs could in principle produce the same key - confirm.
        edge=lambda pairs: pairs['dr_x'] + pairs['dr_y'],
        # might be able to remove this column, need to discuss
        is_in_team=lambda pairs: pairs['is_in_team_x'] & pairs['is_in_team_y'],
        # True when the arrival is more than TEAMWORK_DELTA after the first
        # arrival in the data; if not, don't count as index team
        is_after_delta=lambda pairs: pairs['arrive_date'] > (FIRST_DATE + TEAMWORK_DELTA),
    )
)

stop_time = time.perf_counter()
print(f"It took {stop_time - start_time} seconds or {(stop_time - start_time) / 60} minutes to self join and get edges table")

It took 0.026982399984262884 seconds or 0.0004497066664043814 minutes to self join and get edges table


In [5]:
start_time = time.perf_counter()

# Self join on discharge id alone: every pair of authors who wrote a note for
# the same visit, regardless of the day each note was written.
# NOTE(review): an author with notes on several days of one visit appears in
# several rows, so the same pair can repeat for a visit - confirm that
# downstream code tolerates repeats.
team_table = notes_table.merge(notes_table[['dr','id','is_in_team','norm_note_date']], how='inner', on='id')
# remove edges with the same name twice or with authors in reverse order
team_table = team_table[team_table['dr_x'] < team_table['dr_y']]
team_table['edge'] = team_table['dr_x'] + team_table['dr_y']

# a pair counts as an index-team edge only when both notes are index-team notes;
# this column is required by the filter below
team_table['is_in_team'] = team_table['is_in_team_x'] & team_table['is_in_team_y']
# add column indicating whether there are 90 days prior to arrive date. if not, don't count as index team
team_table['is_after_delta'] = team_table["arrive_date"] > (FIRST_DATE + TEAMWORK_DELTA)

team_table = team_table[team_table['is_in_team'] & team_table['is_after_delta']]

stop_time = time.perf_counter()
# The message previously said "edges table" (copied from the edges cell).
print(f"It took {stop_time - start_time} seconds or {(stop_time - start_time) / 60} minutes to self join and get team table")

It took 0.025414600007934496 seconds or 0.00042357666679890824 minutes to self join and get edges table


In [6]:
def get_output_for_row(g, visit_id, team):
    """Summarize one visit's collaboration graph as a flat dict of metrics.

    Args:
        g: networkx graph whose edges carry a numeric ``weight`` attribute.
        visit_id: identifier stored under the ``'discharge_id'`` key.
        team: sized collection of team members; only ``len(team)`` is used.

    Returns:
        dict with the keys ``discharge_id``, ``avg_clust``, ``sum_clust``,
        ``team_size``, ``potential_edges``, ``team_edge_size``,
        ``cumulative_experience`` and ``avg_cumulative_experience``.
    """
    # Weighted clustering coefficient of every node in g (dict node -> value).
    clustering_coefficient = nx.clustering(g, weight='weight')
    clust_sum = sum(clustering_coefficient.values())
    clust_len = len(clustering_coefficient)

    team_size = len(team)
    # Number of distinct pairs in the team; 0 for teams of fewer than two.
    potential_edges = comb(team_size, 2)
    team_edge_size = g.number_of_edges()

    # Experience is the sum of edge weights minus one unit per edge.
    # NOTE(review): the reason for subtracting the edge count is not stated
    # in this notebook - confirm the intended definition.
    cumulative_experience = g.size(weight='weight') - team_edge_size

    return {
        'discharge_id': visit_id,
        # an empty graph has no coefficients to average
        'avg_clust': clust_sum / clust_len if clust_len > 0 else 0,
        'sum_clust': clust_sum,
        'team_size': team_size,
        'potential_edges': potential_edges,
        'team_edge_size': team_edge_size,
        'cumulative_experience': cumulative_experience,
        # Guard on the divisor itself: a one-member team has zero potential
        # edges, which a team_size > 0 check would let through.
        'avg_cumulative_experience': (
            cumulative_experience / potential_edges if potential_edges > 0 else 0
        ),
    }

In [7]:
# discharge id -> list of (edge key, (author, author), normalized arrive date)
discharge_id_to_edges_dict = {}
# edge key -> list of normalized note dates on which the pair collaborated
edge_to_date_dict = {}
# discharge id -> set of team member names
discharge_id_to_team_dict = {}


def add_to_team_dict(edge_record):
    """Register one team-edge row under its discharge id.

    Appends the (edge key, author pair, normalized arrive date) item to the
    visit's edge list and adds both authors to the visit's member set.
    """
    visit_id = edge_record['id']
    members = (edge_record['dr_x'], edge_record['dr_y'])
    item = (edge_record['edge'], members, edge_record['norm_arrive_date'])

    if visit_id not in discharge_id_to_edges_dict:
        discharge_id_to_edges_dict[visit_id] = []
    discharge_id_to_edges_dict[visit_id].append(item)

    if visit_id not in discharge_id_to_team_dict:
        discharge_id_to_team_dict[visit_id] = set()
    discharge_id_to_team_dict[visit_id].update(members)


def add_edge_to_dict(edge_record):
    """Record the normalized note date of one edge row under its edge key."""
    key = edge_record['edge']
    note_day = edge_record['norm_note_date']
    if key not in edge_to_date_dict:
        edge_to_date_dict[key] = []
    edge_to_date_dict[key].append(note_day)
    
# Enable the progress_apply method on pandas objects, with this bar label.
tqdm.pandas(desc="Dictionary Progress Bar!")

start_time = time.perf_counter()

# Both applies run row by row purely for their side effects on the
# module-level dictionaries; their return values are discarded.
edges_table.progress_apply(add_edge_to_dict, axis=1)
team_table.progress_apply(add_to_team_dict, axis=1)

stop_time = time.perf_counter()
print(f"It took {stop_time - start_time} seconds or {(stop_time - start_time) / 60} minutes to build dictionaries")

  from pandas import Panel
Dictionary Progress Bar!: 100%|██████████| 33/33 [00:00<00:00, 4066.88it/s]
Dictionary Progress Bar!: 100%|██████████| 28/28 [00:00<00:00, 4777.50it/s]

It took 0.029244600009405985 seconds or 0.00048741000015676644 minutes to build dictionaries





In [8]:
# One metrics dict per discharge, filled by the loop below.
experience_data_list = []

start_time = time.perf_counter()

for discharge_id, edge_items in tqdm(discharge_id_to_edges_dict.items()):
    # Build the visit's weighted graph directly rather than through a
    # temporary DataFrame.
    g = nx.Graph()
    for edge, (dr_x, dr_y), arrive_date in edge_items:
        # start of the look-back window, computed once per edge item
        window_start = arrive_date - TEAMWORK_DELTA
        # weight = number of recorded collaboration days for this pair inside
        # [arrive_date - TEAMWORK_DELTA, arrive_date); pairs with no recorded
        # dates get weight 0
        weight = sum(
            1
            for note_day in edge_to_date_dict.get(edge, ())
            if window_start <= note_day < arrive_date
        )
        if weight < 1:
            continue
        g.add_edge(dr_x, dr_y, weight=weight)
    team = discharge_id_to_team_dict[discharge_id]
    experience_data_list.append(get_output_for_row(g, discharge_id, team))

stop_time = time.perf_counter()
print(f"It took {stop_time - start_time} seconds or {(stop_time - start_time) / 60} minutes to create graphs")

100%|██████████| 1/1 [00:00<00:00, 139.00it/s]

It took 0.011213899997528642 seconds or 0.00018689833329214403 minutes to create graphs





In [9]:
# Columns, in display order, taken from the per-visit metric dicts.
# 'sum_clust' is present in the dicts but is not listed here.
columns = [
    'discharge_id',
    'avg_clust',
    'cumulative_experience',
    'avg_cumulative_experience',
    'team_edge_size',
    'team_size',
    'potential_edges',
]

experience_df = pd.DataFrame(experience_data_list, columns=columns)
experience_df.head(20)

Unnamed: 0,discharge_id,avg_clust,cumulative_experience,avg_cumulative_experience,team_edge_size,team_size,potential_edges
0,6,0.677976,6.0,0.214286,15,8,28


In [16]:
# Debug inspection: dump the stored edge items and the team member set for
# discharge id 6 (the key is hard-coded, so other data may raise KeyError).
print(discharge_id_to_edges_dict[6])
print(discharge_id_to_team_dict[6])

[('Grant DeLongMargie Meyer', ('Grant DeLong', 'Margie Meyer'), Timestamp('2019-04-15 00:00:00')), ('Grant DeLongMyrtle George', ('Grant DeLong', 'Myrtle George'), Timestamp('2019-04-15 00:00:00')), ('Grant DeLongVictoria Washington', ('Grant DeLong', 'Victoria Washington'), Timestamp('2019-04-15 00:00:00')), ('Grant DeLongNeil Mitchell', ('Grant DeLong', 'Neil Mitchell'), Timestamp('2019-04-15 00:00:00')), ('Grant DeLongHenry Philofsky', ('Grant DeLong', 'Henry Philofsky'), Timestamp('2019-04-15 00:00:00')), ('Albert RomeroGrant DeLong', ('Albert Romero', 'Grant DeLong'), Timestamp('2019-04-15 00:00:00')), ('Albert RomeroMargie Meyer', ('Albert Romero', 'Margie Meyer'), Timestamp('2019-04-15 00:00:00')), ('Albert RomeroEvan Frazier', ('Albert Romero', 'Evan Frazier'), Timestamp('2019-04-15 00:00:00')), ('Albert RomeroMyrtle George', ('Albert Romero', 'Myrtle George'), Timestamp('2019-04-15 00:00:00')), ('Albert RomeroVictoria Washington', ('Albert Romero', 'Victoria Washington'), Time

In [11]:
'''This code does not work but playing around with an easy API for the library'''

# NOTE(review): design sketch only - everything below is commented out, and
# `TeamWorkStudy` and `notes_csv_file` are not defined in the cells shown here.

# column_names = {
#     "visit_id": "discharge_id",
#     "admission_date": "arrive_date",
#     "note_author": "dr",
#     "note_date": "date"
# }

# EXPERIENCE_WINDOW = 90
# TEAM_WINDOW = 2

# study = TeamWorkStudy(notes_csv_file, EXPERIENCE_WINDOW,TEAM_WINDOW, column_names=column_names)



'This code does not work but playing around with an easy API for the library'

In [12]:
# Scratch check of set behavior: add() ignores a member that is already
# present, and update() merges every element of an iterable.
myset = {1, 2, 3}
myset.add(1)
myset.add(4)
myset.update((2, 5, 8))
print(myset)
print(len(myset))

# `comb` is already imported in the notebook's import cell, so it is not
# re-imported here.
comb(4, 2)


{1, 2, 3, 4, 5, 8}
6


6

In [13]:
# Scratch check that a column added inside a function shows up on the
# caller's DataFrame.
test_df = pd.DataFrame(
    {
        'date': ['3/10/2000', '3/11/2000', '3/12/2000'],
        'value': [2, 3, 4],
        'another': [6, 7, 8],
    }
)


def add_column(df):
    """Set the 'another_column' column of ``df`` to 7, 8, 9 in place."""
    df['another_column'] = list(range(7, 10))


add_column(test_df)
print(test_df.columns.values)

['date' 'value' 'another' 'another_column']


In [14]:
# Scratch check of dict merging, keyword unpacking and value unpacking.
cols = dict(values='value', datetime='custdate')
defcols = dict(datetime='date', values='value', hello=[1, 2, 3], goodbye={1, 2, 3})

# start from the defaults, then let the entries in `cols` override them
mycols = dict(defcols)
mycols.update(cols)


def take_params(**cols):
    """Print the value passed under the ``hello`` keyword."""
    print(cols['hello'])


take_params(**mycols)

# values come out in key insertion order: datetime, values, hello, goodbye
_, v, _, gb = mycols.values()
print(gb)


def print_val(v):
    """Print ``v``."""
    print(v)


[print_val(item) for item in (1, 4, 5)]


[1, 2, 3]
{1, 2, 3}
1
4
5


[None, None, None]

In [15]:
from multiprocessing import Pool, Process, connection, current_process, cpu_count
from random import randint
from time import sleep
from datetime import datetime


def run(i):
    """Sleep for a random whole number of seconds (2 to 10), then print a
    timestamped line naming the current process, ``i`` and the sleep time."""
    sleep_time = randint(2,10)
    sleep(sleep_time)
    print(f"{datetime.now()} {current_process().name} exiting with arg {i} after sleeping for {sleep_time}")

def cpu_bound(numbers, r):
    """Return the sum over ``numbers`` of 0**2 + 1**2 + ... + (number - 1)**2.

    ``r`` is only echoed in the start message. A timestamped line naming the
    current process is printed before and after the computation.
    """
    print(f"\n{datetime.now()} {current_process().name} running for r := {r}")
    thesum = 0
    for number in numbers:
        for i in range(number):
            thesum += i * i
    print(f"{datetime.now()} {current_process().name} exiting with sum of {thesum}")
    return thesum

def get_numbers(r):
    """Return the ``r`` consecutive integers starting at 5,000,000."""
    first = 5_000_000
    return list(range(first, first + r))

def print_res(res):
    """Print ``res`` behind an 'in callback: ' prefix (used below as the
    ``apply_async`` callback)."""
    print(f'in callback: {res}')

if __name__ == '__main__':
    # one worker process per reported CPU
    with Pool(cpu_count()) as pool:
        print(f'cpu count: {cpu_count()}')
        print(f"{datetime.now()} {current_process().name} waiting")
        # submit four tasks, each with its own random r between 10 and 20
        multiple_results = [
            pool.apply_async(cpu_bound, args=(get_numbers(r := randint(10,20)), r), callback=print_res)
            for _ in range(4)
        ]
        # wait for every task before leaving the with-block; get() also
        # re-raises any exception raised in a worker
        for pending in multiple_results:
            pending.get()

    print(f"{datetime.now()} {current_process().name} out of with block")


2021-05-26 20:10:16.799886 ForkPoolWorker-2 running for r := 13
2021-05-26 20:10:16.799987 ForkPoolWorker-3 running for r := 20
2021-05-26 20:10:16.799884 ForkPoolWorker-1 running for r := 11
2021-05-26 20:10:16.800133 ForkPoolWorker-4 running for r := 20



cpu count: 8
2021-05-26 20:10:16.797450 MainProcess waiting
2021-05-26 20:10:22.277877 ForkPoolWorker-1 exiting with sum of 458334570834992500825
in callback: 458334570834992500825
2021-05-26 20:10:23.363070 ForkPoolWorker-2 exiting with sum of 541668454169537501716
in callback: 541668454169537501716
2021-05-26 20:10:25.930157 ForkPoolWorker-4 exiting with sum of 833337833344750010830
2021-05-26 20:10:26.090395 ForkPoolWorker-3 exiting with sum of 833337833344750010830
in callback: 833337833344750010830
in callback: 833337833344750010830
2021-05-26 20:10:26.099596 MainProcess out of with block
