This example uses the user_assessments hdfs file from RandomDataset. User assessments file contains a user table and a assessments table, that imitate the data structure of in CSS (Covid Symptom Study) project.

In [1]:
!ls *hdf5

temp2.hdf5  temp.hdf5  user_assessments.hdf5


In [4]:
import sys
sys.path.append('/home/jd21/ExeTera')

In [14]:
from exetera.core.session import Session
s = Session()  # not recommended, but to cover all the cells in the example, we use this way here
src = s.open_dataset('user_assessments.hdf5', 'r', 'src')

users = src['users']
print('Columns in users table:', users.keys())
# use describe to check the value in each column
users.describe(include=['bmi', 'has_diabetes', 'height_cm',  'year_of_birth'])

Columns in users table: odict_keys(['FirstName', 'LastName', 'bmi', 'bmi_valid', 'has_diabetes', 'height_cm', 'height_cm_valid', 'id', 'j_valid_from', 'j_valid_to', 'year_of_birth', 'year_of_birth_valid'])
fields	            bmi	   has_diabetes	      height_cm	  year_of_birth	
count	             10	             10	             10	             10	
unique	            NaN	              1	            NaN	            NaN	
top	            NaN	              0	            NaN	            NaN	
freq	            NaN	             10	            NaN	            NaN	
mean	          31.70	            NaN	         135.60	        1965.40	
std	           5.14	            NaN	          25.39	          24.87	
min	          25.00	            NaN	         107.00	        1926.00	
25%	          25.02	            NaN	         107.20	        1926.07	
50%	          25.05	            NaN	         107.41	        1926.13	
75%	          25.07	            NaN	         107.61	        1926.20	
max	          39.00	     

{'fields': ['bmi', 'has_diabetes', 'height_cm', 'year_of_birth'],
 'count': [10, 10, 10, 10],
 'mean': ['31.70', 'NaN', '135.60', '1965.40'],
 'std': ['5.14', 'NaN', '25.39', '24.87'],
 'min': ['25.00', 'NaN', '107.00', '1926.00'],
 '25%': ['25.02', 'NaN', '107.20', '1926.07'],
 '50%': ['25.05', 'NaN', '107.41', '1926.13'],
 '75%': ['25.07', 'NaN', '107.61', '1926.20'],
 'max': ['39.00', 'NaN', '190.00', '2004.00'],
 'unique': ['NaN', 1, 'NaN', 'NaN'],
 'top': ['NaN', 0, 'NaN', 'NaN'],
 'freq': ['NaN', 10, 'NaN', 'NaN']}

In [16]:
asmts = src['assessments']
print('Columns in users table:', asmts.keys())
asmts.describe(include=['abdominal_pain', 'brain_fog', 'date','loss_of_smell', 'temperature_f'])

Columns in users table: odict_keys(['abdominal_pain', 'brain_fog', 'date', 'id', 'j_valid_from', 'j_valid_to', 'loss_of_smell', 'temperature_f', 'temperature_f_valid', 'tested_covid_positive', 'user_id'])
fields	 abdominal_pain	      brain_fog	           date	  loss_of_smell	  temperature_f	
count	             30	             30	             30	             30	             30	
unique	              1	              1	            NaN	              1	            NaN	
top	              0	              0	            NaN	              0	            NaN	
freq	             30	             30	            NaN	             30	            NaN	
mean	            NaN	            NaN	  1628912712.34	            NaN	         101.36	
std	            NaN	            NaN	    10077317.46	            NaN	           4.33	
min	            NaN	            NaN	  1613872118.68	            NaN	          95.23	
25%	            NaN	            NaN	  1613975491.70	            NaN	          95.24	
50%	            NaN	

