"""Reproduction script for memory growth while iterating a memory-mapped Arrow table.

Writes a ~10 GB Arrow IPC file once, memory-maps it back, then iterates the
single binary column while printing (every 10 elements) the process RSS delta,
tracemalloc deltas, and PyArrow allocator statistics.

See the "Important discussions" links at the bottom (ARROW-11007).
"""
import gc
import os
import time
import tracemalloc

import psutil
import pyarrow as pa

# Knobs tried during the investigation, kept for reference:
#   gc.set_debug(gc.DEBUG_LEAK); gc.set_threshold(0, 0, 0)
#   pa.set_memory_pool(pa.mimalloc_memory_pool())
#   pa.set_memory_pool(pa.system_memory_pool())
#   pa.jemalloc_set_decay_ms(0)
#   pa.log_memory_allocations(enable=True)

ARROW_PATH = "tmp.arrow"

if not os.path.exists(ARROW_PATH):
    # 5000 binary values of ~2 MB each -> roughly 10000 MB on disk.
    arr = pa.array([b"a" * (2000 * 1024)] * 5000)  # ~10000MB
    table = pa.table({"a": arr})
    with open(ARROW_PATH, "wb") as f:
        # File (not stream) format, so the reader below can use ipc.open_file.
        # writer = pa.RecordBatchStreamWriter(f, schema=table.schema)
        writer = pa.RecordBatchFileWriter(f, schema=table.schema)
        writer.write_table(table)
        # close() inside the `with` so the IPC footer is written before the
        # underlying file handle is closed.
        writer.close()


def memory_mapped_arrow_table_from_file(filename: str) -> tuple:
    """Memory-map ``filename`` and read it back as an Arrow table.

    Returns ``(pa_table, memory_mapped_stream, None)``.  The memory-mapped
    stream is returned so the caller keeps the mapping alive for as long as
    the table is used; the third slot is a placeholder kept for interface
    compatibility with the earlier stream-reader variant.
    """
    memory_mapped_stream = pa.memory_map(filename, "r")
    # Earlier variant (stream format):
    #   opened_stream = pa.ipc.open_stream(memory_mapped_stream)
    #   pa_table = opened_stream.read_all()
    pa_table = pa.ipc.open_file(memory_mapped_stream).read_all()
    return pa_table, memory_mapped_stream, None


table, memory_mapped_stream, opened_stream = memory_mapped_arrow_table_from_file(ARROW_PATH)
arr = table[0]

# Baselines for the deltas printed inside the loop.
start_use = pa.total_allocated_bytes()
pool = pa.default_memory_pool()
start_peak_use = pool.max_memory()
tracemalloc.start()
first_size, first_peak = tracemalloc.get_traced_memory()
mem_before = psutil.Process(os.getpid()).memory_info().rss / 2**20

for idx, x in enumerate(arr):
    # Debug probes tried here during the investigation:
    #   objgraph.show_refs([x]); sys.getrefcount(x); gc.get_referrers(x)
    if idx % 10 == 0:
        gc.collect()
        time.sleep(0.1)
        mem_after = psutil.Process(os.getpid()).memory_info().rss / 2**20
        mem_use = pa.total_allocated_bytes() - start_use
        mem_peak = pool.max_memory() - start_peak_use
        second_size, second_peak = tracemalloc.get_traced_memory()
        mem_diff = (second_size - first_size) / 2**20
        mem_peak_diff = (second_peak - first_peak) / 2**20
        # pa.jemalloc_memory_pool().release_unused()
        # pa.mimalloc_memory_pool().release_unused()
        # pa.system_memory_pool().release_unused()
        print(f"{idx:4d} {mem_after - mem_before:12.4f}MB {mem_diff:12.4f} {mem_peak_diff:12.4f} {memory_mapped_stream.size()/2**20:4.4}MB {mem_use/2**20:4.4}MB {mem_peak/2**20:4.4}MB")
    # Drop the reference to the current element as early as possible so its
    # buffer can be released before the next iteration.
    x = 5

mem_pool = pa.default_memory_pool()
print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
      f"{mem_pool.max_memory()} max allocated, ")
print(f"PyArrow total allocated bytes: {pa.total_allocated_bytes()}")

# Important discussions
# https://issues.apache.org/jira/browse/ARROW-11007 - looks very similar to our issue
# in particular this part of the report:
# https://issues.apache.org/jira/browse/ARROW-11007?focusedCommentId=17279642&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17279642