In [3]:
from pathlib import Path
from sys import platform
from typing import Optional, Type, Union

from bs4 import BeautifulSoup

import easyocr
import matplotlib.pyplot as plt
import cv2
import requests
from PIL import Image
import logging

# Landing page loaded by the scraping code below (passed to go_to_page_with_selenium).
url = 'https://verify.bmdc.org.bd/'

In [4]:
from multiprocessing import Pool, cpu_count
# CPU count as reported by multiprocessing.cpu_count(); displayed as the cell output.
num_processes = cpu_count()
num_processes

16

## If you are running on Kaggle, skip the following cell
- The cell uses `selenium`

In [27]:
import pandas as pd
import numpy as np
import time
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeDriverService
from selenium.webdriver.chrome.webdriver import WebDriver as ChromeDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeDriverService
from selenium.webdriver.edge.webdriver import WebDriver as EdgeDriver
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as GeckoDriverService
from selenium.webdriver.firefox.webdriver import WebDriver as FirefoxDriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.safari.options import Options as SafariOptions
from selenium.webdriver.safari.webdriver import WebDriver as SafariDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager

BrowserOptions = Union[ChromeOptions, EdgeOptions, FirefoxOptions, SafariOptions]

def open_selenium_browser(browser_name: str, headless: bool, ) -> WebDriver:
    """Start a Selenium browser session.

    Args:
        browser_name: ``"chrome"``, ``"edge"``, ``"firefox"`` or ``"safari"``.
        headless: When true, Chrome, Edge and Firefox are started without a
            visible window. Safari is started normally either way.

    Returns:
        WebDriver: The running driver instance.

    Raises:
        KeyError: If ``browser_name`` is not one of the names listed above.
    """
    logging.getLogger("selenium").setLevel(logging.INFO)

    options_available: dict[str, Type[BrowserOptions]] = {
        "chrome": ChromeOptions,
        "edge": EdgeOptions,
        "firefox": FirefoxOptions,
        "safari": SafariOptions,
    }

    options: BrowserOptions = options_available[browser_name]()
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5615.49 Safari/537.36"
    )

    if browser_name == "firefox":
        if headless:
            # The `options.headless` setter was deprecated and then removed in
            # Selenium 4; the command-line flag works across versions.
            options.add_argument("-headless")
            options.add_argument("--disable-gpu")
        driver = FirefoxDriver(
            service=GeckoDriverService(GeckoDriverManager().install()), options=options
        )
    elif browser_name == "edge":
        if headless:
            # Edge is Chromium based, so it takes the same flags as Chrome below.
            options.add_argument("--headless=new")
            options.add_argument("--disable-gpu")
        driver = EdgeDriver(
            service=EdgeDriverService(EdgeDriverManager().install()), options=options
        )
    elif browser_name == "safari":
        # Requires a bit more setup on the users end
        # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
        driver = SafariDriver(options=options)
    else:
        if platform == "linux" or platform == "linux2":
            options.add_argument("--disable-dev-shm-usage")
            # NOTE(review): a fixed debugging port may collide when several
            # Chrome instances are started concurrently - confirm before
            # running this from a process pool.
            options.add_argument("--remote-debugging-port=9222")

        options.add_argument("--no-sandbox")
        if headless:
            options.add_argument("--headless=new")
            options.add_argument("--disable-gpu")

        # Prefer a system-installed chromedriver; otherwise let
        # webdriver_manager download a matching one.
        chromium_driver_path = Path("/usr/bin/chromedriver")
        if chromium_driver_path.exists():
            service = ChromeDriverService(str(chromium_driver_path))
        else:
            service = ChromeDriverService(ChromeDriverManager().install())

        driver = ChromeDriver(service=service, options=options)
    return driver

def go_to_page_with_selenium(driver, url: str="https://verify.bmdc.org.bd/") -> tuple[WebDriver, str]:
    """Load ``url`` in the given browser and return the rendered body markup.

    Args:
        driver: Selenium driver used to load the page.
        url (str): Address to navigate to.

    Returns:
        Tuple[WebDriver, str]: The same driver and the ``outerHTML`` of
        ``document.body`` as read from the live DOM.
    """
    driver.get(url)

    # Wait (up to 10 seconds) until a <body> element is present.
    body_locator = (By.TAG_NAME, "body")
    waiter = WebDriverWait(driver, 10)
    waiter.until(EC.presence_of_element_located(body_locator))

    # Ask the browser for the markup instead of re-downloading the page.
    body_html = driver.execute_script("return document.body.outerHTML;")
    return driver, body_html


