# Texas Comptroller of Public Accounts - Scraper

## Importing Libraries

In [1]:
# Importing Libraries
from selenium import webdriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException

from bs4 import BeautifulSoup

from io import StringIO

import pandas as pd
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)


import numpy as np

from typing import Optional, List

# Import date class from datetime module
from datetime import date

import time
import os
from pathlib import Path  # Import Path to handle file paths easily

# Importing Logger
from custom_logger import CustomLogger

## Initializing Scraper

In [2]:
# Folder that receives the scraper's log files; it is created on first use.
log_dir = r"C:\Users\Apoorva.Saxena\OneDrive - Sitio Royalties\Desktop\Project - Apoorva\Python\Scraping\Texas-Comptroller-of-Public-Accounts-Scraper\logs"
Path(log_dir).mkdir(parents=True, exist_ok=True)

# Module-level logger shared by the scraper class and the helper functions below.
logger = CustomLogger(
    log_file_name="scraper_log",
    log_dir_path=log_dir,
    logger_name='scraper_ipynb',
).get_logger()

[scraper_ipynb] INFO (10-13 04:28 PM): ################## Logging Started ################## (Line: 11) [772446353.py]


## Creating Scraper Class

In [3]:
class _LeaseDropNaturalGas_WebScraper:
    """
    Selenium-based scraper for the Texas Comptroller natural gas lease drop inquiry form.

    The class drives a Chrome browser: it opens the login URL, then the inquiry page,
    fills in a lease number and a filing-period range, submits the form and hands the
    resulting page HTML to pandas for parsing. Scraped rows are accumulated in a CSV
    file whose absolute path is stored in ``self.scraped_csv``.
    """

    def __init__(self, csv_dir: str = "./", scraped_csv: str = "scraped_leases.csv") -> None:
        """
        Initialize the web scraper and launch the Chrome WebDriver.

        Args:
            csv_dir (str): Directory that holds the scraped CSV file. Defaults to "./".
            scraped_csv (str): File name of the CSV that stores scraped data.
                Defaults to "scraped_leases.csv".

        Example:
            scraper = _LeaseDropNaturalGas_WebScraper(csv_dir="/path/to/directory")
        """
        # reCAPTCHA site key passed to grecaptcha.execute() in _get_recaptcha_token
        self.site_key: str = '6Lf6Z5sUAAAAACg7ECAeRMcnAo2_WfoKUeNYXkj_'
        self.login_url: str = 'https://mycpa.cpa.state.tx.us/cong/loginForward.do?phase=check'
        self.ngl_drop_url: str = 'https://mycpa.cpa.state.tx.us/cong/leaseDropNGAction.do'

        # XPaths of the form inputs, the submit button, the result table and the error banner
        self.xpath_leaseNo: str = '//*[@id="leaseNum"]'
        self.xpath_begDt: str = '//*[@id="begFilPrd"]'
        self.xpath_endDt: str = '//*[@id="endFilPrd"]'
        self.xpath_submitForm: str = '//*[@id="leaseDropNGForm"]/span[7]/p/input'
        self.xpath_lease_table: str = '//*[@id="menucontenttable"]/table/tbody/tr/td[2]/div/table'
        self.xpath_error: str = '//*[@id="leaseDropNGForm"]/span[2]/ul/li/strong'

        # Handle the file path: set the absolute path for the CSV file
        self.scraped_csv: str = os.path.join(os.path.abspath(csv_dir), scraped_csv)

        self.driver: Optional[WebDriver] = None
        self._initialize_driver()

    def _initialize_driver(self) -> None:
        """
        Initializes the Chrome WebDriver and stores it in ``self.driver``.
        """
        options = webdriver.ChromeOptions()
        # options.add_argument('--headless')  # Optional: run in headless mode
        options.add_argument('--disable-gpu')  # Optional: disable GPU
        options.add_argument('--no-sandbox')  # Optional: required for some environments

        self.driver = webdriver.Chrome(options=options)

    def _load_page(self) -> None:
        """
        Load the login page and then the Natural Gas Inquiry drop page.

        Blocks (up to 3 seconds) until the page has defined the ``grecaptcha`` object.

        Raises:
            RuntimeError: If the WebDriver has not been initialized.
            TimeoutException: If ``grecaptcha`` is not defined within the wait period.
        """
        if self.driver is None:
            raise RuntimeError("WebDriver is not initialized.")

        self.driver.maximize_window()
        self.driver.get(self.login_url)
        time.sleep(0.5)
        self.driver.get(self.ngl_drop_url)
        wait = WebDriverWait(self.driver, 3)
        wait.until(lambda d: d.execute_script("return typeof grecaptcha !== 'undefined'"))

    def _get_recaptcha_token(self) -> str:
        """
        Reload the inquiry page and retrieve a reCAPTCHA token from it.

        Returns:
            str: Whatever ``grecaptcha.execute`` resolves to, as returned by the driver.

        Raises:
            RuntimeError: If the WebDriver has not been initialized.
        """
        if self.driver is None:
            raise RuntimeError("WebDriver is not initialized.")

        self._load_page()

        token = self.driver.execute_script(f'''
            return grecaptcha.execute('{self.site_key}', {{action: 'homepage'}}).then(function(token) {{
                return token;
            }});
        ''')

        return token

    @staticmethod
    def _format_lease_no(lease_no) -> Optional[str]:
        """
        Reduce a lease number to the value typed into the inquiry form.

        A 6-character value is used as-is; an 11-character value such as
        '08-P15667-O' yields its middle dash-separated segment ('P15667').

        Returns:
            Optional[str]: The form-ready lease number, or None when the input is not
            a string of length 6 or 11, or has no second dash-separated segment.
        """
        if not isinstance(lease_no, str):
            return None
        if len(lease_no) == 6:
            return lease_no
        if len(lease_no) == 11:
            segments = lease_no.split('-')
            if len(segments) >= 2 and segments[1]:
                return segments[1]
        return None

    def _get_NGL_Inquiry_html(self, lease_no: str, beg_dt: str, end_dt: str, max_retries: int = 3) -> Optional[str]:
        """
        Scrape the Natural Gas Inquiry form based on lease_no, beg_dt, and end_dt.

        Args:
            lease_no (str): The lease number to search (6 or all digits).
            beg_dt (str): Beginning period (yymm or yy).
            end_dt (str): Ending period (yymm or yy).
            max_retries (int): Maximum number of retries if the form submission fails. Defaults to 3.

        Returns:
            Optional[str]: The HTML content of the page, or None if the lease number is
            malformed, a form element is missing, or every attempt failed.

        Raises:
            RuntimeError: If the WebDriver has not been initialized.
        """
        if self.driver is None:
            raise RuntimeError("WebDriver is not initialized.")

        # Format Lease Number; reject anything that is not a 6- or 11-character value
        formatted_lease_no = self._format_lease_no(lease_no)
        if formatted_lease_no is None:
            logger.error(f'Lease number {lease_no} is not of 6 or 11 digits.')
            return None

        # Retry loop for submitting the form and handling errors
        for attempt in range(1, max_retries + 1):
            try:
                # Start every attempt from empty inputs: send_keys appends to whatever
                # is already in the field, so a retry would otherwise duplicate values.
                self._clear_entry_labels()

                # Fill in the lease number and the beginning and ending periods
                self.driver.find_element(By.XPATH, self.xpath_leaseNo).send_keys(formatted_lease_no)
                self.driver.find_element(By.XPATH, self.xpath_begDt).send_keys(beg_dt)
                self.driver.find_element(By.XPATH, self.xpath_endDt).send_keys(end_dt)

                # Submit the form
                time.sleep(2)  # Delay before submitting
                self.driver.find_element(By.XPATH, self.xpath_submitForm).click()

                # An error banner appearing within 3 seconds means the submission was rejected
                try:
                    WebDriverWait(self.driver, 3).until(
                        EC.presence_of_element_located((By.XPATH, self.xpath_error))
                    )
                except TimeoutException:
                    pass  # No error found, proceed to check the table
                else:
                    logger.warning(f"Error detected for lease {lease_no} on attempt {attempt}. Retrying...")
                    time.sleep(2)  # Delay before retrying
                    continue  # Retry submission

                # Wait for the lease table to load or raise a timeout exception
                WebDriverWait(self.driver, 5).until(
                    EC.presence_of_all_elements_located((By.XPATH, self.xpath_lease_table))
                )
                logger.info(f"Successfully scraped lease {lease_no} (Attempt {attempt})")
                return self.driver.page_source

            except TimeoutException:
                logger.warning(f"Lease table not found for lease {lease_no} on attempt {attempt}. Retrying...")
                time.sleep(1)

            except NoSuchElementException as e:
                logger.error(f"NoSuchElementException occurred for lease {lease_no} (Attempt {attempt}): {e}")
                return None

        # If max_retries are exhausted without success
        logger.error(f"Failed to scrape lease {lease_no} after {max_retries} attempts.")
        return None

    def _clear_entry_labels(self) -> None:
        """
        Clear the input fields for Lease Number, Beginning Period, and Ending Period.

        Raises:
            RuntimeError: If the WebDriver has not been initialized.
        """
        if self.driver is None:
            raise RuntimeError("WebDriver is not initialized.")

        for field_xpath in (self.xpath_leaseNo, self.xpath_begDt, self.xpath_endDt):
            self.driver.find_element(By.XPATH, field_xpath).clear()

    def _parse_html(self, html: str, raw: bool = False) -> pd.DataFrame:
        """
        Parsing HTML content using Beautiful Soup into a DataFrame.

        The second table found on the page (index 1) is treated as the lease table.
        Rows whose 'Primary Taxpayer #' cell contains 'Period' carry a four-digit
        period; that value is forward-filled onto the following rows as 'prod_dt'
        and the 'Period' rows themselves are dropped.

        Args:
            html (str): The HTML content as string.
            raw (bool): When True, also return the list of all tables read from the page.

        Returns:
            pd.DataFrame: The parsed and cleaned DataFrame. When ``raw`` is True a
            tuple ``(all_tables, cleaned_dataframe)`` is returned instead.
        """
        soup = BeautifulSoup(html, 'html.parser')

        df_raw = pd.read_html(StringIO(str(soup.find_all('table'))))

        df_LeaseNGL_raw = df_raw[1]
        taxpayer_col = df_LeaseNGL_raw['Primary Taxpayer #']
        is_period_row = taxpayer_col.str.contains('Period', na=False)

        # Step 1: On 'Period' rows, extract the four-digit period; everything else is NaN
        df_LeaseNGL_raw['prod_dt'] = np.where(
            is_period_row,
            taxpayer_col.str.extract(r'Period: (\d{4})', expand=False),
            np.nan
        )

        # Step 2: Forward fill the 'prod_dt' column to propagate the last valid date value
        df_LeaseNGL_raw['prod_dt'] = df_LeaseNGL_raw['prod_dt'].ffill()

        # Step 3: Convert 'prod_dt' from 'YYMM' to datetime
        df_LeaseNGL_raw['prod_dt'] = pd.to_datetime(df_LeaseNGL_raw['prod_dt'], format='%y%m')

        # Insert 'prod_dt' as the first column
        df_LeaseNGL_raw.insert(0, 'prod_dt', df_LeaseNGL_raw.pop('prod_dt'))

        # Step 4: Filter out rows where column 'Primary Taxpayer #' contains 'Period'
        df_LeaseNGL_cleaned = df_LeaseNGL_raw[~is_period_row].reset_index(drop=True)

        # Step 5: Clean column names (lowercase, drop '#', spaces to underscores)
        df_LeaseNGL_cleaned.columns = (
            df_LeaseNGL_cleaned.columns.str.lower().str.replace('#', '').str.replace(' ', '_')
        )

        if raw:
            return df_raw, df_LeaseNGL_cleaned
        return df_LeaseNGL_cleaned

    def _read_scraped_csv(self) -> pd.DataFrame:
        """
        Read the CSV file containing previously scraped leases.

        Returns:
            pd.DataFrame: The DataFrame containing already scraped lease information.
            When the CSV file is missing or empty (zero bytes), an empty DataFrame
            with the expected columns is returned.
        """
        cols = ['lease_number', 'prod_dt', 'sub_type', 'primary_taxpayer_', 'comm_code', 'lse_typ',
                'cnty/_dpi', 'exmt_typ', 'api_nbr', 'off_lease', 'other_party_taxpayer',
                'secondary_tp_name', 'tax_reimb', 'ttl_lease_volume', 'your_volume',
                'your_value', 'tax_due', 'gr_volume', 'gr_value', 'marketing_cost',
                'net_tax_value', 'tax_rate', '05_tax_due', 'error_status'
                ]

        # A zero-byte file would make pd.read_csv raise, so treat it like a missing file
        if os.path.exists(self.scraped_csv) and os.path.getsize(self.scraped_csv) > 0:
            return pd.read_csv(self.scraped_csv)
        return pd.DataFrame(columns=cols)

    def _append_to_csv(self, df: pd.DataFrame) -> None:
        """
        Append the newly scraped lease data to the CSV file.

        The header row is written only when the file does not exist yet or is empty.
        The target directory is created if it is missing.

        Args:
            df (pd.DataFrame): The DataFrame with new lease data.
        """
        target_dir = os.path.dirname(self.scraped_csv)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        has_content = os.path.exists(self.scraped_csv) and os.path.getsize(self.scraped_csv) > 0
        df.to_csv(self.scraped_csv, mode='a', header=not has_content, index=False)

    def _quit(self) -> None:
        """
        Quit the WebDriver and forget it.

        ``self.driver`` is reset to None even if quitting raises, so the method can be
        called again safely.
        """
        if self.driver is None:
            return
        try:
            # quit() closes every window and ends the session; a separate close() is not needed
            self.driver.quit()
        finally:
            self.driver = None

