In [1]:
import xml.etree.ElementTree as ET
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_download_link(xml_file):
    """
    Parses the given XML file and returns the first download link found in it.

    The link is taken from the first ``<str name="download_link">`` element
    nested under a ``<doc>`` element. Surrounding whitespace (newlines and
    indentation around the URL) is stripped before returning.

    :param xml_file: Path to the XML file
    :return: The first download link found in the XML file, without
             surrounding whitespace, or None when the file cannot be parsed
             or holds no non-empty download link
    """
    try:
        # Parse the XML file and get the root element
        root = ET.parse(xml_file).getroot()
    except Exception as e:
        logging.error(f"Error while parsing the XML file: {e}")
        return None

    # findtext yields None when nothing matches and '' when the matched
    # element has no text, so a missing link never triggers an AttributeError.
    link_text = root.findtext(".//doc/str[@name='download_link']")
    if link_text is None:
        logging.error("Error while parsing the XML file: no download_link element found")
        return None

    # The element text may be padded with line breaks and indentation;
    # return the bare URL, and None for an empty/blank value.
    return link_text.strip() or None



# Resolve the link from the saved response and report the outcome.
download_link = get_download_link('response.xml')
if not download_link:
    logging.error("Could not get download link.")
else:
    logging.info(f"Download link: {download_link}")


2023-04-22 23:34:46,077 - INFO - Download link: 
http://firds.esma.europa.eu/firds/DLTINS_20210117_01of01.zip
            


In [2]:
import urllib.request
import logging


def download_file(url: str, filename: str):
    """
    Fetch the resource at ``url`` and store it on disk as ``filename``.

    Any exception raised by the retrieval is logged and swallowed; on success
    an informational message is logged. Nothing is returned either way.

    Args:
        url (str): The URL of the file to download.
        filename (str): The name to save the downloaded file as.
    """
    try:
        urllib.request.urlretrieve(url, filename)
    except Exception as err:
        # Failure path: report and stop here.
        logging.error(f"Error downloading file from {url}: {err}")
        return

    # Only reached when the retrieval finished without raising.
    logging.info(f"Downloaded {filename} from {url} successfully")



# Save the archive under a fixed local name; later cells open the file by this name.
# NOTE(review): the local name says 20210118 while the logged link names
# DLTINS_20210117_01of01.zip - confirm which date is intended.
download_file(download_link, 'DLTINS_20210118_01of01.zip')


2023-04-22 23:35:41,255 - INFO - Downloaded DLTINS_20210118_01of01.zip from 
http://firds.esma.europa.eu/firds/DLTINS_20210117_01of01.zip
             successfully


In [3]:
import os
import logging

# Drop whatever handlers the root logger currently holds: basicConfig only
# installs its handler when the root logger has none, so without this reset
# the call below would be ignored on a re-run.
logging.getLogger().handlers = []

# Set up the logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_file_size(filename):
    """Measure a file on disk and report its size in megabytes.

    The size is logged (rounded to two decimals) and returned unrounded.

    Args:
        filename (str): Path of the file to measure.

    Returns:
        float: Size of the file in MB, i.e. bytes / (1024 * 1024).
    """
    bytes_per_mb = 1024 * 1024
    size_in_mb = os.stat(filename).st_size / bytes_per_mb
    logging.info(f"The file '{filename}' size is {size_in_mb:.2f} MB")
    return size_in_mb

# Call the function with a filename
# get_file_size already logs the size rounded to two decimals; the extra log
# line below prints the unrounded float it returned.
filename = 'DLTINS_20210118_01of01.zip'
file_size_mb = get_file_size(filename)
logging.info(f"File size in MB: {file_size_mb}")


2023-04-22 23:35:41,354 - INFO - The file 'DLTINS_20210118_01of01.zip' size is 4.23 MB
2023-04-22 23:35:41,356 - INFO - File size in MB: 4.229082107543945


In [4]:
import logging
import zipfile

# Reset the root logger and configure it again. Clearing the handlers without
# re-installing one leaves INFO records with nowhere to go, so the success
# message logged after extraction would be dropped.
logging.getLogger().handlers = []
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Create a logger with a specific name.
logger = logging.getLogger(__name__)

