In [7]:
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import os
import concurrent.futures
import time
import logging
from datetime import datetime

# Maximum number of attempts per image download before giving up
MAX_RETRIES = 10

# Delay between download retries, in seconds
RETRY_DELAY = 1

# Maximum number of worker threads for each ThreadPoolExecutor
MAX_WORKERS = 3

# NOTE(review): not referenced by any code visible in this file — possibly a leftover
FUTURES = []

# Each entry is [URL path segment passed to generate_url_form, output folder name]
TYPE_ELECTIONS = [
    ["pilpres", "PILPRES"],
    ["pilegdpr", "PILEG DPR"],
    ["pilegdprd_prov", "PILEG DPRD PROVINSI"],
    ["pilegdprd_kab", "PILEG DPRD KAB KOTA"],
    ["pemilu_dpd", "PEMILU DPD"],
]

# Base URL of the region JSON files (provinces down to TPS lists)
url_base = "https://sirekap-obj-data.kpu.go.id/wilayah/pemilu/ppwp"
# Base URL of the per-village result JSON used to filter TPS entries.
# NOTE(review): the path is fixed to "pdpr" for every election type — confirm this is intended
url_chart = "https://sirekap-obj-data.kpu.go.id/pemilu/hhcw/pdpr"
# Local date (YYYY-MM-DD) at import time; used to name the log file
today_date = datetime.now().strftime('%Y-%m-%d')


def generate_url_form(type: str):
    """
    Build the vote-count page URL on pemilu2024.kpu.go.id for an election type.

    Args:
        type (str): URL path segment of the election (e.g. "pilpres").

    Returns:
        str: The ".../hitung-suara/wilayah" page URL for that election.
    """
    template = "https://pemilu2024.kpu.go.id/{}/hitung-suara/wilayah"
    return template.format(type)


def ensure_directory_exists(directory_path):
    """
    Ensure that a directory exists, creating it (and any parents) if needed.

    Args:
        directory_path (str): The path of the directory to ensure existence.

    Returns:
        str: The path of the directory.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race: another thread or
    # process may create the directory between an exists() test and makedirs().
    os.makedirs(directory_path, exist_ok=True)
    return directory_path


def custom_logger(today_date: str):
    """
    Configure logging and return this module's logger.

    Records of level INFO and above are written to
    "./logs/<today_date>_app.log". `logging.basicConfig` is also called so
    that the root logger gets its default handler when none is configured yet.

    Calling this function repeatedly (for example by re-running a notebook
    cell) is safe: the file handler for a given log file is attached only
    once, so records are not duplicated in the file.

    Args:
        today_date (str): Date string used as the log file name prefix.

    Returns:
        logging.Logger: The logger named after this module.
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    # No-op when the root logger already has handlers
    logging.basicConfig(level=logging.INFO, format=log_format)

    os.makedirs("./logs", exist_ok=True)
    log_file = f"./logs/{today_date}_app.log"

    logger = logging.getLogger(__name__)

    # FileHandler stores its target as an absolute path in baseFilename
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(file_handler)

    return logger


logger = custom_logger(today_date)


def getJSON(url: str, timeout: float = 30):
    """
    Fetch a URL with browser-like request headers and return the decoded JSON body.

    Args:
        url (str): The URL to request.
        timeout (float, optional): Seconds to wait for the server before
            giving up. Defaults to 30.

    Returns:
        The JSON-decoded response body (whatever `response.json()` yields).

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
        ValueError: If the body is not valid JSON (requests' JSON decode
            error derives from ValueError).
    """
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-GB,en;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Origin': 'https://pemilu2024.kpu.go.id',
        'Pragma': 'no-cache',
        'Referer': 'https://pemilu2024.kpu.go.id/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'Sec-GPC': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Not A(Brand";v="99", "Brave";v="121", "Chromium";v="121"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }

    # Without a timeout, requests waits indefinitely on a stalled connection,
    # which would block the calling worker thread forever.
    response = requests.get(url=url, headers=headers, timeout=timeout)
    return response.json()


