# Example Data Reduction

In [None]:
import os
# Environment read by banzai; set before banzai is imported in the next cell.
# API_ROOT is the base URL of the LCO archive API. The OPENTSDB test-mode flag
# presumably disables metrics reporting - TODO confirm.
os.environ.update({
    'OPENTSDB_PYTHON_METRICS_TEST_MODE': 'True',
    'API_ROOT': "https://archive-api.lco.global/",
})

In [None]:
from banzai.calibrations import make_master_calibrations
import requests
from banzai import settings
from banzai import dbs
from banzai.utils.stage_utils import run_pipeline_stages
import logging
from banzai.logs import set_log_level
from glob import glob

In [None]:
# Ask banzai for INFO-level logging (presumably this configures the 'banzai'
# logger - TODO confirm against banzai.logs).
set_log_level('INFO')
# Logger used further down for this notebook's own messages.
logger = logging.getLogger('banzai')

In [None]:
# Everything this example produces is kept under ./example_reduction.
os.makedirs('example_reduction', exist_ok=True)

# A single SQLite file is the database. Its address is published both through
# the environment (read by the command-line calls below) and banzai's settings.
settings.db_address = os.environ['DB_ADDRESS'] = 'sqlite:///example_reduction/banzai-example.db'

# Remaining pipeline settings for this run.
settings.reduction_level = 91
settings.fpack = True
settings.processed_path = os.path.join(os.getcwd(), 'example_reduction')

In [None]:
# set up the context object.
# The settings module configured above is handed to banzai's argument parser.
# parse_system_args=False presumably keeps it from reading sys.argv, which
# belongs to the notebook kernel - TODO confirm.
import banzai.main
context = banzai.main.parse_args(settings, parse_system_args=False)

In [None]:
# Configuration
# Set to True to download all files locally first, False to access directly from S3
# Only the raw frames are affected by this switch: the bpm file is always
# downloaded because it is needed locally for the database setup.
DOWNLOAD_FILES_LOCALLY = True

## Download Data (Optional)
Two approaches are available, selected by `DOWNLOAD_FILES_LOCALLY` above:
- **Local download** (default): Download all files first, then process from disk
- **S3 direct**: Access files directly from the archive during processing

In [None]:
# The example data come from the LCO archive: site LSC, instrument sq34.

# Directory layout for the test dataset: one directory for the raw frames and
# one for the bpm file.
raw_data_dir = 'example_reduction/lsc/sq34/raw'
bpm_dir = 'example_reduction/lsc/sq34/bpm/'
for data_dir in (raw_data_dir, bpm_dir):
    os.makedirs(data_dir, exist_ok=True)

In [None]:
# Store the basename and archive frame_id for each of the frames we'll use in this example

# The bpm file and its archive frame id.
bpm_filename = 'lsc0m409-sq34-20240314-bpm-central30x30.fits.fz'
bpm_id = '69359142'

# Raw frames: basename -> archive frame id.
# The tag in front of ".fits.fz" decides which section below picks a frame up:
#   f00 -> skyflats, d00 -> darks, e00 -> science exposures, b00 -> biases
raw_frames = {
    'lsc0m476-sq34-20250622-0058-f00.fits.fz': '84043242',
    'lsc0m476-sq34-20250622-0059-f00.fits.fz': '84043255',
    'lsc0m476-sq34-20250622-0060-f00.fits.fz': '84043260',
    'lsc0m476-sq34-20250622-0061-f00.fits.fz': '84043265',
    'lsc0m476-sq34-20250622-0062-f00.fits.fz': '84043272',
    'lsc0m476-sq34-20250625-0060-d00.fits.fz': '84133538',
    'lsc0m476-sq34-20250625-0061-d00.fits.fz': '84133758',
    'lsc0m476-sq34-20250625-0062-d00.fits.fz': '84134040',
    'lsc0m476-sq34-20250625-0063-d00.fits.fz': '84134220',
    'lsc0m476-sq34-20250625-0064-d00.fits.fz': '84134361',
    'lsc0m476-sq34-20250626-0018-d00.fits.fz': '84149171',
    'lsc0m476-sq34-20250626-0019-d00.fits.fz': '84149216',
    'lsc0m476-sq34-20250626-0020-d00.fits.fz': '84149302',
    'lsc0m476-sq34-20250626-0021-d00.fits.fz': '84149349',
    'lsc0m476-sq34-20250626-0022-d00.fits.fz': '84149433',
    'lsc0m476-sq34-20250626-0094-e00.fits.fz': '84151307',
    'lsc0m476-sq34-20250626-0368-e00.fits.fz': '84167225',
    'lsc0m476-sq34-20250626-0452-b00.fits.fz': '84172674',
    'lsc0m476-sq34-20250626-0453-b00.fits.fz': '84172677',
    'lsc0m476-sq34-20250626-0454-b00.fits.fz': '84172685',
    'lsc0m476-sq34-20250626-0455-b00.fits.fz': '84172689',
    'lsc0m476-sq34-20250626-0456-b00.fits.fz': '84172695',
    'lsc0m476-sq34-20250626-0457-b00.fits.fz': '84172701',
    'lsc0m476-sq34-20250626-0458-b00.fits.fz': '84172706',
    'lsc0m476-sq34-20250626-0459-b00.fits.fz': '84172716',
    'lsc0m476-sq34-20250626-0460-b00.fits.fz': '84172721',
    'lsc0m476-sq34-20250626-0461-b00.fits.fz': '84172730'
}

