# NB 01: Extract ENIGMA CORAL Data

Extract geochemistry, community abundance, ASV taxonomy, and sample
metadata from the ENIGMA CORAL database via Spark Connect. Filter to
the 108 samples with both geochemistry and community data.

**Requires Spark** — `get_spark_session()` from `berdl_notebook_utils`.

**Outputs**: 4 TSV files in `data/`.

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path

from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
print(f"Spark version: {spark.version}")

DATA_DIR = Path('../data')
DATA_DIR.mkdir(exist_ok=True)

Spark version: 4.0.1


## 1. Extract Geochemistry

In [2]:
# Extract geochemistry: sample x molecule concentrations
geochem_raw = spark.sql("""
    SELECT sdt_sample_name, molecule_from_list_sys_oterm_name, concentration_micromolar
    FROM enigma_coral.ddt_brick0000010
""").toPandas()
print(f"Raw geochemistry: {len(geochem_raw):,} rows")
print(f"  Samples: {geochem_raw['sdt_sample_name'].nunique()}")
print(f"  Molecules: {geochem_raw['molecule_from_list_sys_oterm_name'].nunique()}")

# Pivot to sample x molecule matrix
geochem = geochem_raw.pivot_table(
    index='sdt_sample_name',
    columns='molecule_from_list_sys_oterm_name',
    values='concentration_micromolar',
    aggfunc='mean'
)

# Shorten column names
geochem.columns = [c.replace(' atom', '').replace(' ', '_') for c in geochem.columns]
print(f"\nGeochemistry matrix: {geochem.shape[0]} samples x {geochem.shape[1]} molecules")

# Focus on key contaminants
key_metals = ['uranium', 'chromium', 'nickel', 'zinc', 'iron', 'copper', 'cadmium', 'lead', 'arsenic']
available_metals = [m for m in key_metals if m in geochem.columns]
print(f"Key metals available: {available_metals}")
print()
geochem[available_metals].describe().round(3)

Raw geochemistry: 52,884 rows
  Samples: 117
  Molecules: 48

Geochemistry matrix: 117 samples x 48 molecules
Key metals available: ['uranium', 'chromium', 'nickel', 'zinc', 'iron', 'copper', 'cadmium', 'lead', 'arsenic']



Unnamed: 0,uranium,chromium,nickel,zinc,iron,copper,cadmium,lead,arsenic
count,117.0,117.0,117.0,117.0,117.0,117.0,117.0,117.0,117.0
mean,10.442,0.225,4.756,0.73,13.682,0.368,0.107,0.015,0.003
std,43.556,0.886,16.328,1.658,39.705,1.423,0.391,0.042,0.006
min,0.0,0.0,0.001,0.076,0.017,0.001,0.0,0.0,0.0
25%,0.002,0.003,0.011,0.241,0.282,0.007,0.001,0.001,0.0
50%,0.005,0.009,0.038,0.426,1.392,0.027,0.002,0.004,0.001
75%,0.075,0.035,0.333,0.658,8.07,0.081,0.003,0.011,0.003
max,377.126,6.176,104.567,16.938,245.499,9.238,3.039,0.311,0.061


## 2. Extract Community Abundance

In [3]:
# Extract ASV counts from brick476 (80M rows) -- filter in Spark first
# brick476 is the table with 587 environmental communities and 108 geochemistry overlap
# brick459 had no environmental community matches

# Step 1: Get the community names for the 108 overlap samples
# First get all geochemistry sample names
geochem_sample_list = "','".join(geochem.index.tolist())

overlap_communities = spark.sql(f"""
    SELECT c.sdt_community_name, c.sdt_sample_name
    FROM enigma_coral.sdt_community c
    WHERE c.community_type_sys_oterm_name = 'Environmental Community'
      AND c.sdt_sample_name IN ('{geochem_sample_list}')
""").toPandas()
print(f"Overlap communities: {len(overlap_communities)}")
print(f"  Samples: {overlap_communities['sdt_sample_name'].nunique()}")

# Step 2: Register as temp view for Spark join
spark.createDataFrame(overlap_communities).createOrReplaceTempView("overlap_comms")

# Step 3: Extract ASV counts only for overlap communities (pushes filter to Spark)
asv_counts_raw = spark.sql("""
    SELECT b.sdt_asv_name, b.sdt_community_name,
           CAST(b.count_count_unit AS LONG) as read_count,
           oc.sdt_sample_name
    FROM enigma_coral.ddt_brick0000476 b
    JOIN overlap_comms oc ON b.sdt_community_name = oc.sdt_community_name
    WHERE CAST(b.count_count_unit AS LONG) > 0
""").toPandas()
print(f"\nASV counts (filtered to overlap): {len(asv_counts_raw):,} rows")
print(f"  Communities: {asv_counts_raw['sdt_community_name'].nunique()}")
print(f"  Samples: {asv_counts_raw['sdt_sample_name'].nunique()}")
print(f"  ASVs: {asv_counts_raw['sdt_asv_name'].nunique()}")

# Aggregate replicates: sum counts per ASV per sample
# brick476 has a replicate column -- aggregate across replicates
asv_counts = asv_counts_raw.groupby(
    ['sdt_asv_name', 'sdt_community_name', 'sdt_sample_name']
)['read_count'].sum().reset_index()
print(f"\nAfter aggregating replicates: {len(asv_counts):,} rows")

Overlap communities: 220
  Samples: 108



ASV counts (filtered to overlap): 135,504 rows
  Communities: 212
  Samples: 108
  ASVs: 47528

After aggregating replicates: 132,356 rows


## 3. Extract ASV Taxonomy

