# Imports

In [2]:
# %pip install xmltodict PyPDF2 rapidfuzz

In [3]:
import pandas as pd
import re
import requests
import xmltodict
import numpy as np

from rapidfuzz.process import extractOne
from rapidfuzz.utils import default_process

from PyPDF2 import PdfReader
from typing import Dict, List, Tuple, Union

# Load European deputy list

In [4]:
# Retrieve the current list of Members of the European Parliament (MEPs).
# Caution: a member's current political group may differ from the group they belonged to at the time of the vote.
url = "https://www.europarl.europa.eu/meps/en/full-list/xml"

def get_european_deputy() -> pd.DataFrame:
    """Download the MEP list from the module-level ``url`` and return it as a DataFrame.

    Returns:
        DataFrame built from the ``meps`` / ``mep`` entries of the downloaded XML document.

    Raises:
        requests.HTTPError: if the server answers with an HTTP error status.
        requests.Timeout: if the server does not answer within the timeout.
    """
    # Bound the wait so that a stalled connection cannot hang the notebook.
    response = requests.get(url, timeout=30)
    # Fail here on an HTTP error instead of trying to parse an error page as XML.
    response.raise_for_status()
    data = xmltodict.parse(response.content)
    return pd.DataFrame.from_dict(data["meps"]["mep"], orient="columns")

df_european_deputy = get_european_deputy()

# Shorten the parliamentary group names
political_groups_matching = {
    "Group of the European People's Party (Christian Democrats)": "PPE",
    "Identity and Democracy Group": "ID",
    "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament": "S&D",
    "European Conservatives and Reformists Group": "ECR",
    "Group of the Greens/European Free Alliance": "Verts/ALE",
    "Renew Europe Group": "Renew",
    "The Left group in the European Parliament - GUE/NGL": "GUE/NGL",
    "Non-attached Members": "NI",
}

# Column-wise lookup. The list is downloaded live, so a group that is not in the
# mapping keeps its full name instead of aborting the cell with a KeyError.
df_european_deputy["shortPoliticalGroup"] = df_european_deputy["politicalGroup"].map(
    lambda group_name: political_groups_matching.get(group_name, group_name)
)
df_european_deputy.head()

Unnamed: 0,fullName,country,politicalGroup,id,nationalPoliticalGroup,shortPoliticalGroup
0,Magdalena ADAMOWICZ,Poland,Group of the European People's Party (Christia...,197490,Independent,PPE
1,Asim ADEMOV,Bulgaria,Group of the European People's Party (Christia...,189525,Citizens for European Development of Bulgaria,PPE
2,Isabella ADINOLFI,Italy,Group of the European People's Party (Christia...,124831,Forza Italia,PPE
3,Matteo ADINOLFI,Italy,Identity and Democracy Group,197826,Lega,ID
4,Alex AGIUS SALIBA,Malta,Group of the Progressive Alliance of Socialist...,197403,Partit Laburista,S&D


In [5]:
# Lookup tables used by get_full_name: raw names and their rapidfuzz-preprocessed
# counterparts, kept in the same order so that a match index maps back to the raw name.
full_name_list = df_european_deputy['fullName'].to_list()
processed_full_name_list = [default_process(name) for name in full_name_list]

# Same tables, restricted to each short political group.
# Comprehensions keep the loop variables out of the notebook namespace.
group_name_list = {
    short_group: members['fullName'].to_list()
    for short_group, members in df_european_deputy.groupby('shortPoliticalGroup')
}
processed_group_name_list = {
    short_group: [default_process(name) for name in names]
    for short_group, names in group_name_list.items()
}

