**Next method**

In [1]:
import sys
print(sys.executable)

c:\Users\edward_b\OneDrive - Institute for Fiscal Studies\Work\Brazil social insurance\venv\Scripts\python.exe


***Pyautogui***

In [2]:
import pyautogui
import os
import time
import subprocess
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from bs4 import BeautifulSoup
import psutil
from selenium.webdriver.support.ui import Select
import matplotlib.pyplot as plt
import logging
from logging.handlers import RotatingFileHandler
import random
import requests
import re

In [3]:
os.chdir("C:/Users/edward_b/OneDrive - Institute for Fiscal Studies/Work/Brazil social insurance")

**Define functions**

In [4]:
# Function to kill Chrome processes
def kill_chrome():
    """Kill all Chrome processes."""
    for proc in psutil.process_iter(['pid', 'name']):
        if 'chrome' in proc.info['name'].lower():
            try:
                proc.kill()
            except psutil.NoSuchProcess:
                pass


In [5]:
def cnpj_check(driver, cnpj):
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    cnpj_check = re.sub(r'\D','',soup.find("li", class_="list-group-item").get_text(strip=True).split("Nome")[0])
    # compare the cnpj found on the page with the one we are looking for
    print(f"cnpj_check: {cnpj_check}")
    print(f"cnpj: {cnpj}")
    if cnpj_check != cnpj:
        raise ValueError(f"CNPJ mismatch: expected {cnpj}, found {cnpj_check}")

In [6]:


def scrape_data(cnpj, year, soup, table):
    # Step 1: Extract column headers (excluding unwanted labels)
    header_rows = table.find('thead').find_all('tr')
    cols = []
    for row in header_rows:
        headers = [th.get_text(strip=True) for th in row.find_all("th") if th.get_text(strip=True)]
        filtered = [h for h in headers if h != "Resumo do DAS a ser gerado"]
        cols.extend(filtered)
    print(f"Extracted headers: {cols}")

    # Check if "Quotas" is in the headers to determine if we need to split rows
    quota_split = "Quotas" in cols
    quota_index = cols.index("Quotas") if quota_split else None #Find index of "Quotas" column if it exists

    # Find index of "INSS" column if it exists
    inss_index = cols.index("Benefício INSS") if "Benefício INSS" in cols else None #Find index of "Benefício INSS" column if it exists

    # Step 2: Find all relevant data rows
    rows = soup.find_all("tr", class_="pa")

    # Step 3: Process data rows with split-row logic
    cleaned_data = []
    i = 0
    while i < len(rows):
        row = rows[i]
        cells = row.find_all("td")
        
        # Check if the row has INSS box ticked
        inss_row = any(
            inp.get("data-benefico-apurado") == "True"
            for inp in row.find_all("input", attrs={"data-benefico-apurado": True})
        )
        
        # Extract visible text from the cells (skipping the first <td> with checkbox)
        cell_texts = [td.get_text(strip=True) for td in cells[1:]]
        cell_texts[inss_index] = "1" if inss_row else "0"
        
        if quota_split: #If we have a table with a quotas column
        
            # Check if each row  has quotas that require a split
            quota_row = any(
                inp.get("data-pa-quota") == "true"
                for inp in row.find_all("input", attrs={"data-pa-quota": True})
            )


            if quota_row:
                # First 4 cells: Período, Apurado, Benefício, Quotas (set to 1)
                base_info = cell_texts[:4]
                base_info[quota_index] = "1" if quota_split else "0"
                payment_data = cell_texts[4:]
                cleaned_data.append(base_info + payment_data)

                # Append next row with same identifying info if exists
                if i + 1 < len(rows):
                    next_row = rows[i + 1]
                    next_cells = next_row.find_all("td")
                    next_texts = [td.get_text(strip=True) for td in next_cells]

                    cleaned_data.append(base_info + next_texts)
                    i += 2
                else:
                    i += 1
            else:
                # Normal row within a quotas table, treat quotas as 0 if not explicitly set
                if len(cell_texts) >= 5:
                    cell_texts[3] = "0"
                cleaned_data.append(cell_texts)
                i += 1

        # Normal table without quotas
        else:    
            cleaned_data.append(cell_texts)
            i += 1

    
    # Step 4: Build DataFrame
    df = pd.DataFrame(cleaned_data, columns=cols)
    df['cnpj'] = cnpj
    df['data_found'] = True

    return df



