
[Python][Parquet] Memory leak still showed on parquet.write_table and Table.from_pandas #40738

Closed
guozhans opened this issue Mar 22, 2024 · 8 comments

Comments

@guozhans

guozhans commented Mar 22, 2024

Hi,

I tried to save a Pandas DataFrame to Parquet files and ran into a memory leak. The leak still shows up even though I installed the nightly build pyarrow 16.0.0.dev356, where this was reportedly fixed by #37989.

Any ideas?

Here is the memory usage reported by memory_profiler:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
33    425.8 MiB    425.8 MiB           1           @profile
34                                                 def to_parquet(self, df: pd.DataFrame, filename: str):
35    537.6 MiB    111.9 MiB           1               table = Table.from_pandas(df)
36    559.1 MiB     21.4 MiB           1               parquet.write_table(table, filename, compression="snappy")
37    559.1 MiB      0.0 MiB           1               del table
38                                                     #df.to_parquet(filename, compression="snappy")

My method

import pandas as pd

from memory_profiler import profile
from pyarrow import parquet
from pyarrow import Table

@profile
def to_parquet(self, df: pd.DataFrame, filename: str):
    table = Table.from_pandas(df)
    parquet.write_table(table, filename, compression="snappy")
    del table
    #df.to_parquet(filename, compression="snappy")

My related installed packages in the Docker image:
numpy 1.22.4
pandas 2.1.4
pyarrow 16.0.0.dev356
pyarrow-hotfix 0.6 --> from dask
dask 2024.2.1

OS: Ubuntu 22.04

Component(s)

Parquet, Python

@kyle-ip

kyle-ip commented Mar 23, 2024

Hi @guozhans.
Have you tried releasing the memory pool? See pyarrow.MemoryPool.

I encountered a similar issue:
every time I used pyarrow.parquet.ParquetDataset to load Parquet files from S3, the memory usage kept increasing and could not be released, so I called release_unused after the I/O operations:

import pyarrow as pa

pool = pa.default_memory_pool()
# ...
pool.release_unused()

However, the occupied memory was not released immediately, only on the next run. It is also unclear how much memory can actually be released this way.
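
For what it's worth, the pool can report how much it still holds. A minimal sketch (not from this thread, just the standard pyarrow pool accessors) to print the figures before and after release_unused():

import pyarrow as pa

def report_pool(label: str) -> None:
    pool = pa.default_memory_pool()
    # bytes_allocated(): bytes the pool currently holds; max_memory(): peak usage
    print(f"{label}: backend={pool.backend_name}, "
          f"allocated={pool.bytes_allocated()}, peak={pool.max_memory()}")

report_pool("before release")
pa.default_memory_pool().release_unused()
report_pool("after release")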

@guozhans
Author

Hi @kyle-ip,
Thanks for the info; I hadn't tried that, but the issue still occurred after I did. It shows up in a multi-thread or multi-process environment; I don't know whether it only behaves correctly under a single thread. The issue occurs only when I set n_workers to more than one.

Result:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    39    394.6 MiB    394.6 MiB           1   @profile
    40                                         def to_parquet(df: pd.DataFrame, filename: str):
    41    386.2 MiB     -8.4 MiB           1       table = Table.from_pandas(df)
    42    386.2 MiB      0.0 MiB           1       pool = pa.default_memory_pool()
    43    401.2 MiB     15.0 MiB           1       parquet.write_table(table, filename, compression="snappy")
    44    401.2 MiB      0.0 MiB           1       del table
    45    401.2 MiB      0.0 MiB           1       pool.release_unused()

The script to reproduce this issue:

import logging
import os
from concurrent.futures import ThreadPoolExecutor

import dask
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from dask import delayed

from distributed import WorkerPlugin, Worker, LocalCluster, Client, wait
from loky import ProcessPoolExecutor
from memory_profiler import profile
from pyarrow import Table, parquet


class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        executor = ThreadPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker


@profile
def to_parquet(df: pd.DataFrame, filename: str):
    table = Table.from_pandas(df)
    pool = pa.default_memory_pool()
    parquet.write_table(table, filename, compression="snappy")
    del table
    pool.release_unused()


def from_parquet(filename: str):
    return pd.read_parquet(filename)


def main():
    cluster = LocalCluster(n_workers=2, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            nodes = dd.read_parquet("a parquet file", columns=["id", "tags"])
            os.makedirs("/opt/project/parquet", exist_ok=True)
            for i in range(1, 10):
                dfs = nodes.to_delayed()
                filenames = [os.path.join("/opt/project/parquet", f"nodes-{i}.parquet") for i, df in enumerate(dfs)]
                writes = [delayed(to_parquet)(df, fn) for df, fn in zip(dfs, filenames)]
                dd.compute(*writes)
                wait(writes)


if __name__ == "__main__":
    main()
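
A side note on the multi-worker setup (a hypothetical sketch, not part of the reproduction above): release_unused() only acts on the memory pool of the process it runs in, so with a process-based Dask cluster it would need to run inside each worker, e.g. via Client.run:

import pyarrow as pa
from distributed import Client

def release_arrow_pool() -> int:
    # Runs inside a worker: release cached memory and report what is still held
    pool = pa.default_memory_pool()
    pool.release_unused()
    return pool.bytes_allocated()

def release_on_all_workers(client: Client) -> dict:
    # Client.run executes the function once on every worker and returns a
    # mapping of worker address -> result
    return client.run(release_arrow_pool)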

@kyle-ip

kyle-ip commented Mar 24, 2024

Okay. I think it may also be related to the OS environment. For example, my environment is Ubuntu and the default memory pool is backed by jemalloc.
To adjust the memory pool's behavior, this documentation is a useful reference: https://github.com/jemalloc/jemalloc/blob/dev/TUNING.md

It seems to have only a small effect, though.
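
For reference, pyarrow exposes a couple of these knobs directly; a sketch assuming a build with jemalloc support:

import pyarrow as pa

# Ask jemalloc to return freed pages to the OS immediately instead of
# keeping them cached for a while (the default decay is about one second).
pa.jemalloc_set_decay_ms(0)

# Or switch to the plain system allocator, so RSS follows allocations more
# directly; setting the ARROW_DEFAULT_MEMORY_POOL=system environment variable
# before startup does the same thing.
pa.set_memory_pool(pa.system_memory_pool())

print(pa.default_memory_pool().backend_name)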

@kou kou changed the title Memory leak still showed on parquet.write_table and Table.from_pandas [Python][Parquet] Memory leak still showed on parquet.write_table and Table.from_pandas Mar 24, 2024
@guozhans
Author

Hi @kyle-ip,
after your comment I checked again and found that I had an old Arrow library version installed. I am now fixing the environment issue; that might be the cause.

@kyle-ip

kyle-ip commented Mar 27, 2024

@guozhans
Great! Which version are you using now?

@guozhans
Author

Hi @kyle-ip,

I previously had Arrow 14.0.0 and the 16.0.0 dev version installed in different folders, and I was not aware of the old version until that day. I removed Arrow 14.0.0 completely from my Ubuntu Docker image, rebuilt the Arrow C++ source from the main branch with the commands below, and then reinstalled the PyArrow 16.0.0 dev wheel (I know I could build it from source as well, but I am a bit lazy). Everything looks fine now.

cmake -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
               -DCMAKE_INSTALL_LIBDIR=lib \
               -DCMAKE_BUILD_TYPE=Release \
               -DARROW_BUILD_TESTS=ON \
               -DARROW_COMPUTE=ON \
               -DARROW_CSV=ON \
               -DARROW_DATASET=ON \
               -DARROW_FILESYSTEM=ON \
               -DARROW_HDFS=ON \
               -DARROW_JSON=ON \
               -DARROW_PARQUET=ON \
               -DARROW_WITH_BROTLI=ON \
               -DARROW_WITH_BZ2=ON \
               -DARROW_WITH_LZ4=ON \
               -DARROW_WITH_SNAPPY=ON \
               -DARROW_WITH_ZLIB=ON \
               -DARROW_WITH_ZSTD=ON \
               -DPARQUET_REQUIRE_ENCRYPTION=ON \
               .. \
    && make -j4 \
    && make install

Result:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    29    395.4 MiB    395.4 MiB           1   @profile
    30                                         def to_parquet(df: pd.DataFrame, filename: str):
    31    372.2 MiB    -23.2 MiB           1       table = Table.from_pandas(df)
    32    372.2 MiB      0.0 MiB           1       pool = pa.default_memory_pool()
    33    396.4 MiB     24.2 MiB           1       parquet.write_table(table, filename, compression="snappy")
    34    396.4 MiB      0.0 MiB           1       del table
    35    396.4 MiB      0.0 MiB           1       pool.release_unused()
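
Since the root cause turned out to be a stale Arrow C++ install being picked up, a quick sanity check (hypothetical, not from this thread) to confirm which Arrow C++ build pyarrow is actually linked against:

import pyarrow as pa

print(pa.__version__)      # Python package version, e.g. 16.0.0.dev356
print(pa.cpp_version)      # version of the Arrow C++ library actually in use
print(pa.cpp_build_info)   # build details such as git id and build type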

@guozhans
Author

I'm closing this issue; see the comment above.

@kyle-ip

kyle-ip commented Mar 28, 2024

@guozhans Thank you very much. Your information helped me a lot!
