In [None]:
def _default_giveup_condition(e: Exception) -> bool:
    """
    Decide whether ``backoff`` should stop retrying after exception ``e``.

    Gives up (returns True) on:
      * HTTP 4xx responses other than 429 Too Many Requests -- the request
        itself is wrong, so repeating it cannot succeed;
      * ``aiohttp.InvalidURL`` -- likewise a caller error.

    Keeps retrying (returns False) on everything else that reaches it: 5xx
    responses, 429, and transport-level ``aiohttp.ClientError`` subclasses
    such as connection resets or server disconnects, which are usually
    transient. ``backoff`` still caps the total number of tries.
    """
    if isinstance(e, aiohttp.ClientResponseError):
        return 400 <= e.status < 500 and e.status != 429
    if isinstance(e, aiohttp.InvalidURL):
        return True
    # Previously this returned True, which disabled retries for every
    # non-HTTP-status error (e.g. dropped connections) despite the intent above.
    return False

@backoff.on_exception(
    backoff.expo,
    aiohttp.ClientError,
    max_tries=5,
    giveup=_default_giveup_condition,
)
async def post_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    limiter: AsyncLimiter,
    json_payload: Any,
    headers: Dict[str, str],
    verify_ssl: bool,
) -> aiohttp.ClientResponse:
    """
    POST ``json_payload`` as JSON to ``url`` under ``limiter``, retrying with backoff.

    Every attempt, retries included, enters ``limiter`` before sending. A
    response with status >= 400 raises via ``raise_for_status``; whether that
    error is retried (up to 5 tries, exponential backoff) is decided by
    ``_default_giveup_condition``. When ``verify_ssl`` is False the request is
    sent with ``ssl=False`` (no certificate verification); otherwise ``ssl=None``.

    Returns the (unreleased) response object.
    """
    ssl_option = None if verify_ssl else False
    async with limiter:
        resp = await session.post(url, json=json_payload, headers=headers, ssl=ssl_option)
        resp.raise_for_status()
        return resp

@backoff.on_exception(
    backoff.expo,
    aiohttp.ClientError,
    max_tries=5,
    giveup=_default_giveup_condition,
)
async def get_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    limiter: AsyncLimiter,
    headers: Dict[str, str],
    verify_ssl: bool,
) -> aiohttp.ClientResponse:
    """
    GET ``url`` under ``limiter``, retrying with backoff.

    Every attempt, retries included, enters ``limiter`` before sending. A
    response with status >= 400 raises via ``raise_for_status``; whether that
    error is retried (up to 5 tries, exponential backoff) is decided by
    ``_default_giveup_condition``. When ``verify_ssl`` is False the request is
    sent with ``ssl=False`` (no certificate verification); otherwise ``ssl=None``.

    Returns the (unreleased) response object.
    """
    ssl_option = None if verify_ssl else False
    async with limiter:
        resp = await session.get(url, headers=headers, ssl=ssl_option)
        resp.raise_for_status()
        return resp