## Reading Well Header Data from CC

In [4]:
def formatting_WellHeader_CC(csv_path: str = 'well_header.csv') -> pd.DataFrame:
    """
    Read the well header CSV and return its unique, complete lease numbers.

    Column names are lowercased and spaces are replaced with underscores before
    the 'lease_number' column is looked up. A lease number counts as complete
    when its string form is exactly 11 characters long; null values are ignored.

    Args:
        csv_path (str): Path of the well header CSV. Defaults to 'well_header.csv'.

    Returns:
        pd.DataFrame: A single-column DataFrame ('lease_number') holding each
        complete lease number once, in order of first appearance.

    Raises:
        KeyError: If the CSV has no column that normalizes to 'lease_number'.
    """
    # Reading well header csv to pandas DataFrame
    df_wellheader = pd.read_csv(csv_path, low_memory=False)

    # Cleaning up the column names (the frame is freshly read, so no defensive copy is needed)
    df_wellheader.columns = df_wellheader.columns.str.lower().str.replace(' ', '_')

    if 'lease_number' not in df_wellheader.columns:
        raise KeyError(f"'lease_number' column not found in {csv_path}")

    # Only grab Lease Nos. that are complete (11 characters); nulls never qualify
    lease_numbers = df_wellheader['lease_number'].dropna()
    is_complete = np.fromiter(
        (len(str(value)) == 11 for value in lease_numbers),
        dtype=bool,
        count=len(lease_numbers),
    )

    # Returning unique LeaseNo. from well header
    return pd.DataFrame(lease_numbers[is_complete].unique(), columns=['lease_number'])

