In [4]:
from datetime import datetime
from selenium import webdriver
from selenium.common.exceptions import *
from selenium.webdriver.common.by import By

import json
import re
import sys

# Tracking Scraper Exception

In [5]:
class TrackingScraperError(Exception):
    """
    Error type raised by the tracking scraper classes.

    Besides the message, it remembers the container and carrier being processed, the moment
    the error object was created, and (optionally) the underlying exception that caused it.
    """

    def __init__(self, message, container = None, carrier = None, exception = None):
        # The base Exception keeps the raw message in its args
        super().__init__(message)

        # Context captured when the error object is built
        self.__datetime = datetime.today()
        self.__message = message
        self.__container = container
        self.__carrier = carrier
        self.__exception = exception

    def __str__(self):
        """
        Returns "<timestamp>, carrier=<carrier>, container=<container>: <message>." followed,
        when an underlying exception was supplied, by a space and that exception's text.
        """
        # General info always comes first
        parts = ["{0}, carrier={1}, container={2}: {3}.".format(
            str(self.__datetime), self.__carrier, self.__container, self.__message)]

        # Text of the underlying exception, only when there is one
        if self.__exception is not None:
            parts.append(str(self.__exception))

        return " ".join(parts)

# Tracking Scraper selector switcher class

In [7]:
class TrackingSelectorSwitcher:
    """
    Tracking Scraper switcher for selecting and saving DOM elements and subelements in a
    tracking-related document.

    One instance handles one configuration command (a dictionary read through its "type",
    "value", "required", "index", "command" and "commands" keys) against one parent element.
    The parent element is either a Selenium object (driver or DOM element) or a plain string
    produced by a previous "split"/"regex" command. Nested commands are handled by creating
    further switcher instances that share the same driver and document.
    """

    def __init__(self, container, carrier, driver, document, attribute_group, parent_command,
                 parent_element = None):
        """
        Stores the processing context.

        container, carrier: identifiers used only when building error messages.
        driver: Selenium driver; closed when a fatal configuration error is found.
        document: dictionary that "save" commands write into.
        attribute_group: key of the document sub-dictionary that "save" commands write into.
        parent_command: configuration command handled by this instance.
        parent_element: element or text the command works on; the driver is used if omitted.
        """
        self.__container = container
        self.__carrier = carrier
        self.__driver = driver
        self.__document = document
        self.__attribute_group = attribute_group
        self.__parent_command = parent_command
        self.__parent_element = driver if (parent_element is None) else parent_element

    @property
    def document(self):
        """
        Returns the stored tracking-related dictionary.
        """
        return self.__document

    ###############################################################################################

    def process(self):
        """
        Get DOM element(s) based on the configuration selector element declared in initialization,
        then process or return them accordingly. Returns True if all commands were executed
        successfully, False if one command failed, or DOM element(s) if no commands were found.

        Raises TrackingScraperError (after closing the driver) if the command has no "type" or
        the type has no matching "_process_<type>" handler.
        """

        # Get selector type
        selector_type = self.__parent_command.get("type")
        if selector_type is None:
            self._close_driver("Selector type not found")

        # Resolve the handler BEFORE calling it, so that an AttributeError/TypeError raised
        # inside a handler is not misreported as an invalid selector type
        method = getattr(self, "_process_{0}".format(selector_type), None)
        if method is None:
            self._close_driver("Process: Selector type {0} is not valid.".format(selector_type))
        if not callable(method):
            self._close_driver("Process: Selector type {0} cannot be invoked.".format(
                                selector_type))

        return method()

    ###############################################################################################

    def _process_id(self):
        """
        Get DOM element by ID, then process it or return it depending on the parent command.
        Returns the DOM element if no child "command" is declared, otherwise the child result.
        If the element is missing, returns True when it is marked as not required, else False.
        """

        # Get selector value
        selector_value = self.__parent_command.get("value")
        if selector_value is None:
            self._close_driver("Process by ID: Selector value not found.")

        # Get DOM element by ID (generic locator API, same style as _process_dom_elements)
        try:
            dom_element = self.__parent_element.find_element(By.ID, selector_value)
        except NoSuchElementException:
            # Return True if element was marked as not required, False otherwise
            return not self.__parent_command.get("required", True)

        # Get child command, if it's not declared, return DOM element
        child_command = self.__parent_command.get("command")
        if child_command is None:
            return dom_element

        # Create child selector switcher and return its process result
        return self._generate_child_element(child_command, dom_element)

    ###############################################################################################

    def _process_class(self):
        """
        Get DOM elements by class name, then process them or return them depending on the parent
        command.
        """
        return self._process_dom_elements(By.CLASS_NAME)

    def _process_css(self):
        """
        Get DOM elements by CSS selector, then process them or return them depending on the parent
        command.
        """
        return self._process_dom_elements(By.CSS_SELECTOR)

    def _process_tag(self):
        """
        Get DOM elements by tag name, then process them or return them depending on the parent
        command.
        """
        return self._process_dom_elements(By.TAG_NAME)

    def _process_name(self):
        """
        Get DOM elements by "name" attribute, then process them or return them depending on the
        parent command.
        """
        return self._process_dom_elements(By.NAME)

    def _process_xpath(self):
        """
        Get DOM elements by XPath, then process them or return them depending on the parent command.
        """
        return self._process_dom_elements(By.XPATH)

    def _process_dom_elements(self, selector_type):
        """
        Get elements by a specified selector type, then process them or return them depending
        on the parent command.

        With a "commands" list, each child command is applied to the element at its own "index".
        With a single "command", it is applied to every element found. With neither, the list
        of elements is returned.
        """

        # Get selector value
        selector = self.__parent_command.get("value")
        if selector is None:
            self._close_driver("Process " + selector_type + ": Selector value not found.")

        # Get DOM elements by specified attribute
        dom_elements = self.__parent_element.find_elements(selector_type, selector)
        if len(dom_elements) == 0:
            # Return True if element was marked as not required, False otherwise
            return not self.__parent_command.get("required", True)

        # Indexed child commands take precedence over a single child command
        commands = self.__parent_command.get("commands")
        if commands is not None:
            return self._run_child_commands(selector_type, commands, dom_elements)

        # If no single child command was found either, return all DOM elements
        child_command = self.__parent_command.get("command")
        if child_command is None:
            return dom_elements

        # Apply the single child command to every DOM element
        for child_element in dom_elements:
            child_result = self._generate_child_element(child_command, child_element)
            if child_result is not True:
                # if no subelements were found, return that element or element list
                # if a minor error occurred (e.g. element not found), return False
                return child_result

        # If everything was fine, return True
        return True

    def _run_child_commands(self, selector_type, commands, elements):
        """
        Run every indexed child command against the element list. Stops at, and returns, the
        first result that is not True; returns True if all commands succeeded.
        """
        for child_command in commands:
            child_result = self._get_child_element(selector_type, child_command, elements)
            if child_result is not True:
                # if no subelements were found, return that element or element list
                # if a minor error occurred (e.g. element not found), return False
                return child_result
        return True

    def _get_child_element(self, selector_type, child_command, elements):
        """
        Get DOM element at an specified index located in the child command's "index" attribute,
        then process that element. If there is no element at that index, logs a message and
        returns True when the child command is marked as not required, False otherwise.
        """

        # Get element index to use
        index = child_command.get("index")
        if index is None:
            self._close_driver("Process " + selector_type + ": Child command index not found")

        # Look the element up on its own, so an IndexError raised while processing the child
        # is not mistaken for a missing element
        try:
            child_element = elements[index]
        except IndexError:
            child_element = None

        # A None entry (e.g. an optional regex group that did not take part in the match) is
        # treated as missing; passing it on would make the child fall back to the driver
        if child_element is None:
            # Write log message
            self._log_message("Process {0}: Child element at index {1} not found".format(
                                selector_type, index))
            # Return True if element was marked as not required, False otherwise
            return not child_command.get("required", True)

        return self._generate_child_element(child_command, child_element)

    def _generate_child_element(self, child_command, child_element):
        """
        Generate a process for a specific child command and a child element, and return its
        result.
        """
        return TrackingSelectorSwitcher(self.__container, self.__carrier, self.__driver,
                                        self.__document, self.__attribute_group,
                                        child_command, child_element).process()

    ###############################################################################################

    def _parent_text(self):
        """
        Returns the parent element as text: its "text" attribute if it has one, otherwise the
        parent element itself (it is then expected to already be a string).
        """
        try:
            return self.__parent_element.text
        except AttributeError:
            return self.__parent_element

    def _process_split(self):
        """
        Split parent element text by a string, then process them or return them depending on the
        parent command. Returns the list of pieces if no "commands" are declared.
        """

        # Get parent element text
        parent_text = self._parent_text()

        # Get text separator
        separator = self.__parent_command.get("value")
        if separator is None:
            self._close_driver("Process split: No separator found")

        # Split text
        elements = parent_text.split(separator)

        # Get subcommands, if none are declared, return split list
        commands = self.__parent_command.get("commands")
        if commands is None:
            return elements

        # Apply each child command to the piece at its index
        return self._run_child_commands("split", commands, elements)

    ###############################################################################################

    def _process_regex(self):
        """
        Search regex pattern with the parent element text, then process the groups or return them
        depending on the parent command. Returns the list of captured groups if no "commands"
        are declared. If the pattern does not match, logs a message and returns True when the
        command is marked as not required, False otherwise.
        """

        # Get parent element text
        parent_text = self._parent_text()

        # Get regular expression
        regex = self.__parent_command.get("value")
        if regex is None:
            self._close_driver("Process regex: No regular expression found")

        # Match expression with text
        search = re.search(regex, parent_text)
        if search is None:
            self._log_message("Process regex: Expression does not match text")
            return not self.__parent_command.get("required", True)

        # Get list of matched elements
        elements = list(search.groups())

        # Get child commands, if none are found, return elements
        commands = self.__parent_command.get("commands")
        if commands is None:
            return elements

        # Apply each child command to the group at its index
        return self._run_child_commands("regex", commands, elements)

    ###############################################################################################

    def _process_save(self):
        """
        Saves element to the document, according to specified attribute and attribute group.
        Returns True when the text was stored. If the text is empty, logs a message and returns
        True when the command is marked as not required, False otherwise.
        """

        # Get attribute value to save to
        attribute = self.__parent_command.get("value")
        if attribute is None:
            self._close_driver("Process save: Attribute to save not found")

        # Get text to be saved (DOM element text, or the string itself)
        saved_text = self._parent_text()

        # Verify if text is not empty
        if len(saved_text) == 0:
            # Write log message
            self._log_message("Process save: Text is empty")
            # Return True if element was marked as not required, False otherwise
            return not self.__parent_command.get("required", True)

        # Save text into the document
        try:
            self.__document[self.__attribute_group][attribute] = saved_text
            return True
        except KeyError:
            self._close_driver("Process save: Attribute group {0} not found".format(
                                self.__attribute_group))

    ###############################################################################################

    def _process_click(self):
        """
        Clicks an element only if the element is not hidden. Returns True after a click; if the
        element is hidden or not interactable, returns True only when the command is marked as
        not required.
        """

        # Check if it is required to click on the element
        is_required = self.__parent_command.get("required", True)

        # Check if element is visible
        if not self.__parent_element.is_displayed():
            return not is_required

        # Click the element
        try:
            self.__parent_element.click()
            return True
        except ElementNotInteractableException:
            return not is_required

    def _process_assert(self):
        """
        Asserts if an element is present or absent, depending on the desired value.
        Not implemented yet: currently always returns True.
        """
        return True

    ###############################################################################################

    def _log_message(self, message):
        """
        Logs a message to the standard error file, using the same format as raised errors.
        """
        try:
            self._raise_error(message)
        except TrackingScraperError as ex:
            print(ex, file = sys.stderr)

    def _close_driver(self, message):
        """
        Closes the Selenium Driver and raises an exception.
        """
        try:
            self.__driver.close()
        except InvalidSessionIdException:
            pass # Driver already closed; do not let that hide the real error
        self._raise_error(message)

    def _raise_error(self, message):
        """
        Raises a TrackingScraperError with a custom message detailing the main attributes of the
        selector switcher object.
        """

        # Get needed attributes
        command_index = self.__parent_command.get("index")
        command_type  = self.__parent_command.get("type")
        command_value = self.__parent_command.get("value")

        # Format message
        log_message = ("Selector(attribute_group={0}, command_index={1}, command_type={2}, "
                       "command_value={3}): {4}").format(self.__attribute_group, command_index,
                                                         command_type, command_value, message)

        # Raise exception
        raise TrackingScraperError(log_message, self.__container, self.__carrier)

