In [None]:
!pip install pandas pyreadr
import os
import pandas as pd
import pyreadr
import re
import requests
import shutil
from tqdm import tqdm
import zipfile

In [None]:
# Identification des fichiers csv
def extract_strings_from_webpage(url, timeout=30):
    """Fetch a web page and return every double-quoted string in its source.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests.get`` may block indefinitely on a stalled
            connection.

    Returns:
        A list with the content of every ``"..."`` span found in the response
        body (quotes excluded), in order of appearance. An empty list is
        returned when the server answers with a status code other than 200.

    Raises:
        requests.exceptions.RequestException: Propagated from ``requests.get``
            on network failures, including timeouts.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to fetch the webpage. Status code: {response.status_code}")
        return []
    # Captures anything sitting between two double quotes, e.g. quoted
    # attribute values in the HTML source.
    return re.findall(r'"([^"]*)"', response.text)

# Page that lists the downloadable archives.
webpage_url = "https://unehistoireduconflitpolitique.fr/telecharger.html"
extracted_strings = extract_strings_from_webpage(webpage_url)

# Keep only the quoted strings that end like a zipped CSV archive.
# NOTE(review): "csp.zip" presumably matches a misspelled archive name on the
# site - confirm before removing it.
archive_suffixes = ("csv.zip", "csp.zip")
download_links = [
    candidate
    for candidate in extracted_strings
    if candidate.endswith(archive_suffixes)
]

In [None]:
# Download the archives
os.makedirs('data_zip', exist_ok=True)

# The context manager closes the progress bar even if the loop is interrupted.
with tqdm(total=len(download_links), desc="Downloading", unit="file") as progress_bar:
    for link in download_links:
        file_name = os.path.join('data_zip', os.path.basename(link))
        try:
            # A timeout keeps a stalled server from blocking the notebook forever.
            response = requests.get(link, timeout=60)
            # Fail on 4xx/5xx answers so that an error page is never written
            # to disk under a .zip name.
            response.raise_for_status()
            with open(file_name, 'wb') as file:
                file.write(response.content)
            # Only successful downloads advance the bar.
            progress_bar.update(1)
        except Exception as e:
            print(f"Error downloading {link}: {e}")

In [None]:
# Extract the election results
election_prefixes = ('pres', 'leg', 'ref')

# One output directory per election type, created up front.
target_dirs = {}
for prefix in election_prefixes:
    target_dirs[prefix] = os.path.join('data_csv', 'Elections_' + prefix)
    os.makedirs(target_dirs[prefix], exist_ok=True)

zip_files = [name for name in os.listdir('data_zip') if name.endswith('.zip')]
# The prefixes are mutually exclusive, so each archive is counted at most once.
total_zip_files = len([name for name in zip_files if name.startswith(election_prefixes)])
progress_bar = tqdm(total=total_zip_files, desc="Extracting", unit="file")

for prefix in election_prefixes:
    prefix_dir = target_dirs[prefix]
    for file in zip_files:
        if not file.startswith(prefix):
            continue
        zip_file_path = os.path.join('data_zip', file)
        try:
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                csv_members = [
                    info for info in zip_ref.infolist()
                    if info.filename.lower().endswith('.csv')
                ]
                for member in csv_members:
                    # Drop the directory part stored in the archive: every CSV
                    # lands directly in the election-type directory.
                    target_path = os.path.join(prefix_dir, os.path.basename(member.filename))
                    with zip_ref.open(member) as source, open(target_path, 'wb') as dest:
                        shutil.copyfileobj(source, dest)
            # The archive is deleted only once it has been fully extracted.
            os.remove(zip_file_path)
            progress_bar.update(1)
        except Exception as e:
            print(f"Error converting {file}: {e}")

progress_bar.close()

In [None]:
# Clean up the election results
# Files whose name starts with "._" are removed (presumably macOS metadata
# sidecar files carried along in the archives - TODO confirm).
for root, _subdirs, files in os.walk('data_csv'):
    stray_names = [name for name in files if name.startswith("._")]
    for name in stray_names:
        file_path = os.path.join(root, name)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting file {file_path}: {e}")
print("Deletion of extra files completed.")

In [None]:
# Extract the control variables
# Every archive left in data_zip that is not an election archive is unpacked
# into data_csv, keeping the directory layout stored in the archive.
zip_files = [f for f in os.listdir('data_zip') if f.endswith('.zip')]
total_zip_files = sum(not file.startswith(('pres', 'leg', 'ref')) for file in zip_files)
progress_bar = tqdm(total=total_zip_files, desc="Extracting", unit="file")

for zip_file in zip_files:
    if zip_file.startswith(('pres', 'leg', 'ref')):
        continue
    zip_path = os.path.join('data_zip', zip_file)
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Skip the __MACOSX entries stored alongside the real content.
            file_list = [file for file in zip_ref.namelist() if not file.startswith('__MACOSX')]
            zip_ref.extractall('data_csv', members=file_list)
        # Delete the archive only after its handle is closed: removing a file
        # that is still open raises on Windows, which would report a
        # successfully extracted archive as failed.
        os.remove(zip_path)
        progress_bar.update(1)
    except Exception as e:
        print(f"Error extracting {zip_file}: {e}")

progress_bar.close()

In [None]:
# Clean up the control variables
csv_root = 'data_csv'

# Snapshot the sub-directories before renaming anything.
subdirectories = [
    entry for entry in os.listdir(csv_root)
    if os.path.isdir(os.path.join(csv_root, entry))
]
for entry in subdirectories:
    if not entry.endswith('_csv'):
        continue
    # Strip the trailing "_csv" (4 characters) from the directory name.
    os.rename(os.path.join(csv_root, entry), os.path.join(csv_root, entry[:-4]))

# This CSV sits at the top level of data_csv; it is filed under Diplomes.
shutil.move('data_csv/alphabetisationcommunes.csv', 'data_csv/Diplomes/')

print("Folder cleaning complete.")

In [None]:
# Remove the download directory
# Everything still inside data_zip is deleted along with the directory.
download_dir = 'data_zip'
shutil.rmtree(download_dir)
print('Downloaded data removed.')

In [None]:
# Convert to rda format
# Collect the CSV paths in a single walk so that the progress-bar total and
# the conversion loop always work on the same set of files.
csv_paths = [
    os.path.join(root, file)
    for root, dirs, files in os.walk('data_csv')
    for file in files
    # Case-insensitive match, consistent with the election extraction step:
    # a "*.CSV" file would otherwise be skipped here and then lost when
    # data_csv is deleted.
    if file.lower().endswith(".csv")
]
total_csv_files = len(csv_paths)

# The context manager closes the progress bar even if a conversion raises.
with tqdm(total=total_csv_files, desc="Converting", unit="file") as progress_bar:
    for input_csv_path in csv_paths:
        # Mirror the data_csv layout under data_rda, swapping the extension.
        relative_path = os.path.relpath(input_csv_path, 'data_csv')
        output_rda_path = os.path.join('data_rda', os.path.splitext(relative_path)[0] + ".rda")
        os.makedirs(os.path.dirname(output_rda_path), exist_ok=True)
        data = pd.read_csv(input_csv_path, low_memory=False, encoding='utf-8')
        pyreadr.write_rdata(output_rda_path, data, compress='gzip')
        # The CSV is removed only after its rda counterpart has been written.
        os.remove(input_csv_path)
        progress_bar.update(1)

In [None]:
# Remove the extraction directory
# Everything still inside data_csv is deleted along with the directory.
extraction_dir = 'data_csv'
shutil.rmtree(extraction_dir)
print('Extracted data removed.')