def get_captcha_image(page_source) -> np.ndarray:
    """Download the captcha image referenced in ``page_source``.

    Args:
        page_source: HTML markup containing ``<div id="captcha1">`` with an
            ``<img>`` child whose ``src`` points at the captcha image.

    Returns:
        np.ndarray: The RGB image with its outer border cropped away
        (rows 1..28 and columns 1..98 of the downloaded picture).

    Raises:
        ValueError: If the captcha ``<img>`` (or its ``src``) is not present.
        requests.HTTPError: If the image download returns an error status.
    """
    # Local import keeps this function self-contained; `io` is standard library.
    from io import BytesIO

    bs_html = BeautifulSoup(page_source, 'html.parser')
    captcha_div = bs_html.find('div', {"id": "captcha1"})
    captcha_tag = captcha_div.find("img") if captcha_div is not None else None
    captcha_img_url = captcha_tag.get("src") if captcha_tag is not None else None
    if not captcha_img_url:
        raise ValueError("captcha image (div#captcha1 img[src]) not found in page source")

    # A timeout stops a stalled download from hanging the whole scrape, and
    # reading `.content` releases the connection (the streamed `.raw` did not).
    response = requests.get(captcha_img_url, timeout=10)
    response.raise_for_status()

    # Force three channels so the crop below and the later padding step always
    # see an (H, W, 3) array, whatever mode the file was saved in.
    img = np.array(Image.open(BytesIO(response.content)).convert("RGB"))

    return img[1:29, 1:99, :]


def process_image(img, x_shift=20, y_shift=20):
    """Invert, erode and pad a captcha image before OCR.

    Args:
        img: Three-channel uint8 image array (as returned by
            ``get_captcha_image``).
        x_shift: Number of blank (zero) rows added above the image.
        y_shift: Number of blank (zero) columns added to the left of the image.

    Returns:
        np.ndarray: uint8 array of shape ``(H + x_shift, W + y_shift, 3)``.
        The processed image sits in the bottom-right corner, so the padding is
        only on the top and left sides.
    """
    img_inv = cv2.bitwise_not(img)

    # One erosion pass with a 2x2 kernel on the inverted image.
    erode_kernel = np.ones((2, 2), np.uint8)
    img_inv = cv2.erode(img_inv, erode_kernel, iterations=1)

    rows, cols = img_inv.shape[:2]
    # Allocate the canvas as uint8 directly instead of float64 followed by a cast.
    ocr_ready_img = np.zeros((rows + x_shift, cols + y_shift, 3), dtype=np.uint8)
    ocr_ready_img[x_shift:rows + x_shift, y_shift:cols + y_shift] = img_inv

    return ocr_ready_img


def _get_ocr_reader():
    """Return a per-process easyocr reader, building it on first use only."""
    reader = getattr(_get_ocr_reader, "_reader", None)
    if reader is None:
        # Building a Reader is the expensive step, so do it once and reuse it
        # for every captcha instead of once per call.
        reader = easyocr.Reader(['en'])
        _get_ocr_reader._reader = reader
    return reader


def solve_captcha(img):
    """Read the captcha text from a prepared image with easyOCR.

    Args:
        img: Image array as produced by ``process_image``.

    Returns:
        str: Text of the first region easyOCR detects (restricted to ``A-Z``
        and ``0-9``), or an empty string when nothing is detected.
    """
    result = _get_ocr_reader().readtext(img, allowlist='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')

    # No detection used to raise IndexError on result[0]; an empty answer lets
    # the caller submit, fail the captcha and retry with a new image.
    if not result:
        return ""

    # Each entry is indexed as [1] for the recognised text.
    captcha_solution = result[0][1]
    return captcha_solution

