In [1]:
from io import StringIO
from pathlib import Path

import geocoder
import geopandas as gpd
import h3
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from srai.regionalizers import VoronoiRegionalizer, geocode_to_region_gdf
from tqdm import tqdm

In [2]:
base_url = 'https://wybory.gov.pl'

In [9]:
def get_polling_districts_with_urls(tr) -> tuple[int, str, str]:
    a_href = tr.select_one("a", href=True)
    a_href.find("div").decompose()

    address = tr.find_all("td")[-1].text

    return (
        int(a_href.text.strip().replace(" ", "").replace("\xa0", "")),
        a_href["href"],
        address,
    )


def get_votes_from_polling_district(driver, url) -> tuple[dict, int, int]:
    # president
    driver.get(url)
    delay = 5  # seconds

    WebDriverWait(driver, delay).until(
        EC.presence_of_element_located((By.ID, "obkw_can_cont_4_1"))
    )

    html = driver.page_source
    soup = BeautifulSoup(html, "html.parser")

    turnout_div = soup.select_one("div.turnout").select_one("table")
    turnout_tbl = pd.read_html(StringIO(str(turnout_div)))[0]
    voters_possible = int(
        turnout_tbl[
            turnout_tbl[0].str.contains("Liczba wyborców")
            & turnout_tbl[0].str.contains("Prezydenta")
        ]
        .iloc[0][1]
        .replace("\xa0", "")
    )

    voters_voted = int(
        turnout_tbl[
            turnout_tbl[0].str.contains("Liczba kart")
            & turnout_tbl[0].str.contains("Prezydenta")
        ]
        .iloc[0][1]
        .replace("\xa0", "")
    )

    candidates_div = soup.select_one("div#obkw_can_cont_4_1").select_one("table")
    candidates_tbl = pd.read_html(StringIO(str(candidates_div).replace("\xa0", "")))[0]
    votes_per_candidate = (
        candidates_tbl[["Nazwisko i imiona", "Liczba głosów na kandydata"]]
        .set_index("Nazwisko i imiona")["Liczba głosów na kandydata"]
        .to_dict()
    )
    return voters_possible, voters_voted, votes_per_candidate