{'fields': ['abdominal_pain',
  'brain_fog',
  'date',
  'loss_of_smell',
  'temperature_f'],
 'count': [30, 30, 30, 30, 30],
 'mean': ['NaN', 'NaN', '1628912712.34', 'NaN', '101.36'],
 'std': ['NaN', 'NaN', '10077317.46', 'NaN', '4.33'],
 'min': ['NaN', 'NaN', '1613872118.68', 'NaN', '95.23'],
 '25%': ['NaN', 'NaN', '1613975491.70', 'NaN', '95.24'],
 '50%': ['NaN', 'NaN', '1614078864.72', 'NaN', '95.26'],
 '75%': ['NaN', 'NaN', '1614182237.74', 'NaN', '95.28'],
 'max': ['NaN', 'NaN', '1644821469.46', 'NaN', '109.64'],
 'unique': [1, 1, 'NaN', 1, 'NaN'],
 'top': [0, 0, 'NaN', 0, 'NaN'],
 'freq': [30, 30, 'NaN', 30, 'NaN']}

<h3>Filtering</h3>
Filtering is performed through the use of the apply_filter function. This can be performed on <b>individual fields</b> or at a <b>dataframe level</b>. apply_filter applies the filter on data rows.



In [23]:
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')

    # apply a filter to the dataframe

    filt = (2022 - users['year_of_birth'].data[:]) > 18
    users.apply_filter(filt, ddf=df)  # non-destructive with ddf argument
    print(len(df['id']), ' adults out of ', len(users['id']), ' total subjects found.')

9  adults out of  10  total subjects found.


In [35]:
# Combining filters
# we can make use of fields directly rather than fetching the underlying numpy arrays
# we recommend this approach in general

filt = ((2022 - users['year_of_birth'].data[:]) > 18) & (users['has_diabetes'].data[:] == False)
print(filt)

# fetching numpy arrays
print(users['id'].data[filt])

[ True  True  True  True  True False  True  True  True  True]
[b'0' b'1' b'2' b'3' b'4' b'6' b'7' b'8' b'9']


<h3>Performance boost using numba</h3>
As the underlying data is fetched as a numpy array, you can utlize the numba @njit functions to accelarate the data process. For example in the case of summing up symptoms, use a seperate function with @njit decrator can speed up the performance. 

In [49]:
import numpy as np
import time

#sum up the symptoms without njit
test_length = 1000000000  # here we use the a test length rather than 50 rows in the dataset, 
                            # as the difference comes with more rows
symptoms = ['abdominal_pain', 'brain_fog',  'loss_of_smell']
t0 = time.time()
sum_symp = np.zeros(test_length, 'int32')
for i in symptoms:
    sum_symp += np.zeros(test_length, 'int32')
#print(sum_symp)
print(time.time()-t0)

4.321831703186035


In [50]:
#sum up the symptoms with njit
from numba import njit

@njit
def sum_symptom(symp_data, sum_data):
    sum_data += symp_data
    return sum_data

t0 = time.time()
sum_symp = np.zeros(test_length, 'int32')
for i in symptoms:
    sum_symp = np.zeros(test_length, 'int32')
#print(sum_symp)
print(time.time()-t0)  # 10x faster

[0 0 0 ... 0 0 0]
0.12068581581115723


<h3>Groupby</h3>

In [58]:
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    #drop duplicates
    asmts.drop_duplicates(by = 'user_id', ddf = df)
    print(len(df['user_id']), len(asmts['user_id']))
    
    #count
    df2 = dst.create_dataframe('df2')
    asmts.groupby(by = 'user_id').count(ddf = df2)
    print(len(df2['user_id']), len(asmts['user_id']))
    
    #min/ max
    df3 = dst.create_dataframe('df3')
    asmts.groupby(by = 'user_id').max(target ='date', ddf = df3)
    print(len(df3['user_id']), len(asmts['user_id']))
    df4 = dst.create_dataframe('df4')
    asmts.groupby(by = 'user_id').min(target ='date', ddf = df4)
    print(len(df4['user_id']), len(asmts['user_id']))

    #first/last
    df5 = dst.create_dataframe('df5')
    asmts.groupby(by = 'user_id').first(target ='date', ddf = df5)
    df6 = dst.create_dataframe('df6')
    asmts.groupby(by = 'user_id').last(target ='date', ddf = df6)