def submit_form_selenium(driver,  doc_id, captcha_solution,):
    """Fill in the lookup form on the current page and submit it.

    Args:
        driver: Selenium driver showing the form page.
        doc_id: Value typed into the form's id field (formatted with ``str``).
        captcha_solution: Text typed into the captcha field.

    Returns:
        tuple: The driver and ``driver.current_url`` read right after the click.
    """
    # Registration id field, then captcha field, then the submit button.
    driver.find_element(By.XPATH, "//div[@class='form-group']/input").send_keys(f"{doc_id}")
    driver.find_element(By.XPATH, "//input[@id='captcha_code']").send_keys(captcha_solution)
    driver.find_element(By.XPATH, "//button[@id='submit']").click()

    return driver, driver.current_url


def get_doctor_dict_selenium(driver):
    """Read the doctor's details from the result page currently shown in ``driver``.

    Args:
        driver: Selenium driver positioned on a result page.

    Returns:
        dict: Keys ``name``, ``bmdc_code``, ``registration_year``,
        ``registration_validity``, ``dob``, ``bg``, ``father_name``,
        ``mother_name``, ``permanent_add`` and ``reg_status``. Each value is
        the element's text, or ``None`` when the element is not on the page.

    Raises:
        NoSuchElementException: If the name or BMDC-code heading is missing.
            ``doc_entry_generator`` catches this to detect a failed submission.
    """

    def _text(element):
        """Text of ``element``, or None when the element was not found."""
        return element.text if element is not None else None

    def _first(elements, count):
        """First ``count`` items of ``elements``, padded with None if shorter."""
        head = list(elements[:count])
        return head + [None] * (count - len(head))

    name_xpath = "//div[@class='col-md-8']/h3"
    bmdc_code_xpath = "//div[@class='text-center']/h3"
    registration_year_xpath = "//h5[@class='font-weight-bold mb-0 d-block']"
    dob_bg_lxpath = "//div[@class='form-group row mb-0']/div/h6"
    other_lxpath = "//div[@class='col-md-12']/h6"

    # find_element raises when nothing matches (it never returns a falsy
    # value), so a single lookup per field is enough.
    name = driver.find_element(By.XPATH, name_xpath)
    bmdc_code = driver.find_element(By.XPATH, bmdc_code_xpath)

    # Take the leading entries and pad with None, so an unexpected number of
    # matches no longer raises ValueError during unpacking.
    registration_year, registration_validity = _first(
        driver.find_elements(By.XPATH, registration_year_xpath), 2
    )
    dob, bg = _first(driver.find_elements(By.XPATH, dob_bg_lxpath), 2)

    other_details = driver.find_elements(By.XPATH, other_lxpath)

    reg_status = other_details[-1] if other_details else None
    if reg_status is not None:
        # Scroll to the last element before reading the texts.
        driver.execute_script("arguments[0].scrollIntoView();", reg_status)
    permanent_add = other_details[-2] if len(other_details) >= 2 else None
    if len(other_details) > 2:
        father_name = other_details[0]
        mother_name = other_details[1]
    else:
        father_name = None
        mother_name = None

    doc_entry_dict = {
        "name": _text(name),
        "bmdc_code": _text(bmdc_code),
        "registration_year": _text(registration_year),
        "registration_validity": _text(registration_validity),
        "dob": _text(dob),
        "bg": _text(bg),
        "father_name": _text(father_name),
        "mother_name": _text(mother_name),
        "permanent_add": _text(permanent_add),
        "reg_status": _text(reg_status),
    }

    return doc_entry_dict


def doc_entry_generator(driver, id_start, id_end, max_retries=10, delay=3):
    """Yield one details dict per id in ``id_start..id_end`` (inclusive).

    For every id the form page (module-level ``url``) is loaded, the captcha
    is solved and the form is submitted. A submission whose result page lacks
    the expected elements is retried with a fresh captcha.

    Args:
        driver: Selenium driver to use; it is quit when the generator ends.
        id_start: First id to look up.
        id_end: Last id to look up.
        max_retries: Consecutive failed attempts allowed for one id before it
            is skipped. ``None`` retries without limit.
        delay: Seconds to sleep after every attempt.

    Yields:
        dict: The result of ``get_doctor_dict_selenium`` for each id found.
    """
    doc_id = id_start  # avoid shadowing the builtin `id`
    failed_attempts = 0
    try:
        while doc_id <= id_end:
            driver, page_source = go_to_page_with_selenium(driver, url=url)
            captcha_img = get_captcha_image(page_source)
            captcha_img = process_image(captcha_img)
            captcha_solution = solve_captcha(captcha_img)

            driver, _ = submit_form_selenium(driver, doc_id, captcha_solution)
            try:
                doc_entry_dict = get_doctor_dict_selenium(driver)
                yield doc_entry_dict
                doc_id += 1
                failed_attempts = 0
            except NoSuchElementException:
                # No result page: retry the same id, but give up after
                # max_retries so an id that never resolves cannot stall the
                # loop forever.
                failed_attempts += 1
                if max_retries is not None and failed_attempts >= max_retries:
                    logging.warning(
                        "Skipping id %s after %s failed attempts", doc_id, failed_attempts
                    )
                    doc_id += 1
                    failed_attempts = 0
            time.sleep(delay)
    finally:
        # Also runs on errors and when the consumer stops early, so the
        # browser process is never left behind.
        driver.quit()