def crawl_website(url: str, max_attempts: int = 30, poll_interval: float = 1.0):
    """
    Open a page in Chrome, click its dark "float-end" button and collect image links.

    The page is polled until at least one <a> element whose href contains an
    image extension (.jpg, .jpeg, .png, .gif) is present, or until
    `max_attempts` polls have been made. The browser is always closed, even
    when an error occurs.

    Args:
        url (str): The page to load.
        max_attempts (int, optional): Maximum number of polls for image
            links. Defaults to 30.
        poll_interval (float, optional): Seconds to sleep between polls.
            Defaults to 1.0.

    Returns:
        list: The image hrefs found, or an empty list if none appeared in time.

    Raises:
        selenium.common.exceptions.NoSuchElementException: If the button is
            not found within the implicit wait.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif')

    def extract_image_links(driver):
        """Return the href of every <a> currently on the page that looks like an image."""
        image_links = []
        for link in driver.find_elements(By.TAG_NAME, 'a'):
            href = link.get_attribute('href')
            if href and any(ext in href.lower() for ext in image_extensions):
                image_links.append(href)
        return image_links

    # Path to chromedriver executable
    service = Service("./chrome-driver/chromedriver.exe")
    driver = webdriver.Chrome(service=service)
    try:
        # The implicit wait only affects element lookups, so it has to be set
        # before the first lookup to give the page time to render.
        driver.implicitly_wait(10)
        driver.get(url)

        # Look the button up through Selenium (honouring the implicit wait)
        # instead of dereferencing a possibly-null querySelector result in JS.
        # Presumably this button reveals the form images — verify against the site.
        button = driver.find_element(
            By.CSS_SELECTOR, "button.btn.btn-dark.float-end")
        driver.execute_script("arguments[0].click();", button)

        # Bounded polling replaces unbounded recursion, which ended in a
        # RecursionError whenever the page never exposed an image link.
        for attempt in range(1, max_attempts + 1):
            images = extract_image_links(driver)
            if images:
                return images
            if attempt < max_attempts:
                time.sleep(poll_interval)

        logger.warning(
            f"No image links found at {url} after {max_attempts} attempts.")
        return []
    finally:
        # Always release the browser, including on errors
        driver.quit()


def find_in_array_of_json(array, key, value):
    """
    Return the first object in a list whose `key` entry equals `value`.

    Args:
        array (list): The JSON objects (dicts) to scan, in order.
        key (str): The key to look up in each object.
        value (any): The value that the key must be equal to.

    Returns:
        dict or None: The first matching object, or None when nothing matches.
    """
    matches = (entry for entry in array if entry.get(key) == value)
    return next(matches, None)


def generate_url(url_base: str, components_uri: list, is_json: bool = True):
    """
    Join path components onto a base URL, optionally adding a ".json" suffix.

    Args:
        url_base (str): The base URL (without a trailing slash).
        components_uri (list): Path components, joined with "/".
        is_json (bool, optional): Append ".json" to the result when True.
            Defaults to True.

    Returns:
        str: The generated URL.
    """
    prefix = url_base + "/"
    path = "/".join(components_uri)
    suffix = ".json" if is_json else ""
    return prefix + path + suffix


def filter_array_of_dicts(array_of_dicts, key, value):
    """
    Keep only the dictionaries whose `key` entry equals `value`.

    Args:
        array_of_dicts (list): The dictionaries to filter.
        key (str): The key to compare on.
        value (any): The value the key must be equal to.

    Returns:
        list: A new list with the matching dictionaries, in original order.
    """
    kept = []
    for entry in array_of_dicts:
        if entry.get(key) == value:
            kept.append(entry)
    return kept


def download_images(image_links, output_path, overwrite=False, timeout=30):
    """
    Download images from the provided links.

    Files are saved as "image_<n>.jpg" (n starting at 1) regardless of the
    actual image type. Each download is attempted up to MAX_RETRIES times,
    sleeping RETRY_DELAY seconds between attempts. A response with an HTTP
    error status counts as a failed attempt, so error pages are never saved
    as images.

    Args:
        image_links (list): List of image URLs to download.
        output_path (str): Path to the directory where the downloaded images will be saved.
        overwrite (bool, optional): Whether to overwrite images if they already exist in the output directory. Defaults to False.
        timeout (float, optional): Seconds to wait for the server on each attempt. Defaults to 30.

    Returns:
        list: List of file paths of images that were downloaded or already present.
    """
    # Create the output directory if it does not exist
    os.makedirs(output_path, exist_ok=True)

    downloaded_images = []

    for i, image_link in enumerate(image_links, start=1):
        # Assuming images are JPEGs
        image_filename = os.path.join(output_path, f"image_{i}.jpg")
        if not overwrite and os.path.exists(image_filename):
            logger.info(
                f"Image already exists : '{image_filename}'. Skipping...")
            downloaded_images.append(image_filename)
            continue

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                response = requests.get(image_link, timeout=timeout)
                # Treat 4xx/5xx as a failure instead of saving the error body
                response.raise_for_status()
                with open(image_filename, 'wb') as f:
                    f.write(response.content)
            except Exception as e:
                # Deliberately broad: this is a best-effort retry loop
                logger.error(f"Failed to download image {image_link}: {e}")
                if attempt < MAX_RETRIES:
                    logger.error(f"Retrying ({attempt}/{MAX_RETRIES})...")
                    time.sleep(RETRY_DELAY)
            else:
                downloaded_images.append(image_filename)
                break  # Download succeeded; stop retrying
        else:
            # Loop finished without a successful download
            logger.error(
                f"Failed to download image {image_link} after {MAX_RETRIES} attempts.")

    return downloaded_images


def process_tps(url_website, tps_dir):
    """
    Crawl one TPS page and download its images into `tps_dir`.

    The download is skipped when `tps_dir` already holds as many entries as
    there are image links on the page.

    Args:
        url_website (str): URL of the TPS page to crawl.
        tps_dir (str): Directory where the images are stored.
    """
    links = crawl_website(url=url_website)
    time.sleep(0.5)
    expected_count = len(links)

    # Same number of files as links: treat the TPS as already done
    if os.path.isdir(tps_dir) and len(os.listdir(tps_dir)) == expected_count:
        logger.info(f"Images already downloaded at {tps_dir}. Skipping...")
        return

    saved = download_images(links, tps_dir)

    if len(saved) != expected_count:
        logger.error(f"Couldn't download all images to '{tps_dir}'")
        return

    logger.info(f'Images downloaded to "{tps_dir}"')


def process_village(output_path:str, provincy:dict, regency:dict, district:dict, village:dict, election:str):
    """
    Download the form images of every eligible TPS in one village.

    The TPS list and the result table of the village are fetched, TPS
    entries without a usable table row are dropped, and the remaining ones
    are processed concurrently by `process_tps`. Failures of individual TPS
    tasks are logged instead of being silently discarded.

    Args:
        output_path (str): Root output directory for the election.
        provincy (dict): Province object; its 'kode' and 'nama' are used.
        regency (dict): Regency object; its 'kode' and 'nama' are used.
        district (dict): District object; its 'kode' and 'nama' are used.
        village (dict): Village object; its 'kode' and 'nama' are used.
        election (str): Election path segment passed to generate_url_form.
    """
    village_codes = [provincy['kode'], regency['kode'],
                     district['kode'], village['kode']]
    village_dir = os.path.join(
        output_path, provincy['nama'], regency['nama'], district['nama'], village['nama'])

    tps = getJSON(generate_url(url_base=url_base, components_uri=village_codes))
    # NOTE(review): url_chart is the same for every election type — confirm
    chart = getJSON(generate_url(url_base=url_chart, components_uri=village_codes))
    table = chart['table']

    def has_usable_row(tps_code):
        """Tell whether the table has a row with a "1" value and a truthy status_progress."""
        if tps_code not in table:
            return False
        row = table[tps_code]
        return row.get("1") is not None and bool(row['status_progress'])

    filtered_tps = [current_tps for current_tps in tps
                    if has_usable_row(current_tps['kode'])]

    form_base = generate_url_form(election)

    # NOTE: this function itself runs inside main()'s pool, so up to
    # MAX_WORKERS * MAX_WORKERS TPS pages may be crawled at the same time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_dir = {}
        for current_tps in filtered_tps:
            tps_dir = os.path.join(village_dir, current_tps["nama"])
            url_website = generate_url(
                url_base=form_base,
                components_uri=village_codes + [current_tps['kode']],
                is_json=False)
            future = executor.submit(process_tps, url_website, tps_dir)
            future_to_dir[future] = tps_dir

        # Surface worker exceptions; an unchecked Future swallows them
        for future in concurrent.futures.as_completed(future_to_dir):
            error = future.exception()
            if error is not None:
                logger.error(
                    f"Failed to process TPS '{future_to_dir[future]}': {error}")
      

def main(provincy_id: str, regency_id: str, index_type_election: int):
    """
    Download the form images for every village of one regency.

    Args:
        provincy_id (str): 'kode' of the province.
        regency_id (str): 'kode' of the regency inside that province.
        index_type_election (int): Index into TYPE_ELECTIONS.

    Raises:
        IndexError: If `index_type_election` is out of range.
        ValueError: If the province or regency code is not in the fetched lists.
    """
    election_slug, election_label = TYPE_ELECTIONS[index_type_election]

    provinces = getJSON(generate_url(url_base=url_base, components_uri=["0"]))
    provincy = find_in_array_of_json(provinces, "kode", provincy_id)
    if provincy is None:
        # Fail with a clear message instead of "'NoneType' is not subscriptable"
        raise ValueError(f"Unknown province code: {provincy_id!r}")

    regencies = getJSON(generate_url(url_base=url_base,
                        components_uri=[provincy['kode']]))
    regency = find_in_array_of_json(regencies, "kode", regency_id)
    if regency is None:
        raise ValueError(
            f"Unknown regency code {regency_id!r} in province {provincy_id!r}")

    output_path = os.path.join(os.getcwd(), "output", election_label)
    ensure_directory_exists(output_path)

    regency_codes = [provincy['kode'], regency['kode']]
    districts = getJSON(generate_url(
        url_base=url_base, components_uri=regency_codes))

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_village = {}
        for district in districts:
            villages = getJSON(generate_url(
                url_base=url_base,
                components_uri=regency_codes + [district['kode']]))
            for village in villages:
                future = executor.submit(
                    process_village, output_path, provincy, regency,
                    district, village, election_slug)
                future_to_village[future] = village

        # Surface worker exceptions; an unchecked Future swallows them
        for future in concurrent.futures.as_completed(future_to_village):
            error = future.exception()
            if error is not None:
                village_name = future_to_village[future].get('nama')
                logger.error(
                    f"Failed to process village '{village_name}': {error}")


In [8]:
def multiline_input(lines: list = None, prompt=""):
    """
    Print a list of choices and read the user's selection.

    Args:
        lines (list, optional): Dicts with 'nama' and 'kode' keys; each is
            printed as "<nama> (<kode>)". Defaults to no choices.
        prompt (str, optional): Prompt passed to input(). Defaults to "".

    Returns:
        str: The entered text with surrounding whitespace removed, so that a
            stray space does not end up in a URL built from the answer.
    """
    # None instead of a mutable [] default, which would be shared across calls
    for line in lines or []:
        print(f"{line['nama']} ({line['kode']})")
    return input(prompt).strip()

# Interactive entry point: ask for the election type, the province and the
# regency (kabupaten/kota), then start the download for that regency.
selected_type_election_index = multiline_input(lines=[{"nama": x[1], "kode": i} for i, x in enumerate(
    TYPE_ELECTIONS)], prompt="Pilih Tipe Pemilu : ")

if not selected_type_election_index:
    logger.error("Pilih Tipe Pemilu..")
else:
    try:
        selected_type_election = TYPE_ELECTIONS[int(selected_type_election_index)]
    except (ValueError, IndexError):
        # Non-numeric or out-of-range answer: report it instead of crashing
        logger.error(
            f"Tipe Pemilu tidak valid : '{selected_type_election_index}'")
    else:
        provincies = getJSON(generate_url(url_base, ["0"]))
        selected_provincy = multiline_input(lines=provincies, prompt="Pilih Kode Provinsi : ")

        if selected_provincy:
            regencies = getJSON(generate_url(url_base, [selected_provincy]))
            # The list offered here is the regencies/cities of the province,
            # so the prompt says "Kabupaten/Kota" rather than "Kecamatan".
            selected_regency = multiline_input(lines=regencies, prompt="Pilih Kode Kabupaten/Kota : ")

            if selected_regency:
                main(selected_provincy, selected_regency, int(selected_type_election_index))
            else:
                logger.error("Pilih Kode Kabupaten/Kota..")
        else:
            logger.error("Pilih Kode Provinsi..")


PILPRES (0)
PILEG DPR (1)
PILEG DPRD PROVINSI (2)
PILEG DPRD KAB KOTA (3)
PEMILU DPD (4)
ACEH (11)
BALI (51)
BANTEN (36)
BENGKULU (17)
DAERAH ISTIMEWA YOGYAKARTA (34)
DKI JAKARTA (31)
GORONTALO (75)
JAMBI (15)
JAWA BARAT (32)
JAWA TENGAH (33)
JAWA TIMUR (35)
KALIMANTAN BARAT (61)
KALIMANTAN SELATAN (63)
KALIMANTAN TENGAH (62)
KALIMANTAN TIMUR (64)
KALIMANTAN UTARA (65)
KEPULAUAN BANGKA BELITUNG (19)
KEPULAUAN RIAU (21)
LAMPUNG (18)
Luar Negeri (99)
MALUKU (81)
MALUKU UTARA (82)
NUSA TENGGARA BARAT (52)
NUSA TENGGARA TIMUR (53)
P A P U A (91)
PAPUA BARAT (92)
PAPUA BARAT DAYA (96)
PAPUA PEGUNUNGAN (95)
PAPUA SELATAN (93)
PAPUA TENGAH (94)
RIAU (14)
SULAWESI BARAT (76)
SULAWESI SELATAN (73)
SULAWESI TENGAH (72)
SULAWESI TENGGARA (74)
SULAWESI UTARA (71)
SUMATERA BARAT (13)
SUMATERA SELATAN (16)
SUMATERA UTARA (12)
BANDUNG (3204)
BANDUNG BARAT (3217)
BEKASI (3216)
BOGOR (3201)
CIAMIS (3207)
CIANJUR (3203)
CIREBON (3209)
GARUT (3205)
INDRAMAYU (3212)
KARAWANG (3215)
KOTA BANDUNG (3273)
KOT