**Import master**

In [None]:
path = "raw/CNPJ numbers"
cnpj_master = pd.read_csv(f'{path}/simples.csv', sep=',', encoding='utf-8')
cnpj_master = cnpj_master[['cnpj_basico','opcao_mei']]

# find length of cnpj
cnpj_master["length cnpj_basico"] = cnpj_master["cnpj_basico"].astype(str).str.len()
pd.crosstab(cnpj_master["length cnpj_basico"], cnpj_master["opcao_mei"], margins=True, margins_name="Total")

In [None]:
cnpj_master["cnpj_basico"] = cnpj_master["cnpj_basico"].astype(str)
cnpj_master = cnpj_master[cnpj_master['opcao_mei'] == 1]
cnpj_master.drop(columns=['length cnpj_basico'], inplace=True)

**Import example full file**

In [None]:
cnpj_test = pd.read_csv(f'{path}/establishmentsPI.csv', sep=',', encoding='utf-8')
#cnpj_test = cnpj_test[['cnpj']]
cnpj_test["cnpj_basico"] = cnpj_test["cnpj"].astype(str).str[:8]

In [None]:
cnpj_merged = pd.merge(cnpj_master, cnpj_test, left_on='cnpj_basico', right_on='cnpj_basico', how='inner')
cnpj_merged["cnpj"] = cnpj_merged["cnpj"].astype(str)
#cnpj_merged["cnpj"].str.len().hist()

In [None]:
cnpj_merged = cnpj_merged[cnpj_merged['cnpj'].str.len() == 14]

#Check for duplicates
duplicates = cnpj_merged[cnpj_merged['cnpj'].duplicated(keep=False)]
print(len(duplicates) == 0)
cnpj_merged = cnpj_merged.drop_duplicates(subset=['cnpj'], keep='first')

In [None]:
cnpj_merged.to_csv('MEI_numbers.csv', sep=',', encoding='utf-8', index=False)

*****-----------------------Load in MEI numbers and start scraping-----------------------*****

In [7]:
cnpj_merged = pd.read_csv('MEI_numbers.csv', sep=',', encoding='utf-8')

In [None]:
# proxy_list = pd.read_csv("scripts/proxies.txt", sep=',', encoding='utf-8', header=None, names=['proxy'])
# proxy_list = proxy_list["proxy"].to_list()
# proxy_list

In [8]:
# get random sample of 10
cnpj_merged = cnpj_merged.sample(n=5, random_state=10)
#convert cnpj's to a list
cnpj_list = cnpj_merged['cnpj'].tolist()
cnpj_merged.shape # 13,894 obs

(5, 3)

**Check if proxies work**

In [None]:


# proxy = "66.201.7.151"  # Replace with the proxy you want to test

# try:
#     response = requests.get(
#         "https://httpbin.org/ip",
#         proxies={"http": proxy, "https": proxy},
#         timeout=10
#     )
#     print("Proxy is working. IP seen by server:", response.text)
# except Exception as e:
#     print("Proxy failed:", e)

**Set up scraper**

In [9]:
cnpj_list = [str(i) for i in cnpj_list]
cnpj_list

['28740844000198',
 '11757216000112',
 '33962463000193',
 '31898571000119',
 '12902387000150']

In [7]:
cnpj_list = ["35184782000140","33962463000193", "31898571000119"]


In [10]:
import shutil
import os

chrome_profile_path = "C:/Temp/ChromeDebug"
if os.path.exists(chrome_profile_path):
    shutil.rmtree(chrome_profile_path)  # delete the profile folder

In [9]:
log_file = 'mei_scraper_log.log'
with open(log_file, 'w'):
    pass  # This empties the file
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=3),  # 5MB per file
        logging.StreamHandler()  # Optional: print to console too
    ]
)

In [12]:
pyautogui.position()

Point(x=749, y=486)

