In [None]:
import pandas as pd
import pathlib

# Root directory that holds the thesis datasets
ROOT_OUT = pathlib.Path(
    "/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/"
    "Documents/Master Finance/MasterThesis/ThesisData"
)

# Name of the parquet file holding the final dataset
file_name = "panel_data_charactermax200k.parquet"

# Absolute location of the dataset: root directory joined with the file name
file_path = ROOT_OUT.joinpath(file_name)

# Read the parquet file into `df` and print a short summary.
# The summary prints stay inside the `try` on purpose: a missing 'cik' column
# is then reported by the generic handler instead of aborting the cell.
try:
    df = pd.read_parquet(file_path)
    print(f"Successfully loaded final dataset from: {file_path}")
    print(f"Shape: {df.shape}")
    print(f"Unique CIKs: {df['cik'].nunique()}")
except FileNotFoundError:
    # The path does not exist; `df` is left undefined in this case.
    print(f"Error: File not found at {file_path}")
except Exception as load_error:
    # Any other failure (unreadable parquet, missing column, ...) is only reported.
    print(f"An error occurred: {load_error}")

In [None]:
import pandas as pd
from openai import OpenAI, RateLimitError, APIError
import json
import os
import re
import instructor
import time
from tqdm import tqdm
from pydantic import BaseModel, Field, field_validator, ValidationError
from typing import List, Literal, Optional, Union

# pydantic model definitions
#
# Each alias below is a closed set of allowed `bucket_assessment` labels for one
# assessment factor. Every label starts with a single grade letter followed by a
# space; extract_grade_letter() relies on that shape to pull out the grade.
# The label text is runtime data (it is what the model output is validated
# against), so it must not be edited casually.

# Allowed labels for AIStrategicDepthAssessment.bucket_assessment (grades A-E).
AIStrategicDepthBucket = Literal[
    "A (Core Strategic Enabler)",
    "B (Significant Operational/Product Integration)",
    "C (Emerging/Exploratory Mentions)",
    "D (Superficial/Generic Mentions OR Risk of Non-Adoption Only)",
    "E (No Mention of Own AI Strategy/Adoption)"
]

# Allowed labels for AIDisclosureSentimentAssessment.bucket_assessment (grades A-E).
AIDisclosureSentimentBucket = Literal[
    "A (Clearly Positive)",
    "B (Mostly Positive/Balanced)",
    "C (Neutral/Factual)",
    "D (Cautious/Mixed)",
    "E (Negative OR Not Applicable - No Own AI Discussion)"
]

# Allowed labels for AIRiskOwnAdoptionAssessment.bucket_assessment (grades A-C).
AIRiskOwnAdoptionBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of Own AI Adoption Risk)",
    "C (No Mention of Own AI Adoption Risk)"
]

# Allowed labels for AIRiskExternalThreatsAssessment.bucket_assessment (grades A-C).
AIRiskExternalThreatsBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of External AI Threat)",
    "C (No Mention of External AI Threat)"
]

# Allowed labels for AIRiskNonAdoptionAssessment.bucket_assessment (grades A-C).
AIRiskNonAdoptionBucket = Literal[
    "A (Explicitly Discussed)",
    "B (Implicitly Suggested)",
    "C (No Mention of Non-Adoption/Competitive AI Risk)"
]

# Allowed labels for AIForwardLookingAssessment.bucket_assessment (grades A-D).
AIForwardLookingBucket = Literal[
    "A (Specific & Detailed Future Plans)",
    "B (General Future Intent)",
    "C (Implicit Future Focus Only)",
    "D (No Mention of Future AI Plans)"
]

# Allowed labels for AIWashingHypeAssessment.bucket_assessment (grades A-E).
AIWashingHypeBucket = Literal[
    "A (Substantive & Grounded)",
    "B (Mostly Substantive)",
    "C (Mixed - Some Substance, Some Hype)",
    "D (Mostly Hype)",
    "E (Pure Hype OR Not Applicable - No Positive AI Claims)"
]

# Allowed labels for AITalentInvestmentAssessment.bucket_assessment (grades A-D).
AITalentInvestmentBucket = Literal[
    "A (Explicit & Significant Focus)",
    "B (General Mention of AI Talent/Investment)",
    "C (Implicit Focus Only)",
    "D (No Mention of AI Talent/Investment)"
]

# Common fields for every per-factor assessment: the evidence backing the grade
# and a short rationale. Documented with comments rather than a class docstring
# because pydantic copies a model docstring into the generated JSON schema.
class BaseAssessment(BaseModel):
    supporting_evidence: List[str] = Field(
        default_factory=list,
        description="Key verbatim quotes (or concise summaries) from the text that justify your assessment. Limit to 2-3 key pieces of evidence. If no direct evidence, provide an empty list []."
    )
    chain_of_thought_reasoning: str = Field(
        description="A brief explanation (1-2 sentences) of your thought process for arriving at the bucket assessment, referencing the criteria and evidence."
    )

    @field_validator("supporting_evidence", mode='before')
    @classmethod
    def normalize_supporting_evidence(cls, v):
        """Coerce the raw `supporting_evidence` value into a clean list of strings.

        A single string is treated as a one-item list; a list is filtered item by
        item. Non-string items, blank strings and the "no evidence" placeholder
        sentences are dropped. Any other input (including None) yields [].
        """
        placeholder_phrases = ["no direct evidence found.", "no evidence found."]

        # Reduce both accepted shapes to one sequence of candidates.
        if isinstance(v, str):
            candidates = [v]
        elif isinstance(v, list):
            candidates = v
        else:
            return []

        evidence = []
        for candidate in candidates:
            if not isinstance(candidate, str):
                continue
            text = candidate.strip()
            if text and text.lower() not in placeholder_phrases:
                evidence.append(text)
        return evidence

# Per-factor assessment models. Each one inherits the evidence/reasoning fields
# from BaseAssessment and adds a `bucket_assessment` restricted to that factor's
# Literal label set. Comments are used instead of class docstrings so that the
# JSON schema generated by pydantic for these models stays unchanged.

# Assessment of AI strategic depth.
class AIStrategicDepthAssessment(BaseAssessment):
    bucket_assessment: AIStrategicDepthBucket

# Assessment of the sentiment of the AI disclosure.
class AIDisclosureSentimentAssessment(BaseAssessment):
    bucket_assessment: AIDisclosureSentimentBucket

# Assessment of disclosed risk arising from the company's own AI adoption.
class AIRiskOwnAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskOwnAdoptionBucket

# Assessment of disclosed risk from external AI threats.
class AIRiskExternalThreatsAssessment(BaseAssessment):
    bucket_assessment: AIRiskExternalThreatsBucket

# Assessment of disclosed risk of not adopting AI / competitive AI threat.
class AIRiskNonAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskNonAdoptionBucket

# Container grouping the three risk sub-assessments under one key.
class AIRiskDisclosure(BaseModel):
    risk_from_own_ai_adoption: AIRiskOwnAdoptionAssessment
    risk_from_external_ai_threats: AIRiskExternalThreatsAssessment
    risk_of_not_adopting_ai_or_competitive_ai_threat: AIRiskNonAdoptionAssessment

# Assessment of forward-looking AI statements.
class AIForwardLookingAssessment(BaseAssessment):
    bucket_assessment: AIForwardLookingBucket

# Assessment of the "AI washing" / hype index.
class AIWashingHypeAssessment(BaseAssessment):
    bucket_assessment: AIWashingHypeBucket

# Assessment of AI talent and investment focus.
class AITalentInvestmentAssessment(BaseAssessment):
    bucket_assessment: AITalentInvestmentBucket

# Top-level response model requested from the LLM: one overall summary, one
# assessment object per factor, and the list of AI terms found in the text.
# Documented with comments rather than a class docstring because pydantic copies
# a model docstring into the generated JSON schema.
class CompanyAIAnalysis(BaseModel):
    company_identifier: Optional[str] = Field(default=None, description="EXTRACT_COMPANY_NAME_IF_POSSIBLE_ELSE_NULL")
    overall_ai_preparedness_summary_cot: str = Field(description="LLM_GENERATED_1_2_SENTENCE_SUMMARY_OF_OVERALL_AI_POSTURE_BASED_ON_ALL_FACTORS")

    # One nested assessment per factor
    ai_strategic_depth: AIStrategicDepthAssessment
    ai_disclosure_sentiment: AIDisclosureSentimentAssessment
    ai_risk_disclosure: AIRiskDisclosure
    forward_looking_ai_statements: AIForwardLookingAssessment
    ai_washing_hype_index: AIWashingHypeAssessment
    ai_talent_and_investment_focus: AITalentInvestmentAssessment

    key_ai_related_terminology_found: List[str] = Field(
        default_factory=list,
        description="List any explicit AI-related terms found (e.g., 'artificial intelligence', 'machine learning'). If none, provide an empty list []."
    )

    @field_validator("key_ai_related_terminology_found", mode='before')
    @classmethod
    def normalize_key_terms(cls, v):
        """Coerce the raw terminology value into a list of non-empty, stripped strings.

        A string is split on commas; a list keeps only its string items. Any
        other input (including None) yields [].
        """
        if isinstance(v, str):
            raw_terms = v.split(',')
        elif isinstance(v, list):
            raw_terms = [entry for entry in v if isinstance(entry, str)]
        else:
            return []

        # Strip each term once and discard the ones that end up empty.
        stripped_terms = (term.strip() for term in raw_terms)
        return [term for term in stripped_terms if term]

# prompt template
#
# User-message template sent to the model. It is rendered with
# PROMPT_TEMPLATE.format(document_text=...), so `{document_text}` is the only
# placeholder; any literal curly brace added to this text must be doubled
# ("{{" / "}}") or str.format will raise.
# Instruction 6 refers to a "Company Name (if known):" line, which is prepended
# to the document text by analyze_text_with_openai_instructor().
# NOTE(review): the "AI ASSESSMENT FACTORS, BUCKETS & CRITERIA" section at the
# end still contains "..." placeholders and a reminder to fill in the detailed
# criteria - confirm the full rubric is present before running the analysis.
PROMPT_TEMPLATE = """
You are an expert financial analyst and AI strategist, specializing in evaluating corporate disclosures for insights into a company's engagement with Artificial Intelligence (AI). Your task is to meticulously analyze the provided text from a company's 10-K filing (specifically excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis") and provide a structured JSON output assessing the company's AI preparedness based on the defined factors and criteria.

**DOCUMENT CONTEXT:**
The provided text below, enclosed in triple backticks (```), contains excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis (MD&A)" of a single company's 10-K report.

```
{document_text}
```

**INSTRUCTIONS:**
1.  Carefully read the entire provided text.
2.  For each of the AI Assessment Factors listed below, evaluate the text based on the defined Buckets & Criteria.
3.  Your entire response MUST be a single, valid JSON object. Do not include any explanatory text before or after the JSON object. The JSON object MUST conform to the Pydantic schema `CompanyAIAnalysis`.
4.  For each factor, you MUST include in the JSON:
    * `bucket_assessment`: The exact string for the category you've assigned (e.g., "A (Core Strategic Enabler)", "E (No Mention of Own AI Strategy/Adoption)"). It must match one of the predefined bucket descriptions for that factor.
    * `chain_of_thought_reasoning`: A brief explanation (1-2 sentences) of your thought process for arriving at the bucket assessment, referencing the criteria and evidence.
    * `supporting_evidence`: Key verbatim quotes (or concise summaries of multiple related statements) from the text that justify your assessment. Limit to 2-3 key pieces of evidence. If no direct evidence is found for a factor, you MUST provide an empty list `[]` for this field.
5.  If information for a factor or sub-factor is not present in the text, use the appropriate "No Mention" bucket assessment for that factor. For `supporting_evidence` in such cases, provide an empty list `[]`.
    *IMPORTANT*: Ensure ALL main assessment factor objects (ai_strategic_depth, ai_disclosure_sentiment, ai_risk_disclosure, forward_looking_ai_statements, ai_washing_hype_index, ai_talent_and_investment_focus) are ALWAYS present as keys in your JSON output. If a factor has no relevant information, its `bucket_assessment` should reflect "No Mention" (or an equivalent E/D grade for that factor's rubric), and `supporting_evidence` should be an empty list. Do not omit these top-level keys from the JSON.
6.  For the `company_identifier` field, extract the company name from the "Company Name (if known):" line in the DOCUMENT CONTEXT. If it's "Unknown Company" or not clearly identifiable, set it to null.
7.  For the `overall_ai_preparedness_summary_cot` field, you MUST provide a 1-2 sentence summary of the company's overall AI posture based on your analysis of all factors.
8.  For the `key_ai_related_terminology_found` field, you MUST list any explicit AI-related terms found in the text (e.g., "artificial intelligence", "machine learning", "NLP", "generative AI"). If no such terms are found, provide an empty list `[]`.

**AI ASSESSMENT FACTORS, BUCKETS & CRITERIA:**
(Your detailed criteria here - ensure this is complete in your actual script)
**1. AI Strategic Depth:** ...
**2. AI Disclosure Sentiment:** ...
**3. AI Risk Disclosure:** ...
**4. Forward-Looking AI Statements:** ...
**5. "AI Washing" Hype Index:** ...
**6. AI Talent & Investment Focus:** ...
"""

# configuration & client initialization for openai
#
# Outcome of this section (read by the rest of the script):
#   CONFIGURED_OPENAI_API_KEY             - validated, whitespace-stripped key, or None
#   INSTRUCTOR_OPENAI_CLIENT              - instructor-patched client, or None
#   OPENAI_CLIENT_CONFIGURED_SUCCESSFULLY - True only if the client was created
print("--- starting openai configuration and client initialization ---")
CONFIGURED_OPENAI_API_KEY = None
OPENAI_CLIENT_CONFIGURED_SUCCESSFULLY = False
INSTRUCTOR_OPENAI_CLIENT = None

# Key lookup order: the OPENAI_KEY environment variable wins; a notebook-level
# global named OPENAI_KEY is the fallback.
OPENAI_KEY_FROM_ENV = os.getenv('OPENAI_KEY')
if OPENAI_KEY_FROM_ENV:
    print("found 'OPENAI_KEY' in environment variables.")
    CONFIGURED_OPENAI_API_KEY = OPENAI_KEY_FROM_ENV
elif 'OPENAI_KEY' in globals():
    print("found 'OPENAI_KEY' in global script variables.")
    CONFIGURED_OPENAI_API_KEY = globals()['OPENAI_KEY']
else:
    print("error: 'OPENAI_KEY' is not defined in environment or global script variables.")

if CONFIGURED_OPENAI_API_KEY:
    if isinstance(CONFIGURED_OPENAI_API_KEY, str) and CONFIGURED_OPENAI_API_KEY.strip().startswith("sk-"):
        # Keep the stripped form: the check above is done on the stripped key, so
        # the client must receive that same value. A key pasted with a trailing
        # newline or spaces would otherwise pass validation and fail at request time.
        CONFIGURED_OPENAI_API_KEY = CONFIGURED_OPENAI_API_KEY.strip()
        print("success: openai api key loaded and appears valid.")
    else:
        print("error: openai api key is not a valid non-empty string or does not start with 'sk-'.")
        CONFIGURED_OPENAI_API_KEY = None

if CONFIGURED_OPENAI_API_KEY:
    print("attempting: to create instructor-patched openai client...")
    try:
        base_openai_client = OpenAI(api_key=CONFIGURED_OPENAI_API_KEY)
        INSTRUCTOR_OPENAI_CLIENT = instructor.from_openai(client=base_openai_client, mode=instructor.Mode.JSON)
        OPENAI_CLIENT_CONFIGURED_SUCCESSFULLY = True
        print("  success: instructor-patched openai client created using json mode.")
    except Exception as e:
        # Best-effort: report the failure and leave the success flag False so the
        # main section can refuse to run.
        print(f"  error: exception during openai client creation or patching: {e}")
else:
    print("skipped: openai client creation because api key was not valid or not found.")

if not OPENAI_CLIENT_CONFIGURED_SUCCESSFULLY:
    print("failure: instructor_openai_client could not be initialized. analysis will not work.")
print("--- openai configuration and client initialization finished ---")


# cost tracking configuration
# Prices per one million tokens, used by calculate_and_update_cost() and
# reported there with a "$" prefix. The constant names tie them to gpt-4o-mini,
# the model requested in analyze_text_with_openai_instructor().
# NOTE(review): verify these values against current provider pricing.
GPT4O_MINI_INPUT_COST_PER_MILLION_TOKENS = 0.15
GPT4O_MINI_OUTPUT_COST_PER_MILLION_TOKENS = 0.60
# Running totals, mutated by calculate_and_update_cost() via `global` and
# written into every checkpoint by save_openai_checkpoint().
cumulative_prompt_tokens = 0
cumulative_completion_tokens = 0
cumulative_cost = 0.0

def calculate_and_update_cost(prompt_tokens: int, completion_tokens: int, api_call_made_and_tokens_returned: bool):
    """Price one API call attempt and optionally add it to the running totals.

    Args:
        prompt_tokens: Prompt tokens reported for this attempt.
        completion_tokens: Completion tokens reported for this attempt.
        api_call_made_and_tokens_returned: When True (and at least one token
            count is positive), the module-level cumulative counters are
            increased by this attempt's tokens and cost.

    Returns:
        The cost of this attempt; 0.0 when both token counts are zero or less.

    The per-call line is logged only when tokens were reported; the cumulative
    line is always logged.
    """
    global cumulative_prompt_tokens, cumulative_completion_tokens, cumulative_cost

    tokens_were_reported = prompt_tokens > 0 or completion_tokens > 0
    call_cost = 0.0

    if tokens_were_reported:
        call_cost = (
            (prompt_tokens / 1_000_000) * GPT4O_MINI_INPUT_COST_PER_MILLION_TOKENS
            + (completion_tokens / 1_000_000) * GPT4O_MINI_OUTPUT_COST_PER_MILLION_TOKENS
        )
        tqdm.write(f"api call tokens: prompt={prompt_tokens}, completion={completion_tokens}. cost for this call attempt: ${call_cost:.6f}.")

    if tokens_were_reported and api_call_made_and_tokens_returned:
        cumulative_prompt_tokens += prompt_tokens
        cumulative_completion_tokens += completion_tokens
        cumulative_cost += call_cost

    tqdm.write(f"cumulative: prompt tokens={cumulative_prompt_tokens}, completion tokens={cumulative_completion_tokens}, cost=${cumulative_cost:.6f}")
    return call_cost

# analysis function using instructor with openai
def analyze_text_with_openai_instructor(
    risk_factors_text: str,
    mda_text: str,
    company_name: str = "Unknown Company",
    temperature: float = 0.0
) -> (Optional[CompanyAIAnalysis], int, int):
    """Send one filing's text to the model and return the parsed analysis.

    The risk-factor and MD&A texts are combined (with a leading company-name
    line) into the prompt template and sent as a single user message through
    the instructor-patched client with `response_model=CompanyAIAnalysis`.

    Args:
        risk_factors_text: Risk-factors section text.
        mda_text: MD&A section text.
        company_name: Name placed on the "Company Name (if known):" line.
        temperature: Sampling temperature forwarded to the API call.

    Returns:
        (parsed model, prompt tokens, completion tokens). Token counts are 0
        when the completion carries no usage information.

    Raises:
        Exception: if the instructor client was never initialised.
        Any exception raised by the API call is re-raised to the caller;
        validation errors and unexpected errors are logged first.
    """
    if not INSTRUCTOR_OPENAI_CLIENT:
        raise Exception("critical: INSTRUCTOR_OPENAI_CLIENT is not available for analysis function.")

    document_text = f"""Company Name (if known): {company_name}
Item 1. Risk Factors:\n{risk_factors_text}\n\nItem 7. Management's Discussion and Analysis (MD&A):\n{mda_text}"""
    chat_messages = [
        {"role": "user", "content": PROMPT_TEMPLATE.format(document_text=document_text)}
    ]

    try:
        # when using mode.json, instructor passes response_format={"type": "json_object"}
        structured_result, raw_completion = INSTRUCTOR_OPENAI_CLIENT.chat.completions.create_with_completion(
            model="gpt-4o-mini",
            messages=chat_messages,
            response_model=CompanyAIAnalysis,
            temperature=temperature,
        )

        # Usage may be absent or partially filled; treat anything missing as 0.
        usage = getattr(raw_completion, 'usage', None) if raw_completion else None
        prompt_token_count, completion_token_count = 0, 0
        if usage:
            if usage.prompt_tokens is not None:
                prompt_token_count = usage.prompt_tokens
            if usage.completion_tokens is not None:
                completion_token_count = usage.completion_tokens

        return structured_result, prompt_token_count, completion_token_count

    except ValidationError as ve:
        tqdm.write(f"pydantic validationerror for {company_name} (instructor/json mode): {str(ve)[:200]}")
        raise
    except (RateLimitError, APIError, TypeError):
        # Passed through without logging; the caller handles these.
        raise
    except Exception as e_unexpected:
        tqdm.write(f"unexpected error in analyze_text_with_openai_instructor for {company_name}: {type(e_unexpected).__name__} - {str(e_unexpected)[:200]}")
        raise

# helper and checkpoint functions
def extract_grade_letter(bucket_assessment_string: str) -> str:
    """Return the upper-cased grade letter that starts a bucket label.

    The text before the first space must be exactly one alphabetic character
    (e.g. "A (Core Strategic Enabler)" -> "A"). Empty, non-string or
    differently shaped input yields "N/A".
    """
    if not bucket_assessment_string or not isinstance(bucket_assessment_string, str):
        return "N/A"

    leading_token, _, _ = bucket_assessment_string.partition(" ")
    is_single_letter = len(leading_token) == 1 and leading_token.isalpha()
    return leading_token.upper() if is_single_letter else "N/A"

def find_latest_openai_checkpoint_info(checkpoint_dir: str, filename_prefix: str) -> (Optional[str], int):
    """Locate the checkpoint file covering the most rows.

    Checkpoint files are named
    ``<filename_prefix>[ERROR_]checkpoint_rows_upto_<N>.csv``. The file with the
    largest ``N`` wins; if an ERROR and a regular checkpoint share the same
    ``N``, the regular one is preferred.

    Args:
        checkpoint_dir: Directory to scan.
        filename_prefix: Literal prefix of the checkpoint file names.

    Returns:
        (file name, N) for the selected checkpoint, or (None, 0) when the
        directory does not exist or holds no matching file.
    """
    latest_checkpoint_filename = None
    latest_is_error_checkpoint = False
    max_original_df_rows_processed = 0
    # The prefix is escaped so regex metacharacters in it are matched literally,
    # and the ERROR tag is captured so it can be told apart from an "ERROR"
    # substring that merely happens to appear in the prefix.
    checkpoint_pattern = re.compile(
        rf"{re.escape(filename_prefix)}(ERROR_)?checkpoint_rows_upto_(\d+)\.csv"
    )
    if not os.path.isdir(checkpoint_dir):
        tqdm.write(f"checkpoint directory not found: {checkpoint_dir}")
        return None, 0
    for filename in os.listdir(checkpoint_dir):
        # fullmatch: names with trailing characters (e.g. "....csv.bak") are not checkpoints.
        match = checkpoint_pattern.fullmatch(filename)
        if not match:
            continue
        try:
            rows_in_this_filename = int(match.group(2))
        except ValueError:
            tqdm.write(f"warning: could not parse row count from checkpoint filename: {filename}")
            continue
        is_error_checkpoint = match.group(1) is not None
        if rows_in_this_filename > max_original_df_rows_processed:
            max_original_df_rows_processed = rows_in_this_filename
            latest_checkpoint_filename = filename
            latest_is_error_checkpoint = is_error_checkpoint
        elif (
            rows_in_this_filename == max_original_df_rows_processed
            and latest_checkpoint_filename
            and latest_is_error_checkpoint
            and not is_error_checkpoint
        ):
            # Same coverage: a regular checkpoint replaces an ERROR one.
            latest_checkpoint_filename = filename
            latest_is_error_checkpoint = False
    if latest_checkpoint_filename:
        tqdm.write(f"found latest checkpoint: {latest_checkpoint_filename} (represents processing up to original_df_index {max_original_df_rows_processed -1}).")
    else:
        tqdm.write(f"no valid '{filename_prefix}' checkpoint found in {checkpoint_dir}. starting fresh.")
    return latest_checkpoint_filename, max_original_df_rows_processed

def save_openai_checkpoint(results_list_to_save: list, total_original_df_rows_covered: int, base_path: str, filename_prefix: str, is_error_save: bool = False):
    """Write the accumulated results to a checkpoint CSV.

    Every row is stamped with the current module-level cumulative token and
    cost totals. The file is named
    ``<filename_prefix>[ERROR_]checkpoint_rows_upto_<total_original_df_rows_covered>.csv``
    inside ``base_path``.

    Args:
        results_list_to_save: Result records; nothing is written when empty.
        total_original_df_rows_covered: Row count encoded in the file name.
        base_path: Directory receiving the checkpoint.
        filename_prefix: Prefix of the checkpoint file name.
        is_error_save: Adds the ``ERROR_`` tag to the file name when True.

    A failure while writing the CSV is logged, not raised.
    """
    if not results_list_to_save:
        tqdm.write("no new results to save in this checkpoint interval (results_list_to_save is empty).")
        return

    snapshot_df = pd.DataFrame(results_list_to_save).assign(
        cumulative_prompt_tokens_at_save=cumulative_prompt_tokens,
        cumulative_completion_tokens_at_save=cumulative_completion_tokens,
        cumulative_cost_at_save=cumulative_cost,
    )

    if is_error_save:
        error_tag, kind_label = "ERROR_", "ERROR"
    else:
        error_tag, kind_label = "", "Regular"
    filename = os.path.join(base_path, f"{filename_prefix}{error_tag}checkpoint_rows_upto_{total_original_df_rows_covered}.csv")

    try:
        snapshot_df.to_csv(filename, index=False)
        tqdm.write(f"--- {kind_label} checkpoint saved: {filename} (covers {total_original_df_rows_covered} original df rows) ---")
    except Exception as e_save:
        tqdm.write(f"error saving checkpoint {filename}: {e_save}")