class VespaManager:
    """
    Manage ingestion of documents into Vespa and track their lifecycle in S3.

    Tracking files live under ``s3_tracking_prefix``:

    * ``pending_documents.json`` -- ``{nativeId: documentId}`` for documents the
      ingest API accepted whose final status has not been confirmed yet.
    * ``success_documents.json`` -- ``{nativeId: documentId}`` for documents the
      status API reported as ``SUCCEEDED``.
    * ``posted_native_ids.json`` -- sorted list of every nativeId that has been
      POSTed at least once, whatever the outcome.
    * ``failed_payloads.parquet`` -- one row per failed document holding the
      original payload (as a JSON string), the attempt count, the last error,
      and a status: ``failed`` (retryable) or ``permanent_failure``.

    Status checks are done with one GET request per pending document.
    """
    STATUS_SUCCESS = "SUCCESS"
    STATUS_FAILURE = "FAILURE"
    STATUS_PENDING = "PENDING"
    STATUS_DELETED = "DELETED"
    STATUS_NOT_FOUND = "NOT_FOUND"
    STATUS_ERROR = "ERROR"
    FAILURE_STATUS_FAILED = "failed"
    FAILURE_STATUS_PERMANENT = "permanent_failure"

    # Column layout of failed_payloads.parquet (read and write must agree).
    _FAILED_PAYLOAD_COLUMNS = ['nativeId', 'payload_str', 'ingest_attempts', 'last_error', 'status']

    def __init__(
        self,
        schema_id: str,
        env: str,
        s3_bucket_id: str,
        creds: Dict,
        s3_tracking_prefix: str,
        max_reingest_attempts: int = 3,
        ingest_calls_per_minute: int = 600,
        status_calls_per_minute: int = 300,
        batch_size: int = 100,
        limiter_time_period: int = 60,
        verify_ssl: bool = True,
        s3_client: Optional[Any] = None,
        http_session: Optional[aiohttp.ClientSession] = None,
    ):
        """
        Args:
            schema_id: Vespa schema id used in ingest and status URLs.
            env: Environment name passed to ``create_url``.
            s3_bucket_id: Bucket for tracking files (used only when ``s3_client`` is None).
            creds: Credentials for the ``S3`` wrapper (used only when ``s3_client`` is None).
            s3_tracking_prefix: Key prefix for all tracking files; surrounding '/' are stripped.
            max_reingest_attempts: A failure becomes permanent once its attempt
                count exceeds this value.
            ingest_calls_per_minute / status_calls_per_minute: Rate limits for
                ingest POSTs and status GETs, per ``limiter_time_period`` seconds.
            batch_size: Number of payloads per ingest POST; must be >= 1.
            verify_ssl: When False, TLS certificate verification is disabled.
            s3_client: Optional pre-built S3 wrapper (e.g. for tests).
            http_session: Optional externally owned session; it is not closed by ``close()``.

        Raises:
            ValueError: if a required identifier is empty or ``batch_size`` < 1.
        """
        if not schema_id or not env or not s3_bucket_id or not s3_tracking_prefix:
            raise ValueError("schema_id, env, s3_bucket_id, and s3_tracking_prefix are required.")
        if batch_size < 1:
            # A zero/negative step would make the batching range() fail later.
            raise ValueError("batch_size must be a positive integer.")

        self.schema_id = schema_id
        self.env = env
        self.s3 = s3_client if s3_client else S3(creds, s3_bucket_id)
        self.s3_tracking_prefix = s3_tracking_prefix.strip('/')

        self.max_reingest_attempts = max_reingest_attempts
        self.batch_size = batch_size
        self.verify_ssl = verify_ssl

        self.ingest_limiter = AsyncLimiter(ingest_calls_per_minute, period=limiter_time_period)
        self.status_limiter = AsyncLimiter(status_calls_per_minute, period=limiter_time_period)

        # S3 paths: JSON for simple trackers, Parquet for failures
        self.pending_tracker_path = f"{self.s3_tracking_prefix}/pending_documents.json"
        self.success_tracker_path = f"{self.s3_tracking_prefix}/success_documents.json"
        self.failed_payload_path = f"{self.s3_tracking_prefix}/failed_payloads.parquet"
        self.posted_native_ids_path = f"{self.s3_tracking_prefix}/posted_native_ids.json"

        self._external_session = http_session is not None
        self._session = http_session if self._external_session else aiohttp.ClientSession(headers=create_headers())

    async def close(self):
        """Close the aiohttp session if this instance created it and it is still open."""
        if self._session and not self._external_session and not self._session.closed:
            await self._session.close()

    # ------------------------------------------------------------------ S3 I/O

    def _read_s3_json(self, file_path: str, default: Any = None) -> Any:
        """
        Read and parse a JSON file from S3.

        Returns ``default`` (or ``{}`` when ``default`` is None) if the file is
        missing, empty, unreadable or not valid JSON.

        NOTE(review): a read error is indistinguishable from an empty tracker,
        and callers later overwrite the S3 object with what they hold in memory.
        """
        fallback = {} if default is None else default
        try:
            if not self.s3.file_exists(file_path):
                return fallback
            content = self.s3.read_file(file_path)
            return json.loads(content) if content else fallback
        except Exception as e:  # includes json.JSONDecodeError
            print(f"Warning: Error reading/parsing S3 JSON {file_path}: {e}. Returning default.")
            return fallback

    def _write_s3_json(self, file_path: str, data: Any):
        """Write ``data`` as indented JSON to S3; errors are logged, not raised."""
        try:
            self.s3.write_file(file_path, json.dumps(data, indent=2))
        except Exception as e:
            print(f"Error writing JSON to S3 file {file_path}: {e}")

    def _read_failed_payloads_parquet(self) -> Dict[str, Dict]:
        """
        Read the failed-payload parquet into
        ``{nativeId: {'payload', 'ingest_attempts', 'last_error', 'status'}}``.

        Returns ``{}`` when the file is missing, empty, or lacks required
        columns. Missing cell values fall back to 0 / '' / ``failed``; a
        ``payload_str`` that is not a JSON object becomes ``{}``. Errors raised
        by the S3 wrapper itself propagate (so a transient failure cannot be
        mistaken for "no failures" and then overwritten).
        """
        if not self.s3.file_exists(self.failed_payload_path):
            return {}
        df = self.s3.read_parquet(self.failed_payload_path)
        required_cols = self._FAILED_PAYLOAD_COLUMNS
        if df is None or df.empty or not all(col in df.columns for col in required_cols):
            return {}

        failed_dict: Dict[str, Dict] = {}
        for row in df[required_cols].to_dict(orient="records"):
            native_id = row['nativeId']
            if pd.isna(native_id):
                continue
            try:
                payload_dict = json.loads(row['payload_str'])
            except (json.JSONDecodeError, TypeError):
                payload_dict = {}
            if not isinstance(payload_dict, dict):
                payload_dict = {}
            attempts, last_error, status = row['ingest_attempts'], row['last_error'], row['status']
            failed_dict[native_id] = {
                'payload': payload_dict,
                'ingest_attempts': 0 if pd.isna(attempts) else int(attempts),
                'last_error': '' if pd.isna(last_error) else last_error,
                'status': self.FAILURE_STATUS_FAILED if pd.isna(status) else status,
            }
        return failed_dict

    def _write_failed_payloads_parquet(self, failed_payloads_dict: Dict[str, Dict]):
        """
        Serialize the failed-payloads dict to the parquet file.

        Payloads are stored as JSON strings (``"{}"`` if not serializable). An
        empty dict still writes a file with the expected column schema.
        """
        rows = []
        for native_id, data in failed_payloads_dict.items():
            try:
                payload_str = json.dumps(data.get('payload', {}))
            except (TypeError, ValueError):  # non-serializable or circular payload
                payload_str = "{}"
            rows.append({
                'nativeId': native_id,
                'payload_str': payload_str,
                'ingest_attempts': data.get('ingest_attempts', 0),
                'last_error': data.get('last_error', ''),
                'status': data.get('status', self.FAILURE_STATUS_FAILED),
            })
        df = pd.DataFrame(rows, columns=self._FAILED_PAYLOAD_COLUMNS)
        self.s3.write_parquet(self.failed_payload_path, df)

    def _has_failed_payloads(self) -> bool:
        """Return True if any failure (retryable or permanent) is tracked in the parquet file."""
        return bool(self._read_failed_payloads_parquet())

    def _get_pending_tracker(self) -> Dict[str, str]:
        """Return ``{nativeId: documentId}`` of documents awaiting a status check."""
        return self._read_s3_json(self.pending_tracker_path, default={})

    def _get_success_tracker(self) -> Dict[str, str]:
        """Return ``{nativeId: documentId}`` of documents confirmed successful."""
        return self._read_s3_json(self.success_tracker_path, default={})

    def _get_failed_payloads(self) -> Dict[str, Dict]:
        """Return the failed-payloads dict read from parquet."""
        return self._read_failed_payloads_parquet()

    def _get_posted_native_ids(self) -> Set[str]:
        """Return the set of nativeIds that have ever been POSTed (empty if the file is malformed)."""
        id_list = self._read_s3_json(self.posted_native_ids_path, default=[])
        return set(id_list) if isinstance(id_list, list) else set()

    # --------------------------------------------------------------- Ingestion

    def _should_skip_ingest(
        self,
        native_id: Any,
        pending_docs: Dict[str, str],
        success_docs: Dict[str, str],
        failed_payloads: Dict[str, Dict],
        posted_native_ids: Set[str],
    ) -> bool:
        """
        Return True if ``native_id`` must not be POSTed now.

        Skipped: already successful, still pending a status check, or a
        permanent failure. A retryable failure is NOT skipped even though it
        was posted before -- that is what makes re-ingestion possible. Any
        other previously posted id (e.g. one later reported NOT_FOUND/DELETED)
        is skipped.
        """
        if native_id in success_docs or native_id in pending_docs:
            return True
        failure = failed_payloads.get(native_id)
        if failure is not None:
            return failure.get('status') == self.FAILURE_STATUS_PERMANENT
        return native_id in posted_native_ids

    async def _post_ingest_batch(
        self, ingest_url: str, batch: List[Dict[str, Any]], batch_num: int
    ) -> Tuple[Any, Optional[str]]:
        """
        POST one batch to the ingest endpoint.

        Returns ``(decoded_json_body, None)`` on success, or
        ``(None, error_message)`` if the request or JSON decoding failed.
        The response is always released.
        """
        response = None
        try:
            response = await post_with_retry(
                session=self._session, url=ingest_url, limiter=self.ingest_limiter,
                json_payload=batch, headers=create_headers(), verify_ssl=self.verify_ssl,
            )
            return await response.json(content_type=None), None
        except (aiohttp.ClientError, asyncio.TimeoutError) as http_err:
            print(f"HTTP Error processing batch {batch_num}: {http_err}")
            return None, f"Batch POST failed permanently: {http_err}"
        except json.JSONDecodeError as json_err:
            print(f"JSON Error processing batch {batch_num} response: {json_err}")
            status = response.status if response is not None else 'N/A'
            return None, f"Failed to parse Vespa response: {json_err} (Status: {status})"
        except Exception as e:
            print(f"Unexpected Error processing batch {batch_num}: {e}")
            return None, f"Unexpected batch error: {e}"
        finally:
            if response is not None:
                response.release()

    def _record_batch_failure(
        self,
        batch: List[Dict[str, Any]],
        error_msg: str,
        failed_payloads: Dict[str, Dict],
        posted_native_ids: Set[str],
    ):
        """Record every payload in ``batch`` (that has a nativeId) as failed with ``error_msg``."""
        for payload in batch:
            native_id = payload.get('nativeId')
            if native_id:
                self._update_failed_payloads_dict(native_id, payload, error_msg, failed_payloads)
                posted_native_ids.add(native_id)

    def _apply_ingest_response(
        self,
        batch: List[Dict[str, Any]],
        response_json: Any,
        pending_docs: Dict[str, str],
        failed_payloads: Dict[str, Dict],
        posted_native_ids: Set[str],
    ):
        """
        Apply one ingest response to the in-memory trackers.

        * ``successDocs`` entries become pending (awaiting a status check). An
          existing failure entry is kept so its payload and attempt count
          survive if the status API later reports FAILURE; it is removed when
          the status API reports SUCCESS / NOT_FOUND / DELETED.
        * ``failedDocs`` entries are recorded as failures with the original payload.
        * Batch documents mentioned in neither list are recorded as failures,
          so they are retried instead of silently dropped.
        """
        batch_by_id = {p.get("nativeId"): p for p in batch if p.get("nativeId")}
        if not isinstance(response_json, dict):
            error_msg = f"Unexpected Vespa response type: {type(response_json).__name__}"
            print(f"Warning: {error_msg}")
            self._record_batch_failure(batch, error_msg, failed_payloads, posted_native_ids)
            return

        reported = set()
        for success_info in response_json.get("successDocs") or []:
            info = success_info if isinstance(success_info, dict) else {}
            native_id, doc_id = info.get("nativeId"), info.get("documentId")
            if native_id and doc_id:
                pending_docs[native_id] = doc_id
                posted_native_ids.add(native_id)
                reported.add(native_id)
            else:
                print(f"Warning: Could not process success info: {success_info}")

        for failure_info in response_json.get("failedDocs") or []:
            info = failure_info if isinstance(failure_info, dict) else {}
            native_id, error_msg = info.get("nativeId"), info.get("error")
            original_payload = batch_by_id.get(native_id)
            if native_id and original_payload is not None:
                self._update_failed_payloads_dict(native_id, original_payload, error_msg, failed_payloads)
                posted_native_ids.add(native_id)  # Mark as posted even on failure
                reported.add(native_id)
            else:
                print(f"Warning: Could not link failed post back. Info: {failure_info}")

        for native_id, payload in batch_by_id.items():
            if native_id not in reported:
                self._update_failed_payloads_dict(
                    native_id, payload, "Document not reported in Vespa ingest response", failed_payloads
                )
                posted_native_ids.add(native_id)

    async def ingest_in_vespa(self, ingest_payload: List[Dict[str, Any]]):
        """
        Ingest payload dicts into Vespa in batches and persist tracking state.

        Payloads are filtered with ``_should_skip_ingest`` (and de-duplicated by
        nativeId), POSTed in batches of ``batch_size``, and their outcomes are
        recorded. Afterwards the pending tracker, failed-payload parquet and
        posted-id list are written back to S3. Returns early, without writing,
        when there is nothing to send.
        """
        if not ingest_payload:
            return
        print(f"Starting ingestion for {len(ingest_payload)} payloads.")

        # Load current state
        pending_docs = self._get_pending_tracker()
        success_docs = self._get_success_tracker()
        failed_payloads = self._get_failed_payloads()
        posted_native_ids = self._get_posted_native_ids()

        payloads_to_process = []
        queued_ids = set()
        skipped_count = 0
        for payload in ingest_payload:
            native_id = payload.get("nativeId")
            if native_id in queued_ids or self._should_skip_ingest(
                native_id, pending_docs, success_docs, failed_payloads, posted_native_ids
            ):
                skipped_count += 1
                continue
            if native_id:
                queued_ids.add(native_id)  # avoid posting the same document twice in one run
            payloads_to_process.append(payload)

        if skipped_count > 0:
            print(f"Skipped {skipped_count} already processed/posted payloads.")
        if not payloads_to_process:
            print("No new payloads to process.")
            return
        print(f"Processing {len(payloads_to_process)} new or previously failed payloads.")

        ingest_url = create_url(self.env, f"{self.schema_id}/ingest")
        num_batches = (len(payloads_to_process) + self.batch_size - 1) // self.batch_size

        for batch_num, start in enumerate(range(0, len(payloads_to_process), self.batch_size), start=1):
            batch = payloads_to_process[start:start + self.batch_size]
            print(f"Processing batch {batch_num}/{num_batches}...")
            response_json, error_msg = await self._post_ingest_batch(ingest_url, batch, batch_num)
            if error_msg is not None:
                self._record_batch_failure(batch, error_msg, failed_payloads, posted_native_ids)
            else:
                self._apply_ingest_response(batch, response_json, pending_docs, failed_payloads, posted_native_ids)

        print("Ingestion loop finished. Saving trackers...")
        self._write_s3_json(self.pending_tracker_path, pending_docs)
        self._write_failed_payloads_parquet(failed_payloads)  # Writes parquet
        self._write_s3_json(self.posted_native_ids_path, sorted(posted_native_ids))  # Writes JSON list

    def _update_failed_payloads_dict(
        self, native_id: str, payload_dict: Dict[str, Any], error_message: str, failed_payloads_master_dict: Dict[str, Dict]
    ):
        """
        Record one more failed attempt for ``native_id`` in the in-memory dict.

        Stores ``payload_dict`` and ``str(error_message)``, increments
        ``ingest_attempts``, and sets the status to ``permanent_failure`` once
        the attempt count exceeds ``max_reingest_attempts`` (``failed``
        otherwise). Mutates ``failed_payloads_master_dict`` in place.
        """
        entry = failed_payloads_master_dict.get(native_id, {})
        entry['payload'] = payload_dict
        entry['ingest_attempts'] = entry.get('ingest_attempts', 0) + 1
        entry['last_error'] = str(error_message)
        if entry['ingest_attempts'] > self.max_reingest_attempts:
            if entry.get('status') != self.FAILURE_STATUS_PERMANENT:
                print(f"Max attempts reached for {native_id}. Marking as permanent failure.")
                entry['status'] = self.FAILURE_STATUS_PERMANENT
        else:
            entry['status'] = self.FAILURE_STATUS_FAILED
        failed_payloads_master_dict[native_id] = entry

    # ------------------------------------------------------------------ Status

    async def _get_single_doc_status(self, document_id: str) -> Tuple[str, Optional[str]]:
        """
        Fetch the ingestion status of one document with a rate-limited GET.

        Maps the response's ``status`` field: ``SUCCEEDED`` -> SUCCESS,
        ``FAILED`` -> FAILURE, anything else -> PENDING. HTTP 404 -> NOT_FOUND;
        any other error -> ERROR. Returns ``(document_id, status)``; never raises
        for ordinary errors.
        """
        status_url = create_url(self.env, f"v1/status/{self.schema_id}/{document_id}")
        response = None
        try:
            response = await get_with_retry(
                session=self._session, url=status_url, limiter=self.status_limiter,
                headers=create_headers(), verify_ssl=self.verify_ssl,
            )
            status_json = await response.json(content_type=None)
            vespa_status = status_json.get("status")
            if vespa_status == "SUCCEEDED":
                return document_id, self.STATUS_SUCCESS
            if vespa_status == "FAILED":
                return document_id, self.STATUS_FAILURE
            return document_id, self.STATUS_PENDING
        except aiohttp.ClientResponseError as e:
            if e.status == 404:
                return document_id, self.STATUS_NOT_FOUND
            print(f"HTTP Error status {document_id}: {e}")
            return document_id, self.STATUS_ERROR
        except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError) as err:
            print(f"Error status {document_id}: {err}")
            return document_id, self.STATUS_ERROR
        except Exception as e:
            print(f"Unexpected error status {document_id}: {e}")
            return document_id, self.STATUS_ERROR
        finally:
            if response is not None:
                response.release()

    def _record_status_failure(self, native_id: str, failed_payloads: Dict[str, Dict]):
        """
        Record a FAILURE reported by the status API.

        If a failure entry with a stored payload exists, it counts as another
        attempt. Otherwise the original payload is unknown (the pending tracker
        stores only ids), so the document cannot be re-ingested and is marked
        as a permanent failure instead of being re-posted as an empty document.
        """
        error_msg = "Ingestion confirmed FAILURE via status API"
        entry = failed_payloads.get(native_id)
        if entry and entry.get('payload'):
            self._update_failed_payloads_dict(native_id, entry['payload'], error_msg, failed_payloads)
            return
        print(f"Warning: No stored payload for {native_id}; marking as permanent failure (cannot re-ingest).")
        failed_payloads[native_id] = {
            'payload': {},
            'ingest_attempts': (entry or {}).get('ingest_attempts', 0) + 1,
            'last_error': f"{error_msg}; original payload not tracked, cannot re-ingest",
            'status': self.FAILURE_STATUS_PERMANENT,
        }

    async def update_ingestion_status(self) -> bool:
        """
        Check the status of every pending document concurrently and update the trackers.

        SUCCESS moves the document to the success tracker and clears any
        failure entry; FAILURE records a failure (``_record_status_failure``);
        DELETED / NOT_FOUND drop it from pending and failure tracking; PENDING
        and ERROR leave it pending for the next check.

        Returns:
            True if every pending document reached a terminal state (or none
            were pending), otherwise False.
        """
        print("Starting ingestion status update via GET...")
        pending_docs = self._get_pending_tracker()
        success_docs = self._get_success_tracker()
        failed_payloads = self._get_failed_payloads()

        if not pending_docs:
            print("No documents pending status check.")
            return True

        print(f"Checking status for {len(pending_docs)} pending documents concurrently...")
        # Snapshot so results can be zipped with their nativeIds (no inverse map,
        # which would lose entries if two nativeIds shared a documentId).
        pending_items = list(pending_docs.items())
        status_results = await asyncio.gather(
            *(self._get_single_doc_status(doc_id) for _, doc_id in pending_items)
        )

        all_processed_terminal = True
        needs_failed_save = False

        for (native_id, _), (document_id, current_status) in zip(pending_items, status_results):
            if current_status == self.STATUS_SUCCESS:
                success_docs[native_id] = document_id
                del pending_docs[native_id]
                if failed_payloads.pop(native_id, None) is not None:
                    needs_failed_save = True
            elif current_status == self.STATUS_FAILURE:
                self._record_status_failure(native_id, failed_payloads)
                needs_failed_save = True
                del pending_docs[native_id]
            elif current_status in (self.STATUS_DELETED, self.STATUS_NOT_FOUND):
                del pending_docs[native_id]
                if failed_payloads.pop(native_id, None) is not None:
                    needs_failed_save = True
            else:  # STATUS_PENDING or STATUS_ERROR: check again next cycle
                all_processed_terminal = False

        print("Status update finished. Saving trackers...")
        self._write_s3_json(self.pending_tracker_path, pending_docs)
        self._write_s3_json(self.success_tracker_path, success_docs)
        if needs_failed_save:
            self._write_failed_payloads_parquet(failed_payloads)

        return all_processed_terminal

    async def reingest_documents(self):
        """
        Re-ingest retryable failures from the parquet tracker.

        Skips permanent failures and documents that are currently pending.
        Entries whose stored payload holds nothing but a nativeId cannot be
        re-ingested meaningfully; they are marked permanent (and the parquet
        is rewritten) rather than posted as empty documents. The remaining
        payloads are passed to ``ingest_in_vespa``.
        """
        print("Starting re-ingestion process...")
        failed_payloads_dict = self._get_failed_payloads()
        if not failed_payloads_dict:
            print("No failed payloads found.")
            return
        pending_docs = self._get_pending_tracker()

        payloads_to_reingest = []
        unrecoverable_ids = []
        for native_id, data in failed_payloads_dict.items():
            if data.get('status') == self.FAILURE_STATUS_PERMANENT or native_id in pending_docs:
                continue
            payload = dict(data.get('payload') or {})  # copy; do not mutate the tracked entry
            if not any(key != 'nativeId' for key in payload):
                unrecoverable_ids.append(native_id)
                continue
            payload.setdefault('nativeId', native_id)
            payloads_to_reingest.append(payload)

        if unrecoverable_ids:
            print(f"Warning: {len(unrecoverable_ids)} failed entries have no stored payload; marking them as permanent failures.")
            for native_id in unrecoverable_ids:
                entry = failed_payloads_dict[native_id]
                entry['status'] = self.FAILURE_STATUS_PERMANENT
                entry['last_error'] = f"{entry.get('last_error', '')} [no stored payload; cannot re-ingest]".strip()
            self._write_failed_payloads_parquet(failed_payloads_dict)

        if not payloads_to_reingest:
            print("No non-permanent failed payloads found to re-ingest.")
            return

        print(f"Attempting re-ingestion for {len(payloads_to_reingest)} documents.")
        await self.ingest_in_vespa(payloads_to_reingest)


