In [22]:
import matplotlib.pyplot as plt

dates = ['20120411', '20160413', '20200415', '20240410']

In [23]:
class Candidate:
    def __init__(self, party: str, name: str, votes: float):
        self.party = party
        self.name = name
        self.votes = votes

In [24]:
# 0: 극우
# 1: 보수
# 2: 중도
# 3: 진보
# 4: 극좌

def party_to_spectrum(date: str, city: str, candidate: Candidate):
    f = open(f'election_change\\{date}-party.csv', 'r', encoding='UTF-8')
    result = []
    for line in f:
        line = line.split(',')

        if line[0] == candidate.party:
            if candidate.party != '무소속':
                result = list(map(int, line[3:]))
                break
            elif line[1] == city and line[2] == candidate.name:
                result = list(map(int, line[3:]))
                break
    
    f.close()
    return result

def get_party_color(spectrums: list[int]):
    if len(spectrums) == 0:
        return (0.5, 0.5, 0.5)
    colors = [(1, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (1, 1, 0)]
    return colors[spectrums[0]]

In [25]:
def get_prev_district(date, city, district_name):
    prev_file = open(f'election_change\\{date}-district.csv', 'r')

    prev_city = city
    if date == '20240410':
        if city == '강원특별자치도':
            prev_city = '강원도'
        elif city == '전북특별자치도':
            prev_city = '전라북도'
        
    for line in prev_file:
        line = line.split(',')

        if line[0] == city and line[1] == district_name:
            result = []
            i = 2
            
            while i < len(line):
                if date == '20240410' and line[i] == '군위군의성군청송군영덕군':
                    prev_city = '경상북도'
                        
                result.append((prev_city, line[i], int(line[i + 1])))
                i += 2
            return result
        
    return [(prev_city, district_name, 1)]

In [26]:
data = []

for date in dates:
    election_file = open(f'election_data\\{date}-district.csv', 'r')
    election_data = {}
    party_set = set()

    print(date)

    for line in election_file:
        row = {}

        line = line.split(',')
        city = line[0]
        district_name = line[1].strip()

        if city not in election_data.keys():
            election_data[city] = {}

        idx = 2
        results = []

        while idx < len(line):
            party, name, votes = line[idx:idx+3]
            votes = float(votes)

            results.append(Candidate(party, name, votes))
            idx += 3

            if votes >= 10:
                party_set.add(party)

            # if votes >= 10 and party == '무소속':
            #     print(f'{party},{city},{name},')
            
        election_data[city][district_name] = results
        if date == dates[0]:
            continue
    
    data.append(election_data)

    print(party_set)

20120411
{'국민생각', '무소속', '민주통합당', '자유선진당', '친박연합', '새누리당', '통합진보당', '진보신당'}
20160413
{'무소속', '정의당', '노동당', '더불어민주당', '새누리당', '민중연합당', '국민의당', '녹색당'}
20200415
{'무소속', '정의당', '더불어민주당', '미래통합당', '우리공화당', '민생당', '민중당'}
20240410
{'새진보연합', '새로운미래', '무소속', '녹색정의당', '더불어민주당', '소나무당', '국민의힘', '진보당', '우리공화당', '개혁신당'}


In [31]:
def propagate_voters(base, vote_dist, curr):
    if len(vote_dist[curr]) > 0:
        return vote_dist[curr]
    
    new_vote_dist = []
    if curr >= base and curr + 1 < len(vote_dist):
        new_vote_dist.extend(propagate_voters(base, vote_dist, curr + 1))

    if curr <= base and curr - 1 >= 0:
        new_vote_dist.extend(propagate_voters(base, vote_dist, curr - 1))
    
    # Suppose the people supporting the spectrum moves to adjacent spectrum by 50-50
    # If no candidate exists
    return [(candidate, weight * 0.5) for (candidate, weight) in new_vote_dist]


def predict_prev_election(i, city: str, district_name: str) -> list[Candidate]:
    date = dates[i]
    curr_data = data[i][city][district_name]

    prev_date = dates[i - 1]
    prev_districts = get_prev_district(date, city, district_name)

    # Get amount of votes that each spectrum got
    bucket = [0 for _ in range(5)]
    for (prev_city, prev_district, weight) in prev_districts:
        for candidate in data[i - 1][prev_city][prev_district]:
            spectrums = party_to_spectrum(prev_date, prev_city, candidate)
            for s in spectrums:
                bucket[s] += weight * votes / len(spectrums)
    
    # Calculate how is the vote is going to be distributed, based on spectrum
    vote_dist = [[] for _ in range(5)]
    for c_i, candidate in enumerate(curr_data):
        spectrums = party_to_spectrum(date, city, candidate)
        for s in spectrums:
            vote_dist[s].append((c_i, 1 / len(spectrums)))

    # For empty vote_dist bucket (which means), propagate voters to adjacent spectrum
    vote_dist_original = vote_dist[:]
    for s, d in enumerate(vote_dist):
        if len(d) == 0:
            propagate_voters(s, vote_dist_original, s)

    # Distribute the votes from the previous result to the result
    vote_prediction = [0 for _ in curr_data]
    for s, dist_list in enumerate(vote_dist):
        weight_sum = sum(map(lambda x: x[1], dist_list))
        for (c, weight) in dist_list:
            vote_prediction[c] += weight / weight_sum * bucket[s]

    # Normalize the sum
    vote_sum = sum(vote_prediction)
    for n in range(len(vote_prediction)):
        vote_prediction[n] /= vote_sum / 100

    result = []
    for c_i, candidate in enumerate(curr_data):
        prediction = vote_prediction[c_i]
        result.append(Candidate(candidate.party, candidate.name, prediction))
    
    return result

In [28]:
class PredictionEval:
    def __init__(self, date: str, real: list[list[Candidate]], prediction: list[list[Candidate]]):
        self.date = date
        self.real = real
        self.prediction = prediction

    def find_winner(self, result: list[Candidate]):
        winner = result[0]

        for candidate in result:
            if candidate.votes > winner.votes:
                winner = candidate

        return winner
    
    def get_seats(self):
        pred_bin = {}
        real_bin = {}

        for pred, real in  zip(self.prediction, self.real):
            pred_winner = self.find_winner(pred)
            real_winner = self.find_winner(real)

            pred_bin[pred_winner.party] = pred_bin.setdefault(pred_winner.party, 0) + 1
            real_bin[real_winner.party] = real_bin.setdefault(real_winner.party, 0) + 1

        return pred_bin, real_bin


In [32]:
X = []
Y = []
colors = []
evalulators = []

for i, date in enumerate(dates[1:]):
    i = i + 1
    real_list = []
    pred_list = []

    for city in data[i].keys():
        for district_name in data[i][city].keys():
            prediction = predict_prev_election(i, city, district_name)
            result = data[i][city][district_name]

            pred_list.append(prediction)
            real_list.append(result)

            colors.extend(map(lambda x: get_party_color(party_to_spectrum(date, city, x)), result))
            X.extend(map(lambda x: x.votes, prediction))
            Y.extend(map(lambda x: x.votes, result))

            # Get candidate with most vote
            prediction.sort(key=lambda x: x.votes, reverse=True)
            result.sort(key=lambda x: x.votes, reverse=True)

    evalulators.append(PredictionEval(date, real_list, pred_list))

line = [x for x in range(101)]

plt.xlim(-1, 101)
plt.ylim(-1, 101)
plt.title('Prediction Based on Previous Election Result Only')
plt.xlabel('Prediction')
plt.ylabel('Actual result')

plt.plot((0, 100), (0, 100), 'k--')
plt.scatter(X, Y, c=colors, alpha=0.3, edgecolors='none')

TypeError: 'Candidate' object is not subscriptable

In [None]:
for e in evalulators:
    pred, real = e.get_seats()
    print(pred)
    print(real)

{'더불어민주당': 107, '새누리당': 137, '무소속': 7, '민중연합당': 2}
{'더불어민주당': 110, '새누리당': 105, '국민의당': 25, '무소속': 11, '정의당': 2}
{'더불어민주당': 104, '미래통합당': 122, '무소속': 4, '민생당': 19, '민중당': 1, '정의당': 3}
{'더불어민주당': 163, '미래통합당': 84, '무소속': 5, '정의당': 1}
{'더불어민주당': 159, '국민의힘': 93, '새로운미래': 1, '녹색정의당': 1}
{'더불어민주당': 161, '국민의힘': 90, '진보당': 1, '새로운미래': 1, '개혁신당': 1}
