# Electoral geography: predicting the universe of city-level vote-shares with a sample of cities

Outline: the objectives of this exercise are:

(1) to predict changes in the vote-share of the workers party in Brazil (PT) for every municipality in Brazil, using only a subset of such municipalities. I will train a Graph Neural Network to make predictions.

(2) given some treatment affecting a subset of municipalities (treated), use the model to predict changes in the vote-share of the treated group using only the control group (non-treated). I will train the model in the years before treatment, and I will use the model to predict the counterfactual outcome of the treated using the control group, before and after treatment. The model bias will be estimated as the difference between the predicted and the actual outcome of the treated group. If the model is roughly unbiased before treatment, then the difference between the predicted and the actual outcome of the treated group after treatment will be a measure of the average treatment effect (ATE = -bias).

First, import the necessary libraries and load the data.

In [None]:
import os
os.chdir(r"C:\Users\rafap\Desktop\RA2023 - personal files\neuralnet-counterfactuals")


In [None]:
!pip install torch_geometric

import pandas as pd
import numpy as np
import torch
from torch_geometric.data import Data, Batch
from scipy.spatial.distance import pdist, squareform
from sklearn.neighbors import NearestNeighbors
import torch_geometric.utils
import matplotlib.pyplot as plt
import torch
from torch.nn import Linear
from torch_geometric.nn import GCNConv
from sklearn.manifold import TSNE
import torch.nn.functional as F
from torch_geometric.nn import GATConv # [^1^][1]

# Load the dta datasets into a pandas dataframe
df2006 = pd.read_stata(rf"data/brazilian_municipalities2006.dta")
df2010 = pd.read_stata(rf"data/brazilian_municipalities2010.dta")
df2014 = pd.read_stata(rf"data/brazilian_municipalities2014.dta")
df2018 = pd.read_stata(rf"data/brazilian_municipalities2018.dta")
df2022 = pd.read_stata(rf"data/brazilian_municipalities2022.dta")

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device
device_cpu = torch.device('cpu')

Now, I create a list containing the graphs of each election. Each graph is a tensor_geometric Data object, which contains the adjacency matrix and the features of each node. The graph structure is defined by linking each city to it's 10 closest peers (in terms of geographical distance).

Roughly, I take the datasets for the years 2010 through 2022 and I define the output vector (Y) as the difference in vote-share in the runoff for the PT (workers party) between the current and last election. The X matrix, containing covariates, includes city-level covariates extracted from the Brazilian 2010 census. Note that it also includes the output vector (difference in vote-share). The goal here is to train the network by (1) dropping some cities in the training set arbitralily, and (2) predicting the vote-share of the PT in those cities through convolutions/attention layers. That is, I leverage "local" voting patterns to extrapolate voting behavior to the omitted-outcome cities. The network should thus be learning an underlying geographical contagion/distribution process of voting behavior. In evaluating the model, I will only use cities out of the training set. In particular, I will drop "diff" for half the cities in the test set and predict their vote-share based on the other half (see the definition of Z).

I also define treatment variables here - i.e. I drop the "labels" or outcomes of the treated cities, and will use the non-dropped controls to predict the counterfactuals for the treated cities.

