In [None]:
# Purpose: install required libraries into this environment (safe to re-run)


import sys, subprocess, pkgutil
def _pip_install(pkgs):
    subprocess.check_call([sys.executable, "-m", "pip", "install", *pkgs])
need = [p for p in ("requests","pandas") if pkgutil.find_loader(p) is None]
if need:
    _pip_install(need)
print("OK")


OK


  need = [p for p in ("requests","pandas") if pkgutil.find_loader(p) is None]


In [None]:
# Purpose: import libraries and set API endpoint, headers, and sorting criteria
import json, time
from typing import Dict, Any
import requests
import pandas as pd


BASE_URL = "https://www.unoosa.org/oosa/osoindex/waxs-search.json"


SORTINGS = [
    {"fieldName": "object.launch.dateOfLaunch_s1", "dir": "desc"},
    {"fieldName": "object.nationalDesignator_s1", "dir": "asc"},
]


HEADERS = {
    "Accept": "application/json, text/plain, */*",
    "Referer": "https://www.unoosa.org/oosa/osoindex/search-ng.jspx",
    "User-Agent": "Mozilla/5.0",
}


# If your session works anonymously, leave COOKIES empty; otherwise paste fresh cookies from DevTools.
COOKIES = {
    # "JSESSIONID": "REPLACE_IF_NEEDED",
    # "_ga": "REPLACE_IF_NEEDED",
}



In [None]:
# Purpose: define small helpers to flatten nested dicts and fetch one page from the API
def flatten(d: Dict[str, Any], parent: str = "", sep: str = ".") -> Dict[str, Any]:
    out = {}
    for k, v in (d or {}).items():
        nk = f"{parent}{sep}{k}" if parent else k
        if isinstance(v, dict):
            out.update(flatten(v, nk, sep))
        else:
            out[nk] = v
    return out

