In [11]:
import sys
import os
sys.path.append('.')

import pandas as pd
from tqdm.auto import tqdm
tqdm.pandas()

import gc
import json
import numpy as np
from multiprocessing import cpu_count
import asyncio
from concurrent.futures import ProcessPoolExecutor

from fit import XRFAnalyzer

In [12]:
# Load every parquet file under `fits_path` and stack them into one DataFrame.
# Files are read in sorted order so each row keeps the same position on every run:
# the checkpointing cells below identify rows by positional index, so an arbitrary
# os.listdir() order would make resumed runs pick up the wrong rows.
fits_path = 'FITs/'

parquet_files = sorted(name for name in os.listdir(fits_path) if name.endswith('.parquet'))
if not parquet_files:
    raise FileNotFoundError(f"No .parquet files found in {fits_path!r}")

fits_list = [pd.read_parquet(os.path.join(fits_path, name)) for name in tqdm(parquet_files)]

fits = pd.concat(fits_list, ignore_index=True)
gc.collect()

  0%|          | 0/1 [00:00<?, ?it/s]

21

In [13]:
# Keep only the file identifier, the four corner LAT/LON columns and the spectrum 'y'.
# .copy() makes `fits` an independent frame so the column assignment below does not
# write into a slice of the previous frame (SettingWithCopyWarning).
fits = fits[['file', 'V0_LAT', 'V1_LAT', 'V2_LAT', 'V3_LAT', 'V0_LON', 'V1_LON',
       'V2_LON', 'V3_LON', 'y']].copy()

# Drop a fixed 31-character prefix from 'file' so it matches the mapping's names.
# NOTE(review): presumably a directory prefix of constant length — confirm it is always 31 chars.
fits['file'] = fits['file'].str[31:]

# Load the FITS -> background-file mapping and rebuild the full FITS file name.
mapping = pd.read_csv('new_cat.csv').rename(columns={'class_file_name': 'file'})
mapping['file'] = 'ch2_cla_l1_' + mapping['file'] + '.fits'
mapping = mapping[['file', 'background_file_name']]

# Left-merge onto the FITS rows. validate='many_to_one' raises if a file appears more
# than once in the mapping; a duplicate would otherwise duplicate FITS rows and shift
# the positional indices that the checkpointing cells rely on.
fits = fits.merge(mapping, on='file', how='left', validate='many_to_one')
# Files with no mapping entry get '' — get_intensities treats '' as "no background".
fits['background_file_name'] = fits['background_file_name'].fillna('')
fits.head(2)

Unnamed: 0,file,V0_LAT,V1_LAT,V2_LAT,V3_LAT,V0_LON,V1_LON,V2_LON,V3_LON,y,background_file_name
0,ch2_cla_l1_20230808T153456459_20230808T1535044...,-22.3776,-23.7047,-23.316,-21.9959,-130.99,-130.578,-129.572,-129.99,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",
1,ch2_cla_l1_20240118T040950149_20240118T0409581...,-5.7368,-6.7625,-6.7607,-5.735,69.3025,69.3062,69.917,69.9121,"[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",


In [None]:
# function to get intensities and concentrations for each FIT

def get_intensities(y, bkg):
    """
    Run XRF analysis on one spectrum and return its fitted results.

    Args:
        y: Spectrum passed to ``XRFAnalyzer.analyze_sample`` as ``y_file``
            (together with ``use_y=True``).
        bkg: Background file stem. ``''`` (or any non-string such as None/NaN)
            means "no background"; otherwise ``BKGs/ch2_cla_l1_<bkg>.pha`` is
            passed as ``background_fits`` with ``use_background=True``.

    Returns:
        ``[intensities, concentrations, uncertainties]`` on success, or
        ``[None, None, None]`` if the analysis raised (the error is printed).
    """
    analyzer = XRFAnalyzer()

    use_background = isinstance(bkg, str) and bkg != ''
    options = {
        'plot_results': False,
        'use_background': use_background,
        'use_y': True,
        'y_file': y,
        'verbose': 0,
    }

    try:
        if use_background:
            # os.path.join instead of a hard-coded '\' separator: portable across OSes,
            # and avoids the invalid '\c' escape sequence in a non-raw string literal.
            options['background_fits'] = os.path.join('BKGs', 'ch2_cla_l1_' + bkg + '.pha')

        intensities, concentrations, uncertainties = analyzer.analyze_sample(**options)
        return [intensities, concentrations, uncertainties]

    except Exception as e:
        print(f"\nError during analysis: {str(e)}")
        # Same length as the success value so callers can always unpack three items.
        return [None, None, None]


