In [16]:
# Print the assignment title as a banner in the cell output.
print("SteelEye Data Engineer Assignment")

SteelEye Data Engineer Assignment


In [17]:
# Author details, built from ordered (key, value) pairs so the printed
# mapping keeps the same key order.
My_Details = dict([
    ("Name", "Jaya Vardhan Swarna"),
    ("Registration Number", 12002055),
    ("University", "Lovely Professional University"),
])
print(My_Details)

{'Name': 'Jaya Vardhan Swarna', 'Registration Number': 12002055, 'University': 'Lovely Professional University'}


In [18]:
#import all the required libraries
import csv
import requests
import xml.etree.ElementTree as ET
import zipfile
import pandas as pd
import boto3
from io import StringIO
import logging

In [19]:
class Lambda:
    '''
    Small pipeline object: xml listing -> zip download -> extracted xml -> DataFrame.

    The Lambda Object has the following parameters:
    :param url: The url of the xml listing that is searched for the first download link
                whose file_type is DLTINS

    Has three methods:
    :method download_link: Requests the url, saves the response as 'registers.xml' and
                           returns the download link of the first DLTINS entry.
    :method zip_extraction: Downloads the zip file as 'zip_file.zip', extracts it into the
                            working directory and returns the name of the extracted xml member.
    :method xml_to_csv: Parses the xml file and returns the selected fields as a pandas
                        DataFrame (the caller writes the csv).
    '''

    # Seconds handed to requests as ``timeout`` so a stalled server cannot hang the run.
    REQUEST_TIMEOUT = 60

    def __init__(self, url = None) -> None:
        self.url = url
        self.logger = logging.getLogger('lambda_function') # create logger object
        self.logger.setLevel(logging.INFO)

    @staticmethod
    def _local_name(tag):
        '''Return an element tag without its '{namespace}' prefix ('' for non-string tags).'''
        return tag.rsplit('}', 1)[-1] if isinstance(tag, str) else ''

    def download_link(self):
        '''
        Fetch ``self.url``, store the response body in 'registers.xml' and parse it.

        Walks the ``doc`` elements of the listing in document order and returns the
        ``download_link`` of the first one whose ``file_type`` is 'DLTINS'. The link is
        also kept on ``self.link``.

        :return: the download link (str)
        :raises Exception: if no DLTINS entry with a download link exists; HTTP and
                           parsing errors are logged and re-raised unchanged
        '''
        try:
            self.resp = requests.get(self.url, timeout=self.REQUEST_TIMEOUT)
            # Fail on HTTP errors instead of writing and parsing an error page.
            self.resp.raise_for_status()
            with open('registers.xml', 'wb') as f:
                f.write(self.resp.content)
            self.tree = ET.parse('registers.xml')
            self.root = self.tree.getroot()
            self.link = ''
            # Search every <doc> rather than relying on the position of its container.
            for item in self.root.iter("doc"):
                if item.findtext("str[@name='file_type']") != 'DLTINS':
                    continue
                candidate = item.findtext("str[@name='download_link']")
                if candidate:
                    self.link = candidate
                    break
            if not self.link:
                raise Exception('Could not find download link for file_type DLTINS')
            return self.link
        except Exception as e:
            self.logger.error(f"Error in download_link: {e}")
            raise


    def zip_extraction(self, link = None):
        '''
        Download a zip archive, save it as 'zip_file.zip' and extract it into the
        current working directory.

        :param link: url of the zip file; when omitted, the link found by
                     ``download_link`` (``self.link``) is used
        :return: name of the extracted member, preferring the first '.xml' entry and
                 falling back to the first entry of the archive
        :raises ValueError: if no link is available
        :raises Exception: if the archive is empty; download errors are logged and
                           re-raised unchanged
        '''
        try:
            # Honour the argument; fall back to the link stored by download_link().
            target = link if link is not None else getattr(self, 'link', None)
            if not target:
                raise ValueError('No zip link given and download_link() has not been run')
            self.zip_file = requests.get(target, timeout=self.REQUEST_TIMEOUT)
            self.zip_file.raise_for_status()
            with open('zip_file.zip', 'wb') as f:
                f.write(self.zip_file.content)
            self.xml_file = ''
            with zipfile.ZipFile('zip_file.zip', 'r') as archive:
                names = archive.namelist()
                # Check before indexing so an empty archive gives a clear error.
                if not names:
                    raise Exception('Could not extract xml file from zip')
                self.xml_file = next(
                    (name for name in names if name.lower().endswith('.xml')),
                    names[0],
                )
                archive.extractall()
            return self.xml_file
        except Exception as e:
            self.logger.error(f"Error in zip_extraction: {e}")
            raise


    def xml_to_csv(self, xml = None):
        '''
        Parse an xml file into a DataFrame with one row per 'TermntdRcrd' element.

        Columns are 'FinInstrmGnlAttrbts.<child>' for Id, FullNm, ClssfctnTp,
        CmmdtyDerivInd and NtnlCcy, followed by 'Issr'. Fields that are absent from a
        record stay empty. Tags are compared by exact local name (namespace stripped).

        :param xml: path or file object of the xml; when omitted, the file name stored
                    by ``zip_extraction`` (``self.xml_file``) is used
        :return: pandas DataFrame (also kept on ``self.df``)
        :raises ValueError: if no xml source is available
        '''
        try:
            source = xml if xml is not None else getattr(self, 'xml_file', None)
            if not source:
                raise ValueError('No xml file given and zip_extraction() has not been run')

            self.new = ET.parse(source)     #parse xml
            self.test = self.new.getroot()

            self.pattern = 'FinInstrmGnlAttrbts'                                            #required node
            self.children = ['Id', 'FullNm', 'ClssfctnTp', 'CmmdtyDerivInd', 'NtnlCcy']     #required children nodes
            self.tag = 'Issr'                                                               #required node
            # NOTE(review): only this record type is read; any other record types in the
            # file are ignored - confirm that is intended.
            self.parent = 'TermntdRcrd'                                                     #parent node

            self.cols = [self.pattern + '.' + k for k in self.children]
            self.cols.append(self.tag)

            column_of = {name: idx for idx, name in enumerate(self.children)}
            issuer_col = len(self.children)          # 'Issr' is the column after the children

            self.rows = []
            for record in self.test.iter():
                if self._local_name(record.tag) != self.parent:
                    continue
                self.entry = [None] * len(self.cols)
                for child in record:
                    child_name = self._local_name(child.tag)
                    if child_name == self.pattern:
                        for field in child:
                            idx = column_of.get(self._local_name(field.tag))
                            if idx is not None:
                                self.entry[idx] = field.text
                    elif child_name == self.tag:
                        self.entry[issuer_col] = child.text
                self.rows.append(self.entry)

            self.df = pd.DataFrame(self.rows, columns=self.cols)
            return self.df
        except Exception as e:
            self.logger.error(f"Error in xml_to_csv: {e}")
            raise