In [4]:
# Extract ASV -> genus mapping from brick454
asv_taxonomy = spark.sql("""
    SELECT sdt_asv_name, taxonomic_level_sys_oterm_name as tax_level, sdt_taxon_name as taxon
    FROM enigma_coral.ddt_brick0000454
    WHERE taxonomic_level_sys_oterm_name = 'Genus'
""").toPandas()
print(f"ASV genus taxonomy: {len(asv_taxonomy):,} rows")
print(f"  Unique ASVs: {asv_taxonomy['sdt_asv_name'].nunique()}")
print(f"  Unique genera: {asv_taxonomy['taxon'].nunique()}")

# Also get phylum-level for context
asv_phylum = spark.sql("""
    SELECT sdt_asv_name, sdt_taxon_name as phylum
    FROM enigma_coral.ddt_brick0000454
    WHERE taxonomic_level_sys_oterm_name = 'Phylum'
""").toPandas()
print(f"ASV phylum taxonomy: {len(asv_phylum):,} rows")

# Merge genus + phylum
asv_tax_full = asv_taxonomy[['sdt_asv_name', 'taxon']].rename(columns={'taxon': 'genus'})
asv_tax_full = asv_tax_full.merge(asv_phylum, on='sdt_asv_name', how='left')
print(f"\nCombined taxonomy: {len(asv_tax_full):,} ASVs with genus + phylum")

ASV genus taxonomy: 96,822 rows
  Unique ASVs: 96822
  Unique genera: 1830


ASV phylum taxonomy: 107,888 rows

Combined taxonomy: 96,822 ASVs with genus + phylum


## 4. Build Sample Metadata

In [5]:
# Sample metadata: sample -> location -> date
sample_meta = spark.sql("""
    SELECT s.sdt_sample_name, s.sdt_location_name, s.date, s.depth_meter,
           l.latitude_degree, l.longitude_degree, l.region
    FROM enigma_coral.sdt_sample s
    LEFT JOIN enigma_coral.sdt_location l
      ON s.sdt_location_name = l.sdt_location_name
""").toPandas()
print(f"Sample metadata: {len(sample_meta):,} samples")
print(f"  Locations: {sample_meta['sdt_location_name'].nunique()}")

Sample metadata: 4,346 samples
  Locations: 202


## 5. Find Overlapping Samples

In [6]:
# Identify the overlap samples (already filtered in Spark extraction above)
overlap_samples = set(asv_counts['sdt_sample_name'].unique())

print(f"Geochemistry samples: {len(geochem)}") 
print(f"Community samples (from brick476 overlap): {len(overlap_samples)}")

# Filter geochemistry to overlap
geochem_filtered = geochem.loc[geochem.index.isin(overlap_samples)].copy()
asv_counts_filtered = asv_counts.copy()  # already filtered
sample_meta_filtered = sample_meta[sample_meta['sdt_sample_name'].isin(overlap_samples)].copy()

print(f"\nFiltered geochemistry: {len(geochem_filtered)} samples")
print(f"Filtered ASV counts: {len(asv_counts_filtered):,} rows")
print(f"Filtered sample metadata: {len(sample_meta_filtered)} samples")

Geochemistry samples: 117
Community samples (from brick476 overlap): 108

Filtered geochemistry: 108 samples
Filtered ASV counts: 132,356 rows
Filtered sample metadata: 108 samples


## 6. Save Extracted Data

In [7]:
# Save geochemistry
geochem_filtered.to_csv(DATA_DIR / 'site_geochemistry.tsv', sep='\t')
print(f"Saved: site_geochemistry.tsv ({len(geochem_filtered)} samples x {geochem_filtered.shape[1]} molecules)")

# Save ASV counts (only non-zero, filtered to overlap samples)
asv_counts_filtered[['sdt_asv_name', 'sdt_community_name', 'sdt_sample_name', 'read_count']].to_csv(
    DATA_DIR / 'asv_counts.tsv', sep='\t', index=False
)
print(f"Saved: asv_counts.tsv ({len(asv_counts_filtered):,} non-zero counts)")

# Save ASV taxonomy
asv_tax_full.to_csv(DATA_DIR / 'asv_taxonomy.tsv', sep='\t', index=False)
print(f"Saved: asv_taxonomy.tsv ({len(asv_tax_full):,} ASVs)")

# Save sample metadata
sample_meta_filtered.to_csv(DATA_DIR / 'sample_metadata.tsv', sep='\t', index=False)
print(f"Saved: sample_metadata.tsv ({len(sample_meta_filtered)} samples)")

Saved: site_geochemistry.tsv (108 samples x 48 molecules)


Saved: asv_counts.tsv (132,356 non-zero counts)


Saved: asv_taxonomy.tsv (96,822 ASVs)
Saved: sample_metadata.tsv (108 samples)


In [8]:
print("=" * 60)
print("NB01 SUMMARY: ENIGMA Data Extraction")
print("=" * 60)
print(f"Overlap samples: {len(overlap_samples)}")
print(f"Geochemistry: {geochem_filtered.shape[1]} molecules measured")
print(f"  Uranium range: {geochem_filtered.get('uranium', pd.Series([0])).min():.3f} - {geochem_filtered.get('uranium', pd.Series([0])).max():.1f} µM")
print(f"Community: {asv_counts_filtered['sdt_asv_name'].nunique():,} ASVs in {asv_counts_filtered['sdt_community_name'].nunique()} communities")
print(f"Taxonomy: {asv_tax_full['genus'].nunique():,} genera")
print(f"Locations: {sample_meta_filtered['sdt_location_name'].nunique()}")
print("=" * 60)

NB01 SUMMARY: ENIGMA Data Extraction
Overlap samples: 108
Geochemistry: 48 molecules measured
  Uranium range: 0.000 - 188.2 µM
Community: 47,528 ASVs in 212 communities
Taxonomy: 1,830 genera
Locations: 97