In [None]:
data_list = []
treatment_list = []
treatment_mask = []
Z_list = []
Zmask_list = []
#loop over years
for df in [df2010, df2014, df2018, df2022]:
    # encode the cateogrical variables as integers
    df["regiao"] = df["regiao"].astype("category").cat.codes
    df["sigla_uf"] = df["sigla_uf"].astype("category").cat.codes

    #drop missing values
    df = df[~np.isnan(df).any(axis=1)]

    #create difference from previous election to current election
    df["diff"] = df["pt_share"] - df["l4pt_share"]
    Y = df.loc[:, ["diff"]]
    #divide Y in 100 quantiles
    Y = pd.qcut(Y["diff"], q=100, labels=False)
    Y = Y.to_numpy()

    X = df.loc[:, ["diff", "l4pt_share", "populacao", "evangelico", "urban", "radio", "televisao", "idade", "alfabetizado", "rend_total", "area", "density", "white", "nasceu_mun", "horas_trabprin", "filhos_nasc_vivos", "high_school", "bachelor", "vive_conjuge", "sexo", "high"]]
    #save the collumn high in a pd series
    high = X["high"]
    #normalize the data
    X = (X - X.mean()) / X.std()
    #add the collumn high to the normalized data
    X["high"] = high

    # Create treat, which is a copy of df with the values of diff set to 0 if high (my treatment variable) is 1
    # This is to evaluate the effect of treatment in 2022
    treat = X.copy().to_numpy()
    treat[:, 0] = treat[:, 0] * (1 - treat[:, 20])
    #drop collumn high from treat
    treat = treat[:, :-1]
    # Create a tensor for treat
    treat = torch.tensor(treat, dtype=torch.float).to(device)
    # Create a boolean taking values 1 if treat is 0 and 1 otherwise (use sourceTensor.clone().detach())
    treat_mask = treat[:, 0].clone().detach()
    treat_mask[treat_mask != 0] = 1
    treat_mask = torch.logical_not(treat_mask).to(device)
    treatment_list.append(treat)
    treatment_mask.append(treat_mask)

    #drop the collumn high
    X = X.drop(columns=["high"])

    # Create Z, which is a copy of X with a random 50% of the values in diff set to 0
    # This is for testing the model at the end
    Z = X.copy().to_numpy()
    Z[:, 0] = np.random.choice([0, 1], size=Z.shape[0], p=[0.5, 0.5]) * Z[:, 0]
    # Create a tensor for Z
    Z = torch.tensor(Z, dtype=torch.float).to(device)
    # Create a boolean taking values 1 if Z is 0 and 1 otherwise (use sourceTensor.clone().detach())
    Z_mask = Z[:, 0].clone().detach()
    Z_mask[Z_mask != 0] = 1
    Z_mask = torch.logical_not(Z_mask).to(device)
    Z_list.append(Z)
    Zmask_list.append(Z_mask)

    # Extract the coordinates as a numpy array
    coords = df[["_Y0", "_X0"]].to_numpy()

    # Alternatively, use a k-nearest neighbors algorithm to find the 50 nearest neighbors for each city
    nn = NearestNeighbors(n_neighbors=10, metric="euclidean")
    nn.fit(coords)
    dist_sparse = nn.kneighbors_graph(mode="distance")

    # Convert the sparse matrix to edge index and edge weight tensors
    edge_index, edge_attr = torch_geometric.utils.convert.from_scipy_sparse_matrix(dist_sparse)

    # Make X and Y tensors
    X = torch.tensor(X.to_numpy(), dtype=torch.float)
    Y = torch.tensor(Y, dtype=torch.float)
    data = Data(x=X, y=Y, edge_attr=edge_attr, edge_index=edge_index)

    # Get the number of cities
    num_cities = data.num_nodes

    # Randomly select 2500 cities for training
    perm = torch.randperm(num_cities)
    train_idx = perm[:2500]

    # Split the remaining cities into validation and test sets
    val_idx, test_idx = torch.split(perm[2500:], num_cities // 2)

    # Create boolean masks for each set
    train_mask = torch.zeros(num_cities, dtype=torch.bool)
    train_mask = train_mask.scatter(0, train_idx, 1)

    val_mask = torch.zeros(num_cities, dtype=torch.bool)
    val_mask = val_mask.scatter(0, val_idx, 1)

    test_mask = torch.zeros(num_cities, dtype=torch.bool)
    test_mask = test_mask.scatter(0, test_idx, 1)

    # Add the masks as attributes to the data object
    data.train_mask = train_mask
    data.val_mask = val_mask
    data.test_mask = test_mask

    # Convert the labels to long tensors
    data.y = data.y.long()

    data.validate(raise_on_error=True)

    in_features = X.shape[1]
    #define out_features as the number of quantiles in Y, as a function of Y
    out_features = len(np.unique(Y))

    #put in gpu
    data.to(device)

    #append to data_list
    data_list.append(data)

We now have the graphs for each year stored in data_list. We can access some graph information as follows:

In [None]:
print(data_list[0])
print('==============================================================')

# Gather some statistics about the graph.
print(f'Number of nodes: {data.num_nodes}')
print(f'Number of edges: {data.num_edges}')
print(f'Average node degree: {data.num_edges / data.num_nodes:.2f}')
print(f'Number of training nodes: {data.train_mask.sum()}')
print(f'Training node label rate: {int(data.train_mask.sum()) / data.num_nodes:.2f}')
print(f'Has isolated nodes: {data.has_isolated_nodes()}')
print(f'Has self-loops: {data.has_self_loops()}')
print(f'Is undirected: {data.is_undirected()}')


To test whether the predictions also generalize across years, I take 2022 (and possibly 2018) away from the training set. I'll then apply the model to 2022 and check the performance.

In [None]:
# store all the elections
data2010 = data_list[0]
data2014 = data_list[1]
data2018 = data_list[2]

# store the last item of data_list to data2022
data2022 = data_list[-1]
data_list.pop(-1)
#data_list.pop(-1) #if want to train without 2018
data_list

I now turn to defining the model. I will use a Graph Attention Network (GAT). I have a dropout layer, which omits the information of some nodes in each layer. I then have two GAT layers, with a RELU activation function in between, which are the core of the model. Finally, I have a linear layer, which outputs the predicted vote-share for the PT in each city. The model is defined as follows:

In [None]:
import torch.nn as nn
import random
j = random.randint(0, 1000)
class GAT(nn.Module):
    def __init__(self):
        super().__init__()
        torch.manual_seed(j)
        self.conv1 = GATConv(in_features, in_features, heads=2)
        self.conv2 = GATConv(in_features*2, in_features, heads=2)
        self.lin1 = Linear(in_features*2, out_features)

    def forward(self, x, edge_index, edge_attr):
        #make drops start at 0.9 and decrease by 0.05 every 10 epochs, until it reaches 0.5
        drops = 0.9 - (0.05 * (epoch // 10))
        if drops < 0.5:
            drops = 0.5
        x = F.dropout(x, p=drops, training=self.training)
        x = self.conv1(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv2(x, edge_index, edge_attr)
        x = self.lin1(x)
        return x

model = GAT()
model = model.to(device)
print(model)

The training of the model is simple: for every epoch I actually take one optimization step for each election included in the training. By alternating the elections at every training step (instead of training on each election sequentially), I hope to capture a more robust relationship underlying electoral geographical distributions. Note that I start by dropping 90% of the nodes in the dropout layer, going down to 50% as the training progresses.

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss().to(device)

def train():
    #loop over items of datalist
    for data in data_list:
        model.train()
        optimizer.zero_grad()  # Clear gradients.
        out = model(data.x, data.edge_index, data.edge_attr)  # Perform a single forward pass.
        # Compute the loss solely based on the training nodes.
        loss = criterion(out[data.train_mask], data.y[data.train_mask])
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
    return loss

for epoch in range(1, 100):
    drops = 0.9
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

Testing is defined as follows. There are 3 functions. The main one (test(year)), is used to test the model's predictions for the treatment groups defined above. Unexpected biases in the predictions of the model are interpreted as the effect of treatment (since we are comparing with the models "counterfactual" predictions).

The other two are simple test functions, which evaluate the models capabilities to predict vote-shares of a sample of randomly selected cities given the remaining ones' vote-shares in 2022. More precisely: Z is defined above by taking data2022 and setting half of the values of diff to 0. Z_mask is a boolean vector taking value 1 if diff was set to 0 in a particular observation. Here, I evaluate the models ability to predict diff for those cities that had their diff set to 0.

test_testset() evaluates the model's fit for the test set, whereas test_trainset() evaluates the model's fit for the training set (the latter is mainly to diagnose overfitting in case the accuracy in the training set is much larger than in the test set.)

In [None]:
def test(year):
      model.eval()
      #test on each dataset in datalist
      data = data_list
      #data.append(data2018) #if trained without 2018
      data.append(data2022)
      data = data[int((year+2)/4-503)] #the weird integer is just the list index (0,1,2,3) from the year list (2010, 2014, 2018, 2022)
      out = model(treatment_list[int((year+2)/4-503)], data.edge_index, data.edge_attr)
      pred = out.argmax(dim=1)  # Use the class with highest probability.
      #note: no need to use treatment_mask list since the selection across years is constant (what changes is the dataset with diff)
      mask = treat_mask*data.test_mask #add *data.test_mask if year is in in the training set (so we exclude the training cities for each year)
      test_correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
      test_acc = int(test_correct.sum()) / int(mask.sum())  # Derive ratio of correct predictions.
      # calculate the mean squared error, make input a float tensor
      mse = torch.mean((pred[mask].float() - data.y[mask].float())**2)
      # bias
      bias = torch.mean((pred[mask].float() - data.y[mask].float()))
      # generate a tensor with the values of data.y[mask] but randomly shuffled
      shuffled = data.y[mask].float().clone()
      shuffled = shuffled[torch.randperm(len(shuffled))]
      #mse with suffled data
      mse_shuffled = torch.mean((shuffled - data.y[mask].float())**2)
      #fake rsquared
      r2 = 1 - mse/mse_shuffled
      #plot the two histograms together, with transparency and different colors
      plt.hist(pred[mask].cpu().float() - data.y[mask].cpu().float(), alpha=0.5, label='Predictions')
      plt.hist(shuffled.cpu() - data.y[mask].cpu().float(), alpha=0.5, label='Shuffled')
      plt.legend(loc='upper right')
      plt.show()
      return test_acc, pred, mse, bias, r2, out

#Test on the test data
def test_testset(year):
      model.eval()
      #test on each dataset in datalist
      data = data_list
      #data.append(data2018) #if trained without 2018
      data.append(data2022)
      data = data[int((year+2)/4-503)] #the weird integer is just the list index (0,1,2,3) from the year list (2010, 2014, 2018, 2022)
      out = model(Z_list[int((year+2)/4-503)], data.edge_index, data.edge_attr)
      pred = out.argmax(dim=1)  # Use the class with highest probability.
      mask = Zmask_list[int((year+2)/4-503)]*data.test_mask #add *data.test_mask if year is in in the training set (so we exclude the training cities for each year)
      test_correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
      test_acc = int(test_correct.sum()) / int(mask.sum())  # Derive ratio of correct predictions.
      # calculate the mean squared error, make input a float tensor
      mse = torch.mean((pred[mask].float() - data.y[mask].float())**2)
      # bias
      bias = torch.mean((pred[mask].float() - data.y[mask].float()))
      # generate a tensor with the values of data.y[mask] but randomly shuffled
      shuffled = data.y[mask].float().clone()
      shuffled = shuffled[torch.randperm(len(shuffled))]
      #mse with suffled data
      mse_shuffled = torch.mean((shuffled - data.y[mask].float())**2)
      #fake rsquared
      r2 = 1 - mse/mse_shuffled

      #plot the two histograms together, with transparency and different colors
      plt.hist(pred[mask].cpu().float() - data.y[mask].cpu().float(), alpha=0.5, label='Predictions')
      plt.hist(shuffled.cpu() - data.y[mask].cpu().float(), alpha=0.5, label='Shuffled')
      plt.legend(loc='upper right')
      #save plot
      plt.savefig(rf"outputs/performance.pdf")
      return test_acc, pred, mse, bias, r2, out

#test on the training data
def test_trainset():
        model.eval()
        data = data2022
        out = model(Z, data.edge_index, data.edge_attr)
        pred = out.argmax(dim=1)  # Use the class with highest probability.
        mask = Z_mask*data.train_mask #add *data.train_mask if data2022 in the training set
        test_correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
        test_acc = int(test_correct.sum()) / int(mask.sum())  # Derive ratio of correct predictions.
        return test_acc


test_acc, pred, mse, bias, r2, out = test(2022)
print(f'Test Accuracy: {test_acc:.4f}')
print(f'MSE: {mse:.4f}')
print(f'Bias: {bias:.4f}')
print(f'Fake R2: {r2:.4f}')
train_acc = test_trainset()
print("-------------------------")
print(f'Training Accuracy: {train_acc:.4f}')

Now I train the model 100 times. For each training, I save the model's average bias in the treatment group predictions. I'll then plot the bias distribution for each year.

Note that if I use test() here, I will be storing the average bias in the model's prediction to my treatment group. As mentioned before, this allows me to estimate a treatment effect under the assumption that the model's predictions are a good counterfactual in the absence of treatment. I will then plot the distribution of the treatment effect estimates for each year.

We can easily do a placebo analysis, by instead using test_testset(), which instead just amounts to storing bias values for a random selection of cities at every run.

In [None]:
#for some reason running the code calling another file is MUCH faster.
bias_list = []
for loop in range(100):
    #toberun is just a copy of the code above (not including this and what follows).
    with open(r"code\toberun.py") as f:
        exec(f.read())
    bias = []
    for year in range(2010, 2026, 4):
        #use test_testset if want to evaluate on "fake" treatment (i.e. model performance across years)
        test_acc_temp, pred, mse_temp, bias_temp, r2_temp, out = test(year)
        bias.append(bias_temp)
    bias_list.append(bias)
    print(loop)

What follows are just manipulations of the data to plot the results.

In [None]:
#make bias_list a numpy array
bias_list = [[x.cpu().item() for x in y] for y in bias_list]
bias_list = np.array(bias_list)

#calculate the mean of each column
bias_mean = np.mean(bias_list, axis=0)

#save bias_list as a csv file
np.savetxt(rf"outputs/bias_list.csv", bias_list, delimiter=',')

In [None]:
# gen long_bias, which is bias_list but in panel data form. The resulting dataframe has 2 collumns: year and bias
long_bias = pd.DataFrame(bias_list)

#training on 2010-2018
#long_bias = pd.read_csv(rf"{path}\outputs\results_train_2010_2018.csv", header=None)
long_bias2 = pd.read_csv(rf"outputs/results_2010_2018_random.csv", header=None)

#training on 2010-2014
#long_bias = pd.read_csv(r'C:\Users\rafap\Documents\Masters\fourth semester\MachineLearning\GNN project\results_train_2010_2014.csv', header=None)
#long_bias2 = pd.read_csv(r'C:\Users\rafap\Documents\Masters\fourth semester\MachineLearning\GNN project\results_2010_2014_random.csv', header=None)

#get bias_list in from the saved csv file
long_bias = long_bias.stack().reset_index()

#make level_1 be called year and set 0 as 2010, 1 as 2014, etc
long_bias = long_bias.rename(columns={'level_1': 'year'})
long_bias['year'] = long_bias['year'].replace([0, 1, 2, 3], [2010, 2014, 2018, 2022])
#rename collumn 0 as bias
long_bias = long_bias.rename(columns={0: 'bias'})
#drop everything but bias and year
long_bias = long_bias.drop(columns=['level_0'])

#do the same for long_bias2
long_bias2 = long_bias2.stack().reset_index()
long_bias2 = long_bias2.rename(columns={'level_1': 'year'})
long_bias2['year'] = long_bias2['year'].replace([0, 1, 2, 3], [2010, 2014, 2018, 2022])
long_bias2 = long_bias2.rename(columns={0: 'bias'})
long_bias2 = long_bias2.drop(columns=['level_0'])
#replace 2010 with 2010_1, 2014 with 2014_1, etc
long_bias2['year'] = long_bias2['year'].replace([2010, 2014, 2018, 2022], [2011, 2015, 2019, 2023])

#concatenate long_bias and long_bias2
long_bias = pd.concat([long_bias, long_bias2], ignore_index=True)

#create var random, which is 1 if the year is in the list 2010.1, 2014.1 etc and 0 otherwise
long_bias['random'] = np.where(long_bias['year'].isin([2011, 2015, 2019, 2023]), 1, 0)
#make random a categorical variable
long_bias['random'] = long_bias['random'].astype('category')
#make year a categorical variable
long_bias['year'] = long_bias['year'].astype('category')


In [None]:
from plotnine import *

p = (
    # Plot violin plots for each year
    ggplot(long_bias, aes(x='factor(year)', y='bias', fill="random")) +
    geom_violin(scale='width', trim=False, alpha=0.2) +
    # Plot the mean bias for each year
    geom_point(aes(x='factor(year)', y='bias'), stat='summary', fun_y=np.mean, size=2) +
    # Plot the 95% confidence interval for the mean bias for each year
    geom_errorbar(aes(x='factor(year)', ymin='bias', ymax='bias'), stat='summary', fun_ymin=lambda x: np.mean(x)-1.96*np.std(x), fun_ymax=lambda x: np.mean(x)+1.96*np.std(x), width=0.2) +
    # Plot a horizontal dashed line at 0
    geom_hline(yintercept=0, linetype='dashed')
    #x axis label
    + xlab('Year')
    #y axis label
    + ylab('Bias')
    #group ticks two by two and label them 2010, 2014, etc
    + scale_x_discrete(breaks=[2010, 2014, 2018, 2022], labels=['2010', '2014', '2018', '2022'])
     #do not show label legend instead of "random"
    + labs(fill='')
    #change legend position
    + theme(legend_position='bottom')
    #change style of the legend
    + theme(legend_key=element_rect(fill='white', colour='white'))
    #change the 0 and 1 in the legend to "Treatment" and "Placebo Treatment"
    + scale_fill_discrete(labels=['Treatment', 'Placebo'])
    #make the background of the plot white
    + theme_bw()
)

#2010_2018 version
p.save(rf"outputs/2010_2018.pdf", width=6, height=4)
#2010_2014 version
#p.save(r'C:\Users\rafap\Documents\Masters\fourth semester\MachineLearning\GNN project\2010_2014.pdf', width=6, height=4)