In [13]:
# ---- Define url and cnpj list ----
url = "https://www8.receita.fazenda.gov.br/SimplesNacional/Aplicacoes/ATSPO/pgmei.app/Identificacao"

timings = []
total_start_time = time.time()
master_df = pd.DataFrame() 


for cnpj in cnpj_list:
    #proxy = random.choice_list)
    #logging.info("Processing CNPJ:", cnpj)
    start_time = time.time()
    data = []

    try:

        # ---- Step 1: Start Chrome in remote debug mode ----
        subprocess.Popen([
            r"C:/Program Files/Google/Chrome/Application/chrome.exe",
            #f"--proxy-server={proxy}",
            "--remote-debugging-port=9222",
            "--user-data-dir=" + chrome_profile_path,
            "--start-maximized",  # or "--start-fullscreen"
            "--disable-popup-blocking",  # optional, disable for debugging only
            "--disable-extensions",
            "--no-first-run",
            "--no-default-browser-check"
        ])
        time.sleep(2)  # Give Chrome time to launch

        # ---- Step 2: Use pyautogui to interact with the site ----
        pyautogui.hotkey('ctrl', 'l')
        pyautogui.typewrite(url, interval=0.01)
        pyautogui.press('enter')  
        time.sleep(2)

        pyautogui.moveTo(x=722, y=391 , duration=1) # laptop x=722, y=391  deksptop : x=1027, y=377
        pyautogui.click()
        pyautogui.typewrite(cnpj, interval=0.1)

        pyautogui.moveTo(x=722, y=514, duration=1) # Laptop x=722, y=514 desktop: x=1027, y=500
        pyautogui.click()
        time.sleep(2)

        options = Options()
        options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
        #options.add_argument(f'--proxy-server={proxy}')
        driver = webdriver.Chrome(options=options)
        wait = WebDriverWait(driver, 3)

        driver.get("https://www8.receita.fazenda.gov.br/SimplesNacional/Aplicacoes/ATSPO/pgmei.app/emissao")
        time.sleep(1.5)

        cnpj_check(driver, cnpj)

        # First try: Bootstrap-styled dropdown
        try:
            dropdown_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button[data-id="anoCalendarioSelect"]')))
            dropdown_button.click()
            time.sleep(1)

            year_elements = wait.until(EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR, ".dropdown-menu.inner li a span.text")
            ))
            enabled_years = [el.text.strip() for el in year_elements if el.text.strip()]
            # remove elements from the list that contain "Não optante"
            enabled_years = [year for year in enabled_years if "Não optante" not in year]

            # Raise an exception if no enabled years are found
            if not enabled_years:
                raise ValueError("No enabled years found in the dropdown menu.")

            print("Bootstrap dropdown enabled years for CNPJ ", cnpj , ":", enabled_years)
            use_bootstrap = True
            

        except Exception as e:
            print("Bootstrap dropdown failed, falling back to native <select> method.")
            # Try native <select>
            select_element = wait.until(EC.presence_of_element_located((By.ID, "anoCalendarioSelect")))
            dropdown = Select(select_element)
            enabled_years = [o.text.strip() for o in dropdown.options if o.text.strip()]
            enabled_years = [year for year in enabled_years if "Não optante" not in year]
            print("Native <select> enabled years for CNPJ ", cnpj,":", enabled_years)
            use_bootstrap = False

        print("scraping years", enabled_years)
        #enabled_years = [str(max(enabled_years))] # to just scrape the most recent year
        enabled_years.insert(0, "2010")  #add a year to the start of the list
        
        for index, year in enumerate(enabled_years):
            try:
                if use_bootstrap:
                    dropdown_button = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-id="anoCalendarioSelect"]')))
                    driver.execute_script("arguments[0].click();", dropdown_button)
                    time.sleep(1.5)  # allow dropdown to render
                    
                    print("Clicking on year:", year)
                    year_option = wait.until(EC.element_to_be_clickable(
                        (By.XPATH, f"//span[@class='text' and normalize-space(text())='{year}']")
                    ))
                    time.sleep(2)
                    driver.execute_script("arguments[0].click();", year_option)
                    #ActionChains(driver).move_to_element(year_option).click().perform()
                    print(f"Selected (Bootstrap) year: {year}")
                else:
                    dropdown = Select(driver.find_element(By.ID, "anoCalendarioSelect"))
                    dropdown.select_by_visible_text(year)
                    print(f"Selected (native) year: {year}")

                ok_button = driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
                ok_button.click()
                time.sleep(2)
                
                # check if table is present
                soup = BeautifulSoup(driver.page_source, 'html.parser')
                table = soup.find('table', class_='table table-hover table-condensed emissao is-detailed')
                if not table:
                    print(f"No table found for {cnpj} in year {year}. Skipping the rest.")
                    # Mark all remaining years as not found
                    for remaining_year in enabled_years[index:]:
                        data.append({ 
                            'cnpj': cnpj,
                            'Período de Apuração': remaining_year,
                            'data_found': False
                        })
                        # Convert missing data to DataFrame and append to master_df
                    missing_df = pd.DataFrame(data)
                    master_df = pd.concat([master_df, missing_df], ignore_index=True)
                    break  # Exit the year loop
                    
                new_data = scrape_data(cnpj, year, soup, table)
                master_df = pd.concat([master_df, new_data], ignore_index=True)

                driver.back()
                time.sleep(2)

            except Exception as e:
                print(f"Error with year {year}:", e)

    except Exception as outer_error:
        print(f"Fatal error with CNPJ {cnpj}:", outer_error)

    finally:
        try:
            driver.quit()
        except:
            pass
        kill_chrome()
        end_time = time.time() 
        elapsed = end_time - start_time
        timings.append(elapsed)
        total_elapsed = time.time() - total_start_time
        average_elapsed = sum(timings) / len(timings)
        
        logging.info(f"Finished CNPJ: {cnpj} in {elapsed:.2f} seconds\n")
        logging.info(f"Average time per CNPJ: {average_elapsed:.2f} seconds")
        logging.info(f"Total time elapsed: {total_elapsed:.2f} seconds\n")
        time.sleep(2)