In [20]:
if __name__ == '__main__':

    # Requirement 1: url of the xml listing to download (split only for readability;
    # the concatenated value is the original query string).
    url = (
        "https://registers.esma.europa.eu/solr/esma_registers_firds_files/select"
        "?q=*&fq=publication_date:%5B2021-01-17T00:00:00Z+TO+2021-01-19T23:59:59Z%5D"
        "&wt=xml&indent=true&start=0&rows=100"
    )
    p = Lambda(url) #create an object for class lambda

    #Requirement 2: From the xml, please parse through to the first download link whose file_type is DLTINS and download the zip
    zip_link = p.download_link()

    #Requirement 3: Extract the xml from the zip.
    xml_file = p.zip_extraction(zip_link)

    #Requirement 4: Convert the contents of the xml into a CSV
    df = p.xml_to_csv(xml_file)
    # index=False keeps the local file identical to the copy uploaded to S3
    # (no unnamed row-number column).
    df.to_csv('output.csv', index=False)

In [21]:
# Preview the first rows of the parsed DataFrame; left as a bare expression so
# the notebook displays it.
df.head()

Unnamed: 0,FinInstrmGnlAttrbts.Id,FinInstrmGnlAttrbts.FullNm,FinInstrmGnlAttrbts.ClssfctnTp,FinInstrmGnlAttrbts.CmmdtyDerivInd,FinInstrmGnlAttrbts.NtnlCcy,Issr
0,DE000A1R07V3,Kreditanst.f.Wiederaufbau Anl.v.2014 (2021),DBFTFB,False,EUR,549300GDPG70E3MBBU98
1,DE000A1R07V3,KFW 1 5/8 01/15/21,DBFTFB,False,EUR,549300GDPG70E3MBBU98
2,DE000A1R07V3,Kreditanst.f.Wiederaufbau Anl.v.2014 (2021),DBFTFB,False,EUR,549300GDPG70E3MBBU98
3,DE000A1R07V3,Kreditanst.f.Wiederaufbau Anl.v.2014 (2021),DBFTFB,False,EUR,549300GDPG70E3MBBU98
4,DE000A1X3J56,IKB Deutsche Industriebank AG Stufenz.MTN-IHS ...,DTVUFB,False,EUR,PWEFG14QWWESISQ84C69