## Testing the Scraper

In [17]:
# Function to handle scraping outside of the class
def scrape_leases(csv_dir: str, leases_df: pd.DataFrame, limit: int = 1000,
                  beg_dt: str = '2201', end_dt: str = '2410') -> None:
    """
    Scrapes leases from a DataFrame and stops after `limit` leases, with a 30 second pause
    every 25 leases. Closes and re-opens the browser after every 5 leases.

    If the page shows 'Cookies are required for this application', it reloads the page and
    retries the same lease (up to 3 reloads per lease).

    Leases already present in the scraped CSV are skipped. Whatever was scraped is appended
    to the CSV when the function exits, including when it exits because of an error.

    Args:
        csv_dir (str): The directory path where the scraped CSV file will be saved.
        leases_df (pd.DataFrame): The DataFrame containing lease_number columns.
        limit (int): The maximum number of leases to scrape in one session. Defaults to 1,000.
        beg_dt (str): Beginning filing period sent to the inquiry form. Defaults to '2201'.
        end_dt (str): Ending filing period sent to the inquiry form. Defaults to '2410'.
    """
    logger.info(f"\n\n################## Started Lease Scraping with {limit} limit. ##################\n\n")

    # Initialize the scraper with the provided CSV directory
    logger.info("Initializing the scraper with provided CSV directory.")
    scraper = _LeaseDropNaturalGas_WebScraper(csv_dir=csv_dir)

    scraped_data: List[pd.DataFrame] = []
    lease_not_found: List = []
    max_retries = 3  # Maximum number of page reloads per lease on the cookies error

    try:
        # Read existing scraped leases from the CSV file
        scraped_df = scraper._read_scraped_csv()
        already_scraped = scraped_df['lease_number'].unique()
        logger.info(f"Read {len(already_scraped):,} previously unique scraped leases from {scraper.scraped_csv}.")

        # Filter out leases that have already been scraped
        leases_to_scrape = leases_df[~leases_df['lease_number'].isin(already_scraped)]
        logger.info(f"Found {len(leases_to_scrape):,} leases to scrape.")

        scraper._load_page()
        logger.info("Loaded initial web page for scraping.")

        for count, lease_no in enumerate(leases_to_scrape['lease_number'], start=1):
            for retry in range(1, max_retries + 1):
                # Check if the page requires cookies before scraping the lease number.
                # The page may have no <h1> at all, so guard against a missing heading.
                heading = BeautifulSoup(scraper.driver.page_source, 'html.parser').find('h1')
                heading_text = heading.get_text(strip=True) if heading is not None else ''

                if heading_text == 'Cookies are required for this application.':
                    logger.info(f"Cookies required error. Reloading page for lease {lease_no}. Retry {retry}/{max_retries}.")
                    scraper._load_page()  # Reload the page
                    continue  # Retry the same lease_no after reloading the page

                # Scrape the lease number if cookies are not required
                logger.info(f"Scraping data for lease number {lease_no}.")
                html_content: Optional[str] = scraper._get_NGL_Inquiry_html(lease_no, beg_dt=beg_dt, end_dt=end_dt)

                if html_content:
                    try:
                        df: pd.DataFrame = scraper._parse_html(html_content)
                    except (IndexError, KeyError, ValueError, AttributeError) as e:
                        # One unparseable page must not abort the whole session
                        logger.error(f"Could not parse the result page for lease {lease_no}: {e}")
                        lease_not_found.append(lease_no)
                    else:
                        # Create a new column with the lease_number and insert it at the beginning
                        df.insert(0, 'lease_number', lease_no)
                        scraped_data.append(df)
                else:
                    lease_not_found.append(lease_no)
                break  # Either scraped or given up on: leave the retry loop
            else:
                # Every retry hit the cookies page; record the lease instead of dropping it silently
                logger.error(f"Cookies required error persisted for lease {lease_no} after {max_retries} reloads. Skipping.")
                lease_not_found.append(lease_no)

            # Stopping after scraping the limit (checked first to avoid a pointless browser restart)
            if count >= limit:
                logger.info(f"Scraping limit of {limit} reached. Stopping.")
                break

            if count % 5 == 0:  # Close and reopen the browser after scraping every 5 leases
                logger.info(f"Closing and reopening browser after scraping {count} leases.")
                scraper._quit()
                time.sleep(1)  # Wait for a short period before reopening the browser
                logger.info(f"Reopening the page to continue scraping after lease {count}.")
                scraper = _LeaseDropNaturalGas_WebScraper(csv_dir=csv_dir)
                scraper._load_page()
            else:
                try:
                    scraper._clear_entry_labels()
                except NoSuchElementException:
                    # The form inputs are not on the current page; reload to get a fresh form
                    logger.warning(f"Form fields not found after lease {lease_no}. Reloading page.")
                    scraper._load_page()
                time.sleep(1)

            # Pausing after every 25 leases scraped
            if count % 25 == 0:
                logger.info(f"Pausing for 30 seconds after scraping {count} leases.")
                time.sleep(30)
    finally:
        try:
            scraper._quit()
        finally:
            # Save scraped data to CSV even when the run ended with an exception
            if scraped_data:
                full_df = pd.concat(scraped_data, ignore_index=True)
                scraper._append_to_csv(full_df)
                logger.info(f"Scraped {len(scraped_data)} leases and saved to {scraper.scraped_csv}.")

