In [1]:
import functools
import shutil
import tempfile
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import requests

In [2]:
## Parameters
# Earliest year with municipality-level data
BEGIN: str = "01.01.1945"
# Reserve 2024 for out-of-sample predictions
# and, the new maps (starting Sept. 2023) aren't supported by the current code
END: str = "01.07.2023"
# We don't have municipality-level data for these votes.
EXCLUDES = [616, 617]


In [3]:
swiss_votes = pd.read_csv("../data/raw/swissvotes_dataset.csv", sep=";")
mun_reg = pd.read_excel(
    "../data/raw/be-t-00.04-agv-01.xlsx", sheet_name="GDE"
)  # List of municipalities
can_reg = pd.read_excel(
    "../data/raw/be-t-00.04-agv-01.xlsx", sheet_name="KT"
)  # List of cantons
mun_mut = pd.read_excel(
    "../data/raw/Mutierte_Gemeinden.xlsx", sheet_name="Daten", header=[0, 1]
)  # List of merged municipalities

  warn("Workbook contains no default style, apply openpyxl's default")


In [4]:
MISTAKES_MAP = {
    "Rebévelier (BE)": "Rebévelier",
    "Roggenburg (BE)": "Roggenburg",
    "Neuveville": "La Neuveville",
    "La La Neuveville": "La Neuveville",
    "Maconnens": "Massonnens",
    "Ichterswil": "Richterswil",
    "BBöckten": "Böckten",
    "Oberendingen": "Endingen",
    "Roveredo (TI)": "Roveredo-Capriasca",
    "MMülchi": "Mülchi",
    "DDättwil": "Dättwil",
    "Röthenbach b. Herzogenbuchsee": "Röthenbach bei Herzogenbuchsee",
    "Röthenbach bei Herz'buchsee": "Röthenbach bei Herzogenbuchsee",
    "Castel san Pietro": "Castel San Pietro",
    "Vugelles-la Mothe": "Vugelles-La Mothe",
    "Montet (GlMontet (Glâne)": "Montet (Glâne)",
    "La CLa Côte-aux-Fées": "La Côte-aux-Fées",
    "Le PLe Pâquier (FR)": "Le Pâquier (FR)",
    "MMörel": "Mörel",
    "LLü": "Lü",
    "NorNoréaz": "Noréaz",
    "TrTrüllikon": "Trüllikon",
    "KKüttigkofen": "Küttigkofen",
    "MMézières (VD)": "Mézières (VD)",
    "Büchseln": "Büchslen",
    "Lohn": "Lohn (GR)",
    "Jouxtens-Mezery": "Jouxtens-Mézery",
    "Crans-près-Celigny": "Crans-près-Céligny",
    "Saint-Leonard": "Saint-Léonard",
    "RRümlang": "Rümlang",
    "PrPréverenges": "Préverenges",
    "SSévaz": "Sévaz",
    "EnnetbEnnetbürgen": "Ennetbürgen",
    "RRüdlingen": "Rüdlingen",
    "SSévery": "Sévery",
    "LLüscherz": "Lüscherz",
    "Bois-d’Amont": "Bois-d'Amont",
    "Obersa Mundaun": "Obersaxen Mundaun"
}

## Part 1: Collect vote results on municipality level