# main execution
if __name__ == "__main__":
    print("\n--- main execution for openai detailed analysis (v9 - json mode, corrected columns) ---")

    OUTPUT_BASE_DIR = "./OpenAI_Analysis_Results/" 
    CHECKPOINT_FILENAME_PREFIX = "openai_detailed_analysis_"
    FINAL_RESULTS_FILENAME = os.path.join(OUTPUT_BASE_DIR, f"{CHECKPOINT_FILENAME_PREFIX}FINAL_ALL_ROWS.csv")
    os.makedirs(OUTPUT_BASE_DIR, exist_ok=True)
    
    CHECKPOINT_INTERVAL = 50 
    API_REQUEST_DELAY_ON_SUCCESS = 0.1 
    BACKOFF_DELAYS = [3, 10, 30, 60, 120] 
    MAX_RETRIES_PER_ROW = len(BACKOFF_DELAYS)

    if not OPENAI_CLIENT_CONFIGURED_SUCCESSFULLY:
        print("critical error: openai client not initialized. exiting.")
        exit()
    if 'df' not in globals() or not isinstance(df, pd.DataFrame) or df.empty:
        print("critical error: input dataframe 'df' not found, not a pandas dataframe, or is empty. exiting.")
        exit()
    print(f"input dataframe 'df' loaded with {len(df)} rows.")
    
    input_col_cik = "cik" 
    input_col_ticker = "tickers_sec" 
    input_col_company_name = "companyName_sec" 
    input_col_sector = "sector_user" 
    input_col_year = "year" 
    input_col_filing_date = "filingDate" 
    input_col_risk_factors = "risk_factors_text"
    input_col_mda = "item7_mda_text"

    if input_col_cik in df.columns:
        df[input_col_cik] = df[input_col_cik].astype(str).str.zfill(10)
        print(f"standardized '{input_col_cik}' column in input dataframe 'df'.")
    else:
        print(f"critical error: input cik column '{input_col_cik}' not found in dataframe 'df'. exiting.")
        exit()

    essential_text_cols = [input_col_risk_factors, input_col_mda]
    missing_text_cols = [col for col in essential_text_cols if col not in df.columns]
    if missing_text_cols:
        print(f"critical error: input dataframe 'df' is missing essential text columns: {missing_text_cols}. exiting.")
        exit()
    
    identifier_cols_to_check_in_input = [input_col_ticker, input_col_company_name, input_col_sector, input_col_year, input_col_filing_date]
    for col in identifier_cols_to_check_in_input:
        if col not in df.columns:
            print(f"warning: identifier column '{col}' not found in input dataframe 'df'. it will be 'n/a' or none in results.")

    all_processed_results = []
    start_processing_from_original_df_index = 0
    cumulative_prompt_tokens = 0
    cumulative_completion_tokens = 0
    cumulative_cost = 0.0

    latest_checkpoint_file, max_original_rows_covered_by_checkpoint = find_latest_openai_checkpoint_info(OUTPUT_BASE_DIR, CHECKPOINT_FILENAME_PREFIX)
    
    if latest_checkpoint_file:
        full_checkpoint_path = os.path.join(OUTPUT_BASE_DIR, latest_checkpoint_file)
        try:
            tqdm.write(f"loading data from checkpoint: {full_checkpoint_path}")
            checkpoint_df = pd.read_csv(full_checkpoint_path, dtype={'CIK': str}) 
            if 'CIK' in checkpoint_df.columns:
                checkpoint_df['CIK'] = checkpoint_df['CIK'].astype(str).str.zfill(10)

            if not checkpoint_df.empty:
                last_chkpt_row = checkpoint_df.iloc[-1]
                cumulative_prompt_tokens = int(last_chkpt_row.get('cumulative_prompt_tokens_at_save', 0))
                cumulative_completion_tokens = int(last_chkpt_row.get('cumulative_completion_tokens_at_save', 0))
                cumulative_cost = float(last_chkpt_row.get('cumulative_cost_at_save', 0.0))
                tqdm.write(f"restored cumulative stats: cost=${cumulative_cost:.6f}, ptokens={cumulative_prompt_tokens}, ctokens={cumulative_completion_tokens}")

                last_row_error_info = last_chkpt_row.get('error')
                is_last_row_an_error = pd.notna(last_row_error_info) and str(last_row_error_info).strip() != ""

                if "ERROR" in latest_checkpoint_file and is_last_row_an_error:
                    all_processed_results = checkpoint_df.iloc[:-1].to_dict('records')
                    start_processing_from_original_df_index = max_original_rows_covered_by_checkpoint - 1 
                    tqdm.write(f"loaded {len(all_processed_results)} successful results. will re-attempt original df index {start_processing_from_original_df_index}.")
                else:
                    all_processed_results = checkpoint_df.to_dict('records')
                    start_processing_from_original_df_index = max_original_rows_covered_by_checkpoint
                    tqdm.write(f"loaded {len(all_processed_results)} results. resuming from original df index {start_processing_from_original_df_index}.")
            else: tqdm.write("checkpoint was empty.")
        except Exception as e_load_chkpt:
            tqdm.write(f"error loading checkpoint {full_checkpoint_path}: {e_load_chkpt}. starting fresh.")
            all_processed_results = []; start_processing_from_original_df_index = 0
            cumulative_cost = 0.0; cumulative_prompt_tokens = 0; cumulative_completion_tokens = 0
    else: tqdm.write("no checkpoint found. starting fresh.")
    
    df_to_process_slice = df.iloc[start_processing_from_original_df_index:]

    if df_to_process_slice.empty and start_processing_from_original_df_index >= len(df):
        tqdm.write(f"all {len(df)} rows processed based on checkpoints.")
    elif len(df) == 0:
        tqdm.write("input dataframe 'df' is empty. exiting."); exit()
    
    newly_processed_results_this_session = []
    stop_all_processing_flag = False

    for original_df_index, row_data in tqdm(df_to_process_slice.iterrows(), 
                                             initial=0, 
                                             total=len(df_to_process_slice), 
                                             desc=f"Processing Rows (Original df index {start_processing_from_original_df_index} to {len(df)-1})"):
        
        current_row_result_dict = {
            'CIK': str(row_data.get(input_col_cik, "N/A_CIK")).zfill(10),
            'Ticker': str(row_data.get(input_col_ticker, "N/A_Ticker")),
            'Company Name': str(row_data.get(input_col_company_name, "N/A_CompName")),
            'Sector': str(row_data.get(input_col_sector, "N/A_Sector")),
            'Year': row_data.get(input_col_year, None), 
            'filingDate': row_data.get(input_col_filing_date, None),
            'Overall Summary': "N/A", 'Strategic Depth': "N/A", 'Disclosure Sentiment': "N/A",
            'Risk - Own Adoption': "N/A", 'Risk - External Threats': "N/A", 'Risk - Non-Adoption': "N/A",
            'Forward-Looking': "N/A", 'AI Washing Index': "N/A", 'Talent & Investment': "N/A",
            'Key AI Terms': "", 'error': None,
            'api_call_cost_for_row': 0.0, 
            'prompt_tokens_for_row': 0,
            'completion_tokens_for_row': 0
        }
        cik_val_for_log = current_row_result_dict['CIK']
        company_name_for_llm = current_row_result_dict['Company Name'] if current_row_result_dict['Company Name'] not in ["N/A_CompName", "nan", "None", ""] else current_row_result_dict['Ticker']
        if not company_name_for_llm or company_name_for_llm.lower() == "n/a_ticker" or company_name_for_llm.lower() == "nan":
            company_name_for_llm = f"CIK_{cik_val_for_log}"

        risk_text_val = str(row_data.get(input_col_risk_factors, ""))
        mda_text_val = str(row_data.get(input_col_mda, ""))

        risk_text_is_missing = pd.isna(row_data.get(input_col_risk_factors)) or not risk_text_val.strip()
        mda_text_is_missing = pd.isna(row_data.get(input_col_mda)) or not mda_text_val.strip()

        if risk_text_is_missing or mda_text_is_missing:
            error_message_parts = []
            if risk_text_is_missing: error_message_parts.append(f"missing/empty {input_col_risk_factors}")
            if mda_text_is_missing: error_message_parts.append(f"missing/empty {input_col_mda}")
            current_row_result_dict['error'] = " and ".join(error_message_parts)
            tqdm.write(f"skipping cik {cik_val_for_log} (original index: {original_df_index}): {current_row_result_dict['error']}.")
            newly_processed_results_this_session.append(current_row_result_dict)
            current_total_original_df_rows_covered = start_processing_from_original_df_index + len(newly_processed_results_this_session)
            if len(newly_processed_results_this_session) > 0 and \
               (current_total_original_df_rows_covered % CHECKPOINT_INTERVAL == 0 or \
                original_df_index == df.index[-1]):
                save_openai_checkpoint(all_processed_results + newly_processed_results_this_session, 
                                      current_total_original_df_rows_covered, 
                                      OUTPUT_BASE_DIR, CHECKPOINT_FILENAME_PREFIX,
                                      is_error_save=True)
            continue

        consecutive_api_failures_for_row = 0
        api_call_successful_and_parsed = False
        
        while consecutive_api_failures_for_row < MAX_RETRIES_PER_ROW:
            analysis_object = None
            attempt_p_tokens, attempt_c_tokens = 0, 0
            
            try:
                tqdm.write(f"attempt {consecutive_api_failures_for_row + 1}/{MAX_RETRIES_PER_ROW} for cik {cik_val_for_log} (original index: {original_df_index})")
                analysis_object, attempt_p_tokens, attempt_c_tokens = analyze_text_with_openai_instructor(
                    risk_text_val, mda_text_val, company_name_for_llm, temperature=0.0
                )
                
                current_row_result_dict['prompt_tokens_for_row'] += attempt_p_tokens
                current_row_result_dict['completion_tokens_for_row'] += attempt_c_tokens
                current_call_cost = calculate_and_update_cost(attempt_p_tokens, attempt_c_tokens, True)
                current_row_result_dict['api_call_cost_for_row'] += current_call_cost

                if analysis_object:
                    tqdm.write(f"success: cik {cik_val_for_log} (original index: {original_df_index}) parsed.")
                    current_row_result_dict.update({
                        'Overall Summary': analysis_object.overall_ai_preparedness_summary_cot,
                        'Strategic Depth': extract_grade_letter(analysis_object.ai_strategic_depth.bucket_assessment),
                        'Disclosure Sentiment': extract_grade_letter(analysis_object.ai_disclosure_sentiment.bucket_assessment),
                        'Risk - Own Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_own_ai_adoption.bucket_assessment),
                        'Risk - External Threats': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_external_ai_threats.bucket_assessment),
                        'Risk - Non-Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_of_not_adopting_ai_or_competitive_ai_threat.bucket_assessment),
                        'Forward-Looking': extract_grade_letter(analysis_object.forward_looking_ai_statements.bucket_assessment),
                        'AI Washing Index': extract_grade_letter(analysis_object.ai_washing_hype_index.bucket_assessment),
                        'Talent & Investment': extract_grade_letter(analysis_object.ai_talent_and_investment_focus.bucket_assessment),
                        'Key AI Terms': ', '.join(analysis_object.key_ai_related_terminology_found or []),
                        'error': None 
                    })
                    api_call_successful_and_parsed = True
                    break 
                else: 
                    current_row_result_dict['error'] = 'analysis returned no object without exception.'
                    break 

            except (RateLimitError, APIError) as api_err:
                tqdm.write(f"api error (attempt {consecutive_api_failures_for_row + 1}) for cik {cik_val_for_log}: {type(api_err).__name__} - {api_err}")
                current_row_result_dict['error'] = f'api error attempt {consecutive_api_failures_for_row + 1}: {type(api_err).__name__} - {str(api_err)[:100]}'
                consecutive_api_failures_for_row += 1
                if consecutive_api_failures_for_row >= MAX_RETRIES_PER_ROW:
                    tqdm.write(f"max retries for api errors on cik {cik_val_for_log}. recording error.")
                    temp_error_save_list = newly_processed_results_this_session + [current_row_result_dict] 
                    save_openai_checkpoint(all_processed_results + temp_error_save_list,
                                          start_processing_from_original_df_index + len(newly_processed_results_this_session) + 1,
                                          OUTPUT_BASE_DIR, CHECKPOINT_FILENAME_PREFIX, is_error_save=True)
                    break 
                else:
                    delay = BACKOFF_DELAYS[consecutive_api_failures_for_row -1]
                    tqdm.write(f"waiting {delay}s before next retry for cik {cik_val_for_log}.")
                    time.sleep(delay)
            
            except ValidationError as ve:
                tqdm.write(f"validation error (attempt {consecutive_api_failures_for_row + 1}) for cik {cik_val_for_log}: {str(ve)[:200]}")
                current_row_result_dict['error'] = f'pydantic validationerror: {str(ve)[:150]}'
                tqdm.write(f"terminal error for cik {cik_val_for_log} due to unparsable llm response.")
                break 

            except Exception as e_unexpected:
                tqdm.write(f"unexpected error (attempt {consecutive_api_failures_for_row + 1}) for cik {cik_val_for_log}: {type(e_unexpected).__name__} - {str(e_unexpected)[:200]}")
                current_row_result_dict['error'] = f'unexpected attempt {consecutive_api_failures_for_row + 1}: {str(e_unexpected)[:100]}'
                consecutive_api_failures_for_row += 1
                if consecutive_api_failures_for_row >= MAX_RETRIES_PER_ROW:
                    tqdm.write(f"max retries for unexpected errors on cik {cik_val_for_log}. recording error.")
                    break
                else:
                    delay = BACKOFF_DELAYS[consecutive_api_failures_for_row -1]
                    tqdm.write(f"waiting {delay}s before next retry for cik {cik_val_for_log}.")
                    time.sleep(delay)

        newly_processed_results_this_session.append(current_row_result_dict)

        if stop_all_processing_flag:
            tqdm.write("stop flag activated. ending processing loop.")
            break 
        
        current_total_original_df_rows_covered = start_processing_from_original_df_index + len(newly_processed_results_this_session)
        if len(newly_processed_results_this_session) > 0 and \
           (current_total_original_df_rows_covered % CHECKPOINT_INTERVAL == 0 or \
            original_df_index == df.index[-1]): 
            
            combined_results_for_checkpoint = all_processed_results + newly_processed_results_this_session
            save_openai_checkpoint(combined_results_for_checkpoint, 
                                  current_total_original_df_rows_covered, 
                                  OUTPUT_BASE_DIR, 
                                  CHECKPOINT_FILENAME_PREFIX, 
                                  is_error_save=bool(current_row_result_dict.get('error')))
        
        if api_call_successful_and_parsed and API_REQUEST_DELAY_ON_SUCCESS > 0:
            time.sleep(API_REQUEST_DELAY_ON_SUCCESS)

    final_complete_list_of_results = all_processed_results + newly_processed_results_this_session

    if final_complete_list_of_results:
        final_save_is_error_state = stop_all_processing_flag and bool(final_complete_list_of_results[-1].get('error'))
        save_openai_checkpoint(final_complete_list_of_results, 
                              len(final_complete_list_of_results), 
                              OUTPUT_BASE_DIR, 
                              CHECKPOINT_FILENAME_PREFIX, 
                              is_error_save=final_save_is_error_state) 
        
        df_results_final_output = pd.DataFrame(final_complete_list_of_results)
        try:
            if 'CIK' in df_results_final_output.columns:
                df_results_final_output['CIK'] = df_results_final_output['CIK'].astype(str).str.zfill(10)

            output_columns_ordered = [
                'CIK', 'Ticker', 'Company Name', 'Sector', 'Year', 'filingDate',
                'Overall Summary', 'Strategic Depth', 'Disclosure Sentiment',
                'Risk - Own Adoption', 'Risk - External Threats', 'Risk - Non-Adoption',
                'Forward-Looking', 'AI Washing Index', 'Talent & Investment', 'Key AI Terms',
                'error', 'api_call_cost_for_row', 'prompt_tokens_for_row', 'completion_tokens_for_row',
            ]
            final_output_cols_present = [col for col in output_columns_ordered if col in df_results_final_output.columns]
            for stat_col in ['cumulative_prompt_tokens_at_save', 'cumulative_completion_tokens_at_save', 'cumulative_cost_at_save']:
                if stat_col in df_results_final_output.columns and stat_col not in final_output_cols_present:
                    final_output_cols_present.append(stat_col)

            df_results_final_output[final_output_cols_present].to_csv(FINAL_RESULTS_FILENAME, index=False)
            print(f"\n--- processing finished. final results saved to: {FINAL_RESULTS_FILENAME} ({len(df_results_final_output)} rows) ---")
            if not df_results_final_output.empty:
                print(df_results_final_output[final_output_cols_present].head().to_string())
        except Exception as e_final_save:
            print(f"error saving final results to {FINAL_RESULTS_FILENAME}: {e_final_save}")
    else:
        print("\nno results were processed or loaded from checkpoints to save in the final file.")
    
    print(f"\n--- final cost summary ---\ntotal prompt tokens: {cumulative_prompt_tokens}\ntotal completion tokens: {cumulative_completion_tokens}\ntotal estimated cost: ${cumulative_cost:.6f}")
    print("\n--- main execution finished ---")


In [None]:
import pandas as pd
import google.generativeai as genai
import json
import os
import re
import instructor
import time
from tqdm import tqdm
from pydantic import BaseModel, Field, field_validator
from typing import List, Literal, Optional, Union

# pydantic model definitions
# these are the same as the openai version, as the desired output structure is identical.
#
# each alias below enumerates the only strings accepted for one factor's
# `bucket_assessment` field. the labels are repeated verbatim inside PROMPT_TEMPLATE,
# so any edit here must be mirrored there. every label starts with a single grade
# letter followed by a space, which is what extract_grade_letter() parses.

# factor 1 (AI Strategic Depth): five grades, A (most central) through E (no mention).
AIStrategicDepthBucket = Literal[
    "A (Core Strategic Enabler)",
    "B (Significant Operational/Product Integration)",
    "C (Emerging/Exploratory Mentions)",
    "D (Superficial/Generic Mentions OR Risk of Non-Adoption Only)",
    "E (No Mention of Own AI Strategy/Adoption)"
]

# factor 2 (AI Disclosure Sentiment): tone of the company's own AI statements.
AIDisclosureSentimentBucket = Literal[
    "A (Clearly Positive)",
    "B (Mostly Positive/Balanced)",
    "C (Neutral/Factual)",
    "D (Cautious/Mixed)",
    "E (Negative OR Not Applicable - No Own AI Discussion)"
]

# factor 3a (risk from the company's own AI adoption).
AIRiskOwnAdoptionBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of Own AI Adoption Risk)",
    "C (No Mention of Own AI Adoption Risk)"
]

# factor 3b (risk from external AI threats).
AIRiskExternalThreatsBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of External AI Threat)",
    "C (No Mention of External AI Threat)"
]

# factor 3c (risk of not adopting AI / competitive AI threat).
AIRiskNonAdoptionBucket = Literal[
    "A (Explicitly Discussed)",
    "B (Implicitly Suggested)",
    "C (No Mention of Non-Adoption/Competitive AI Risk)"
]

# factor 4 (Forward-Looking AI Statements).
AIForwardLookingBucket = Literal[
    "A (Specific & Detailed Future Plans)",
    "B (General Future Intent)",
    "C (Implicit Future Focus Only)",
    "D (No Mention of Future AI Plans)"
]

# factor 5 ("AI Washing" Hype Index): E doubles as the "not applicable" grade.
AIWashingHypeBucket = Literal[
    "A (Substantive & Grounded)",
    "B (Mostly Substantive)",
    "C (Mixed - Some Substance, Some Hype)",
    "D (Mostly Hype)",
    "E (Pure Hype OR Not Applicable - No Positive AI Claims)"
]

# factor 6 (AI Talent & Investment Focus).
AITalentInvestmentBucket = Literal[
    "A (Explicit & Significant Focus)",
    "B (General Mention of AI Talent/Investment)",
    "C (Implicit Focus Only)",
    "D (No Mention of AI Talent/Investment)"
]

# shared shape of every per-factor assessment: a list of evidence snippets plus a
# short free-text justification. subclasses add the factor-specific `bucket_assessment`.
# NOTE(review): documented with comments instead of a class docstring because pydantic
# may surface class docstrings in the generated schema - confirm before converting.
class BaseAssessment(BaseModel):
    supporting_evidence: List[str] = Field(default_factory=list)
    chain_of_thought_reasoning: str

    @field_validator("supporting_evidence", mode='before')
    @classmethod
    def normalize_supporting_evidence(cls, v):
        """Coerce the raw evidence value into a list of strings before validation.

        A bare string becomes a one-element list (unchanged, and only when it is
        not blank). A list keeps only its string members, each stripped, with
        blank ones dropped. Anything else, including None, becomes an empty list.
        """
        if isinstance(v, str):
            return [v] if v.strip() else []
        if not isinstance(v, list):
            return []

        cleaned = []
        for entry in v:
            # non-string members are silently discarded rather than stringified
            if not isinstance(entry, str):
                continue
            trimmed = str(entry).strip()
            if trimmed:
                cleaned.append(trimmed)
        return cleaned

# assessment for the "AI Strategic Depth" factor: inherits the evidence and reasoning
# fields and restricts `bucket_assessment` to the AIStrategicDepthBucket labels.
class AIStrategicDepthAssessment(BaseAssessment):
    bucket_assessment: AIStrategicDepthBucket

# assessment for the "AI Disclosure Sentiment" factor: inherits the evidence and
# reasoning fields and restricts `bucket_assessment` to AIDisclosureSentimentBucket.
class AIDisclosureSentimentAssessment(BaseAssessment):
    bucket_assessment: AIDisclosureSentimentBucket

# assessment for risk sub-factor 3a (risk from the company's own AI adoption);
# `bucket_assessment` is restricted to the AIRiskOwnAdoptionBucket labels.
class AIRiskOwnAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskOwnAdoptionBucket

# assessment for risk sub-factor 3b (external AI threats);
# `bucket_assessment` is restricted to the AIRiskExternalThreatsBucket labels.
class AIRiskExternalThreatsAssessment(BaseAssessment):
    bucket_assessment: AIRiskExternalThreatsBucket

# assessment for risk sub-factor 3c (risk of not adopting AI / competitive AI threat);
# `bucket_assessment` is restricted to the AIRiskNonAdoptionBucket labels.
class AIRiskNonAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskNonAdoptionBucket

# container for the three risk sub-factor assessments (3a, 3b, 3c). the field names
# are the JSON keys shown under "ai_risk_disclosure" in PROMPT_TEMPLATE's example output.
class AIRiskDisclosure(BaseModel):
    risk_from_own_ai_adoption: AIRiskOwnAdoptionAssessment
    risk_from_external_ai_threats: AIRiskExternalThreatsAssessment
    risk_of_not_adopting_ai_or_competitive_ai_threat: AIRiskNonAdoptionAssessment

# assessment for the "Forward-Looking AI Statements" factor;
# `bucket_assessment` is restricted to the AIForwardLookingBucket labels.
class AIForwardLookingAssessment(BaseAssessment):
    bucket_assessment: AIForwardLookingBucket

# assessment for the "AI Washing" hype index factor;
# `bucket_assessment` is restricted to the AIWashingHypeBucket labels.
class AIWashingHypeAssessment(BaseAssessment):
    bucket_assessment: AIWashingHypeBucket

# assessment for the "AI Talent & Investment Focus" factor;
# `bucket_assessment` is restricted to the AITalentInvestmentBucket labels.
class AITalentInvestmentAssessment(BaseAssessment):
    bucket_assessment: AITalentInvestmentBucket

# top-level structured result for one filing: an optional company identifier, a short
# overall summary, one nested assessment per factor, and the list of AI terms found.
# this is the `response_model` passed to the instructor client.
class CompanyAIAnalysis(BaseModel):
    company_identifier: Optional[str] = Field(default=None, description="EXTRACT_COMPANY_NAME_IF_POSSIBLE_ELSE_NULL")
    overall_ai_preparedness_summary_cot: str = Field(description="LLM_GENERATED_1_2_SENTENCE_SUMMARY_OF_OVERALL_AI_POSTURE_BASED_ON_ALL_FACTORS")
    ai_strategic_depth: AIStrategicDepthAssessment
    ai_disclosure_sentiment: AIDisclosureSentimentAssessment
    ai_risk_disclosure: AIRiskDisclosure
    forward_looking_ai_statements: AIForwardLookingAssessment
    ai_washing_hype_index: AIWashingHypeAssessment
    ai_talent_and_investment_focus: AITalentInvestmentAssessment
    key_ai_related_terminology_found: List[str] = Field(default_factory=list, description="if_any_explicit_ai_terms_like_machine_learning_nlp_etc")

    @field_validator("key_ai_related_terminology_found", mode='before')
    @classmethod
    def normalize_key_terms(cls, v):
        """Coerce the raw terminology value into a list of trimmed, non-empty strings.

        A string is treated as a comma-separated list. A list keeps only its
        string members. Anything else, including None, yields an empty list.
        """
        if isinstance(v, str):
            candidates = v.split(',')
        elif isinstance(v, list):
            # non-string members are dropped rather than stringified
            candidates = [str(entry) for entry in v if isinstance(entry, str)]
        else:
            return []

        trimmed_terms = (candidate.strip() for candidate in candidates)
        return [term for term in trimmed_terms if term]

