# XFR to PySpark Code Conversion Agent

This notebook implements an Azure AI Foundry code interpreter agent that converts Ab Initio XFR logic to PySpark code. The agent will:
1. Parse XFR artefacts and schemas
2. Generate PySpark transformation code
3. Validate the generated code using Pandas
4. Output a fully validated PySpark script

## 1. Setup Environment Variables

First, let's set up the required environment variables using values from our .env file.

## 1. Setup Environment Variables

Load the required environment variables from the `.env` file in the code folder. The following variables are required:
- PROJECT_ENDPOINT
- AZURE_AI_CONNECTION_STRING
- AZURE_AI_RESOURCE_GROUP
- AZURE_AI_SUBSCRIPTION_ID
- AZURE_AI_PROJECT
- AZURE_AI_ENDPOINT
- AZURE_AI_API_KEY
- AZURE_AI_DEPLOYMENT
- MODEL_DEPLOYMENT_NAME
- AZURE_AI_API_VERSION

In [None]:
# import os

# Example configuration - Replace with values from your .env file
# os.environ['PROJECT_ENDPOINT'] = 'your_project_endpoint'
# os.environ['AZURE_AI_CONNECTION_STRING'] = 'your_connection_string'
# os.environ['AZURE_AI_RESOURCE_GROUP'] = 'your_resource_group'
# os.environ['AZURE_AI_SUBSCRIPTION_ID'] = 'your_subscription_id'
# os.environ['AZURE_AI_PROJECT'] = 'your_project_name'
# os.environ['AZURE_AI_ENDPOINT'] = 'your_ai_endpoint'
# os.environ['AZURE_AI_API_KEY'] = 'your_api_key'
# os.environ['AZURE_AI_DEPLOYMENT'] = 'your_deployment_name'
# os.environ['MODEL_DEPLOYMENT_NAME'] = 'your_model_deployment'
# os.environ['AZURE_AI_API_VERSION'] = 'your_api_version'

In [None]:
import os
from pathlib import Path
from dotenv import load_dotenv

# Load environment variables from root directory .env file
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(env_path)

print(f"Loading .env file from: {env_path}")

# Verify required environment variables are loaded
required_vars = [
    'PROJECT_ENDPOINT',
    'AZURE_AI_CONNECTION_STRING',
    'AZURE_AI_RESOURCE_GROUP',
    'AZURE_AI_SUBSCRIPTION_ID',
    'AZURE_AI_PROJECT',
    'AZURE_AI_ENDPOINT',
    'AZURE_AI_API_KEY',
    'AZURE_AI_DEPLOYMENT',
    'MODEL_DEPLOYMENT_NAME',
    'AZURE_AI_API_VERSION'
]

for var in required_vars:
    if not os.getenv(var):
        raise ValueError(f'Required environment variable {var} is not set in .env file')

print('Successfully loaded all required environment variables from .env file')

Successfully loaded all required environment variables from .env file


## 2. Import Required Libraries

Import the necessary Python libraries for working with Azure AI Foundry and file handling.

In [34]:
from azure.ai.projects import AIProjectClient
from azure.ai.agents.models import CodeInterpreterTool, AgentThreadCreationOptions, ThreadMessageOptions
from azure.ai.agents.models import FilePurpose, MessageRole, ListSortOrder
from azure.identity import DefaultAzureCredential
from pathlib import Path
import pandas as pd
import json
from azure.ai.agents import AgentsClient

## 3. Initialize AI Project Client

Create an instance of AIProjectClient using the configured endpoint and credentials.

In [None]:
endpoint = os.environ['PROJECT_ENDPOINT']

# Initialize credentials with specific tenant ID
credential = DefaultAzureCredential(
    exclude_interactive_browser_credential=False
)

agents_client = AgentsClient(
    endpoint=endpoint,
    credential=credential
)
print('Successfully initialized Agents Client')

Successfully initialized Agents Client


## 4. Read Input Files

Read the contents of XFR, input layout, and output layout files.

In [36]:
def read_file_content(file_path):
    with open(file_path, 'r') as f:
        return f.read()

# Read contents of input files
xfr_content = read_file_content('../Simple/ASC_VIP_Premium.xfr')
input_layout_content = read_file_content('../Simple/simple_input_layout.txt')
output_layout_content = read_file_content('../Simple/simple_output_layout.txt')

print('Successfully read all input files')

Successfully read all input files


## 5. Create Code Interpreter Agent

Create and configure the code interpreter agent with the task instructions.

In [37]:
# Import the required tool definition
from azure.ai.agents.models import CodeInterpreterToolDefinition

# Set up code interpreter tool
code_interpreter_tool = CodeInterpreterToolDefinition()

# Create agent
agent = agents_client.create_agent(
    model=os.environ['MODEL_DEPLOYMENT_NAME'],
    name='xfr-to-pyspark-converter',
    instructions='''
You are a principal Spark architect and Python engineer whose task is to convert Ab Initio XFR logic to PySpark code.
Your goals are to:
1. Parse XFR artefacts and schemas
2. Generate PySpark transformation code
3. Validate the code using Pandas
4. Return a fully validated PySpark script
''',
    tools=[code_interpreter_tool],
    tool_resources=None
)

print(f'Created agent with ID: {agent.id}')