def main_faster(doc_id_start, doc_id_end, url='https://verify.bmdc.org.bd/', browser_name="chrome", headless=False):
    """Scrape the ids ``doc_id_start..doc_id_end`` into a DataFrame.

    Args:
        doc_id_start: First id to look up.
        doc_id_end: Last id to look up (inclusive).
        url: Kept for interface compatibility. It is not forwarded, because
            ``doc_entry_generator`` reads the module-level ``url``.
        browser_name: Browser passed to ``open_selenium_browser``.
        headless: Headless flag passed to ``open_selenium_browser``.

    Returns:
        pd.DataFrame: One row per scraped doctor.
    """
    driver = open_selenium_browser(browser_name, headless=headless)

    # "bg" is produced by get_doctor_dict_selenium but was missing from this
    # list, so the field was silently dropped from the frame.
    columns = [
        "name",
        "bmdc_code",
        "registration_year",
        "registration_validity",
        "dob",
        "bg",
        "father_name",
        "mother_name",
        "permanent_add",
        "reg_status",
    ]
    doctor = pd.DataFrame(doc_entry_generator(driver, doc_id_start, doc_id_end), columns=columns)

    return doctor


# def main(doc_id_start, doc_id_end, url=f'https://verify.bmdc.org.bd/', browser_name="chrome", headless=False):
#     doctor_list = []
#     driver = open_selenium_browser(browser_name, headless=headless)
#
#     id = doc_id_start
#     while id <= doc_id_end:
#         driver, page_source = go_to_page_with_selenium(driver, url=url)
#         captcha_img = get_captcha_image(page_source)
#         captcha_img = process_image(captcha_img)
#         captcha_solution = solve_captcha(captcha_img)
#
#         driver, _ = submit_form_selenium(driver, id, captcha_solution)
#         try:
#             doc_entry_dict = get_doctor_dict_selenium(driver)
#             doctor_list.append(doc_entry_dict)
#             # yield doc_entry_dict
#             id += 1
#         except NoSuchElementException:
#             pass
#
#         # print(doc_entry_dict)
#
#
#         time.sleep(3)
#
#     driver.close()
#     driver.quit()
#     doctor = pd.DataFrame(doctor_list, columns=["name", "bmdc_code", "registration_year", "registration_validity", "dob", "father_name", "mother_name", "permanent_add", "reg_status"])
#     return doctor

# page_source

In [304]:
# Split the inclusive range [s, e] into w consecutive chunks.
s, e, w = 11, 28, 5

# D is the distance between the first and last value of a chunk, so
# consecutive chunks begin D + 1 apart.
D = (e - s) // w
start = [s + k * (D + 1) for k in range(w)]
# A chunk ends D after its start, clipped so it never runs past e.
end = [min(first + D, e) for first in start]

In [305]:
start, end

([11, 15, 19, 23, 27], [14, 18, 22, 26, 28])

In [294]:
import pandas as pd

# NOTE(review): scratch call - `_append` is a private pandas method (leading
# underscore) and is invoked here without any arguments; confirm whether this
# cell is still needed.
pd.DataFrame()._append()

3

In [32]:
# Single-id smoke test of the whole pipeline.
# Open the browser here so the cell also works on a fresh kernel instead of
# relying on a `driver` left over from an earlier run.
driver = open_selenium_browser("chrome", headless=False)
driver, page_source = go_to_page_with_selenium(driver, url)
captcha_img = get_captcha_image(page_source)
captcha_img = process_image(captcha_img)
captcha_solution = solve_captcha(captcha_img)