def fetch_page(start_at: int) -> Dict[str, Any]:
    criteria = {"filters": [], "startAt": start_at, "sortings": SORTINGS}
    resp = requests.get(
        BASE_URL,
        headers=HEADERS,
        cookies=COOKIES,
        params={"criteria": json.dumps(criteria)},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()


In [5]:
# Purpose: probe the API to detect total records and page size so we paginate correctly
first = fetch_page(0)
found = int(first.get("found", 0))
results0 = first.get("results", []) or []
page_size = len(results0)
print({"found": found, "page_size": page_size})
if page_size == 0:
    raise RuntimeError("No results returned; add cookies above or recheck the endpoint.")


{'found': 21289, 'page_size': 15}


In [6]:
# Purpose: collect all records by stepping startAt in exact page_size increments with basic retries
all_rows = []
seen_ids = set()

def add_results(results):
    added = 0
    for item in results:
        rid = item.get("id") or item.get("uri") or json.dumps(item.get("values", {}), sort_keys=True)
        if rid in seen_ids:
            continue
        seen_ids.add(rid)
        all_rows.append(flatten(item.get("values", {})))
        added += 1
    return added

# process the first page
add_results(results0)

# iterate remaining pages
start_at = page_size
while start_at < found:
    print(f"Fetching records starting at {start_at}...")
    try:
        page = fetch_page(start_at)
        results = page.get("results", []) or []
        if not results:
            time.sleep(1.0)
            page = fetch_page(start_at)
            results = page.get("results", []) or []
            if not results:
                print("Empty page encountered twice; stopping early.")
                break
        add_results(results)
        start_at += page_size
        time.sleep(0.05)
    except Exception as e:
        print(f"Error at startAt={start_at}: {e} (retrying once)")
        time.sleep(1.0)
        try:
            page = fetch_page(start_at)
            results = page.get("results", []) or []
            add_results(results)
            start_at += page_size
        except Exception as e2:
            print(f"Failed again at startAt={start_at}: {e2} (skipping forward)")
            start_at += page_size

print(f"Collected rows: {len(all_rows)} of reported {found}")


Fetching records starting at 15...
Fetching records starting at 30...
Fetching records starting at 45...
Fetching records starting at 60...
Fetching records starting at 75...


KeyboardInterrupt: 

In [None]:
# Purpose: auto-detect real column names that correspond to your desired headers, then save CSV

import re, os, pandas as pd

df = pd.DataFrame(all_rows)

# Quick peek so you can see what the API actually returned
print("Total cols:", len(df.columns))
print("Sample cols:", list(df.columns)[:25])

# Desired headers in final CSV, in order
target_headers = [
    "Name_of_Space_Object",
    "International_Designator",
    "National_Designator_s1",
    "Launch_Date",
    "Launching_State",
    "Launch_Site",
    "Launch_Vehicle",
    "Basic_Perigee",
    "Basic_Apogee",
    "Basic_Inclination",
    "Current_Function",
    "Status",
]


# Helper: find the best matching column in df.columns for any of the patterns (prefer longest name)
def find_col(pattern_list, cols):
    matches = []
    for pat in pattern_list:
        rx = re.compile(pat, flags=re.I)
        for c in cols:
            if rx.search(c):
                matches.append(c)
    # prefer the longest (often most specific) column name
    matches = sorted(set(matches), key=len, reverse=True)
    return matches[0] if matches else None

# Build a dynamic mapping from REAL column names -> your desired headers
dynamic_map = {}
unresolved = []
for header in target_headers:
    real_col = find_col(patterns[header], df.columns)
    if real_col:
        dynamic_map[real_col] = header
    else:
        unresolved.append(header)

print("\nColumn mapping (real -> desired):")
for k, v in dynamic_map.items():
    print(f"  {k}  ->  {v}")
if unresolved:
    print("\n⚠ Could not find these headers in the data:", unresolved)

# Create the output DataFrame in your exact column order (missing columns filled with empty string)
df_named = pd.DataFrame()
for header in target_headers:
    # if we found a real column for it, use it; else create an empty column
    real = next((k for k, v in dynamic_map.items() if v == header), None)
    if real and real in df.columns:
        df_named[header] = df[real]
    else:
        df_named[header] = ""

# Save to CSV
out_path = "unoosa_objects.csv"
df_named.to_csv(out_path, index=False, encoding="utf-8")
print(f"\nSaved {len(df_named)} rows and {len(df_named.columns)} columns to {os.path.abspath(out_path)}")


Total cols: 23
Sample cols: ['object.internationalDesignator_s1', 'object.internationalDesignator@official_s1', 'object.nationalDesignator_s1', 'object.nameOfSpaceObjectIno_s1', 'object.nameOfSpaceObjectO_s1', 'object.launch.stateOfRegistry_s1', 'object.launch.stateOfRegistry@official_s1', 'object.launch.dateOfLaunch_s1', 'object.status.gsoLocation_s1', 'object.unRegistration.unRegistered_s1', 'en#object.status.objectStatus_s1', 'object.status@official_s1', 'object.status.dateOfDecay_s1', 'object.launch.dateOfLaunch@official_s1', 'object.status.dateOfDecay@official_s1', 'object.functionOfSpaceObject_s1', 'object.remark_s1', 'object.status.webSite_s1', 'object.unRegistration.registrationDocuments.document@uri_s', 'object.unRegistration.registrationDocuments.document..document.symbol_s', 'object.status.gsoLocation@official_s1', 'object.unRegistration.decayDocuments.document@uri_s', 'object.unRegistration.decayDocuments.document..document.symbol_s']

Column mapping (real -> desired):
  ob

In [None]:
# Purpose: quick sanity checks (peek at columns and top launching states if present)
print("Columns:", list(df.columns)[:20], " ...")
col = "object.launch.launchingState_s1"
if col in df.columns:
    print(df[col].value_counts().head(10))
df_named.head()


Columns: ['object.internationalDesignator_s1', 'object.internationalDesignator@official_s1', 'object.nationalDesignator_s1', 'object.nameOfSpaceObjectIno_s1', 'object.nameOfSpaceObjectO_s1', 'object.launch.stateOfRegistry_s1', 'object.launch.stateOfRegistry@official_s1', 'object.launch.dateOfLaunch_s1', 'object.status.gsoLocation_s1', 'object.unRegistration.unRegistered_s1', 'en#object.status.objectStatus_s1', 'object.status@official_s1', 'object.status.dateOfDecay_s1', 'object.launch.dateOfLaunch@official_s1', 'object.status.dateOfDecay@official_s1', 'object.functionOfSpaceObject_s1', 'object.remark_s1', 'object.status.webSite_s1', 'object.unRegistration.registrationDocuments.document@uri_s', 'object.unRegistration.registrationDocuments.document..document.symbol_s']  ...


Unnamed: 0,Name_of_Space_Object,International_Designator,National_Designator_s1,Launch_Date,Launching_State,Launch_Site,Launch_Vehicle,Basic_Perigee,Basic_Apogee,Basic_Inclination,Current_Function,Status
0,,2025-085Q,,2025-04-28,,,,,,,,in orbit
1,,2025-085S,,2025-04-28,,,,,,,,in orbit
2,,2025-085T,,2025-04-28,,,,,,,,in orbit
3,,2025-085U,,2025-04-28,,,,,,,,in orbit
4,,2025-085V,,2025-04-28,,,,,,,,in orbit


In [None]:
# Purpose: optional resume utility to fetch remaining if interrupted (run only if you need to resume)
already = len(df)
if already < found:
    print(f"Resuming from row {already} of {found}...")
    start_at = already - (already % page_size)
    while start_at < found:
        print(f"Fetching records starting at {start_at}...")
        results = fetch_page(start_at).get("results", []) or []
        if not results:
            break
        add_results(results)
        start_at += page_size
    df2 = pd.DataFrame(all_rows)
    df2 = df2.loc[:, ordered]
    df2.to_csv(out_path, index=False, encoding="utf-8")
    print("Resumed and updated:", df2.shape)
else:
    print("Resume not needed.")


Resume not needed.