In [22]:
#Store the csv from step 4) in an AWS S3 bucket
# SECURITY: credentials must never be written into the notebook. boto3 resolves them
# from its default chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
# variables, the shared credentials file, or an attached IAM role).
# The key pair that used to be embedded in this cell has been published with the
# notebook and must be revoked and rotated in IAM.
s3 = boto3.client("s3")
# to_csv() without a target returns the csv text; encode it explicitly for the upload.
csv_body = df.to_csv(header = True, index = False).encode("utf-8")
s3.put_object(Bucket="steeleyedata", Body=csv_body, Key='output.csv')

{'ResponseMetadata': {'RequestId': 'Y1DZDBGM8MNDF8CD',
  'HostId': 'AYuozKgqLMlwQDRQ1Fda1Ndz4i1XGKKXKaYVh/HIuHol7HgTgtDCBNbnvKialjr08Wj4vZc/mkjFI/vJQStvBA==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'AYuozKgqLMlwQDRQ1Fda1Ndz4i1XGKKXKaYVh/HIuHol7HgTgtDCBNbnvKialjr08Wj4vZc/mkjFI/vJQStvBA==',
   'x-amz-request-id': 'Y1DZDBGM8MNDF8CD',
   'date': 'Sun, 23 Apr 2023 09:35:22 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"0b4c343ed3a2c4e6cafe39d57459c5f9"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"0b4c343ed3a2c4e6cafe39d57459c5f9"',
 'ServerSideEncryption': 'AES256'}

In [None]:
import csv
import requests
import xml.etree.ElementTree as ET
import zipfile
import pandas as pd
import boto3
from io import StringIO
import logging

class Lambda:
    '''
    Small pipeline object: xml listing -> zip download -> extracted xml -> DataFrame.

    The Lambda Object has the following parameters:
    :param url: The url of the xml listing that is searched for the first download link
                whose file_type is DLTINS

    Has three methods:
    :method download_link: Requests the url, saves the response as 'registers.xml' and
                           returns the download link of the first DLTINS entry.
    :method zip_extraction: Downloads the zip file as 'zip_file.zip', extracts it into the
                            working directory and returns the name of the extracted xml member.
    :method xml_to_csv: Parses the xml file and returns the selected fields as a pandas
                        DataFrame (the caller writes the csv).
    '''

    # Seconds handed to requests as ``timeout`` so a stalled server cannot hang the run.
    REQUEST_TIMEOUT = 60

    def __init__(self, url = None) -> None:
        self.url = url
        self.logger = logging.getLogger('lambda_function')
        self.logger.setLevel(logging.INFO)

    @staticmethod
    def _local_name(tag):
        '''Return an element tag without its '{namespace}' prefix ('' for non-string tags).'''
        return tag.rsplit('}', 1)[-1] if isinstance(tag, str) else ''

    def download_link(self):
        '''
        Fetch ``self.url``, store the response body in 'registers.xml' and parse it.

        Walks the ``doc`` elements of the listing in document order and returns the
        ``download_link`` of the first one whose ``file_type`` is 'DLTINS'. The link is
        also kept on ``self.link``.

        :return: the download link (str)
        :raises Exception: if no DLTINS entry with a download link exists; HTTP and
                           parsing errors are logged and re-raised unchanged
        '''
        try:
            self.resp = requests.get(self.url, timeout=self.REQUEST_TIMEOUT)
            # Fail on HTTP errors instead of writing and parsing an error page.
            self.resp.raise_for_status()
            with open('registers.xml', 'wb') as f:
                f.write(self.resp.content)
            self.tree = ET.parse('registers.xml')
            self.root = self.tree.getroot()
            self.link = ''
            # Search every <doc> rather than relying on the position of its container.
            for item in self.root.iter("doc"):
                if item.findtext("str[@name='file_type']") != 'DLTINS':
                    continue
                candidate = item.findtext("str[@name='download_link']")
                if candidate:
                    self.link = candidate
                    break
            if not self.link:
                raise Exception('Could not find download link for file_type DLTINS')
            return self.link
        except Exception as e:
            self.logger.error(f"Error in download_link: {e}")
            raise


    def zip_extraction(self, link = None):
        '''
        Download a zip archive, save it as 'zip_file.zip' and extract it into the
        current working directory.

        :param link: url of the zip file; when omitted, the link found by
                     ``download_link`` (``self.link``) is used
        :return: name of the extracted member, preferring the first '.xml' entry and
                 falling back to the first entry of the archive
        :raises ValueError: if no link is available
        :raises Exception: if the archive is empty; download errors are logged and
                           re-raised unchanged
        '''
        try:
            # Honour the argument; fall back to the link stored by download_link().
            target = link if link is not None else getattr(self, 'link', None)
            if not target:
                raise ValueError('No zip link given and download_link() has not been run')
            self.zip_file = requests.get(target, timeout=self.REQUEST_TIMEOUT)
            self.zip_file.raise_for_status()
            with open('zip_file.zip', 'wb') as f:
                f.write(self.zip_file.content)
            self.xml_file = ''
            with zipfile.ZipFile('zip_file.zip', 'r') as archive:
                names = archive.namelist()
                # Check before indexing so an empty archive gives a clear error.
                if not names:
                    raise Exception('Could not extract xml file from zip')
                self.xml_file = next(
                    (name for name in names if name.lower().endswith('.xml')),
                    names[0],
                )
                archive.extractall()
            return self.xml_file
        except Exception as e:
            self.logger.error(f"Error in zip_extraction: {e}")
            raise


    def xml_to_csv(self, xml = None):
        '''
        Parse an xml file into a DataFrame with one row per 'TermntdRcrd' element.

        Columns are 'FinInstrmGnlAttrbts.<child>' for Id, FullNm, ClssfctnTp,
        CmmdtyDerivInd and NtnlCcy, followed by 'Issr'. Fields that are absent from a
        record stay empty. Tags are compared by exact local name (namespace stripped).

        :param xml: path or file object of the xml; when omitted, the file name stored
                    by ``zip_extraction`` (``self.xml_file``) is used
        :return: pandas DataFrame (also kept on ``self.df``)
        :raises ValueError: if no xml source is available
        '''
        try:
            source = xml if xml is not None else getattr(self, 'xml_file', None)
            if not source:
                raise ValueError('No xml file given and zip_extraction() has not been run')

            self.new = ET.parse(source)
            self.test = self.new.getroot()
            self.pattern = 'FinInstrmGnlAttrbts'
            self.children = ['Id', 'FullNm', 'ClssfctnTp', 'CmmdtyDerivInd', 'NtnlCcy']
            self.tag = 'Issr'
            # NOTE(review): only this record type is read; any other record types in the
            # file are ignored - confirm that is intended.
            self.parent = 'TermntdRcrd'

            self.cols = [self.pattern + '.' + k for k in self.children]
            self.cols.append(self.tag)

            column_of = {name: idx for idx, name in enumerate(self.children)}
            issuer_col = len(self.children)          # 'Issr' is the column after the children

            self.rows = []
            for record in self.test.iter():
                if self._local_name(record.tag) != self.parent:
                    continue
                self.entry = [None] * len(self.cols)
                for child in record:
                    child_name = self._local_name(child.tag)
                    if child_name == self.pattern:
                        for field in child:
                            idx = column_of.get(self._local_name(field.tag))
                            if idx is not None:
                                self.entry[idx] = field.text
                    elif child_name == self.tag:
                        self.entry[issuer_col] = child.text
                self.rows.append(self.entry)

            self.df = pd.DataFrame(self.rows, columns=self.cols)
            return self.df
        except Exception as e:
            self.logger.error(f"Error in xml_to_csv: {e}")
            raise

if __name__ == '__main__':
    # Requirement 1: url of the xml listing to download (split only for readability;
    # the concatenated value is the original query string).
    url = (
        "https://registers.esma.europa.eu/solr/esma_registers_firds_files/select"
        "?q=*&fq=publication_date:%5B2021-01-17T00:00:00Z+TO+2021-01-19T23:59:59Z%5D"
        "&wt=xml&indent=true&start=0&rows=100"
    )
    p = Lambda(url)
    zip_link = p.download_link()
    xml_file = p.zip_extraction(zip_link)
    df = p.xml_to_csv(xml_file)
    # index=False keeps the local file identical to the copy uploaded to S3.
    df.to_csv('output.csv', index=False)

    #Store the csv from step 4) in an AWS S3 bucket
    # SECURITY: credentials must never be written into the source. boto3 resolves them
    # from its default chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
    # variables, the shared credentials file, or an attached IAM role).
    # The key pair that used to be embedded here has been published with the file and
    # must be revoked and rotated in IAM.
    s3 = boto3.client("s3")
    # to_csv() without a target returns the csv text; encode it explicitly for the upload.
    csv_body = df.to_csv(header = True, index = False).encode("utf-8")
    s3.put_object(Bucket="steeleye-aaa", Body=csv_body, Key='output.csv')