# prompt template
# single user prompt sent to the model for each filing. it is rendered with
# str.format(document_text=...), so `{document_text}` is the only placeholder and every
# literal brace in the example json is doubled (`{{` / `}}`) to survive formatting.
# the bucket labels quoted below must stay identical to the Literal aliases used by the
# pydantic models, otherwise validation of the model's answer will fail.
PROMPT_TEMPLATE = """
You are an expert financial analyst and AI strategist, specializing in evaluating corporate disclosures for insights into a company's engagement with Artificial Intelligence (AI). Your task is to meticulously analyze the provided text from a company's 10-K filing (specifically excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis") and provide a structured JSON output assessing the company's AI preparedness based on the defined factors and criteria.

**DOCUMENT CONTEXT:**
The provided text below, enclosed in triple backticks (```), contains excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis (MD&A)" of a single company's 10-K report.

```
{document_text}
```

**INSTRUCTIONS:**
1.  Carefully read the entire provided text.
2.  For each of the AI Assessment Factors listed below, evaluate the text based on the defined Buckets & Criteria.
3.  Provide your entire response as a single, valid JSON object. Do not include any explanatory text before or after the JSON object. The JSON object should conform to the Pydantic schema provided implicitly by the system.
4.  For each factor, include:
    * `bucket_assessment`: The exact string for the category you've assigned (e.g., "A (Core Strategic Enabler)", "E (No Mention of Own AI Strategy/Adoption)"). It must match one of the predefined bucket descriptions for that factor.
    * `supporting_evidence`: Key verbatim quotes (or concise summaries of multiple related statements) from the text that justify your assessment. Limit to 2-3 key pieces of evidence. If no direct evidence, provide an empty list or a list containing "No direct evidence found.".
    * `chain_of_thought_reasoning`: A brief explanation (1-2 sentences) of your thought process for arriving at the bucket assessment, referencing the criteria and evidence.
5.  If information for a factor or sub-factor is not present in the text, use the appropriate "No Mention" bucket assessment for that factor and provide an empty list or ["No direct evidence found."] for `supporting_evidence`.

**AI ASSESSMENT FACTORS, BUCKETS & CRITERIA:**

**1. AI Strategic Depth:**
    * *Concept:* How deeply and explicitly AI is integrated into the company's core business strategy and future plans, as evidenced by concrete examples, initiatives, or stated intentions.
    * *Buckets & Criteria:*
        * `A (Core Strategic Enabler)`: AI explicitly central to core strategy, key driver of competitive advantage/future growth, multiple concrete examples linked to strategic objectives.
        * `B (Significant Operational/Product Integration)`: AI explicitly enhances key operations/products with specific examples, strategic for those areas.
        * `C (Emerging/Exploratory Mentions)`: AI mentioned for isolated projects, pilot programs, or limited exploration; strategic link less developed.
        * `D (Superficial/Generic Mentions OR Risk of Non-Adoption Only)`: Generic AI statements without concrete examples OR AI *only* mentioned as a risk of non-adoption by the company.
        * `E (No Mention of Own AI Strategy/Adoption)`: No discussion of the company's own AI strategy, adoption, or specific AI initiatives.

**2. AI Disclosure Sentiment:**
    * *Concept:* The overall tone of the company's *own statements* regarding its AI initiatives, capabilities, and future AI plans.
    * *Buckets & Criteria:*
        * `A (Clearly Positive)`: Optimistic language on AI benefits/achievements/opportunities, backed by specifics.
        * `B (Mostly Positive/Balanced)`: Generally positive, perhaps tempered with neutral statements or acknowledged challenges.
        * `C (Neutral/Factual)`: AI discussed objectively, descriptively, without strong positive/negative connotations.
        * `D (Cautious/Mixed)`: AI discussions highlight significant uncertainties/challenges/risks of *own* AI efforts, or tone is wary.
        * `E (Negative OR Not Applicable - No Own AI Discussion)`: Predominantly negative language regarding own AI experiences OR no discussion of own AI initiatives.

**3. AI Risk Disclosure:**
    * *Concept:* How thoroughly and specifically the company acknowledges and discusses potential risks associated with AI. Assessed across three sub-factors:
    * **3a. Risk from Company's Own AI Adoption:** (e.g., AI system failure, data bias, ethical concerns, implementation challenges).
        * *Buckets & Criteria for 3a:*
            * `A (Detailed & Specific Discussion)`: Specific risks of *own AI adoption* identified and discussed in detail, perhaps with impacts/mitigation.
            * `B (General Mention of Own AI Adoption Risk)`: Risks of *own AI adoption* mentioned generally, less detail.
            * `C (No Mention of Own AI Adoption Risk)`: This specific type of risk is not mentioned.
    * **3b. Risk from External AI (Threats):** (e.g., AI used in cyberattacks by others, AI-driven misinformation).
        * *Buckets & Criteria for 3b:*
            * `A (Detailed & Specific Discussion)`: Specific *external AI threats* identified and discussed in detail.
            * `B (General Mention of External AI Threat)`: External AI threats mentioned generally.
            * `C (No Mention of External AI Threat)`: This specific type of risk is not mentioned.
    * **3c. Risk of Not Adopting AI / Competitive AI Threat:** (e.g., falling behind AI-leveraging competitors, failure to innovate using AI).
        * *Buckets & Criteria for 3c:*
            * `A (Explicitly Discussed)`: Risk of not adopting AI or being outcompeted by AI-using rivals is explicitly stated and perhaps detailed.
            * `B (Implicitly Suggested)`: Competitive pressures from technologically advanced rivals are mentioned, technology adoption is stressed, but AI not explicitly named as the specific risk of non-adoption.
            * `C (No Mention of Non-Adoption/Competitive AI Risk)`: This specific type of risk is not mentioned.

**4. Forward-Looking AI Statements:**
    * *Concept:* The extent and specificity of discussion about future plans, investments, or expected impact related to the company's own AI initiatives.
    * *Buckets & Criteria:*
        * `A (Specific & Detailed Future Plans)`: Concrete future AI initiatives, R&D, investments, or launches described with some detail.
        * `B (General Future Intent)`: General intention to explore/invest in/utilize AI, without specific details.
        * `C (Implicit Future Focus Only)`: General future-oriented tech/digital strategies mentioned that *might* include AI, but AI not explicitly named in forward-looking plans.
        * `D (No Mention of Future AI Plans)`: No forward-looking statements regarding own AI initiatives.

**5. "AI Washing" Hype Index:** (Factor is only applicable if positive claims about own AI are made)
    * *Concept:* Degree to which a company's positive AI claims appear substantive vs. exaggerated or vague.
    * *Buckets & Criteria:*
        * `A (Substantive & Grounded)`: Positive AI claims consistently backed by specific examples, clear use cases, tangible outcomes, or detailed projects/investments. Risk acknowledgement of own AI can signal substance.
        * `B (Mostly Substantive)`: Most positive AI claims supported by specifics, some mild aspirational language.
        * `C (Mixed - Some Substance, Some Hype)`: Some specific AI applications mentioned, but interspersed with buzzwords or generic/aspirational statements.
        * `D (Mostly Hype)`: Predominantly vague claims, AI buzzwords, general AI potential without concrete company-specific examples.
        * `E (Pure Hype OR Not Applicable - No Positive AI Claims)`: AI mentioned in purely aspirational/marketing sense with no support OR no positive claims about own AI are made.

**6. AI Talent & Investment Focus:**
    * *Concept:* Explicit discussion of focus on acquiring/developing AI talent, or specific investments in AI R&D, technology, partnerships, or infrastructure.
    * *Buckets & Criteria:*
        * `A (Explicit & Significant Focus)`: Specific initiatives for AI talent (hiring, training, dedicated teams), AI R&D spending, or substantial investments/partnerships in AI detailed.
        * `B (General Mention of AI Talent/Investment)`: Mentions tech talent implicitly including AI, or general statements about investing in tech/R&D where AI is likely but not singled out.
        * `C (Implicit Focus Only)`: Significant R&D in tech-heavy areas or major "business technology transformation" mentioned, but AI not explicitly linked to talent/investment items.
        * `D (No Mention of AI Talent/Investment)`: No explicit or strong implicit discussion of AI-specific talent or investments.

**EXPECTED JSON OUTPUT STRUCTURE (Example for LLM guidance, actual structure enforced by Pydantic):**
```json
{{
  "company_identifier": "EXTRACT_COMPANY_NAME_IF_POSSIBLE_ELSE_NULL",
  "overall_ai_preparedness_summary_cot": "LLM_GENERATED_1_2_SENTENCE_SUMMARY_OF_OVERALL_AI_POSTURE_BASED_ON_ALL_FACTORS",
  "ai_strategic_depth": {{
    "bucket_assessment": "A (Core Strategic Enabler)",
    "supporting_evidence": ["quote1", "quote2"],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_disclosure_sentiment": {{
    "bucket_assessment": "B (Mostly Positive/Balanced)",
    "supporting_evidence": ["quote1"],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_risk_disclosure": {{
    "risk_from_own_ai_adoption": {{
      "bucket_assessment": "A (Detailed & Specific Discussion)",
      "supporting_evidence": ["quote1"],
      "chain_of_thought_reasoning": "Your reasoning for this sub-factor."
    }},
    "risk_from_external_ai_threats": {{
      "bucket_assessment": "B (General Mention of External AI Threat)",
      "supporting_evidence": ["quote1"],
      "chain_of_thought_reasoning": "Your reasoning for this sub-factor."
    }},
    "risk_of_not_adopting_ai_or_competitive_ai_threat": {{
      "bucket_assessment": "C (No Mention of Non-Adoption/Competitive AI Risk)",
      "supporting_evidence": ["No direct evidence found."],
      "chain_of_thought_reasoning": "Your reasoning for this sub-factor."
    }}
  }},
  "forward_looking_ai_statements": {{
    "bucket_assessment": "D (No Mention of Future AI Plans)",
    "supporting_evidence": [],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_washing_hype_index": {{
    "bucket_assessment": "E (Pure Hype OR Not Applicable - No Positive AI Claims)",
    "supporting_evidence": ["No positive AI claims made."],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_talent_and_investment_focus": {{
    "bucket_assessment": "A (Explicit & Significant Focus)",
    "supporting_evidence": ["quote relating to talent or investment"],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "key_ai_related_terminology_found": ["artificial intelligence", "machine learning"]
}}
```
"""

# configuration and client initialization for gemini
# runs at cell/module level and leaves three globals behind for the rest of the cell:
#   CONFIGURED_GEMINI_API_KEY     - the validated key string, or None
#   GENAI_CONFIGURED_SUCCESSFULLY - True only if genai.configure() did not raise
#   INSTRUCTOR_GEMINI_CLIENT      - the instructor-wrapped model, or None on any failure
# each stage is skipped (with a printed reason) when the previous stage failed, so this
# section never raises; callers must check the globals instead.
print("--- starting gemini configuration and client initialization ---")
CONFIGURED_GEMINI_API_KEY = None
GENAI_CONFIGURED_SUCCESSFULLY = False
INSTRUCTOR_GEMINI_CLIENT = None

# the key is expected to have been defined by an earlier cell as the global
# `gemini_api_key`; if it is missing, define it as None so the checks below can run.
if 'gemini_api_key' in globals():
    print("using pre-defined global 'gemini_api_key'.")
else:
    print("info: 'gemini_api_key' is not defined globally. ensure it is set if required.")
    gemini_api_key = None

# accept the key only if it is a non-blank string.
if gemini_api_key is None:
    print("error: global variable 'gemini_api_key' is not defined or not found via other methods.")
else:
    CONFIGURED_GEMINI_API_KEY = gemini_api_key
    if isinstance(CONFIGURED_GEMINI_API_KEY, str) and CONFIGURED_GEMINI_API_KEY.strip():
        print(f"success: 'gemini_api_key' variable found and is a non-empty string.")
    else:
        print(f"error: 'gemini_api_key' variable is not a valid non-empty string.")
        CONFIGURED_GEMINI_API_KEY = None

# hand the key to the genai library; any exception only flips the success flag off.
if CONFIGURED_GEMINI_API_KEY:
    try:
        genai.configure(api_key=CONFIGURED_GEMINI_API_KEY)
        print("success: genai.configure(api_key=...) called successfully.")
        GENAI_CONFIGURED_SUCCESSFULLY = True
    except Exception as e:
        print(f"error: exception during genai.configure(api_key=...): {e}")
else:
    print("skipped: genai.configure() because api key was not valid or not found.")

# build the base model and wrap it with instructor in GEMINI_JSON mode.
# the model name is hard-coded here; change it in both the print and the constructor.
if GENAI_CONFIGURED_SUCCESSFULLY:
    print("attempting: to create instructor-patched gemini client...")
    try:
        print("  step 1: creating base genai.generativemodel('models/gemini-1.5-flash-latest')...")
        base_gemini_model_instance = genai.GenerativeModel(model_name='models/gemini-1.5-flash-latest')
        print("  success: base genai.generativemodel created.")
        print("  step 2: creating client with instructor.from_gemini using instructor.mode.gemini_json...")
        INSTRUCTOR_GEMINI_CLIENT = instructor.from_gemini(
            client=base_gemini_model_instance,
            mode=instructor.Mode.GEMINI_JSON
        )
        print("  success: client created with instructor.from_gemini using gemini_json mode.")
    except Exception as e:
        print(f"  error: exception during instructor.from_gemini client creation: {e}")
        INSTRUCTOR_GEMINI_CLIENT = None
else:
    print("skipped: instructor client creation for gemini because genai library was not configured successfully.")

# final status line: only reports whether a client object exists, no test call is made.
if INSTRUCTOR_GEMINI_CLIENT:
    print("success: instructor_gemini_client appears to be initialized.")
else:
    print("failure: instructor_gemini_client could not be initialized. analysis will not work.")

print("--- gemini configuration and client initialization finished ---")

# analysis function using instructor with gemini
def analyze_text_with_gemini_instructor(
    risk_factors_text: str,
    mda_text: str,
    company_name: str = "Unknown Company",
    max_retries: int = 1, # instructor's internal retries, set to 1 as we handle retries externally
    temperature: float = 0.0
) -> Optional[CompanyAIAnalysis]:
    """Run one structured AI-disclosure analysis for a single filing.

    The two text sections and the company name are combined into one document,
    substituted into PROMPT_TEMPLATE, and sent through INSTRUCTOR_GEMINI_CLIENT
    with CompanyAIAnalysis as the response model.

    Args:
        risk_factors_text: Text of the risk-factors section.
        mda_text: Text of the MD&A section.
        company_name: Name placed in the document header and in log messages.
        max_retries: Forwarded to the client call as `max_retries`.
        temperature: Forwarded to the client call inside the generation config.

    Returns:
        The value returned by the client call, or None when the client is not
        initialised, exposes no recognised call method, or the call raises
        (the exception is logged through tqdm.write and swallowed).
    """
    client = INSTRUCTOR_GEMINI_CLIENT
    if not client:
        tqdm.write(f"error in analyze_text_with_gemini_instructor for {company_name}: instructor_gemini_client is not available.")
        return None

    document_text = f"""Company Name (if known): {company_name}

Item 1. Risk Factors:
{risk_factors_text}

Item 7. Management's Discussion and Analysis (MD&A):
{mda_text}
    """
    full_prompt = PROMPT_TEMPLATE.format(document_text=document_text)

    try:
        # note: max_retries here only governs this single call attempt;
        # row-level retries are handled by the caller.
        # preferred path: chat-style `messages.create` interface.
        if hasattr(client, 'messages') and hasattr(client.messages, 'create'):
            return client.messages.create(
                messages=[{"role": "user", "content": full_prompt}],
                response_model=CompanyAIAnalysis,
                max_retries=max_retries,
                generation_config={"temperature": temperature}
            )

        # fallback path: a patched `generate_content` method.
        if hasattr(client, 'generate_content'):
            return client.generate_content(
                contents=[full_prompt],
                response_model=CompanyAIAnalysis,
                max_retries=max_retries,
                generation_config=genai.types.GenerationConfig(temperature=temperature)
            )

        tqdm.write(f"error for {company_name}: patched instructor_gemini_client does not have a recognized method.")
        return None
    except Exception as e:
        tqdm.write(f"error in analyze_text_with_gemini_instructor: exception during api call for {company_name}: {e}")
        return None

# helper function to extract letter grade
def extract_grade_letter(bucket_assessment_string: str) -> str:
    """Return the upper-cased grade letter that prefixes a bucket label.

    The grade is the text before the first space and must be exactly one
    alphabetic character, e.g. "B (Mostly Positive/Balanced)" -> "B".
    Empty, non-string, or differently shaped input yields "N/A".
    """
    if not isinstance(bucket_assessment_string, str) or not bucket_assessment_string:
        return "N/A"

    leading_token, _, _ = bucket_assessment_string.partition(" ")
    if len(leading_token) != 1 or not leading_token.isalpha():
        return "N/A"
    return leading_token.upper()

# function to find the latest checkpoint
def find_latest_checkpoint_info(checkpoint_dir: str) -> "tuple[Optional[str], int]":
    """Locate the checkpoint file with the highest row count in `checkpoint_dir`.

    Both regular and ERROR checkpoints written by save_checkpoint() are
    considered. The row count is the number embedded in the filename after
    "upto_".

    Args:
        checkpoint_dir: Directory to scan (not recursive).

    Returns:
        (filename, rows): the bare filename (not a full path) of the latest
        checkpoint and its row count, or (None, 0) when the directory does not
        exist or contains no checkpoint files.
    """
    latest_checkpoint_filename = None
    max_rows_processed = 0 # this will be the number in the filename, e.g., from "upto_n"
    checkpoint_pattern = re.compile(r"gemini_analysis_(?:ERROR_)?checkpoint_rows_upto_(\d+)\.csv")

    if not os.path.isdir(checkpoint_dir):
        tqdm.write(f"checkpoint directory {checkpoint_dir} does not exist. starting fresh.")
        return None, 0

    # NOTE(review): when a regular and an ERROR checkpoint share the same row count,
    # the one listed first by os.listdir() wins; that order is not guaranteed.
    for filename in os.listdir(checkpoint_dir):
        # fullmatch (not match) so that leftovers such as "...upto_50.csv.bak" or
        # "...upto_50.csv.tmp" are not mistaken for real checkpoints.
        match = checkpoint_pattern.fullmatch(filename)
        if not match:
            continue
        try:
            rows = int(match.group(1))
        except ValueError:
            tqdm.write(f"warning: could not parse row count from checkpoint filename: {filename}")
            continue
        if rows > max_rows_processed:
            max_rows_processed = rows
            latest_checkpoint_filename = filename

    if latest_checkpoint_filename:
        # max_rows_processed is the n from "upto_n". this n represents the count of rows in that file.
        tqdm.write(f"found latest checkpoint: {latest_checkpoint_filename} (contains {max_rows_processed} rows).")
    else:
        tqdm.write("no valid checkpoint files found. starting fresh.")
    return latest_checkpoint_filename, max_rows_processed

# function to save checkpoint
def save_checkpoint(list_to_save: list, num_rows_in_list: int, base_path: str, is_error_save: bool = False):
    """Write the accumulated result rows to a checkpoint CSV under ``base_path``.

    Args:
        list_to_save: Result rows (one dict per row) to persist.
        num_rows_in_list: Row count embedded in the file name
            (``..._rows_upto_<num_rows_in_list>.csv``).
        base_path: Directory the checkpoint file is written to.
        is_error_save: When True the file name carries the ``ERROR_`` marker,
            otherwise the regular checkpoint prefix is used.

    Returns:
        None. A failure while writing the CSV is reported through
        ``tqdm.write`` and not re-raised, so a failed save never aborts the
        caller's processing loop.
    """
    if not list_to_save: # should not happen if called correctly, but good check
        tqdm.write("no results to save in checkpoint (list_to_save is empty).")
        return

    temp_df = pd.DataFrame(list_to_save)
    # Resolved before the try block: the except handler reads it, and it used
    # to be assigned only after to_csv() had already succeeded, so a failing
    # write raised UnboundLocalError instead of logging the real error.
    status_message = "ERROR" if is_error_save else "Regular"
    prefix = "gemini_analysis_ERROR_checkpoint_rows_upto_" if is_error_save else "gemini_analysis_checkpoint_rows_upto_"
    # num_rows_in_list is the actual count of items being saved
    filename = os.path.join(base_path, f"{prefix}{num_rows_in_list}.csv")

    try:
        temp_df.to_csv(filename, index=False)
    except Exception as e_save:
        # Best effort by design: report and carry on.
        tqdm.write(f"error saving {status_message.lower()} checkpoint {filename}: {e_save}")
    else:
        tqdm.write(f"--- {status_message} checkpoint saved: {filename} ({num_rows_in_list} total rows in this file) ---")

# main execution
#
# Batch driver: runs analyze_text_with_gemini_instructor over the rows of the
# global dataframe `df` and collects one flat result dict per row.
#
# Flow:
#   1. Validate that the Gemini client globals and `df` are usable.
#   2. Resume from the highest-numbered checkpoint CSV, if one exists.
#      Resuming is positional: the number of rows loaded from the checkpoint
#      becomes the iloc offset into `df`, so `df` must have the same row order
#      as in the run that wrote the checkpoint.
#   3. For each remaining row: rows with missing/empty text are recorded with
#      an error and never sent to the API; all other rows are sent, with a
#      retry/backoff loop on failure.
#   4. A regular checkpoint is written every `checkpoint_interval` rows
#      finalized in this session; an ERROR checkpoint is written after every
#      failed API attempt.
#   5. Whatever has been collected is written to gemini_analysis_FINAL_ALL_ROWS.csv.
if __name__ == "__main__":
    print("\n--- main execution for gemini analysis ---")

    checkpoint_base_path = "/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/GeminiResults/"
    os.makedirs(checkpoint_base_path, exist_ok=True)

    checkpoint_interval = 50 
    api_request_delay_on_success = 1.0 

    backoff_delays = [30, 60, 300]  # delays in seconds: 30s, 1min, 5min
    max_retries_per_row = len(backoff_delays) # max attempts for a single row after initial try

    # preconditions: each failed check prints a message and skips all processing
    if not CONFIGURED_GEMINI_API_KEY or not GENAI_CONFIGURED_SUCCESSFULLY or not INSTRUCTOR_GEMINI_CLIENT:
        print("critical error: gemini api key not configured or instructor client not initialized. cannot proceed.")
    elif 'df' not in globals() or not isinstance(df, pd.DataFrame):
        print("critical error: dataframe 'df' not found or is not a pandas dataframe. please load your dataframe first.")
    elif df.empty:
        print("critical error: dataframe 'df' is empty. no data to analyze.")
    else:
        print(f"input dataframe 'df' has {len(df)} rows.")

        all_finalized_results_list = [] # stores rows that are successfully processed or skipped due to input error
        start_row_index_for_df = 0      # the 0-indexed row in the original 'df' to start processing from

        latest_checkpoint_file, num_rows_in_latest_file = find_latest_checkpoint_info(checkpoint_base_path)

        if latest_checkpoint_file:
            full_checkpoint_path = os.path.join(checkpoint_base_path, latest_checkpoint_file)
            try:
                tqdm.write(f"loading data from checkpoint: {full_checkpoint_path}")
                checkpoint_df = pd.read_csv(full_checkpoint_path)
                if not checkpoint_df.empty:
                    # NOTE(review): after pd.read_csv an empty 'error' cell is presumably NaN, not None,
                    # so the two 'error' tests below look always-true and the branch is effectively
                    # decided by "ERROR" being in the file name alone -- confirm.
                    if "ERROR" in latest_checkpoint_file and checkpoint_df.iloc[-1].get('error') is not None and str(checkpoint_df.iloc[-1].get('error')).strip() != "":
                        # last row in error checkpoint was a failure, so we need to retry it.
                        # load all rows *except* the last one into finalized results.
                        all_finalized_results_list = checkpoint_df.iloc[:-1].to_dict('records')
                        start_row_index_for_df = len(all_finalized_results_list) 
                        tqdm.write(f"loaded {len(all_finalized_results_list)} finalized rows. will retry row index {start_row_index_for_df} (0-indexed).")
                    else:
                        # regular checkpoint, or error checkpoint where last row wasn't a script-logged failure
                        all_finalized_results_list = checkpoint_df.to_dict('records')
                        start_row_index_for_df = len(all_finalized_results_list)
                        tqdm.write(f"loaded {len(all_finalized_results_list)} finalized rows. resuming from next row (index {start_row_index_for_df}).")
                else: # checkpoint file was empty
                    tqdm.write("checkpoint file was empty. starting fresh.")
                    start_row_index_for_df = 0
            except Exception as e:
                # an unreadable checkpoint is not fatal: discard it and reprocess everything
                tqdm.write(f"error loading checkpoint file {full_checkpoint_path}: {e}. starting from scratch.")
                all_finalized_results_list = []
                start_row_index_for_df = 0
        else:
            tqdm.write("no checkpoint found. starting processing from the beginning.")

        identifier_columns = ["cik", "tickers_sec", "companyName_sec", "sector_user"]
        text_columns = ["risk_factors_text", "item7_mda_text"]
        all_required_cols_for_df = identifier_columns + text_columns

        missing_schema_cols = [col for col in all_required_cols_for_df if col not in df.columns]
        if missing_schema_cols:
            print(f"critical error: dataframe 'df' is missing required columns: {missing_schema_cols}. aborting.")
        else:
            # positional slice: skips exactly as many rows as were loaded from the checkpoint
            df_to_process = df.iloc[start_row_index_for_df:]
            
            if df_to_process.empty and start_row_index_for_df > 0 :
                tqdm.write(f"all {start_row_index_for_df} rows from input dataframe appear to be processed based on checkpoints.")
            elif df_to_process.empty and start_row_index_for_df == 0:
                # NOTE(review): looks unreachable -- an empty df is already rejected by the df.empty check above.
                tqdm.write("input dataframe is empty. no data to process.")

            rows_processed_this_session = 0 # counts rows finalized in the current run
            stop_all_processing_flag = False

            # outer loop: iterates through rows of the input dataframe that need processing
            for original_df_index_val, row_data in tqdm(df_to_process.iterrows(), total=df_to_process.shape[0], desc="Processing DataFrame Rows"):
                # original_df_index_val is the true index from the original 'df'
                
                cik_val = str(row_data.get("cik", "N/A"))
                ticker_val = str(row_data.get("tickers_sec", "N/A"))
                company_name_val = str(row_data.get("companyName_sec", "N/A"))
                sector_val = str(row_data.get("sector_user", "N/A"))

                # name handed to the analysis call: company name, else ticker, else "CIK_<cik>".
                # the "nan" comparisons catch missing values that str() turned into the text "nan".
                company_name_for_llm_analysis = company_name_val if company_name_val and company_name_val.lower() != "nan" else ticker_val
                if not company_name_for_llm_analysis or company_name_for_llm_analysis.lower() == "nan":
                    company_name_for_llm_analysis = f"CIK_{cik_val}"

                # this dictionary will hold the result for the current row
                # (its keys become the column names of the checkpoint / final csv files)
                current_row_result_dict = {
                    'CIK': cik_val, 'Ticker': ticker_val, 'Company Name': company_name_val, 'Sector': sector_val,
                    'Overall Summary': "N/A", 'Strategic Depth': "N/A", 'Disclosure Sentiment': "N/A",
                    'Risk - Own Adoption': "N/A", 'Risk - External Threats': "N/A", 'Risk - Non-Adoption': "N/A",
                    'Forward-Looking': "N/A", 'AI Washing Index': "N/A", 'Talent & Investment': "N/A",
                    'Key AI Terms': "", 'error': None
                }

                # check for valid input data for this row before attempting api calls
                valid_row_for_api_analysis = True
                for col in text_columns:
                    if pd.isna(row_data[col]) or not str(row_data[col]).strip():
                        tqdm.write(f"skipping cik {cik_val} (original index: {original_df_index_val}): missing or empty data in column '{col}'.")
                        current_row_result_dict['error'] = f'missing/empty data in column {col}'
                        valid_row_for_api_analysis = False
                        break
                
                if not valid_row_for_api_analysis:
                    # a skipped row still counts as finalized so that positional resume stays aligned with df
                    all_finalized_results_list.append(current_row_result_dict)
                    rows_processed_this_session += 1
                    # regular checkpoint saving based on finalized rows
                    if rows_processed_this_session > 0 and (rows_processed_this_session % checkpoint_interval == 0):
                        save_checkpoint(all_finalized_results_list, len(all_finalized_results_list), checkpoint_base_path, is_error_save=False)
                    continue # move to the next row in the outer loop

                # inner loop to retry the current row if api call is needed
                consecutive_api_failures_for_current_row = 0
                current_row_successfully_processed = False

                while True: # retry loop for the current original_df_index_val
                    analysis_object = analyze_text_with_gemini_instructor(
                        str(row_data["risk_factors_text"]), 
                        str(row_data["item7_mda_text"]), 
                        company_name_for_llm_analysis, 
                        temperature=0.0
                    )

                    if analysis_object:
                        tqdm.write(f"success: cik {cik_val} (original index: {original_df_index_val}) processed successfully.")
                        # flatten the nested analysis object: each bucket label is reduced to its grade letter
                        current_row_result_dict.update({
                            'Overall Summary': analysis_object.overall_ai_preparedness_summary_cot,
                            'Strategic Depth': extract_grade_letter(analysis_object.ai_strategic_depth.bucket_assessment),
                            'Disclosure Sentiment': extract_grade_letter(analysis_object.ai_disclosure_sentiment.bucket_assessment),
                            'Risk - Own Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_own_ai_adoption.bucket_assessment),
                            'Risk - External Threats': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_external_ai_threats.bucket_assessment),
                            'Risk - Non-Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_of_not_adopting_ai_or_competitive_ai_threat.bucket_assessment),
                            'Forward-Looking': extract_grade_letter(analysis_object.forward_looking_ai_statements.bucket_assessment),
                            'AI Washing Index': extract_grade_letter(analysis_object.ai_washing_hype_index.bucket_assessment),
                            'Talent & Investment': extract_grade_letter(analysis_object.ai_talent_and_investment_focus.bucket_assessment),
                            'Key AI Terms': ', '.join(analysis_object.key_ai_related_terminology_found if analysis_object.key_ai_related_terminology_found else [])
                        })
                        current_row_result_dict['error'] = None # clear any previous error for this row
                        consecutive_api_failures_for_current_row = 0 
                        current_row_successfully_processed = True
                        break # exit retry loop for this row
                    else: # api call failed for this row
                        current_row_result_dict['error'] = 'gemini analysis failed or returned no result' # update error for this attempt
                        consecutive_api_failures_for_current_row += 1
                        
                        # save an error checkpoint. it includes previously finalized rows + current failing row's state.
                        # the number of rows in the filename will be len(all_finalized_results_list) + 1
                        temp_list_for_error_save = list(all_finalized_results_list)
                        temp_list_for_error_save.append(current_row_result_dict)
                        save_checkpoint(temp_list_for_error_save, 
                                        len(temp_list_for_error_save), # number of rows including the current failing one
                                        checkpoint_base_path, 
                                        is_error_save=True)

                        # failures 1..len(backoff_delays) each wait backoff_delays[n-1] seconds and retry;
                        # one more failure after that stops the whole run
                        if consecutive_api_failures_for_current_row <= max_retries_per_row:
                            delay_index = consecutive_api_failures_for_current_row - 1
                            current_delay = backoff_delays[delay_index]
                            tqdm.write(f"api error for cik {cik_val} (original index: {original_df_index_val}). attempt {consecutive_api_failures_for_current_row}/{max_retries_per_row}. waiting {current_delay}s before retrying same row.")
                            time.sleep(current_delay)
                            # continue to the next iteration of the inner while loop to retry
                        else: 
                            tqdm.write(f"persistent api errors for cik {cik_val} (original index: {original_df_index_val}) after {consecutive_api_failures_for_current_row} attempts. stopping script.")
                            stop_all_processing_flag = True
                            # current_row_result_dict already has the final error
                            break # exit retry loop for this row, it failed permanently
                # end of inner retry loop for the current row

                all_finalized_results_list.append(current_row_result_dict) # add the final state of this row (success or max retries failed)
                rows_processed_this_session += 1
                
                if stop_all_processing_flag:
                    # the permanently failed row was appended above, so it appears (with its error) in the final csv
                    tqdm.write("stopping all processing due to persistent error on a row.")
                    break # exit the main outer for loop

                # regular interval checkpoint saving (based on finalized rows)
                if rows_processed_this_session > 0 and (rows_processed_this_session % checkpoint_interval == 0):
                    save_checkpoint(all_finalized_results_list, len(all_finalized_results_list), checkpoint_base_path, is_error_save=False)
                
                # apply normal delay only if the row was successfully processed in its last attempt
                if current_row_successfully_processed and api_request_delay_on_success > 0 :
                    time.sleep(api_request_delay_on_success)
                
            # final save after loop completion (or break)
            # this writes the FINAL csv only; no regular checkpoint is written here for rows finalized
            # since the last interval checkpoint
            if all_finalized_results_list:
                df_results_final = pd.DataFrame(all_finalized_results_list)
                final_results_filename = os.path.join(checkpoint_base_path, "gemini_analysis_FINAL_ALL_ROWS.csv")
                try:
                    df_results_final.to_csv(final_results_filename, index=False)
                    print(f"\n\n--- processing finished. final results saved to: {final_results_filename} ({len(df_results_final)} total rows) ---")
                    if not df_results_final.empty:
                        print(df_results_final.head().to_string())
                    else:
                        print("final dataframe is empty.")
                except Exception as e_final_save:
                    print(f"error saving final results {final_results_filename}: {e_final_save}")
            else:
                print("\nno results were processed or loaded to create a final dataframe.")
    
    print("\n--- main execution finished ---")