def get_full_name(name: str, group: Union[str, None] = None, min_score: int = 90) -> Union[str, None]:
    """Match a deputy name to its full name in the European deputy list.

    The name is first searched among the members of ``group`` (when given and present
    in the current deputy list), then among all deputies.

    Args:
        name: deputy name to look up.
        group: short political group name used to narrow the first search; optional.
        min_score: minimum score, passed to rapidfuzz as ``score_cutoff``, for a match to be accepted.

    Returns:
        The matching entry of the deputy list, or ``None`` when no candidate reaches ``min_score``.
    """
    processed_name = default_process(name)

    # The deputy list reflects today's groups, so a group read from older minutes may be
    # missing: fall through to the global search instead of raising KeyError.
    group_candidates = processed_group_name_list.get(group) if group is not None else None
    if group_candidates:
        fuzzy_match = extractOne(processed_name, group_candidates, processor=None, score_cutoff=min_score)
        if fuzzy_match is not None:
            # the third element of the match is the position of the candidate in the list
            return group_name_list[group][fuzzy_match[2]]

    fuzzy_match = extractOne(processed_name, processed_full_name_list, processor=None, score_cutoff=min_score)
    if fuzzy_match is not None:
        return full_name_list[fuzzy_match[2]]
    return None

# Helper methods

In [6]:
# Optional whitespace followed by one or more dots that directly precede a digit.
# Raw string: "\s", "\." and "\d" are not valid Python string escapes.
REGEX_SUMMARY_PAGE_NUMBER = r"\s*\.+?(?=\d)"

def build_regex_extract_page_num_from_footer(text: str) -> str:
    """Build a regex that matches a page footer and captures its page number.

    The footer is read from the very start of ``text``: everything up to the last
    "ddd.ddd" token of the first line. Its first and last whitespace-separated tokens
    are kept as literals, and the digits that follow the first token are captured.

    Args:
        text: extracted text of a page that starts with the footer.

    Returns:
        A regex source string with a single capture group (the page number).

    Raises:
        ValueError: if ``text`` does not start with a footer.
    """
    # Raw string: "\d" and "\." are not valid Python string escapes.
    footer_match = re.match(r"^.*\d{3}\.\d{3}", text or "")
    if footer_match is None:
        raise ValueError("No footer found at the start of the page text.")
    footer_tokens = footer_match.group(0).split()
    return re.escape(footer_tokens[0]) + r"\s(\d+).*?" + re.escape(footer_tokens[-1])

# def extract_summary(page_list: List[str]) -> str:
#   # get summary first page
#   regex_summary_title = '^SOMMAIRE\n'
#   summary_first_page_num = None
#   for i, page_text in enumerate(page_list):
#     if len(re.findall(regex_summary_title, page_text)) > 0:
#       summary_first_page_num = i
#       summary_first_page_text = page_text
#       break
#   if summary_first_page_num is None:
#     raise Exception("Summary not found.")
#   summary_first_page_text = re.sub(regex_summary_title, '', summary_first_page_text)

#   # get summary last page
#   first_amendment_summary_line = summary_first_page_text.split('\n')[0]
#   first_amendment_page_num = int(re.split(REGEX_SUMMARY_PAGE_NUMBER, first_amendment_summary_line)[1])
#   summary_last_page_num = first_amendment_page_num - 2

#   # load every page of the summary into a single text
#   summary_text = "\n".join([page_list[i] for i in range(summary_first_page_num, summary_last_page_num + 1)])
#   summary_text = re.sub(regex_summary_title, '', summary_text)
#   return summary_text


# def get_amendment_infos_from_summary(page_list: List[str]) -> Dict:
#   # read summary as text
#   summary_text = extract_summary(page_list)

#   # read for every amendment its name, id in the document and first page number
#   amendment_summary_infos = []
#   for amendment in summary_text.split('\n'):
#     amendment_full_name, page_num = re.split(REGEX_SUMMARY_PAGE_NUMBER, amendment)
#     amendment_num, amendment_name = amendment_full_name.split('. ')
#     amendment_summary_infos.append((amendment_name, int(amendment_num), int(page_num)))

