In [None]:
!pip install matplotlib -q
!pip install dask -q

In [3]:
import pandas as pd
import logging
import multiprocessing as mp
import numpy as np
import os

# Initialize logging
logging.basicConfig(filename='pubmed_processing.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

file_path = '/app/dataset_with_abstracts.csv'
output_file = '/app/workspace/data/output/abstracts.csv'

def process_chunk(df, chunk_idx):
    try:
        if 'MeshHeadings' not in df.columns or 'Language' not in df.columns:
            logging.error(f'Chunk {chunk_idx} is missing required columns.')
            return pd.DataFrame(columns=['MeshHeadings', 'AbstractText'])

        empty_mesh_count = df['MeshHeadings'].isna().sum()
        df['MeshHeadings'] = df['MeshHeadings'].str.lower()
        language_counts = df['Language'].value_counts()

        df_filtered = df[~df['MeshHeadings'].isna() & (df['Language'] == 'eng')].copy()
        
        df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].apply(
            lambda x: x.split(';') if isinstance(x, str) else []
        )

        df_filtered = df_filtered[df_filtered['MeshHeadings'].apply(lambda terms: any(
            term in terms for term in ['drug therapy', 'cardiovascular diseases']
        ))]

        df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].apply(lambda x: ';'.join(x))

        logging.info(f'Chunk {chunk_idx}: empty MeshHeadings count = {empty_mesh_count}')
        logging.info(f'Chunk {chunk_idx}: language distribution = {language_counts.to_dict()}')

        return df_filtered[['MeshHeadings', 'AbstractText']]
    except Exception as e:
        logging.error(f'Error processing chunk {chunk_idx}: {e}')
        return pd.DataFrame(columns=['MeshHeadings', 'AbstractText'])

def store_to_csv(df, chunk_idx):
    try:
        with open(output_file, 'a') as f:
            df.to_csv(f, header=f.tell() == 0, index=False)
            logging.info(f'Chunk {chunk_idx} stored to CSV')
    except Exception as e:
        logging.error(f'Error storing chunk {chunk_idx} to CSV: {e}')

def process_chunk_wrapper(args):
    return process_chunk(*args)

def main():
    num_cores = max(1, mp.cpu_count() - 5)  # Ensure at least one core is used
    print(f'Number of cores: {num_cores}')

    if not os.path.exists(file_path):
        logging.error(f'File not found: {file_path}')
        return

    with pd.read_csv(file_path, chunksize=10000) as reader:
        for chunk_idx, chunk in enumerate(reader):
            print(f'Processing chunk {chunk_idx}')
            chunks = np.array_split(chunk, num_cores)
            args = [(chunk, chunk_idx) for chunk in chunks]
            with mp.Pool(num_cores) as pool:
                results = pool.map(process_chunk_wrapper, args)

            combined_results = pd.concat(results)
            store_to_csv(combined_results, chunk_idx)

if __name__ == "__main__":
    main()


Number of cores: 27
Processing chunk 0


  return bound(*args, **kwds)


Processing chunk 1


  return bound(*args, **kwds)


Processing chunk 2


  return bound(*args, **kwds)


Processing chunk 3


  return bound(*args, **kwds)


Processing chunk 4


  return bound(*args, **kwds)


Processing chunk 5


  return bound(*args, **kwds)


Processing chunk 6


  return bound(*args, **kwds)


Processing chunk 7


  return bound(*args, **kwds)


Processing chunk 8


  return bound(*args, **kwds)


Processing chunk 9


  return bound(*args, **kwds)


Processing chunk 10


  return bound(*args, **kwds)


Processing chunk 11


  return bound(*args, **kwds)


Processing chunk 12


  return bound(*args, **kwds)


Processing chunk 13


  return bound(*args, **kwds)


Processing chunk 14


  return bound(*args, **kwds)


Processing chunk 15


  return bound(*args, **kwds)


Processing chunk 16


  return bound(*args, **kwds)


Processing chunk 17


  return bound(*args, **kwds)


Processing chunk 18


  return bound(*args, **kwds)


Processing chunk 19


  return bound(*args, **kwds)


Processing chunk 20


  return bound(*args, **kwds)


Processing chunk 21


  return bound(*args, **kwds)


Processing chunk 22


  return bound(*args, **kwds)


Processing chunk 23


  return bound(*args, **kwds)


Processing chunk 24


  return bound(*args, **kwds)


In [None]:
import pandas as pd
pd.read_csv('/app/dataset_with_abstracts.csv', nrows=10).head()

In [None]:
import pandas as pd
import logging
import multiprocessing as mp
import numpy as np

# Initialize logging
logging.basicConfig(filename='pubmed_processing.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

file_path = '/app/dataset_with_abstracts.csv'
output_file = '/app/workspace/data/output/abstracts.csv'

def process_chunk(df, chunk_idx):
    # Count empty MeshHeadings
    empty_mesh_count = df['MeshHeadings'].isna().sum()
    
    # Convert MeshHeadings to lowercase
    df['MeshHeadings'] = df['MeshHeadings'].str.lower()
    
    # Count occurrences of each language
    language_counts = df['Language'].value_counts()
    
    # Filter out rows where MeshHeadings is empty and Language is not 'eng'
    df_filtered = df[~df['MeshHeadings'].isna() & (df['Language'] == 'eng')]
    
    # Split MeshHeadings and filter based on 'drug therapy' and 'cardiovascular diseases'
    df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].str.split(';')
    
    # Filter based on 'drug therapy' and 'cardiovascular diseases'
    df_filtered = df_filtered[df_filtered['MeshHeadings'].apply(lambda terms: any(
        term in terms for term in ['drug therapy', 'cardiovascular diseases']
    ))]
    
    # Join MeshHeadings list back into a semicolon-separated string
    df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].apply(lambda x: ';'.join(x))
    
    # Log the statistics for the current chunk
    logging.info(f'Chunk {chunk_idx}: empty MeshHeadings count = {empty_mesh_count}')
    logging.info(f'Chunk {chunk_idx}: language distribution = {language_counts.to_dict()}')
    
    # Return only MeshHeadings and AbstractText columns
    df_filtered = df_filtered[['MeshHeadings', 'AbstractText']]
    return df_filtered

def store_to_csv(df, chunk_idx):
    with open(output_file, 'a') as f:
        df.to_csv(f, header=f.tell()==0, index=False)
        logging.info(f'Chunk {chunk_idx} stored to CSV')

def process_chunk_wrapper(args):
    return process_chunk(*args)

def main():
    num_cores = mp.cpu_count()
    print(f'Number of cores: {num_cores}')

    with pd.read_csv(file_path, chunksize=10000) as reader:
        for chunk_idx, chunk in enumerate(reader):
            # Split the chunk into smaller pieces for parallel processing
            chunks = np.array_split(chunk, num_cores)
            args = [(chunk, chunk_idx) for chunk in chunks]
            with mp.Pool(num_cores) as pool:
                results = pool.map(process_chunk_wrapper, args)
            
            # Combine results from all cores
            combined_results = pd.concat(results)
            store_to_csv(combined_results, chunk_idx)

if __name__ == "__main__":
    main()


Number of cores: 32


  return bound(*args, **kwds)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].str.split(';')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['MeshHeadings'] = df_filtered['MeshHeadings'].str.split(';')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['MeshHeadings'] =