In [None]:
import pandas as pd
import google.generativeai as genai
from google.generativeai.types import GenerationConfig, HarmCategory, HarmBlockThreshold
from google.api_core.exceptions import ResourceExhausted, InternalServerError, ServiceUnavailable, DeadlineExceeded
import json
import os
import re
import instructor
import time
from tqdm import tqdm
from pydantic import BaseModel, Field, field_validator, ValidationError
from typing import List, Literal, Optional, Union

# pydantic model definitions
#
# Bucket label aliases: one Literal per assessment factor, enumerating the only
# strings accepted for that factor's `bucket_assessment` field. Every label
# starts with a single grade letter followed by a space and a parenthesised
# description. PROMPT_TEMPLATE lists the same labels verbatim, so the two
# must be kept in sync.

# factor 1: AI strategic depth (A-E)
AIStrategicDepthBucket = Literal[
    "A (Core Strategic Enabler)",
    "B (Significant Operational/Product Integration)",
    "C (Emerging/Exploratory Mentions)",
    "D (Superficial/Generic Mentions OR Risk of Non-Adoption Only)",
    "E (No Mention of Own AI Strategy/Adoption)"
]

# factor 2: AI disclosure sentiment (A-E)
AIDisclosureSentimentBucket = Literal[
    "A (Clearly Positive)",
    "B (Mostly Positive/Balanced)",
    "C (Neutral/Factual)",
    "D (Cautious/Mixed)",
    "E (Negative OR Not Applicable - No Own AI Discussion)"
]

# factor 3a: risk from the company's own AI adoption (A-C)
AIRiskOwnAdoptionBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of Own AI Adoption Risk)",
    "C (No Mention of Own AI Adoption Risk)"
]

# factor 3b: risk from external AI threats (A-C)
AIRiskExternalThreatsBucket = Literal[
    "A (Detailed & Specific Discussion)",
    "B (General Mention of External AI Threat)",
    "C (No Mention of External AI Threat)"
]

# factor 3c: risk of not adopting AI / competitive AI threat (A-C)
AIRiskNonAdoptionBucket = Literal[
    "A (Explicitly Discussed)",
    "B (Implicitly Suggested)",
    "C (No Mention of Non-Adoption/Competitive AI Risk)"
]

# factor 4: forward-looking AI statements (A-D)
AIForwardLookingBucket = Literal[
    "A (Specific & Detailed Future Plans)",
    "B (General Future Intent)",
    "C (Implicit Future Focus Only)",
    "D (No Mention of Future AI Plans)"
]

# factor 5: "AI washing" hype index (A-E)
AIWashingHypeBucket = Literal[
    "A (Substantive & Grounded)",
    "B (Mostly Substantive)",
    "C (Mixed - Some Substance, Some Hype)",
    "D (Mostly Hype)",
    "E (Pure Hype OR Not Applicable - No Positive AI Claims)"
]

# factor 6: AI talent & investment focus (A-D)
AITalentInvestmentBucket = Literal[
    "A (Explicit & Significant Focus)",
    "B (General Mention of AI Talent/Investment)",
    "C (Implicit Focus Only)",
    "D (No Mention of AI Talent/Investment)"
]

class BaseAssessment(BaseModel):
    # Fields shared by every per-factor assessment: the evidence quotes and the
    # short reasoning text. Subclasses add the factor-specific `bucket_assessment`.
    # (Plain comments are used instead of a class docstring so that the model's
    # generated schema description is left untouched.)
    supporting_evidence: List[str] = Field(
        default_factory=list,
        description="Key verbatim quotes (or concise summaries) from the text that justify your assessment. Limit to 2-3 key pieces of evidence. If no direct evidence, provide an empty list []."
    )
    chain_of_thought_reasoning: str = Field(
        description="A brief explanation (1-2 sentences) of your thought process for arriving at the bucket assessment, referencing the criteria and evidence."
    )

    @field_validator("supporting_evidence", mode='before')
    @classmethod
    def normalize_supporting_evidence(cls, v):
        """Coerce the raw ``supporting_evidence`` value into a clean list of strings.

        ``None`` and unsupported types become ``[]``; a bare string becomes a
        one-item list; list items that are not strings, are blank, or are one
        of the "no evidence" placeholder sentences are dropped.
        """
        no_evidence_markers = ["no direct evidence found.", "no evidence found."]

        if v is None:
            return []

        if isinstance(v, str):
            text = v.strip()
            if text.lower() in no_evidence_markers:
                return []
            if not text:
                return []
            return [text]

        if not isinstance(v, list):
            return []

        cleaned = []
        for entry in v:
            if not isinstance(entry, str):
                continue
            text = str(entry).strip()
            if text and text.lower() not in no_evidence_markers:
                cleaned.append(text)
        return cleaned

# Per-factor assessment models. Each one extends BaseAssessment (evidence +
# reasoning) with a `bucket_assessment` restricted to that factor's Literal
# alias. Comments are used rather than class docstrings so the generated
# schema descriptions stay unchanged.

# factor 1: AI strategic depth
class AIStrategicDepthAssessment(BaseAssessment):
    bucket_assessment: AIStrategicDepthBucket

# factor 2: AI disclosure sentiment
class AIDisclosureSentimentAssessment(BaseAssessment):
    bucket_assessment: AIDisclosureSentimentBucket

# factor 3a: risk from the company's own AI adoption
class AIRiskOwnAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskOwnAdoptionBucket

# factor 3b: risk from external AI threats
class AIRiskExternalThreatsAssessment(BaseAssessment):
    bucket_assessment: AIRiskExternalThreatsBucket

# factor 3c: risk of not adopting AI / competitive AI threat
class AIRiskNonAdoptionAssessment(BaseAssessment):
    bucket_assessment: AIRiskNonAdoptionBucket

# factor 3 container: groups the three risk sub-assessments under one object;
# it has no bucket of its own
class AIRiskDisclosure(BaseModel):
    risk_from_own_ai_adoption: AIRiskOwnAdoptionAssessment
    risk_from_external_ai_threats: AIRiskExternalThreatsAssessment
    risk_of_not_adopting_ai_or_competitive_ai_threat: AIRiskNonAdoptionAssessment

# factor 4: forward-looking AI statements
class AIForwardLookingAssessment(BaseAssessment):
    bucket_assessment: AIForwardLookingBucket

# factor 5: "AI washing" hype index
class AIWashingHypeAssessment(BaseAssessment):
    bucket_assessment: AIWashingHypeBucket

# factor 6: AI talent & investment focus
class AITalentInvestmentAssessment(BaseAssessment):
    bucket_assessment: AITalentInvestmentBucket

class CompanyAIAnalysis(BaseModel):
    # Top-level result for one company: an optional identifier, a short overall
    # summary, one assessment object per factor, and the AI terms found.
    # (Plain comments are used instead of a class docstring so that the model's
    # generated schema description is left untouched.)
    company_identifier: Optional[str] = Field(default=None, description="EXTRACT_COMPANY_NAME_IF_POSSIBLE_ELSE_NULL")
    overall_ai_preparedness_summary_cot: str = Field(description="LLM_GENERATED_1_2_SENTENCE_SUMMARY_OF_OVERALL_AI_POSTURE_BASED_ON_ALL_FACTORS")
    ai_strategic_depth: AIStrategicDepthAssessment
    ai_disclosure_sentiment: AIDisclosureSentimentAssessment
    ai_risk_disclosure: AIRiskDisclosure
    forward_looking_ai_statements: AIForwardLookingAssessment
    ai_washing_hype_index: AIWashingHypeAssessment
    ai_talent_and_investment_focus: AITalentInvestmentAssessment
    key_ai_related_terminology_found: List[str] = Field(
        default_factory=list,
        description="List any explicit AI-related terms found (e.g., 'artificial intelligence', 'machine learning'). If none, provide an empty list []."
    )

    @field_validator("key_ai_related_terminology_found", mode='before')
    @classmethod
    def normalize_key_terms(cls, v):
        """Coerce the raw terminology value into a list of non-blank strings.

        A string is split on commas; a list keeps only its string items;
        ``None`` and any other type yield ``[]``. Every kept term is stripped
        of surrounding whitespace and blank terms are dropped.
        """
        if v is None:
            return []

        if isinstance(v, str):
            candidates = v.split(',')
        elif isinstance(v, list):
            candidates = [entry for entry in v if isinstance(entry, str)]
        else:
            return []

        terms = []
        for candidate in candidates:
            term = str(candidate).strip()
            if term:
                terms.append(term)
        return terms

# prompt template
#
# Instruction prompt for the per-company analysis. `{document_text}` is the only
# substitution field. The doubled braces (`{{` / `}}`) in the JSON example are
# escaped literal braces, which suggests the template is filled in with
# str.format -- the formatting call is outside this section, TODO confirm.
# The bucket labels quoted below must stay identical to the Literal aliases
# defined earlier in this cell, because instruction 4 requires the model to
# return those exact strings.
# NOTE(review): instruction 7 refers to a "Company Name (if known):" line that is
# not part of this template; presumably the caller includes it in document_text
# -- verify.
# NOTE(review): the text says "Item 1. Risk Factors"; in a 10-K the risk factors
# section is usually numbered Item 1A -- confirm whether the wording is intended.
PROMPT_TEMPLATE = """
You are an expert financial analyst and AI strategist, specializing in evaluating corporate disclosures for insights into a company's engagement with Artificial Intelligence (AI). Your task is to meticulously analyze the provided text from a company's 10-K filing (specifically excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis") and provide a structured JSON output assessing the company's AI preparedness based on the defined factors and criteria.

**DOCUMENT CONTEXT:**
The provided text below, enclosed in triple backticks (```), contains excerpts from "Item 1. Risk Factors" and "Item 7. Management's Discussion and Analysis (MD&A)" of a single company's 10-K report.

```
{document_text}
```

**INSTRUCTIONS:**
1.  Carefully read the entire provided text.
2.  For each of the AI Assessment Factors listed below, evaluate the text based on the defined Buckets & Criteria.
3.  Your entire response MUST be a single, valid JSON object. Do not include any explanatory text before or after the JSON object. The JSON object MUST conform to the Pydantic schema `CompanyAIAnalysis`.
4.  For each factor, you MUST include in the JSON:
    * `bucket_assessment`: The exact string for the category you've assigned. **CRITICAL: This value MUST be one of the precise string values listed in the Buckets & Criteria for that factor (e.g., for AI Strategic Depth, it must be exactly "A (Core Strategic Enabler)" or "B (Significant Operational/Product Integration)", etc. Do NOT use abbreviations or variations).**
    * `chain_of_thought_reasoning`: A brief explanation (1-2 sentences) of your thought process for arriving at the bucket assessment, referencing the criteria and evidence.
    * `supporting_evidence`: Key verbatim quotes (or concise summaries of multiple related statements) from the text that justify your assessment. Limit to 2-3 key pieces of evidence. If no direct evidence is found for a factor, you MUST provide an empty list `[]` for this field.
5.  **CRITICAL FOR `ai_risk_disclosure` FIELD**: The `ai_risk_disclosure` field in the JSON output MUST be an object containing three specific nested assessment objects as its keys: `risk_from_own_ai_adoption`, `risk_from_external_ai_threats`, and `risk_of_not_adopting_ai_or_competitive_ai_threat`. Each of these three nested objects must follow the `BaseAssessment` structure (i.e., have `bucket_assessment`, `chain_of_thought_reasoning`, and `supporting_evidence`). Refer to the example JSON structure below.
6.  If information for any other factor or sub-factor (within `ai_risk_disclosure`) is not present in the text, use the appropriate "No Mention" bucket assessment for that factor (e.g., "E (No Mention of Own AI Strategy/Adoption)" or "C (No Mention of Own AI Adoption Risk)"). For `supporting_evidence` in such cases, provide an empty list `[]`.
    *IMPORTANT*: Ensure ALL main assessment factor objects (ai_strategic_depth, ai_disclosure_sentiment, ai_risk_disclosure, forward_looking_ai_statements, ai_washing_hype_index, ai_talent_and_investment_focus) are ALWAYS present as keys in your JSON output. If a factor has no relevant information, its `bucket_assessment` should reflect its defined "No Mention" or equivalent lowest grade for that factor's rubric, and `supporting_evidence` should be an empty list. Do not omit these top-level keys from the JSON.
7.  For the `company_identifier` field, extract the company name from the "Company Name (if known):" line in the DOCUMENT CONTEXT. If it's "Unknown Company" or not clearly identifiable, set it to null.
8.  For the `overall_ai_preparedness_summary_cot` field, you MUST provide a 1-2 sentence summary of the company's overall AI posture based on your analysis of all factors.
9.  For the `key_ai_related_terminology_found` field, you MUST list any explicit AI-related terms found in the text (e.g., "artificial intelligence", "machine learning", "NLP", "generative AI"). If no such terms are found, provide an empty list `[]`.

**AI ASSESSMENT FACTORS, BUCKETS & CRITERIA:**

**1. AI Strategic Depth:**
    * *Concept:* How deeply and explicitly AI is integrated into the company's core business strategy and future plans...
    * *Buckets & Criteria:*
        * "A (Core Strategic Enabler)"
        * "B (Significant Operational/Product Integration)"
        * "C (Emerging/Exploratory Mentions)"
        * "D (Superficial/Generic Mentions OR Risk of Non-Adoption Only)"
        * "E (No Mention of Own AI Strategy/Adoption)"

**2. AI Disclosure Sentiment:**
    * *Concept:* The overall tone of the company's *own statements* regarding its AI initiatives...
    * *Buckets & Criteria:*
        * "A (Clearly Positive)"
        * "B (Mostly Positive/Balanced)"
        * "C (Neutral/Factual)"
        * "D (Cautious/Mixed)"
        * "E (Negative OR Not Applicable - No Own AI Discussion)"

**3. AI Risk Disclosure:** (This section requires careful attention to the nested structure)
    * *Concept:* How thoroughly and specifically the company acknowledges and discusses potential risks associated with AI. This factor itself is an object in the JSON, containing the following three sub-assessments:
    * **3a. `risk_from_own_ai_adoption`:** (e.g., AI system failure, data bias, ethical concerns, implementation challenges).
        * *Buckets & Criteria for 3a:*
            * "A (Detailed & Specific Discussion)"
            * "B (General Mention of Own AI Adoption Risk)"
            * "C (No Mention of Own AI Adoption Risk)"
    * **3b. `risk_from_external_ai_threats`:** (e.g., AI used in cyberattacks by others, AI-driven misinformation).
        * *Buckets & Criteria for 3b:*
            * "A (Detailed & Specific Discussion)"
            * "B (General Mention of External AI Threat)"
            * "C (No Mention of External AI Threat)"
    * **3c. `risk_of_not_adopting_ai_or_competitive_ai_threat`:** (e.g., falling behind AI-leveraging competitors, failure to innovate using AI).
        * *Buckets & Criteria for 3c:*
            * "A (Explicitly Discussed)"
            * "B (Implicitly Suggested)"
            * "C (No Mention of Non-Adoption/Competitive AI Risk)"

**4. Forward-Looking AI Statements:**
    * *Concept:* The extent and specificity of discussion about future plans...
    * *Buckets & Criteria:*
        * "A (Specific & Detailed Future Plans)"
        * "B (General Future Intent)"
        * "C (Implicit Future Focus Only)"
        * "D (No Mention of Future AI Plans)"

**5. "AI Washing" Hype Index:** (Factor is only applicable if positive claims about own AI are made)
    * *Concept:* Degree to which a company's positive AI claims appear substantive vs. exaggerated or vague.
    * *Buckets & Criteria:*
        * "A (Substantive & Grounded)"
        * "B (Mostly Substantive)"
        * "C (Mixed - Some Substance, Some Hype)"
        * "D (Mostly Hype)"
        * "E (Pure Hype OR Not Applicable - No Positive AI Claims)"

**6. AI Talent & Investment Focus:**
    * *Concept:* Explicit discussion of focus on acquiring/developing AI talent...
    * *Buckets & Criteria:*
        * "A (Explicit & Significant Focus)"
        * "B (General Mention of AI Talent/Investment)"
        * "C (Implicit Focus Only)"
        * "D (No Mention of AI Talent/Investment)"

**EXPECTED JSON OUTPUT STRUCTURE (Example for LLM guidance, actual structure enforced by Pydantic):**
```json
{{
  "company_identifier": "EXTRACT_COMPANY_NAME_IF_POSSIBLE_ELSE_NULL",
  "overall_ai_preparedness_summary_cot": "LLM_GENERATED_1_2_SENTENCE_SUMMARY_OF_OVERALL_AI_POSTURE_BASED_ON_ALL_FACTORS",
  "ai_strategic_depth": {{
    "bucket_assessment": "A (Core Strategic Enabler)",
    "supporting_evidence": ["quote1", "quote2"],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_disclosure_sentiment": {{
    "bucket_assessment": "B (Mostly Positive/Balanced)",
    "supporting_evidence": ["quote1"],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_risk_disclosure": {{
    "risk_from_own_ai_adoption": {{
      "bucket_assessment": "C (No Mention of Own AI Adoption Risk)",
      "supporting_evidence": [],
      "chain_of_thought_reasoning": "No specific risks from the company's own AI adoption were mentioned."
    }},
    "risk_from_external_ai_threats": {{
      "bucket_assessment": "B (General Mention of External AI Threat)",
      "supporting_evidence": ["The company mentions general cybersecurity threats which could involve AI."],
      "chain_of_thought_reasoning": "General mention of external threats that could be AI-related."
    }},
    "risk_of_not_adopting_ai_or_competitive_ai_threat": {{
      "bucket_assessment": "A (Explicitly Discussed)",
      "supporting_evidence": ["Failure to adopt new technologies like AI could impact our competitive position."],
      "chain_of_thought_reasoning": "The company explicitly discusses the risk of not keeping up with technological advancements including AI."
    }}
  }},
  "forward_looking_ai_statements": {{
    "bucket_assessment": "D (No Mention of Future AI Plans)",
    "supporting_evidence": [],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_washing_hype_index": {{
    "bucket_assessment": "E (Pure Hype OR Not Applicable - No Positive AI Claims)",
    "supporting_evidence": [],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "ai_talent_and_investment_focus": {{
    "bucket_assessment": "D (No Mention of AI Talent/Investment)",
    "supporting_evidence": [],
    "chain_of_thought_reasoning": "Your reasoning for this factor."
  }},
  "key_ai_related_terminology_found": ["artificial intelligence", "machine learning"]
}}
```
"""

# global variables for client and configuration
# All of these are declared `global` inside initialize_gemini_client(), which is
# where they get their real values; until it runs they hold the placeholders below.
CONFIGURED_GEMINI_API_KEY = None       # validated api key string, or None when missing/invalid
GENAI_CONFIGURED_SUCCESSFULLY = False  # set to True once genai.configure(api_key=...) succeeded
INSTRUCTOR_GEMINI_CLIENT = None        # presumably the instructor-patched client; assigned outside this section -- TODO confirm
BASE_GEMINI_MODEL_INSTANCE = None      # presumably the underlying genai model object; assigned outside this section -- TODO confirm
GEMINI_MODEL_NAME = 'models/gemini-2.5-flash-preview-05-20'  # model identifier string