NameError: name 'Dict' is not defined

In [None]:
# --- Polling function: ingest once, then poll status and re-ingest failures until done ---

def _load_sedol_map(creds: Any, bucket_id: str, key: str) -> dict:
    """Load the SEDOL -> companyId JSON mapping from S3; return {} (with a warning) on any problem."""
    try:
        sedol_map_str = S3(creds, bucket_id=bucket_id).read_file(key)
        sedol_map = json.loads(sedol_map_str) if sedol_map_str else {}
    except Exception as e:
        print(f"Warning: Error loading sedol map: {e}")
        return {}
    if not isinstance(sedol_map, dict):
        print(f"Warning: Sedol map {key} is not a JSON object; ignoring it.")
        return {}
    return sedol_map


def _preprocess_payloads(payload_list: list, sedol_map: dict) -> list:
    """
    Normalize payloads in place and return those that carry a nativeId.

    Ensures ``fields`` and ``fields.sedols_s`` exist, sets ``companyId_s`` from
    the first SEDOL via ``sedol_map`` ('' if absent), and normalizes
    ``event_time_s`` / ``document_date_s``. Payloads that raise during
    preprocessing or lack ``nativeId`` are skipped with a message.
    """
    processed = []
    for payload in payload_list:
        try:
            fields = payload.setdefault("fields", {})
            sedols = fields.setdefault("sedols_s", [])
            sedol = sedols[0] if sedols else None
            fields["companyId_s"] = sedol_map.get(sedol, "") if sedol else ""
            fields["event_time_s"] = parse_ect_event_time(fields.get("event_time_s"))
            fields["document_date_s"] = parse_ect_source_time(fields.get("document_date_s"))
            # Ensure nativeId exists before adding to processed list
            if "nativeId" in payload:
                processed.append(payload)
            else:
                print("Warning: Payload missing nativeId during preprocessing. Skipping.")
        except Exception as e:
            print(f"Error preprocessing payload {payload.get('nativeId', 'N/A')}: {e}. Skipping.")
    return processed