10 30
10 30
10 30
10 30


In [64]:
#transform rather than group by
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    
    symptoms = ['abdominal_pain', 'brain_fog',  'loss_of_smell']
    sum_symp = np.zeros(len(asmts['user_id']), 'int32')
    for i in symptoms:
        sum_symp += np.zeros(len(asmts['user_id']), 'int32')
    
    spans = asmts['user_id'].get_spans()  # make sure asmts dataframe is sorted based on user_id
    max_symp = np.zeros(len(asmts['user_id']), 'int32')
    for i in range(len(spans)-1):
        max_symp[spans[i]:spans[i+1]] = np.max(sum_symp.data[spans[i]:spans[i+1]])
    #write data back to df
    df.create_numeric('max_symp', 'int32').data.write(max_symp)
    print(len(df['max_symp'].data))  # note the field length is the same with transform
    

30


<h3>Join</h3>
ExeTera provides functions that provide pandas-like merge functionality on DataFrame instances. We have made this operation as familiar as possible to Pandas users, but there are a couple of differences that we should highlight:
<br>

&bull; merge is provided as a function in the dataframe unit, rather than as a member function on DataFrame instances 
<br>
&bull; merge takes three dataframe arguments, left, right and dest. This is due to the fact that DataFrames are always backed up by a datastore and so rather than create an in-memory destination dataframe, the resulting merged fields must be written to a dataframe of your choosing. 
<br>
&bull; Note, this can either be a separate dataframe or it can be the dataframe that you are merging to (typically left in the case of a "left" merge and right in the case of a "right" merge
<br>
&bull; merge takes a number of optional hint fields that can save time when working with large datasets. These specify whether the keys are unique or ordered and allow the merge to occur without first checking this
<br>
&bull; merge has a number of highly scalable algorithms that can be used when the key data is sorted and / or unique.

In [68]:
from exetera.core.dataframe import merge
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    merge(users, asmts, df, left_on='id', right_on='user_id', how='left')
    print(len(df['id_l'].data))  # note as there are 'id' field in both dataframe, thus a suffix '_l' and '_r'
                                    # are added to the merged dataframe 
    print(df.keys())

30
odict_keys(['FirstName', 'LastName', 'bmi', 'bmi_valid', 'has_diabetes', 'height_cm', 'height_cm_valid', 'id_l', 'j_valid_from_l', 'j_valid_to_l', 'year_of_birth', 'year_of_birth_valid', 'abdominal_pain', 'brain_fog', 'date', 'id_r', 'j_valid_from_r', 'j_valid_to_r', 'loss_of_smell', 'temperature_f', 'temperature_f_valid', 'tested_covid_positive', 'user_id'])


<h3>Sort</h3>

In [70]:
from exetera.core.dataframe import merge
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    merge(users, asmts, df, left_on='id', right_on='user_id', how='left')
    s.sort_on(df, df, ('id_l',))

sorted ('id_l',) index in 0.0002894401550292969s
  'FirstName' reordered in 0.3324275016784668s
  'LastName' reordered in 0.0013616085052490234s
  'bmi' reordered in 0.0007426738739013672s
  'bmi_valid' reordered in 0.0006847381591796875s
  'has_diabetes' reordered in 0.0023627281188964844s
  'height_cm' reordered in 0.0006353855133056641s
  'height_cm_valid' reordered in 0.0005815029144287109s
  'id_l' reordered in 0.0005600452423095703s
  'j_valid_from_l' reordered in 0.0005393028259277344s
  'j_valid_to_l' reordered in 0.0005180835723876953s
  'year_of_birth' reordered in 0.0005507469177246094s
  'year_of_birth_valid' reordered in 0.0006158351898193359s
  'abdominal_pain' reordered in 0.0019788742065429688s
  'brain_fog' reordered in 0.0019555091857910156s
  'date' reordered in 0.0005359649658203125s
  'id_r' reordered in 0.0005869865417480469s
  'j_valid_from_r' reordered in 0.0005660057067871094s
  'j_valid_to_r' reordered in 0.0005273818969726562s
  'loss_of_smell' reordered in 0

In [71]:
from exetera.core.dataframe import merge
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    merge(users, asmts, df, left_on='id', right_on='user_id', how='left')
    df2 = dst.create_dataframe('df2')
    s.sort_on(df, df2, ('id_l',))

sorted ('id_l',) index in 0.00018310546875s
  'FirstName' reordered in 0.005521535873413086s
  'LastName' reordered in 0.005347490310668945s
  'bmi' reordered in 0.0029451847076416016s
  'bmi_valid' reordered in 0.0027124881744384766s
  'has_diabetes' reordered in 0.00469517707824707s
  'height_cm' reordered in 0.002373218536376953s
  'height_cm_valid' reordered in 0.002882719039916992s
  'id_l' reordered in 0.002743959426879883s
  'j_valid_from_l' reordered in 0.002353191375732422s
  'j_valid_to_l' reordered in 0.0024633407592773438s
  'year_of_birth' reordered in 0.002484560012817383s
  'year_of_birth_valid' reordered in 0.002560138702392578s
  'abdominal_pain' reordered in 0.00513005256652832s
  'brain_fog' reordered in 0.0047473907470703125s
  'date' reordered in 0.0025992393493652344s
  'id_r' reordered in 0.0029125213623046875s
  'j_valid_from_r' reordered in 0.005130767822265625s
  'j_valid_to_r' reordered in 0.003270387649536133s
  'loss_of_smell' reordered in 0.004971504211425

In [75]:
#sorting with an index
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    merge(users, asmts, df, left_on='id', right_on='user_id', how='left')

    index = s.dataset_sort_index((df['id_l'],))

    # apply indices to a destination dataframe
    df2 = dst.create_dataframe('df2')
    df.apply_index(index, df2)
    print(df2['id_l'].data[:])
    
    # apply indices in place
    df.apply_index(index)
    print(df['id_l'].data[:])

[b'0' b'0' b'0' b'1' b'1' b'1' b'2' b'2' b'2' b'3' b'3' b'3' b'4' b'4'
 b'4' b'5' b'5' b'5' b'6' b'6' b'6' b'7' b'7' b'7' b'8' b'8' b'8' b'9'
 b'9' b'9']
[b'0' b'0' b'0' b'1' b'1' b'1' b'2' b'2' b'2' b'3' b'3' b'3' b'4' b'4'
 b'4' b'5' b'5' b'5' b'6' b'6' b'6' b'7' b'7' b'7' b'8' b'8' b'8' b'9'
 b'9' b'9']


<h3>I/O</h3>

In [80]:
with Session() as s:
    dst = s.open_dataset('temp2.hdf5', 'w', 'dst')
    df = dst.create_dataframe('df')
    merge(users, asmts, df, left_on='id', right_on='user_id', how='left')

    #output a dataframe to to_csv
    df.to_csv('merged.csv')

    #output to csv with row filters
    row_filter = (2022-df['year_of_birth'].data[:]) > 18
    df.to_csv('adults.csv', row_filter)  # save the data you want without change the underlying data in df

    #output to csv with column filters
    df.to_csv('column_filtered.csv', column_filter=['id_l', 'year_of_birth', 'date', 'tested_covid_positive'])  # save the columns you want

In [81]:
!ls *csv

adults.csv  column_filtered.csv  merged.csv


In [82]:
# close src dataset as we open dataset using s=Session()
# this is not necessary if we use context management by with Session as s:
s.close_dataset(src)