# configuration & client initialization for gemini
def initialize_gemini_client():
    """Configure the genai library and build the gemini clients, storing them in module globals.

    The api key is taken from the ``GEMINI_API_KEY`` environment variable, or
    failing that from a module-level ``gemini_api_key`` variable. On success the
    function sets ``GENAI_CONFIGURED_SUCCESSFULLY``, ``BASE_GEMINI_MODEL_INSTANCE``
    and ``INSTRUCTOR_GEMINI_CLIENT``; every failure is reported with ``print``
    and leaves the corresponding global at ``None`` / ``False``. Nothing is
    returned and no exception is propagated from the client-creation steps.
    """
    global CONFIGURED_GEMINI_API_KEY, GENAI_CONFIGURED_SUCCESSFULLY, \
           INSTRUCTOR_GEMINI_CLIENT, BASE_GEMINI_MODEL_INSTANCE, GEMINI_MODEL_NAME

    # start every invocation from a clean slate: without this, re-running the
    # function after the key disappeared (or after a failure) would keep the
    # success flag and clients from the previous call and report them as valid.
    CONFIGURED_GEMINI_API_KEY = None
    GENAI_CONFIGURED_SUCCESSFULLY = False
    INSTRUCTOR_GEMINI_CLIENT = None
    BASE_GEMINI_MODEL_INSTANCE = None

    print("--- starting gemini configuration and client initialization ---")
    GEMINI_API_KEY_FROM_ENV = os.getenv('GEMINI_API_KEY')
    if GEMINI_API_KEY_FROM_ENV:
        print("found 'GEMINI_API_KEY' in environment variables.")
        CONFIGURED_GEMINI_API_KEY = GEMINI_API_KEY_FROM_ENV
    elif 'gemini_api_key' in globals() and globals()['gemini_api_key'] is not None:
        print("found 'gemini_api_key' in global script variables.")
        CONFIGURED_GEMINI_API_KEY = globals()['gemini_api_key']
    else:
        print("error: 'GEMINI_API_KEY' is not defined in environment or global script variables.")

    # reject keys that are not strings or contain only whitespace
    if CONFIGURED_GEMINI_API_KEY:
        if isinstance(CONFIGURED_GEMINI_API_KEY, str) and CONFIGURED_GEMINI_API_KEY.strip():
            print("success: gemini api key loaded.")
        else:
            print("error: gemini api key is not a valid non-empty string.")
            CONFIGURED_GEMINI_API_KEY = None

    if CONFIGURED_GEMINI_API_KEY:
        try:
            genai.configure(api_key=CONFIGURED_GEMINI_API_KEY)
            print("success: genai.configure(api_key=...) called successfully.")
            GENAI_CONFIGURED_SUCCESSFULLY = True
        except Exception as e:
            print(f"error: exception during genai.configure(api_key=...): {e}")
    else:
        print("skipped: genai.configure() because api key was not valid or not found.")

    if GENAI_CONFIGURED_SUCCESSFULLY:
        print("attempting: to create instructor-patched gemini client...")
        try:
            # BLOCK_NONE is requested for all four harm categories
            safety_settings_config = {
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
            }
            BASE_GEMINI_MODEL_INSTANCE = genai.GenerativeModel(
                model_name=GEMINI_MODEL_NAME,
                safety_settings=safety_settings_config
            )
            print(f"  success: base genai.generativemodel('{GEMINI_MODEL_NAME}') created with custom safety settings.")

            INSTRUCTOR_GEMINI_CLIENT = instructor.from_gemini(
                client=BASE_GEMINI_MODEL_INSTANCE,
                mode=instructor.Mode.GEMINI_JSON
            )
            print("  success: client created with instructor.from_gemini using GEMINI_JSON mode.")
        except Exception as e:
            print(f"  error: exception during instructor.from_gemini client creation: {e}")
            INSTRUCTOR_GEMINI_CLIENT = None
            # the base model is only missing here if its creation (or the safety
            # settings dict) failed above; try once more so the manual-json
            # fallback path can still be used. if the dict itself failed to
            # build, the resulting NameError is caught by the inner except.
            if not BASE_GEMINI_MODEL_INSTANCE:
                try:
                    BASE_GEMINI_MODEL_INSTANCE = genai.GenerativeModel(
                        model_name=GEMINI_MODEL_NAME,
                        safety_settings=safety_settings_config
                    )
                    print("  note: base genai.generativemodel created successfully despite instructor client failure.")
                except Exception as e_base:
                    print(f"  error: failed to create BASE_GEMINI_MODEL_INSTANCE as well: {e_base}")
                    BASE_GEMINI_MODEL_INSTANCE = None
    else:
        print("skipped: instructor client creation for gemini because genai library was not configured successfully.")

    if not INSTRUCTOR_GEMINI_CLIENT:
        print("failure: instructor_gemini_client could not be initialized.")
    if not BASE_GEMINI_MODEL_INSTANCE:
        print("failure: base_gemini_model_instance could not be initialized. fallback path will not work.")
    print("--- gemini configuration and client initialization finished ---")

initialize_gemini_client()

# placeholder per-million-token prices used for the running cost estimate
GEMINI_2_5_FLASH_INPUT_COST_PER_MILLION_TOKENS = 0.35
GEMINI_2_5_FLASH_OUTPUT_COST_PER_MILLION_TOKENS = 0.70
print(
    "info: using placeholder pricing for gemini 2.5 flash: "
    f"input=${GEMINI_2_5_FLASH_INPUT_COST_PER_MILLION_TOKENS}/m, "
    f"output=${GEMINI_2_5_FLASH_OUTPUT_COST_PER_MILLION_TOKENS}/m. "
    "please verify and update if necessary."
)

# running totals, updated by calculate_and_update_cost_gemini()
cumulative_cost_gemini = 0.0
cumulative_prompt_tokens_gemini = cumulative_completion_tokens_gemini = 0

def calculate_and_update_cost_gemini(prompt_tokens: int, completion_tokens: int, api_call_made_and_tokens_returned: bool):
    """Price one api call attempt and optionally add it to the running totals.

    The cost is computed from the module-level per-million-token prices. The
    cumulative counters are only increased when at least one token count is
    positive and ``api_call_made_and_tokens_returned`` is true. The cumulative
    totals are logged on every call. Returns the cost of this attempt (0.0
    when no tokens were reported).
    """
    global cumulative_prompt_tokens_gemini, cumulative_completion_tokens_gemini, cumulative_cost_gemini

    cost_of_this_attempt = 0.0
    if prompt_tokens > 0 or completion_tokens > 0:
        prompt_millions = prompt_tokens / 1_000_000
        completion_millions = completion_tokens / 1_000_000
        cost_of_this_attempt = (
            prompt_millions * GEMINI_2_5_FLASH_INPUT_COST_PER_MILLION_TOKENS
            + completion_millions * GEMINI_2_5_FLASH_OUTPUT_COST_PER_MILLION_TOKENS
        )
        tqdm.write(f"gemini api call tokens: prompt={prompt_tokens}, completion={completion_tokens}. cost for this call attempt: ${cost_of_this_attempt:.6f}.")
        if api_call_made_and_tokens_returned:
            cumulative_prompt_tokens_gemini += prompt_tokens
            cumulative_completion_tokens_gemini += completion_tokens
            cumulative_cost_gemini += cost_of_this_attempt

    tqdm.write(f"gemini cumulative: prompt tokens={cumulative_prompt_tokens_gemini}, completion tokens={cumulative_completion_tokens_gemini}, cost=${cumulative_cost_gemini:.6f}")
    return cost_of_this_attempt

def analyze_text_with_gemini_instructor(
    risk_factors_text: str,
    mda_text: str,
    company_name: str = "Unknown Company",
    temperature: float = 0.0
) -> (Optional[CompanyAIAnalysis], int, int):
    """Analyse one filing with gemini and return the validated result plus token usage.

    The two text sections are embedded into ``PROMPT_TEMPLATE``. The
    instructor-patched client is tried first; when it is unavailable, or the
    call raises a ``TypeError`` complaining about ``response_model``, the base
    model is called in json mode and the returned text is validated with
    ``CompanyAIAnalysis.model_validate_json``.

    Args:
        risk_factors_text: text of the risk factors section.
        mda_text: text of the MD&A section.
        company_name: name placed at the top of the document text.
        temperature: sampling temperature passed to ``GenerationConfig``.

    Returns:
        ``(analysis, prompt_tokens, completion_tokens)``. When the response
        carries no usage metadata, the prompt tokens are estimated with
        ``count_tokens`` and the completion tokens are reported as 0.

    Raises:
        Exception: if neither client is available, or if the fallback response
            has no content parts.
        TypeError: from the primary path when the fallback does not apply.
        ValidationError: if the model output does not validate.
        ResourceExhausted, InternalServerError, ServiceUnavailable,
        DeadlineExceeded: re-raised unchanged so the caller can back off.
    """
    global INSTRUCTOR_GEMINI_CLIENT, BASE_GEMINI_MODEL_INSTANCE

    if not INSTRUCTOR_GEMINI_CLIENT and not BASE_GEMINI_MODEL_INSTANCE:
        raise Exception("critical: neither INSTRUCTOR_GEMINI_CLIENT nor BASE_GEMINI_MODEL_INSTANCE is available.")

    document_text = f"""Company Name (if known): {company_name}
Item 1. Risk Factors:\n{risk_factors_text}\n\nItem 7. Management's Discussion and Analysis (MD&A):\n{mda_text}"""
    full_prompt = PROMPT_TEMPLATE.format(document_text=document_text)

    analysis_pydantic_object = None
    prompt_tokens_used, completion_tokens_used = 0, 0
    json_text_response_for_debug = None

    try:
        if not INSTRUCTOR_GEMINI_CLIENT:
            tqdm.write(f"instructor client not available for {company_name}, attempting fallback directly.")
            # the TypeError handler below recognises this message and runs the fallback
            raise TypeError("instructor client not available, force fallback.")

        tqdm.write(f"attempting analysis for {company_name} using instructor client.")
        # NOTE(review): instructor documents `client.chat.completions.create(messages=...,
        # response_model=...)` for clients built with from_gemini; this generate_content
        # call presumably reaches the unpatched GenerativeModel and raises the
        # 'response_model' TypeError handled below - confirm against the installed version.
        api_response_object = INSTRUCTOR_GEMINI_CLIENT.generate_content(
            contents=[full_prompt],
            response_model=CompanyAIAnalysis,
            generation_config=GenerationConfig(temperature=temperature)
        )
        analysis_pydantic_object = api_response_object

        # token accounting: prefer the raw response's usage metadata, then the
        # returned object's own, and otherwise estimate the prompt size. the
        # estimate also runs when no raw response is attached at all, matching
        # what the fallback path does, instead of silently recording 0 tokens.
        original_gemini_response = getattr(analysis_pydantic_object, '_raw_response', None)
        raw_usage_metadata = getattr(original_gemini_response, 'usage_metadata', None) if original_gemini_response else None
        if raw_usage_metadata:
            prompt_tokens_used = raw_usage_metadata.prompt_token_count
            completion_tokens_used = raw_usage_metadata.candidates_token_count
        elif original_gemini_response and hasattr(original_gemini_response, 'prompt_feedback') and original_gemini_response.prompt_feedback.block_reason:
            tqdm.write(f"warning: gemini prompt for {company_name} (via instructor) may have been blocked. reason: {original_gemini_response.prompt_feedback.block_reason_message or 'Unknown'}")
        elif getattr(api_response_object, 'usage_metadata', None):
            prompt_tokens_used = api_response_object.usage_metadata.prompt_token_count
            completion_tokens_used = api_response_object.usage_metadata.candidates_token_count
        else:
            tqdm.write(f"warning: could not retrieve token usage from instructor response for {company_name}. estimating input tokens.")
            if BASE_GEMINI_MODEL_INSTANCE:
                prompt_tokens_used = BASE_GEMINI_MODEL_INSTANCE.count_tokens(contents=[full_prompt]).total_tokens
            completion_tokens_used = 0

    except TypeError as te:
        if ("unexpected keyword argument 'response_model'" in str(te) or "instructor client not available" in str(te)) and BASE_GEMINI_MODEL_INSTANCE:
            tqdm.write(f"instructor TypeError or unavailability for {company_name}: {te}. falling back to manual json parsing with base gemini client.")

            json_generation_config = GenerationConfig(
                temperature=temperature,
                response_mime_type="application/json"
            )

            sdk_response = BASE_GEMINI_MODEL_INSTANCE.generate_content(
                contents=[full_prompt],
                generation_config=json_generation_config
            )

            # no usable content: build the most specific explanation available and fail
            if not sdk_response.candidates or not sdk_response.candidates[0].content or not sdk_response.candidates[0].content.parts:
                block_message = "Unknown reason."
                if hasattr(sdk_response, 'prompt_feedback') and sdk_response.prompt_feedback.block_reason:
                    block_message = sdk_response.prompt_feedback.block_reason_message or sdk_response.prompt_feedback.block_reason.name
                elif sdk_response.candidates and hasattr(sdk_response.candidates[0], 'finish_reason') and sdk_response.candidates[0].finish_reason != genai.types.FinishReason.STOP:
                    block_message = f"Finish reason: {sdk_response.candidates[0].finish_reason.name}."
                elif sdk_response.candidates and hasattr(sdk_response.candidates[0], 'safety_ratings'):
                    problematic_ratings = [str(sr) for sr in sdk_response.candidates[0].safety_ratings if sr.probability not in [genai.types.HarmProbability.NEGLIGIBLE, genai.types.HarmProbability.LOW]]
                    if problematic_ratings:
                        block_message = f"safety ratings indicate potential blocking: {', '.join(problematic_ratings)}"
                raise Exception(f"gemini sdk call (fallback) blocked or returned no content parts for {company_name}. {block_message}")

            json_text_response_for_debug = sdk_response.candidates[0].content.parts[0].text
            try:
                analysis_pydantic_object = CompanyAIAnalysis.model_validate_json(json_text_response_for_debug)
            except ValidationError as ve_fallback:
                tqdm.write(f"pydantic validationerror during fallback for {company_name}: {str(ve_fallback)[:500]}")
                tqdm.write(f"problematic json from fallback for {company_name}: {json_text_response_for_debug[:1000]}")
                raise

            if hasattr(sdk_response, 'usage_metadata') and sdk_response.usage_metadata:
                prompt_tokens_used = sdk_response.usage_metadata.prompt_token_count
                completion_tokens_used = sdk_response.usage_metadata.candidates_token_count
            else:  # fallback token counting for manual path
                if BASE_GEMINI_MODEL_INSTANCE:
                    prompt_tokens_used = BASE_GEMINI_MODEL_INSTANCE.count_tokens(contents=[full_prompt]).total_tokens
                completion_tokens_used = 0
        else:
            # not a TypeError the fallback can handle (or no base model to fall back to)
            raise

    except ValidationError as ve:
        tqdm.write(f"pydantic validationerror (primary path) for {company_name}: {str(ve)[:500]}")
        raise

    except (ResourceExhausted, InternalServerError, ServiceUnavailable, DeadlineExceeded):
        # listed before the generic handler so these pass through without the
        # "unexpected error" log line
        raise

    except Exception as e_unexpected:
        tqdm.write(f"unexpected error in analyze_text_with_gemini_instructor for {company_name}: {type(e_unexpected).__name__} - {str(e_unexpected)[:200]}")
        if json_text_response_for_debug:
            tqdm.write(f"json text at time of unexpected error for {company_name}: {json_text_response_for_debug[:500]}")
        raise

    return analysis_pydantic_object, prompt_tokens_used, completion_tokens_used

def extract_grade_letter(bucket_assessment_string: str) -> str:
    """Return the upper-cased leading grade letter of a bucket label, or "N/A".

    The label is cut at its first space; the result is only accepted when the
    part before that space is exactly one alphabetic character. Empty values
    and non-strings yield "N/A".
    """
    if not bucket_assessment_string or not isinstance(bucket_assessment_string, str):
        return "N/A"
    leading_token, _, _ = bucket_assessment_string.partition(" ")
    is_single_letter = len(leading_token) == 1 and leading_token.isalpha()
    return leading_token.upper() if is_single_letter else "N/A"

def find_latest_gemini_checkpoint_info(checkpoint_dir: str, filename_prefix: str) -> (Optional[str], int):
    """Find the checkpoint csv in ``checkpoint_dir`` that covers the most rows.

    Checkpoint files are named
    ``<filename_prefix>[ERROR_]checkpoint_rows_upto_<N>.csv``. The file with the
    largest ``N`` wins; when an ERROR and a regular checkpoint share the same
    ``N`` the regular one is preferred.

    Args:
        checkpoint_dir: directory to scan.
        filename_prefix: literal prefix of the checkpoint filenames.

    Returns:
        ``(filename, N)`` for the selected checkpoint, or ``(None, 0)`` when the
        directory does not exist or holds no matching file.
    """
    latest_checkpoint_filename = None
    latest_is_error_checkpoint = False
    max_original_df_rows_processed = 0
    # the prefix is escaped so it is always matched literally, and fullmatch is
    # used below so files such as "<checkpoint>.csv.bak" are not picked up.
    checkpoint_pattern = re.compile(
        rf"{re.escape(filename_prefix)}(?P<error_tag>ERROR_)?checkpoint_rows_upto_(?P<rows>\d+)\.csv"
    )
    if not os.path.isdir(checkpoint_dir):
        tqdm.write(f"checkpoint directory not found: {checkpoint_dir}")
        return None, 0
    for filename in os.listdir(checkpoint_dir):
        match = checkpoint_pattern.fullmatch(filename)
        if not match:
            continue
        try:
            rows_in_this_filename = int(match.group("rows"))
        except ValueError:
            tqdm.write(f"warning: could not parse row count from checkpoint filename: {filename}")
            continue
        # decide ERROR-vs-regular from the matched tag rather than a substring
        # search, so a prefix that happens to contain "ERROR" cannot confuse it.
        is_error_checkpoint = match.group("error_tag") is not None
        if rows_in_this_filename > max_original_df_rows_processed:
            max_original_df_rows_processed = rows_in_this_filename
            latest_checkpoint_filename = filename
            latest_is_error_checkpoint = is_error_checkpoint
        elif (rows_in_this_filename == max_original_df_rows_processed
              and latest_checkpoint_filename
              and latest_is_error_checkpoint
              and not is_error_checkpoint):
            # same coverage: a regular checkpoint replaces an ERROR one
            latest_checkpoint_filename = filename
            latest_is_error_checkpoint = False
    if latest_checkpoint_filename:
        tqdm.write(f"found latest gemini checkpoint: {latest_checkpoint_filename} (represents processing up to original_df_index {max_original_df_rows_processed -1}).")
    else:
        tqdm.write(f"no valid '{filename_prefix}' checkpoint found in {checkpoint_dir}. starting fresh.")
    return latest_checkpoint_filename, max_original_df_rows_processed

def save_gemini_checkpoint(results_list_to_save: list, total_original_df_rows_covered: int, base_path: str, filename_prefix: str, is_error_save: bool = False):
    """Write the accumulated result rows to a checkpoint csv.

    Each row is stamped with the current module-level cumulative token and cost
    totals. The filename encodes ``total_original_df_rows_covered`` and carries
    an ``ERROR_`` tag when ``is_error_save`` is true. An empty result list is
    logged and skipped; a failed write is logged, not raised.
    """
    if not results_list_to_save:
        tqdm.write("no new results to save in this checkpoint interval (results_list_to_save is empty).")
        return

    checkpoint_frame = pd.DataFrame(results_list_to_save)
    if not checkpoint_frame.empty:
        totals_at_save = (
            ('cumulative_prompt_tokens_gemini_at_save', cumulative_prompt_tokens_gemini),
            ('cumulative_completion_tokens_gemini_at_save', cumulative_completion_tokens_gemini),
            ('cumulative_cost_gemini_at_save', cumulative_cost_gemini),
        )
        for column_name, total_value in totals_at_save:
            checkpoint_frame[column_name] = total_value

    if is_error_save:
        name_tag, kind_label = "ERROR_", "ERROR"
    else:
        name_tag, kind_label = "", "Regular"
    filename = os.path.join(base_path, f"{filename_prefix}{name_tag}checkpoint_rows_upto_{total_original_df_rows_covered}.csv")

    try:
        checkpoint_frame.to_csv(filename, index=False)
        tqdm.write(f"--- {kind_label} gemini checkpoint saved: {filename} (covers {total_original_df_rows_covered} original df rows) ---")
    except Exception as e_save:
        tqdm.write(f"error saving gemini checkpoint {filename}: {e_save}")

