
Running pycisTopic on very large datasets [PERFORMANCE] #106

Open
simozhou opened this issue Jan 23, 2024 · 10 comments

@simozhou

What type of problem are you experiencing and which function is your problem related to?
I am running pycisTopic on a very large dataset (200k cells, approx. 80k regions) and it is taking a very long time.

I am running the Mallet version of pycisTopic, and I am calling the function with these parameters:

models=run_cgs_models_mallet(path_to_mallet_binary,
                    cistopic_obj,
                    n_topics=[2,5,10,15,20,25,30,35,40,45,50,60,70,80,90,100,150],
                    n_cpu=64,
                    n_iter=500,
                    random_state=420,
                    alpha=50,
                    alpha_by_topic=True,
                    eta=0.1,
                    eta_by_topic=False,
                    tmp_path=tmp_path, #Use SCRATCH if many models or big data set
                    save_path=None)

Is there a way I can speed up the computation? It has now been running for more than 4 days, and I plan to run it on an even bigger dataset (1M cells). I have the feeling I might be doing something wrong and that I could perhaps do something differently (maybe not use Mallet? I'm not sure). Do you have any suggestions?

The machine it runs on has 64 CPUs and 500GB of RAM available.

Version information
pycisTopic: 1.0.3.dev20+g8955c76

@SeppeDeWinter
Collaborator

Hi @simozhou

This step can take a long time; however, 4 days is still a lot.

Did any intermediate models finish in this time, or is it stuck at running the model with 2 topics?

I would also suggest specifying a save_path: "Path to save models as independent files as they are completed. This is recommended for large data sets. Default: None." This will save any intermediate models.
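
For example, something like this (just a sketch of your call above with save_path added; out_dir is a hypothetical variable for wherever you want the models written):

import os

models=run_cgs_models_mallet(path_to_mallet_binary,
                    cistopic_obj,
                    n_topics=[2,5,10,15,20,25,30,35,40,45,50,60,70,80,90,100,150],
                    n_cpu=64,
                    n_iter=500,
                    random_state=420,
                    alpha=50,
                    alpha_by_topic=True,
                    eta=0.1,
                    eta_by_topic=False,
                    tmp_path=tmp_path,
                    save_path=os.path.join(out_dir, "models")) # each model is pickled to this directory as soon as it finishes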

All the best,

Seppe

@simozhou
Author

Hi @SeppeDeWinter,

Thank you so much for your feedback!

All models do finish eventually, although very slowly (the 2-topic model runs faster; for obvious reasons, larger models with more topics are slower).

I will definitely add a save path to avoid recalculating all models every time.

I am providing 450GB of RAM for this job. Do you believe that a larger amount of RAM may help with the speed of computations?

Thanks again and best regards,
Simone

@SeppeDeWinter
Collaborator

Hi @simozhou

450 GB of RAM should be enough. I'm not sure why it's running so slowly for you...

All the best,

Seppe

@tiffanywc

I am also running Mallet with a very large dataset. I have saved intermediate models in case it terminates before completion. In this case, how can I combine the different topic models from multiple runs into a single mallet.pkl?

@SeppeDeWinter
Collaborator

Hi @tiffanywc

We store each model as an entry in a list.
Some pseudocode below

import os
import pickle

models = []
for file in os.listdir(<PATH_TO_DIRECTORY_WITH_MODELS>):
   # check whether the file is a result from topic modelling, e.g. based on the name
   if file.endswith(".pkl"):
      model = pickle.load(open(os.path.join(<PATH_TO_DIRECTORY_WITH_MODELS>, file), "rb"))
      models.append(model)
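
If you then want everything combined under a single mallet.pkl, as in your question, you could simply pickle the list afterwards (a sketch; the file name and location are only an example):

pickle.dump(models, open(os.path.join(<PATH_TO_DIRECTORY_WITH_MODELS>, "mallet.pkl"), "wb"))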

I hope this helps?

All the best,

Seppe

@TemiLeke

TemiLeke commented Jul 2, 2024

Hello @simozhou,

I'm wondering if you managed to find a resolution, because I'm currently facing a similar challenge:

  • Model details: 2 topics
  • cistopic object: 300K cells and 161k regions
  • Runtime: Exceeding 2 days
  • Resources: 40 CPUs allocated

Despite the seemingly small number of topics and substantial computational resources, the process is taking an unexpectedly long time. Have you encountered any solutions or optimizations that might help in this scenario? Any insights or workarounds you've discovered would be greatly appreciated.

Thank you!

@simozhou
Author

simozhou commented Jul 4, 2024

Hi @TemiLeke,

In short, no, I have not yet solved my runtime problem, but a few improvements helped make it at least tolerable:

  1. Saving each model as it completes (via save_path) helped a lot, as it avoids re-running the whole experiment if something fails (usually a TIMEOUT error from the HPC 😅).
  2. Setting reuse_corpus=True also helped, as I realised that the compressed Mallet corpus was otherwise re-written for every model.
  3. If you are working on an HPC, make sure you are not using more than one node. The algorithm is not optimised to run in a distributed fashion, and this makes things much slower than they should be. I was running cisTopic with 128 CPUs, only to realise that the nodes on my HPC had 64 CPUs each, which paradoxically slowed computations down!

This is the code I'm currently using:

# this would be the first time we run cisTopic on this data set
models=run_cgs_models_mallet(path_to_mallet_binary,
                    cistopic_obj,
                    n_topics=[10,15,20,50,60,70,80,90,100,150,200],
                    n_cpu=64,
                    n_iter=500,
                    random_state=420,
                    alpha=50,
                    alpha_by_topic=True,
                    eta=0.1,
                    eta_by_topic=False,
                    tmp_path=tmp_path, #Use SCRATCH if many models or big data set
                    save_path=os.path.join(args.outdir, 'models'),
                    reuse_corpus=True)

I would like to point out that the runtime is still very long, and it would be good to address this problem. I ran my 1-million-cell dataset and it took 8 days of computation with the aforementioned parameters (which was somewhat expected, but it would be ideal to shorten this time for the next iteration if possible :) ).

@SeppeDeWinter is there something we can do to help? I would be happy to contribute and possibly figure out why this is so slow!

@TemiLeke

TemiLeke commented Jul 5, 2024

Thanks a lot for the detailed reply @simozhou. I'm currently trying this out. Unfortunately I only have access to a 40-core system, so it will take even longer.

I agree it would be good to address the problem, and I'd be very happy to contribute in any capacity. @SeppeDeWinter

@JulieDeMan

Hi @simozhou and @TemiLeke, I am running (or trying to run) topic modelling on a dataset with almost 1.5 million cells and 600,000 regions. Indeed these operations require a lot of memory and take a long time, but with the latest command-line version, pre- and post-processing of the corpus is already twice as fast as it used to be, so you could try that one. I have not yet managed to get the full topic modelling to finish, since I am still figuring out how much memory I need (I was requesting too little, which gave me out-of-memory errors). Most of the time the Mallet training step completes, while the memory bottleneck is the loading of the assigned topics.

The CLI command I am using now looks like this, and I am only running it for one topic number at a time:

pycistopic topic_modeling mallet -i $cto_path -o $out_path -t $topic_number -p 40 -n 400 -a 50 -e 0.1 -s 555 -k True -T $temp_dir -m 1000 -b $mallet_path -r True
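
In case it is useful, here is a rough sketch of how these one-topic-at-a-time runs could be scripted from Python (the paths and topic numbers are placeholders; the flags simply mirror the command above):

import subprocess

# Placeholder paths; adjust to your own setup.
cto_path = "cistopic_obj.pkl"
out_path = "models_out"
temp_dir = "/scratch/tmp"
mallet_path = "Mallet/bin/mallet"

for topic_number in [10, 20, 50]:  # placeholder topic numbers, one run each
    subprocess.run(
        ["pycistopic", "topic_modeling", "mallet",
         "-i", cto_path, "-o", out_path,
         "-t", str(topic_number), "-p", "40", "-n", "400",
         "-a", "50", "-e", "0.1", "-s", "555", "-k", "True",
         "-T", temp_dir, "-m", "1000", "-b", mallet_path, "-r", "True"],
        check=True)  # raise if a run fails, so broken runs are not silently skipped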

@TemiLeke

Hi @JulieDeMan

I haven't tried out the CLI version yet, but it would be interesting to see how much it speeds up the training. The memory-consuming part of the pipeline does indeed have to do with loading the assigned topics. I dug a little further and found that the problem is related to the group_by and agg operation in load_word_topics (around line 635), even though lazy loading defers the execution. I repeatedly ran into the issue even with 1TB of RAM provisioned, which is the maximum I can get on a cluster.

To resolve this, I developed a custom function that significantly reduces memory usage, albeit at the cost of increased processing time (see below). It involves loading and processing the topics in smaller chunks, such that the groupby and agg operations are performed on each chunk, and the result is subsequently merged into a larger Polars dataframe, hence the increased processing time.

Here's how I implemented this in pycisTopic:

  1. I created a custom function called custom_polars_read_csv()
  2. In the LDAMallet class, I replaced the pl.read_csv function call on line 625 with custom_polars_read_csv(self.fstate(), chunk_size=2_000_000)
  3. The custom_polars_read_csv() function is defined as follows:
import polars as pl
import gzip
import io

def custom_polars_read_csv(file_path, chunk_size=2_000_000):
    """
    This function is designed as an alternative to polars.read_csv for the large gzipped CSV files (created by MALLET)
    where memory consumption is a concern. It uses a combination of lazy evaluation and
    streaming to reduce memory usage during processing, at the cost of increased processing time.

    Key features:
    1. Streams the gzipped file, avoiding loading the entire file into memory at once.
    2. Uses Polars' lazy execution to optimize query planning and execution.
    3. Performs grouping and aggregation on the lazy DataFrame.

    Compared to standard polars.read_csv:
    - Pros: Significantly lower memory usage, especially for large files.
    - Cons: Slower processing speed due to streaming and lazy evaluation.

    Args:
    file_path (str): Path to the gzipped CSV file.
    chunk_size (int): Number of lines to process in each chunk.

    Returns:
    polars.DataFrame: A DataFrame containing the grouped and aggregated results.
    """
    pl.enable_string_cache()

    # initialize an empty DataFrame to store the aggregated results with correct schema
    result = pl.DataFrame(schema={"topic": pl.Int64, "region": pl.Int64, "occurrence": pl.UInt64})
    
    with gzip.open(file_path, 'rt') as f:
        # Skip the first 3 rows
        for _ in range(3):
            next(f)
        
        while True:
            chunk = io.StringIO()
            for _ in range(chunk_size):
                line = f.readline()
                if not line:
                    break
                chunk.write(line)
            
            if chunk.getvalue() == '':
                break
            
            chunk.seek(0)
            df = pl.read_csv(
                chunk,
                separator=" ",
                has_header=False,
                columns=[4, 5],
                new_columns=["region", "topic"],
                schema_overrides={"region": pl.Int64, "topic": pl.Int64}
            )
            
            # perform the group-by and aggregation on the chunk
            chunk_result = df.group_by(["topic", "region"]).agg(pl.len().cast(pl.UInt64).alias("occurrence"))
            
            # merge the chunk result with the overall result
            result = pl.concat([result, chunk_result]).group_by(["topic", "region"]).sum()
    
    return result
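
A hypothetical usage example (the state-file path here is illustrative; inside pycisTopic the call is made on self.fstate() in the LDAMallet class, as described in step 2 above):

topic_region_counts = custom_polars_read_csv("path/to/mallet_state.gz", chunk_size=2_000_000)
print(topic_region_counts.sort(["topic", "region"]).head())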

Please note that this is a crude application and there could possibly be a more efficient approach.