In [15]:
def process_with_checkpoints(fits_df, output_dir='processed_results', start_index=None, end_index=None):
    """
    Process DataFrame rows one at a time, checkpointing each result to disk.

    Each positional row ``idx`` (as used by ``fits_df.iloc``) is passed to
    ``get_intensities(row['y'], row['background_file_name'])``. The outcome is
    written to ``<output_dir>/result_<idx>.json`` and ``idx`` is appended to
    ``<output_dir>/processed_indices.txt``. Indices already listed there are
    skipped. Rows that raise are logged to ``<output_dir>/errors.txt`` and are
    not marked as processed.

    Args:
        fits_df: Input DataFrame; must have 'y' and 'background_file_name' columns.
        output_dir: Directory to save intermediate results (created if missing).
        start_index: Optional positional index to start from. Defaults to 0, or to
            one past the highest already-processed index when resuming.
        end_index: Optional positional index to stop before (exclusive). Defaults
            to, and is clamped to, ``len(fits_df)``.
    """
    os.makedirs(output_dir, exist_ok=True)
    indices_path = os.path.join(output_dir, 'processed_indices.txt')
    errors_path = os.path.join(output_dir, 'errors.txt')

    # Indices recorded by earlier runs; blank lines (e.g. a trailing newline) are ignored.
    processed_files = set()
    if os.path.exists(indices_path):
        with open(indices_path, 'r') as f:
            processed_files = {int(line) for line in (raw.strip() for raw in f) if line}

    if start_index is None:
        start_index = max(processed_files) + 1 if processed_files else 0
    # Clamp so iloc never goes out of bounds (that lookup is outside the per-row try).
    if end_index is None or end_index > len(fits_df):
        end_index = len(fits_df)

    def _to_jsonable(obj):
        # json cannot serialise numpy arrays/scalars nested inside the result list.
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

    to_process = range(start_index, end_index)
    already_done = sum(1 for idx in to_process if idx in processed_files)

    # The bar covers exactly this [start_index, end_index) window.
    with tqdm(total=len(to_process), initial=already_done) as pbar:
        for idx in to_process:
            if idx in processed_files:
                continue

            row = fits_df.iloc[idx]
            try:
                result = get_intensities(row['y'], row['background_file_name'])

                # Serialise before opening the file, so a serialisation error cannot
                # leave a truncated result_<idx>.json for combine_results to choke on.
                payload = json.dumps({
                    'index': idx,
                    'result': result,
                    'row_data': {k: str(v) for k, v in row.items()},  # Convert all values to string for JSON
                }, default=_to_jsonable)
                with open(os.path.join(output_dir, f'result_{idx}.json'), 'w') as f:
                    f.write(payload)

                # Record the index only after its result file is fully written.
                with open(indices_path, 'a') as f:
                    f.write(f'{idx}\n')

            except Exception as e:
                print(f"\nError processing row {idx}: {str(e)}")
                with open(errors_path, 'a') as f:
                    f.write(f'Error at index {idx}: {str(e)}\n')

            finally:
                pbar.update(1)

def combine_results(output_dir='processed_results', fits_df=None):
    """
    Combine all checkpointed results in ``output_dir`` into a single DataFrame.

    Reads every ``result_*.json`` file in ``output_dir``. A file that is not valid
    JSON (e.g. truncated by an interrupted write) is skipped with a warning instead
    of aborting the whole combine.

    Args:
        output_dir: Directory containing result files.
        fits_df: Original DataFrame (optional). If given, prints a warning listing
            the indices in ``range(len(fits_df))`` that have no result.

    Returns:
        DataFrame with columns ``'index'`` and ``'result'``, sorted by ``'index'``.
        Its row index equals the ``'index'`` column, so
        ``df['results'] = results_df['result']`` aligns every result with the row it
        came from. Rows without a result become NaN rather than being shifted.
    """
    result_files = [name for name in os.listdir(output_dir)
                    if name.startswith('result_') and name.endswith('.json')]

    print(f"Found {len(result_files)} result files")

    all_results = []
    for file in tqdm(result_files, desc="Combining results"):
        try:
            with open(os.path.join(output_dir, file), 'r') as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Warning: skipping unreadable result file {file}: {e}")
            continue
        all_results.append({
            'index': data['index'],
            'result': data['result']
        })

    all_results.sort(key=lambda x: x['index'])

    # Explicit columns keep the schema stable even when no result files exist.
    results_df = pd.DataFrame(all_results, columns=['index', 'result'])
    # Index by the original row position so column assignment aligns correctly.
    results_df.index = results_df['index'].to_numpy()

    if fits_df is not None:
        missing_indices = set(range(len(fits_df))) - set(results_df['index'])
        if missing_indices:
            print(f"Warning: Missing results for indices: {missing_indices}")

    return results_df