if __name__ == "__main__":
    # driver for the gemini 2.5 flash run: validate the inputs, resume from the
    # latest checkpoint, analyse every remaining row of `df` with retries and
    # periodic checkpoints, then write the final csv and a cost summary.
    print("\n--- main execution for gemini 2.5 flash detailed analysis ---")

    # fall back to a tiny synthetic dataframe when no `df` was loaded earlier
    if 'df' not in globals():
        print("info: 'df' not found in globals. creating a dummy dataframe for testing.")
        data = {
            'cik': ['0000001750', '0000001751', '0000001752'],
            'tickers_sec': ['TICKA', 'TICKB', 'TICKC'],
            'companyName_sec': ['Test Company A', 'Test Company B', 'Test Company C (No MDA)'],
            'sector_user': ['Tech', 'Finance', 'Retail'],
            'year': [2023, 2023, 2023],
            'filingDate': ['2024-03-01', '2024-03-02', '2024-03-03'],
            'risk_factors_text': [
                "The company faces risks from AI development and cyber threats. We are investing in AI. Our AI strategy is to enhance products.",
                "Market competition is a risk. AI could be an opportunity if we adopt it. We are exploring AI for efficiency.",
                "Standard retail risks. Supply chain is a concern."
            ],
            'item7_mda_text': [
                "Our AI strategy is to enhance products. We see AI as a key future driver. We plan to launch new AI features next year.",
                "We are exploring AI for efficiency. No major AI initiatives yet. The future is uncertain regarding AI adoption.",
                None
            ]
        }
        df = pd.DataFrame(data)
        if not os.getenv('GEMINI_API_KEY') and ('gemini_api_key' not in globals() or globals()['gemini_api_key'] is None):
            print("info: GEMINI_API_KEY not found. please set it as an environment variable or a global 'gemini_api_key' string for the script to run.")

    OUTPUT_BASE_DIR_GEMINI = "./Gemini_2_5_Flash_Analysis_Results/"
    CHECKPOINT_FILENAME_PREFIX_GEMINI = "gemini_2_5_flash_detailed_analysis_"
    FINAL_RESULTS_FILENAME_GEMINI = os.path.join(OUTPUT_BASE_DIR_GEMINI, f"{CHECKPOINT_FILENAME_PREFIX_GEMINI}FINAL_ALL_ROWS.csv")

    # make sure the output directory exists and is writable before spending api calls
    try:
        os.makedirs(OUTPUT_BASE_DIR_GEMINI, exist_ok=True)
        print(f"gemini output directory set to: {OUTPUT_BASE_DIR_GEMINI}")
        test_file_path = os.path.join(OUTPUT_BASE_DIR_GEMINI, "test_write_gemini.txt")
        with open(test_file_path, "w") as f_test: f_test.write("test")
        os.remove(test_file_path)
        print(f"successfully tested write access to {OUTPUT_BASE_DIR_GEMINI}")
    except Exception as e_dir:
        print(f"critical error: could not create or write to gemini output directory {OUTPUT_BASE_DIR_GEMINI}: {e_dir}. exiting.")
        exit()

    CHECKPOINT_INTERVAL = 50
    API_REQUEST_DELAY_ON_SUCCESS = 1.0
    # seconds to wait after the 1st, 2nd, ... failed attempt for a row; the
    # number of entries is also the number of attempts per row
    BACKOFF_DELAYS = [10, 30, 60, 180, 300]
    MAX_RETRIES_PER_ROW = len(BACKOFF_DELAYS)

    # preconditions: configured library, at least one client, a non-empty dataframe
    if not GENAI_CONFIGURED_SUCCESSFULLY:
        print("critical error: genai not configured (api key issue likely). exiting.")
        exit()
    if not INSTRUCTOR_GEMINI_CLIENT and not BASE_GEMINI_MODEL_INSTANCE:
        print("critical error: gemini client (INSTRUCTOR_GEMINI_CLIENT or BASE_GEMINI_MODEL_INSTANCE) not initialized. exiting.")
        exit()
    if 'df' not in globals() or not isinstance(df, pd.DataFrame) or df.empty:
        print("critical error: input dataframe 'df' not found, not a pandas dataframe, or is empty. exiting.")
        exit()
    print(f"input dataframe 'df' loaded with {len(df)} rows.")

    input_col_cik = "cik"
    input_col_ticker = "tickers_sec"
    input_col_company_name = "companyName_sec"
    input_col_sector = "sector_user"
    input_col_year = "year"
    input_col_filing_date = "filingDate"
    input_col_risk_factors = "risk_factors_text"
    input_col_mda = "item7_mda_text"

    # ciks are kept as 10-character zero-padded strings throughout
    if input_col_cik in df.columns:
        df[input_col_cik] = df[input_col_cik].astype(str).str.zfill(10)
        print(f"standardized '{input_col_cik}' column in input dataframe 'df'.")
    else:
        print(f"critical error: input cik column '{input_col_cik}' not found in dataframe 'df'. exiting.")
        exit()

    essential_text_cols = [input_col_risk_factors, input_col_mda]
    missing_text_cols = [col for col in essential_text_cols if col not in df.columns]
    if missing_text_cols:
        print(f"critical error: input dataframe 'df' is missing essential text columns: {missing_text_cols}. exiting.")
        exit()

    all_processed_results_gemini = []
    start_processing_from_original_df_index = 0

    latest_checkpoint_file, max_original_rows_covered_by_checkpoint = find_latest_gemini_checkpoint_info(OUTPUT_BASE_DIR_GEMINI, CHECKPOINT_FILENAME_PREFIX_GEMINI)

    # resume: reload earlier results and the cumulative cost counters
    if latest_checkpoint_file:
        full_checkpoint_path = os.path.join(OUTPUT_BASE_DIR_GEMINI, latest_checkpoint_file)
        try:
            tqdm.write(f"loading data from gemini checkpoint: {full_checkpoint_path}")
            checkpoint_df = pd.read_csv(full_checkpoint_path, dtype={'CIK': str})
            if 'CIK' in checkpoint_df.columns:
                checkpoint_df['CIK'] = checkpoint_df['CIK'].astype(str).str.zfill(10)

            if not checkpoint_df.empty:
                last_chkpt_row = checkpoint_df.iloc[-1]
                cumulative_prompt_tokens_gemini = int(last_chkpt_row.get('cumulative_prompt_tokens_gemini_at_save', 0))
                cumulative_completion_tokens_gemini = int(last_chkpt_row.get('cumulative_completion_tokens_gemini_at_save', 0))
                cumulative_cost_gemini = float(last_chkpt_row.get('cumulative_cost_gemini_at_save', 0.0))
                tqdm.write(f"restored gemini cumulative stats: cost=${cumulative_cost_gemini:.6f}, ptokens={cumulative_prompt_tokens_gemini}, ctokens={cumulative_completion_tokens_gemini}")

                last_row_error_info = last_chkpt_row.get('error')
                is_last_row_an_error = pd.notna(last_row_error_info) and str(last_row_error_info).strip() != ""

                # an ERROR checkpoint ending in a failed row: drop that row and retry it
                if "ERROR" in latest_checkpoint_file and is_last_row_an_error:
                    all_processed_results_gemini = checkpoint_df.iloc[:-1].to_dict('records')
                    start_processing_from_original_df_index = max_original_rows_covered_by_checkpoint - 1
                    tqdm.write(f"loaded {len(all_processed_results_gemini)} successful gemini results. will re-attempt original df index {start_processing_from_original_df_index}.")
                else:
                    all_processed_results_gemini = checkpoint_df.to_dict('records')
                    start_processing_from_original_df_index = max_original_rows_covered_by_checkpoint
                    tqdm.write(f"loaded {len(all_processed_results_gemini)} gemini results. resuming from original df index {start_processing_from_original_df_index}.")
            else: tqdm.write("gemini checkpoint was empty.")
        except Exception as e_load_chkpt:
            tqdm.write(f"error loading gemini checkpoint {full_checkpoint_path}: {e_load_chkpt}. starting fresh.")
            all_processed_results_gemini = []; start_processing_from_original_df_index = 0
            cumulative_cost_gemini = 0.0; cumulative_prompt_tokens_gemini = 0; cumulative_completion_tokens_gemini = 0
    else: tqdm.write("no gemini checkpoint found. starting fresh.")

    # the resume point is positional (iloc), while iterrows() below yields index labels
    df_to_process_slice = df.iloc[start_processing_from_original_df_index:]

    if df_to_process_slice.empty and start_processing_from_original_df_index >= len(df):
        tqdm.write(f"all {len(df)} rows appear to have been processed for gemini based on checkpoints.")
    elif len(df) == 0:
        tqdm.write("input dataframe 'df' is empty. exiting."); exit()

    newly_processed_results_gemini_this_session = []
    # never set to True anywhere in this block; kept so the check below stays valid
    stop_all_processing_flag = False

    for original_df_index, row_data in tqdm(df_to_process_slice.iterrows(),
                                             initial=0,
                                             total=len(df_to_process_slice),
                                             desc=f"Processing Rows for Gemini (Original df index {start_processing_from_original_df_index} to {len(df)-1})"):

        # result row pre-filled with placeholders; overwritten on success
        current_row_result_dict = {
            'CIK': str(row_data.get(input_col_cik, "N/A_CIK")).zfill(10),
            'Ticker': str(row_data.get(input_col_ticker, "N/A_Ticker")),
            'Company Name': str(row_data.get(input_col_company_name, "N/A_CompName")),
            'Sector': str(row_data.get(input_col_sector, "N/A_Sector")),
            'Year': row_data.get(input_col_year, None),
            'filingDate': row_data.get(input_col_filing_date, None),
            'Overall Summary': "N/A", 'Strategic Depth': "N/A", 'Disclosure Sentiment': "N/A",
            'Risk - Own Adoption': "N/A", 'Risk - External Threats': "N/A", 'Risk - Non-Adoption': "N/A",
            'Forward-Looking': "N/A", 'AI Washing Index': "N/A", 'Talent & Investment': "N/A",
            'Key AI Terms': "", 'error': None,
            'api_call_cost_for_row': 0.0,
            'prompt_tokens_for_row': 0,
            'completion_tokens_for_row': 0
        }
        cik_val_for_log = current_row_result_dict['CIK']
        # name shown to the model: company name, else ticker, else "CIK_<cik>"
        company_name_for_llm = current_row_result_dict['Company Name'] if current_row_result_dict['Company Name'] not in ["N/A_CompName", "nan", "None", ""] else current_row_result_dict['Ticker']
        if not company_name_for_llm or company_name_for_llm.lower() == "n/a_ticker" or company_name_for_llm.lower() == "nan":
            company_name_for_llm = f"CIK_{cik_val_for_log}"

        raw_risk_text = row_data.get(input_col_risk_factors)
        raw_mda_text = row_data.get(input_col_mda)

        risk_text_is_missing_or_empty = pd.isna(raw_risk_text) or not str(raw_risk_text).strip()
        mda_text_is_missing_or_empty = pd.isna(raw_mda_text) or not str(raw_mda_text).strip()

        # a missing section is sent as an empty string; converting None/NaN with
        # str() would put the literal text "None"/"nan" into the prompt whenever
        # only one of the two sections is available.
        risk_text_val = "" if risk_text_is_missing_or_empty else str(raw_risk_text)
        mda_text_val = "" if mda_text_is_missing_or_empty else str(raw_mda_text)

        # nothing to analyse: record the row as an error without calling the api
        if risk_text_is_missing_or_empty and mda_text_is_missing_or_empty:
            error_message = f"Missing/empty {input_col_risk_factors} AND {input_col_mda}"
            current_row_result_dict['error'] = error_message
            tqdm.write(f"skipping cik {cik_val_for_log} (original index: {original_df_index}) for gemini: {current_row_result_dict['error']}.")
            newly_processed_results_gemini_this_session.append(current_row_result_dict)
            current_total_original_df_rows_covered = start_processing_from_original_df_index + len(newly_processed_results_gemini_this_session)
            if len(newly_processed_results_gemini_this_session) > 0 and \
               (current_total_original_df_rows_covered % CHECKPOINT_INTERVAL == 0 or \
                original_df_index == df.index[-1]):
                save_gemini_checkpoint(all_processed_results_gemini + newly_processed_results_gemini_this_session,
                                      current_total_original_df_rows_covered,
                                      OUTPUT_BASE_DIR_GEMINI, CHECKPOINT_FILENAME_PREFIX_GEMINI,
                                      is_error_save=True)
            continue

        consecutive_api_failures_for_row = 0
        api_call_successful_and_parsed = False

        # retry loop: one attempt per BACKOFF_DELAYS entry
        while consecutive_api_failures_for_row < MAX_RETRIES_PER_ROW:
            analysis_object = None
            attempt_p_tokens, attempt_c_tokens = 0, 0

            try:
                tqdm.write(f"attempt {consecutive_api_failures_for_row + 1}/{MAX_RETRIES_PER_ROW} for cik {cik_val_for_log} (gemini) (original index: {original_df_index})")
                analysis_object, attempt_p_tokens, attempt_c_tokens = analyze_text_with_gemini_instructor(
                    risk_text_val, mda_text_val, company_name_for_llm, temperature=0.0
                )

                current_row_result_dict['prompt_tokens_for_row'] += attempt_p_tokens
                current_row_result_dict['completion_tokens_for_row'] += attempt_c_tokens
                current_call_cost = calculate_and_update_cost_gemini(attempt_p_tokens, attempt_c_tokens, True)
                current_row_result_dict['api_call_cost_for_row'] += current_call_cost

                if analysis_object:
                    tqdm.write(f"success: cik {cik_val_for_log} (gemini) (original index: {original_df_index}) parsed.")
                    # only the grade letter of each bucket label is stored
                    current_row_result_dict.update({
                        'Overall Summary': analysis_object.overall_ai_preparedness_summary_cot,
                        'Strategic Depth': extract_grade_letter(analysis_object.ai_strategic_depth.bucket_assessment),
                        'Disclosure Sentiment': extract_grade_letter(analysis_object.ai_disclosure_sentiment.bucket_assessment),
                        'Risk - Own Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_own_ai_adoption.bucket_assessment),
                        'Risk - External Threats': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_from_external_ai_threats.bucket_assessment),
                        'Risk - Non-Adoption': extract_grade_letter(analysis_object.ai_risk_disclosure.risk_of_not_adopting_ai_or_competitive_ai_threat.bucket_assessment),
                        'Forward-Looking': extract_grade_letter(analysis_object.forward_looking_ai_statements.bucket_assessment),
                        'AI Washing Index': extract_grade_letter(analysis_object.ai_washing_hype_index.bucket_assessment),
                        'Talent & Investment': extract_grade_letter(analysis_object.ai_talent_and_investment_focus.bucket_assessment),
                        'Key AI Terms': ', '.join(analysis_object.key_ai_related_terminology_found or []),
                        'error': None
                    })
                    api_call_successful_and_parsed = True
                    break
                else:  # should not happen if analyze_text_with_gemini_instructor raises exceptions on failure
                    current_row_result_dict['error'] = 'gemini analysis returned no object without specific exception (logic error).'
                    tqdm.write(f"error for cik {cik_val_for_log}: {current_row_result_dict['error']}")
                    break

            except (ResourceExhausted, InternalServerError, ServiceUnavailable, DeadlineExceeded, TypeError, ValidationError) as specific_err:
                error_type_name = type(specific_err).__name__
                tqdm.write(f"gemini api/validation error (attempt {consecutive_api_failures_for_row + 1}) for cik {cik_val_for_log}: {error_type_name} - {str(specific_err)[:200]}")
                current_row_result_dict['error'] = f'gemini error attempt {consecutive_api_failures_for_row + 1}: {error_type_name} - {str(specific_err)[:100]}'
                consecutive_api_failures_for_row += 1

                if consecutive_api_failures_for_row >= MAX_RETRIES_PER_ROW:
                    tqdm.write(f"max retries for gemini errors on cik {cik_val_for_log}. recording error.")
                    break
                else:
                    delay = BACKOFF_DELAYS[consecutive_api_failures_for_row -1]
                    tqdm.write(f"waiting {delay}s before next retry for cik {cik_val_for_log} (gemini).")
                    time.sleep(delay)

            except Exception as e_main_loop_unexpected:
                tqdm.write(f"main loop unexpected error (attempt {consecutive_api_failures_for_row + 1}) for cik {cik_val_for_log}: {type(e_main_loop_unexpected).__name__} - {str(e_main_loop_unexpected)[:200]}")
                current_row_result_dict['error'] = f'unexpected main loop gemini attempt {consecutive_api_failures_for_row + 1}: {str(e_main_loop_unexpected)[:100]}'
                consecutive_api_failures_for_row += 1
                if consecutive_api_failures_for_row >= MAX_RETRIES_PER_ROW:
                    tqdm.write(f"max retries for unexpected errors on cik {cik_val_for_log} (gemini). recording error.")
                    break
                else:
                    delay = BACKOFF_DELAYS[consecutive_api_failures_for_row -1]
                    tqdm.write(f"waiting {delay}s before next retry for cik {cik_val_for_log} (gemini).")
                    time.sleep(delay)

        newly_processed_results_gemini_this_session.append(current_row_result_dict)

        if stop_all_processing_flag:
            tqdm.write("stop flag activated. ending processing loop for gemini.")
            break

        current_total_original_df_rows_covered = start_processing_from_original_df_index + len(newly_processed_results_gemini_this_session)
        is_current_row_error = bool(current_row_result_dict.get('error'))

        # checkpoint every CHECKPOINT_INTERVAL rows, on the last row, and whenever
        # a row exhausted its retries
        if len(newly_processed_results_gemini_this_session) > 0 and \
           (current_total_original_df_rows_covered % CHECKPOINT_INTERVAL == 0 or \
            original_df_index == df.index[-1] or \
            (is_current_row_error and consecutive_api_failures_for_row >= MAX_RETRIES_PER_ROW ) ):

            combined_results_for_checkpoint = all_processed_results_gemini + newly_processed_results_gemini_this_session
            save_gemini_checkpoint(combined_results_for_checkpoint,
                                  current_total_original_df_rows_covered,
                                  OUTPUT_BASE_DIR_GEMINI,
                                  CHECKPOINT_FILENAME_PREFIX_GEMINI,
                                  is_error_save=is_current_row_error)

        if api_call_successful_and_parsed and API_REQUEST_DELAY_ON_SUCCESS > 0:
            time.sleep(API_REQUEST_DELAY_ON_SUCCESS)

    final_complete_list_of_results_gemini = all_processed_results_gemini + newly_processed_results_gemini_this_session

    if final_complete_list_of_results_gemini:
        # the closing checkpoint is tagged ERROR when the last recorded row failed
        final_save_is_error_state = False
        if final_complete_list_of_results_gemini and final_complete_list_of_results_gemini[-1].get('error'):
            final_save_is_error_state = True

        final_rows_covered_count = start_processing_from_original_df_index + len(newly_processed_results_gemini_this_session)
        if final_rows_covered_count > len(df):
            final_rows_covered_count = len(df)

        save_gemini_checkpoint(final_complete_list_of_results_gemini,
                              final_rows_covered_count,
                              OUTPUT_BASE_DIR_GEMINI,
                              CHECKPOINT_FILENAME_PREFIX_GEMINI,
                              is_error_save=final_save_is_error_state)

        df_results_gemini_2_5_flash = pd.DataFrame(final_complete_list_of_results_gemini)
        try:
            if 'CIK' in df_results_gemini_2_5_flash.columns:
                df_results_gemini_2_5_flash['CIK'] = df_results_gemini_2_5_flash['CIK'].astype(str).str.zfill(10)

            output_columns_ordered = [
                'CIK', 'Ticker', 'Company Name', 'Sector', 'Year', 'filingDate',
                'Overall Summary', 'Strategic Depth', 'Disclosure Sentiment',
                'Risk - Own Adoption', 'Risk - External Threats', 'Risk - Non-Adoption',
                'Forward-Looking', 'AI Washing Index', 'Talent & Investment', 'Key AI Terms',
                'error', 'api_call_cost_for_row', 'prompt_tokens_for_row', 'completion_tokens_for_row',
                'cumulative_prompt_tokens_gemini_at_save',
                'cumulative_completion_tokens_gemini_at_save',
                'cumulative_cost_gemini_at_save'
            ]
            # keep the preferred column order but only for columns that exist
            final_output_cols_present = [col for col in output_columns_ordered if col in df_results_gemini_2_5_flash.columns]

            df_results_gemini_2_5_flash[final_output_cols_present].to_csv(FINAL_RESULTS_FILENAME_GEMINI, index=False)
            print(f"\n--- processing finished. final gemini 2.5 flash results saved to: {FINAL_RESULTS_FILENAME_GEMINI} ({len(df_results_gemini_2_5_flash)} rows) ---")
            if not df_results_gemini_2_5_flash.empty:
                print(df_results_gemini_2_5_flash[final_output_cols_present].head().to_string())
        except Exception as e_final_save:
            print(f"error saving final gemini 2.5 flash results to {FINAL_RESULTS_FILENAME_GEMINI}: {e_final_save}")
    else:
        print("\nno gemini 2.5 flash results were processed or loaded from checkpoints to save in the final file.")

    print(f"\n--- final gemini 2.5 flash cost summary ---\ntotal prompt tokens: {cumulative_prompt_tokens_gemini}\ntotal completion tokens: {cumulative_completion_tokens_gemini}\ntotal estimated cost: ${cumulative_cost_gemini:.6f}")
    print("\n--- main execution for gemini 2.5 flash finished ---")


In [None]:
import pandas as pd
import numpy as np
import os

# part 1: locate every input file
# one panel parquet plus one results csv per ai model.
print("--- loading all source dataframes ---")

# root folder that holds all the thesis data; every input path hangs off it.
BASE_DIR = "/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/"

# resolve the four input paths in one pass: the panel first, then the three models.
path_panel_data, path_openai, path_gemini_1_5, path_gemini_2_5 = (
    os.path.join(BASE_DIR, relative_name)
    for relative_name in (
        'panel_data_charactermax200k.parquet',
        'OpenAI_Analysis_Results/openai_detailed_analysis_FINAL_ALL_ROWS.csv',
        'Gemini_1_5_Flash_Analysis_Results/gemini_1_5_flash_detailed_analysis_FINAL_ALL_ROWS.csv',
        'Gemini_2_5_Flash_Analysis_Results/gemini_2_5_flash_detailed_analysis_FINAL_ALL_ROWS.csv',
    )
)

def load_data(filepath, is_parquet=False):
    """read one input file into a dataframe without ever raising.

    parquet files go through `pd.read_parquet`; anything else is read as csv
    with the 'CIK' column forced to text so its leading zeros are kept.
    a missing file or a failed read prints a message and gives back an
    empty dataframe instead.
    """
    # bail out early when the file simply isn't there.
    if not os.path.exists(filepath):
        print(f"error: couldn't find the file at {filepath}")
        return pd.DataFrame()

    try:
        loaded = (
            pd.read_parquet(filepath)
            if is_parquet
            else pd.read_csv(filepath, dtype={'CIK': str})
        )
        print(f"successfully loaded {os.path.basename(filepath)} with {len(loaded)} rows.")
    except Exception as e:
        # best effort: report the problem and let the caller deal with an empty frame.
        print(f"ran into an error loading {filepath}: {e}")
        return pd.DataFrame()
    return loaded

# pull all four inputs into memory: the panel is parquet, the model outputs are csv.
panel_df = load_data(path_panel_data, is_parquet=True)
df_gpt4o, df_gemini_1_5, df_gemini_2_5 = (
    load_data(results_path)
    for results_path in (path_openai, path_gemini_1_5, path_gemini_2_5)
)


# part 2: shaping each model's results so they can be merged
print("\n--- preparing and merging ai model results ---")

def prepare_df_for_merge(df, suffix):
    """get one model's results ready for the merge on (CIK, Year).

    keeps the two key columns plus whichever score columns are present,
    normalises 'CIK' to a 10-character zero-padded string and 'Year' to int,
    and appends `_{suffix}` to every score column so the models don't collide.

    the input dataframe is left untouched. an empty input gives back an empty
    frame that still carries the 'CIK' and 'Year' columns, so a later merge on
    those keys keeps working even when a model's results are missing.
    """
    base_cols = ['CIK', 'Year']
    score_cols = [
        'Overall Summary', 'Strategic Depth', 'Disclosure Sentiment',
        'Risk - Own Adoption', 'Risk - External Threats', 'Risk - Non-Adoption',
        'Forward-Looking', 'AI Washing Index', 'Talent & Investment', 'Key AI Terms'
    ]

    if df.empty:
        # a frame with no columns at all would make pd.merge(on=['CIK', 'Year']) raise KeyError.
        return pd.DataFrame({
            'CIK': pd.Series(dtype=str),
            'Year': pd.Series(dtype='int64'),
        })

    present_scores = [col for col in score_cols if col in df.columns]

    # copy first so the key clean-up below never writes back into the caller's dataframe.
    prepared = df[base_cols + present_scores].copy()
    prepared['CIK'] = prepared['CIK'].astype(str).str.zfill(10)
    prepared['Year'] = prepared['Year'].astype(int)

    # tag every score column with the model it came from.
    return prepared.rename(columns={col: f"{col}_{suffix}" for col in present_scores})

# running the prep function on each model's dataframe.
df_gpt4o_prep = prepare_df_for_merge(df_gpt4o, 'gpt4o_mini')
df_gemini_1_5_prep = prepare_df_for_merge(df_gemini_1_5, 'gemini_1_5')
df_gemini_2_5_prep = prepare_df_for_merge(df_gemini_2_5, 'gemini_2_5')

# time to combine the scores from all three models using CIK and Year as the link.
# a model whose results file failed to load has no rows (and possibly no key
# columns); merging on missing keys raises KeyError, so those frames are skipped
# and the remaining models are still combined.
merge_keys = ['CIK', 'Year']
df_merged_scores = pd.DataFrame(columns=merge_keys)
merged_any_model = False
for model_label, model_scores in (
    ('gpt4o_mini', df_gpt4o_prep),
    ('gemini_1_5', df_gemini_1_5_prep),
    ('gemini_2_5', df_gemini_2_5_prep),
):
    if model_scores.empty or not set(merge_keys).issubset(model_scores.columns):
        print(f"warning: no usable scores for {model_label}; leaving it out of the merge.")
        continue
    if merged_any_model:
        df_merged_scores = pd.merge(df_merged_scores, model_scores, on=merge_keys, how='outer')
    else:
        df_merged_scores = model_scores
        merged_any_model = True

print(f"successfully merged ai scores. shape: {df_merged_scores.shape}")


# part 3: attaching the ai scores to the main panel
print("\n--- merging ai scores with main panel dataset ---")

# line the panel's key columns up with the scores dataframe: same names,
# CIK as a 10-character zero-padded string, Year as int.
panel_df = panel_df.rename(columns={'cik': 'CIK', 'year': 'Year'}).assign(
    CIK=lambda frame: frame['CIK'].astype(str).str.zfill(10),
    Year=lambda frame: frame['Year'].astype(int),
)

# the main merge: a left join keeps every panel row and adds scores wherever (CIK, Year) matches.
final_df = panel_df.merge(df_merged_scores, on=['CIK', 'Year'], how='left')

print(f"final merged dataset shape: {final_df.shape}")


# part 4: the final cleanup to create a perfect, balanced dataset
print("\n--- final data cleaning and balancing ---")

# the panel should already be balanced from notebook 01, but double-check.
# a company only counts as complete when its years are exactly the required
# ones; just counting five distinct years would also accept the wrong five.
required_years = set(range(2020, 2025))
years_per_cik = final_df.groupby('CIK')['Year'].apply(set)
complete_ciks = pd.Index(
    [cik for cik, years in years_per_cik.items() if years == required_years],
    name='CIK',
)

df_balanced = final_df[final_df['CIK'].isin(complete_ciks)].copy()
print(f"filtered for complete companies. kept {len(complete_ciks)} companies.")

# one last verification: every kept company must contribute exactly one row per required year.
final_companies = df_balanced['CIK'].nunique()
expected_rows = final_companies * len(required_years)
print(f"final shape: {df_balanced.shape}")
print(f"unique companies: {final_companies}")
print(f"expected rows for balanced panel (companies * {len(required_years)} years): {expected_rows}")

if len(df_balanced) == expected_rows:
    print("✅ success: the final dataset is a perfectly balanced panel.")
else:
    # more rows than expected means duplicated (CIK, Year) pairs slipped in through a merge.
    print("❌ error: the dataset is not balanced. something went wrong.")


# part 5: putting everything in a nice order and saving the final file
print("\n--- saving final analysis-ready dataset ---")

final_output_path = os.path.join(BASE_DIR, 'final_panel_with_scores.parquet')

if not df_balanced.empty:
    try:
        # preferred identifier order; only the ones that actually exist are used below.
        id_cols = ['CIK', 'Ticker', 'Company Name', 'Sector', 'Year', 'filingDate', 'form']
        present_id_cols = [col for col in id_cols if col in df_balanced.columns]

        # grouping the scores by metric (then by model) makes the file easier to read.
        score_metrics = [
            'Strategic Depth', 'Disclosure Sentiment', 'Risk - Own Adoption',
            'Risk - External Threats', 'Risk - Non-Adoption', 'Forward-Looking',
            'Talent & Investment', 'AI Washing Index', 'Overall Summary', 'Key AI Terms'
        ]
        model_suffixes = ['_gpt4o_mini', '_gemini_1_5', '_gemini_2_5']

        ordered_score_cols = [
            f"{metric}{suffix}"
            for metric in score_metrics
            for suffix in model_suffixes
            if f"{metric}{suffix}" in df_balanced.columns
        ]

        text_cols = ['risk_factors_text', 'item7_mda_text']

        # identifiers first, then scores, then the long text columns.
        final_order = present_id_cols + \
                      ordered_score_cols + \
                      [col for col in text_cols if col in df_balanced.columns]

        # anything not placed yet goes at the end so no column is lost.
        other_cols = [col for col in df_balanced.columns if col not in final_order]
        final_order.extend(other_cols)

        # finally, reordering the dataframe and saving it.
        df_to_save = df_balanced[final_order]
        df_to_save.to_parquet(final_output_path, index=False)

        print(f"✅ final analysis-ready dataset saved to: {final_output_path}")
        print(f"shape: {df_to_save.shape}")
        print(f"\nsample of final data:\n")
        # preview only identifier columns that exist; indexing with the full wish list
        # raised KeyError after the file was already written and got reported as a save error.
        print(df_to_save[present_id_cols].head())

    except Exception as e:
        print(f"error saving final dataset: {e}")
else:
    print("final dataframe is empty, so i didn't save anything.")



In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