async def poll_ingest_update(
    start_date: dt.date,
    end_date: dt.date,
    schema_id: str,
    env: str,
    s3_bucket_id: str,
    s3_tracking_prefix: str,
    s3_client: Optional[Any] = None,
    max_poll_attempts: int = 10,
    poll_interval_seconds: float = 20,
    error_wait_seconds: float = 60,
    sedol_map_bucket_id: str = "fluenta1",
    sedol_map_key: str = "sedol_to_id_db_company_fe.json",
):
    """
    Load earnings-call payloads for a date range, ingest them into Vespa, then
    poll status and re-ingest failures until nothing retryable remains or
    ``max_poll_attempts`` cycles have run.

    Args:
        start_date, end_date: Range passed to ``load_earnings_call_data_vespa``.
        schema_id, env, s3_bucket_id, s3_tracking_prefix, s3_client: Forwarded
            to ``VespaManager``.
        max_poll_attempts: Upper bound on status/re-ingest cycles (safety break).
        poll_interval_seconds: Wait between successful cycles.
        error_wait_seconds: Wait after a cycle that raised.
        sedol_map_bucket_id, sedol_map_key: Location of the SEDOL -> companyId map.

    Errors are printed (with traceback) rather than raised; the manager's
    HTTP session is always closed.
    """
    creds = get_s3_credentials()
    manager = None

    try:
        manager = VespaManager(
            schema_id=schema_id, env=env, s3_bucket_id=s3_bucket_id, creds=creds,
            s3_tracking_prefix=s3_tracking_prefix, s3_client=s3_client,
        )

        print(f"Loading data for date range: {start_date} to {end_date}")
        payload_list = load_earnings_call_data_vespa(start_date, end_date)
        if not payload_list:
            print("No payloads generated.")
            return

        # --- Preprocessing ---
        sedol_map = _load_sedol_map(creds, sedol_map_bucket_id, sedol_map_key)
        print("Preprocessing payloads...")
        processed_payloads = _preprocess_payloads(payload_list, sedol_map)
        if not processed_payloads:
            print("No payloads after preprocessing.")
            return

        # --- Initial Ingestion ---
        print(f"\n--- Starting Initial Ingestion of {len(processed_payloads)} payloads ---")
        await manager.ingest_in_vespa(processed_payloads)

        # --- Polling Loop ---
        print("\n--- Entering Status Polling and Re-ingestion Loop ---")
        for poll_attempt in range(1, max_poll_attempts + 1):
            print(f"\n--- Poll Cycle {poll_attempt}/{max_poll_attempts} ---")
            is_last_attempt = poll_attempt >= max_poll_attempts
            try:
                all_processed = await manager.update_ingestion_status()
                await manager.reingest_documents()
                failed = manager._get_failed_payloads()
                # Re-read pending: re-ingestion may have created new pending docs
                # after all_processed was computed.
                has_pending = bool(manager._get_pending_tracker())
            except Exception as e:
                print(f"Error during poll cycle {poll_attempt}: {e}")
                traceback.print_exc()
                if is_last_attempt:
                    print("WARNING: Max poll attempts reached. Exiting loop.")
                    break
                print(f"Waiting {error_wait_seconds} seconds after error...")
                await asyncio.sleep(error_wait_seconds)
                continue

            has_failures = bool(failed)
            has_retryable = any(
                entry.get('status') != manager.FAILURE_STATUS_PERMANENT for entry in failed.values()
            )
            settled = all_processed and not has_pending

            if settled and not has_failures:
                print("\nSUCCESS: All documents processed successfully.")
                break
            if settled and not has_retryable:
                # Only permanent failures remain; further polling cannot change anything.
                print(f"\nFINISHED: No retryable work remains; {len(failed)} permanent failure(s) recorded.")
                break
            if settled:
                print("STATUS: All pending processed, but retryable failures remain.")
            else:
                print("STATUS: Some documents are still pending.")

            if is_last_attempt:
                print("WARNING: Max poll attempts reached. Exiting loop.")
                if has_failures:
                    print("WARNING: Exiting with outstanding failures.")
                if not settled:
                    print("WARNING: Exiting with pending documents.")
                break
            print(f"Waiting {poll_interval_seconds} seconds before next poll cycle...")
            await asyncio.sleep(poll_interval_seconds)

    except Exception as e:
        print(f"FATAL Error in ingestion process: {e}")
        traceback.print_exc()
    finally:
        if manager:
            await manager.close()
        print("Ingestion process finished.")