In [27]:
import polars as pl
import pandas as pd
import requests
import os
from glob import glob
import zipfile
from io import BytesIO
import numpy as np
import re

In [28]:
leg_year_list = ["1848","1849","1871juil","1871fev","1876","1881","1885","1889","1893","1898","1902","1906","1910","1914","1919","1924","1928","1932","1936","1945","1946Juin", "1946Nov","1951","1956","1958","1962","1967","1968","1973","1978","1981","1986","1988","1993","1997","2002","2007","2012","2017","2022"]

pres_year_list = ['1848','1965','1969','1974','1981','1988','1995','2002','2007','2012','2017','2022']

ref_year_list = ["1793","1795","1946","1992","2005"]


leg_name_list = ["https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/{}_csv.zip".format("leg" + i) for i in leg_year_list]
pres_name_list = ["https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/{}_csv.zip".format("pres" + i) for i in pres_year_list]
ref_name_list = ["https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/{}_csv.zip".format("ref" + i) for i in ref_year_list]

df_com = pl.read_csv("https://www.insee.fr/fr/statistiques/fichier/6800675/v_commune_2023.csv",infer_schema_length=100000)[["COM","DEP","LIBELLE"]]


In [19]:
os.makedirs("data")

#downloading all files
for link in (leg_name_list + pres_name_list + ref_name_list):
    r = requests.get(link)
    file_name = link.replace("https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/","")
    print("downloaded " + file_name, end=", ")
    with open('data/' + file_name, 'wb') as f:
        f.write(r.content)

link = "https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/CagePiketty2023CodesInformatiques.zip"
r = requests.get(link)
file_name = link.replace("https://conflit-politique-data.ams3.cdn.digitaloceanspaces.com/zip/","")
print("downloaded " + file_name, end=", ")
with open(file_name, 'wb') as f:
    f.write(r.content)


downloaded leg1848_csv.zip, done
downloaded leg1849_csv.zip, done
downloaded leg1871juil_csv.zip, done
downloaded leg1871fev_csv.zip, done
downloaded leg1876_csv.zip, done
downloaded leg1881_csv.zip, done
downloaded leg1885_csv.zip, done
downloaded leg1889_csv.zip, done
downloaded leg1893_csv.zip, done
downloaded leg1898_csv.zip, done
downloaded leg1902_csv.zip, done
downloaded leg1906_csv.zip, done
downloaded leg1910_csv.zip, done
downloaded leg1914_csv.zip, done
downloaded leg1919_csv.zip, done
downloaded leg1924_csv.zip, done
downloaded leg1928_csv.zip, done
downloaded leg1932_csv.zip, done
downloaded leg1936_csv.zip, done
downloaded leg1945_csv.zip, done
downloaded leg1946Juin_csv.zip, done
downloaded leg1946Nov_csv.zip, done
downloaded leg1951_csv.zip, done
downloaded leg1956_csv.zip, done
downloaded leg1958_csv.zip, done
downloaded leg1962_csv.zip, done
downloaded leg1967_csv.zip, done
downloaded leg1968_csv.zip, done
downloaded leg1973_csv.zip, done
downloaded leg1978_csv.zip, d

In [37]:
#lecture des orientations politiques

dic_replace  =  {'leg1871 (fevrier)' : "leg1871fev", 'leg1871 (juillet)' : "leg1871juil" ,"leg1946 (juin)" : "leg1946Juin", 'leg1946 (novembre)' : "leg1946Nov"}