class CompleteAIFactorDatasetBuilder:
    """
    this is my all-in-one dataset builder.
    i'm putting everything in one place to avoid confusion.
    it loads all my data, adds financial factors, calculates returns with
    quality checks built-in, and gets everything ready for my regressions.
    """
    
    def __init__(self, base_ai_dataset, daily_path, fundamentals_path, ff_factors_path):
        """
        here i set up all the initial parameters. i'm defining my file paths,
        return horizons, and most importantly, my quality control settings like
        minimum stock price. this way, quality is handled from the very beginning.
        """
        self.base_df = base_ai_dataset.copy()
        self.daily_path = daily_path
        self.fundamentals_path = fundamentals_path
        self.ff_factors_path = ff_factors_path
        
        # these are the time windows i'm interested in for calculating stock returns.
        self.price_offsets = [0, 3, 6, 9, 12]  
        self.return_horizons = [3, 6, 9, 12]   
        
        # setting a cutoff date for my data to keep things consistent.
        self.data_cutoff = pd.Timestamp('2025-05-31')
        
        # my quality rules. i want to avoid penny stocks and crazy outlier returns.
        self.min_price = 1.0       # minimum price to avoid penny stocks
        self.min_end_price = 0.1   # minimum ending price (avoid total crashes)
        self.max_return = 5.0      # maximum allowed return (500%)
        self.min_return = -0.95    # minimum allowed return (-95%)
        
        # i'm only selecting the columns i actually need to keep my dataframes tidy.
        self.daily_cols_wanted = ['gvkey', 'datadate', 'prccd', 'cshoc', 'cshtrd', 
                                  'ggroup', 'gind', 'gsector', 'gsubind']
        self.fund_cols_wanted = ['gvkey', 'datadate', 'atq', 'cheq', 'cshoq', 
                                 'dlttq', 'epsfxq', 'niq', 'oiadpq', 'teqq', 'revty']
        
        # just getting my main ai dataset ready by making sure the dates and gvkey are in the right format.
        self.base_df['filingDate'] = pd.to_datetime(self.base_df['filingDate'])
        self.base_df['gvkey'] = self.base_df['gvkey'].astype(str).str.strip()
        
        print("🎯 COMPLETE AI FACTOR DATASET BUILDER")
        print("=" * 60)
        print(f"📊 Base AI dataset: {self.base_df.shape}")
        print(f"💰 Quality settings:")
        print(f"    Min starting price: ${self.min_price}")
        print(f"    Return bounds: {self.min_return:.0%} to {self.max_return:.0%}")
        print(f"🧹 Selected columns only (no bloat)")
        print(f"🎯 One clean process - no dataset confusion!")
        
    def load_source_data(self):
        """this function loads all my external data sources: fama-french, daily prices, and fundamentals."""
        print("\n📊 LOADING SOURCE DATA")
        print("=" * 40)
        
        # loading the fama-french factors.
        print("Loading FF factors...")
        self.ff_factors = pd.read_csv(self.ff_factors_path)
        if 'date' in self.ff_factors.columns:
            try:
                self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'], format='%Y%m%d')
            except:
                self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'])
        
        # sometimes the ff factors are in percentages, so i check and convert them to decimals if needed.
        factor_cols = ['mktrf', 'smb', 'hml', 'rmw', 'cma', 'rf', 'umd']
        for col in factor_cols:
            if col in self.ff_factors.columns and self.ff_factors[col].abs().max() > 1:
                self.ff_factors[col] = self.ff_factors[col] / 100
        
        self.ff_factors = self.ff_factors.sort_values('date').reset_index(drop=True)
        print(f"✅ FF Factors: {self.ff_factors.shape}")
        
        # loading the daily stock data. i'm using `usecols` to only grab what i need, which saves memory.
        print("Loading daily market data...")
        try:
            sample_daily = pd.read_csv(self.daily_path, nrows=1)
            available_cols = list(sample_daily.columns)
            cols_to_use = [col for col in self.daily_cols_wanted if col in available_cols]
            missing_cols = [col for col in self.daily_cols_wanted if col not in available_cols]
            
            if missing_cols:
                print(f"⚠️ Missing daily columns: {missing_cols}")
            
            self.df_daily = pd.read_csv(self.daily_path, usecols=cols_to_use, dtype={'gvkey': str})
            self.df_daily['gvkey'] = self.df_daily['gvkey'].str.strip()
            self.df_daily['datadate'] = pd.to_datetime(self.df_daily['datadate'])
            
            print(f"✅ Daily Data: {self.df_daily.shape}, columns: {len(cols_to_use)}")
            
        except Exception as e:
            print(f"❌ Error loading daily data: {e}")
            return None
        
        # same thing for the fundamentals data.
        print("Loading fundamentals...")
        try:
            sample_fund = pd.read_csv(self.fundamentals_path, nrows=1)
            available_fund_cols = list(sample_fund.columns)
            fund_cols_to_use = [col for col in self.fund_cols_wanted if col in available_fund_cols]
            missing_fund_cols = [col for col in self.fund_cols_wanted if col not in available_fund_cols]
            
            if missing_fund_cols:
                print(f"⚠️ Missing fundamental columns: {missing_fund_cols}")
            
            self.df_fundamentals = pd.read_csv(self.fundamentals_path, usecols=fund_cols_to_use, dtype={'gvkey': str})
            self.df_fundamentals['gvkey'] = self.df_fundamentals['gvkey'].str.strip()
            self.df_fundamentals['datadate'] = pd.to_datetime(self.df_fundamentals['datadate'])
            
            print(f"✅ Fundamentals: {self.df_fundamentals.shape}, columns: {len(fund_cols_to_use)}")
            
        except Exception as e:
            print(f"❌ Error loading fundamentals: {e}")
            return None
        
        return self
    
    def calculate_cumulative_rf(self, start_date, months):
        """a helper function to calculate the cumulative risk-free rate over a specific period. i'll need this to calculate excess returns."""
        end_date = start_date + pd.DateOffset(months=months)
        
        period_rf = self.ff_factors[
            (self.ff_factors['date'] >= start_date) & 
            (self.ff_factors['date'] <= end_date)
        ]['rf'].copy()
        
        if len(period_rf) == 0:
            return np.nan
        
        # the formula for cumulative return.
        cumulative_rf = 1.0
        valid_days = 0
        
        for daily_rf in period_rf:
            if pd.notna(daily_rf):
                cumulative_rf *= (1 + daily_rf)
                valid_days += 1
        
        # i'll only calculate the rate if i have enough data points for the period.
        min_days_required = max(30, months * 15)
        if valid_days < min_days_required:
            return np.nan
        
        return cumulative_rf - 1
    
    def add_fama_french_and_rf(self):
        """this is where i add the fama-french factors and the cumulative risk-free rate i just calculated to my main dataset."""
        print("\n📊 ADDING FAMA-FRENCH FACTORS + CUMULATIVE RF")
        print("=" * 40)
        
        # starting with my base dataset.
        self.result_df = self.base_df.copy()
        
        # getting the names of the factor columns.
        factor_cols = [col for col in self.ff_factors.columns if col != 'date']
        
        # it's more efficient to create the empty columns first.
        for col in factor_cols:
            self.result_df[f'ff_{col}'] = np.nan
        
        for horizon in self.return_horizons:
            self.result_df[f'rf_{horizon}m_cumulative'] = np.nan
        
        successful_ff = 0
        successful_rf = 0
        
        print("Matching FF factors and calculating cumulative RF...")
        for idx, row in tqdm(self.result_df.iterrows(), total=len(self.result_df), desc="FF + RF"):
            filing_date = row['filingDate']
            
            # matching the closest fama-french data to my filing date.
            date_diffs = abs(self.ff_factors['date'] - filing_date).dt.days
            valid_matches = date_diffs <= 31
            
            if valid_matches.any():
                closest_idx = date_diffs[valid_matches].idxmin()
                closest_ff_row = self.ff_factors.loc[closest_idx]
                
                for col in factor_cols:
                    if pd.notna(closest_ff_row[col]):
                        self.result_df.at[idx, f'ff_{col}'] = closest_ff_row[col]
                
                successful_ff += 1
            
            # calculating the cumulative risk-free rate for each horizon.
            for horizon in self.return_horizons:
                try:
                    cumulative_rf = self.calculate_cumulative_rf(filing_date, horizon)
                    if pd.notna(cumulative_rf):
                        self.result_df.at[idx, f'rf_{horizon}m_cumulative'] = cumulative_rf
                        successful_rf += 1
                except:
                    continue
        
        print(f"✅ FF factors: {successful_ff:,}/{len(self.result_df):,}")
        print(f"✅ Cumulative RF: {successful_rf:,} calculations")
        return self
    
    def add_prices_and_calculate_quality_returns(self):
        """
        this is a key step. i'm getting the stock prices and calculating returns,
        but i'm applying my quality filters right away.
        """
        print("\n💰 ADDING PRICES + CALCULATING QUALITY RETURNS")
        print("=" * 40)
        print(f"🛡️ Quality filters applied during calculation:")
        print(f"    Min starting price: ${self.min_price}")
        print(f"    Return bounds: {self.min_return:.0%} to {self.max_return:.0%}")
        
        # creating the columns i'll fill in this step.
        for offset in self.price_offsets:
            self.result_df[f'price_t{offset}'] = np.nan
        
        for horizon in self.return_horizons:
            self.result_df[f'return_{horizon}mo'] = np.nan
            self.result_df[f'return_{horizon}mo_annualized'] = np.nan
            self.result_df[f'excess_return_{horizon}mo'] = np.nan
            self.result_df[f'excess_return_{horizon}mo_annualized'] = np.nan
        
        # getting the sector info at the same time.
        sector_cols = ['ggroup', 'gind', 'gsector', 'gsubind']
        for col in sector_cols:
            self.result_df[f'sector_{col}'] = np.nan
        
        price_matches = 0
        return_calculations = 0
        quality_filtered = 0
        
        print("Processing prices and calculating quality returns...")
        for idx, row in tqdm(self.result_df.iterrows(), total=len(self.result_df), desc="Prices+Returns"):
            gvkey = row['gvkey']
            filing_date = row['filingDate']
            
            # looking up the daily data for the specific company in the current row.
            company_daily = self.df_daily[self.df_daily['gvkey'] == gvkey].copy()
            if company_daily.empty:
                continue
            
            # getting sector info.
            filing_match = abs(company_daily['datadate'] - filing_date).dt.days
            if filing_match.min() <= 15:
                closest_sector_idx = filing_match.idxmin()
                closest_sector_row = company_daily.loc[closest_sector_idx]
                
                for col in sector_cols:
                    if col in closest_sector_row.index and pd.notna(closest_sector_row[col]):
                        self.result_df.at[idx, f'sector_{col}'] = closest_sector_row[col]
            
            # getting prices.
            prices = {}
            for offset_months in self.price_offsets:
                target_date = filing_date + pd.DateOffset(months=offset_months)
                
                if target_date > self.data_cutoff:
                    continue
                
                # finding the trading day price that is closest to my target date.
                date_diffs = abs(company_daily['datadate'] - target_date).dt.days
                valid_matches = date_diffs <= 15
                
                if valid_matches.any():
                    closest_idx = date_diffs[valid_matches].idxmin()
                    closest_row = company_daily.loc[closest_idx]
                    
                    if pd.notna(closest_row.get('prccd')) and closest_row['prccd'] > 0:
                        prices[offset_months] = closest_row['prccd']
                        self.result_df.at[idx, f'price_t{offset_months}'] = closest_row['prccd']
                        price_matches += 1
            
            # calculating returns, but only if the starting price is valid.
            if 0 in prices:
                base_price = prices[0]
                
                # my first quality check: the starting price.
                if base_price < self.min_price:
                    quality_filtered += len(self.return_horizons)
                    continue
                
                for horizon in self.return_horizons:
                    if horizon in prices:
                        end_price = prices[horizon]
                        
                        # another quality check on the ending price.
                        if end_price < self.min_end_price:
                            quality_filtered += 1
                            continue
                        
                        # calculating the raw return.
                        raw_return = (end_price / base_price) - 1
                        
                        # my main quality check on the return value itself.
                        if not (self.min_return <= raw_return <= self.max_return):
                            quality_filtered += 1
                            continue
                        
                        # a sanity check on the price ratio.
                        price_ratio = end_price / base_price
                        if not (0.001 <= price_ratio <= 1000):
                            quality_filtered += 1
                            continue
                        
                        # if a return passes all my checks, i'll add it to my dataframe.
                        self.result_df.at[idx, f'return_{horizon}mo'] = raw_return
                        
                        # i also want to annualize the returns.
                        if horizon != 12:
                            annualized = (1 + raw_return) ** (12/horizon) - 1
                            self.result_df.at[idx, f'return_{horizon}mo_annualized'] = annualized
                        else:
                            self.result_df.at[idx, f'return_{horizon}mo_annualized'] = raw_return
                        
                        # now calculating the excess return.
                        rf_col = f'rf_{horizon}m_cumulative'
                        if rf_col in self.result_df.columns and pd.notna(self.result_df.at[idx, rf_col]):
                            excess_return = raw_return - self.result_df.at[idx, rf_col]
                            self.result_df.at[idx, f'excess_return_{horizon}mo'] = excess_return
                            
                            # annualizing the excess return too.
                            if horizon != 12:
                                ann_excess = (1 + excess_return) ** (12/horizon) - 1
                                self.result_df.at[idx, f'excess_return_{horizon}mo_annualized'] = ann_excess
                            else:
                                self.result_df.at[idx, f'excess_return_{horizon}mo_annualized'] = excess_return
                            
                        return_calculations += 1
        
        print(f"✅ Price matches: {price_matches:,}")
        print(f"✅ Quality returns calculated: {return_calculations:,}")
        print(f"🛡️ Quality filtered: {quality_filtered:,}")
        print(f"💡 Quality filtering rate: {quality_filtered/(return_calculations+quality_filtered):.1%}")
        
        return self
    
    def add_fundamentals_and_ratios(self):
        """now, i'm adding the company fundamentals and calculating some key financial ratios."""
        print("\n💼 ADDING FUNDAMENTALS + CALCULATING RATIOS")
        print("=" * 40)
        
        # creating empty columns for the fundamental data.
        fund_cols = [col for col in self.fund_cols_wanted if col not in ['gvkey', 'datadate']]
        for col in fund_cols:
            self.result_df[f'fund_{col}'] = np.nan
        
        successful_fund = 0
        
        # add fundamental data by finding the closest quarter.
        print("Adding fundamental data...")
        for idx, row in tqdm(self.result_df.iterrows(), total=len(self.result_df), desc="Fundamentals"):
            gvkey = row['gvkey']
            filing_date = row['filingDate']
            
            company_fund = self.df_fundamentals[self.df_fundamentals['gvkey'] == gvkey].copy()
            if company_fund.empty:
                continue
            
            date_diffs = abs(company_fund['datadate'] - filing_date).dt.days
            valid_quarters = date_diffs <= 90
            
            if valid_quarters.any():
                closest_idx = date_diffs[valid_quarters].idxmin()
                closest_quarter = company_fund.loc[closest_idx]
                
                for col in fund_cols:
                    if col in closest_quarter.index and pd.notna(closest_quarter[col]):
                        self.result_df.at[idx, f'fund_{col}'] = closest_quarter[col]
                
                successful_fund += 1
        
        print(f"✅ Fundamental matches: {successful_fund:,}")
        
        # calculating the ratios.
        print("Calculating financial ratios...")
        
        # just mapping the database codes to readable names.
        fund_mapping = {
            'niq': 'net_income', 'atq': 'total_assets', 'teqq': 'shareholders_equity',
            'dlttq': 'long_term_debt', 'cheq': 'cash_equivalents', 'revty': 'revenue',
            'oiadpq': 'operating_income', 'epsfxq': 'eps_diluted', 'cshoq': 'shares_outstanding'
        }
        
        for fund_code, clean_name in fund_mapping.items():
            fund_col = f'fund_{fund_code}'
            if fund_col in self.result_df.columns:
                self.result_df[clean_name] = self.result_df[fund_col]
        
        ratios_calculated = 0
        
        # size metrics.
        if 'total_assets' in self.result_df.columns:
            assets = self.result_df['total_assets'].replace([0, np.inf, -np.inf], np.nan)
            self.result_df['calc_log_total_assets'] = np.log(assets)
            ratios_calculated += 1
        
        if 'price_t0' in self.result_df.columns:
            if 'shares_outstanding' in self.result_df.columns:
                market_cap = self.result_df['price_t0'] * self.result_df['shares_outstanding']
                market_cap = market_cap.replace([0, np.inf, -np.inf], np.nan)
                self.result_df['calc_market_cap'] = market_cap
                self.result_df['calc_log_market_cap'] = np.log(market_cap)
                ratios_calculated += 2
        
        # profitability ratios.
        if 'net_income' in self.result_df.columns and 'total_assets' in self.result_df.columns:
            self.result_df['calc_roa'] = self.result_df['net_income'] / self.result_df['total_assets'].replace(0, np.nan)
            ratios_calculated += 1
        
        if 'net_income' in self.result_df.columns and 'shareholders_equity' in self.result_df.columns:
            self.result_df['calc_roe'] = self.result_df['net_income'] / self.result_df['shareholders_equity'].replace(0, np.nan)
            ratios_calculated += 1
        
        if 'net_income' in self.result_df.columns and 'revenue' in self.result_df.columns:
            self.result_df['calc_profit_margin'] = self.result_df['net_income'] / self.result_df['revenue'].replace(0, np.nan)
            ratios_calculated += 1
        
        # financial health ratios.
        if 'long_term_debt' in self.result_df.columns and 'total_assets' in self.result_df.columns:
            self.result_df['calc_debt_to_assets'] = self.result_df['long_term_debt'] / self.result_df['total_assets'].replace(0, np.nan)
            ratios_calculated += 1
        
        if 'long_term_debt' in self.result_df.columns and 'shareholders_equity' in self.result_df.columns:
            self.result_df['calc_debt_to_equity'] = self.result_df['long_term_debt'] / self.result_df['shareholders_equity'].replace(0, np.nan)
            ratios_calculated += 1
        
        # valuation metrics.
        if 'price_t0' in self.result_df.columns and 'shareholders_equity' in self.result_df.columns and 'shares_outstanding' in self.result_df.columns:
            book_value_per_share = self.result_df['shareholders_equity'] / self.result_df['shares_outstanding'].replace(0, np.nan)
            self.result_df['calc_price_to_book'] = self.result_df['price_t0'] / book_value_per_share
            ratios_calculated += 1
        
        if 'price_t0' in self.result_df.columns and 'eps_diluted' in self.result_df.columns:
            self.result_df['calc_price_to_earnings'] = self.result_df['price_t0'] / self.result_df['eps_diluted'].replace(0, np.nan)
            ratios_calculated += 1
        
        print(f"✅ Financial ratios calculated: {ratios_calculated}")
        
        # getting rid of the temporary columns i created for the ratio calculations.
        helper_cols = list(fund_mapping.values())
        existing_helpers = [col for col in helper_cols if col in self.result_df.columns]
        if existing_helpers:
            self.result_df = self.result_df.drop(columns=existing_helpers)
        
        return self
    
    def organize_final_structure(self):
        """the final step is to organize all the columns in a logical order to make the dataset easy to work with."""
        print("\n📋 ORGANIZING FINAL STRUCTURE")
        print("=" * 40)
        
        # i'm grouping my columns into categories to make them easier to find.
        column_groups = {
            'identifiers': ['gvkey', 'CIK', 'Ticker', 'Company Name', 'Sector', 'Year', 'filingDate'],
            'ai_factors': [col for col in self.result_df.columns if any(x in col for x in 
                             ['Strategic Depth', 'Disclosure Sentiment', 'AI Washing', 'Forward-Looking', 
                              'Talent', 'Risk', 'Overall Summary', 'Key AI Terms', 'Cum_Score'])],
            'prices': [col for col in self.result_df.columns if col.startswith('price_t')],
            'returns_raw': [col for col in self.result_df.columns if col.startswith('return_') and 'excess' not in col],
            'returns_excess': [col for col in self.result_df.columns if col.startswith('excess_return_')],
            'ff_factors': [col for col in self.result_df.columns if col.startswith('ff_')],
            'rf_cumulative': [col for col in self.result_df.columns if 'rf_' in col and 'cumulative' in col],
            'sectors': [col for col in self.result_df.columns if col.startswith('sector_')],
            'fundamentals': [col for col in self.result_df.columns if col.startswith('fund_')],
            'calculated_ratios': [col for col in self.result_df.columns if col.startswith('calc_')]
        }
        
        ordered_columns = []
        for group_name, group_cols in column_groups.items():
            if group_cols:
                print(f"    📊 {group_name}: {len(group_cols)} columns")
                ordered_columns.extend(sorted(group_cols))
        
        # adding any columns i might have missed.
        remaining_cols = [col for col in self.result_df.columns if col not in ordered_columns]
        if remaining_cols:
            print(f"    📋 Other: {len(remaining_cols)} columns")
            ordered_columns.extend(sorted(remaining_cols))
        
        # reordering the dataframe.
        self.result_df = self.result_df[ordered_columns]
        
        # sorting the whole dataset.
        if 'Ticker' in self.result_df.columns and 'Year' in self.result_df.columns:
            self.result_df = self.result_df.sort_values(['Ticker', 'Year']).reset_index(drop=True)
            print(f"    ✅ Sorted by Ticker and Year")
        
        print(f"✅ Final structure organized: {self.result_df.shape}")
        return self
    
    def validate_final_dataset(self):
        """
        last sanity check on `self.result_df` before the dataset is handed back.

        two things happen here:
        1. every column family has to reach a minimum column count; these checks
           decide the return value.
        2. for the 3/6/12 month raw returns, summary statistics are printed and
           flagged with a warning when they fall outside
           [`self.min_return`, `self.max_return`]. this part only reports, it
           does not change the return value.

        returns True when every column-count check passed, False otherwise.
        """
        print("\n🔍 FINAL DATASET VALIDATION")
        print("=" * 40)

        column_names = list(self.result_df.columns)

        def how_many(matches):
            # number of columns for which the predicate holds.
            return sum(1 for name in column_names if matches(name))

        # (label, column predicate, minimum number of matching columns)
        requirements = (
            ('AI Factors', lambda name: 'Strategic Depth' in name or 'Sentiment' in name, 1),
            ('FF Factors', lambda name: name.startswith('ff_'), 7),
            ('Cumulative RF', lambda name: 'cumulative' in name, 4),
            ('Quality Returns', lambda name: name.startswith('return_'), 4),
            ('Excess Returns', lambda name: name.startswith('excess_return_'), 4),
            ('Fundamentals', lambda name: name.startswith('fund_'), 8),
            ('Calculated Ratios', lambda name: name.startswith('calc_'), 5),
        )
        checks = {
            label: how_many(matches) >= minimum
            for label, matches, minimum in requirements
        }

        for check_name, passed in checks.items():
            marker = "✅" if passed else "❌"
            print(f"    {marker} {check_name}")
        all_passed = all(checks.values())

        # summary statistics for the raw returns at each horizon.
        print(f"\n    💰 RETURN QUALITY CHECK:")
        for horizon in (3, 6, 12):
            return_col = f'return_{horizon}mo'
            if return_col not in self.result_df.columns:
                continue

            returns = self.result_df[return_col].dropna()
            if not len(returns) > 0:
                continue

            mean_ret = returns.mean()
            std_ret = returns.std()
            min_ret = returns.min()
            max_ret = returns.max()

            # the extremes have to stay inside the bounds configured on the builder.
            bounds_ok = min_ret >= self.min_return and max_ret <= self.max_return
            status = "✅" if bounds_ok else "⚠️"
            print(f"        {status} {horizon}mo: μ={mean_ret:.1%}, σ={std_ret:.1%}, range=[{min_ret:.1%}, {max_ret:.1%}]")

        if all_passed:
            print(f"\n🎉 ALL VALIDATION CHECKS PASSED!")
        else:
            print(f"\n⚠️ SOME VALIDATION CHECKS FAILED")

        return all_passed
    
    def build_complete_dataset(self):
        """
        entry point of the builder: runs every pipeline step in order and then validates.

        returns `self.result_df` when validation passes. returns None when
        validation fails or when any step raises; in the latter case the error
        message and the traceback are printed instead of being re-raised.
        """
        print("\n🚀 BUILDING COMPLETE AI FACTOR DATASET")
        print("=" * 60)

        # the steps in the order they have to run.
        step_names = (
            'load_source_data',
            'add_fama_french_and_rf',
            'add_prices_and_calculate_quality_returns',
            'add_fundamentals_and_ratios',
            'organize_final_structure',
        )

        try:
            for step_name in step_names:
                getattr(self, step_name)()

            if not self.validate_final_dataset():
                print(f"\n⚠️ VALIDATION FAILED")
                return None

            for line in (
                f"\n🎉 COMPLETE DATASET READY!",
                f"📊 Final shape: {self.result_df.shape}",
                f"🛡️ Quality filters applied during calculation",
                f"🧹 Clean structure - no dataset confusion",
                f"🎯 Ready for AI factor analysis!",
            ):
                print(line)

            return self.result_df

        except Exception as e:
            # top-level boundary of the pipeline: report the failure and signal it with None.
            import traceback
            print(f"\n❌ ERROR DURING BUILDING:")
            print(f"    {str(e)}")
            traceback.print_exc()
            return None

# --- usage: running the builder ---
# here i'm creating an instance of my builder class and running the process.
# the base dataset is `final_df` from the previous merging step; when it is not
# in scope (or is empty) it is reloaded from disk below.
# NOTE(review): `BASE_DIR` and `load_data` are not defined in this cell; they are
# presumably provided by an earlier cell - confirm before running this cell alone.
print("\n🎯 INSTANTIATING AND RUNNING THE DATASET BUILDER")
print("=" * 70)

# assuming 'final_df' from the previous merging step is available, if not, load it.
if 'final_df' not in locals() or final_df.empty:
    print("loading `final_panel_with_scores.parquet` as base dataset...")
    path_to_base = os.path.join(BASE_DIR, 'final_panel_with_scores.parquet')
    final_df = load_data(path_to_base, is_parquet=True)


# only build when there is a non-empty base dataset to start from.
if not final_df.empty:
    complete_builder = CompleteAIFactorDatasetBuilder(
        base_ai_dataset=final_df,
        daily_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Controls/daily.csv",
        fundamentals_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Controls/fundamentals.csv",
        ff_factors_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Controls/5factors.csv"
    )

    # building the complete dataset.
    # build_complete_dataset() returns None when validation fails or a step raises.
    final_enhanced_dataset = complete_builder.build_complete_dataset()

    if final_enhanced_dataset is not None:
        print(f"\n✅ READY TO SAVE!")
        
        # saving the final, enhanced dataset.
        # this csv is the file the FinancialFactorsCalculator cell reads as its base dataset.
        save_path = "/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Regression/final_enhanced_dataset.csv"
        final_enhanced_dataset.to_csv(save_path, index=False)
        
        print(f"💾 Saved: {save_path}")
        print(f"📊 Shape: {final_enhanced_dataset.shape}")
        print(f"🎯 ONE DATASET, NO CONFUSION!")
else:
    print("could not load the base AI dataset, so i can't build the final version.")



In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

