\* The next cell adds the notebook's parent directory to `sys.path`; this is required so that modules under `src` can be imported.

In [1]:
import sys
from pathlib import Path

# Add the parent of the current working directory to the Python path so the
# `src.*` imports in the next cell resolve.
parent_dir = Path().resolve().parent
# Guard the insert: re-running this cell must not stack duplicate entries.
if str(parent_dir) not in sys.path:
    sys.path.insert(0, str(parent_dir))

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from minio import Minio
from minio.error import S3Error
from time import sleep
import requests
import json
from collections import deque
from io import BytesIO
from typing import Self, Optional
import logging
import pickle
from datetime import datetime
from src.transformations import *
from src.config import *
from src.queue_manager import queue
from src.minio_client import *
from src.custom_exceptions import *


# Write pipeline logs to a local file at DEBUG verbosity.
logging.basicConfig(filename='NeoPipeline.log', level=logging.DEBUG)
# The root DEBUG level also applies to third-party loggers. urllib3 logs each
# request line (path + query string) at DEBUG, which would write the `api_key`
# query parameter into the log file, so cap that logger at INFO.
logging.getLogger('urllib3').setLevel(logging.INFO)
logger = logging.getLogger(__name__)

In [3]:
# Default parameters or parameters set by Airflow.
# These are the values used when the notebook is run without overrides; the
# "Parameters" cell that follows reassigns every one of these names.
# NASA_NEO_API_KEY / NASA_NEO_URI are not defined in this notebook — presumably
# they come from `src.config` via the star import (TODO confirm).
api_key_param = NASA_NEO_API_KEY
api_uri_param = NASA_NEO_URI
# Query window passed to NeoApiClient as start_date / end_date
start_date_param = '2025-05-02'
end_date_param = '2025-05-09'
# Target storage bucket; also used as the Iceberg warehouse location
bucket_name_param = 'neo'
# Pipeline stage to run: 'bronze', 'silver' or 'gold'
mode = 'silver'

In [4]:
# Parameters
# SECURITY: never commit an API key as a literal in the notebook. The key is
# taken from configuration instead (same source as the defaults cell above).
# NOTE(review): the literal key that used to be stored here should be rotated.
api_key_param = NASA_NEO_API_KEY
api_uri_param = "https://api.nasa.gov/neo/rest/v1/feed?"
start_date_param = "2025-01-15"
end_date_param = "2025-01-21"
bucket_name_param = "neo"
mode = "bronze"


In [5]:
# Connect to MinIO blob storage. The resulting client is handed to
# NeoApiClient as its `storage` argument in the pipeline cell below.
# `create_minio_client` is not defined in this notebook — presumably it comes
# from `src.minio_client` via the star import (TODO confirm).
minio_client = create_minio_client()

