In [2]:
import pandas as pd
import numpy as np
from multiprocessing import Pool
import time
import os
import graph_tool.all as gt

# Contact Matching

In [8]:
def _check_fits(df, dtypes):
    """Raise ValueError if a column holds values outside its target integer dtype.

    ``DataFrame.astype`` does not range-check integer downcasts: out-of-range
    values wrap around silently instead of failing, which would corrupt ids
    and times without any warning.
    """
    for col, dtype in dtypes.items():
        bounds = np.iinfo(dtype)
        col_min, col_max = df[col].min(), df[col].max()
        if col_min < bounds.min or col_max > bounds.max:
            raise ValueError(
                f'Column {col!r} has values in [{col_min}, {col_max}], which do not '
                f'fit into {dtype} [{bounds.min}, {bounds.max}]'
            )


def csv_to_parquet(p_group=False,
                   csv_path='./VF_data/region_niedersachsen_tapas_modell.csv',
                   parquet_path='./VF_data/rns_data.parquet'):
    """Convert the raw trip CSV into a compact, sorted parquet file.

    The rows are sorted by ``p_id`` and ``start_time_min``. ``p_id`` is shifted
    to be index-like and ``loc_id_end`` is made non-negative. ``activity_end_min``
    is computed as ``activity_start_min + activity_duration_min``. Only the columns
    needed downstream are kept, downcast to small integer dtypes.

    Parameters
    ----------
    p_group : bool
        Also keep the ``p_group`` column (stored as int8).
    csv_path : str
        Input CSV file.
    parquet_path : str
        Output parquet file; an existing file is overwritten.

    Raises
    ------
    ValueError
        If a kept column has values that do not fit into its downcast dtype.
    """
    df = pd.read_csv(csv_path)

    # Sort by person, then by trip start time.
    df = df.sort_values(['p_id', 'start_time_min'], ignore_index=True)

    # Make p_id index-like. Rows are sorted by p_id, so iloc[0] is the smallest id,
    # and the offset is subtracted whenever that id is not already 0. The CSV is
    # re-read on every call, so this is not a guard against re-running the cell.
    # NOTE(review): assumes raw ids start at 100_000_000 — confirm with the data source.
    if not df.empty and df['p_id'].iloc[0] != 0:
        df['p_id'] = df['p_id'] - 100_000_000

    # Location ids may be negative in the raw data; only their absolute value is used.
    df['loc_id_end'] = df['loc_id_end'].abs()

    df['activity_end_min'] = df['activity_start_min'] + df['activity_duration_min']

    # Target dtypes; the dict order is also the output column order.
    dtypes = {'p_id': 'int32', 'loc_id_end': 'int32',
              'activity_start_min': 'int16', 'activity_end_min': 'int16'}
    if p_group:
        dtypes['p_group'] = 'int8'

    trips = df[list(dtypes)]
    _check_fits(trips, dtypes)
    trips = trips.astype(dtypes)

    trips.to_parquet(parquet_path)

# Build the parquet file including the p_group column.
csv_to_parquet(p_group=True)

In [9]:
# Reload the compact trip table written by csv_to_parquet.
df = pd.read_parquet(path='./VF_data/rns_data.parquet')

In [10]:
# Check the downcast dtypes and the memory footprint of the loaded table.
df.info(buf=None)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9012236 entries, 0 to 9012235
Data columns (total 5 columns):
 #   Column              Dtype
---  ------              -----
 0   p_id                int32
 1   loc_id_end          int32
 2   activity_start_min  int16
 3   activity_end_min    int16
 4   p_group             int8 
dtypes: int16(2), int32(2), int8(1)
memory usage: 111.7 MB