cnpj_check: 28740844000198
cnpj: 28740844000198
Bootstrap dropdown enabled years for CNPJ  28740844000198 : ['2020', '2021', '2022', '2023', '2024', '2025']
scraping years ['2020', '2021', '2022', '2023', '2024', '2025']
Clicking on year: 2010
Error with year 2010: Message: 
Stacktrace:
	GetHandleVerifier [0x00007FF62163CF45+75717]
	GetHandleVerifier [0x00007FF62163CFA0+75808]
	(No symbol) [0x00007FF621408F9A]
	(No symbol) [0x00007FF62145F4C6]
	(No symbol) [0x00007FF62145F77C]
	(No symbol) [0x00007FF6214B2577]
	(No symbol) [0x00007FF6214873BF]
	(No symbol) [0x00007FF6214AF39C]
	(No symbol) [0x00007FF621487153]
	(No symbol) [0x00007FF621450421]
	(No symbol) [0x00007FF6214511B3]
	GetHandleVerifier [0x00007FF62193D71D+3223453]
	GetHandleVerifier [0x00007FF621937CC2+3200322]
	GetHandleVerifier [0x00007FF621955AF3+3322739]
	GetHandleVerifier [0x00007FF621656A1A+180890]
	GetHandleVerifier [0x00007FF62165E11F+211359]
	GetHandleVerifier [0x00007FF621645294+109332]
	GetHandleVerifier [0x00007FF

In [14]:
master_df = master_df[['cnpj', 'Período de Apuração', 'Apurado', 'Situação', 'Benefício INSS',
         'Quotas', 'Principal', 'Multa', 'Juros', 'Total',
         'Data de Vencimento', 'Data de Acolhimento', 'data_found']]

# replace quotas with 0 if it is NaN
master_df['Quotas'] = master_df['Quotas'].fillna(0).astype(int)

In [17]:
#sort master df by cnpj and Período de Apuração
master_df = master_df.sort_values(by=['cnpj', 'Período de Apuração'])

In [None]:
pd.DataFrame(data).to_csv('MEI_data.csv', sep=',', encoding='utf-8', index=False)