[Python] Memory leak in pq.read_table and table.to_pandas #18431

Open
asfimport opened this issue Dec 22, 2020 · 30 comments

asfimport commented Dec 22, 2020

While upgrading our application to use pyarrow 2.0.0 instead of 0.12.1, we observed a memory leak in the read_table and to_pandas methods. See below for sample code to reproduce it. Memory does not seem to be returned after deleting the table and df as it was in pyarrow 0.12.1.

Sample Code

import io

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from memory_profiler import profile


@profile
def read_file(f):
    table = pq.read_table(f)
    df = table.to_pandas(strings_to_categorical=True)
    del table
    del df


def main():
    rows = 2000000
    df = pd.DataFrame({
        "string": ["test"] * rows,
        "int": [5] * rows,
        "float": [2.0] * rows,
    })
    table = pa.Table.from_pandas(df, preserve_index=False)
    parquet_stream = io.BytesIO()
    pq.write_table(table, parquet_stream)

    for i in range(3):
        parquet_stream.seek(0)
        read_file(parquet_stream)


if __name__ == '__main__':
    main()

Python 3.8.5 (conda), pyarrow 2.0.0 (pip), pandas 1.1.2 (pip) logs:

Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    161.7 MiB    161.7 MiB           1   @profile
    10                                         def read_file(f):
    11    212.1 MiB     50.4 MiB           1       table = pq.read_table(f)
    12    258.2 MiB     46.1 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    258.2 MiB      0.0 MiB           1       del table
    14    256.3 MiB     -1.9 MiB           1       del df


Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    256.3 MiB    256.3 MiB           1   @profile
    10                                         def read_file(f):
    11    279.2 MiB     23.0 MiB           1       table = pq.read_table(f)
    12    322.2 MiB     43.0 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    322.2 MiB      0.0 MiB           1       del table
    14    320.3 MiB     -1.9 MiB           1       del df


Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    320.3 MiB    320.3 MiB           1   @profile
    10                                         def read_file(f):
    11    326.9 MiB      6.5 MiB           1       table = pq.read_table(f)
    12    361.7 MiB     34.8 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    361.7 MiB      0.0 MiB           1       del table
    14    359.8 MiB     -1.9 MiB           1       del df

Python 3.5.6 (conda), pyarrow 0.12.1 (pip), pandas 0.24.1 (pip) logs:

Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    138.4 MiB    138.4 MiB           1   @profile
    10                                         def read_file(f):
    11    186.2 MiB     47.8 MiB           1       table = pq.read_table(f)
    12    219.2 MiB     33.0 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    171.7 MiB    -47.5 MiB           1       del table
    14    139.3 MiB    -32.4 MiB           1       del df


Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    139.3 MiB    139.3 MiB           1   @profile
    10                                         def read_file(f):
    11    186.8 MiB     47.5 MiB           1       table = pq.read_table(f)
    12    219.2 MiB     32.4 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    171.5 MiB    -47.7 MiB           1       del table
    14    139.1 MiB    -32.4 MiB           1       del df


Filename: C:/run_pyarrow_memoy_leak_sample.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9    139.1 MiB    139.1 MiB           1   @profile
    10                                         def read_file(f):
    11    186.8 MiB     47.7 MiB           1       table = pq.read_table(f)
    12    219.2 MiB     32.4 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    13    171.8 MiB    -47.5 MiB           1       del table
    14    139.3 MiB    -32.4 MiB           1       del df

Reporter: Michael Peleshenko
Assignee: Weston Pace / @westonpace

Note: This issue was originally created as ARROW-11007. Please see the migration documentation for further details.


Weston Pace / @westonpace:
Hello, thank you for writing up this analysis. Pyarrow uses jemalloc, a custom memory allocator which does its best to hold onto memory allocated from the OS (since acquiring it can be an expensive operation). Unfortunately, this makes it difficult to track line-by-line memory usage with tools like memory_profiler. There are a couple of options:

  • You could use https://arrow.apache.org/docs/python/generated/pyarrow.total_allocated_bytes.html#pyarrow.total_allocated_bytes to track allocation instead of using memory_profiler (it might be interesting to see if there is a way to get memory_profiler to use this function instead of kernel statistics).

  • You can also put the following line at the top of your script; this will configure jemalloc to release memory immediately instead of holding on to it (which will likely have some performance implications):

    pa.jemalloc_set_decay_ms(0)

The behavior you are seeing is pretty typical for jemalloc. For further reading, in addition to reading up on jemalloc itself, I encourage you to take a look at these other issues for more discussion and examples of jemalloc behavior:

https://issues.apache.org/jira/browse/ARROW-6910
https://issues.apache.org/jira/browse/ARROW-7305

I have run your test read 10,000 times and memory usage does predictably stabilize. In addition, total_allocated_bytes behaves exactly as expected. So I do not believe there is any evidence of a memory leak in this script.
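A minimal sketch of those two suggestions, applied to the reproduction script from this issue (this assumes a pyarrow build that actually ships jemalloc; as noted below, the Windows wheels may not):

import io

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

pa.jemalloc_set_decay_ms(0)  # ask jemalloc to return freed memory promptly (jemalloc builds only)


def read_file(f):
    before = pa.total_allocated_bytes()
    table = pq.read_table(f)
    df = table.to_pandas(strings_to_categorical=True)
    del table
    del df
    # Net allocation still held by Arrow's pool after the deletes; a real leak
    # would make this number grow from call to call.
    print("net pyarrow allocation:", pa.total_allocated_bytes() - before, "bytes")


rows = 2_000_000
df = pd.DataFrame({"string": ["test"] * rows, "int": [5] * rows, "float": [2.0] * rows})
stream = io.BytesIO()
pq.write_table(pa.Table.from_pandas(df, preserve_index=False), stream)

for _ in range(3):
    stream.seek(0)
    read_file(stream)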


Michael Peleshenko:
@westonpace Thanks for the detailed comment. I tried adding pa.jemalloc_set_decay_ms(0), but I ran into the error below, which seems to indicate jemalloc is not being used. I suspect this is because I am running on Windows 10.

Traceback (most recent call last):
  File "C:/Users/mipelesh/Workspace/Git/Lynx/src-pyLynx/pyLynx/run_pyarrow_memoy_leak_sample.py", line 35, in <module>
    main()
  File "C:/Users/mipelesh/Workspace/Git/Lynx/src-pyLynx/pyLynx/run_pyarrow_memoy_leak_sample.py", line 18, in main
    pa.jemalloc_set_decay_ms(0)
  File "pyarrow\memory.pxi", line 171, in pyarrow.lib.jemalloc_set_decay_ms
  File "pyarrow\error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: jemalloc support is not built


Wes McKinney / @wesm:
Depending on how you obtained pyarrow, it may be using mimalloc rather than the system allocator. We've done comparatively much less analysis of memory usage behaviors when using mimalloc, but we do know empirically that mimalloc improves application performance on Windows.


Antoine Pitrou / @pitrou:
Also note that "does not return memory immediately" is different from "memory leak". The allocator (mimalloc and/or the system allocator, here) may opt to cache deallocated blocks instead of returning them to the system, because returning them is costly (it's a system call), and allocations often happen in a repeated fashion.

If you're merely worried about a potential memory leak, the way to check for it is to run your function in a loop and check whether memory occupation is constantly increasing, or if it quickly reaches a stable plateau.
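A rough way to apply that check to the repro above (not part of the original report; it assumes the read_file and parquet_stream defined in the sample code earlier in this thread, plus psutil for the RSS reading): run the function many times and watch whether the process RSS keeps climbing or levels off.

import os

import psutil


def rss_mib() -> float:
    # Resident set size of the current process, in MiB.
    return psutil.Process(os.getpid()).memory_info().rss / 2**20


for i in range(1000):
    parquet_stream.seek(0)
    read_file(parquet_stream)
    if i % 100 == 0:
        print(f"iteration {i}: RSS = {rss_mib():.1f} MiB")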


Joris Van den Bossche / @jorisvandenbossche:
Given that this comes up from time to time, it might be useful to document this to some extent: expectations around watching memory usage (explaining that deallocated memory might be cached by the memory allocator, etc.), how you can actually see how much memory is used (total_allocated_bytes), and how you can check whether high "apparent" memory usage is really due to this caching rather than a memory leak (use pa.jemalloc_set_decay_ms(0), run your function many times in a loop, and see whether memory stabilizes or keeps growing).


Dmitry Kashtanov:
I have a somewhat similar issue, observed on both pyarrow v1.0.0 and v3.0.0 on Linux (within Docker containers based on the python:3.8-slim image, both locally and on AWS Fargate). The issue is with reading from BigQuery via the BigQuery Storage API using the ARROW data format. Under the hood it downloads a set of RecordBatches and combines them into a Table. After this, my code converts the Table to a pandas DataFrame and then deletes it, but the Table's memory is not released to the OS.

This behavior also remains if I use the mimalloc or system memory pools, set either in code or via the ARROW_DEFAULT_MEMORY_POOL environment variable.

Also, when I afterwards drop a referenced (not copied) column from that pandas DataFrame, this triggers a copy of the DataFrame data, and the memory of the original DataFrame is likewise not released to the OS. Subsequent transformations of the DataFrame release memory as expected.

Exactly the same code with exactly the same Python (3.8.7) and package versions on MacOS releases memory to the OS as expected (again with all kinds of memory pool).

The very first lines of the script are:

import pyarrow
pyarrow.jemalloc_set_decay_ms(0)

Mac OS:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
   460    141.5 MiB    141.5 MiB           1   @profile
   461                                         def bqs_stream_to_pandas(session, stream_name):
   463    142.2 MiB      0.7 MiB           1       client = bqs.BigQueryReadClient()
   464    158.7 MiB     16.5 MiB           1       reader = client.read_rows(name=stream_name, offset=0)
   465   1092.2 MiB    933.5 MiB           1       table = reader.to_arrow(session)
   470   2725.1 MiB   1632.5 MiB           2       dataset = table.to_pandas(deduplicate_objects=False, split_blocks=False, self_destruct=False,
   471   1092.6 MiB      0.0 MiB           1                                 strings_to_categorical=True,)
   472   1405.0 MiB  -1320.1 MiB           1       del table
   473   1405.0 MiB      0.0 MiB           1       del reader
   474   1396.1 MiB     -8.9 MiB           1       del client
   475   1396.1 MiB      0.0 MiB           1       time.sleep(1)
   476   1396.1 MiB      0.0 MiB           1       if MEM_PROFILING:
   477   1396.1 MiB      0.0 MiB           1           mem_pool = pyarrow.default_memory_pool()
   478   1396.1 MiB      0.0 MiB           1           print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
   479                                                       f"{mem_pool.max_memory()} max allocated, ")
   480   1396.1 MiB      0.0 MiB           1           print(f"PyArrow total allocated bytes: {pyarrow.total_allocated_bytes()}")
   481   1402.4 MiB      6.3 MiB           1       mem_usage = dataset.memory_usage(index=True, deep=True)
   485   1404.2 MiB      0.0 MiB           1       return dataset

# Output
PyArrow mem pool info: jemalloc backend, 1313930816 allocated, 1340417472 max allocated,
PyArrow total allocated bytes: 1313930816

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
...
   139   1477.7 MiB      0.4 MiB           1           dataset_label = dataset[label_column].astype(np.int8)
   140
   141   1474.2 MiB     -3.5 MiB           1           dataset.drop(columns=label_column, inplace=True)
   142   1474.2 MiB      0.0 MiB           1           gc.collect()
   143
   144   1474.2 MiB      0.0 MiB           1           if MEM_PROFILING:
   145   1474.2 MiB      0.0 MiB           1               mem_pool = pyarrow.default_memory_pool()
   146   1474.2 MiB      0.0 MiB           1               print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
   147                                                           f"{mem_pool.max_memory()} max allocated, ")
   148   1474.2 MiB      0.0 MiB           1               print(f"PyArrow total allocated bytes: {pyarrow.total_allocated_bytes()}")

# Output
PyArrow mem pool info: jemalloc backend, 0 allocated, 1340417472 max allocated,
PyArrow total allocated bytes: 0

Linux (python:3.8-slim-based image):

 

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
   460    153.0 MiB    153.0 MiB           1   @profile
   461                                         def bqs_stream_to_pandas(session, stream_name):
   463    153.5 MiB      0.6 MiB           1       client = bqs.BigQueryReadClient()
   464    166.9 MiB     13.4 MiB           1       reader = client.read_rows(name=stream_name, offset=0)
   465   1567.5 MiB   1400.6 MiB           1       table = reader.to_arrow(session)
   469   1567.5 MiB      0.0 MiB           1       report_metric('piano.ml.preproc.pyarrow.table.bytes', table.nbytes)
   470   2843.7 MiB   1276.2 MiB           2       dataset = table.to_pandas(deduplicate_objects=False, split_blocks=False, self_destruct=False,
   471   1567.5 MiB      0.0 MiB           1                                 strings_to_categorical=True,)
   472   2843.7 MiB      0.0 MiB           1       del table
   473   2843.7 MiB      0.0 MiB           1       del reader
   474   2843.9 MiB      0.2 MiB           1       del client
   475   2842.2 MiB     -1.8 MiB           1       time.sleep(1)
   476   2842.2 MiB      0.0 MiB           1       if MEM_PROFILING:
   477   2842.2 MiB      0.0 MiB           1           mem_pool = pyarrow.default_memory_pool()
   478   2842.2 MiB      0.0 MiB           1           print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
   479                                                       f"{mem_pool.max_memory()} max allocated, ")
   480   2842.2 MiB      0.0 MiB           1           print(f"PyArrow total allocated bytes: {pyarrow.total_allocated_bytes()}")
   481   2838.9 MiB     -3.3 MiB           1           mem_usage = dataset.memory_usage(index=True, deep=True)
   485   2839.1 MiB      0.0 MiB           1       return dataset

# Output
PyArrow mem pool info: jemalloc backend, 1313930816 allocated, 1338112064 max allocated,
PyArrow total allocated bytes: 1313930816

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
...
   139   2839.1 MiB      0.0 MiB           1           dataset_label = dataset[label_column].astype(np.int8)
   140
   141   2836.6 MiB     -2.6 MiB           1           dataset.drop(columns=label_column, inplace=True)
   142   2836.6 MiB      0.0 MiB           1           gc.collect()
   143
   144   2836.6 MiB      0.0 MiB           1           if MEM_PROFILING:
   145   2836.6 MiB      0.0 MiB           1               mem_pool = pyarrow.default_memory_pool()
   146   2836.6 MiB      0.0 MiB           1               print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
   147                                                           f"{mem_pool.max_memory()} max allocated, ")
   148   2836.6 MiB      0.0 MiB           1               print(f"PyArrow total allocated bytes: {pyarrow.total_allocated_bytes()}")

# Output
PyArrow mem pool info: jemalloc backend, 0 allocated, 1338112064 max allocated,
PyArrow total allocated bytes: 0

A case with dropping a referenced (not copied) column:

 

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
...
   134   2872.0 MiB      0.0 MiB           1           dataset_label = dataset[label_column]
   135
   136   4039.4 MiB   1167.4 MiB           1           dataset.drop(columns=label_column, inplace=True)
   137   4035.9 MiB     -3.6 MiB           1               gc.collect()
   138
   139   4035.9 MiB      0.0 MiB           1           if MEM_PROFILING:
   140   4035.9 MiB      0.0 MiB           1               mem_pool = pyarrow.default_memory_pool()
   141   4035.9 MiB      0.0 MiB           1               print(f"PyArrow mem pool info: {mem_pool.backend_name} backend, {mem_pool.bytes_allocated()} allocated, "
   142                                                           f"{mem_pool.max_memory()} max allocated, ")

# Output
PyArrow mem pool info: jemalloc backend, 90227904 allocated, 1340299200 max allocated,

Package versions:

boto3==1.17.1
botocore==1.20.1
cachetools==4.2.1
certifi==2020.12.5
cffi==1.14.4
chardet==4.0.0
google-api-core[grpc]==1.25.1
google-auth==1.25.0
google-cloud-bigquery-storage==2.2.1
google-cloud-bigquery==2.7.0
google-cloud-core==1.5.0
google-crc32c==1.1.2
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
grpcio==1.35.0
idna==2.10
jmespath==0.10.0
joblib==1.0.0
libcst==0.3.16
memory-profiler==0.58.0
mypy-extensions==0.4.3
numpy==1.20.0
pandas==1.2.1
proto-plus==1.13.0
protobuf==3.14.0
psutil==5.8.0
pyarrow==3.0.0
pyasn1-modules==0.2.8
pyasn1==0.4.8
pycparser==2.20
python-dateutil==2.8.1
pytz==2021.1
pyyaml==5.4.1
requests==2.25.1
rsa==4.7
s3transfer==0.3.4
scikit-learn==0.24.1
scipy==1.6.0
setuptools-scm==5.0.1
six==1.15.0
smart-open==4.1.2
threadpoolctl==2.1.0
typing-extensions==3.7.4.3
typing-inspect==0.6.0
unidecode==1.1.2
urllib3==1.26.3

Antoine Pitrou / @pitrou:
As you can see, the memory was returned to the allocator ("0 allocated"). The allocator is then free to return those pages to the OS or not.

Also, how is "Mem usage" measured in your script?


Dmitry Kashtanov:
"Mem usage" is by memory_profiler.

And as we can see, the following line doesn't help:
pyarrow.jemalloc_set_decay_ms(0)


Antoine Pitrou / @pitrou:

"Mem usage" is by memory_profiler.

That doesn't really answer the question: what does it measure? RSS? Virtual memory size?

And as we may see, the following line doesn't help.

Perhaps, but I still don't see what Arrow could do, or even if there is an actual problem.

Can you run "bqs_stream_to_pandas" in a loop and see whether memory usage increases? Or does it stay stable as its initial peak value?


Dmitry Kashtanov:

MALLOC_CONF="background_thread:true,narenas:1,tcache:false,dirty_decay_ms:0,muzzy_decay_ms:0"

Specifying the above environment variable also doesn't help for jemalloc.

The suspicious things are that everything works on MacOS and that all allocators behave similarly.


Dmitry Kashtanov:
 

That doesn't really answer the question: what does it measure? RSS? Virtual memory size?

It looks like memory_profiler uses the first item from the tuple returned by psutil.Process().memory_info(), which is rss.

Can you run "bqs_stream_to_pandas" in a loop and see whether memory usage increases? Or does it stay stable as its initial peak value?

Please see below. It doesn't increase (almost).

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
...
   117   2866.0 MiB   2713.1 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   118   2865.6 MiB     -0.4 MiB           1       del dataset
   119   2874.6 MiB      9.0 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   120   2874.6 MiB      0.0 MiB           1       del dataset
   121   2887.0 MiB     12.4 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   122   2878.2 MiB     -8.8 MiB           1       del dataset
   123   2903.2 MiB     25.1 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   124   2903.2 MiB      0.0 MiB           1       del dataset
   125   2899.2 MiB     -4.1 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   126   2899.2 MiB      0.0 MiB           1       del dataset
   127   2887.9 MiB    -11.3 MiB           1       dataset = bqs_stream_to_pandas(session, stream_name)
   128   2887.9 MiB      0.0 MiB           1       del dataset

 

Interestingly, the first chunk of memory is freed when the gRPC connection/session (I may be naming it incorrectly) is reset:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
   471   2898.9 MiB   2898.9 MiB           1   @profile
   472                                         def bqs_stream_to_pandas(session, stream_name, row_limit=3660000):
   474   2898.9 MiB      0.0 MiB           1       client = bqs.BigQueryReadClient()
   475   1628.4 MiB  -1270.5 MiB           1       reader = client.read_rows(name=stream_name, offset=0)
   476   1628.4 MiB      0.0 MiB           1       rows = reader.rows(session)
...

If a message is a google.protobuf message and a batch is created like below, will it be a zero-copy operation?

pyarrow.ipc.read_record_batch(
    pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
    self._schema,
)

Antoine Pitrou / @pitrou:
Ah, I didn't know that gRPC was involved. Since Arrow returned all the memory it had allocated, it's quite possible that the memory is held at the gRPC level.

If a message is google.protobuf message and a batch is created like below, will it be a zero-copy operation?

Hmm... I guess it probably should? But I think you may find more expertise about this by asking the BigQuery developers / community.


Dmitry Kashtanov:
It looks like it is a zero-copy operation, since after the pyarrow.Table creation and before the pandas.DataFrame creation, pyarrow reports zero allocated memory (both on Linux and MacOS):

Before pandas dataframe creation
PyArrow mem pool info: jemalloc backend, 0 allocated, 0 max allocated, 
PyArrow total allocated bytes: 0

So with this, it looks like we have the following container sequence:

  1. a list of pyarrow.RecordBatches backed by memory allocated by google.protobuf

  2. a pyarrow.Table backed by (most likely exactly the same) memory allocated by google.protobuf

  3. then a pandas.DataFrame backed by memory allocated by pyarrow

  4. then, after a column drop, a pandas.DataFrame backed by memory allocated by pandas/numpy

So my current assumption is that google.protobuf uses a different memory allocator on Linux than on MacOS. The former could be Google's TCMalloc (which is Linux-only).
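A hedged way to check that zero-copy assumption directly: if the batch is a zero-copy view over the protobuf-owned bytes, Arrow's own pool should report (almost) no new allocation when the batch is deserialized. serialized_batch and schema below are placeholders for the values coming out of the BigQuery Storage stream.

import pyarrow as pa
import pyarrow.ipc

before = pa.total_allocated_bytes()
batch = pa.ipc.read_record_batch(pa.py_buffer(serialized_batch), schema)
print("new bytes in Arrow's pool:", pa.total_allocated_bytes() - before)
print("batch body size:", batch.nbytes)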


shadowdsp:
I have a similar issue with nested data on Ubuntu 16.04 and pyarrow v3.0, even if I set pa.jemalloc_set_decay_ms(0). Non-nested data works fine.

Here is my script:

import io
import pandas as pd
import pyarrow as pa
pa.jemalloc_set_decay_ms(0)
import pyarrow.parquet as pq
from memory_profiler import profile

@profile
def read_file(f):
    table = pq.read_table(f)
    df = table.to_pandas(strings_to_categorical=True)
    del table
    del df

def main():
    rows = 2000000
    df = pd.DataFrame({
        "string": [{"test": [1, 2], "test1": [3, 4]}] * rows,
        "int": [5] * rows,
        "float": [2.0] * rows,
    })
    table = pa.Table.from_pandas(df, preserve_index=False)
    parquet_stream = io.BytesIO()
    pq.write_table(table, parquet_stream)
    for i in range(3):
        parquet_stream.seek(0)
        read_file(parquet_stream)

if __name__ == '__main__':
    main()

Output:

Filename: memory_leak.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    14    329.5 MiB    329.5 MiB           1   @profile
    15                                         def read_file(f):
    16    424.4 MiB     94.9 MiB           1       table = pq.read_table(f)
    17   1356.6 MiB    932.2 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    18   1310.5 MiB    -46.1 MiB           1       del table
    19    606.7 MiB   -703.8 MiB           1       del df


Filename: memory_leak.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    14    606.7 MiB    606.7 MiB           1   @profile
    15                                         def read_file(f):
    16    714.9 MiB    108.3 MiB           1       table = pq.read_table(f)
    17   1720.8 MiB   1005.9 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    18   1674.5 MiB    -46.3 MiB           1       del table
    19    970.6 MiB   -703.8 MiB           1       del df


Filename: memory_leak.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    14    970.6 MiB    970.6 MiB           1   @profile
    15                                         def read_file(f):
    16   1079.6 MiB    109.0 MiB           1       table = pq.read_table(f)
    17   2085.5 MiB   1005.9 MiB           1       df = table.to_pandas(strings_to_categorical=True)
    18   2039.2 MiB    -46.3 MiB           1       del table
    19   1335.3 MiB   -703.8 MiB           1       del df

df and table are not fully released in this case.
 
pkg info

pip show pyarrow     
Name: pyarrow
Version: 3.0.0
Summary: Python library for Apache Arrow
Home-page: https://arrow.apache.org/
Author: None
Author-email: None
License: Apache License, Version 2.0
Location: 
Requires: numpy
Required-by: utify

pip show pandas
Name: pandas
Version: 1.2.1
Summary: Powerful data structures for data analysis, time series, and statistics
Home-page: https://pandas.pydata.org
Author: None
Author-email: None
License: BSD
Location: 
Requires: python-dateutil, pytz, numpy
Required-by: utify, seaborn, fastparquet


Weston Pace / @westonpace:
[~shadowdsp] Thanks for the great reproducible test case. I worked on this today and believe it is different from the problem described earlier in this issue. I have created ARROW-11855 to track your bug.


shadowdsp:
@westonpace  thank you very much!


Peter Gaultney:
Hi,

I think this bug still exists in 6.0.0 of pyarrow.

I'm attaching a script that requires fastparquet, pyarrow, and psutil to be installed. benchmark-pandas-parquet.py

It allows switching between fastparquet and pyarrow to compare memory usage across iterations; the number of calls to read_table is also parameterizable and defaults to 5.

There seems to be a large memory leak, followed by smaller ones on every iteration. Even with pyarrow.jemalloc_set_decay_ms(0), I cannot get pyarrow to ever give up the memory it allocates.

I've been able to reproduce with many different kinds of parquet files, but I don't know about nested vs non-nested data. 

 


Cory Nezin:
I am also seeing similar behavior with pd.read_parquet and the latest version, 7.0.0.  Interestingly, this seems to only happen in the particular case of running it on a gunicorn server.


Jan Skorepa:

I have been struggling with a memory leak in the to_table method, and also in other use cases. Here is a simple example to reproduce it:

import pandas as pd
from pyarrow import dataset as ds
import pyarrow as pa


def create_parquet(path: str):
    pd.DataFrame({'range': [x for x in range(1000000)]}).to_parquet(path)


def load_parquet_to_table(path: str):
    dataset = ds.dataset(path, format='parquet')
    dataset.to_table()


if __name__ == '__main__':
    PATH = 'test.parquet'
    pa.jemalloc_set_decay_ms(0)
    create_parquet(PATH)
    for x in range(100):
        load_parquet_to_table(PATH) 

 

I tested on version 9.0.0 with python 3.8 on macOS.

And pa.jemalloc_set_decay_ms(0) also didn't help with this.

Memory usage: (see attached screenshot, Screenshot 2022-08-17 at 11.10.05.png)

Even though the memory usage doesn't grow linearly here, when I used this in a more complex example in a long-running process it ended up increasing until it exceeded the memory limit.


Antoine Pitrou / @pitrou:
[~skorepaj] Can you try calling pa.jemalloc_memory_pool().release_unused() after each call to load_parquet_to_table?
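A sketch of that suggestion applied to the example from the previous comment (release_unused() is only a hint to the allocator, so how much actually goes back to the OS can vary):

import pyarrow as pa
from pyarrow import dataset as ds

pa.jemalloc_set_decay_ms(0)

for _ in range(100):
    # 'test.parquet' refers to the file created in the example above.
    ds.dataset('test.parquet', format='parquet').to_table()
    # Ask the jemalloc pool to hand its cached, unused memory back to the OS.
    pa.jemalloc_memory_pool().release_unused()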


Jan Skorepa:
@pitrou Thanks for your fast reply. Unfortunately it has no effect.


Ninh Chu:
Hi, I also encounter a memory problem in v9.0.0, but in my case the memory pool scales with dataset size even though I tried to limit the batch size. Based on the documentation, RecordBatchReader is the safe way to read a big dataset, but if the memory scales with dataset size, that defeats the purpose of Dataset and RecordBatchReader.

I'm running on Ubuntu 20.04 / WSL2.

import pyarrow.dataset as ds
import pyarrow as pa
pa.jemalloc_set_decay_ms(0)

delta_ds = ds.dataset("delta")

row_count = delta_ds.count_rows()
print("row_count = ", row_count)

reader = delta_ds.scanner(batch_size=10000).to_reader()
batch = reader.read_next_batch()
print("first batch row count = ", batch.num_rows)
print("Total allocated mem for pyarrow = ", pa.total_allocated_bytes() // 1024**2)

 
The results are interesting:

Small dataset

dataset row_count =  66651
first batch row count =  10000
Total allocated mem for pyarrow =  103

Big dataset created by duplicating the same file 4 times

dataset row_count =  333255
first batch row count =  10000
Total allocated mem for pyarrow =  412

If I load all the data in the dataset into a Table:

import pyarrow.dataset as ds
import pyarrow as pa
pa.jemalloc_set_decay_ms(0)

delta_ds = ds.dataset("delta")

row_count = delta_ds.count_rows()
print("dataset row_count = ", row_count)

pa_table = delta_ds.to_table()
print("Total allocated mem for pyarrow = ", pa.total_allocated_bytes() // 1024**2)
# Output
dataset row_count =  333255
Total allocated mem for pyarrow =  512
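For comparison, a sketch (not from the original comment) that consumes the same scanner batch by batch and prints Arrow's allocation as it goes; with a fully streaming read one would expect the figure to stay near the size of a few batches rather than grow with the dataset.

import pyarrow as pa
import pyarrow.dataset as ds

pa.jemalloc_set_decay_ms(0)

delta_ds = ds.dataset("delta")
reader = delta_ds.scanner(batch_size=10000).to_reader()

for i, batch in enumerate(reader):
    if i % 10 == 0:
        print(f"batch {i}: {pa.total_allocated_bytes() // 1024**2} MiB allocated by pyarrow")
    del batch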


Julius Uotila:
Hi,

I am having the exact same issue as Jan Skorepa, but with Windows/Windows Server.

I have a process that builds a dataset overnight from an SQL database into .parquet with a predefined save interval (30+ saves a night) and limited memory. Each save slowly creeps up memory usage until the process crashes.

python 3.9.12
pyarrow 9.0.0

Windows Server 2019

Windows 10

Many thanks,

Julius


wondertx:
Also encountered a memory leak when using pyarrow.fs.HadoopFileSystem.open_input_stream.
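A minimal usage sketch (host, port, and path are placeholders, not from the report above); reading through a context manager at least rules out streams being left open as the source of the growth:

from pyarrow import fs

hdfs = fs.HadoopFileSystem("namenode-host", port=8020)
with hdfs.open_input_stream("/data/example.parquet") as stream:
    payload = stream.read()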

@dxe4

dxe4 commented Feb 8, 2023

Hi, I was profiling this and spotted that in pyarrow/array.pxi deduplicate_objects has a default value set to True, but the docs say it's False. So what is the expected default?

deduplicate_objects : bool, default False

bint deduplicate_objects=True,

The memory goes down when it's False.

(attached memory profiles: no_dup@2x, duplicate@2x)

@westonpace

@dxe4 this is a rather old issue (perhaps we should close it) and not necessarily getting much attention. It's also not clear this issue is related to deduplicate_objects. Can you open a new issue, specific to your deduplicate_objects question?

@ales-vilchytski

We have very likely encountered the same issue.
Our use case:

  • dagster, k8s, pods have a 24GB memory limit
  • the job reads a dataset of 50-100 large files (around 200MB each) one by one, processes each file with pandas, then writes it back (so 1-2 files at a time are loaded into memory)
  • our parquet files contain only a few columns, but one column is a pretty large JSON string (300+KB)
  • the job usually ends with OOM

I can't provide our data or code, but I created a repository with the smallest possible scripts to reproduce the issue; it can be found here: https://github.com/ales-vilchytski/pyarrow-parquet-memory-leak-demo. The repository includes scripts to generate a parquet file and to reproduce the OOM, a Dockerfile, and instructions on how to run it.

The issue reproduces on pyarrow 13 and 14 and pandas 2+, with different Docker images, on native macOS ARM 13, and with different Python versions (3.10, 3.11, 3.12).

The core loop (https://github.com/ales-vilchytski/pyarrow-parquet-memory-leak-demo/blob/main/src/mem_leak.py#L10):

    c = 0
    while True:
        start = time.time()
        data = ds.dataset('../data/example.parquet')  # parquet file with large strings
        df = data.to_table().to_pandas()

        pa.Table.from_pandas(df)
        
        end = time.time()
        print(f'iteration {c}, time {end - start}s')
        c += 1

As an example: with a 12GB memory limit, the script iterates about 5 times before getting killed by the OOM killer (Docker, WSL2 Ubuntu 22.04 with 16GB of memory).

I also experimented with jemalloc settings and found that JE_ARROW_MALLOC_CONF=abort_conf:true,confirm_conf:true,retain:false,background_thread:true,dirty_decay_ms:0,muzzy_decay_ms:0,lg_extent_max_active_fit:2 works a bit better.

The parquet file in the example is written with object dtype by default (https://github.com/ales-vilchytski/pyarrow-parquet-memory-leak-demo/blob/main/src/gen_parquet.py), but writing explicit string types only delays the OOM slightly.

Any attempt to fix things by triggering GC, clearing memory pools, or switching to the system memory allocator failed; it still hits OOM, just earlier or later.

@kyle-ip

kyle-ip commented Mar 23, 2024

Hi @ales-vilchytski ,

Have you fixed this yet? I encountered a similar issue.
Every time I used pyarrow.parquet.ParquetDataset to load a parquet file from S3, the memory usage of the service kept increasing and could not be released, so I called release_unused after the I/O operations.
However, the occupied memory was not released immediately, only the next time I executed an I/O operation.
It is also not clear how much memory will be released.

I was wondering if you have a solution? Thanks.

@ales-vilchytski

Hello @kyle-ip.

Unfortunately we've only found workarounds: use an explicit string type instead of object where possible, split large files and process data chunk by chunk, and process chunks in separate processes (e.g. multiprocessing) if possible.

It makes things harder to implement, but it works in our case (at least for now).
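A sketch of the "separate process" workaround mentioned above (process_one_file is a placeholder for the real per-file logic): each file is handled in a short-lived worker, so whatever the allocator holds on to is returned to the OS when the worker exits.

from concurrent.futures import ProcessPoolExecutor

import pyarrow.dataset as ds


def process_one_file(path: str) -> int:
    df = ds.dataset(path).to_table().to_pandas()
    # ... transform df and write it back ...
    return len(df)


def process_all(paths):
    for path in paths:
        # A fresh single-worker pool per file keeps the memory ceiling at
        # roughly one file's worth, at the cost of process startup overhead.
        with ProcessPoolExecutor(max_workers=1) as pool:
            rows = pool.submit(process_one_file, path).result()
            print(f"{path}: {rows} rows")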

@JEM-Mosig

Are people still working on this? It is preventing us from even considering parquet as a file format, even though in theory it'd be perfect for our needs.