In [None]:
def _download_frame(frame_id, destination):
    """Fetch one frame from the LCO archive and save it to ``destination``.

    The archive record for ``frame_id`` is requested first; its ``url`` field
    gives the location of the file itself. Nothing is requested when
    ``destination`` already exists, so re-running the cell is cheap.

    Parameters
    ----------
    frame_id : str
        Archive frame id, as stored in ``raw_frames`` / ``bpm_id``.
    destination : str
        Path of the file to write.

    Raises
    ------
    requests.HTTPError
        If either request comes back with an error status.
    """
    if os.path.exists(destination):
        return
    record = requests.get(f'https://archive-api.lco.global/frames/{frame_id}', timeout=60)
    record.raise_for_status()
    payload = requests.get(record.json()['url'], timeout=60)
    payload.raise_for_status()
    # Open the output file only after the download succeeded: a failed request
    # must not leave an empty file behind, because the existence check above
    # would then skip that frame on every later run.
    with open(destination, 'wb') as f:
        f.write(payload.content)


if DOWNLOAD_FILES_LOCALLY:
    # Download the data into the correct directory
    # Note that this won't download any files that already exist
    for filename, frame_id in raw_frames.items():
        _download_frame(frame_id, os.path.join(raw_data_dir, filename))
else:
    print("Using S3 direct access - files will be accessed during processing")

# Download the bpm file (always needed locally for database setup)
bpm_path = os.path.join(bpm_dir, bpm_filename)
_download_frame(bpm_id, bpm_path)


## Set up the database

In [None]:
import subprocess


def _run_banzai_command(arguments):
    """Run one banzai command-line tool and report a non-zero exit status.

    Parameters
    ----------
    arguments : list of str
        Program name followed by its arguments. They are handed to the program
        as-is, without going through a shell.

    Returns
    -------
    int
        The exit status of the command.

    Raises
    ------
    FileNotFoundError
        If the program cannot be found (nothing later in the notebook can work
        in that case, so this is not downgraded to a log message).
    """
    completed = subprocess.run(arguments)
    if completed.returncode != 0:
        # Logged rather than raised so that re-running the notebook is not
        # turned into a hard failure, while the problem is no longer silent.
        logger.error(f'{arguments[0]} exited with status {completed.returncode}')
    return completed.returncode


db_address_option = f'--db-address={os.environ["DB_ADDRESS"]}'
_run_banzai_command(['banzai_create_db', db_address_option])

# This is the site and instrument of the test data we're using
_run_banzai_command(['banzai_add_site', '--site', 'lsc',
                     '--latitude', '-30.1673833333', '--longitude', '-70.8047888889',
                     '--elevation', '2198', '--timezone', '-4', db_address_option])
_run_banzai_command(['banzai_add_instrument', '--site', 'lsc', '--camera', 'sq34', '--name', 'sq34',
                     '--instrument-type', '0m4-SciCam-QHY600', '--nx', '9600', '--ny', '6422',
                     db_address_option])

In [None]:
import subprocess

# Add the bpm to the database
logger.info(f'Adding bpm {bpm_filename} to the banzai database')
# Built with os.path.join, like the download path for the same file.
bpm_filepath = os.path.join(bpm_dir, bpm_filename)
# Arguments are passed as a list, so the path is never re-parsed by a shell.
add_bpm_result = subprocess.run(['banzai_add_super_calibration', bpm_filepath,
                                 f'--db-address={os.environ["DB_ADDRESS"]}'])
if add_bpm_result.returncode != 0:
    # Report the failure instead of discarding the exit status.
    logger.error(f'banzai_add_super_calibration exited with status {add_bpm_result.returncode}')


## Get the instrument record

