In [1]:
%matplotlib inline
import xarray as xr
import numpy as np 
import glob 
import os 
#import pyarrow as pa 
#from tdigest import TDigest
from crick import TDigest as TDigest_cr 
import matplotlib
import matplotlib.pyplot as plt
import multiprocessing
from multiprocessing import Pool
from multiprocessing import Process
import tqdm
import time

os.chdir('/home/b/b382291/git/AQUA') # CHANGE TO CORRECT PATH OR REMOVE IF USING OTHER DATA 
#imported_module = importlib.import_module("aqua")
from aqua import Reader
from aqua.reader import catalogue

!hostname

l40055.lvt.dkrz.de


# use ICON data 

This example uses ICON data with about 20 million grid cells. Any data would do, however; the particular dataset isn't really important here. Another example, using data from disk, is given below.

In [2]:

# Open the ICON catalogue entry and request it as a stream of one-hour chunks.
reader = Reader(model="ICON", exp="ngc2009", source="atm_2d_ml_R02B09")
reader.reset_stream()
data_gen = reader.retrieve(streaming_generator=True, stream_step=1, stream_unit="hours")

# Only the first streamed chunk is needed for the experiments below.
# Fail loudly if the stream is empty instead of silently leaving `data`
# undefined (or stale from a previous run of this cell).
first_chunk = next(iter(data_gen), None)
if first_chunk is None:
    raise RuntimeError("reader.retrieve() did not yield any data chunk")

print(f"start_date: {first_chunk.time[0].values} stop_date: {first_chunk.time[-1].values}")

# Keep only the `sp` variable (presumably surface pressure -- TODO confirm).
data = first_chunk.sp

start_date: 2020-01-20T00:00:00.000000000 stop_date: 2020-01-20T00:30:00.000000000


In [4]:
# Evaluate `data` and keep the plain numpy array for the benchmarks below.
data = data.compute()
ds = data.values  # optionally restrict the columns, e.g. [:, 0:10000000]
print(type(ds))

# NOTE(review): `weight` receives the length of the first axis, which looks
# like the number of time steps in the chunk rather than a weight -- confirm.
# `array_length` is the number of columns (one digest per column below).
weight, array_length = ds.shape
ds

<class 'numpy.ndarray'>


array([[100823.94 , 101063.91 , 100732.24 , ..., 100901.18 , 100901.18 ,
        100901.18 ],
       [100824.8  , 101064.89 , 100738.81 , ..., 100901.19 , 100909.35 ,
        100901.805]], dtype=float32)

If the AQUA reader above doesn't work, just use a random array instead.

In [7]:
# Fallback input when the AQUA reader is unavailable.
# Every cell below indexes `ds[:, j]` and loops over `array_length`, so the
# array must be 2-D (rows x grid cells) and `array_length` must be defined
# here as well, exactly as in the cell that extracts the real data.
ds = np.random.rand(2, 10000)
weight, array_length = ds.shape

# Running sequentially

Initialising the digests takes around 17 s.
Doing the update takes around 1 min 15 s for the full array.

For 10 million grid cells: initialisation takes around 10 s and the update around 35 s (around 42 s for the whole thing).

In [4]:
# Sequential baseline: one digest per column of `ds`, created up front.
digest_list = [TDigest_cr(compression=1) for _ in tqdm.tqdm(range(array_length))]

# Time only the update phase; digest creation above is excluded.
start_time = time.time()
for j, digest in enumerate(tqdm.tqdm(digest_list)):
    digest.update(ds[:, j])
elapsed = time.time() - start_time
print('time', elapsed, 's')

  1%|          | 109719/10000000 [00:00<00:09, 1097141.35it/s]

100%|██████████| 10000000/10000000 [00:08<00:00, 1174068.42it/s]
100%|██████████| 10000000/10000000 [00:35<00:00, 280746.58it/s]

time 35.62069272994995 s





The example below attempts to parallelise the update using Pipes. The initialisation isn't parallelised; it is kept sequential for simplicity, as it is quicker. When you uncomment the Pipe communication, this takes a long time.

In [5]:
from multiprocessing import Pipe 

def update_digest(job_length, ds_short, short_list): #, connection):
    """Update the first ``job_length`` digests with the matching columns.

    Parameters
    ----------
    job_length : int
        Number of columns / digests to process.
    ds_short : 2-D array
        Data block; column ``j`` is fed to ``short_list[j]``.
    short_list : list
        Objects exposing ``update(values)``; they are modified in place.

    Returns
    -------
    list
        ``short_list`` itself, so the updated digests can be handed back by
        callers that collect return values. A ``Process`` target ignores it.
    """
    # Plain loop instead of a list comprehension: the comprehension only
    # built a throwaway list of ``None`` for its side effects.
    for j in range(job_length):
        short_list[j].update(ds_short[:, j])
    #connection.send(short_list)
    return short_list
    
