In [None]:
# --- 0. Setup and Imports ---
print("--- 0. Setting up Colab environment and imports ---")
# Install the necessary text-splitter library (Colab specific)
!pip install --quiet langchain-text-splitters
import json
import pandas as pd
import gzip
import requests
import io
import time
from langchain_text_splitters import RecursiveCharacterTextSplitter

# --- Configuration ---
URL = "https://ntrs.staging.sti.appdat.jsc.nasa.gov/api/docs/ntrs-public-metadata.json.gz?attachment=true"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 50

# --- Helper Function: Load Data (Download, Decompress, Load) ---
def load_data_from_url(url):
    """Downloads, decompresses, and loads the single JSON object into a DataFrame."""
    start_time = time.time()
    print(f"\n1. Starting Data Acquisition and Loading from URL...")
    try:
        # 1. Download the file content
        print("   📥 Downloading compressed file...")
        response = requests.get(url, stream=True)
        response.raise_for_status()

        # 2. Decompress
        print("   📦 Decompressing gzip file...")
        compressed_file = io.BytesIO(response.content)

        # Use gzip to open the compressed content as text
        with gzip.open(compressed_file, 'rt', encoding='utf-8') as f:
            # 3. Load the single JSON object
            print("   📖 Parsing JSON data...")
            data = json.load(f)

        print(f"   ✅ Data loaded. Type: {type(data)}. Size: {len(data):,} items.")

        # 4. Convert to DataFrame
        records = []
        if isinstance(data, dict):
            for doc_id, doc_data in data.items():
                if isinstance(doc_data, dict):
                    record = doc_data.copy()
                    record['document_id'] = doc_id
                    records.append(record)

            df = pd.DataFrame(records)
            print(f"   ✅ Converted to DataFrame with {len(df):,} rows.")
            return df
        else:
            print("   ❌ Error: Data root structure is not a dictionary.")
            return None

    except requests.RequestException as e:
        print(f"   ❌ ERROR: Network/Download error: {e}")
        return None
    except (gzip.BadGzipFile, json.JSONDecodeError) as e:
        print(f"   ❌ ERROR: Decompression or JSON parsing error: {e}")
        return None
    except Exception as e:
        print(f"   ❌ An unexpected error occurred: {e}")
        return None
    finally:
        print(f"   (Data Acquisition took {time.time() - start_time:.2f} seconds)")


# --- Step 1: Data Cleaning and Preprocessing ---
def preprocess_data(df):
    """Cleans, preprocesses, and validates the data."""
    start_time = time.time()
    print("\n2. Starting Data Cleaning and Preprocessing...")

    # --- Data Cleaning and Validation ---
    df['abstract'] = df['abstract'].fillna("").astype(str)
    df['keywords'] = df['keywords'].apply(lambda x: x if isinstance(x, list) else [])

    initial_count = len(df)
    df = df[df['title'].astype(str).str.strip().str.len() > 0]
    print(f"   - Validation: Filtered {initial_count - len(df)} records with missing titles.")

    # --- Data Feature Engineering (Flattening) ---
    def flatten_authors(affiliations):
        if not isinstance(affiliations, list): return ""
        author_info = []
        for item in affiliations:
            try:
                if 'meta' in item and 'author' in item['meta']:
                    name = item['meta']['author'].get('name', '')
                    if name: author_info.append(name)
            except:
                continue
        return ", ".join(sorted(list(set(author_info))))

    def list_to_string(item_list):
        if not isinstance(item_list, list): return ""
        return " | ".join(str(item) for item in item_list)

    df['authors_flat'] = df['authorAffiliations'].apply(flatten_authors)
    df['keywords_flat'] = df['keywords'].apply(list_to_string)

    # --- Core Text Generation (The RAG Source) ---
    df['text_source'] = (
        "TITLE: " + df['title'].astype(str) +
        "\nABSTRACT: " + df['abstract'].astype(str) +
        "\nAUTHORS: " + df['authors_flat'].astype(str) +
        "\nKEYWORDS: " + df['keywords_flat'].astype(str)
    )

    df_rag = df[['document_id', 'title', 'abstract', 'text_source']].copy()

    print(f"   ✅ Preprocessing complete. Final record count: {len(df_rag):,}.")
    print(f"   (Preprocessing took {time.time() - start_time:.2f} seconds)")
    return df_rag


