In [14]:
import multiprocessing
import rasterio
from rasterio.io import MemoryFile
import numpy as np

# Set the number of processes to use for parallelization
num_processes = 2#multiprocessing.cpu_count()

# Define the input GeoTIFF file
input_file = "test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"

# Define the output memory file
output_memfile = MemoryFile()

# Open the input file in read mode and get its metadata
with rasterio.open(input_file, "r") as src:
    profile = src.profile.copy()
    arr = src.read()
    print("Before",arr.sum())

    # Create the output memory file based on the metadata of the input file
    with rasterio.open(output_memfile, "w", **profile) as dst:

        # Define the function to be executed by each process
        def process_chunk(src,dst,chunks):
            for i in chunks:
                win = i[1]
                arr = src.read(window=win)
                dst.write(arr,window=win)

        # Split the input file into chunks based on the number of processes
        chunks = list(src.block_windows())
        chunk_size = len(chunks) // num_processes

        # Spawn multiple processes to write the chunks in parallel
        processes = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size
            if i == num_processes - 1:
                end_idx = len(chunks)
            process = multiprocessing.Process(target=process_chunk, args=(src,dst,chunks[start_idx:end_idx],))
            processes.append(process)
            process.start()

        # Wait for all processes to finish
        for process in processes:
            process.join()
        arr = dst.read()
        print("After",arr.sum())


Before 43945
After 0


In [19]:
import rasterio
from rasterio.windows import Window
from multiprocessing import Pool
from io import BytesIO

def process_window(args):
    src_path, memfile, window = args
    with rasterio.open(src_path) as src:
        data = src.read(window=window)
        kwargs = src.profile
        kwargs.update(
            driver='GTiff',
            height=window.height,
            width=window.width,
            transform=rasterio.windows.transform(window, src.transform),
            dtype=data.dtype
        )
        with rasterio.MemoryFile() as mem:
            with mem.open(**kwargs) as dst:
                dst.write(data, window=Window(0, 0, window.width, window.height))
            mem.seek(0)
            memfile.write(mem.read())
            
def main():
    src_path = "test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"
    num_workers = 2
    tile_size = 256
    with rasterio.open(src_path) as src:
        height, width = src.shape
        windows = [Window(col_off, row_off, tile_size, tile_size)
                   for row_off in range(0, height, tile_size)
                   for col_off in range(0, width, tile_size)]
        with BytesIO() as memfile, Pool(num_workers) as pool:
            pool.map(process_window, [(src_path, memfile, window) for window in windows])
            memfile.seek(0)
            with rasterio.MemoryFile(memfile,"rb") as mem:
                with mem.open() as dst:
                    arr = dst.read()
                    print(arr.sum())

if __name__ == '__main__':
    main()

