In [28]:
import requests
import zipfile
import xml.etree.ElementTree as ET
import csv
import boto3
import logging
import unittest

In [29]:
import boto3
import csv
import io
import logging
import requests
import zipfile
import xml.etree.ElementTree as ET


def _local_name(tag):
    """Return an element tag without its ``{namespace}`` prefix."""
    # Comments and processing instructions carry a callable tag, not a string.
    if not isinstance(tag, str):
        return ""
    return tag.rsplit("}", 1)[-1]


def _child_text(node, name):
    """Return the stripped text of the first direct child called *name*, or ``""``."""
    for child in node:
        if _local_name(child.tag) == name:
            return (child.text or "").strip()
    return ""


def _find_dltins_link(index_root):
    """Return the download link of the first ``DLTINS`` ``<doc>`` entry, or ``None``."""
    for doc in index_root.iter("doc"):
        fields = {child.get("name"): child for child in doc}
        file_type = fields.get("file_type")
        if file_type is None or file_type.text != "DLTINS":
            continue
        link_node = fields.get("download_link")
        if link_node is None:
            continue
        # An <arr> field wraps its value in a child element; a scalar field
        # such as <str> holds the value directly. Accept either form.
        value_node = link_node[0] if len(link_node) else link_node
        if value_node.text and value_node.text.strip():
            return value_node.text.strip()
    return None


def _instrument_rows(root):
    """Return one CSV row per ``FinInstrmGnlAttrbts`` element found under *root*."""
    rows = []
    for record in root.iter():
        for attrs in record:
            if _local_name(attrs.tag) != "FinInstrmGnlAttrbts":
                continue
            # Issr is looked up next to FinInstrmGnlAttrbts first and inside it
            # second, so both layouts produce a value.
            issuer = _child_text(record, "Issr") or _child_text(attrs, "Issr")
            rows.append([
                _child_text(attrs, "Id"),
                _child_text(attrs, "FullNm"),
                _child_text(attrs, "ClssfctnTp"),
                _child_text(attrs, "CmmdtyDerivInd"),
                _child_text(attrs, "NtnlCcy"),
                issuer,
            ])
    return rows


def download_xml_file(
    url="https://registers.esma.europa.eu/solr/esma_registers_firds_files/select?q=*&fq=publication_date:%5B2021-01-17T00:00:00Z+TO+2021-01-19T23:59:59Z%5D&wt=xml&indent=true&start=0&rows=100",
    bucket_name='my-bucket-name',
    object_key='data.csv',
    timeout=60,
):
    """
    Download the first DLTINS file listed by the register, convert it to CSV and upload it to S3.

    Steps: fetch the XML index at ``url``, pick the first ``DLTINS`` download
    link, download that zip, read the first ``.xml`` member, extract one row
    per ``FinInstrmGnlAttrbts`` element and upload the CSV text to
    ``s3://bucket_name/object_key``.

    Element names are matched by local name, so the namespace declared by the
    document does not matter. A field that is absent from a record is written
    as an empty cell instead of aborting the whole conversion.

    Args:
        url (str): Index query URL. Defaults to the previously hard-coded query.
        bucket_name (str): Target S3 bucket.
        object_key (str): Key of the uploaded CSV object.
        timeout (float): Timeout in seconds applied to each HTTP request.

    Raises:
        requests.HTTPError: If either HTTP request returns an error status.
        ValueError: If the index lists no DLTINS download link, or the zip
            archive contains no ``.xml`` member.
    """
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Fetch and parse the index; bytes are handed to the parser so that it
    # honours the encoding declared by the document itself.
    index_response = requests.get(url, timeout=timeout)
    index_response.raise_for_status()
    download_link = _find_dltins_link(ET.fromstring(index_response.content))
    if not download_link:
        raise ValueError("No DLTINS download link found in the register response.")

    # Download the DLTINS zip file
    zip_response = requests.get(download_link, timeout=timeout)
    zip_response.raise_for_status()

    # Extract the XML file from the zip
    with zipfile.ZipFile(io.BytesIO(zip_response.content)) as zip_file:
        xml_names = [name for name in zip_file.namelist() if name.endswith(".xml")]
        if not xml_names:
            raise ValueError("No XML file found in the downloaded zip file.")
        xml_data = zip_file.read(xml_names[0])

    # Convert the XML to CSV rows
    rows = _instrument_rows(ET.fromstring(xml_data))
    if not rows:
        logger.warning("No FinInstrmGnlAttrbts records found in %s", xml_names[0])

    # Write the CSV data to a buffer
    csv_buffer = io.StringIO()
    csv_writer = csv.writer(csv_buffer)
    csv_writer.writerow([
        "FinInstrmGnlAttrbts.Id",
        "FinInstrmGnlAttrbts.FullNm",
        "FinInstrmGnlAttrbts.ClssfctnTp",
        "FinInstrmGnlAttrbts.CmmdtyDerivInd",
        "FinInstrmGnlAttrbts.NtnlCcy",
        "Issr"
    ])
    csv_writer.writerows(rows)

    # Upload the CSV file to S3; encode explicitly so the stored bytes are UTF-8.
    s3 = boto3.resource('s3')
    s3.Object(bucket_name, object_key).put(Body=csv_buffer.getvalue().encode("utf-8"))

    logger.info(f"CSV file has been uploaded to S3 at s3://{bucket_name}/{object_key}")