In [8]:
def get_contacts_vect(start_times, end_times, event_ids, loc_id_end):
    """Find all pairs of events at one location whose time intervals overlap.

    Each unordered pair of events (i, j), i < j, whose intervals share a
    strictly positive amount of time becomes one contact row. Intervals that
    only touch (one ends exactly when the other starts) are not contacts.

    Parameters
    ----------
    start_times, end_times : np.ndarray of int, length n
        Start and end time of each event.
    event_ids : np.ndarray, length n
        Id reported for each event (the caller passes person ids).
    loc_id_end : int
        Location id written to every contact row.

    Returns
    -------
    pd.DataFrame
        Columns ``p_A``, ``p_B`` (ids of the event with the lower / higher
        position in the input arrays), ``start_of_contact`` (later start),
        ``end_of_contact`` (earlier end) and ``loc_id`` (int32).
    """
    # n x n matrices: latest start and earliest end of every pair of events.
    overlap_start = np.maximum.outer(start_times, start_times)
    overlap_end = np.minimum.outer(end_times, end_times)

    # A pair is in contact when its common interval is non-empty. Comparing the
    # bounds directly avoids computing end - start in the (possibly narrow)
    # input dtype, which can overflow and wrap. It also avoids allocating extra
    # n x n float64/uint16 matrices. triu(k=1) keeps each unordered pair once
    # and drops the diagonal (an event paired with itself).
    in_contact = np.triu(overlap_end > overlap_start, 1)
    rows, cols = np.nonzero(in_contact)

    return pd.DataFrame({
        'p_A': event_ids[rows],
        'p_B': event_ids[cols],
        'start_of_contact': overlap_start[rows, cols],
        'end_of_contact': overlap_end[rows, cols],
        'loc_id': np.full(len(rows), loc_id_end, dtype='int32'),
    })

In [9]:
# Group the trips by the location where they end; each group is one location
# whose visits are searched for contacts.
grouped = df.groupby('loc_id_end')
group_keys = np.array(list(grouped.groups))
print('Grouped')

# Shuffle the location ids with a fixed seed so the chunk assignment is reproducible.
# NOTE(review): presumably meant to mix large and small locations across chunks.
np.random.seed(1)
np.random.shuffle(group_keys)

chunk_size = 10_000
N = len(group_keys)

# Zero-pad the keys up to a multiple of chunk_size, then reshape so that every
# row of group_keys is one chunk of exactly chunk_size location ids.
pad = (0, -N % chunk_size)
group_keys = np.pad(group_keys, pad_width=pad, constant_values=(0, 0)).reshape(-1, chunk_size)
N = group_keys.shape[0]
print(f'Created {N} chunks of size {chunk_size}')

# Location id -> DataFrame of the trips ending there, for fast lookup.
groups = {loc_id: grp for loc_id, grp in grouped}

Grouped
Created 147 chunks of size 10000


In [10]:
print('Start extracting contacts')

# Directory for the per-chunk contact files; it must exist before the workers write to it.
base_dir = './parquets/rns/'
os.makedirs(base_dir, exist_ok=True)


def chunk_path(chunk_ind):
    """Path of the parquet file that holds the contacts of chunk ``chunk_ind``."""
    return f'{base_dir}rns_contacts_chunk{chunk_ind:03d}.parquet'


def contacts_per_location(grp, loc_id):
    """Return all contacts among the trips in ``grp`` (the trips ending at ``loc_id``)."""
    return get_contacts_vect(grp.activity_start_min.values, grp.activity_end_min.values,
                             grp.p_id.values, loc_id)


def chunk_keys(chunk_ind):
    """Location ids of chunk ``chunk_ind``, with the trailing padding removed.

    Only the last row of ``group_keys`` contains padding, and it sits at the end.
    The number of real keys in that row is derived from the total number of
    groups rather than by filtering on the pad value. The pad value 0 may also
    be a real (non-negative) location id.
    """
    keys = group_keys[chunk_ind]
    if chunk_ind == N - 1:
        n_real = len(groups) - chunk_size * (N - 1)
        keys = keys[:n_real]
    return keys