ERROR 4: `/vsimem/rb/rb.tif' not recognized as a supported file format.


ValueError: 'driver' is required to read/write dataset.

In [2]:
import rasterio
from rasterio.windows import Window
import multiprocessing as mp
import numpy as np

# function to read the data from source raster file into a queue object
def read_raster_data(queue, file_path,num_processes):
    with rasterio.open(file_path) as src:
        for i, window in enumerate(src.block_windows(1)):
            window = window[1]
            data = src.read(1, window=window)
            data[:] = 1
            queue.put((i, window, data))
        for i in range(num_processes):
            queue.put(None)

# function to write chunks to memory file
def write_chunk_to_memory_file(queue, dst,lock):
    running = True 
    while running:
        while (not queue.empty()):
            args = queue.get()
            if args is None:
                running = False
                break
            else:
                i, window, data = args
                with lock:
                    dst.write(data, window=window,indexes=1)
                #print(dst.read().sum())

# main function
def main():
    # source file path
    src_path = "test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"

    # destination memory file object
    mem = rasterio.MemoryFile()

    # get source file metadata
    with rasterio.open(src_path) as src:
        meta = src.meta.copy()
        arr = src.read()

    # set destination file metadata
    meta.update(driver='GTiff', count=1)

    
    # create destination memory file
    with rasterio.open(mem, 'w', **meta) as dst:
        # set number of processes
        num_processes =2 #mp.cpu_count()

        # create queue object to store raster data
        queue = mp.JoinableQueue()

        # create processes to read and write raster data
        processes = []
        for i in range(1):
            p = mp.Process(target=read_raster_data, args=(queue, src_path,num_processes))
            processes.append(p)
            p.start()
        
        # create windows for destination file
        dst_windows = [Window(col_off=0, row_off=0, width=meta['width'], height=meta['height'])]
        
        lock = mp.Lock()
        for i in range(num_processes):
            p = mp.Process(target=write_chunk_to_memory_file, args=(queue, dst,lock))
            processes.append(p)
            p.start()

        # join processes
        for p in processes:
            p.join()
            

        
        # # Write the memory file to disk
        # with rasterio.open('output.tif', 'w', **meta) as dst_disk:
        #     dst_disk.write(dst.read())
        mem.seek(0)
        print(arr.sum()) 
        arr = dst.read()
        print(arr.sum())
        queue.close()
main()

43945
0


In [None]:
import threading

import rasterio
from rasterio.windows import Window
from queue import Queue
import multiprocessing as mp
import numpy as np

# function to read the data from source raster file into a queue object
def read_raster_data(queue, file_path,num_processes):
    with rasterio.open(file_path) as src:
        for i, window in enumerate(src.block_windows(1)):
            window = window[1]
            data = src.read(1, window=window)
            data[:] = 1
            queue.put((i, window, data))
        for i in range(num_processes):
            queue.put(None)

# function to write chunks to memory file
def write_chunk_to_memory_file(queue, dst):
    running = True 
    while running:
        while (not queue.empty()):
            args = queue.get()
            if args is None:
                running = False
                break
            else:
                i, window, data = args
                dst.write(data, window=window,indexes=1)
                print(dst.read().sum())

# main function
def main():
    # source file path
    src_path = "test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"

    # destination memory file object
    mem = rasterio.MemoryFile()

    # get source file metadata
    with rasterio.open(src_path) as src:
        meta = src.meta.copy()
        arr = src.read()

    # set destination file metadata
    meta.update(driver='GTiff', count=1)

    
    # create destination memory file
    with rasterio.open(mem, 'w', **meta) as dst:
        # set number of processes
        num_processes =10 #mp.cpu_count()

        # create queue object to store raster data
        queue = Queue(maxsize=num_processes)

        # create processes to read and write raster data
        processes = []
        for i in range(1):
            p = mp.Process(target=read_raster_data, args=(queue, src_path,num_processes))
            processes.append(p)
            p.start()
        
        # create windows for destination file
        dst_windows = [Window(col_off=0, row_off=0, width=meta['width'], height=meta['height'])]
        
        for i in range(num_processes):
            p = threading.Thread(target=write_chunk_to_memory_file, args=(queue, dst))#mp.Process(target=write_chunk_to_memory_file, args=(queue, dst))
            processes.append(p)
            p.start()

        # join processes
        for p in processes:
            p.join()
            

        
        # # Write the memory file to disk
        # with rasterio.open('output.tif', 'w', **meta) as dst_disk:
        #     dst_disk.write(dst.read())

        print(arr.sum()) 
        arr = dst.read()
        print(arr.sum())
        queue.close()
main()

In [5]:
import rasterio
import numpy as np
import queue
from joblib import Parallel, delayed
import multiprocessing as mp
import time

# set the input and output filenames
input_filename = "test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"
output_filename = 'file.tif'




    

# open the input file using Rasterio
with rasterio.open(input_filename) as src:
    # get the profile of the input file
    profile = src.profile.copy()

    # set up the memory file using Rasterio
    memfile = rasterio.MemoryFile()
    with memfile.open(**profile) as dst:
        
        # set up a queue to manage the chunks of data
        chunk_queue = mp.JoinableQueue()
        
        # create processes to read and write raster data
        for i in range(1):
            p = mp.Process(target=read_raster_data, args=(chunk_queue, src,4))
            p.start()

        # define the function to write the chunks of data
        def write_chunk(dst, data, window):
            dst.write(data, window=window,indexes=1)

        # define the function to be executed by each thread
        def process_chunk():
            while True:
                try:
                    d = chunk_queue.get(block=False)
                    if d is None:
                        break
                    i,window,data = d
                    write_chunk(dst, data, window)
                    chunk_queue.task_done()
                except queue.Empty:
                    break
        
        # start the threads using joblib
        Parallel(n_jobs=-1, prefer='threads')(delayed(process_chunk)() for i in range(4))

        # # write the output to disk
        # with rasterio.open(output_filename, 'w', **profile) as dst:
        #     for i in range(1, dst.count + 1):
        #         dst.write(memfile.getband(i), i)
        # # Write the memory file to disk
        with rasterio.open(output_filename, 'w', **profile) as raster:
            raster.write(dst.read())
        
p.join()

Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()


KeyboardInterrupt: 

  File "/usr/lib/python3.8/multiprocessing/util.py", line 360, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 195, in _finalize_join
    thread.join()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt


In [None]:
import rasterio
from rasterio.windows import Window
import multiprocessing

# Define the window size
window_size = 256

# Open the input raster dataset
with rasterio.open("test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif") as src:

    # Define the output raster dataset
    dst_profile = src.profile
    dst_profile['driver'] = 'GTiff'
    dst_profile['count'] = 1
    dst_profile['compress'] = 'lzw'

    # Create a MemoryFile for the output raster
    with rasterio.MemoryFile() as memfile:

        # Create the output raster dataset
        with memfile.open(**dst_profile) as dst:

            # Define the window parameters
            width = src.width
            height = src.height

            windows = []
            for i in range(0, height, window_size):
                for j in range(0, width, window_size):
                    window = Window(j, i, min(window_size, width - j), min(window_size, height - i))
                    windows.append(window)

            # Define the worker function for each process
            def write_window(window):
                # Read the data from the input raster
                data = src.read(window=window)

                # Write the data to the output raster
                dst.write(data, window=window)

            # Create a process for each window and run them in parallel
            with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
                pool.map(write_window, windows)
                
            # Do something with the output raster here, like saving it to disk
            memfile.seek(0)
            with open('output.tif', 'wb') as f:
                 f.write(memfile.read())

In [2]:
import threading
import queue
import multiprocessing as mp
import rasterio
import numpy as np
import time 
from joblib import Parallel, delayed


# Define the number of threads to use
NUM_THREADS = 20

# Define the input and output files
input_file = "/home/jovyan/work/satellite_data/tmp/inference/smp_unet_mitb3_08_03_2023_170715.pth/2018/tmp_shape_22_03_2023_103021_201.tif"
#"test/det_africa_2019_v2_ps_PSScene_2019_-0004_00039_041_130_000_0000_composite_lshm.tiftemp__66294_25935.tif"
#

# Create a MemoryFile for the output data
output_memfile = rasterio.io.MemoryFile()


# function to read the data from source raster file into a queue object
def read_raster_data(queue, src,num_processes):
    for i, window in src.block_windows(1):
        data = src.read(1, window=window)
        #data[:] = 1
        queue.put(( data,window))
    for i in range(num_processes):
        queue.put(None)

with rasterio.Env(GDAL_CACHEMAX=32000000):
    # Open the input file
    with rasterio.open(input_file) as src:

        # Define the output profile based on the input profile
        output_profile = src.profile.copy()
        output_profile.update(count=1,num_threads="ALL_CPUS")

        # Create a writer for the output data
        output_writer = output_memfile.open(**output_profile)

        # Define a queue for the input windows
        input_queue =mp.JoinableQueue(NUM_THREADS*5)#eue.Queue() queue.Queue(NUM_THREADS) #

        # create processes to read and write raster data
        for i in range(1):
            p = mp.Process(target=read_raster_data, args=(input_queue, src,NUM_THREADS))
            p.start()

        # Define a function to process input windows
        def process_window():
            while True:
                # Get an input window from the queue
                try:
                    d = input_queue.get(block=False)
                    if d is None:
                        break
                    data,window= d

                    # Write the data to the output window
                    output_writer.write(data, window=window,indexes=1)

                    # Mark the input window as done
                    input_queue.task_done()
                except queue.Empty:
                    continue

        start = time.time()
        # Start the threads to process the input windows
        # threads = []
        # for i in range(NUM_THREADS):
        #     t = threading.Thread(target=process_window)
        #     t.start()
        #     threads.append(t)
            
        # # start the threads using joblib
        Parallel(n_jobs=-1, prefer='threads')(delayed(process_window)() for i in range(NUM_THREADS))

        # Wait for all input windows to be processed
        # for t in threads:
        #     t.join()

        end = time.time()
        a = output_writer.read().copy()
        print(output_writer.read().sum())
        print(end-start)
        output_writer.close()
        p.close()
        input_queue.close()
        b = src.read().copy()
        print(b.sum())
    # Close the output writer
    with rasterio.open("output.tif", 'w', **output_profile) as raster:
        raster.write(a)

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()

KeyboardInterrupt

  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_47433/1195922539.py", line 27, in read_raster_data
    queue.put(( data,window))
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 306, in put
    if not self._sem.acquire(block, timeout):
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 360, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 22