In [7]:
import json
import xml.etree.ElementTree as ET
import boto3
import logging

### Configure Logging and Connect to S3


In [8]:
# S3 locations: XML contracts are read from the input bucket/prefix and the
# converted JSON documents are written under the output bucket/prefix.
input_bucket, input_prefix = 'eda-search-documents', 'contracts/PDS/2025/'
output_bucket, output_prefix = 'eda-search-documents', 'processed-json/'

# Emit INFO-level (and higher) log records and create this notebook's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared S3 client used for the listing, download and upload calls.
s3_client = boto3.client('s3')

### Convert XML to a Dictionary

In [9]:
def xml_to_dict(element):
    """Recursively convert the children of an XML element into a nested dict.

    Each child tag becomes a key of the returned dict:

    * a child that has sub-elements maps to a nested dict built the same way;
    * a leaf child maps to its text with surrounding whitespace stripped, or
      to ``""`` when it has no text;
    * when a tag occurs more than once under the same parent, its values are
      gathered into a list, in document order.

    Only children are visited, so the tag and text of ``element`` itself, and
    all XML attributes, are not part of the result.
    """
    converted = {}

    for node in element:
        # Work out the value for this child: nested dict, stripped text, or "".
        if len(node):
            value = xml_to_dict(node)
        elif node.text:
            value = node.text.strip()
        else:
            value = ""

        tag = node.tag
        if tag not in converted:
            # First time this tag is seen under the current parent.
            converted[tag] = value
            continue

        # Repeated tag: promote the stored value to a list, or extend it.
        existing = converted[tag]
        if isinstance(existing, list):
            existing.append(value)
        else:
            converted[tag] = [existing, value]

    return converted


#### Code explanation:
* This recursive function converts an XML element into a Python dictionary.
* It iterates over the children of the given XML element.
* If child has nested elements (len(child) > 0), it recursively calls xml_to_dict(child), converting the child into a dictionary.
* If child has no nested elements:
    * If child.text exists (i.e., it contains text), .strip() is used to remove leading/trailing whitespace.
    * If child.text is None, it defaults to an empty string ("").

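* This ensures that:
    * Elements with children are represented as dictionaries.
    * Leaf nodes (no children) store their text content.

* If the same tag appears more than once under one parent, the values for that tag are collected into a list (in document order) instead of overwriting each other.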

### Read and Convert XML files

In [10]:
def process_xml_file(file_key):
    """Download one XML object from S3 and return its content as JSON text.

    Args:
        file_key: Key of the XML object inside ``input_bucket``.

    Returns:
        The document converted with ``xml_to_dict`` and serialized by
        ``json.dumps(..., indent=4)``, or ``None`` when the download, parse
        or conversion failed. Failures are logged rather than raised so the
        caller can skip the file and continue.
    """
    try:
        logger.info(f"Processing file: {file_key}")

        # Get the XML content from S3 as raw, undecoded bytes.
        response = s3_client.get_object(Bucket=input_bucket, Key=file_key)
        xml_bytes = response['Body'].read()

        # Parse the XML content. Giving the parser the raw bytes lets it honor
        # the document's own encoding declaration / byte-order mark instead of
        # assuming every file is UTF-8.
        try:
            root = ET.fromstring(xml_bytes)
        except ET.ParseError:
            # Fall back to decoding as UTF-8 first, so a file whose declared
            # encoding does not match its actual bytes still parses whenever
            # it is valid UTF-8. Errors raised here are handled by the outer
            # except clauses below.
            root = ET.fromstring(xml_bytes.decode('utf-8'))

        json_data = xml_to_dict(root)

        return json.dumps(json_data, indent=4)

    except ET.ParseError as e:
        logger.error(f"XML ParseError in {file_key}: {e}")
    except Exception as e:
        # Deliberately broad: one bad file must not stop the batch. The
        # traceback is attached so the root cause is not lost.
        logger.error(f"Unexpected error processing {file_key}: {e}", exc_info=True)

    return None  # Return None if processing fails


#### Code explanation

* Reads an XML file from S3.
* Parses the XML content using ElementTree (ET).
* Calls xml_to_dict(root) to convert the parsed XML into a dictionary.
    * More specifically, **ElementTree.fromstring()** parses the XML content and returns the root `Element` of the document (`root`).
    * This allows us to traverse the XML structure.
* Converts the dictionary to a JSON string.
* Logs errors if XML parsing fails.
* Returns the JSON string if processing succeeds; otherwise, returns None

### List XML files from S3 bucket using the prefix

In [11]:
def list_xml_files():
    """Return the keys of all ``.xml`` objects under the input prefix.

    Lists ``input_bucket`` restricted to ``input_prefix`` using the boto3
    ``list_objects_v2`` paginator, which follows continuation tokens until
    the listing is exhausted.

    Returns:
        list: Object keys ending in ``.xml``, in listing order. Empty when no
        object matches.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=input_bucket, Prefix=input_prefix)

    return [
        obj['Key']
        for page in pages
        # A page that holds no objects has no 'Contents' entry.
        for obj in page.get('Contents', [])
        if obj['Key'].endswith('.xml')
    ]

### Main function to process all XML files and store as JSON in S3

In [None]:
def main():
    """Convert every listed XML file to JSON and upload the result to S3.

    For each key returned by ``list_xml_files()`` the file is converted with
    ``process_xml_file()``. Successful conversions are written to
    ``output_bucket`` under ``output_prefix``, named after the source file
    with its ``.xml`` extension replaced by ``.json``. Files that fail to
    convert are logged and skipped.
    """
    xml_files = list_xml_files()

    for file_key in xml_files:
        json_data = process_xml_file(file_key)

        # process_xml_file() returns None when it could not convert the file.
        if json_data is None:
            logger.warning(f"Skipping upload for {file_key} due to processing failure.")
            continue

        # Only the file name is kept, so the output prefix is flat.
        # NOTE(review): files with the same name under different input
        # sub-prefixes map to the same output key and overwrite each other.
        file_name = file_key.split('/')[-1]

        # Swap only the trailing extension; str.replace() would also rewrite
        # any earlier ".xml" inside the name (e.g. "a.xml.v2.xml").
        if file_name.endswith('.xml'):
            file_name = file_name[:-len('.xml')]
        output_file_key = f"{output_prefix}{file_name}.json"

        # Upload the JSON data to S3 as UTF-8 bytes, labelled as JSON.
        s3_client.put_object(
            Bucket=output_bucket,
            Key=output_file_key,
            Body=json_data.encode('utf-8'),
            ContentType='application/json',
        )

        logger.info(f"Processed and uploaded: {output_file_key}")

if __name__ == '__main__':
    main()

#### Code explanation

* Calls list_xml_files() to get a list of XML file paths.
* Iterates through each XML file:
    * Calls process_xml_file(file_key) to convert it into JSON.
    * If successful, generates an output filename with a .json extension.
    * Uploads the JSON file to the output S3 bucket using s3_client.put_object().
    * Logs a success message after the upload, or a warning when the file is skipped.
* `if __name__ == '__main__': main()`
    * Ensures `main()` runs only when the code is executed directly, not when it is imported.
    * Calls `main()` to start processing all XML files.