In [7]:
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

import pandas as pd

import json

from dotenv import load_dotenv
import dotenv

import os

import functools
import logging

logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")

from datetime import datetime

from snowflake_handler import SnowflakeHandler

import re



load_dotenv()


credentials = {
    "username": dotenv.get_key(".ENV", "USERNAME"),
    "password": dotenv.get_key(".ENV", "PASSWORD"),
}

login_url = dotenv.get_key(".ENV", "LOGIN_URL")



with open("elements.json", "r") as json_file:

    elements_dict = json.load(json_file)



required_vars = ["USERNAME", "PASSWORD", "LOGIN_URL"]

missing_vars = [var for var in required_vars if os.getenv(var) is None]


if missing_vars:

    raise EnvironmentError(
        f"Missing required environment variables: {', '.join(missing_vars)}"
    )



if not elements_dict:

    raise ValueError("Elements dictionary not loaded from elements.json.")



def error_handler(func):
    """Decorator to catch and log exceptions in the decorated function."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):

        try:

            logging.info(f"Processing: {func.__name__}")

            return func(*args, **kwargs)

        except Exception as e:

            logging.error(f"Error in {func.__name__}: {str(e)}", exc_info=True)

    return wrapper


class AutoDecorate:

    def __init_subclass__(cls, **kwargs):

        super().__init_subclass__(**kwargs)

        # Loop through all attributes of the class

        for attr_name, attr_value in cls.__dict__.items():

            if attr_name.startswith("__"):

                continue  # Skip magic methods

            if isinstance(attr_value, staticmethod):

                # Handle static methods

                original_func = attr_value.__func__

                decorated_func = error_handler(original_func)

                setattr(cls, attr_name, staticmethod(decorated_func))

            elif isinstance(attr_value, classmethod):

                # Handle class methods

                original_func = attr_value.__func__

                decorated_func = error_handler(original_func)

                setattr(cls, attr_name, classmethod(decorated_func))

            elif callable(attr_value):

                # Handle regular methods

                setattr(cls, attr_name, error_handler(attr_value))


class Router(AutoDecorate):

    def __init__(self, credentials, login_url, elements_dict) -> None:

        self.credentials = credentials

        self.login_url = login_url

        self.elements_dict = elements_dict

        logging.info("Router Initialized!")

    def create_driver(self):

        options = webdriver.ChromeOptions()

        options.add_argument("ignore-certificate-errors")

        # options.add_argument("--disable-gpu")  # Disables GPU, often unnecessary in headless mode

        # options.add_argument("--headless")  # This line hides the browser window

        self.driver = webdriver.Chrome(options=options)

        self.driver.minimize_window()

        self.driver.implicitly_wait(2)

        logging.info("Chrome Driver Created Successfully!")

    def _go_to_url(self, url):

        self.driver.get(url)

    def login(self):

        self._go_to_url(self.login_url)

        username_input = WebDriverWait(self.driver, 60).until(
            EC.visibility_of_element_located(
                (By.ID, self.elements_dict["username"]["id"])
            )
        )

        username_input.clear()

        username_input.send_keys(self.credentials["username"])

        password_input = WebDriverWait(self.driver, 60).until(
            EC.visibility_of_element_located(
                (By.ID, self.elements_dict["password"]["id"])
            )
        )

        password_input.clear()

        password_input.send_keys(self.credentials["password"])

        password_input.send_keys(Keys.RETURN)

        logging.info("Logged in Successfully!")

    def go_to_dsl_information(self):

        system_info_element = WebDriverWait(self.driver, 60).until(
            EC.element_to_be_clickable(
                (By.ID, self.elements_dict["system_info_btn"]["id"])
            )
        )

        system_info_element.click()

        dsl_info_element = WebDriverWait(self.driver, 60).until(
            EC.element_to_be_clickable(
                (By.ID, self.elements_dict["dsl_info_btn"]["id"])
            )
        )

        dsl_info_element.click()

    def check_iframes(self):

        WebDriverWait(self.driver, 60).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )

        iframes = self.driver.find_elements(By.TAG_NAME, "iframe")

        logging.info(f"Found {len(iframes)} iframe(s) on the page")

        return iframes

    def get_dsl_data(self):


        iframes = self.check_iframes()

        if len(iframes) > 0:

            self.driver.switch_to.frame(iframes[0])

            logging.info("Switched to iframe.")

        else:

            logging.warning("No iframes found.")


        time.sleep(2)

        try:

            table_element = WebDriverWait(self.driver, 60).until(
                EC.visibility_of_element_located(
                    (By.ID, self.elements_dict["table_info"]["id"])
                )
            )

            table_html = table_element.get_attribute("outerHTML")

            table = pd.read_html(table_html)

            self.close_session()

            df = table[0]

            assert not df.empty

            return df

        except TimeoutException:

            logging.info("Table not found after switching to iframe.")

            self.driver.switch_to.default_content()

            self.close_session()

            raise

    def close_session(self):

        self.driver.quit()


class ETL:

    def __init__(self,credentials,login_url,elements_dict) -> None:

        self.router = Router(credentials, login_url, elements_dict)

        self.snowflake_handler = SnowflakeHandler()

        self.csv_backup_path = "backup_file.csv"



    def _extract(self):

        self.router.create_driver()

        self.router.login()

        self.router.go_to_dsl_information()

        self.extracted_df = self.router.get_dsl_data()



    @staticmethod

    def clean_dsl_up_time(duration_str):


        pattern = r'(?:(\d+) day[s]?)?\s*(?:(\d+) hour[s]?)?\s*(?:(\d+) minute[s]?)?\s*(?:(\d+) second[s]?)?'


        match = re.match(pattern, duration_str)

        if not match:

            return None  



        days, hours, minutes, seconds = match.groups(default='0')




        total_hours = (int(days) * 24) + int(hours) + (int(minutes) / 60) + (int(seconds) / 3600)

        return round(total_hours,3)



    def _transform(self):

        self.extracted_df.columns = ['VALUE_NAME','VALUE']


        self.extracted_df = self.extracted_df.dropna(how='all')


        self.extracted_df = self.extracted_df.set_index('VALUE_NAME').transpose()

        self.extracted_df.columns = [col.strip(":") for col in self.extracted_df.columns] 


        self.extracted_df =  self.extracted_df.rename(columns={

            'DSL synchronization status': 'DSL_SYNCHRONIZATION_STATUS',

            'Connection status': 'CONNECTION_STATUS',

            'Upstream line rate (kbit/s)': 'UPSTREAM_LINE_RATE_KBIT_S',

            'Downstream line rate (kbit/s)': 'DOWNSTREAM_LINE_RATE_KBIT_S',

            'Maximum upstream rate (kbit/s)': 'MAXIMUM_UPSTREAM_RATE_KBIT_S',

            'Maximum downstream rate (kbit/s)': 'MAXIMUM_DOWNSTREAM_RATE_KBIT_S',

            'Upstream noise safety coefficient (dB)': 'UPSTREAM_NOISE_SAFETY_COEFFICIENT_DB',

            'Downstream noise safety coefficient (dB)': 'DOWNSTREAM_NOISE_SAFETY_COEFFICIENT_DB',

            'Line standard': 'LINE_STANDARD',

            'Upstream line attenuation (dB)': 'UPSTREAM_LINE_ATTENUATION_DB',

            'Downstream line attenuation (dB)': 'DOWNSTREAM_LINE_ATTENUATION_DB',

            'Upstream output power (dBm)': 'UPSTREAM_OUTPUT_POWER_DBM',

            'Downstream output power (dBm)': 'DOWNSTREAM_OUTPUT_POWER_DBM',

            'DSL up time': 'DSL_UP_TIME_HOURS'

        })

        self.extracted_df['DSL_UP_TIME_HOURS'] = self.extracted_df['DSL_UP_TIME_HOURS'].apply(self.clean_dsl_up_time)

        self.extracted_df["CREATED_DATE"] = datetime.now().strftime(

            "%Y-%m-%d %H:%M:%S.%f"

        )[:-3]



    def _load(self):

        try:


            self.snowflake_handler.connect()

            self.snowflake_handler.df_to_table(self.extracted_df, "RAW_DATA")

            self.snowflake_handler.close_connection()




            if os.path.exists(self.csv_backup_path):

                print("Loading any data from backup CSV...")

                backup_df = pd.read_csv(self.csv_backup_path)

                self.snowflake_handler.connect()

                self.snowflake_handler.df_to_table(backup_df, "RAW_DATA")

                os.remove(

                    self.csv_backup_path

                ) 



        except Exception as e:

            print(f"Connection to Snowflake failed: {e}")


            self._backup_to_csv()



    def _backup_to_csv(self):

        """Appends the current DataFrame to the CSV file as a backup."""

        if not os.path.exists(self.csv_backup_path):

            self.extracted_df.to_csv(

                self.csv_backup_path, mode="w", header=True, index=False

            )

        else:

            self.extracted_df.to_csv(

                self.csv_backup_path, mode="a", header=False, index=False

            )

        print(f"Data has been backed up to {self.csv_backup_path}.")



    def run(self):

        self._extract()

        self._transform()

        self._load() 
etl_pipeline = ETL(credentials,login_url,elements_dict)
etl_pipeline.run() 

  table = pd.read_html(table_html)
11/23/2024 01:40:16 PM Failed to connect to Snowflake: 250001 (08001): Failed to connect to DB: jefzwrq-fb47027.snowflakecomputing.com:443. Your free trial has ended and all of your virtual warehouses have been suspended. Add billing information in the Snowflake web UI to continue using the full set of Snowflake features.


Connection to Snowflake failed: 250001 (08001): Failed to connect to DB: jefzwrq-fb47027.snowflakecomputing.com:443. Your free trial has ended and all of your virtual warehouses have been suspended. Add billing information in the Snowflake web UI to continue using the full set of Snowflake features.
Data has been backed up to backup_file.csv.