# --- Step 2: Chunking Strategy Implementation ---
def chunk_data(df_rag):
    """Implements intelligent chunking."""
    start_time = time.time()
    print("\n3. Starting Intelligent Chunking...")

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ".", " ", ""]
    )

    chunks = []
    total_docs = len(df_rag)

    for index, row in df_rag.iterrows():
        doc_text = row['text_source']
        doc_id = row['document_id']
        doc_title = row['title']

        text_chunks = text_splitter.split_text(doc_text)

        for i, chunk_text in enumerate(text_chunks):
            chunks.append({
                'document_id': doc_id,
                'title': doc_title,
                'chunk_id': f"{doc_id}-{i+1}",
                'chunk_text': chunk_text,
                'chunk_size': len(chunk_text)
            })

    df_chunks = pd.DataFrame(chunks)

    print(f"   ✅ Chunking complete. Total documents processed: {total_docs:,}")
    print(f"   Total chunks created: {len(df_chunks):,}")
    print(f"   (Chunking took {time.time() - start_time:.2f} seconds)")
    return df_chunks


# --- Step 3: Pipeline Documentation and Execution ---
def run_pipeline():
    """Executes the full pipeline and provides documentation."""

    pipeline_doc = f"""
============================================================
PIPELINE DOCUMENTATION: NASA NTRS METADATA RAG PREPARATION
============================================================

1. DATA ACQUISITION & LOADING:
- Method: Direct download via Python 'requests' and in-memory decompression with 'gzip'.
- Source: {URL}

2. DATA CLEANING, PREPROCESSING & VALIDATION:
- **Core Text Generation**: Master field (`text_source`) created from TITLE, ABSTRACT, AUTHORS, and KEYWORDS.
- **Validation**: Records with missing titles are dropped.

3. CHUNKING STRATEGY:
- **Tool**: RecursiveCharacterTextSplitter.
- **Goal**: Maintain semantic boundaries.
- **Parameters**: Chunk Size: {CHUNK_SIZE}, Chunk Overlap: {CHUNK_OVERLAP}.

4. FINAL OUTPUT:
- DataFrame `df_chunks` ready for vector embedding.
============================================================
"""
    print(pipeline_doc)

    # Execution
    df_raw = load_data_from_url(URL)
    if df_raw is None:
        return None, None

    df_rag = preprocess_data(df_raw)

    df_chunks = chunk_data(df_rag)

    print(f"\n--- Final Output: df_chunks Head (First Chunk) ---")
    print(df_chunks.head(1).T)
    print(f"\nPipeline Execution Complete! Total final chunks: {len(df_chunks):,}")

    # *** KEY CHANGE: Return the DataFrames ***
    return df_rag, df_chunks

# This execution block needs to be run in the *same cell* as the functions.
# It assigns the returned values to global variables.
# You can delete the original 'if __name__ == "__main__":' block if it was in a separate cell.

# *** Assign the returned DataFrames to global variables ***
global df_rag, df_chunks
df_rag, df_chunks = run_pipeline()

In [1]:
# View the full combined text source for the first document
print("\n--- Full Combined Text Source for the First Document (df_rag) ---")
doc_index = 0
print(f"Document ID: {df_rag['document_id'].iloc[doc_index]}")
print("---")
print(df_rag['text_source'].iloc[doc_index])

# View the first 5 chunks to see how the text was split
print("\n--- Preview of the Final Chunks (df_chunks) ---")
print(f"Total Chunks: {len(df_chunks):,}\n")
print(df_chunks.head().to_markdown(index=False, numalign="left", stralign="left"))


--- Full Combined Text Source for the First Document (df_rag) ---


NameError: name 'df_rag' is not defined