#   # infer an amendment last page number from the next amendment first page
#   amendment_infos = {}
#   for i in range(len(amendment_summary_infos)):
#     amendment_name, amendment_num, page_num = amendment_summary_infos[i]
#     if i != len(amendment_summary_infos) - 1 :
#       page_end = amendment_summary_infos[i + 1][2] - 1
#     else:
#       page_end = len(page_list)
#     amendment_infos[amendment_name] = {'id': amendment_num, 'page_start': page_num, 'page_end': page_end}
#   return amendment_infos

def get_amendment_infos_from_body(page_list: List[str], footer_regex: str) -> Dict:
    """Locate every amendment in the document body.

    Each page is split on ``footer_regex`` to obtain its printed page number and its
    body. A page whose body starts with an amendment header opens a new amendment and
    closes the previous one on the page before. The last amendment is closed on the
    last page of ``page_list``.

    Args:
        page_list: extracted text of every page, footer included.
        footer_regex: regex matching the footer, with one capture group for the page number.

    Returns:
        ``{amendment name: {'id': int, 'page_start': int, 'page_end': int}}``.

    Raises:
        ValueError: if a page cannot be split into footer, page number and body.
    """
    # "<id>. <name> <d/m/yyyy> <hh:mm:ss?mmm>" followed by a newline, at the very start
    # of the page body. The "." before the milliseconds is unescaped and therefore
    # matches any character.
    header_regex = r"^(\d+)\.\s(.+)\s\d{1,2}/\d{1,2}/\d{4}\s\d{2}:\d{2}:\d{2}.\d{3}\n"
    amendment_infos = {}
    previous_amendment_name = None
    last_page_num = None

    for page in page_list:
        # maxsplit=1: only the first footer occurrence delimits the page body
        parts = re.split(footer_regex, page, maxsplit=1)
        if len(parts) != 3:
            raise ValueError('Error reading, footer not found in page.')
        _, page_num_text, page_body = parts
        page_num = int(page_num_text)
        last_page_num = page_num

        # The pattern is anchored at the start of the body, so there is at most one header per page.
        header = re.match(header_regex, page_body)
        if header is None:
            continue

        amendment_id, name = header.groups()
        amendment_infos[name] = {'id': int(amendment_id), 'page_start': page_num}
        if previous_amendment_name is not None:
            amendment_infos[previous_amendment_name]['page_end'] = page_num - 1
        previous_amendment_name = name

    # No header follows the last amendment: it runs until the last page read.
    if previous_amendment_name is not None:
        amendment_infos[previous_amendment_name]['page_end'] = last_page_num

    return amendment_infos


# Read the file

## Load all pages

In [7]:
# path to the pdf file
path = "../tests/data/domain/vote_analysis/test_minutes.pdf"

# read pdf and store each page extracted text in a list
with open(path, 'rb') as f:
    pdf = PdfReader(f)
    page_list = []
    for i, page in enumerate(pdf.pages):
        page_text = page.extract_text()
        if i == 0:
          # the footer pattern is built from the first page only and reused for all pages
          footer_regex = build_regex_extract_page_num_from_footer(page_text)
        page_list.append(page_text)

In [10]:
# Inspect the raw extracted text of the page at index 3 (footer still included)
print(page_list[3])

P9_PV(2019)11-14(RCV)_FR.docx 4 PE 643.9601. A9-0019/2019 -  Ondřej Kovařík - Vote unique 14/11/2019 11:38:20.000
565+
ECR: Aguilar, Bielan, Brudziński, Buxadé Villalba, Ďuriš Nicholsonová, Dzhambazki, Eppink, Fidanza, Fiocchi, Fitto, Fotyga, 
Fragkos, Geuking, Hannan, Jaki, Jurzyca, Kanko, Karski, Kempa, Kloc, Kopcińska, Krasnodębski, Kruk, Kuźmiuk, 
Legutko, Lundgren, McIntyre, Mazurek, Melbārde, Możdżanowska, Poręba, Procaccini, Rafalska, Rooken, Roos, 
Ruissen, Rzońca, Saryusz-Wolski, Slabakov, Stancanelli, Stegrud, Szydło, Tertsch, Tobiszowski, Tomaševski, Tomašić, 
Tošenovský, Van Orden, Van Overtveldt, Vrecionová, Waszczykowski, Weimers, Wiśniewska, Zahradil, Zalewska, Zīle, 
Złotowski
GUE/NGL: Arvanitis, Aubry, Barrena Arza, Bompard, Botenga, Buschmann, Chaibi, Daly, Demirel, Ernst, Ferreira, Flanagan, 
Georgiou, Georgoulis, Gusmão, Kizilyürek, Kokkalis, Konečná, Kouloglou, Kountoura, Maurel, Michels, Omarjee, 
Papadimoulis, Pelletier, Pereira Sandra, Pineda, Rego, Schirdewan, 