if __name__ == "__main__":
    # Timing experiment: update the digests with one worker process per
    # contiguous block of columns of `ds`.
    #
    # NOTE(review): each worker only receives its slice of `digest_list` as a
    # process argument and nothing is sent back (the Pipe code is commented
    # out), so the digests held in this process presumably stay empty and the
    # printed time excludes the cost of returning results -- confirm.
        
    digest_list = []
    for j in tqdm.tqdm(range(array_length)):
       # initialising digests (done sequentially, before the timer starts)
       digest_list.append(TDigest_cr(compression=1))
    
    start_time = time.time()
    # Split the work among processes
    num_processes = 22  # number of worker processes, set explicitly
    chunk_size = array_length // num_processes
    processes = []
    #pipes = []
        
    for i in range(num_processes):
        #conn1, conn2 = Pipe()
        #pipes.append(conn1)
        start = i * chunk_size
        # the last job also takes the remainder left by the integer division
        end = (i + 1) * chunk_size if i < num_processes - 1 else array_length
        job_length = end - start
        job_args = (job_length, ds[:, start:end], digest_list[start:end]) #, conn2)
        process =  Process(target=update_digest, args=job_args)
        processes.append(process)

    # start every worker first so that they run concurrently
    for process in processes:
        process.start()
    
    # Disabled: read each worker's updated digests back through its pipe.
    # data_chunks = []
    # for conn1 in pipes:        
    #     try:
    #         data_chunk = conn1.recv()
    #         data_chunks.extend(data_chunk)    # read the data from the pipe

    #     except EOFError as err:
    #         print('Got here')
    #     except OSError as err: 
    #         print('OSError')
            
    # wait until every worker has finished
    for process in processes:
        process.join()
        
    # elapsed time covers process creation, start and join (not initialisation)
    print('time', time.time() - start_time, 's')

100%|██████████| 20971520/20971520 [00:17<00:00, 1182262.15it/s]


time 9.243178367614746 s


I tried to use a shared-memory object, but I can't find a ctypes type that supports the t-digests.

In [13]:
from multiprocessing import Process
from multiprocessing.sharedctypes import Array
from numpy import ones
import ctypes 

# task executed in a child process
def task(array, ds):
    """Overwrite every element of ``array`` with the matching value of ``ds``.

    Executed in a child process to check whether writes to a shared array are
    visible to the parent.

    Parameters
    ----------
    array : mutable sequence
        Array to overwrite in place; element ``i`` receives ``ds[1, i]``.
    ds : 2-D array
        Source data; needs at least two rows and ``len(array)`` columns.
    """
    # check some data in the array
    print(array[:10], len(array))
    # change data in the array

    for i in range(len(array)):
        # Index with the loop variable `i`. The original read `ds[1, j]`,
        # where `j` was an unrelated name leaked from another cell.
        array[i] = ds[1, i]
        #array[i].update(ds[:, i])

    # confirm the data was changed
    print(array[300:310], len(array))
    print('changed')
 
# protect the entry point
if __name__ == '__main__':
    # Experiment: put the digests into a shared ctypes array and let a child
    # process modify them. As the captured output shows, building the array
    # raises TypeError because c_wchar_p elements must be unicode strings or
    # integer addresses, not TDigest instances.
    
    # define the size of the numpy array
    n = 10000
    # create the numpy array
    # (dead assignment: `data` is replaced by the list of digests just below)
    data = ones((n,))
    #print(data[:10], data.shape)
    
    # NOTE: this rebinds the notebook-level name `data` used by earlier cells
    data = []
    for j in tqdm.tqdm(range(array_length)):
        # initialising digests 
        data.append(TDigest_cr(compression=1))
        
    #array = RawArray(ctypes.py_object, digest_list)
    # get ctype for our array
    # create ctype array initialized from our array
    # (this is the call that fails with the TypeError shown in the output)
    array = Array(ctypes.c_wchar_p, data, lock=False)
    # confirm contents of the shared array
    print(array[:10], len(array))
    # create a child process
    child = Process(target=task, args=(array,ds,))
    # start the child process
    child.start()
    # wait for the child process to complete
    child.join()
    # check some data in the shared array
    print(array[:10], len(array))

100%|██████████| 100000/100000 [00:00<00:00, 1087489.08it/s]




TypeError: unicode string or integer address expected instead of crick.tdigest.TDigest instance

In [6]:
# Display element 300 of `array`.
# NOTE(review): the saved output reports compression=20.0 while the cells in
# this notebook create digests with compression=1, so the output presumably
# comes from an earlier version of the notebook -- confirm.
array[300]

TDigest<compression=20.0, size=0.0>