In [6]:
class NeoApiClient:
    # Instantiate current task API client
    def __init__(self, api_key, api_uri, start_date, end_date,
                 storage, bucket_name, mode):
        """
        Build a client bound to a single pipeline task.

        Args:
            api_key: NASA API authentication key
            api_uri: Base URI for the NASA NEO API
            start_date: Query start date (YYYY-MM-DD format)
            end_date: Query end date (YYYY-MM-DD format)
            storage: MinIO client instance used for object storage
            bucket_name: Target storage bucket name
            mode: Data processing mode ('bronze', 'silver', 'gold')

        Raises:
            ValueError: If `_validate_inputs` rejects the arguments
        """
        logger.debug(f"Initializing NeoApiClient for date range {start_date} to {end_date}")

        # Reject bad arguments before any state is stored on the instance
        self._validate_inputs(start_date, end_date, mode)

        # API access settings
        self.key, self.api_uri = api_key, api_uri
        self.start_date, self.end_date = start_date, end_date

        # Storage target and processing stage
        self.storage, self.bucket_name = storage, bucket_name
        self.mode = mode

        # Nothing has been extracted and no Spark session exists yet
        self.data: Optional[dict | pyspark.sql.DataFrame] = None
        self.spark: Optional[SparkSession] = None

        logger.debug(f"NeoApiClient initialized for bucket '{bucket_name}' in {mode} mode")
        
    # ================================================================================================
    # ATTRIBUTE VALIDATION METHODS
    # ================================================================================================
        
    def _validate_inputs(self, start_date: str, end_date: str, mode: str) -> None:
        """Validate constructor parameters."""

        # Validate mode
        valid_modes = {'bronze', 'silver', 'gold'}
        if mode not in valid_modes:
            raise ValueError(f"Mode must be one of {valid_modes}, got: {mode}")

        # Valid date values
        # can be included in the future

    # ================================================================================================
    # QUEUE MANAGEMENT METHODS
    # ================================================================================================
    
    def _update_queue(self, mode: str, object_name: Optional[str] = None) -> Optional[str]:
        """
        Push an item onto, or pop an item from, the shared pipeline queue
        (`queue` imported from `src.queue_manager`).

        Args:
            mode: 'in' to enqueue `object_name`, 'out' to dequeue the next item
            object_name: Item to enqueue; mandatory when mode is 'in'

        Returns:
            Whatever `queue.get_from_queue()` returns for 'out' mode;
            None for 'in' mode

        Raises:
            ValueError: If mode is not 'in'/'out', or object_name is missing
                for 'in' mode
            QueueOperationError: If the underlying queue call raises
        """
        # Guard clauses: reject unusable arguments before touching the queue
        if mode not in {'in', 'out'}:
            raise ValueError(f"Invalid mode '{mode}'. Must be 'in' or 'out'")

        if object_name is None and mode == 'in':
            raise ValueError("object_name is required for 'in' mode")

        logger.debug(f"Queue operation: mode={mode}, object_name={object_name}")

        try:
            if mode == 'out':
                # Dequeue and hand the item straight back to the caller
                return queue.get_from_queue()

            # Only 'in' can reach this point
            queue.add_to_queue(item=object_name)
            return None

        except Exception as err:
            logger.error(f"Queue operation failed: mode={mode}, error={err}")
            raise QueueOperationError(f"Failed to {mode} queue: {err}")
    
    # ================================================================================================
    # SPARK SESSION MANAGEMENT METHODS
    # ================================================================================================
     
    def create_spark_session(self, mode: Optional[str] = None) -> SparkSession:
        """
        Return a Spark session configured for the Iceberg catalog on MinIO.

        The work is delegated to three helpers: the mode is resolved and
        checked, the Spark settings (Iceberg extensions, `iceberg_catalog`,
        Hadoop S3A access to MinIO) are built, and the session is created
        with retries.

        Args:
            mode: Processing mode ('bronze', 'silver', 'gold').
                  Falls back to self.mode when omitted or empty

        Returns:
            SparkSession: The created session (also stored on self.spark)

        Raises:
            ValueError: If the resolved mode is not supported
            SparkSessionError: If the session cannot be created or validated

        Example:
            spark = client.create_spark_session('silver')
            df = spark.sql("SELECT * FROM iceberg_catalog.neo_db.silver_asteroids")
        """
        # Step 1: settle on a valid processing mode
        resolved_mode = self._validate_and_set_mode(mode)

        # Step 2: assemble the Spark settings for that mode
        spark_settings = self._build_spark_config(resolved_mode)

        # Step 3: create (and smoke-test) the session
        return self._create_session_with_retry(spark_settings, resolved_mode)

    
    def _validate_and_set_mode(self, mode: Optional[str]) -> str:
        """
        Resolve the processing mode, falling back to self.mode when none is
        given, and return it once it has been checked against the supported set.

        Raises:
            ValueError: If the resolved mode is not 'bronze', 'silver' or 'gold'
        """
        resolved = mode
        if not resolved:
            # No explicit mode supplied: use the one the client was built with
            resolved = self.mode
            logger.debug(f"Using default mode: {resolved}")

        allowed = {'bronze', 'silver', 'gold'}
        if resolved in allowed:
            logger.debug(f"Using processing mode: {resolved}")
            return resolved

        raise ValueError(f"Invalid mode '{resolved}'. Must be one of {allowed}")

    
    def _build_spark_config(self, mode: str) -> dict:
        """
        Build the Spark configuration dictionary.

        Args:
            mode: Processing mode; only used to derive the application name

        Returns:
            dict: Spark setting name -> value, covering the Iceberg catalog
            (`iceberg_catalog`, Hadoop catalog type, warehouse at
            s3a://<bucket_name>), Hadoop S3A access to MinIO, and the
            serializer / adaptive-execution settings
        """
        logger.debug(f"Building Spark configuration for mode: {mode}")

        # Record the PySpark version in the log instead of printing it
        import pyspark
        logger.debug(f"PySpark version: {pyspark.__version__}")

        # JARs resolved by Spark at session start-up
        jar_packages = [
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3",
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "org.apache.hadoop:hadoop-common:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.367",
        ]

        config = {
            # Application
            "spark.app.name": f"NASA_NEO_{mode.capitalize()}_Data",

            # Iceberg configuration
            "spark.jars.packages": ",".join(jar_packages),
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.catalog.iceberg_catalog": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.iceberg_catalog.type": "hadoop",
            "spark.sql.catalog.iceberg_catalog.warehouse": f"s3a://{self.bucket_name}",
            # File IO goes through the Hadoop filesystem layer (S3A settings below)
            "spark.sql.catalog.iceberg_catalog.io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
            "spark.sql.catalog.iceberg_catalog.s3.endpoint": f"http://{MINIO_ENDPOINT}",

            # Hadoop S3a configuration
            "spark.hadoop.fs.s3a.endpoint": f"http://{MINIO_ENDPOINT}",
            "spark.hadoop.fs.s3a.access.key": MINIO_ROOT_USER,
            "spark.hadoop.fs.s3a.secret.key": MINIO_ROOT_PASSWORD,
            "spark.hadoop.fs.s3a.path.style.access": "true",
            "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hadoop.fs.s3a.endpoint.region": "us-east-1",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",

            # Performance Configuration
            "spark.sql.adaptive.enabled": "false",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.adaptive.coalescePartitions.enabled": "false",
            "spark.sql.adaptive.skewJoin.enabled": "false"
        }

        logger.debug(f"Built configuration with {len(config)} settings")
        return config

    
    def _create_session_with_retry(self, config: dict, mode: str, max_retries: int = 3) -> SparkSession:
        """
        Create a Spark session with retry logic and proper error handling.

        Args:
            config: Spark setting name -> value, applied to the session builder
            mode: Processing mode, forwarded to `_validate_session`
            max_retries: Total number of attempts (must be >= 1)

        Returns:
            SparkSession: The validated session (also stored on self.spark)

        Raises:
            ValueError: If max_retries is less than 1
            SparkSessionError: If every attempt fails
        """
        # Local import, hoisted out of the retry loop
        import time

        # Without this guard a non-positive value would skip the loop and
        # silently return None instead of a session
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got: {max_retries}")

        for attempt in range(max_retries):
            try:
                # Build new session
                builder = SparkSession.builder

                # Apply all configuration
                for key, value in config.items():
                    builder = builder.config(key, value)

                # Create session
                # NOTE(review): getOrCreate() hands back an already-active
                # session when one exists, so a retry may re-test the same
                # session — confirm this is the intended retry semantics.
                spark = builder.getOrCreate()

                # Validate session
                self._validate_session(spark, mode)

                # Save session
                self.spark = spark

                logger.info(f"Successfully created Spark session: {spark.sparkContext.appName}")

                return spark

            except Exception as e:
                attempt_num = attempt + 1
                logger.warning(f"Spark session creation attempt {attempt_num}/{max_retries} failed: {e}")

                if attempt_num == max_retries:
                    logger.error(f"Failed to create Spark session after {max_retries} attempts")
                    # Keep the root cause attached to the raised error
                    raise SparkSessionError(f"Could not create Spark session: {e}") from e

                # Brief pause before retry: exponential backoff (1s, 2s, 4s, ...)
                time.sleep(2 ** attempt)

        
    def _validate_session(self, spark: SparkSession, mode: str) -> None:
        """
        Smoke-test a Spark session by running a trivial query.

        Args:
            spark: Session to check
            mode: Processing mode (accepted for symmetry with the caller;
                  not used by the check itself)

        Raises:
            SparkSessionError: If the test query fails
        """
        try:
            # A session that cannot evaluate a constant is not usable
            spark.sql("SELECT 1").collect()
        except Exception as err:
            logger.error(f"Spark session validation failed: {err}")
            raise SparkSessionError(f"Session validation failed: {err}")
        else:
            logger.debug("Spark session basic functionality validated")

            
    def close_spark_session(self) -> None:
        """
        Stop the Spark session held on self.spark, if any.

        Errors raised by `stop()` are logged, not propagated (best-effort
        cleanup). The reference on self.spark is cleared in every case, so a
        session whose shutdown failed is never picked up again by later
        `if not self.spark` checks.
        """
        spark = getattr(self, 'spark', None)
        if not spark:
            # Nothing to close
            return

        try:
            logger.info("Closing Spark session...")
            spark.stop()
            logger.info("Spark session closed successfully")
        except Exception as e:
            logger.error(f"Error closing Spark session: {e}")
        finally:
            # Drop the reference even when stop() raised
            self.spark = None
    
    # ================================================================================================
    # DATA PROCESSING METHODS - EXTRACT
    # ================================================================================================
            
    def extract(self) -> Self:
        """
        Run the extraction step that matches the client's processing mode.

        bronze -> request the feed from the NASA NEO API
        silver -> read the next queued object from bronze storage
        gold   -> query the silver Iceberg table

        Returns:
            Self, so that calls can be chained

        Raises:
            DataExtractionError: When the selected extraction step fails
            ValueError: When self.mode is not a supported mode
        """
        logger.info(f"Starting data extraction for mode: {self.mode}")

        # Dispatch on the processing mode
        if self.mode == 'bronze':
            # Source: NASA NEO API request
            self._extract_from_api()
        elif self.mode == 'silver':
            # Source: neo/bronze/
            self._extract_from_bronze_storage()
        elif self.mode == 'gold':
            # Source: neo/silver/
            self._extract_from_silver_tables()
        else:
            raise ValueError(f"Invalid extraction mode: {self.mode}")

        logger.info(f"Data extraction completed successfully for mode: {self.mode}")
        return self
                
                
    def _extract_from_api(self) -> None:
        """
        Extract raw data from NASA NEO API (Bronze mode).

        Sends a GET request built from api_uri, start_date, end_date and the
        API key, then stores the JSON response on self.data as a BytesIO of
        UTF-8 encoded JSON.

        The API key is masked in everything this method logs or puts into
        its own error messages.

        Raises:
            DataExtractionError: On timeout, connection failure, HTTP error
                status, invalid JSON in the response, or any other request
                failure
        """
        # Generate API request uri
        date_query = f'start_date={self.start_date}&end_date={self.end_date}'
        full_uri = f'{self.api_uri}{date_query}&api_key={self.key}'
        # Same URI with the secret masked, safe to write to the log file
        loggable_uri = f'{self.api_uri}{date_query}&api_key=***'

        try:
            logger.debug(f"Making API request to: {loggable_uri}")

            response = requests.get(full_uri, timeout=5)
            response.raise_for_status()  # Raises HTTPError for bad responses

            # Convert JSON to bytes
            json_bytes = json.dumps(response.json()).encode('utf-8')

            # Store data as BytesIO object
            self.data = BytesIO(json_bytes)
            logger.info(f"Successfully extracted {len(json_bytes)} bytes from NASA API")

        except requests.exceptions.Timeout as e:
            raise DataExtractionError(f"API request timed out") from e
        except requests.exceptions.ConnectionError as e:
            raise DataExtractionError("Failed to connect to NASA API - check network connection") from e
        except requests.exceptions.HTTPError as e:
            raise DataExtractionError(f"NASA API returned error: {e.response.status_code}") from e
        except json.JSONDecodeError as e:
            # Must precede RequestException: the decode error raised by
            # response.json() in current requests releases is also a
            # RequestException and would otherwise be reported generically.
            raise DataExtractionError("NASA API returned invalid JSON") from e
        except requests.exceptions.RequestException as e:
            # requests often embeds the request URL in its messages; mask the key
            detail = str(e)
            if self.key:
                detail = detail.replace(str(self.key), '***')
            raise DataExtractionError(f"API request failed: {detail}") from e

                    
    def _extract_from_bronze_storage(self) -> None:
        """
        Load the next queued bronze object into self.data (Silver mode).

        Pops an object name from the queue, downloads that object from
        self.bucket_name and parses it as UTF-8 JSON. When the queue yields
        nothing, self.data is set to None and the method returns.

        Raises:
            DataExtractionError: If the object is not valid JSON, or if the
                download/decoding fails (in the latter case the object name
                is put back on the queue first)
        """
        logger.debug("Extracting data from bronze storage")

        # Ask the queue which bronze object to process next
        queued_object = self._update_queue('out')
        if not queued_object:
            logger.info("No objects available in queue")
            self.data = None
            return

        http_response = None
        try:
            logger.debug(f"Retrieving object: {queued_object}")
            http_response = self.storage.get_object(self.bucket_name, queued_object)

            # Decode the payload and parse it as JSON
            payload_text = http_response.data.decode('utf-8')
            self.data = json.loads(payload_text)

            logger.info(f"Successfully extracted object '{queued_object}' from bronze storage")

        except json.JSONDecodeError:
            raise DataExtractionError(f"Invalid JSON in bronze object: {queued_object}")
        except Exception as err:
            # Re-queue the object so that a later run can retry it
            self._update_queue('in', queued_object)
            raise DataExtractionError(f"Failed to extract from bronze storage: {err}")
        finally:
            # Release the HTTP connection whatever happened above
            if http_response:
                http_response.close()
                http_response.release_conn()
         
        
    def _extract_from_silver_tables(self) -> None:
        """
        Extract structured data from silver Iceberg tables (Gold mode).

        Creates a Spark session in 'silver' mode and stores the result of
        `SELECT * FROM iceberg_catalog.neo_db.silver_asteroids` on self.data.
        An empty result is logged as a warning, not treated as an error.

        Raises:
            DataExtractionError: If the session cannot be created or the
                query fails
        """
        logger.debug("Extracting data from silver Iceberg tables")

        try:
            # Create Spark session for silver catalog
            spark = self.create_spark_session('silver')

            # Execute query
            query = "SELECT * FROM iceberg_catalog.neo_db.silver_asteroids"
            logger.debug(f"Executing query: {query}")
            self.data = spark.sql(query)

            # Validate that we got data. count() triggers a Spark job, so it
            # is evaluated once and the result reused for the log message.
            record_count = self.data.count()
            if record_count == 0:
                logger.warning("No data found in silver tables")
            else:
                logger.info(f"Successfully extracted {record_count} records from silver tables")

        except Exception as e:
            raise DataExtractionError(f"Failed to extract from silver tables: {e}") from e

        
    # ================================================================================================
    # DATA PROCESSING METHODS - TRANSFORM
    # ================================================================================================
                     
    def transform(self) -> Self:
        """
        Transform data based on processing mode.
        
        Bronze: Store raw data (no transformation)
        Silver: Convert JSON to structured DataFrame
        Gold: Prepare analytics-ready dataset

        Returns:
            Self for method chaining

        Raises:
            DataTransformationError: When transformation fails
            ValueError: When mode is invalid
        """
        logger.info(f"Starting data transformation for mode: {self.mode}")
       
        try:
            match self.mode:
                case 'bronze':
                    # Bronze mode: no transformation, data already extracted
                    logger.info("Bronze mode: No transformation required")

                case 'silver':
                    # Silver mode: JSON to structured DataFrame
                    if not self.data:
                        raise DataTransformationError("No data available for silver transformation")

                    if not self.spark:
                        self.spark = self.create_spark_session('silver')

                    self.data = NeoTransformations.json_to_structured_dataframe(self.data, self.spark)

                case 'gold':
                    # Gold mode: Analytics-ready dataset
                    if not self.data:
                        raise DataTransformationError("No data available for gold transformation")

                    # if not self.spark:
                    #     self.spark = self.create_spark_session('gold')   REMOVE

                    self.data = NeoTransformations.prepare_analytics_dataset(self.data)

                case _:
                    raise ValueError(f"Invalid transformation mode: {self.mode}")

            logger.info(f"Data transformation completed successfully for mode: {self.mode}")
            return self
        
        except Exception as e:
            logger.error(f"Data transformation failed for mode {self.mode}: {e}")
            raise DataTransformationError(f"Transformation failed: {e}")
    
    # ================================================================================================
    # DATA PROCESSING METHODS - LOAD
    # ================================================================================================
                              
    def load(self) -> Self:
        """Load data based on processing mode."""
        logger.info(f"Starting data load for mode: {self.mode}")

        try:
            match self.mode:
                case 'bronze':
                    if not self.data:
                        raise DataLoadError("No data available for bronze load")

                    obj_name = f'{self.mode}/{self.bucket_name}-{self.start_date}_{self.end_date}.json'

                    self.storage.put_object(
                        self.bucket_name, 
                        obj_name, 
                        self.data,
                        length=len(self.data.getvalue()),
                        content_type='application/json'
                    )

                    self._update_queue('in', obj_name)
                    logger.info(f"Successfully stored {obj_name} and added to queue")

                case 'silver':
                    if not self.data:
                        raise DataLoadError("No data available for silver load")

                    self.spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.neo_db")

                    self.data.writeTo(f'iceberg_catalog.neo_db.{self.mode}_asteroids') \
                             .partitionedBy('observation_date') \
                             .createOrReplace()

                    logger.info(f"Successfully loaded data to silver table")

                    self.close_spark_session()

                case 'gold':
                    if not self.data:
                        raise DataLoadError("No data available for gold load")

                    if not self.spark:
                        self.spark = self.create_spark_session('gold')

                    # self.spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.neo_db") REMOVE

                    self.data.writeTo(f'iceberg_catalog.neo_db.{self.mode}_asteroids') \
                             .partitionedBy('observation_date') \
                             .createOrReplace()

                    logger.info(f"Successfully loaded data to gold table")

                    self.close_spark_session()

                case _:
                    raise ValueError(f"Invalid load mode: {self.mode}")

            logger.info(f"Data load completed successfully for mode: {self.mode}")
            return self

        except Exception as e:
            logger.error(f"Data load failed for mode {self.mode}: {e}")
            raise DataLoadError(f"Load failed: {e}")

In [7]:
# Build the client for this task from the notebook parameters
neo_client = NeoApiClient(
    api_key=api_key_param,
    api_uri=api_uri_param,
    start_date=start_date_param,
    end_date=end_date_param,
    storage=minio_client,
    bucket_name=bucket_name_param,
    mode=mode,
)

# Run the ETL stages in order for the selected mode. Each stage returns the
# client itself, so the final call is also the value displayed by the cell.
neo_client.extract()
neo_client.transform()
neo_client.load()

<__main__.NeoApiClient at 0x7520ddc60f20>