# Tracking Scraper main class

In [4]:
class TrackingScraper:
    """
    Web scraper for container information extraction.

    Opens a Firefox Selenium driver, loads the carrier's JSON configuration from
    "../config/<carrier>.json", and fills a tracking document (a dictionary with "carrier",
    "container", "output" and "movements" keys) by running the configured commands.
    """

    def __init__(self, container, carrier):
        """
        Initializes Selenium WebDriver, the container tracking object, and configuration for
        the shipping carrier.

        Raises TrackingScraperError if the driver cannot be created or the configuration file
        cannot be found, read or parsed (the driver is closed first in the latter cases).
        """

        # Save main attributes
        self.__container = container
        self.__carrier = carrier

        # Open the Firefox WebDriver
        try:
            self.__driver = webdriver.Firefox()
        except WebDriverException as ex:
            raise TrackingScraperError("Error creating Selenium driver", container, carrier, ex)

        # Define tracking information
        self.__tracking_document = {
            "carrier": carrier,
            "container": container,
            "output": {},
            "movements": []
        }

        # Get tracking configuration information
        try:
            with open("../config/" + carrier + ".json") as file:
                self.__config = json.load(file)
        except FileNotFoundError:
            self._close_driver("Configuration file not found")
        except json.JSONDecodeError as ex:
            # JSONDecodeError lives in the json module; the bare name is not in scope
            self._close_driver("JSON parsing error", ex)
        except Exception as ex:
            self._close_driver("Unknown error", ex)

    ###############################################################################################

    def go_to_url(self):
        """
        Go to the URL specified in the configuration file, under config["general"]["url"].
        A "{container}" placeholder in the URL is replaced with the container identifier.

        Raises TrackingScraperError (after closing the driver) if the URL is not configured or
        the driver fails to load the page.
        """

        # Get URL from configuration file
        try:
            url = self.__config["general"]["url"]
        except KeyError:
            self._close_driver("URL not found in configuration file")

        # Parse container if necessary, and go to page
        try:
            self.__driver.get(url.format(container = self.__container))
        except WebDriverException as ex:
            self._close_driver("Error ocurred while going to URL", ex)

    ###############################################################################################

    def do_input_commands(self):
        """
        Executes input commands in the page, before executing the required output commands.
        Returns True if everything was executed successfully, False otherwise.

        NOTE(review): the "input_assert" commands are currently only printed, not executed.
        """

        # Get input assertion commands
        input_asserts = self.__config.get("input_assert")
        if input_asserts is None:
            return True

        # Iterate through input assertion commands
        for input_assert in input_asserts:
            print(input_assert)

        # Nothing failed, so report success as documented instead of returning None
        return True

    ###############################################################################################

    def do_output_general_commands(self):
        """
        Executes output general commands before sending a form or reading container/tracking
        information. Returns True if everything was executed successfully, False otherwise.

        Each command in the configuration's "output" list is run against the driver and saves
        into the "output" group of the tracking document. The first result that is not True is
        returned as-is (it may be False or the element(s) a command returned).
        """

        # Get output general commands
        output_commands = self.__config.get("output")
        if output_commands is None:
            print("Output commands not found")
            return False

        # Iterate though output general commands
        for command in output_commands:
            result = TrackingSelectorSwitcher(self.__container, self.__carrier, self.__driver,
                                              self.__tracking_document, "output", command).process()
            if result is not True:
                # if no subelements were found, return that element or element list
                # if a minor error occurred (e.g. element not found), return False
                return result

        return True

    ###############################################################################################

    @property
    def container(self):
        """
        Returns the container identifier given at construction.
        """
        return self.__container

    @property
    def tracking_document(self):
        """
        Returns the tracking document for the container
        """
        return self.__tracking_document

    ###############################################################################################

    def _close_driver(self, message = None, exception = None):
        """
        Closes the Selenium Driver. If arguments are specified, also raises an exception.

        Safe to call when the driver was never created (e.g. from __del__ after a failed
        __init__) or when its session is already closed.
        """

        # The attribute does not exist if webdriver.Firefox() failed in __init__
        try:
            driver = self.__driver
        except AttributeError:
            driver = None

        if driver is not None:
            try:
                # NOTE(review): close() is kept from the original; Selenium also offers quit()
                # to end the whole session - confirm which one is intended
                driver.close()
            except InvalidSessionIdException:
                pass # Driver already closed

        if message is not None:
            raise TrackingScraperError(message, self.__container, self.__carrier, exception)

    def __del__(self):
        """
        Close the driver before garbage collection.
        """
        self._close_driver()

