In [None]:
import ast
import os

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# --- Dask Setup ---
# Start a LocalCluster with default settings and attach a Client to it; the
# client becomes the scheduler used by the dask.dataframe work below. Both
# handles are closed in the final cell of the notebook.
client = Client(cluster := LocalCluster())

# Leave the client as the cell's last expression so Jupyter renders its summary.
client

In [None]:
# --- Load and Process Taxonomy Lookup Table ---
# Each non-blank line of taxon_ids.txt holds an integer taxon id followed by the
# taxon name. Names may contain spaces, so only the first whitespace run
# separates the two fields. Lines without both fields are skipped.

taxon_ids = []
taxon_names = []

# Explicit UTF-8: taxon names may contain non-ASCII characters, and the
# platform's default encoding is not guaranteed to be UTF-8.
with open('taxon_ids.txt', 'r', encoding='utf-8') as f:
    # Stream line by line rather than materialising the file with readlines().
    for line in f:
        # split() with no separator accepts spaces or tabs and ignores leading
        # whitespace; maxsplit=1 keeps multi-word names in a single field.
        fields = line.split(maxsplit=1)
        if len(fields) == 2:
            taxon_ids.append(int(fields[0]))
            taxon_names.append(fields[1].strip())

# Create a Pandas DataFrame for the taxonomy data
df_taxa = pd.DataFrame({'taxon_id': taxon_ids, 'name': taxon_names})
# Cast to float so the merge key matches the observations' float64 taxon_id column.
df_taxa['taxon_id'] = df_taxa['taxon_id'].astype('float64')

# A repeated taxon_id would silently duplicate every matching photo row in the
# later merge, so fail loudly instead.
assert df_taxa['taxon_id'].is_unique, 'taxon_ids.txt contains duplicate taxon ids'

df_taxa.head()

In [None]:
# --- Lazily Load Large Datasets ---
# Both exports are tab-separated despite their .csv extension and are split into
# ~50MB partitions. Only the columns needed downstream are requested.
CSV_READ_OPTIONS = {'sep': '\t', 'blocksize': '50MB'}

# taxon_id is pinned to float64 (presumably because some observations have no
# taxon and integer columns cannot hold NaN); the lookup table is cast to match.
ddf_obs = dd.read_csv(
    'observations.csv',
    usecols=['observation_uuid', 'taxon_id'],
    dtype={'taxon_id': 'float64'},
    **CSV_READ_OPTIONS,
)

# NOTE(review): dtypes here are inferred from a sample; if later partitions
# disagree, Dask fails at compute time and an explicit dtype= would be needed.
ddf_photos = dd.read_csv(
    'photos.csv',
    usecols=['observation_uuid', 'photo_id', 'extension'],
    **CSV_READ_OPTIONS,
)

In [None]:
# --- Combine and Clean DataFrames ---

# The species list is stored as a Python literal (e.g. a list of names).
# ast.literal_eval only parses literals, whereas eval() would execute any code
# contained in the file.
with open('top100species.txt', 'r', encoding='utf-8') as f:
    top_100_species_list = ast.literal_eval(f.read())

# Restrict the small in-memory taxonomy table to the species of interest
# *before* merging, so the distributed merge on observation_uuid only has to
# handle observations that survive the final filter.
df_taxa_top = df_taxa[df_taxa['name'].isin(top_100_species_list)]

missing_species = set(top_100_species_list) - set(df_taxa_top['name'])
if missing_species:
    print(f'Warning: {len(missing_species)} listed species not found in taxon_ids.txt')

# Inner merge against the restricted lookup keeps exactly the observations whose
# taxon_id maps to a listed species. This selects the same rows as the former
# left merge + dropna(subset=['name']) + isin() filter.
ddf_obs_top = dd.merge(ddf_obs, df_taxa_top, on='taxon_id', how='inner')

ddf_merged = dd.merge(ddf_obs_top, ddf_photos, on='observation_uuid', how='inner')

# Keep the same output columns, in the same order, as before (the join keys are dropped).
filtered_ddf = ddf_merged[['photo_id', 'extension', 'name']]

# Nothing has been computed yet; this only builds the lazy task graph.
print('Dataframe creation complete.\n')

In [None]:
# --- Execute Computation and Save to Parquet ---

print("Starting computation and saving to Parquet file...")

try:
    # This command triggers the full Dask computation graph. Dask writes a
    # directory of part files at this path; overwrite=True clears parts left by a
    # previous run so stale data cannot mix into the new output.
    filtered_ddf.to_parquet('photo_id_data.parquet', engine='pyarrow',
                            write_index=False, overwrite=True)
    print("\nProcessing complete! File 'photo_id_data.parquet' has been saved.")
finally:
    # Release the local workers even if the computation fails.
    client.close()
    cluster.close()