In [18]:
# Run a short session: scrape at most 10 not-yet-scraped leases from the well header file.
scrape_leases(
    csv_dir=r"C:\Users\Apoorva.Saxena\OneDrive - Sitio Royalties\Desktop\Project - Apoorva\Python\Scraping\Texas-Comptroller-of-Public-Accounts-Scraper",
    leases_df=formatting_WellHeader_CC(),
    limit=10,
)

[scraper_ipynb] INFO (10-13 05:54 PM): 

################## Started Lease Scraping with 10 limit. ##################

 (Line: 14) [3313725787.py]
[scraper_ipynb] INFO (10-13 05:54 PM): Initializing the scraper with provided CSV directory. (Line: 17) [3313725787.py]
[scraper_ipynb] INFO (10-13 05:54 PM): Read 37 previously unique scraped leases from C:\Users\Apoorva.Saxena\OneDrive - Sitio Royalties\Desktop\Project - Apoorva\Python\Scraping\Texas-Comptroller-of-Public-Accounts-Scraper\scraped_leases.csv. (Line: 22) [3313725787.py]
[scraper_ipynb] INFO (10-13 05:54 PM): Found 10,618 leases to scrape. (Line: 26) [3313725787.py]
[scraper_ipynb] INFO (10-13 05:54 PM): Loaded initial web page for scraping. (Line: 33) [3313725787.py]
[scraper_ipynb] INFO (10-13 05:54 PM): Scraping data for lease number 08-P15667-O. (Line: 51) [3313725787.py]
[scraper_ipynb] ERROR (10-13 05:55 PM): Failed to scrape lease 08-P15667-O after 3 attempts. (Line: 150) [4263674244.py]
[scraper_ipynb] INFO (10-13 05:5

In [None]:
# Testing the scraper

# scraper = _LeaseDropNaturalGas_WebScraper(
#     csv_dir=r"C:\Users\Apoorva.Saxena\OneDrive - Sitio Royalties\Desktop\Project - Apoorva\Python\Scraping\Texas-Comptroller-of-Public-Accounts-Scraper"
#     )

# try:
#     # Fill the form and get the HTML content
#     html_content = scraper._get_NGL_Inquiry_html(lease_no='7C-017147-O', beg_dt='2301', end_dt='2410')

#     # Parse the HTML and get the cleaned DataFrame
#     if html_content:
#         df = scraper._parse_html(html=html_content)
# finally:
#     scraper._quit()

# str(date.today().year)[2:] + str(date.today().month).zfill(2)