# Tracking Scraper tests

In [5]:
# Smoke test: build a scraper for one container/carrier pair and open its tracking page.
# A TrackingScraperError is printed to stderr instead of propagating. If the constructor
# fails, `scraper` is not assigned by this cell, so the cells below rely on success here.
try:
    scraper = TrackingScraper("FSCU5670046", "Hapag-Lloyd")
    scraper.go_to_url()
except TrackingScraperError as ex:
    print(ex, file = sys.stderr)

In [6]:
# Run the configured "output" commands; the cell displays the returned status.
scraper.do_output_general_commands()

True

In [7]:
# Display the tracking document filled in by the commands above.
scraper.tracking_document

{'carrier': 'Hapag-Lloyd',
 'container': 'FSCU5670046',
 'movements': [],
 'output': {'container_description': 'REEFER CONTAINER',
  'container_height': '9\'6"',
  'container_length': "40'",
  'container_max_payload': '29360',
  'container_tare': '4640',
  'container_type': '45RT',
  'container_width': "8'",
  'last_date': '2019-03-12',
  'last_location': 'ANTWERP',
  'last_status': 'arrived'}}

In [8]:
# Close the browser explicitly; called without a message, so no exception is raised.
scraper._close_driver()

In [10]:
# Write the in-memory tracking document to a JSON file under ../tests/tests-2/.
with open('../tests/tests-2/FSCU5670046.json', 'w') as file:
    json.dump(scraper.tracking_document, file)