I tried with a queue. This doesn't really work on its own, because a queue doesn't preserve the order of the objects put into it. I don't know whether the code below is working.

In [11]:
from multiprocessing import Queue 

# Define a function to update the digest for a range of indices

def update_digest(job_length, ds, digest_list, queue, job_index=None):
    """Update ``job_length`` digests and hand them back through ``queue``.

    Parameters
    ----------
    job_length : int
        Number of columns / digests to process.
    ds : 2-D array
        Data block; column ``j`` is fed to ``digest_list[j]``.
    digest_list : list
        Objects exposing ``update(values)``; they are modified in place.
    queue : object with ``put``
        Receives the updated digests once the whole job is done.
    job_index : int, optional
        Position of this job in the overall split. When given, the queue
        receives ``(job_index, digest_list)`` so the consumer can restore the
        original order; when omitted, the bare list is put as before.
    """
    # Plain loop instead of a list comprehension used only for side effects.
    for j in range(job_length):
        digest_list[j].update(ds[:, j])
    queue.put(digest_list if job_index is None else (job_index, digest_list))


if __name__ == "__main__":

    # One empty digest per column of `ds`.
    digest_list = [TDigest_cr(compression=1) for _ in tqdm.tqdm(range(array_length))]

    # Split the work among processes
    num_processes = 22  # number of worker processes, set explicitly
    chunk_size = array_length // num_processes
    queue = Queue()
    processes = []

    for i in range(num_processes):
        start = i * chunk_size
        # the last job also takes the remainder left by the integer division
        end = (i + 1) * chunk_size if i < num_processes - 1 else array_length
        # Pass the job index along: workers finish in arbitrary order, so the
        # queue alone cannot tell which block of columns a result belongs to.
        job_args = (end - start, ds[:, start:end], digest_list[start:end], queue, i)
        processes.append(Process(target=update_digest, args=job_args))

    for process in processes:
        process.start()

    # Drain the queue BEFORE joining: a worker cannot exit while the data it
    # put on the queue is still unread, so joining first can deadlock.
    tagged_results = [queue.get() for _ in processes]

    for process in processes:
        process.join()
    print('done join')

    # Restore the original column order using the job index.
    results = [chunk for _, chunk in sorted(tagged_results, key=lambda item: item[0])]
    digest_list = [digest for chunk in results for digest in chunk]

    # Print a summary rather than every digest (there can be millions).
    print(len(results), 'chunks,', len(digest_list), 'digests')

    # Now 'results' holds the updated chunks of 'digest_list' in job order and
    # 'digest_list' is the flattened, fully updated list.
    # new_results = [y for x in results for y in x]

    # Now 'results' contains the updated 'digest_list' from all processes.

100%|██████████| 5000000/5000000 [00:02<00:00, 1927696.14it/s]


KeyboardInterrupt: 

Now trying with Pool as opposed to Process. I think the code below is wrong, because it takes ages. The advantage of Pool is that you don't need to worry about the communicator in the same way.

In [7]:
# Define a function to calculate pu for a given (i, j) position
from multiprocessing import Pool 

#global calculate_power_output

def update_digests(j, digest_list, ds):
    """Feed column ``j`` of ``ds`` into ``digest_list[j]`` and return that digest."""
    target = digest_list[j]
    target.update(ds[:, j])
    return target

# One empty digest per column of `ds`.
digest_list = [TDigest_cr(compression=1) for _ in tqdm.tqdm(range(array_length))]

# Create a pool with the number of desired processes
num_processes = 22  # adjust to the machine, e.g. multiprocessing.cpu_count()

# Number of columns submitted per starmap call; at least 1 so that the loop
# below also works when there are fewer columns than processes.
chunk_size = max(1, array_length // num_processes)

# Fixes compared with the previous version of this cell:
#   * the chunk bounds used an undefined `i`; they now come from a loop;
#   * `ds` was sliced along the wrong axis (`ds[start:end, :]`);
#   * every task shipped the whole chunk of digests and data to the worker.
#     Each task now carries only its own digest and its own column, passed as
#     a one-element list and a one-column array so that `update_digests` can
#     be called unchanged with index 0.
res = []
with Pool(num_processes) as pool:
    for start in range(0, array_length, chunk_size):
        end = min(start + chunk_size, array_length)
        jobargs = [(0, [digest_list[j]], ds[:, j:j + 1]) for j in range(start, end)]
        # starmap blocks until the batch is done and returns the results in
        # submission order, so res[j] is the updated digest for column j.
        res.extend(pool.starmap(update_digests, jobargs))

# No pool.close()/pool.join() here: leaving the `with` block already shut the
# pool down, and every result had been collected by then.


100%|██████████| 5000000/5000000 [00:02<00:00, 2066843.80it/s]


: 

: 