def download_data_for_city(city_code: str):
    if (
        not Path(f"../../output_data/{city_code}_polling_districts_data.json").exists()
        or not Path(f"../../output_data/{city_code}_h3_votes_data.json").exists()
        or not Path(f"../../output_data/{city_code}_geo_data.geojson").exists()
        or not Path(f"../../output_data/{city_code}_voronoi_data.geojson").exists()
    ):
        city_data = (
            pd.read_csv("../../input_data/nec_urls.csv", sep=",")
            .query(f'city == "{city_code}"')
            .iloc[0]
        )

        geocoded_addresses = {}
        polling_districts = {}

        driver = webdriver.Firefox()

        if not Path(f"../input_data/{city_code}.csv").exists():
            scraping_url = city_data.url

            driver.get(scraping_url)
            delay = 5  # seconds

            WebDriverWait(driver, delay).until(
                EC.presence_of_element_located((By.CLASS_NAME, "obkw"))
            )

            div = None
            while not div:
                html = driver.page_source
                soup = BeautifulSoup(html, "html.parser")
                div = soup.select_one("div.obkw").select_one("table")

            for tr in tqdm(div.find_all("tr")[1:]):
                number, district_url, address = get_polling_districts_with_urls(tr)
                if address not in geocoded_addresses:
                    geocoded_addresses[address] = geocoder.arcgis(address).json

                if (
                    not geocoded_addresses[address]
                    or not geocoded_addresses[address]["ok"]
                ):
                    print("Can't geocode:", address, geocoded_addresses[address])
                    continue

                polling_districts[number] = (district_url, geocoded_addresses[address])

            polling_districts_df = pd.DataFrame(
                [
                    dict(idx=k, url=v[0], lon=v[1]["lng"], lat=v[1]["lat"])
                    for k, v in polling_districts.items()
                ]
            )
            polling_districts_df.to_csv(
                f"../../input_data/{city_code}.csv", index=False
            )

        polling_districts_df = pd.read_csv(f"../../input_data/{city_code}.csv")
        gdf = gpd.GeoDataFrame(
            data=polling_districts_df,
            geometry=gpd.GeoSeries.from_xy(
                polling_districts_df.lon, polling_districts_df.lat, crs=4326
            ),
        )
        gdf["h3"] = gdf.geometry.apply(lambda pt: h3.latlng_to_cell(pt.y, pt.x, 15))

        votes_per_district = {}
        votes_possible_per_district = {}
        votes_voted_per_district = {}
        for _, row in tqdm(gdf.iterrows(), total=len(gdf)):
            idx = row.idx
            url = base_url + row.url
            voters_possible, voters_voted, votes_per_candidate = (
                get_votes_from_polling_district(driver, url)
            )
            votes_per_district[idx] = votes_per_candidate
            votes_possible_per_district[idx] = voters_possible
            votes_voted_per_district[idx] = voters_voted

        rows = []
        for k, v in votes_per_district.items():
            rows.append(
                {
                    "idx": k,
                    **v,
                    "voters_total": votes_possible_per_district[k],
                    "voters_voted": votes_voted_per_district[k],
                }
            )

        df_votes_per_district = pd.DataFrame(rows)
        joined_full_data = gdf.merge(df_votes_per_district, on="idx")
        joined_full_data = joined_full_data[joined_full_data["voters_voted"] >= 50]

        candidates_names = list(
            votes_per_district[list(votes_per_district.keys())[0]].keys()
        )
        for candidate_name in candidates_names:
            joined_full_data[candidate_name] = (
                joined_full_data[candidate_name]
                .apply(lambda x: x.replace("\xa0", "") if isinstance(x, str) else x)
                .astype(int)
            )

        polling_districts_data = joined_full_data[
            [
                "h3",
                "idx",
                *candidates_names,
                "voters_total",
                "voters_voted",
            ]
        ]

        polling_districts_data["total_votes"] = polling_districts_data[
            candidates_names
        ].sum(axis=1)
        for candidate_name in candidates_names:
            polling_districts_data[f"{candidate_name}_%"] = (
                polling_districts_data[candidate_name]
                / polling_districts_data["total_votes"]
            )

        polling_districts_data["turnout_%"] = polling_districts_data[
            "voters_voted"
        ].astype(float) / polling_districts_data["voters_total"].astype(float)

        polling_districts_data.to_json(
            f"../../output_data/{city_code}_polling_districts_data.json",
            orient="records",
        )

        h3_votes_data = (
            joined_full_data[["h3", *candidates_names, "voters_voted", "voters_total"]]
            .groupby("h3")
            .sum()
            .reset_index()
        )

        h3_votes_data["total_votes"] = h3_votes_data[candidates_names].sum(axis=1)
        for candidate_name in candidates_names:
            h3_votes_data[f"{candidate_name}_%"] = (
                h3_votes_data[candidate_name] / h3_votes_data["total_votes"]
            )

        h3_votes_data["turnout_%"] = (
            h3_votes_data["voters_voted"] / h3_votes_data["voters_total"]
        )

        h3_votes_data.to_json(
            f"../../output_data/{city_code}_h3_votes_data.json", orient="records"
        )

        geo_distinct_data = (
            joined_full_data[
                [
                    "h3",
                    "geometry",
                ]
            ]
            .groupby("h3")
            .first()
        )
        geo_distinct_data = gpd.GeoDataFrame(geo_distinct_data).set_crs(
            4326, allow_override=True
        )

        geo_distinct_data.to_file(f"../../output_data/{city_code}_geo_data.geojson")

        area = geocode_to_region_gdf(city_data.geocode)

        voronoi_regions = VoronoiRegionalizer(seeds=geo_distinct_data).transform(area)
        voronoi_regions.to_file(f"../../output_data/{city_code}_voronoi_data.geojson")

        driver.quit()

In [4]:
cities = [
    "kra",
    "gda",
    "poz",
    "szc",
    "gdy",
    "bia",
    "byd",
    "tor",
    "zie",
    "rad",
    "lub",
    "lod",
    "opo",
    "kat",
    "kie",
    "rze",
]

In [10]:
for city in cities:
    download_data_for_city(city)

 96%|█████████▌| 305/317 [02:45<00:08,  1.49it/s]

Can't geocode: None


 97%|█████████▋| 306/317 [02:46<00:08,  1.30it/s]

Can't geocode: None


100%|██████████| 317/317 [02:56<00:00,  1.80it/s]
100%|██████████| 315/315 [05:45<00:00,  1.10s/it]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  polling_districts_data["total_votes"] = polling_districts_data[
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  polling_districts_data[f"{candidate_name}_%"] = (
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-