def lambda_handler(event, context):
    """
    Handler stub taking the ``event`` and ``context`` arguments.

    The body only binds the register query URL to a local name and returns
    ``None``. No request is sent, and neither argument is read.
    """
    # NOTE(review): this URL ends right after the upper timestamp, without a
    # closing bracket or response-format parameters - it looks cut off; confirm.
    register_query_url = "https://registers.esma.europa.eu/solr/esma_registers_firds_files/select?q=*&fq=publication_date:%5B2021-01-17T00:00:00Z+TO+2021-01-19T23:59:59"
    return None
   


In [30]:
import logging
import os
import zipfile
import csv
import boto3
import requests
from io import BytesIO
from typing import List, Dict

# Configure the root logger when this code runs: INFO and above, rendered as
# "LEVEL: timestamp: message".
logging.basicConfig(
    format='%(levelname)s: %(asctime)s: %(message)s',
    level=logging.INFO,
)

class DownloadXML:
    """
    Download a zip archive from a URL, extract its XML file and convert the
    instrument records in that XML file to a list of row dictionaries.
    """

    # Output column -> name of the FinInstrmGnlAttrbts child that supplies it.
    _ATTRIBUTE_FIELDS = (
        ('FinInstrmGnlAttrbts.Id', 'Id'),
        ('FinInstrmGnlAttrbts.FullNm', 'FullNm'),
        ('FinInstrmGnlAttrbts.ClssfctnTp', 'ClssfctnTp'),
        ('FinInstrmGnlAttrbts.CmmdtyDerivInd', 'CmmdtyDerivInd'),
        ('FinInstrmGnlAttrbts.NtnlCcy', 'NtnlCcy'),
    )

    def __init__(self, url: str, timeout: float = 60.0) -> None:
        """
        Initialize the DownloadXML class with the given URL.

        Args:
            url (str): The URL of the zip archive that contains the XML file.
            timeout (float): Timeout in seconds for the HTTP request.
        """
        self.url = url
        self.timeout = timeout

    @staticmethod
    def _local_name(tag) -> str:
        """Return an element tag without its ``{namespace}`` prefix."""
        # Comments and processing instructions carry a callable tag, not a string.
        if not isinstance(tag, str):
            return ''
        return tag.rsplit('}', 1)[-1]

    @classmethod
    def _field(cls, node, name: str) -> str:
        """
        Return the value called *name* on *node*, or an empty string.

        The first direct child element with that local name wins. If there is
        none, an XML attribute of the same name is used, which is the form the
        earlier line-splitting parser expected.
        """
        for child in node:
            if cls._local_name(child.tag) == name:
                return (child.text or '').strip()
        return node.get(name, '')

    def download_xml(self) -> str:
        """
        Download the zip archive and extract its first XML member.

        The member is extracted relative to the current working directory.

        Returns:
            str: The archive member name of the extracted XML file.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the archive contains no ``.xml`` member.
        """
        logging.info(f"Downloading XML file from {self.url}...")
        response = requests.get(self.url, timeout=self.timeout)
        # Fail on HTTP errors here rather than later with a confusing zip error.
        response.raise_for_status()
        with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
            xml_filename = next(
                (name for name in zip_file.namelist() if name.endswith('.xml')),
                None,
            )
            if xml_filename is None:
                logging.error("No XML file found in the downloaded zip file.")
                raise ValueError("No XML file found in the downloaded zip file.")
            zip_file.extract(xml_filename)
        logging.info(f"Downloaded XML file saved as {xml_filename}")
        return xml_filename

    def xml_to_csv(self, xml_filename: str) -> List[Dict[str, str]]:
        """
        Parse the XML file and convert it into a list of dictionaries with the required fields.

        Every element that has a direct ``FinInstrmGnlAttrbts`` child counts as
        one record. Tags are compared by local name, so any namespace is
        accepted. ``Issr`` is read from the record first and from
        ``FinInstrmGnlAttrbts`` second. Missing values become empty strings.

        Args:
            xml_filename (str): The file name of the XML file to parse.

        Returns:
            List[Dict[str, str]]: One dictionary per record, in document order.

        Raises:
            xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        """
        # Imported locally so the method works without a file-level import.
        import xml.etree.ElementTree as ET

        logging.info("Converting XML file to CSV...")
        csv_list = []
        with open(xml_filename, 'rb') as xml_file:
            # Incremental parsing avoids building the whole tree in memory.
            for _event, record in ET.iterparse(xml_file, events=('end',)):
                found = False
                for attrs in record:
                    if self._local_name(attrs.tag) != 'FinInstrmGnlAttrbts':
                        continue
                    found = True
                    csv_dict = {
                        column: self._field(attrs, tag)
                        for column, tag in self._ATTRIBUTE_FIELDS
                    }
                    csv_dict['Issr'] = (
                        self._field(record, 'Issr') or self._field(attrs, 'Issr')
                    )
                    csv_list.append(csv_dict)
                if found:
                    # The record has been consumed; drop its subtree.
                    record.clear()
        logging.info("XML file converted to CSV.")
        return csv_list