# Look up id 35 and show the scraped record as the cell output.
driver, current_url = submit_form_selenium(driver, 35, captcha_solution)
doc_entry_dict = get_doctor_dict_selenium(driver)
doc_entry_dict

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


{'name': 'FATEMA KHATUN', 'bmdc_code': 'A-35', 'registration_year': '1972', 'registration_validity': '05/04/1977', 'dob': '-', 'bg': '-', 'father_name': '-', 'mother_name': '-', 'permanent_add': '153/A, RAM KRISHNA MISSION, DISTRICT: DHAKA.', 'reg_status': 'ACTIVE [VALIDITY EXPIRED]'}


In [None]:
# Re-read the details from whatever page `driver` currently shows; nothing is
# navigated or submitted here.
doc_entry_dict = get_doctor_dict_selenium(driver)

In [None]:
from multiprocessing import Pool

# Five worker processes, each scraping one block of five consecutive ids
# (84256-84260, 84261-84265, ... 84276-84280).
with Pool(5) as p:
    doc = p.starmap(
        main_faster,
        [(first_id, first_id + 4) for first_id in range(84256, 84281, 5)],
    )
# NOTE(review): `doc` is not used in the visible cells, and this sequential
# call scrapes ids that the pool above already covered - confirm which of the
# two results is wanted.
doctors = main_faster(84256, 84266)

Concurrency test (thread pool)

In [51]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# This is our generator function
def countdown(n, low):
    """Yield n, n - 1, ... while the value is still greater than ``low``.

    Nothing is yielded when ``n`` is not greater than ``low`` to begin with.
    """
    remaining = n
    while remaining > low:
        yield remaining
        remaining -= 1

# This is the function that will be run in each thread
def consume_generator(upper_limit, lower_limit=0):
    """Drain ``countdown(upper_limit, lower_limit)`` and return its items.

    The limits are printed first. Every item then costs a one-second sleep
    (standing in for I/O-bound work) and is printed before being collected.

    Returns:
        list: The items in the order the countdown produced them.
    """
    generator = countdown(upper_limit, lower_limit)
    print(f"Upper_limit_ {upper_limit}, Lower_limit_ {lower_limit}")

    def _handle(item):
        # Simulate some I/O-bound work with time.sleep
        time.sleep(1)
        print(f"Item_{item}\n")
        return item

    return [_handle(item) for item in generator]
# Create a generator
# generator = countdown(25)

# Create a ThreadPoolExecutor
# Run consume_generator for upper limits 1..14 on a pool of two worker threads
# and print each returned list as soon as its future completes.
with ThreadPoolExecutor(max_workers=2) as executor:
    # Queue all 14 calls up front; at most two run at a time (max_workers=2).
    fs1 = [executor.submit(consume_generator, x) for x in range(1,15)]
    # `i` counts futures in completion order; it is not a thread identifier.
    for i, f in enumerate(as_completed(fs1)):
        print(f"Thread number {i}:")
        try:
            results = f.result()
        except ValueError:
            # result() re-raises an exception from the task; a ValueError skips that task.
            continue
        for result in results:
            print(result)

        # print(result)
    # executor.submit(consume_generator, 15)

Upper_limit_ 1, Lower_limit_ 0
Upper_limit_ 2, Lower_limit_ 0
Item_1

Upper_limit_ 3, Lower_limit_ 0
Thread number 0:
1
Item_2

Item_3

Item_1

Upper_limit_ 4, Lower_limit_ 0
Thread number 1:
2
1
Item_2

Item_4

Item_1

Upper_limit_ 5, Lower_limit_ 0
Thread number 2:
3
2
1
Item_3

Item_5

Item_2

Item_4

Item_1

Upper_limit_ 6, Lower_limit_ 0
Thread number 3:
4
3
2
1
Item_3

Item_6

Item_2

Item_5

Item_1

Upper_limit_ 7, Lower_limit_ 0
Thread number 4:
5
4
3
2
1
Item_4

Item_7

Item_3

Item_6

Item_2

Item_5

Item_1

Upper_limit_ 8, Lower_limit_ 0
Thread number 5:
6
5
4
3
2
1
Item_4

Item_8

Item_3

Item_7

Item_2