In [8]:
class SwissvotesVotesDatasetBuilder:
    _xlsx_columns = ["region_id", "region_name", "votes_total", "votes_yes"]
    _drop_id_threshold = 19000

    def __init__(
        self,
        sv_df: pd.DataFrame,
        mun_reg: pd.DataFrame,
        can_reg: pd.DataFrame,
        mun_mut: pd.DataFrame,
        begin: str = BEGIN,
        end: str = END,
        excludes: list[int] = EXCLUDES,
        mistakes_map: dict = MISTAKES_MAP,
        cache_dir: Path = Path(tempfile.gettempdir()),
    ) -> None:
        self.sv_df = sv_df
        self.mun_reg = mun_reg
        self.can_reg = can_reg
        self.mun_mut = mun_mut
        self.cache_dir = cache_dir
        self.mistakes_map = mistakes_map
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.begin = begin
        self.end = end
        self.excludes = excludes

    @staticmethod
    def _extract_id(url):
        if pd.isnull(url):
            return None
        map_id = url.split("/")[-1]
        map_nr = map_id.split("_")[0]
        if "html" in map_nr:
            return None
        return map_nr

    @staticmethod
    def _download_file(url: str, path: Path):
        local_filename = path / url.split("/")[-1]
        with requests.get(url, stream=True, timeout=5) as r:
            with open(local_filename, "wb") as f:
                shutil.copyfileobj(r.raw, f)
        return local_filename

    @staticmethod
    def _combine_xlsx(df: pd.DataFrame):
        vote_df = pd.DataFrame(
            columns=SwissvotesVotesDatasetBuilder._xlsx_columns + ["vote_id"]
        )
        for _, row in df.iterrows():
            local_filename = row["local_filename"]
            print(f"Processing {local_filename}")
            try:
                xlsx = pd.read_excel(
                    local_filename, names=SwissvotesVotesDatasetBuilder._xlsx_columns
                )
            except Exception as e:
                print(f"Error processing {local_filename}: {e}")
                continue
            xlsx["vote_id"] = row["anr"]
            # Drop rows that are not data.
            xlsx = xlsx[pd.to_numeric(xlsx["region_id"], errors="coerce").notnull()]
            # Convert region_id to numeric.
            xlsx["region_id"] = pd.to_numeric(xlsx["region_id"], errors="coerce")
            # Drop votes that don't belong to a region.
            xlsx = xlsx[
                xlsx["region_id"] < SwissvotesVotesDatasetBuilder._drop_id_threshold
            ]
            vote_df = pd.concat([vote_df, xlsx])
        return vote_df

    def _harmonize_municipalities(self, df: pd.DataFrame):
        df = df.dropna(subset=["region_name"])
        # Adjust municipality names to match the ones in the municipality register.
        df["region_name"] = df["region_name"].replace(self.mistakes_map)
        df["region_name"] = df["region_name"].replace(
            mun_mut[("Vorgänger", "Gemeindename")].tolist(),
            mun_mut[("Nachfolger", "Gemeindename")].tolist(),
        )

        # Map municipality names to municipality IDs.
        muni_id_map = self.mun_reg.set_index("GDENAME")["GDENR"]
        df["region_id"] = df["region_name"].map(muni_id_map)

        # Map municipality IDs to canton IDs.
        can_id_map = self.can_reg.set_index("GDEKTNA")["KTNR"]
        self.mun_reg["KTNR"] = self.mun_reg["GDEKTNA"].map(can_id_map)
        muni_id_can_id_map = self.mun_reg.set_index("GDENR")["KTNR"]
        df["canton_id"] = df["region_id"].map(muni_id_can_id_map)
        return df
        
    def build(self):
        url_df = self.sv_df[["anr", "datum", "bfsmap-de"]]
        start_date = datetime.strptime(self.begin, "%d.%m.%Y")
        end_date = datetime.strptime(self.end, "%d.%m.%Y")
        url_df["datum"] = pd.to_datetime(url_df["datum"], format="%d.%m.%Y")
        url_df = url_df[(url_df["datum"] >= start_date) & (url_df["datum"] <= end_date)]
        url_df["map_nr"] = url_df["bfsmap-de"].apply(self._extract_id)
        url_df["xlsx"] = url_df["map_nr"].apply(
            lambda map_nr: f"https://www.atlas.bfs.admin.ch/core/projects/12/xshared/xlsx/{map_nr}_131.xlsx"
        )
        dl_func = functools.partial(self._download_file, path=self.cache_dir)
        # url_df["local_filename"] = url_df["xlsx"].apply(dl_func)
        # url_df = url_df.dropna(subset=["map_nr"])
        url_df["local_filename"] = self.cache_dir / url_df["map_nr"].apply(lambda x: f"{x}_131.xlsx")
        vote_df = self._combine_xlsx(url_df)
        vote_df.to_csv(self.cache_dir / "votes.csv", index=False)
        vote_df = pd.read_csv(self.cache_dir / "votes.csv")
        vote_df = self._harmonize_municipalities(vote_df)
        vote_df = self._postprocess(vote_df)
        self.df = vote_df

    def _postprocess(self, df):
        df = df.drop_duplicates()
        df = df[~df["vote_id"].isin(self.excludes)]
        # Keep missing vote results as NaN; handle them later.
        df["votes_total"] = df["votes_total"].replace("*", np.nan)
        df["votes_yes"] = df["votes_yes"].replace("*", np.nan)
        df["votes_total"] = df["votes_total"].astype(float)
        df["votes_yes"] = df["votes_yes"].astype(float)
        return df

    @property
    def dataset(self):
        """Accessor for the dataset."""
        return self.df