class FactorLoadingsCalculator:
    """
    this is my factor loadings calculator. for every filing in the final dataset
    it regresses the firm's daily excess returns on the fama-french factors over a
    window that ends the day before the filing date, so there is no look-ahead bias.

    the steps are meant to run in this order, and each one returns `self`:
    load_data -> calculate_daily_returns -> merge_with_ff_factors
    -> calculate_loadings -> merge_loadings_with_final_dataset.
    """

    # regressors for the loadings regression. 'rf' is not a regressor, it is only
    # used to turn raw returns into excess returns.
    FACTOR_COLUMNS = ('mktrf', 'smb', 'hml', 'rmw', 'cma', 'umd')

    def __init__(self, daily_prices_path, ff_factors_path, final_dataset_path):
        """
        i'm setting up the calculator with the paths to all the data i need.
        nothing is read here; the three csv files are only opened by load_data().
        """
        self.daily_prices_path = daily_prices_path
        self.ff_factors_path = ff_factors_path
        self.final_dataset_path = final_dataset_path

        print("🔧 Factor Loadings Calculator Initialized (version 1)")
        print(f"    - using daily prices from: {daily_prices_path}")
        print(f"    - using ff factors from: {ff_factors_path}")
        print(f"    - starting with dataset: {final_dataset_path}")

    def load_data(self):
        """
        here i'm loading all the necessary files into memory.

        sets `self.ff_factors`, `self.daily_prices` and `self.final_dataset`.
        all three files are read with pd.read_csv.
        """
        print("\n# loading my data files...")

        # fama-french factors. dates are tried as YYYYMMDD first and parsed
        # generically if that format does not fit.
        self.ff_factors = pd.read_csv(self.ff_factors_path)
        try:
            self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'], format='%Y%m%d')
        except (ValueError, TypeError):
            self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'])

        # percent -> decimal conversion. the decision is made once for the whole
        # file: a daily risk-free rate quoted in percent never exceeds 1, so a
        # column-by-column test would rescale the factors but leave 'rf' in
        # percent and distort every excess return.
        # assumes all factor columns of one file share the same unit.
        scalable = [col for col in (*self.FACTOR_COLUMNS, 'rf') if col in self.ff_factors.columns]
        if scalable and self.ff_factors[scalable].abs().max().max() > 1:
            self.ff_factors[scalable] = self.ff_factors[scalable] / 100

        self.ff_factors = self.ff_factors.sort_values('date').reset_index(drop=True)
        print(f"  ...loaded Fama-French Factors: {self.ff_factors.shape}")

        # daily stock prices: gvkey as a zero-padded 6 character string, and only
        # rows with a positive, non-missing price.
        self.daily_prices = pd.read_csv(self.daily_prices_path, dtype={'gvkey': str})
        self.daily_prices['gvkey'] = self.daily_prices['gvkey'].str.strip().str.zfill(6)
        self.daily_prices['datadate'] = pd.to_datetime(self.daily_prices['datadate'])
        has_price = self.daily_prices['prccd'].notna() & (self.daily_prices['prccd'] > 0)
        self.daily_prices = self.daily_prices[has_price].copy()
        self.daily_prices = self.daily_prices.sort_values(['gvkey', 'datadate']).reset_index(drop=True)
        print(f"  ...loaded Daily Prices: {self.daily_prices.shape}")

        # my main dataset, with the same gvkey formatting so the keys line up.
        self.final_dataset = pd.read_csv(self.final_dataset_path, dtype={'gvkey': str, 'CIK': str})
        self.final_dataset['gvkey'] = self.final_dataset['gvkey'].str.strip().str.zfill(6)
        if 'filingDate' in self.final_dataset.columns:
            self.final_dataset['filingDate'] = pd.to_datetime(self.final_dataset['filingDate'])
        print(f"  ...loaded Final Enhanced Dataset: {self.final_dataset.shape}")

        return self

    def calculate_daily_returns(self, min_price_rows=100, return_clip=1.0):
        """
        calculating daily returns from the price data. this is a crucial step.

        min_price_rows: firms with fewer price rows than this are skipped.
        return_clip: daily returns are clipped to [-return_clip, +return_clip].

        sets `self.daily_returns` (columns gvkey, datadate, daily_return) and
        raises ValueError when no return could be calculated. the whole
        calculation is vectorised, so there is no per-firm progress bar.
        """
        print("\n# calculating daily stock returns...")

        # i'm only processing firms that are in both my daily price file and my final dataset.
        wanted = set(self.final_dataset['gvkey'].dropna().unique())
        prices = self.daily_prices[self.daily_prices['gvkey'].isin(wanted)]
        print(f"  ...processing returns for {prices['gvkey'].nunique()} firms.")

        # i need enough price history per firm to get a reliable estimate.
        rows_per_firm = prices['gvkey'].map(prices['gvkey'].value_counts())
        prices = prices[rows_per_firm >= min_price_rows].sort_values(['gvkey', 'datadate'])
        if prices.empty:
            raise ValueError("could not calculate any daily returns.")

        # the lag is taken inside each firm, so a firm's first row has no return
        # and is dropped below.
        previous_price = prices.groupby('gvkey')['prccd'].shift(1)
        # clipping (not a percentile winsorization) caps extreme outliers.
        daily_return = (prices['prccd'] / previous_price - 1).clip(-return_clip, return_clip)

        returns = (
            prices[['gvkey', 'datadate']]
            .assign(daily_return=daily_return)
            .dropna()
            .reset_index(drop=True)
        )
        if returns.empty:
            raise ValueError("could not calculate any daily returns.")

        self.daily_returns = returns
        print(f"  ...finished calculating daily returns. total observations: {len(self.daily_returns):,}")
        return self

    def merge_with_ff_factors(self):
        """
        now i'll merge the daily returns i just calculated with the fama-french factors.

        sets `self.returns_with_factors` (inner join on the trading date). the
        `excess_return` column is only added when the factor file has an 'rf' column.
        """
        print("\n# merging daily returns with fama-french factors...")

        self.returns_with_factors = pd.merge(
            self.daily_returns, self.ff_factors, left_on='datadate', right_on='date', how='inner'
        )

        # this gives me the daily excess return.
        if 'rf' in self.returns_with_factors.columns:
            self.returns_with_factors['excess_return'] = (
                self.returns_with_factors['daily_return'] - self.returns_with_factors['rf']
            )
        else:
            print("  ...warning: no 'rf' column in the factor file, excess returns were not calculated.")

        print(f"  ...merge complete. dataset shape: {self.returns_with_factors.shape}")
        return self

    def calculate_loadings(self, estimation_months=6, min_observations=100):
        """
        this is the main part where i run the regressions to get the factor loadings.
        the estimation window is `estimation_months` months ending 1 day before the filing.

        estimation_months: length of the estimation window in calendar months.
        min_observations: minimum number of complete daily observations in the
            window; filings with fewer are skipped.

        sets `self.factor_loadings` with one row per filing (gvkey, Year,
        filingDate, alpha, r_squared, n_obs and beta/tstat/pvalue per factor).
        raises ValueError when no loading at all could be calculated.
        """
        print("\n# calculating factor loadings...")

        if 'excess_return' not in self.returns_with_factors.columns:
            raise ValueError(
                "could not calculate any factor loadings: 'excess_return' is missing "
                "(the factor file has no 'rf' column)."
            )

        # only regress on the factors the file actually provides, the same way
        # load_data() tolerates missing factor columns.
        factor_names = [f for f in self.FACTOR_COLUMNS if f in self.returns_with_factors.columns]
        if not factor_names:
            raise ValueError("could not calculate any factor loadings: no factor columns found.")
        skipped_factors = [f for f in self.FACTOR_COLUMNS if f not in factor_names]
        if skipped_factors:
            print(f"  ...warning: factors not in the data and left out of the regression: {skipped_factors}")

        # rows with a missing regressor or return cannot be used by OLS, so they
        # are removed before the observation count is checked.
        usable = self.returns_with_factors.dropna(subset=['excess_return', *factor_names])

        # split both tables by firm once instead of scanning them for every firm.
        returns_by_firm = dict(tuple(usable.groupby('gvkey')))
        filings_by_firm = dict(tuple(self.final_dataset.groupby('gvkey')))
        processable_gvkeys = sorted(returns_by_firm.keys() & filings_by_firm.keys())
        print(f"  ...calculating for {len(processable_gvkeys)} firms.")

        factor_loadings_list = []
        for gvkey in tqdm(processable_gvkeys, desc="calculating factor loadings"):
            firm_data = returns_by_firm[gvkey]

            for _, filing_row in filings_by_firm[gvkey].iterrows():
                filing_date = filing_row['filingDate']
                if pd.isna(filing_date):
                    # without a filing date there is no window to estimate on.
                    continue
                year = filing_row.get('Year', filing_date.year)

                # the estimation window ends 1 day before the filing. no look-ahead bias.
                estimation_end = filing_date - pd.Timedelta(days=1)
                estimation_start = estimation_end - pd.DateOffset(months=estimation_months)
                in_window = firm_data['datadate'].between(estimation_start, estimation_end)
                window_data = firm_data[in_window]

                # again, i need enough data points to run a meaningful regression.
                if len(window_data) < min_observations:
                    continue

                try:
                    # y is the excess return, x are the factors plus a constant for the alpha.
                    y = window_data['excess_return']
                    X = sm.add_constant(window_data[factor_names])
                    model = OLS(y, X).fit()
                except Exception as e:
                    # one failing regression should not stop the whole run.
                    print(f"could not run regression for gvkey {gvkey}, year {year}: {e}")
                    continue

                # filingDate is kept so that two filings of one firm in the same
                # year can be told apart when merging back.
                result = {
                    'gvkey': gvkey,
                    'Year': year,
                    'filingDate': filing_date,
                    'alpha': model.params.get('const'),
                    'r_squared': model.rsquared,
                    'n_obs': model.nobs,
                }
                for factor in factor_names:
                    result[f'beta_{factor}'] = model.params.get(factor)
                    result[f'tstat_{factor}'] = model.tvalues.get(factor)
                    result[f'pvalue_{factor}'] = model.pvalues.get(factor)
                factor_loadings_list.append(result)

        if not factor_loadings_list:
            raise ValueError("could not calculate any factor loadings.")

        self.factor_loadings = pd.DataFrame(factor_loadings_list)
        print(f"  ...finished calculating loadings. created {len(self.factor_loadings)} results.")
        return self

    def merge_loadings_with_final_dataset(self):
        """
        the final step is to merge these new factor loadings back into my main dataset.

        sets `self.final_dataset_with_loadings`. it is a left join, so the result
        always has exactly one row per row of `self.final_dataset`.
        """
        print("\n# merging factor loadings into the final dataset...")

        keys = ['gvkey', 'Year']
        loadings = self.factor_loadings
        if 'filingDate' in self.final_dataset.columns and 'filingDate' in loadings.columns:
            # matching on the filing date too keeps separate filings of the same
            # firm-year apart.
            keys.append('filingDate')
        else:
            loadings = loadings.drop(columns='filingDate', errors='ignore')

        # one loadings row per key, otherwise the left join would multiply rows.
        loadings = loadings.drop_duplicates(subset=keys, keep='last')

        self.final_dataset_with_loadings = pd.merge(
            self.final_dataset, loadings, on=keys, how='left', validate='many_to_one'
        )
        print(f"  ...merge complete. final shape: {self.final_dataset_with_loadings.shape}")

        return self

# --- running the entire process ---
# NOTE(review): `path_daily_prices`, `path_ff_factors`, `path_merged_ai_scores`,
# `BASE_DIR` and `os` are not defined in this cell; they are presumably provided
# by earlier cells - confirm before running this cell on its own.
print("\n--- initiating the factor loading construction process (version 1) ---")

# here i create an instance of my builder and run all the steps in order.
try:
    # FactorLoadingsCalculator.load_data() reads all three paths with pd.read_csv,
    # so `path_merged_ai_scores` has to point to a csv file.
    calculator = FactorLoadingsCalculator(
        daily_prices_path=path_daily_prices,
        ff_factors_path=path_ff_factors,
        final_dataset_path=path_merged_ai_scores
    )

    # running the full pipeline.
    # the order matters: each step uses attributes set by the step before it.
    calculator.load_data()
    calculator.calculate_daily_returns()
    calculator.merge_with_ff_factors()
    calculator.calculate_loadings()
    calculator.merge_loadings_with_final_dataset()

    # getting the final dataframe with everything included.
    final_dataset_with_factors = calculator.final_dataset_with_loadings
    
    # saving the final result. this is the dataset i'll use for my regressions.
    final_save_path = os.path.join(BASE_DIR, "final_dataset_with_loadings.parquet")
    final_dataset_with_factors.to_parquet(final_save_path, index=False)
    
    print(f"\n✅ SUCCESS! final dataset with factor loadings is complete.")
    print(f"   saved to: {final_save_path}")
    print(f"   final shape: {final_dataset_with_factors.shape}")

except Exception as e:
    # any failure in the pipeline is reported with its traceback instead of being re-raised.
    print(f"\n❌ an error occurred during the process: {e}")
    import traceback
    traceback.print_exc()


In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

class FinancialFactorsCalculator:
    """
    this is my all-in-one calculator for financial factors. it now handles both:
    1. factor loadings (betas) for the fama-french model.
    2. momentum returns from the months prior to my filing date.
    this will be the last major step before i start my regressions.

    the steps are meant to run in this order, and each one returns `self`:
    load_data -> calculate_daily_returns -> merge_with_ff_factors
    -> calculate_factor_loadings -> calculate_momentum
    -> merge_factors_into_final_dataset.
    """

    # regressors for the loadings regression. 'rf' is not a regressor, it is only
    # used to turn raw returns into excess returns.
    FACTOR_COLUMNS = ('mktrf', 'smb', 'hml', 'rmw', 'cma', 'umd')

    def __init__(self, daily_prices_path, ff_factors_path, base_dataset_path):
        """
        setting up the calculator with the paths to all my data.
        the base_dataset_path should point to the file created by the CompleteAIFactorDatasetBuilder.
        nothing is read here; the three csv files are only opened by load_data().
        """
        self.daily_prices_path = daily_prices_path
        self.ff_factors_path = ff_factors_path
        self.base_dataset_path = base_dataset_path

        print("🔧 Financial Factors Calculator Initialized (version 1)")
        print(f"    - using daily prices from: {daily_prices_path}")
        print(f"    - using ff factors from: {ff_factors_path}")
        print(f"    - starting with dataset: {base_dataset_path}")

    def load_data(self):
        """
        loading all the necessary files into memory.

        sets `self.ff_factors`, `self.daily_prices` and `self.final_dataset`.
        all three files are read with pd.read_csv.
        """
        print("\n# loading my data files...")

        # fama-french factors. dates are tried as YYYYMMDD first and parsed
        # generically if that format does not fit.
        self.ff_factors = pd.read_csv(self.ff_factors_path)
        try:
            self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'], format='%Y%m%d')
        except (ValueError, TypeError):
            self.ff_factors['date'] = pd.to_datetime(self.ff_factors['date'])

        # percent -> decimal conversion. the decision is made once for the whole
        # file: a daily risk-free rate quoted in percent never exceeds 1, so a
        # column-by-column test would rescale the factors but leave 'rf' in
        # percent and distort every excess return.
        # assumes all factor columns of one file share the same unit.
        scalable = [col for col in (*self.FACTOR_COLUMNS, 'rf') if col in self.ff_factors.columns]
        if scalable and self.ff_factors[scalable].abs().max().max() > 1:
            self.ff_factors[scalable] = self.ff_factors[scalable] / 100

        self.ff_factors = self.ff_factors.sort_values('date').reset_index(drop=True)
        print(f"  ...loaded Fama-French Factors: {self.ff_factors.shape}")

        # daily stock prices: gvkey as a zero-padded 6 character string, and only
        # rows with a positive, non-missing price.
        self.daily_prices = pd.read_csv(self.daily_prices_path, dtype={'gvkey': str})
        self.daily_prices['gvkey'] = self.daily_prices['gvkey'].str.strip().str.zfill(6)
        self.daily_prices['datadate'] = pd.to_datetime(self.daily_prices['datadate'])
        has_price = self.daily_prices['prccd'].notna() & (self.daily_prices['prccd'] > 0)
        self.daily_prices = self.daily_prices[has_price].copy()
        self.daily_prices = self.daily_prices.sort_values(['gvkey', 'datadate']).reset_index(drop=True)
        print(f"  ...loaded Daily Prices: {self.daily_prices.shape}")

        # my main dataset created by the previous builder step.
        self.final_dataset = pd.read_csv(self.base_dataset_path, dtype={'gvkey': str, 'CIK': str})
        self.final_dataset['gvkey'] = self.final_dataset['gvkey'].str.strip().str.zfill(6)
        if 'filingDate' in self.final_dataset.columns:
            self.final_dataset['filingDate'] = pd.to_datetime(self.final_dataset['filingDate'])
        print(f"  ...loaded Final Enhanced Dataset: {self.final_dataset.shape}")

        return self

    def calculate_daily_returns(self, min_price_rows=100, return_clip=0.5):
        """
        calculating daily returns, which i'll need for both loadings and momentum.

        min_price_rows: firms with fewer price rows than this are skipped.
        return_clip: daily returns are clipped to [-return_clip, +return_clip].

        sets `self.daily_returns` (columns gvkey, datadate, daily_return) and
        raises ValueError when no return could be calculated. the whole
        calculation is vectorised, so there is no per-firm progress bar.
        """
        print("\n# calculating daily stock returns...")

        # only firms that are in both the daily price file and the final dataset.
        wanted = set(self.final_dataset['gvkey'].dropna().unique())
        prices = self.daily_prices[self.daily_prices['gvkey'].isin(wanted)]
        print(f"  ...processing returns for {prices['gvkey'].nunique()} firms.")

        rows_per_firm = prices['gvkey'].map(prices['gvkey'].value_counts())
        prices = prices[rows_per_firm >= min_price_rows].sort_values(['gvkey', 'datadate'])
        if prices.empty:
            raise ValueError("could not calculate any daily returns.")

        # the lag is taken inside each firm, so a firm's first row has no return
        # and is dropped below.
        previous_price = prices.groupby('gvkey')['prccd'].shift(1)
        # clipping (not a percentile winsorization) caps extreme outliers.
        daily_return = (prices['prccd'] / previous_price - 1).clip(-return_clip, return_clip)

        returns = (
            prices[['gvkey', 'datadate']]
            .assign(daily_return=daily_return)
            .dropna()
            .reset_index(drop=True)
        )
        if returns.empty:
            raise ValueError("could not calculate any daily returns.")

        self.daily_returns = returns
        print(f"  ...finished calculating daily returns. total observations: {len(self.daily_returns):,}")
        return self

    def merge_with_ff_factors(self):
        """
        merging the daily returns with the fama-french factors to get daily excess returns.

        sets `self.returns_with_factors` (inner join on the trading date). the
        `excess_return` column is only added when the factor file has an 'rf' column.
        """
        print("\n# merging daily returns with fama-french factors...")

        self.returns_with_factors = pd.merge(
            self.daily_returns, self.ff_factors, left_on='datadate', right_on='date', how='inner'
        )

        if 'rf' in self.returns_with_factors.columns:
            self.returns_with_factors['excess_return'] = (
                self.returns_with_factors['daily_return'] - self.returns_with_factors['rf']
            )
        else:
            print("  ...warning: no 'rf' column in the factor file, excess returns were not calculated.")

        print(f"  ...merge complete. dataset shape: {self.returns_with_factors.shape}")
        return self

    def calculate_factor_loadings(self, estimation_months=6, min_observations=100):
        """
        running the regressions to get the factor loadings (betas).

        estimation_months: length of the estimation window in calendar months;
            the window ends 1 day before the filing date.
        min_observations: minimum number of complete daily observations in the
            window; filings with fewer are skipped.

        sets `self.factor_loadings` with one row per filing (gvkey, Year,
        filingDate, alpha, r_squared, n_obs and one beta per factor). the frame
        is empty when nothing could be estimated; that case is reported and
        merge_factors_into_final_dataset() then skips the loadings.
        """
        print("\n# calculating factor loadings...")

        self.factor_loadings = pd.DataFrame()

        # only regress on the factors the file actually provides, the same way
        # load_data() tolerates missing factor columns.
        factor_names = [f for f in self.FACTOR_COLUMNS if f in self.returns_with_factors.columns]
        if 'excess_return' not in self.returns_with_factors.columns or not factor_names:
            print("  ...warning: excess returns or factor columns are missing, no loadings calculated.")
            return self
        skipped_factors = [f for f in self.FACTOR_COLUMNS if f not in factor_names]
        if skipped_factors:
            print(f"  ...warning: factors not in the data and left out of the regression: {skipped_factors}")

        # rows with a missing regressor or return cannot be used by OLS, so they
        # are removed before the observation count is checked.
        usable = self.returns_with_factors.dropna(subset=['excess_return', *factor_names])

        # split both tables by firm once instead of scanning them for every firm.
        returns_by_firm = dict(tuple(usable.groupby('gvkey')))
        filings_by_firm = dict(tuple(self.final_dataset.groupby('gvkey')))
        processable_gvkeys = sorted(returns_by_firm.keys() & filings_by_firm.keys())
        print(f"  ...calculating for {len(processable_gvkeys)} firms.")

        factor_loadings_list = []
        failed_regressions = 0
        for gvkey in tqdm(processable_gvkeys, desc="calculating factor loadings"):
            firm_data = returns_by_firm[gvkey]

            for _, filing_row in filings_by_firm[gvkey].iterrows():
                filing_date = filing_row['filingDate']
                if pd.isna(filing_date):
                    # without a filing date there is no window to estimate on.
                    continue
                year = filing_row.get('Year', filing_date.year)

                estimation_end = filing_date - pd.Timedelta(days=1)
                estimation_start = estimation_end - pd.DateOffset(months=estimation_months)
                in_window = firm_data['datadate'].between(estimation_start, estimation_end)
                window_data = firm_data[in_window]

                if len(window_data) < min_observations:
                    continue

                try:
                    y = window_data['excess_return']
                    X = sm.add_constant(window_data[factor_names])
                    model = OLS(y, X).fit()
                except Exception:
                    # one failing regression should not stop the run, but it is
                    # counted so the failures show up in the summary below.
                    failed_regressions += 1
                    continue

                # filingDate is kept so that two filings of one firm in the same
                # year can be told apart when merging back.
                result = {
                    'gvkey': gvkey,
                    'Year': year,
                    'filingDate': filing_date,
                    'alpha': model.params.get('const'),
                    'r_squared': model.rsquared,
                    'n_obs': model.nobs,
                }
                for factor in factor_names:
                    result[f'beta_{factor}'] = model.params.get(factor)
                factor_loadings_list.append(result)

        if failed_regressions:
            print(f"  ...warning: {failed_regressions} regressions failed and were skipped.")

        self.factor_loadings = pd.DataFrame(factor_loadings_list)
        print(f"  ...finished calculating loadings. created {len(self.factor_loadings)} results.")
        return self

    def calculate_momentum(self, months_back=(1, 2, 3)):
        """
        calculating momentum as the cumulative return over prior months.

        months_back: iterable of look-back lengths in calendar months. each one
            produces a `momentum_{months}m` column.

        every look-back window ends 5 days before the filing date, and a value
        is only stored when the window has more than 10 daily returns.
        sets `self.momentum_data` with one row per filing.
        """
        print("\n# calculating momentum factors...")

        # split both tables by firm once instead of scanning them for every firm.
        returns_by_firm = dict(tuple(self.daily_returns.groupby('gvkey')))
        filings_by_firm = dict(tuple(self.final_dataset.groupby('gvkey')))
        processable_gvkeys = sorted(returns_by_firm.keys() & filings_by_firm.keys())
        print(f"  ...calculating for {len(processable_gvkeys)} firms.")

        momentum_results = []
        for gvkey in tqdm(processable_gvkeys, desc="calculating momentum"):
            firm_data = returns_by_firm[gvkey]

            for _, filing_row in filings_by_firm[gvkey].iterrows():
                filing_date = filing_row['filingDate']
                if pd.isna(filing_date):
                    # no filing date, no look-back window; the left join later
                    # leaves the momentum columns empty for this row.
                    continue
                year = filing_row.get('Year', filing_date.year)
                result = {'gvkey': gvkey, 'Year': year, 'filingDate': filing_date}

                # all windows share the same end date.
                lookback_end = filing_date - pd.Timedelta(days=5)
                for months in months_back:
                    lookback_start = lookback_end - pd.DateOffset(months=months)
                    in_window = firm_data['datadate'].between(lookback_start, lookback_end)
                    window_returns = firm_data.loc[in_window, 'daily_return']

                    if len(window_returns) > 10:
                        # compounding the daily returns over the window.
                        result[f'momentum_{months}m'] = (1 + window_returns).prod() - 1

                momentum_results.append(result)

        self.momentum_data = pd.DataFrame(momentum_results)
        print(f"  ...finished calculating momentum. created {len(self.momentum_data)} results.")
        return self

    def _merge_per_filing(self, base, extra, label):
        """left-join per-filing results onto `base` without ever adding rows."""
        if extra is None or extra.empty or not {'gvkey', 'Year'}.issubset(extra.columns):
            print(f"  ...no {label} to merge, skipping.")
            return base

        keys = ['gvkey', 'Year']
        if 'filingDate' in base.columns and 'filingDate' in extra.columns:
            # matching on the filing date too keeps separate filings of the same
            # firm-year apart.
            keys.append('filingDate')
        else:
            extra = extra.drop(columns='filingDate', errors='ignore')

        # one result row per key, otherwise the left join would multiply rows.
        extra = extra.drop_duplicates(subset=keys, keep='last')
        merged = pd.merge(base, extra, on=keys, how='left', validate='many_to_one')
        print(f"  ...merged {label}. shape: {merged.shape}")
        return merged

    def merge_factors_into_final_dataset(self):
        """
        merging both the factor loadings and the momentum factors into my main dataset.

        sets `self.final_dataset_with_factors`. results that were not calculated
        (or are empty) are skipped with a message, and the row count always
        stays equal to that of `self.final_dataset`.
        """
        print("\n# merging all new factors into the final dataset...")

        merged = self._merge_per_filing(
            self.final_dataset, getattr(self, 'factor_loadings', None), 'factor loadings'
        )
        merged = self._merge_per_filing(
            merged, getattr(self, 'momentum_data', None), 'momentum factors'
        )
        self.final_dataset_with_factors = merged

        return self

# --- running the entire process ---
# NOTE(review): `BASE_DIR` and `os` are not defined in this cell; they are
# presumably provided by earlier cells - confirm before running this cell on its own.
print("\n--- initiating the financial factor construction process (version 1) ---")

try:
    # i'm creating an instance of my calculator class.
    # IMPORTANT: i'm making sure it uses the correct dataset from the previous builder step.
    # base_dataset_path is the same csv path the builder cell writes with to_csv.
    calculator = FinancialFactorsCalculator(
        daily_prices_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Controls/daily.csv",
        ff_factors_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Controls/5factors.csv",
        base_dataset_path="/Users/daniel/Library/Mobile Documents/com~apple~CloudDocs/Documents/Master Finance/MasterThesis/ThesisData/Regression/final_enhanced_dataset.csv"
    )

    # running the full pipeline.
    # the order matters: each step uses attributes set by the steps before it.
    calculator.load_data()
    calculator.calculate_daily_returns()
    calculator.merge_with_ff_factors()
    calculator.calculate_factor_loadings()
    calculator.calculate_momentum()
    calculator.merge_factors_into_final_dataset()

    # getting the final dataframe with everything included.
    final_dataset_with_all_factors = calculator.final_dataset_with_factors
    
    # saving the final result. this is the one i'll use for my regressions.
    # the file goes into the "Regression" folder below BASE_DIR.
    final_save_path = os.path.join(BASE_DIR, "Regression/final_dataset_with_all_factors.parquet")
    final_dataset_with_all_factors.to_parquet(final_save_path, index=False)
    
    print(f"\n✅ SUCCESS! final dataset with loadings and momentum is complete.")
    print(f"   saved to: {final_save_path}")
    print(f"   final shape: {final_dataset_with_all_factors.shape}")

except Exception as e:
    # any failure in the pipeline is reported with its traceback instead of being re-raised.
    print(f"\n❌ an error occurred during the process: {e}")
    import traceback
    traceback.print_exc()