Item_6

Item_1

Upper_limit_ 9, Lower_limit_ 0
Thread number 6:
7
6
5
4
3
2
1
Item_5

Item_9

Item_4

Item_8

Item_3

Item_7

Item_2

Item_6

Item_1

Upper_limit_ 10, Lower_limit_ 0
Thread number 7:
8
7
6
5
4
3
2
1
Item_5

Item_10

Item_4

Item_9

Item_3

Item_8

Item_2

Item_7

Item_1

Upper_limit_ 11, Lower_limit_ 0
Thread number 8:
9
8
7
6
5
4
3
2
1
Item_6

Item_11

Item

In [50]:
# The same 14 calls as the thread-pool cell, run one after another in a list
# comprehension.
# NOTE(review): the recorded output is a list of None, which does not match the
# `return out` in consume_generator as currently defined - presumably recorded
# against an earlier version; re-run to confirm.
res = [consume_generator(x) for x in range(1,15)]
res

Upper_limit_ 1, Lower_limit_ 0
Item_1

Upper_limit_ 2, Lower_limit_ 0
Item_2

Item_1

Upper_limit_ 3, Lower_limit_ 0
Item_3

Item_2

Item_1

Upper_limit_ 4, Lower_limit_ 0
Item_4

Item_3

Item_2

Item_1

Upper_limit_ 5, Lower_limit_ 0
Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 6, Lower_limit_ 0
Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 7, Lower_limit_ 0
Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 8, Lower_limit_ 0
Item_8

Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 9, Lower_limit_ 0
Item_9

Item_8

Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 10, Lower_limit_ 0
Item_10

Item_9

Item_8

Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 11, Lower_limit_ 0
Item_11

Item_10

Item_9

Item_8

Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Item_1

Upper_limit_ 12, Lower_limit_ 0
Item_12

Item_11

Item_10

Item_9

Item_8

Item_7

Item_6

Item_5

Item_4

Item_3

Item_2

Ite

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [316]:
def my_func(n1, n2, random_param=0, random_param2=1):
    """Lazily yield ``i**2`` for every ``i`` in ``range(n1, n2)``.

    ``random_param`` and ``random_param2`` are accepted but not used.
    """
    yield from (value**2 for value in range(n1, n2))



# Submit my_func for five (x, x + 3) windows to a pool of five worker threads.
with ThreadPoolExecutor(max_workers=5) as pool:
    input_range = range(2,7)
    # results = list(pool.map(my_func, input_range, [0,]*len(input_range), [0,]*len(input_range)))
    # my_func is a generator function, so each future resolves to a generator
    # object (see the printed output); the squares are only produced when that
    # generator is iterated in the loop below, not inside the pool.
    fs = [pool.submit(my_func, x, x+3, 0) for x in range(2,7)]

# results
# Collect one list of squares per future, in completion order.
outs = []
for f in as_completed(fs):
    print(f.result())
    out = []
    # f.result() returns the same generator object on every call; this loop
    # is what exhausts it.
    for res in f.result():
        out.append(res)
    outs.append(out)
outs

<generator object my_func at 0x000001CD1FEF4190>
<generator object my_func at 0x000001CD1FEFC040>
<generator object my_func at 0x000001CD1FEFE5F0>
<generator object my_func at 0x000001CD1FEF45F0>
<generator object my_func at 0x000001CD1FEF49E0>


[[25, 36, 49], [16, 25, 36], [36, 49, 64], [9, 16, 25], [4, 9, 16]]

In [317]:
out

[4, 9, 16]

In [249]:
# `res` is overwritten on every iteration, so only the frame built from the
# last completed future is displayed.
for f in as_completed(fs):
    res = pd.DataFrame(f.result())
res
# x = f.result()

In [252]:
x = f.result()

In [258]:
x.__next__()

StopIteration: 

In [201]:
outs

[[0, 1], [0, 1, 4], [0, 1, 4, 9], [0, 1, 4, 9, 16], [0, 1, 4, 9, 16, 25]]

In [79]:
[0,]*len(range(1,25))

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [98]:
# NOTE(review): my_func as defined in this notebook requires two positional
# arguments (n1, n2); this one-argument call presumably targets an earlier
# version of the function - confirm or update before re-running.
x = my_func(100)

In [199]:
x.__next__()

StopIteration: 