In [10]:
builder = SwissvotesVotesDatasetBuilder(
    swiss_votes,
    mun_reg=mun_reg,
    can_reg=can_reg,
    mun_mut=mun_mut,
    begin=BEGIN,
    end=END,
    excludes=EXCLUDES,
    cache_dir=Path("../data/interim/swissvotes"),
    mistakes_map=MISTAKES_MAP,
)
builder.build()
builder.dataset.to_csv("../data/processed/swissvotes_votes.csv", index=False)

## Part 2: Topics / Text Features

In [350]:
class SwissvotesTopicDatasetBuilder:
    def __init__(
        self,
        sv_df: pd.DataFrame,
        mun_map: pd.DataFrame,
        mun_mut: pd.DataFrame,
        begin: str = BEGIN,
        end: str = END,
        excludes: list[int] = EXCLUDES,
        mistakes_map: dict = MISTAKES_MAP,
        cache_dir: Path = Path(tempfile.gettempdir()),
    ) -> None:
        self.sv_df = sv_df
        self.mun_map = mun_map
        self.mun_mut = mun_mut
        self.cache_dir = cache_dir
        self.mistakes_map = mistakes_map
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.begin = begin
        self.end = end
        self.excludes = excludes

    @staticmethod
    def _transform_topics(df: pd.DataFrame):
        tdf = df.replace(".", np.nan)
        code_cols = []
        for i in range(1, 4):
            col_1 = f"d{i}e1"
            col_2 = f"d{i}e2"
            col_3 = f"d{i}e3"
            tdf[col_1] = tdf[col_1].fillna(0.0)
            tdf[col_2] = tdf[col_2].fillna(tdf[col_1])
            tdf[col_3] = tdf[col_3].fillna(tdf[col_2])
            code_cols.append((tdf[col_3].astype(float) * 100).astype(int))
            tdf.drop([col_1, col_2, col_3], axis=1, inplace=True)

        codes = set(np.concatenate([code_col.values for code_col in code_cols]))
        codes.remove(0)

        topics = [pd.get_dummies(pd.Categorical(code_col, categories=codes)) for code_col in code_cols]
        topics = functools.reduce(lambda x, y: x | y, topics)
        topics.columns = [f"topic_{i}" for i in topics.columns]
        tdf.reset_index(inplace=True)
        tdf = tdf.join(topics)
        tdf.drop("index", axis=1, inplace=True)
        return tdf


    def build(self):
        topic_df = self.sv_df[
            [
                "anr",
                "datum",
                "d1e1",
                "d1e2",
                "d1e3",
                "d2e1",
                "d2e2",
                "d2e3",
                "d3e1",
                "d3e2",
                "d3e3",
            ]
        ]
        start_date = datetime.strptime(self.begin, "%d.%m.%Y")
        end_date = datetime.strptime(self.end, "%d.%m.%Y")
        topic_df["datum"] = pd.to_datetime(topic_df["datum"], format="%d.%m.%Y")
        topic_df = topic_df[(topic_df["datum"] >= start_date) & (topic_df["datum"] <= end_date)]
        topic_df = topic_df[~topic_df["anr"].isin(self.excludes)]
        self.topic_df = topic_df
        topic_df = self._transform_topics(topic_df)
        self.topic_df = topic_df

    @property
    def dataset(self):
        """Accessor for the dataset."""
        return self.topic_df

In [351]:
builder = SwissvotesTopicDatasetBuilder(
    swiss_votes,
    mun_map=mun_reg,
    mun_mut=mun_mut,
    begin=BEGIN,
    end=END,
    excludes=EXCLUDES,
    cache_dir=Path("../data/interim/swissvotes"),
    mistakes_map=MISTAKES_MAP,
)

builder.build()
builder.dataset.to_csv("../data/processed/swissvotes_topics.csv", index=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  topic_df["datum"] = pd.to_datetime(topic_df["datum"], format="%d.%m.%Y")
