In [14]:
import timeit
import pandas as pd
import numpy as np
import time
import glob
import timeit
from astropy.table import Table
import dask.dataframe as dd
import threading
import os
# use the env ../speedPy.yml
def summary(name,timeit_results):
    r=np.array(timeit_results)
    print(f'{name} {r.size} runs: \n Mean {r.mean()} \n Standard deviation {r.std()}')

### Reading many large files
Often when searching through large catalogs the data is divided into many large files. This can be slow to read into memory. This is normally avoided by having a database for the data and searching it that way. This is not always the case and can be difficult to query if you don't ordinarily use databases. I'll use the example large csv files here

In [5]:
list_of_large_files=glob.glob('/Users/danielwarshofsky/Desktop/everything/zenodo/all_preds/*.csv')

def read_one_file(path): # seems weird to define this but will be useful for later optimizations
    df=pd.read_csv(path)
    return df

def read_all_files(paths):# just loop and concat the other function
    df=pd.DataFrame()
    for path in paths:
        temp=read_one_file(path)
        df=pd.concat([df,temp]).reset_index(drop=True)
    return df

In [6]:
#doing all of the files will take too long. Lets just try 3 files at a time
bad_loop_times=np.arange(0,10).astype(float)
for i in range(bad_loop_times.shape[0]):
    ts=time.time()
    a=read_all_files(list_of_large_files[0:3])
    te=time.time()
    bad_loop_times[i]=te-ts
print(f'Mean 3 file read time : {bad_loop_times.mean():.3f} sec. Min: {min(bad_loop_times)} Max: {max(bad_loop_times)}')

Mean 3 file read time : 18.542 sec. Min: 17.557749032974243 Max: 21.573328256607056


Even with the minimum time this would be too long if you need to read 100s of files.

now lets try with threading (mostly from https://medium.com/codex/reading-files-fast-with-multi-threading-in-python-ff079f40fe56)

In [7]:
def multi_threaded_file_reader(file_paths):
    threads = []
    results = {}

    # Define the worker function
    def read_file_thread(file_path):
        result = read_one_file(file_path)
        results[file_path] = result

    # Create and start threads
    for file_path in file_paths:
        thread = threading.Thread(target=read_file_thread, args=(file_path,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    # concat things
    df=pd.concat([results[key] for key in results.keys() ]).reset_index(drop=True)
    return df

In [8]:
multi_thread_times=np.arange(0,10).astype(float)
for i in range(multi_thread_times.shape[0]):
    ts=time.time()
    a=multi_threaded_file_reader(list_of_large_files[0:3])
    te=time.time()
    multi_thread_times[i]=te-ts
print(f'Mean 3 files read time : {multi_thread_times.mean():.3f} sec. Min: {min(multi_thread_times)} Max: {max(multi_thread_times)}')
print(f'Multi thread on average has a  {100*(bad_loop_times.mean()-multi_thread_times.mean())/bad_loop_times.mean():.3f}% speed increase over bad loop')

Mean 3 files read time : 11.094 sec. Min: 10.58876895904541 Max: 11.500808954238892
Multi thread on average has a  40.165% speed increase over bad loop


What about Dask?

In [9]:
dask_times=np.arange(0,10).astype(float)
for i in range(dask_times.shape[0]):
    ts=time.time()
    df=dd.read_csv(list_of_large_files[0:3])
    df.compute()
    te=time.time()
    dask_times[i]=te-ts
print(f'Mean 3 files read time : {dask_times.mean():.3f} sec. Min: {min(dask_times)} Max: {max(dask_times)}')
print(f'Dask on average has a  {100*(bad_loop_times.mean()-dask_times.mean())/bad_loop_times.mean():.3f}% speed increase over bad loop')

Mean 3 files read time : 6.134 sec. Min: 5.608299970626831 Max: 7.671574115753174
Dask on average has a  66.919% speed increase over bad loop


Dask is much faster than the rest; what is it doing different? Dask efficiently parallelizes the reading by managing many smaller dataframes called partitions. Operations can be preformed on all of the partitions in parallel. However when you call .compute() everything is concatanated together which can take some time.  Lets see how doing an operation on the data before or after the .compute() affects the timeing.

In [10]:
var='vnv_dnn > .9 & vnv_xgb > .9'
per='pnp_dnn > .9 & pnp_xgb > .9'
eclipse='e_dnn > .9 & e_xgb > .9'
non_acc='bis_dnn > .7 & bis_xgb > .7'
q=f'{var} & {per} & {eclipse} & {non_acc}' #some simple filter

q_last_times=np.arange(0,10).astype(float)
for i in range(q_last_times.shape[0]):
    df=dd.read_csv(list_of_large_files[0:3])
    ts=time.time()
    df.compute()
    df=df.query(q)
    te=time.time()
    q_last_times[i]=te-ts

q_first_times=np.arange(0,10).astype(float)
for i in range(q_first_times.shape[0]):
    df=dd.read_csv(list_of_large_files[0:3])
    ts=time.time()
    df=df.query(q)
    df.compute()
    te=time.time()
    q_first_times[i]=te-ts

print(f'First: {q_first_times.mean()}  Last: {q_last_times.mean()}')

First: 5.070731568336487  Last: 6.331090259552002


Doing the operation first is a better time wise and is much better memory wise. This is how you should do this in the future!

### File format
This whole time we have assumed that the data is in csv format. How do other file formats compare?

In [None]:
# make a large array
waste_of_memory=[np.random.uniform(size=int(1e5)) for i in range(10)]# 1 mill floats
col_names=[str(i) for i in range(10)]
waste_table=Table(waste_of_memory,names=col_names)
del waste_of_memory
waste_table.write('waste.parquet',format='parquet',overwrite=True)
waste_table.write('waste.hdf5',format='hdf5',overwrite=True)
waste_table.write('waste.votable',format='votable',overwrite=True)
waste_table.write('waste.csv',format='csv',overwrite=True)

First lets look at the memory footprint of the files

In [12]:
files=glob.glob('waste.*')
for file in files:
    print(f'{file.split(".")[-1]} format is {os.path.getsize(file)*1e-6} MB')

hdf5 format is 8.004095999999999 MB
csv format is 19.268774999999998 MB
parquet format is 10.132379 MB
votable format is 36.369704 MB


Already we can see that parquet and hdf5 files have a smaller footprint in memory. on to the speed tests!

In [15]:
for file in files:
    fmat=file.split(".")[-1]
    t=timeit.Timer(f'Table.read(file)',globals=globals()).repeat(10,1)
    summary(f'{fmat} read time',t)

hdf5 read time 10 runs: 
 Mean 0.00900921680040483 
 Standard deviation 0.0006776081524732835
csv read time 10 runs: 
 Mean 0.3706228706996626 
 Standard deviation 0.007344938178109973
parquet read time 10 runs: 
 Mean 0.011296174999733921 
 Standard deviation 0.008198364424538375
votable read time 10 runs: 
 Mean 3.143551466600184 
 Standard deviation 0.020637295549368342


We can see that hdf5 and parquet have smaller memory footprints and can be read much faster! Consider using them in the future!