def upload_to_s3(csv_list: List[Dict[str, str]], bucket_name: str, file_name: str) -> None:
    """
    Upload the CSV data to an AWS S3 bucket.

    The rows are rendered as CSV text with a header line and stored as a
    UTF-8 encoded object. The header is the union of all row keys in
    first-seen order; a key missing from a row yields an empty cell.
    When ``csv_list`` is empty nothing is uploaded and a warning is logged.

    Args:
        csv_list (List[Dict[str, str]]): The list of dictionaries containing the CSV data.
        bucket_name (str): The name of the AWS S3 bucket to upload to.
        file_name (str): The object key to store the CSV data under.
    """
    # Imported locally so the function works without a file-level import.
    from io import StringIO

    if not csv_list:
        logging.warning(
            "No rows to upload; skipping upload to s3://%s/%s", bucket_name, file_name
        )
        return

    # dict.fromkeys de-duplicates while keeping the order keys were first seen.
    fieldnames = list(dict.fromkeys(key for row in csv_list for key in row))
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(csv_list)

    logging.info("Uploading %d rows to s3://%s/%s...", len(csv_list), bucket_name, file_name)
    s3 = boto3.resource('s3')
    s3.Object(bucket_name, file_name).put(Body=buffer.getvalue().encode('utf-8'))
    logging.info("Uploaded CSV file to s3://%s/%s", bucket_name, file_name)


In [31]:
import unittest
from io import StringIO

class TestParseXmlToCsv(unittest.TestCase):
    """Check the CSV text that ``parse_xml_to_csv`` returns for a two-item document."""

    def setUp(self):
        # Fixture: two <Item> records, each holding a FinInstrmGnlAttrbts
        # element and an Issr element, exposed as an in-memory text file.
        sample_document = """<Root>
                            <Item>
                                <FinInstrmGnlAttrbts>
                                    <Id>ABC123</Id>
                                    <FullNm>Test Security</FullNm>
                                    <ClssfctnTp>Bond</ClssfctnTp>
                                    <CmmdtyDerivInd>false</CmmdtyDerivInd>
                                    <NtnlCcy>USD</NtnlCcy>
                                </FinInstrmGnlAttrbts>
                                <Issr>Issuer Inc.</Issr>
                            </Item>
                            <Item>
                                <FinInstrmGnlAttrbts>
                                    <Id>DEF456</Id>
                                    <FullNm>Another Security</FullNm>
                                    <ClssfctnTp>Equity</ClssfctnTp>
                                    <CmmdtyDerivInd>true</CmmdtyDerivInd>
                                    <NtnlCcy>EUR</NtnlCcy>
                                </FinInstrmGnlAttrbts>
                                <Issr>Issuer LLC</Issr>
                            </Item>
                        </Root>"""
        self.xml_file = StringIO(sample_document)

    def test_parse_xml_to_csv(self):
        # Expected text: one header line, then one line per <Item>, each
        # terminated by "\n".
        expected = (
            'FinInstrmGnlAttrbts.Id,FinInstrmGnlAttrbts.FullNm,'
            'FinInstrmGnlAttrbts.ClssfctnTp,FinInstrmGnlAttrbts.CmmdtyDerivInd,'
            'FinInstrmGnlAttrbts.NtnlCcy,Issr\n'
            'ABC123,Test Security,Bond,false,USD,Issuer Inc.\n'
            'DEF456,Another Security,Equity,true,EUR,Issuer LLC\n'
        )
        # NOTE(review): parse_xml_to_csv is not defined in the visible part of
        # this file - confirm where it comes from.
        self.assertEqual(parse_xml_to_csv(self.xml_file), expected)
