# Preferential Voting Neural Net: ***A First Draft***

Setting up the basic imports and devices to train the net on the gpu

In [47]:
import torch
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

Handling the data

In [9]:
import torchvision
from torchvision import datasets, transforms
not_handled_pref_data = True
if not_handled_pref_data:
    train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
    test_dataset = datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)

    trainset = torch.utils.data.DataLoader(train_dataset, batch_size=10, shuffle=True)
    testset = torch.utils.data.DataLoader(test_dataset, batch_size=10, shuffle=True)

Getting Primary and 2CP Vote Data For Polling Booth (Currently just NSW) 

Extracting data from the csv file and dumping into a list

In [69]:
import csv

def getrows(url):
    # Row Stuff ## Extract data into a big array and then combine booths.
    row_count = 0
    rows = [] # Will store the data as needed.
    with open(url,newline='') as primaryvotesFile:
        reader  = csv.reader(primaryvotesFile, delimiter=',', quotechar='"')
        for row in reader:
            rows.append(row)
            row_count += 1
    return rows

Parsing rows to get a each row as [PollingID, PartyIdentifier, Votes]

In [64]:
# Convert rows of the csv into important data
# From CSV Format
def parseData(rows):
    POLLPLACEID = 3
    CANDID = 5
    PARTYAB = 11
    PARTYNAME = 12
    VOTES = 13

    parsedData = []
    for row in rows[2:]: # Slice off the two overhead lines
        accepting = True
        data = []
        ## data = [PollingID, PartyIdentifier, Votes]
        data.append(row[3])
        # Check that the PARTYAB exist
        if row[PARTYAB] != None and row[PARTYAB] != '':
            data.append(row[PARTYAB])
        # If no PARTYAB exists, check if this is the informal vote at given polling booth
        elif row[PARTYNAME] == "Informal":
            data.append("INFORMAL")
        # If not informal and no PARTYAB try the PARTYNAME. ''.join(row[PARTYNAME].split()) removes whitespace
        elif row[PARTYNAME] != None and row[PARTYNAME] != '':
            data.append(''.join(row[PARTYNAME].split()))
        # If no PARTYAB and no PARTYNAME use candidate ID
        else:
            data.append(row[CANDID])

        data.append(row[VOTES])

        if accepting:
            parsedData.append(data)
    return parsedData

Reformatting the data into a list where each element represents the primary/2CP votes at a given pollingPlace

In [65]:
def combineData(parsedData):
    combinedData = []
    for idx, data_entry in enumerate(parsedData):
        try:
            pollingplace = int(data_entry[0])
            # Check The Current List
            if combinedData[-1]['pollingID'] == pollingplace:
                combinedData[-1][data_entry[1]] = int(data_entry[2])
            else:
                newdict = {'pollingID':pollingplace,data_entry[1]:int(data_entry[2])}
                combinedData.append(newdict)
        except IndexError:
            newdict = {'pollingID':pollingplace,data_entry[1]:int(data_entry[2])}
            combinedData.append(newdict)
    return combinedData

In [66]:
def try0(x, key):
    try:
        return x[key]
    except KeyError:
        return 0

In [67]:
def convert(combinedData):
    Xs = []
    for x in combinedData:
        others = 0
        for index, (key,value) in enumerate(x.items()):
            if key not in ["LP","NP","ALP","GRN","IND","ON","UAPP","pollingID","INFORMAL"]:
                others += value
        X = [
            try0(x,"LP"),
            try0(x,"NP"),
            try0(x,"ALP"),
            try0(x,"GRN"),
            try0(x,"ON"),
            try0(x,"UAPP"),
            try0(x,"IND"),
            others,
            try0(x,"INFORMAL"),
        ]
        Xs.append(X)
    return Xs

In [89]:
def normalise(Xs):
    newXs = []
    for x in Xs:
        if sum(x)!= 0:
            newx = [value/sum(x) for value in x]
        else:
            newx = x
        newXs.append(newx)
    return newXs

In [90]:
rows = getrows('NSWPrimaryPollingPlace.csv')
parsedData = parseData(rows)
combinedData = combineData(parsedData)
Xs = convert(combinedData)
Xs = normalise(Xs)
Xs = torch.tensor(Xs) # Convert to tensor


In [91]:
rows = getrows('NSW2CPPollingPlace.csv')
parsedData = parseData(rows)
combinedData = combineData(parsedData)
ys = convert(combinedData)
ys = normalise(ys)
ys = torch.tensor(ys) # Convert to tensor


In [92]:
VAL_PCT = 0.1
val_size = int(len(Xs)*VAL_PCT)

train_X = Xs[:-val_size]
train_y = ys[:-val_size]

test_X = Xs[-val_size:]
test_y = ys[-val_size:]

print(len(train_X))
print(len(test_X))
print(len(train_y))
print(len(test_y))

2594
288
2594
288


Creating the Neural Net

In [93]:
import torch.nn as nn
import torch.nn.functional as F

In [105]:
class Net(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.fc1 = nn.Linear(len(Xs[0]), 64) 
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 64)
        self.fc4 = nn.Linear(64, len(ys[0]))

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return F.softmax(x,dim=1)

In [106]:
net= Net().to(device)
print(net)

Net(
  (fc1): Linear(in_features=9, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=64, bias=True)
  (fc4): Linear(in_features=64, out_features=9, bias=True)
)


Creating the optimizer

In [107]:
import torch.optim as optim
from tqdm import tqdm

optimizer = optim.Adam(net.parameters(),lr=0.001)

EPOCHS = 3
BATCH_SIZE = 100
loss_function = nn.MSELoss()
def train(net):
    for epoch in range(EPOCHS):
        for i in tqdm(range(0,len(train_X),BATCH_SIZE)):
            batch_X = train_X[i:i+BATCH_SIZE].view(-1,len(Xs[0]))
            batch_y = train_y[i:i+BATCH_SIZE]
            
            batch_X, batch_y = batch_X.to(device),batch_y.to(device)
            
            net.zero_grad()
            outputs = net(batch_X)
            loss = loss_function(outputs, batch_y)
            loss.backward()
            optimizer.step()
        print(f"Epoch:{epoch}. Loss: {loss}")
train(net)

100%|██████████| 26/26 [00:00<00:00, 360.78it/s]


Epoch:0. Loss: 0.04027148336172104


100%|██████████| 26/26 [00:00<00:00, 324.71it/s]


Epoch:1. Loss: 0.019344478845596313


100%|██████████| 26/26 [00:00<00:00, 333.03it/s]

Epoch:2. Loss: 0.016874486580491066





In [108]:
print(net(Xs[0].view(-1,len(Xs[0])).to(device)))

tensor([[0.3070, 0.1195, 0.4603, 0.0175, 0.0130, 0.0193, 0.0388, 0.0169, 0.0075]],
       device='cuda:0', grad_fn=<SoftmaxBackward0>)


Testing the accuracy

In [99]:
# Let's test our data!
correct = 0
total = 0
with torch.no_grad():
    for data in test:
        X,y = data
        output = net(X.view(-1,28*28).to(device))
        for idx, i in enumerate(output):
            if torch.argmax(i) == y[idx]:
                correct += 1
            total +=1 
print("Accuracy: ",round(correct/total,3))

NameError: name 'testset' is not defined