# Extract Structured Insider Trades (Directors' Dealings) with AI

This notebook demonstrates how to convert unstructured insider trade filings (Directors' Dealings, or `DIRS`) into clean, structured JSON using the FinancialReports API and Google's Gemini Flash AI model.

### The Value Proposition

Clients often need structured data on insider transactions (who bought/sold, what, how much, and at what price). Manually parsing this from raw text filings is difficult and time-consuming, as formats vary.

This workflow automates that step. It uses the FinancialReports API to find the filings and Gemini Flash to read each filing's markdown and return a JSON object that follows a predefined schema, which saves clients significant development time.

In [None]:
import os
import json
import pandas as pd
import google.generativeai as genai
from google.generativeai import types
from financialreports_api_client import Client
from financialreports_api_client.api.filings import filings_list, filings_markdown_retrieve
from financialreports_api_client.models.filings_list_type import FilingsListType
from financialreports_api_client.types import Response
from dotenv import load_dotenv
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from a .env file (if it exists)
load_dotenv()

# Load API keys from environment
FR_API_KEY = os.environ.get("FR_API_KEY")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

if not FR_API_KEY:
    raise ValueError("FR_API_KEY not found. Please set it as an environment variable.")

if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY not found. Please set it as an environment variable.")

logger.info("API keys loaded successfully.")

In [None]:
# 1. Configure FinancialReports API Client
fr_client = Client(base_url="https://api.financialreports.eu", headers={"X-API-Key": FR_API_KEY}, timeout=30.0)

# 2. Configure Google Gemini Client
genai.configure(api_key=GEMINI_API_KEY)

logger.info("API clients configured.")

In [None]:
# --- User-Configurable Parameters ---

# Define the company you want to search for
COMPANY_ISIN = "DE000A1EWWW0"  # Example: adidas AG

# Define the start of the release-date range (ISO 8601, UTC); no end date is set
RELEASE_DATE_FROM = "2024-01-01T00:00:00Z"

# Define the Gemini model to use
MODEL_NAME = "models/gemini-flash-latest"

# --------------------------------------

logger.info(f"Parameters set: ISIN={COMPANY_ISIN}, DateFrom={RELEASE_DATE_FROM}")

## Step 1: Find DIRS Filings

First, we use the `/filings/` endpoint to find all filings with the type `DIRS` for our target company, filtered by our start date.

In [None]:
logger.info(f"Searching for 'DIRS' filings for ISIN {COMPANY_ISIN}...")

try:
    filings_response = filings_list.sync(
        client=fr_client,
        company_isin=COMPANY_ISIN,
        type=FilingsListType.DIRS,
        release_datetime_from=RELEASE_DATE_FROM,
        page_size=10  # Limiting to 10 for this example
    )

    if filings_response and filings_response.results:
        filings_to_process = filings_response.results
        logger.info(f"Found {filings_response.count} total filings. Processing first {len(filings_to_process)}.")
        
        # Display the first filing found as a confirmation
        print("--- Example Filing Found ---")
        print(f"ID: {filings_to_process[0].id}")
        print(f"Title: {filings_to_process[0].title}")
        print(f"Release Date: {filings_to_process[0].release_datetime}")
        print(f"Company: {filings_to_process[0].company.name}")
        print("----------------------------")
    else:
        filings_to_process = []
        logger.warning("No 'DIRS' filings found matching the criteria.")

except Exception as e:
    logger.error(f"Error fetching filings: {e}")
    filings_to_process = []
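
The cell above reads only the first page of results (`page_size=10`). If every matching filing is needed, one possible approach is to request page after page until the reported `count` is reached. The sketch below is written against a hypothetical `fetch_page` callable so that it runs offline; whether `filings_list.sync` accepts a `page` argument is an assumption that should be checked against the client's generated signature.

```python
from types import SimpleNamespace
from typing import Any, Callable, List


def collect_all_results(fetch_page: Callable[[int], Any], max_pages: int = 50) -> List[Any]:
    """Call fetch_page(1), fetch_page(2), ... until every result is collected.

    fetch_page is a hypothetical wrapper. It must return an object exposing
    `.results` (a list) and `.count` (total number of matches), or None.
    """
    collected: List[Any] = []
    for page_number in range(1, max_pages + 1):
        response = fetch_page(page_number)
        if not response or not response.results:
            break  # empty page: nothing more to fetch
        collected.extend(response.results)
        if len(collected) >= response.count:
            break  # all reported matches have been collected
    return collected


# Stand-in for the real API so the sketch runs offline: 5 fake items, 2 per page.
def fake_fetch_page(page_number: int, page_size: int = 2) -> SimpleNamespace:
    items = list(range(5))
    start = (page_number - 1) * page_size
    return SimpleNamespace(results=items[start:start + page_size], count=len(items))


print(collect_all_results(fake_fetch_page))
```

With the real client, `fetch_page` could wrap the `filings_list.sync` call from the cell above and forward the page number, provided the endpoint supports it. The `max_pages` cap is a safety net against an endless loop if `count` is ever inconsistent.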

## Step 2: Define the Structured Output Schema

This is the most critical step. We define a `Schema` object that tells the Gemini model which fields to extract and what the final JSON structure must look like.

The schema constrains the data types (string, number, array) and the structure (nested objects), which makes the model's output far more predictable than free-form text. No field is marked as required here, so downstream code should tolerate missing keys.

In [None]:
transaction_schema = types.Schema(
    type=types.Type.OBJECT,
    properties={
        "transaction_date": types.Schema(type=types.Type.STRING, description="The date of the transaction (YYYY-MM-DD)"),
        "financial_instrument": types.Schema(type=types.Type.STRING, description="The financial instrument, e.g., 'Shares' or 'Stock Options'"),
        "nature_of_transaction": types.Schema(type=types.Type.STRING, description="The nature of the transaction, e.g., 'Acquisition', 'Disposal' or 'Purchase'"),
        "price": types.Schema(type=types.Type.NUMBER, description="The price per unit of the instrument"),
        "currency": types.Schema(type=types.Type.STRING, description="The currency of the transaction (e.g., 'EUR', 'GBP')"),
        "volume": types.Schema(type=types.Type.NUMBER, description="The number of units transacted"),
        "total_value": types.Schema(type=types.Type.NUMBER, description="The total value of the transaction (Price * Volume)"),
        "venue": types.Schema(type=types.Type.STRING, description="The trading venue, e.g., 'XETRA', 'Outside a trading venue'")
    }
)

reporting_person_schema = types.Schema(
    type=types.Type.OBJECT,
    properties={
        "name": types.Schema(type=types.Type.STRING, description="The name of the reporting person (e.g., 'John Doe')"),
        "position": types.Schema(type=types.Type.STRING, description="The position of the person, e.g., 'CEO', 'Member of the Supervisory Board'")
    }
)

dirs_schema = types.Schema(
    type=types.Type.OBJECT,
    properties={
        "issuer_name": types.Schema(type=types.Type.STRING, description="The name of the company issuing the securities"),
        "issuer_isin": types.Schema(type=types.Type.STRING, description="The ISIN of the issuing company"),
        "reporting_person_details": reporting_person_schema,
        "transactions": types.Schema(
            type=types.Type.ARRAY,
            description="A list of all transactions reported in this filing",
            items=transaction_schema
        )
    }
)

logger.info("Structured output schema defined successfully.")
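
Even with a response schema in place, it can be worth checking the returned dictionary before using it. The following is a minimal sketch of such a check in plain Python, not part of either library: `TRANSACTION_FIELDS` and `find_schema_problems` are hypothetical names, and the sample values are placeholders rather than real filing data.

```python
from typing import Any, Dict, List

# Expected Python types per transaction field, mirroring the schema above.
TRANSACTION_FIELDS = {
    "transaction_date": str,
    "financial_instrument": str,
    "nature_of_transaction": str,
    "price": (int, float),
    "currency": str,
    "volume": (int, float),
    "total_value": (int, float),
    "venue": str,
}


def find_schema_problems(result: Dict[str, Any]) -> List[str]:
    """Return human-readable type problems found in one extraction result."""
    problems: List[str] = []
    for key in ("issuer_name", "issuer_isin"):
        if result.get(key) is not None and not isinstance(result[key], str):
            problems.append(f"{key}: expected str")
    transactions = result.get("transactions")
    if not isinstance(transactions, list):
        problems.append("transactions: expected a list")
        return problems
    for index, transaction in enumerate(transactions):
        if not isinstance(transaction, dict):
            problems.append(f"transactions[{index}]: expected an object")
            continue
        for field, expected in TRANSACTION_FIELDS.items():
            value = transaction.get(field)
            # Absent fields are tolerated: the schema marks none as required.
            if value is not None and not isinstance(value, expected):
                problems.append(
                    f"transactions[{index}].{field}: unexpected type {type(value).__name__}"
                )
    return problems


# Placeholder data: the price was returned as a string instead of a number.
sample = {
    "issuer_name": "Example AG",
    "issuer_isin": "XX0000000000",
    "transactions": [{"transaction_date": "2024-01-15", "price": "12.50", "volume": 100}],
}
print(find_schema_problems(sample))
```

An empty list means the result matches the expected types. Any entries can be logged, or used to skip the filing.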

## Step 3: Create AI Extraction Function

Now we create a helper function that does all the work:
1.  Takes a `filing_id` as input.
2.  Fetches the filing's markdown content from our API.
3.  Creates a prompt that includes the markdown.
4.  Configures the Gemini model to use our schema and output JSON.
5.  Calls the model and returns the parsed JSON data.

In [None]:
def extract_structured_data(filing_id: int) -> dict | None:
    """
    Fetches markdown for a filing and uses Gemini to extract structured data.
    """
    logger.info(f"Processing filing_id: {filing_id}...")
    
    # 1. Fetch markdown content from FinancialReports API
    try:
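        # sync_detailed returns the full Response wrapper, whereas sync returns only
        # the parsed body (assumes the usual generated-client sync/sync_detailed convention)
        markdown_response: Response[str] = filings_markdown_retrieve.sync_detailed(
            client=fr_client,
            filing_id=filing_id
        )
        markdown_content = markdown_response.parsed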
        if not markdown_content:
            logger.warning(f"No markdown content found for filing_id: {filing_id}")
            return None
    except Exception as e:
        logger.error(f"Error fetching markdown for filing_id {filing_id}: {e}")
        return None

    # 2. Build the prompt for the Gemini model
    prompt = f"""
    Extract the directors' dealing (insider trade) information from the following financial filing.
    Provide all monetary values as numbers, not strings.
    Ensure the 'transaction_date' is in 'YYYY-MM-DD' format.
    If multiple transactions are listed, include all of them in the 'transactions' list.

    Filing Content:
    ---
    {markdown_content}
    ---
    """
    
    # generate_content accepts plain strings, so no Part wrapper is needed
    contents = [prompt]

    # 3. Configure and call the Gemini model
    model = genai.GenerativeModel(
        model_name=MODEL_NAME,
        generation_config=types.GenerationConfig(
            response_mime_type="application/json",
            response_schema=dirs_schema,
        )
    )

    try:
        response = model.generate_content(contents)
        
        # 4. Parse and return the JSON data
        # In JSON mode, response.text holds the JSON string.
        json_data = json.loads(response.text)
        logger.info(f"Successfully extracted data for filing_id: {filing_id}")
        return json_data
        
    except Exception as e:
        logger.error(f"Error generating content for filing_id {filing_id}: {e}")
        return None
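
Because `extract_structured_data` returns `None` on any failure, transient errors such as rate limits or timeouts can be handled by retrying from the outside. The helper below is a minimal sketch of retrying with exponential backoff. `call_with_retries` is a hypothetical name, the delays are illustrative defaults, and the flaky function stands in for the real extraction call.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], Optional[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call func until it returns a non-None value or attempts run out.

    The wait doubles after each failed attempt: base_delay, 2x, 4x, ...
    """
    for attempt in range(1, max_attempts + 1):
        result = func()
        if result is not None:
            return result
        if attempt < max_attempts:
            sleep(base_delay * 2 ** (attempt - 1))
    return None


# Demo with a stand-in that fails once and then succeeds (no real waiting).
calls = {"count": 0}


def flaky_extraction() -> Optional[dict]:
    calls["count"] += 1
    return {"transactions": []} if calls["count"] >= 2 else None


print(call_with_retries(flaky_extraction, sleep=lambda seconds: None))
print(calls["count"])
```

In the workflow, the call could look like `call_with_retries(lambda: extract_structured_data(filing.id))`. This retries every failure, including ones that will never succeed (for example, a filing with no markdown), so a small `max_attempts` is advisable.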

## Step 4: Execute Workflow & Aggregate Results

Now we loop through the list of `filings_to_process` we found in Step 1.

We'll call our `extract_structured_data` function for each one and store the results in a list. We also attach the filing's `id`, `title`, and `release_datetime` to each result for easy cross-referencing.

In [None]:
all_structured_data = []
processed_count = 0

if not filings_to_process:
    logger.warning("No filings to process. Skipping extraction loop.")
else:
    for filing in filings_to_process:
        if filing.id is None:
            continue
            
        data = extract_structured_data(filing.id)
        
        if data:
            # Add our own metadata for context
            data['filing_id'] = filing.id
            data['filing_title'] = filing.title
            data['filing_release_datetime'] = str(filing.release_datetime)
            all_structured_data.append(data)
            processed_count += 1

logger.info("--- Workflow Complete ---")
logger.info(f"Successfully processed {processed_count} out of {len(filings_to_process)} filings.")

# Display the raw nested JSON output for the first result
if all_structured_data:
    print("\n--- Raw JSON Output (First Result) ---")
    print(json.dumps(all_structured_data[0], indent=2))
else:
    print("\nNo structured data was extracted.")
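
The schema describes `total_value` as price times volume, which gives a cheap plausibility check on each extracted transaction. The sketch below flags transactions where the three numbers disagree by more than a tolerance. `find_value_mismatches` is a hypothetical helper, the 1% tolerance is an arbitrary choice, and the sample values are placeholders.

```python
import math
from typing import Any, Dict, List


def find_value_mismatches(result: Dict[str, Any], rel_tol: float = 0.01) -> List[int]:
    """Return indexes of transactions where price * volume != total_value."""
    mismatched: List[int] = []
    for index, transaction in enumerate(result.get("transactions") or []):
        price = transaction.get("price")
        volume = transaction.get("volume")
        total = transaction.get("total_value")
        if price is None or volume is None or total is None:
            continue  # cannot check an incomplete transaction
        if not math.isclose(price * volume, total, rel_tol=rel_tol):
            mismatched.append(index)
    return mismatched


# Placeholder data: the second transaction's total does not match.
sample_result = {
    "transactions": [
        {"price": 10.0, "volume": 100, "total_value": 1000.0},
        {"price": 10.0, "volume": 100, "total_value": 900.0},
        {"price": 10.0, "volume": 100},  # no total_value: skipped
    ]
}
print(find_value_mismatches(sample_result))
```

A mismatch does not necessarily mean the extraction is wrong, since filings may report aggregated or rounded figures, but it is a useful signal for manual review.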

## Step 5: Analyze and Flatten Data with Pandas

The raw JSON output is useful, but it's nested. For many analyses (e.g., in Excel or a BI tool), a flat table is better.

We can use `pandas.json_normalize` to flatten the data in one call. We tell it to use the `transactions` list as the main records (`record_path`) and to pull in the top-level details (like `filing_id` and `issuer_name`) as columns for each record (`meta`). Nested details are given as key paths, such as `['reporting_person_details', 'name']`.

In [None]:
if not all_structured_data:
    logger.warning("No data to flatten. Skipping pandas DataFrame creation.")
else:
    try:
        # Define which top-level keys to merge into each transaction row
        meta_keys = [
            'filing_id',
            'filing_title',
            'filing_release_datetime',
            'issuer_name',
            'issuer_isin',
            ['reporting_person_details', 'name'],
            ['reporting_person_details', 'position']
        ]

        # Use json_normalize to flatten the data
        df_transactions = pd.json_normalize(
            all_structured_data, 
            record_path=['transactions'], 
            meta=meta_keys,
            errors='ignore'  # Fill missing meta keys with NaN instead of raising KeyError
        )

        # Rename flattened columns for clarity
        df_transactions.rename(columns={
            'reporting_person_details.name': 'person_name',
            'reporting_person_details.position': 'person_position'
        }, inplace=True)

        print("\n--- Flattened DataFrame of All Transactions ---")
        display(df_transactions)

    except Exception as e:
        logger.error(f"Error flattening data with pandas: {e}")
        print("Could not create DataFrame. Displaying raw data instead:")
        print(all_structured_data)
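
To make the behaviour of `json_normalize` concrete, here is a small self-contained example on placeholder records (not real filing data). It also shows two edge cases: a missing `meta` key becomes `NaN` when `errors='ignore'` is set, while a record that lacks the `record_path` key raises a `KeyError`, so such records are filtered out first.

```python
import pandas as pd

records = [
    {
        "filing_id": 1,
        "reporting_person_details": {"name": "Person A", "position": "CEO"},
        "transactions": [
            {"price": 10.0, "volume": 100},
            {"price": 11.0, "volume": 50},
        ],
    },
    {
        "filing_id": 2,
        "reporting_person_details": {"name": "Person B"},  # 'position' is missing
        "transactions": [{"price": 20.0, "volume": 10}],
    },
    {"filing_id": 3},  # no 'transactions' key at all
]

# Keep only records that actually contain a non-empty 'transactions' list.
usable = [record for record in records if record.get("transactions")]

df = pd.json_normalize(
    usable,
    record_path=["transactions"],
    meta=[
        "filing_id",
        ["reporting_person_details", "name"],
        ["reporting_person_details", "position"],
    ],
    errors="ignore",  # missing meta keys become NaN
)
print(df)
```

The result has one row per transaction (three here), with the meta values repeated on each row and the nested paths joined by a dot, for example `reporting_person_details.name`.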

## Conclusion

In just a few steps, we have built a pipeline that:
1.  Identified relevant filings using the FinancialReports API.
2.  Fetched the markdown content for each one.
3.  Used Google's Gemini Flash model to parse the text into a schema-constrained JSON structure.
4.  Aggregated and flattened this data into a single, analysis-ready DataFrame.

This `DataFrame` can now be easily exported to a CSV, loaded into a database, or used for further financial analysis.
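
As a minimal sketch of those export options, the example below writes a small placeholder DataFrame to a CSV file and to a SQLite table. The file names and the `insider_transactions` table name are arbitrary choices for illustration, and a temporary directory is used so nothing is left behind. In the notebook, `df_transactions` would take the place of `df_example`.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

import pandas as pd

# Placeholder rows standing in for the flattened transactions table.
df_example = pd.DataFrame(
    [
        {"filing_id": 1, "person_name": "Person A", "price": 10.0, "volume": 100},
        {"filing_id": 2, "person_name": "Person B", "price": 20.0, "volume": 10},
    ]
)

with tempfile.TemporaryDirectory() as tmp_dir:
    # CSV export: one header line followed by one line per transaction.
    csv_path = Path(tmp_dir) / "insider_transactions.csv"
    df_example.to_csv(csv_path, index=False)
    csv_header = csv_path.read_text().splitlines()[0]

    # SQLite export: to_sql creates (or replaces) the table from the DataFrame.
    db_path = Path(tmp_dir) / "insider_transactions.db"
    with closing(sqlite3.connect(db_path)) as conn:
        df_example.to_sql("insider_transactions", conn, if_exists="replace", index=False)
        row_count = conn.execute("SELECT COUNT(*) FROM insider_transactions").fetchone()[0]

print(csv_header)
print(row_count)
```

Using `if_exists="replace"` overwrites the table on each run. For incremental loads, `if_exists="append"` combined with de-duplication on `filing_id` may be a better fit.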