In [None]:
# Only one instrument (sq34) was registered for site lsc above, so the first
# record returned for the site is the one we want.
instrument = dbs.get_instruments_at_site('lsc', settings.db_address)[0]

## Process and Stack Bias Files

In [None]:
# Run each raw bias (b00) frame through the pipeline, one frame per call.
if DOWNLOAD_FILES_LOCALLY:
    # Local copies: locate the frames on disk.
    for bias_file in glob('example_reduction/*/*/raw/*b00*'):
        run_pipeline_stages([{'path': bias_file}], context)
else:
    # No local copies: identify each frame by filename and archive frame id.
    for filename, frame_id in raw_frames.items():
        if 'b00' not in filename:
            continue
        run_pipeline_stages([{'filename': filename, 'frameid': frame_id}], context)

In [None]:
# Without a super bias there is nothing to compare against automatically on the
# first run, so bias frames have to be verified by hand.
def mark_frames_as_good(filenames):
    """Mark every processed frame whose file name matches ``filenames`` as good.

    ``filenames`` is a glob pattern for the file name. It is searched for in
    every ``processed`` directory three levels below ``example_reduction``.
    Each match is marked "good", by basename, in the database at
    ``DB_ADDRESS``; the number of matches is printed afterwards.
    """
    matches = glob(f'example_reduction/*/*/*/processed/{filenames}')
    for processed_file in matches:
        dbs.mark_frame(os.path.basename(processed_file), "good", db_address=os.environ['DB_ADDRESS'])
    print(f"Marked {len(matches)} frames as good")

In [None]:
# 'b91' presumably selects the processed biases (settings.reduction_level is
# set to 91 above) - TODO confirm the naming convention.
mark_frames_as_good('*b91*')

In [None]:
# The date range should contain the data we're using
# (the raw frames listed above are dated 2025-06-22 to 2025-06-26).
min_date = '2024-01-01'
max_date = '2026-01-01'
# Stack the biases for this instrument within the date range.
make_master_calibrations(instrument, 'BIAS', min_date, max_date, context)

## Process and Stack Dark Files

In [None]:
# Run each raw dark (d00) frame through the pipeline, one frame per call.
if DOWNLOAD_FILES_LOCALLY:
    # Local copies: locate the frames on disk.
    for dark_file in glob('example_reduction/*/*/raw/*d00*'):
        run_pipeline_stages([{'path': dark_file}], context)
else:
    # No local copies: identify each frame by filename and archive frame id.
    for filename, frame_id in raw_frames.items():
        if 'd00' not in filename:
            continue
        run_pipeline_stages([{'filename': filename, 'frameid': frame_id}], context)

In [None]:

# Mark the processed darks good, as was done for the biases.
mark_frames_as_good('*d91*')

In [None]:

# Stack the darks over the same date range as the biases (min_date / max_date
# are defined in the master bias cell), so the range is only edited in one place.
make_master_calibrations(instrument, 'DARK', min_date, max_date, context)

## Process and Stack Skyflats

In [None]:
# Run each raw skyflat (f00) frame through the pipeline, one frame per call.
# Marking the processed flats as good and stacking them is left to the next two
# cells, as in the bias and dark sections, so those steps only run once.
if DOWNLOAD_FILES_LOCALLY:
    # Access files through local filesystem
    flat_files = glob('example_reduction/*/*/raw/*f00*')
    for flat_file in flat_files:
        run_pipeline_stages([{'path': flat_file}], context)
else:
    # Directly access from S3
    flat_frames = {filename: frame_id for filename, frame_id in raw_frames.items() if 'f00' in filename}
    for filename, frame_id in flat_frames.items():
        run_pipeline_stages([{'filename': filename, 'frameid': frame_id}], context)

In [None]:
# Mark the processed skyflats good, as was done for the biases and darks.
mark_frames_as_good('*f91*')

In [None]:
# Stack the skyflats over the same date range as the biases (min_date /
# max_date are defined in the master bias cell).
make_master_calibrations(instrument, 'SKYFLAT', min_date, max_date, context)

## Process the Science Exposure

In [None]:
# Run each raw science exposure (e00) through the pipeline, one frame per call.
if DOWNLOAD_FILES_LOCALLY:
    # Local copies: locate the frames on disk.
    for science_file in glob('example_reduction/*/*/raw/*e00*'):
        run_pipeline_stages([{'path': science_file}], context)
else:
    # No local copies: identify each frame by filename and archive frame id.
    for filename, frame_id in raw_frames.items():
        if 'e00' not in filename:
            continue
        run_pipeline_stages([{'filename': filename, 'frameid': frame_id}], context)