# Example usage:
#
#   # Process the data with checkpointing
#   process_with_checkpoints(fits_df, output_dir='my_results', end_index=1000)
#
#   # Later, combine all results
#   results_df = combine_results(output_dir='my_results', fits_df=fits_df)
#
#   # Merge results back with the original DataFrame, aligning on the stored row
#   # index so rows that failed or were skipped get NaN instead of shifted results.
#   fits_df['results'] = results_df.set_index('index')['result']

"\n# Process the data with checkpointing\nprocess_with_checkpoints(fits_df, output_dir='my_results', end_index=1000)\n\n# Later, combine all results\nresults_df = combine_results(output_dir='my_results', fits_df=fits_df)\n\n# Merge results back with original DataFrame if needed\nfits_df['results'] = results_df['result']\n"

In [16]:
def calculate_optimal_batch_size(df, mem_fraction=0.3, min_batch=50, max_batch=500):
    """
    Suggest a batch size from the DataFrame's per-row memory and the free RAM.

    The memory budget is ``mem_fraction`` of currently available RAM (via psutil),
    shared across ``cpu_count()`` workers. The resulting row count is clamped to
    ``[min_batch, max_batch]``. Diagnostics are printed along the way.

    Args:
        df: DataFrame whose ``memory_usage(deep=True)`` estimates the per-row cost.
        mem_fraction: Fraction of available RAM to budget (default 0.3).
        min_batch: Lower bound on the returned batch size (default 50).
        max_batch: Upper bound on the returned batch size (default 500).

    Returns:
        int: the recommended batch size, within ``[min_batch, max_batch]``.
    """
    import psutil  # only this cell needs it

    n_cpus = cpu_count()
    n_rows = len(df)

    mem_usage = df.memory_usage(deep=True).sum()
    # An empty frame has no per-row cost; avoid dividing by zero.
    mem_per_row_mb = (mem_usage / n_rows) / (1024 * 1024) if n_rows else 0.0

    available_ram = psutil.virtual_memory().available / (1024 * 1024)  # MB
    safe_mem_limit = available_ram * mem_fraction

    # A zero per-batch cost would mean an infinite batch (int(inf) raises); use the cap.
    per_batch_mb = mem_per_row_mb * n_cpus
    if per_batch_mb > 0:
        optimal_batch = int(safe_mem_limit / per_batch_mb)
    else:
        optimal_batch = max_batch

    print(f"Memory per row: {mem_per_row_mb:.2f} MB")
    print(f"Available RAM: {available_ram:.0f} MB")
    print(f"Safe memory limit: {safe_mem_limit:.0f} MB")
    print(f"Number of CPU cores: {n_cpus}")
    print(f"\nCalculated optimal batch size: {optimal_batch}")

    final_batch = min(max(optimal_batch, min_batch), max_batch)
    print(f"Recommended batch size: {final_batch}")

    return final_batch

# Suggested batch size for the current `fits` frame; the helper prints its diagnostics.
# NOTE(review): `batch_size` is not referenced by any later visible cell.
batch_size = calculate_optimal_batch_size(df=fits)

Memory per row: 0.00 MB
Available RAM: 4464 MB
Safe memory limit: 1339 MB
Number of CPU cores: 16

Calculated optimal batch size: 247166
Recommended batch size: 500


In [19]:
# Row window for a partial run. Only the commented-out call below uses it;
# the active call processes every row of `fits`.
start_index, end_index = 400_000, 500_000
process_with_checkpoints(fits_df=fits, output_dir='results-emd-narrow')
# process_with_checkpoints(fits, output_dir='results', start_index=start_index, end_index=end_index)

  0%|          | 0/85000 [00:00<?, ?it/s]

  popt, _ = curve_fit(self.gaussian, x_fit, y_fit, p0=p0)
  concentrations = concentrations / np.sum(concentrations)
  cov_x = invR @ invR.T
  cov_x = invR @ invR.T
  pcov = pcov * s_sq


In [20]:
# Combine the checkpointed results and attach them to `fits`. Each result stores the
# positional row of `fits` it was computed from in its 'index' column. Aligning on it
# (fits has a default RangeIndex after the merge) gives NaN for failed/unprocessed
# rows instead of silently shifting later results onto the wrong rows.
results_df = combine_results(output_dir='results-emd-narrow', fits_df=fits)
fits['results'] = results_df.set_index('index')['result']
fits.to_parquet('emd_narrow_fits_with_results.parquet', index=False)

Found 85000 result files


Combining results:   0%|          | 0/85000 [00:00<?, ?it/s]