nuances_df_list = []
#ouverture du fichier stata contenant le codage des nuances
for elec_type in ["leg","pres"]:
    with zipfile.ZipFile("CagePiketty2023CodesInformatiques.zip") as archive:
        zfiledata = BytesIO(archive.read("CodesInformatiquesExploitationFichiersElectoraux.zip"))
        with zipfile.ZipFile(zfiledata) as zfile2:

            if elec_type == "leg":
                filename = "doAnnexeA2nuancespolitiqueslegislatives.txt"
            else:
                filename = "doAnnexeA3nuancespolitiquespresidentielles.txt"
            a = zfile2.open(filename, mode="r").read().decode("latin")

    #on split le fichier par élection
    leg_text_list = re.split(r"\*\*\d{4}[^\*]+\*\*|\*\*\d{4}\*\*", a, flags=re.MULTILINE)[1:]

    #liste des noms des élections
    elect_list = [i[0] if i[1] == "" else i[1] for i in re.findall("\*\*(\d{4}[^\*]+)\*\*|\*\*(\d{4})\*\*", a)]
    
    df_list = []
    
    for leg_nb in range(len(leg_text_list)):

        #liste des nuances et des listes/candidats y appartenant avec leur coeffience d'appartenance
        vote_list = [i.replace("gen ",'').split("=") for i in re.findall("(gen vote[A-Z]+=.+);" ,leg_text_list[leg_nb]) if not "voteT" in i and not "voteGCG" in i and not "voteDCD" in i ]
        
        for i in range(len(vote_list)):
           
            vote_list[i][1] = vote_list[i][1].split("+")
            for n in range(len(vote_list[i][1])):
                #si le code ne comporte pas de coefficient, le coefficient est 1
                if not '*' in vote_list[i][1][n]:
                    vote_list[i][1][n] = "1*" + vote_list[i][1][n]
                vote_list[i][1][n] = vote_list[i][1][n].split("*")

        #mise sous forme tabulaire des résultats
        d = pd.Series(dict(zip([i[0] for i in vote_list], [i[1] for i in vote_list]))).explode()
        de = pd.DataFrame(d.to_list(), columns=["coeff","voix"])
        de.index = d.index
        de = de.reset_index()
        de.columns = ["vote_cat","coeff","voix_cat"]
        de = de.pivot(index="voix_cat", columns="vote_cat",values="coeff").replace(np.nan,0).reset_index()
        de["elec"] = (elec_type + elect_list[leg_nb])
        de["elec"] = de["elec"].replace(dic_replace)
        de["voix_cat"] = de["voix_cat"].str.replace("^voix","",regex=True)
        df_list.append(de)
    df_leg_nuances = pd.concat(df_list)
    nuances_df_list.append(df_leg_nuances)
df_all_nuances = pd.concat(nuances_df_list)

convert_dic = {"voix_cat" : str, "voteC" : "float32", "voteCD" : "float32","voteCG" : "float32","voteD" : "float32", "voteG" : 'float32', "elec" : str}
df_all_nuances = df_all_nuances.astype(convert_dic)
df_all_nuances = pl.from_pandas(df_all_nuances)
df_all_nuances

voix_cat,voteC,voteCD,voteCG,voteD,voteG,elec
str,f32,f32,f32,f32,f32,str
"""CONS""",0.0,0.3,0.0,0.7,0.0,"""leg1848"""
"""DEMSOC""",0.0,0.0,0.3,0.0,0.7,"""leg1848"""
"""DIV""",1.0,0.0,0.0,0.0,0.0,"""leg1848"""
"""REPMOD""",1.0,0.0,0.0,0.0,0.0,"""leg1848"""
"""CONS""",0.0,0.3,0.0,0.7,0.0,"""leg1849"""
"""DEMSOC""",0.0,0.0,0.3,0.0,0.7,"""leg1849"""
"""DIV""",1.0,0.0,0.0,0.0,0.0,"""leg1849"""
"""REPCONS""",1.0,0.0,0.0,0.0,0.0,"""leg1849"""
"""BON""",0.0,1.0,0.0,0.0,0.0,"""leg1871fev"""
"""CLE""",0.0,0.0,0.0,1.0,0.0,"""leg1871fev"""


In [41]:

df_list = []
for file in glob("data/*"):
    name = file.replace("data/","").replace("_csv.zip","")
    year = re.search("\d{4}", name)[0]
    elec_type = re.search("([a-z]+)\d{4}", name)[1]
    
    print(name, end=", ")
    if name == "leg1871juil":
        name1 = name
        name2 = "leg1872"
    elif name == "leg1871fev":
        name1 = "leg1871"
        name2 = "leg1871"
    elif name == "leg1946Nov":
        name1 = name
        name2 = "leg1947"
    elif name == "leg1946Juin":
        name1 = name
        name2 = "leg1946"
    else:
        name1 = name
        name2 = name

    if name == "leg1902":
        filename_ = f'{name1}comm.csv'
    else:
        filename_ =  "{}_csv/{}comm.csv".format(name1,name2)
    df = pl.read_csv(
    zipfile.ZipFile(f"data/{name}_csv.zip").open(filename_, mode='r').read(), dtypes={"codecommune" : str, "dep" : str}, infer_schema_length=1000000
    ).with_columns(pl.col("inscrits").sub(pl.col("votants")).alias("abstention"),
                pl.col("votants").sub(pl.col("exprimes")).alias("nuls")
               ).filter(
                ~pl.col("codecommune").is_in(["75056"] + [str(i) for i in range(13201,13217)] + [str(i) for i in range(69381,69390)])
               ) #Pour Paris, Lyon, Marseille, je supprime les doublons en ne gardant que la série la plus complète (les arrrondissements pour Paris, la ville en entier pour Lyon Marseille)


    
    if any('votantsT2' in colname for colname in df.columns):
        is_t2 = True
        df = df.with_columns(
            pl.col("inscritsT2").sub(pl.col("votantsT2")).alias("abstentionT2"),
            pl.col("votantsT2").sub(pl.col("exprimesT2")).alias("nulsT2")
        )
    
    keep_cols = (["codecommune",'abstention',"nuls"] + [i for i in df.columns if i.startswith("voix") and "voixtot" not in i]
                )
    if is_t2:
        keep_cols += ['abstentionT2',"nulsT2"]
    is_t2 = False
    
    df = df[keep_cols].melt(id_vars=["codecommune"], variable_name="choix", value_name="votes").with_columns(pl.col("votes").cast(pl.Float32))
    
    df = df.with_columns(
        pl.lit(name).alias("elec_name"),
        pl.lit(year).alias("year"),
        pl.lit(elec_type).alias("elec_type"),
        pl.when(pl.col("choix").str.contains("T2")).then(pl.lit("T2")).otherwise(pl.lit("T1")).alias("tour"),
        pl.when(pl.col("choix").is_in(["abstention","nuls","abstentionT2","nulsT2"])).then(0).otherwise(1).alias("exprime_flag"),
        pl.col("choix").str.replace("T2","").str.replace('^voix',"").alias("choix")
        
    )
    df = df.join(df_all_nuances,how="left", left_on=["elec_name","choix"], right_on=["elec","voix_cat"])

    df_list.append(df)
    
df = pl.concat(df_list)
del df_list


pres1981, leg1871fev, leg2012, leg1848, pres1965, leg1906, leg1871juil, leg1910, pres2007, pres1848, leg1898, leg1968, pres2012, pres1969, leg1946Nov, leg1876, pres2022, pres2017, leg1928, leg1958, leg1986, ref1992, leg1956, leg1993, leg2022, leg1997, ref1793, leg1962, leg1951, leg1988, leg2002, leg1981, leg1889, leg1914, leg1849, ref1795, ref2005, leg1881, leg1945, ref1946, leg1885, pres1974, leg1919, leg1978, leg1893, leg2007, leg1902, pres2002, leg1924, pres1995, leg1973, leg1936, pres1988, leg1967, leg1932, leg2017, leg1946Juin, 

In [42]:

df.columns =["codecommune","choix","voix","nom_election","annee","type_election","tour","exprimeflag","centre_coef","centredroit_coef","centregauche_coef","droite_coef","gauche_coef"]

ordre = ["codecommune","nom_election","annee","type_election","tour","choix","exprimeflag", "gauche_coef","centregauche_coef","centre_coef", "centredroit_coef","droite_coef","voix"]
df = df[ordre]

df.write_parquet("elections_cagepiketty.parquet")