In [10]:
import glob
import pandas as pd
import os
import re
import dask.dataframe as dd

from astropy.io.votable import parse_single_table
import shutil

Download coordinate data

In [11]:
sources_file = '../data/Gaia DR3/queries/impartial sources coord-result.csv'

In [12]:
sources_df = pd.read_csv(sources_file)
sources_df.head()

Unnamed: 0,source_id,ra,dec
0,5937173300407375616,250.247043,-51.593826
1,5937081353758582016,251.645316,-51.613334
2,5937201200518016768,250.622534,-51.278547
3,5937201200518017024,250.61624,-51.279443
4,5937081388118319616,251.626566,-51.6255


In [13]:
sources_df_dd = dd.from_pandas(sources_df, npartitions=10000)

Combine coordinates and spectra

In [14]:
impartial_sources = '../data/Gaia DR3/spectra/XP_temp'
files = glob.glob(f'{impartial_sources}/*.vot')

In [15]:
# this loop works
pattern = re.compile(r'(\d+)(?=\.vot$)')
spectra_df = pd.DataFrame(columns=['source_id', 'flux'])

for f in files[:500]:
    match = pattern.search(os.path.basename(f))
    if match:
        source_id = match.group()
        
        impartial_data = parse_single_table(f).to_table().to_pandas()
        flux_array = impartial_data['flux'].values
        
        temp_df = pd.DataFrame({'source_id': [int(source_id)], 'flux': [flux_array]})
        
        spectra_df = pd.concat([spectra_df, temp_df], ignore_index=True)

In [16]:
# this loop also works

pattern = re.compile(r'(\d+)(?=\.vot$)')
data = []

for f in files[:500]:
    match = pattern.search(os.path.basename(f))
    if match:
        source_id = int(match.group())
        impartial_data = parse_single_table(f).to_table().to_pandas()
        flux_array = impartial_data['flux'].values
        data.append({'source_id': source_id, 'flux': flux_array})

spectra_df = pd.DataFrame(data)

In [22]:
# Ensure the './temp' directory exists
os.makedirs('./temp', exist_ok=True)

pattern = re.compile(r'(\d+)(?=\.vot$)')
batch_size = 500

# Process files in batches, skipping already processed files
for i in range(0, len(files), batch_size):
    batch_files = files[i:i + batch_size]
    batch_filename = f'./temp/{i}_{i + len(batch_files)}.parquet'

    # Skip processing if the batch file already exists
    if os.path.exists(batch_filename):
        print(f"Skipping batch {i}-{i + len(batch_files)}, already processed.")
        continue

    data = []

    for f in batch_files:
        match = pattern.search(os.path.basename(f))
        if match:
            try:
                source_id = int(match.group())
                impartial_data = parse_single_table(f).to_table().to_pandas()
                flux_array = impartial_data['flux'].values
                data.append({'source_id': source_id, 'flux': flux_array})
            except:
                print(f"The following file cannot be read: {os.path.basename(f)}")    
    # Create a DataFrame for the current batch
    spectra_df = pd.DataFrame(data)
    
    # Save the batch to a Parquet file
    spectra_df.to_parquet(batch_filename, index=False)
    
    # Clear the list to free memory
    del data
    del spectra_df

# Combine all batch Parquet files into one final Parquet file
all_files = [os.path.join('./temp', f) for f in os.listdir('./temp') if f.endswith('.parquet')]
combined_df = pd.concat((pd.read_parquet(f) for f in all_files), ignore_index=True)

# Save the final combined DataFrame as Parquet
combined_df.to_parquet('./final_combined.parquet', index=False)

# Delete the './temp' directory and its contents
#shutil.rmtree('./temp')

print("Batch processing completed. All files combined into 'final_combined.parquet'. Temp directory deleted.")

The following file cannot be read: XP_SAMPLED_Gaia_DR3_5888623436756684288.vot
Batch processing completed. All files combined into 'final_combined.parquet'. Temp directory deleted.


In [20]:
from dask import delayed, compute
import pandas as pd
import re
import os

pattern = re.compile(r'(\d+)(?=\.vot$)')

@delayed
def process_file(f):
    match = pattern.search(os.path.basename(f))
    if match:
        source_id = int(match.group())
        impartial_data = parse_single_table(f).to_table().to_pandas()
        flux_array = impartial_data['flux'].values
        return {'source_id': source_id, 'flux': flux_array}
    return None

file_paths = files[:500]  # Assuming files is your list of file paths

# Create a list of delayed tasks
tasks = [process_file(f) for f in file_paths]

# Compute the results in parallel
results = compute(*tasks)

# Filter out None results
results = [r for r in results if r is not None]

# Create the DataFrame
spectra_df = pd.DataFrame(results)

In [None]:
spectra_df_dd = dd.from_pandas(spectra_df, npartitions=10000)

In [None]:
result_df = dd.merge(sources_df, spectra_df, on='source_id', how='inner')

In [None]:
result_df = pd.merge(sources_df, spectra_df, on='source_id', how='inner')

In [None]:
result_df

Unnamed: 0,source_id,ra,dec,flux
0,1106487002561770368,99.334608,68.892552,"[9.1753686e-17, 1.0186828e-16, 9.4628383e-17, ..."
1,1103461593236932480,98.830491,64.092667,"[8.479949e-17, 1.4802934e-16, 2.1828411e-16, 1..."
2,1101665025596084864,107.524509,65.940173,"[-2.0176978e-17, -3.659598e-18, 1.4074245e-17,..."
3,1106844034603972352,94.147552,68.677752,"[1.2871195e-17, 8.22262e-18, 9.0978916e-18, 9...."
4,1105449304102685312,90.625299,67.979417,"[1.4120433e-17, 1.2651873e-17, 1.2261003e-17, ..."
...,...,...,...,...
95,125153392797464832,50.341736,32.590031,"[3.9062817e-17, 4.8551608e-17, 4.5109008e-17, ..."
96,123712001772784640,47.857266,31.584668,"[6.2166747e-18, 3.3648442e-18, 3.6364879e-19, ..."
97,1091965271455414272,122.749021,65.222823,"[7.8339155e-16, 7.036686e-16, 9.255116e-16, 7...."
98,1095175330012281216,121.694347,66.488818,"[4.3393175e-16, 3.0278368e-16, 2.9516192e-16, ..."