def extract_zip_file(zip_file_path: str, extract_dir: "str | None" = None):
    """
    Extract all files in a zip file.

    Args:
        zip_file_path: A string representing the path of the zip file to be extracted.
        extract_dir: Directory to extract into. Defaults to None, which
            extracts to the current working directory (the original behavior).

    Returns:
        None

    Raises:
        FileNotFoundError: If ``zip_file_path`` does not exist.
        zipfile.BadZipFile: If the file is not a valid zip archive.
    """
    # Resolve the logger here rather than relying on a module-level global
    # that only exists if the defining cell's setup lines were run first.
    log = logging.getLogger(__name__)

    # Open the zip file and extract every member; path=None means "current directory".
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(path=extract_dir)

    # Log a message indicating that the zip file has been extracted successfully.
    log.info(f"All files in '{zip_file_path}' have been extracted successfully.")

# Call the extract_zip_file function with the path of the zip file as argument.
# This is the archive name the download cell saved to; members are unpacked
# into the current working directory.
extract_zip_file('DLTINS_20210118_01of01.zip')



In [5]:
# Call the function with a filename
# Uses get_file_size from the earlier cell, this time on the XML file
# (presumably the member extracted from the archive - TODO confirm); rebinds
# the notebook-level `filename` and `file_size_mb` names.
filename = 'DLTINS_20210117_01of01.xml'
file_size_mb = get_file_size(filename)
logging.info(f"File size in MB: {file_size_mb}")

INFO:root:The file 'DLTINS_20210117_01of01.xml' size is 136.64 MB
INFO:root:File size in MB: 136.640606880188


In [6]:
import xml.etree.ElementTree as ET
import logging

# Ask for DEBUG-level logging into xml_namespace.log.
# NOTE(review): basicConfig does nothing when the root logger already has
# handlers, so if an earlier cell configured logging in this kernel this call
# is ignored and no log file is written - confirm whether that is intended.
logging.basicConfig(filename='xml_namespace.log', level=logging.DEBUG, 
                    format='%(asctime)s %(levelname)s %(message)s')

def remove_xml_namespace(file_path):
    """
    Removes all namespaces from an XML file, rewriting the file in place.

    Every element tag and attribute name of the form ``{uri}local`` is reduced
    to ``local``. The result is written back to ``file_path`` as UTF-8 with an
    XML declaration, so the file's encoding does not depend on the platform's
    default text encoding.

    A missing file or an I/O failure is logged rather than raised; malformed
    XML still raises ``xml.etree.ElementTree.ParseError``.

    :param file_path: The path of the XML file to modify.
    :type file_path: str
    """
    try:
        # Parse the XML file.
        tree = ET.parse(file_path)

        # Strip the '{namespace}' prefix from every tag and attribute name.
        for elem in tree.getroot().iter():
            # Only string tags carry a namespace prefix; anything else is left as is.
            if isinstance(elem.tag, str):
                elem.tag = elem.tag.rpartition('}')[2]
            elem.attrib = {
                name.rpartition('}')[2]: value
                for name, value in elem.attrib.items()
            }

        # Write the modified XML to the same file with an explicit encoding
        # and declaration, instead of a locale-dependent text dump.
        tree.write(file_path, encoding='utf-8', xml_declaration=True)

    except FileNotFoundError as e:
        # Log an error message if the file is not found.
        logging.error(f"The file {file_path} was not found: {e}.")
    except IOError as e:
        # Log an error message if there is an error while writing the file.
        logging.error(f"The file {file_path} could not be modified: {e}.")

# Strip namespaces from the XML file in place; the CSV export cell that
# follows looks elements up by their un-prefixed tag names.
if __name__ == '__main__':
    file_path = 'DLTINS_20210117_01of01.xml'
    remove_xml_namespace(file_path)


In [7]:
import xml.etree.ElementTree as ET
import csv

# This code extracts data from an XML file and writes it to a CSV file.

# Parse the XML file.
try:
    tree = ET.parse('DLTINS_20210117_01of01.xml')
    root = tree.getroot()