## Extract amendment names and locations

In [11]:
# Map every amendment name to its id and page range, using the footer pattern built from the first page
amendment_pages = get_amendment_infos_from_body(page_list, footer_regex)
print(amendment_pages)

{'A9-0019/2019 -  Ondřej Kovařík - Vote unique': {'id': 1, 'page_start': 4, 'page_end': 5}, 'A9-0021/2019 -  José Manuel Fernandes - Am 1': {'id': 2, 'page_start': 6, 'page_end': 7}, 'A9-0021/2019 -  José Manuel Fernandes - Résolution': {'id': 3, 'page_start': 8, 'page_end': 9}, 'B9-0170/2019 - Résolution': {'id': 4, 'page_start': 10, 'page_end': 11}, 'B9-0169/2019 - Résolution': {'id': 5, 'page_start': 12, 'page_end': 13}, 'B9-0171/2019 - Résolution': {'id': 6, 'page_start': 14, 'page_end': 15}, 'B9-0172/2019 - Résolution': {'id': 7, 'page_start': 16, 'page_end': 17}, 'B9-0166/2019 - Am 8': {'id': 8, 'page_start': 18, 'page_end': 19}, 'B9-0166/2019 - Am 9': {'id': 9, 'page_start': 20, 'page_end': 21}, 'B9-0166/2019 - Am 10': {'id': 10, 'page_start': 22, 'page_end': 23}, 'B9-0166/2019 - Am 11': {'id': 11, 'page_start': 24, 'page_end': 25}, 'B9-0166/2019 - Am 12': {'id': 12, 'page_start': 26, 'page_end': 27}, 'B9-0166/2019 - Am 13': {'id': 13, 'page_start': 28, 'page_end': 29}, 'B9-0166

In [12]:
# remove footer: keep only the text that follows the footer on every page
# NOTE: this overwrites page_list. get_amendment_infos_from_body needs the footer to read
# the page numbers, so it cannot be re-run on page_list after this cell without reloading the pages.
page_list = [re.split(footer_regex, page)[-1] for page in page_list]

## Extract amendment votes

In [13]:
def read_amendment_votes(name: str, amendment_pages: Dict, page_list: List[str]) -> Tuple[str, Union[str, None]]:
    """Return the vote text of one amendment, split into original and corrected votes.

    Args:
        name: amendment name, a key of ``amendment_pages``.
        amendment_pages: ``{name: {'page_start': int, 'page_end': int, ...}}``; page numbers
            are 1-based positions in ``page_list``.
        page_list: text of every page of the document.

    Returns:
        ``(original_votes, corrected_votes)``; ``corrected_votes`` is ``None`` when the text
        contains no correction section.

    Raises:
        KeyError: if ``name`` is not in ``amendment_pages``.
    """
    # load amendment full text
    amendment_infos = amendment_pages[name]
    first_index = amendment_infos['page_start'] - 1
    # 'page_end' is only known once a following amendment has been found; without it the
    # amendment is read until the last page.
    page_end = amendment_infos.get('page_end', len(page_list))
    text = "\n".join(page_list[i] for i in range(first_index, page_end))

    # remove amendment header (first line)
    _, _, text_without_header = text.partition('\n')

    # separate original votes from potential correction votes; the pattern spans from the
    # first 'ПОПРАВКИ' to the last 'RÖSTER' (raw string: "\S" and "\s" are not string escapes)
    correction_regex = r'ПОПРАВКИ[\S\s]+RÖSTER'
    splitted_corrections = re.split(correction_regex, text_without_header, maxsplit=1)
    if len(splitted_corrections) > 1:
        original_votes, corrected_votes = splitted_corrections
    else:
        original_votes, corrected_votes = text_without_header, None

    return original_votes, corrected_votes

In [14]:
# Name of the vote to inspect; it is used as a key of amendment_pages
amendment_name = 'A9-0019/2019 -  Ondřej Kovařík - Vote unique'

# Split this vote's text into the original votes and the optional correction section (None when absent)
original_votes, corrected_votes = read_amendment_votes(amendment_name, amendment_pages, page_list)

## Extract voters

In [15]:
def extract_original_votes(original_votes: str) -> pd.DataFrame:
    """Parse the vote text of one amendment into one row per voter.

    The text is split on "<count><sign>" markers (sign is "+", "0" or "-"). Each section
    is then split on "<group>: " prefixes, and each group's voters on ", ".

    Args:
        original_votes: vote text without its header line.

    Returns:
        DataFrame with columns ``name``, ``group``, ``vote`` and ``full_name``;
        ``full_name`` is the result of ``get_full_name`` and may be ``None``.
    """
    regex_vote_categories = r'(\d+)([+0-])'
    regex_group = '\n+?(.+?): *'

    # With two capture groups re.split yields
    # [prefix, count, sign, voters, count, sign, voters, ...]; the prefix is dropped.
    votes_by_category = re.split(regex_vote_categories, original_votes)[1:]

    records = []
    for category_index in range(0, len(votes_by_category), 3):
        vote_category = votes_by_category[category_index + 1]
        votes = votes_by_category[category_index + 2]
        # [prefix, group, voters, group, voters, ...]
        votes_by_group = re.split(regex_group, votes)
        for group_index in range(1, len(votes_by_group), 2):
            group_name = votes_by_group[group_index]
            group_voters = votes_by_group[group_index + 1].replace('\n', '').split(', ')
            for voter in group_voters:
                voter = voter.strip()
                # a trailing separator leaves an empty token, which is not a voter
                if voter:
                    records.append([voter, group_name, vote_category])

    df_votes = pd.DataFrame(records, columns=['name', 'group', 'vote'])
    # Built from a list rather than apply(axis=1): on an empty frame apply can return a
    # DataFrame, which cannot be assigned to a single column.
    full_names = [get_full_name(voter, group) for voter, group in zip(df_votes['name'], df_votes['group'])]
    df_votes['full_name'] = pd.Series(full_names, index=df_votes.index, dtype=object)
    return df_votes

def extract_corrected_votes(corrected_votes: str) -> pd.DataFrame:
    """Parse the correction section of one amendment into one row per corrected voter.

    The text is split on lines starting with a sign ("+", "0" or "-"); the voters that
    follow each sign are separated by ", ".

    Args:
        corrected_votes: text that follows the correction header.

    Returns:
        DataFrame with columns ``name``, ``corrected_vote`` and ``full_name``;
        ``full_name`` is the result of ``get_full_name`` and may be ``None``.
        The frame is empty when no category lists any voter.
    """
    regex_correction = '\n([+0-])'
    # [prefix, sign, voters, sign, voters, ...]
    votes_by_category = re.split(regex_correction, corrected_votes)

    records = []
    for category_index in range(1, len(votes_by_category), 2):
        vote_category = votes_by_category[category_index]
        category_voters = votes_by_category[category_index + 1].replace('\n', '').split(', ')
        for voter in category_voters:
            voter = voter.strip()
            # a category without correction yields a single empty token
            if voter:
                records.append([voter, vote_category])

    df_corrected_votes = pd.DataFrame(records, columns=['name', 'corrected_vote'])
    # Built from a list rather than apply(axis=1): on an empty frame apply can return a
    # DataFrame, which cannot be assigned to a single column.
    full_names = [get_full_name(voter) for voter in df_corrected_votes['name']]
    df_corrected_votes['full_name'] = pd.Series(full_names, index=df_corrected_votes.index, dtype=object)
    return df_corrected_votes

def correct_votes(df_votes: pd.DataFrame, df_corrected_votes: pd.DataFrame) -> pd.DataFrame:
    """Apply vote corrections on top of the original votes.

    Corrections are joined to the original votes on ``full_name`` with an outer join, so
    a corrected voter who is absent from ``df_votes`` is added as a new row. Corrections
    whose ``full_name`` is null are never joined (pandas would otherwise match them with
    every original row whose ``full_name`` is null); they are appended as separate rows.

    Args:
        df_votes: original votes, with at least the columns ``full_name`` and ``vote``.
        df_corrected_votes: corrections, with at least ``full_name`` and ``corrected_vote``.

    Returns:
        A new DataFrame with the columns of ``df_votes``, where ``vote`` holds the
        corrected vote when there is one. The inputs are not modified.
    """
    corrections = df_corrected_votes[['full_name', 'corrected_vote']]
    has_full_name = corrections['full_name'].notna()
    identified = corrections[has_full_name]
    unidentified = corrections[~has_full_name]

    df_updated_votes = df_votes.merge(identified, how='outer', on='full_name')
    df_updated_votes['vote'] = np.where(
        df_updated_votes['corrected_vote'].notna(),
        df_updated_votes['corrected_vote'],
        df_updated_votes['vote']
    )
    df_updated_votes = df_updated_votes.drop(columns='corrected_vote')

    if not unidentified.empty:
        # keep unidentified corrections visible as their own rows, as an outer join would
        extra_rows = unidentified.rename(columns={'corrected_vote': 'vote'})
        df_updated_votes = pd.concat([df_updated_votes, extra_rows], ignore_index=True)
    return df_updated_votes

In [16]:
# Parse the original votes and show them
print("ORIGINAL VOTES")
df_votes = extract_original_votes(original_votes)
display(df_votes)

# corrected_votes is None when the vote text contains no correction section
if corrected_votes is not None:
  print("\n\n", "CORRECTIONS TO VOTES AND VOTING INTENTIONS")
  df_corrected_votes = extract_corrected_votes(corrected_votes)
  display(df_corrected_votes)

  # apply the corrections on top of the original votes
  print("\n\n", "UPDATED VOTES")
  df_updated_votes = correct_votes(df_votes, df_corrected_votes)
  display(df_updated_votes)

ORIGINAL VOTES


Unnamed: 0,name,group,vote,full_name
0,Aguilar,ECR,+,Mazaly AGUILAR
1,Bielan,ECR,+,Adam BIELAN
2,Brudziński,ECR,+,Joachim Stanisław BRUDZIŃSKI
3,Buxadé Villalba,ECR,+,Jorge BUXADÉ VILLALBA
4,Ďuriš Nicholsonová,ECR,+,Lucia ĎURIŠ NICHOLSONOVÁ
...,...,...,...,...
648,Nikolaou-Alavanos,NI,0,Lefteris NIKOLAOU-ALAVANOS
649,Papadakis Kostas,NI,0,Kostas PAPADAKIS
650,Radačovský,NI,0,Miroslav RADAČOVSKÝ
651,Sinčić,NI,0,Ivan Vilibor SINČIĆ


In [None]:
# Spot check: compare one deputy's vote before and after the corrections.
# NOTE: df_updated_votes is only defined when the previous cell found a correction section.
full_name = 'Stanislav POLČÁK'
display(df_votes.loc[df_votes.full_name == full_name])
print()
display(df_updated_votes.loc[df_updated_votes.full_name == full_name])

Unnamed: 0,name,group,vote,full_name
78,Polčák,PPE,+,Stanislav POLČÁK





Unnamed: 0,name,group,vote,full_name
178,Polčák,PPE,-,Stanislav POLČÁK