def contacts_per_location_wrapper(chunk_ind):
    """Extract the contacts of every location in a chunk and write them to parquet.

    Reads the module-level ``groups``, ``group_keys`` and ``N``; returns ``chunk_ind``.
    NOTE(review): worker processes can only see these notebook-defined names
    under the 'fork' start method — confirm on non-Linux platforms.
    """
    start_time = time.perf_counter()
    df_temp = pd.concat([contacts_per_location(groups[key], key) for key in chunk_keys(chunk_ind)],
                        ignore_index=True, sort=False)
    df_temp.to_parquet(chunk_path(chunk_ind))
    dt = time.perf_counter() - start_time
    print(f'Finished chunk {chunk_ind}/{N-1} in {round(dt, 2)}s')
    return chunk_ind


# All N chunks, including the padded last one, are processed by the pool
# (chunk_keys trims the padding). This also works when there is only one chunk.
with Pool(processes=10) as pool:
    for _ in pool.imap_unordered(contacts_per_location_wrapper, range(N)):
        pass

print('Concate results into single DataFrame')
# Read exactly the chunk files written above, in chunk order, instead of
# everything in base_dir, so stale files from earlier runs are not mixed in.
all_dfs = [pd.read_parquet(chunk_path(i)) for i in range(N)]
# Each chunk has its own 0-based index; the concatenated index carries no information.
df_contacts = pd.concat(all_dfs, ignore_index=True)

print('Export to parquet')
df_contacts.to_parquet('./parquets/rns_contacts_full_1.parquet')

Start extracting contacts
Finished chunk 5/146 in 15.39s
Finished chunk 0/146 in 15.57s
Finished chunk 4/146 in 15.63s
Finished chunk 8/146 in 15.63s
Finished chunk 9/146 in 15.9s
Finished chunk 3/146 in 15.97s
Finished chunk 2/146 in 16.58s
Finished chunk 7/146 in 18.16s
Finished chunk 1/146 in 18.49s
Finished chunk 13/146 in 4.52s
Finished chunk 11/146 in 4.69s
Finished chunk 6/146 in 20.4s
Finished chunk 14/146 in 4.89s
Finished chunk 10/146 in 5.83s
Finished chunk 16/146 in 4.99s
Finished chunk 15/146 in 5.87s
Finished chunk 17/146 in 4.66s
Finished chunk 18/146 in 5.21s
Finished chunk 21/146 in 4.82s
Finished chunk 19/146 in 5.16s
Finished chunk 23/146 in 4.66s
Finished chunk 22/146 in 6.75s
Finished chunk 20/146 in 7.48s
Finished chunk 26/146 in 5.12s
Finished chunk 25/146 in 6.36s
Finished chunk 27/146 in 4.54s
Finished chunk 29/146 in 4.68s
Finished chunk 30/146 in 4.61s
Finished chunk 28/146 in 6.97s
Finished chunk 31/146 in 5.55s
Finished chunk 33/146 in 5.53s
Finished chunk 

# Network generation

In [2]:
def full_network_generation(edge_path='./parquets/rns_contacts_full_1.parquet'):
    """Build an undirected graph-tool Graph from the contact edge list.

    Every row (p_A, p_B) of the parquet file is added as one edge. Repeated
    contacts between the same pair therefore become parallel edges. Rebuilding
    from the parquet file is faster than saving and reloading a .gt file.

    Parameters
    ----------
    edge_path : str
        Parquet file with at least the columns ``p_A`` and ``p_B``.

    Returns
    -------
    gt.Graph
        The undirected contact graph.
    """
    print('Load parquet file')
    contacts = pd.read_parquet(edge_path, columns=['p_A', 'p_B'])
    edges = contacts.to_numpy()

    print('Add edges')
    graph = gt.Graph(directed=False)
    graph.add_edge_list(edges)
    return graph


G = full_network_generation()

Load parquet file
Add edges


# Not used anymore
The following cells are no longer used, but may be useful later.
<br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>