Created agent with ID: asst_A3ZbJAQI20l3gONIkZOAnhZ3


## 6-9. Generate and Validate PySpark Code

Create a thread and send a message to the agent to generate and validate the PySpark code.

In [41]:
# Create thread and process run in one step
run = agents_client.create_thread_and_process_run(
    agent_id=agent.id,
    thread=AgentThreadCreationOptions(
        messages=[ThreadMessageOptions(
            role="user",
            content=f'''
Here are the input files and instructions:

1. XFR File Content:
```
{xfr_content}
```

2. Input Layout:
```
{input_layout_content}
```

3. Output Layout:
```
{output_layout_content}
```

Please perform the following tasks in order:

1. Parse these artefacts:
   - XFR logic from the provided XFR content above
   - Source schema from the input layout content above
   - Target schema from the output layout content above

2. Generate PySpark code:
   - Create a function for each XFR rule: def add_<rule>(df)
   - Create a pipeline(df_in) that applies all functions in sequence
   - Ensure output columns match the target schema exactly

3. Validate without Spark:
   - Create Pandas equivalents of all functions
   - Generate test data (20+ rows) covering all columns
   - Run Pandas validation
   - Verify column names and order
   - Fix any issues found

4. Return the final, validated PySpark script with comments
'''
        )]
    )
)

print(f'Run completed with status: {run.status!r}')

if run.status == 'failed':
    print('Run failed:', run.last_error)
else:
    print(f'Thread ID: {run.thread_id}')
    # Get messages with debug information
    messages = agents_client.messages.list(thread_id=run.thread_id, order=ListSortOrder.ASCENDING)
    print(f'Retrieved {sum(1 for _ in messages)} messages')
    
    found_agent_message = False
    for msg in messages:
        print(f'Message from {msg.role}:')
        if msg.text_messages:
            for text_msg in msg.text_messages:
                print(f'Content: {text_msg.text.value}')
                if msg.role == 'agent':
                    found_agent_message = True
        else:
            print('No text messages in this message')
    
    if not found_agent_message:
        print('\nNo agent messages found in the response')

Run completed with status: <RunStatus.COMPLETED: 'completed'>
Thread ID: thread_8ZBREEKgq6Es3rqSBGrTjhiG
Retrieved 3 messages

No agent messages found in the response
Retrieved 3 messages

No agent messages found in the response


In [None]:
# Create thread and process run in one step
run = agents_client.create_thread_and_process_run(
    agent_id=agent.id,
    thread=AgentThreadCreationOptions(
        messages=[ThreadMessageOptions(
            role="user",
            content=f'''
Here are the input files and instructions:

1. XFR File Content:
```
{xfr_content}
```

2. Input Layout:
```
{input_layout_content}
```

3. Output Layout:
```
{output_layout_content}
```

Please perform the following tasks in order:

1. Parse these artefacts:
   - XFR logic from the provided XFR content above
   - Source schema from the input layout content above
   - Target schema from the output layout content above

2. Generate PySpark code:
   - Create a function for each XFR rule: def add_<rule>(df)
   - Create a pipeline(df_in) that applies all functions in sequence
   - Ensure output columns match the target schema exactly

3. Validate without Spark:
   - Create Pandas equivalents of all functions
   - Generate test data (20+ rows) covering all columns
   - Run Pandas validation
   - Verify column names and order
   - Fix any issues found

4. Return the final, validated PySpark script with inline comments
'''
        )]
    )
)

print(f'Run completed with status: {run.status!r}')

if run.status == 'failed':
    print('Run failed:', run.last_error)
else:
    # List out all messages in the thread in ascending order
    messages = agents_client.messages.list(thread_id=run.thread_id, order=ListSortOrder.ASCENDING)
    for msg in messages:
        if msg.text_messages:
            last_text = msg.text_messages[-1]
            print(f'{msg.role}: {last_text.text.value}')

Run completed with status: <RunStatus.COMPLETED: 'completed'>
MessageRole.USER: 
Here are the input files and instructions:

1. XFR File Content:
```
/*   Do not edit. Generated file - BRE 4.1.5.7;/Projects/ent/ess/direct_state_reporting_property_premium/rset/ASC_VIP_Premium/ruleset.rset;Default deployment;   */


/*@ BizHidden:true @*/
/*   lookup ISO State Code   */
out0::bzt_7_ac38046_67629ab9_2fd1c1_0(in0)=
begin
let record
  string("\t") country_code, policy_state_code = NULL(''), policy_state_alpha_code, policy_iso_state_code, claim_nextgen_state_code;
  string("\n") claim_tico_state_code;
end __bzt_local_res__;
__bzt_local_res__ = fail_if_error(lookup("ISO State Code",in0));
  out0::__bzt_local_res__;
end;

out0::ASC_VIP_Premium_documentation(in)=
begin

out0.policyeffectiveyear::depends_on(
        in.transactioneffdttime,
        in.transactiontypeentcd,
        in.contracttermeffdttime
        );/*@ Rules: [ "Compute policyeffectiveyear" ] @*/

out0.companynumber::depends_on(

## Cleanup

Delete the agent and close the client.

In [None]:
# Delete agent
agents_client.delete_agent(agent.id)
print('Deleted agent')

