In [1]:
# rewriting
import os
import json
from datetime import datetime
from pandas import DataFrame # Still used briefly for initial transformation idea, though saving raw list is better
from loguru import logger
import boto3 # Added for S3 interaction
from botocore.exceptions import ClientError # Added for error handling

# Assuming these imports exist and work as before
from fire_incidents.clients.socrata_client import SocrataClient
from fire_incidents.utils.argparse import get_default_args
from fire_incidents.utils.secrets import Secrets # Assuming this can fetch Minio secrets

# Constants for Socrata
# DOMAIN is handed to SocrataClient, DATASET to client.get_data, and DATE_COL
# is the column compared against the run date in the `where` filter.
DOMAIN = "data.sfgov.org"
DATASET = "wr8u-xric"
DATE_COL = "incident_date"

# Constants for Minio (adapt structure as needed)
# Object keys are built as <RAW_DATA_BASE_PATH>/YYYY/MM/DD/<filename>.
MINIO_BUCKET_SECRET = "MINIO_BUCKET" # Name of the secret holding the bucket name
RAW_DATA_BASE_PATH = "raw/fire-incidents" # Base path (key prefix) within the bucket

class LoadFireIncidentRaw:
    """Extract one day of fire incidents from Socrata and store them raw in Minio.

    The job queries ``DATASET`` on ``DOMAIN`` for rows whose ``DATE_COL``
    equals ``run_date`` and uploads the result, serialized as JSON, to
    ``<RAW_DATA_BASE_PATH>/YYYY/MM/DD/fire_incidents_data.json`` in the
    configured Minio bucket.
    """

    def __init__(self, **args) -> None:
        """Validate ``run_date``, read the Minio settings and open the S3 client.

        Args:
            **args: Job arguments. ``run_date`` (a ``YYYY-MM-DD`` string) is
                required; all arguments are kept untouched on ``self.args``.

        Raises:
            ValueError: If ``run_date`` is missing, is not a string, does not
                match ``YYYY-MM-DD``, or a required Minio secret is empty.
            botocore.exceptions.ClientError: If the bucket check fails.
        """
        self.args = args
        self.run_date = args.get("run_date")
        try:
            parsed_date = datetime.strptime(self.run_date, '%Y-%m-%d')
        except (TypeError, ValueError) as err:
            # strptime raises TypeError (not ValueError) for None / non-str
            # input; report both cases the same way so callers only need to
            # handle ValueError.
            logger.error(f"Invalid run_date format: {self.run_date}. Expected YYYY-MM-DD.")
            raise ValueError(
                f"Invalid run_date format: {self.run_date}. Expected YYYY-MM-DD."
            ) from err

        # strptime also accepts non-padded input such as "2024-1-5"; store the
        # canonical zero-padded form because run_date is embedded verbatim in
        # the Socrata timestamp filter.
        self.run_date = parsed_date.date().isoformat()

        # Secrets could alternatively be read from environment variables
        # (os.environ.get) if that is where they are stored.
        self.minio_endpoint = Secrets.get("MINIO_ENDPOINT")
        self.minio_access_key = Secrets.get("MINIO_ACCESS_KEY")
        self.minio_secret_key = Secrets.get("MINIO_SECRET_KEY")
        self.minio_bucket = Secrets.get(MINIO_BUCKET_SECRET)

        required_settings = (
            self.minio_endpoint,
            self.minio_access_key,
            self.minio_secret_key,
            self.minio_bucket,
        )
        if not all(required_settings):
            raise ValueError("Missing required Minio configuration in Secrets.")

        try:
            self.s3_client = boto3.client(
                's3',
                endpoint_url=self.minio_endpoint,
                aws_access_key_id=self.minio_access_key,
                aws_secret_access_key=self.minio_secret_key,
                # region_name is often optional for Minio, but can be set if needed
                # region_name='us-east-1'
            )
            # Fail fast: verify the bucket is reachable before extracting anything.
            self.s3_client.head_bucket(Bucket=self.minio_bucket)
            logger.info(f"Successfully connected to Minio bucket '{self.minio_bucket}'")
        except ClientError as e:
            logger.error(f"Failed to connect to Minio or access bucket '{self.minio_bucket}': {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred during Minio client initialization: {e}")
            raise

    def _get_s3_key(self) -> str:
        """Return the object key (path within the bucket) for the output file.

        The key has the form
        ``<RAW_DATA_BASE_PATH>/YYYY/MM/DD/fire_incidents_data.json`` where the
        date parts come from ``self.run_date``.

        Raises:
            ValueError: If ``self.run_date`` is not a ``YYYY-MM-DD`` string.
        """
        try:
            date_obj = datetime.strptime(self.run_date, '%Y-%m-%d')
        except (TypeError, ValueError) as err:
            # __init__ already validates run_date; this guards against the
            # attribute being changed afterwards. Chain the cause so the
            # original parsing error stays visible.
            logger.error(f"Cannot generate S3 key due to invalid run_date: {self.run_date}")
            raise ValueError(
                f"Invalid run_date format for S3 key generation: {self.run_date}"
            ) from err

        date_path = date_obj.strftime('%Y/%m/%d')
        filename = "fire_incidents_data.json"
        return f"{RAW_DATA_BASE_PATH}/{date_path}/{filename}"

    def run(self) -> None:
        """Extract the run date's records, serialize them and upload them to Minio.

        Failures during extraction, serialization or upload are logged (with
        traceback) and end the run early without raising. "Finished job" is
        only logged when the upload actually succeeded.

        Raises:
            ValueError: If the object key cannot be built from ``run_date``.
        """
        # 1. Get data from Socrata
        logger.info(f"Extracting data for date: {self.run_date}")
        try:
            client = SocrataClient(DOMAIN)
            socrata_query = f"{DATE_COL}='{self.run_date}T00:00:00'"
            logger.debug(f"Using Socrata query: {socrata_query}")
            # client.get_data is expected to return a list of dictionaries
            raw_data = client.get_data(DATASET, where=socrata_query)
        except Exception as e:
            logger.exception(f"Failed to extract data from Socrata: {e}")
            return  # Exit if extraction fails

        if not raw_data:
            logger.warning(f"No data found for date {self.run_date}. Nothing to load.")
            return

        logger.info(f"Successfully extracted {len(raw_data)} records.")

        # 2. Serialize the raw records as-is; no intermediate DataFrame needed.
        logger.info("Serializing data to JSON format")
        try:
            # indent keeps the stored file human-readable at the cost of size
            json_data = json.dumps(raw_data, indent=4)
        except (TypeError, ValueError) as e:
            # TypeError: non-serializable value; ValueError: circular reference
            logger.exception(f"Failed to serialize data to JSON: {e}")
            return  # Exit if serialization fails

        # 3. Load data to Minio
        s3_key = self._get_s3_key()
        logger.info(f"Attempting to load data to Minio bucket '{self.minio_bucket}' at key '{s3_key}'")

        try:
            self.s3_client.put_object(
                Bucket=self.minio_bucket,
                Key=s3_key,
                # put_object documents Body as bytes or a file-like object
                Body=json_data.encode("utf-8"),
                ContentType='application/json',
            )
        except ClientError as e:
            logger.exception(f"Failed to upload data to Minio: {e}")
            return  # Do not report the job as finished after a failed upload
        except Exception as e:
            logger.exception(f"An unexpected error occurred during Minio upload: {e}")
            return

        logger.info(f"Successfully saved data to s3://{self.minio_bucket}/{s3_key}")
        logger.info("Finished job")


if __name__ == "__main__":
    # Script entry point: parse CLI arguments, run the job, and signal any
    # failure through a non-zero exit status so schedulers can detect it.
    # Assumes get_default_args() includes a 'run_date' argument in 'YYYY-MM-DD' format
    args = vars(get_default_args())

    # Guard clause: run_date must be present and non-empty.
    if not args.get('run_date'):
        logger.error("Missing '--run_date' argument. Please provide the date in YYYY-MM-DD format.")
        raise SystemExit(1)

    try:
        # Instantiate and run the job
        loader = LoadFireIncidentRaw(**args)
        loader.run()
    except ValueError as e:
        # Configuration or date format errors from __init__ or _get_s3_key
        logger.error(f"Initialization failed: {e}")
        raise SystemExit(1) from e
    except Exception as e:
        logger.error(f"An unexpected error occurred during the process: {e}")
        raise SystemExit(1) from e

ModuleNotFoundError: No module named 'fire_incidents'