except FileNotFoundError:
    # Name the file that was actually requested, and stop with SystemExit
    # rather than the interactive-only exit() helper.
    print('The file DLTINS_20210117_01of01.xml was not found.')
    raise SystemExit(1)

# Specify the headers for the CSV file.
headers = ['FinInstrmGnlAttrbts.Id', 'FinInstrmGnlAttrbts.FullNm',
           'FinInstrmGnlAttrbts.ClssfctnTp', 'FinInstrmGnlAttrbts.CmmdtyDerivInd',
           'FinInstrmGnlAttrbts.NtnlCcy', 'Issr']

# Child tags of FinInstrmGnlAttrbts that map onto the prefixed CSV columns above.
gnl_attr_tags = ('Id', 'FullNm', 'ClssfctnTp', 'CmmdtyDerivInd', 'NtnlCcy')

# Open the CSV file for writing. The encoding is pinned so text values are
# written the same way regardless of the platform's default encoding.
with open('final-output.csv', 'w', newline='', encoding='utf-8') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=headers)

    # Write the headers to the CSV file.
    writer.writeheader()

    # Iterate over the XML elements and extract the data to write to the CSV file.
    for fin_instrm in root.findall(".//FinInstrm"):
        # NOTE(review): only FinInstrm entries with a TermntdRcrd child are
        # exported - confirm other record kinds are meant to be skipped.
        termntd_rcrd = fin_instrm.find("TermntdRcrd")
        if termntd_rcrd is None:
            continue

        # The issuer belongs to the record, so look it up once per record.
        issr = termntd_rcrd.findtext('Issr', default='')

        for gnl_attrbts in fin_instrm.iter('FinInstrmGnlAttrbts'):
            # findtext returns '' for a missing child, so every column is
            # always present in the row.
            data = {
                f'FinInstrmGnlAttrbts.{tag}': gnl_attrbts.findtext(tag, default='')
                for tag in gnl_attr_tags
            }
            data['Issr'] = issr

            # Write the data to the CSV file.
            try:
                writer.writerow(data)
            except csv.Error:
                print('The CSV file could not be written to.')
                raise SystemExit(1)



In [8]:
import boto3
import logging

def s3_operations(access_key: str, secret_key: str, region: str, bucket_name: str, filename: str):
    """
    Create an S3 bucket (reusing it if this account already owns it) and upload a file to it.

    The file is uploaded under an object key equal to ``filename``. No
    download is performed.

    :param access_key: AWS access key ID
    :param secret_key: AWS secret access key
    :param region: AWS region
    :param bucket_name: name of the S3 bucket to create
    :param filename: local path of the file to upload; also used as the object key
    """
    # Set up logging
    logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Create an S3 client
    s3 = boto3.client('s3', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    # us-east-1 is the default location and must not be sent as a
    # LocationConstraint; every other region has to be named explicitly.
    create_args = {'Bucket': bucket_name}
    if region != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {'LocationConstraint': region}

    # Create the bucket, tolerating a re-run where it already exists in this account.
    try:
        s3.create_bucket(**create_args)
    except s3.exceptions.BucketAlreadyOwnedByYou:
        logger.info("Bucket %s already exists in this account; reusing it", bucket_name)
    else:
        logger.info("Bucket %s created in region %s", bucket_name, region)

    # Upload a file to the bucket
    s3.upload_file(filename, bucket_name, filename)
    logger.info("File %s uploaded to bucket %s", filename, bucket_name)



In [9]:
import os

# Read the AWS credentials from the environment instead of writing them into
# the notebook, where they would be saved and shared along with the code.
# A missing variable raises KeyError naming the variable that is not set.
access_key = os.environ["AWS_ACCESS_KEY_ID"]
secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# Set the region, bucket name, and file name.
region = "ap-south-1"
file_name = "final-output.csv"
bucket_name = "csv-centre-steeleye-aryaroop"

# Call the function to perform S3 operations.
s3_operations(access_key, secret_key, region, bucket_name, file_name)


INFO:__main__:Bucket csv-centre-steeleye-aryaroop created in region ap-south-1
INFO:__main__:File final-output.csv uploaded to bucket csv-centre-steeleye-aryaroop
