# Product Confirmation Workflow

This notebook downloads DIST-ALERT products from S3, unzips them, and runs the confirmation workflow.

In [6]:
import pandas as pd
import shutil
import zipfile
import requests
from pathlib import Path
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from dist_s1 import run_sequential_confirmation_of_dist_products_workflow
from utils import unzip_dist_s1_prod, wrap_run_sequential_confirmation_of_dist_products_workflow
import multiprocessing


In [2]:
tmp_dir =  Path('tmp')
unconfirmed_products_dir =  Path('unconfirmed_products')
confirmed_products_dir =  Path('confirmed_products')

tmp_dir.mkdir(exist_ok=True)
unconfirmed_products_dir.mkdir(exist_ok=True)
confirmed_products_dir.mkdir(exist_ok=True)

In [3]:
# Load the test products CSV
csv_path = Path('val_products_minus_one.csv')
df = pd.read_csv(csv_path)
df.head()

Unnamed: 0,zip_url,browse_url,product_request_time,processing_duration,high_confidence_alert_threshold,mgrs_tile_id,post_date_buffer_days,stride_for_norm_param_estimation,n_workers_for_norm_param_estimation,delta_lookback_days_mw,...,model_source,memory_strategy,batch_size_for_norm_param_estimation,post_date,track_number,low_confidence_alert_threshold,n_workers_for_despeckling,device,max_pre_imgs_per_burst_mw,model_compilation
0,https://hyp3-tibet-jpl-test-contentbucket-hrat...,https://hyp3-tibet-jpl-test-contentbucket-hrat...,2025-09-30T18:19:50+00:00,1492.944,4.5,45TUK,1,7,4,none,...,transformer_optimized,high,32,2024-05-05,114,2.5,4,best,none,False
1,https://hyp3-tibet-jpl-test-contentbucket-hrat...,https://hyp3-tibet-jpl-test-contentbucket-hrat...,2025-09-30T18:19:50+00:00,778.462,4.5,45TUK,1,7,4,none,...,transformer_optimized,high,32,2024-10-25,12,2.5,4,best,none,False
2,https://hyp3-tibet-jpl-test-contentbucket-hrat...,https://hyp3-tibet-jpl-test-contentbucket-hrat...,2025-09-30T18:19:50+00:00,1609.243,4.5,45TUK,1,7,4,none,...,transformer_optimized,high,32,2024-11-13,114,2.5,4,best,none,False
3,https://hyp3-tibet-jpl-test-contentbucket-hrat...,https://hyp3-tibet-jpl-test-contentbucket-hrat...,2025-09-30T18:19:50+00:00,590.068,4.5,45TUK,1,7,4,none,...,transformer_optimized,high,32,2024-12-12,12,2.5,4,best,none,False
4,https://hyp3-tibet-jpl-test-contentbucket-hrat...,https://hyp3-tibet-jpl-test-contentbucket-hrat...,2025-09-30T18:19:50+00:00,791.816,4.5,45TUK,1,7,4,none,...,transformer_optimized,high,32,2024-08-14,12,2.5,4,best,none,False


In [4]:
def download_file(url, destination_path):
    response = requests.get(url, stream=True)
    response.raise_for_status()
    
    with open(destination_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)
    
    return destination_path


In [5]:
download_tasks = []
for _, row in df.iterrows():
    url = row['zip_url']
    filename = Path(url).name
    zip_path = tmp_dir / filename
    download_tasks.append((url, zip_path))

# Download all files concurrently
downloaded_files = []
with ThreadPoolExecutor(max_workers=10) as executor:
    with tqdm(total=len(download_tasks), desc="Downloading files", unit="file") as pbar:
        future_to_task = {executor.submit(download_file, url, dest_path): dest_path for url, dest_path in download_tasks}
        
        for future in future_to_task:
            file_path = future.result()
            downloaded_files.append(file_path)
            pbar.update(1)

print(f"Downloaded {len(downloaded_files)} files")

Downloading files: 100%|█| 5303/5303 [43:33<00:0

Downloaded 5303 files





# Unzip

In [2]:
downloaded_zips = list(Path('tmp/').glob('*.zip'))
len(downloaded_zips)

5303

In [10]:
num_processes = 5 
print('Total processes: ', multiprocessing.cpu_count())
print(f"Using {num_processes} processes for unzipping.")

with multiprocessing.Pool(processes=num_processes) as pool:
    results = pool.imap(unzip_dist_s1_prod, downloaded_zips[:])
    for _ in tqdm(results, total=len(downloaded_zips), desc="Unzipping Files"):
        pass

Total processes:  224
Using 25 processes for unzipping.


Unzipping Files: 100%|█| 5303/5303 [20:41<00:00,


In [3]:
subdirs = list(Path('unconfirmed_products').glob('*/'))
mgrs_tiles_unzipped = [subdir.name for subdir in subdirs]
mgrs_tiles_unzipped[:3]

['45TUK', '21JTF', '33QTU']

In [14]:
# cleanup_temp = True
# if cleanup_temp:
#     shutil.rmtree(tmp_dir)

# Confirmation

In [17]:
# %%time

# for mgrs_tile_id in tqdm(mgrs_tiles_unzipped[2:]):
#     # Run the confirmation workflow
#     run_sequential_confirmation_of_dist_products_workflow(
#         unconfirmed_products_dir / mgrs_tile_id, 
#         confirmed_products_dir / mgrs_tile_id
#     )

In [9]:
with multiprocessing.Pool(processes=10) as pool:
    results = pool.imap(wrap_run_sequential_confirmation_of_dist_products_workflow, mgrs_tiles_unzipped[18:])
    for _ in tqdm(results, total=len(mgrs_tiles_unzipped), desc="Confirming Products"):
        pass

Confirming 30 products: 100%|█| 30/30 [02:17<00/
Confirming 25 products: 100%|█| 25/25 [03:39<00
Confirming 30 products: 100%|█| 30/30 [03:58<00
Confirming 27 products: 100%|█| 27/27 [03:47<00
Confirming 61 products: 100%|█| 61/61 [08:09<00
Confirming 60 products: 100%|█| 60/60 [08:15<00
Confirming 57 products: 100%|█| 57/57 [08:34<005
Confirming 77 products: 100%|█| 77/77 [09:08<00
Confirming 10 products: 100%|█| 10/10 [01:20<00
Confirming 89 products: 100%|█| 89/89 [11:40<00
Confirming 39 products: 100%|█| 39/39 [05:23<00
Confirming 119 products: 100%|█| 119/119 [13:42
Confirming 57 products: 100%|█| 57/57 [05:11<00
Confirming 84 products: 100%|█| 84/84 [13:50<00
Confirming 92 products: 100%|█| 92/92 [11:40<002
Confirming 52 products: 100%|█| 52/52 [07:30<00,
Confirming 89 products: 100%|█| 89/89 [12:34<00
Confirming 58 products: 100%|█| 58/58 [07:17<001
Confirming 58 products: 100%|█| 58/58 [07:17<00
Confirming 28 products: 100%|█| 28/28 [04:38<00
Confirming 55 products: 100%|█| 55/