In [None]:
import csv
import os
from dotenv import load_dotenv
import pandas as pd
import json
import numpy as np
from utils.data_utils import unaccent_and_upper, format_insee_or_postal_code


# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# Every path below is mandatory: a missing variable raises KeyError right here.
_env = os.environ
csp_filepath = _env['CSP_PATHFILE_2025']  # source CSV
base_output_filepath = _env['DB_CSP_EXPORT_2025']  # CSV written by the last cell
existing_codes_filepath = _env['EXISTING_CODES_PATHFILE_2025']  # CSV exposing a `code` column

# Organism code stamped on every allocataire record.
cps_organism_code = '459'

In [None]:
# Malformed rows are still skipped, but with 'warn' pandas reports each skipped
# line instead of dropping beneficiaries / existing codes silently.
# Every CSP column is read as text so codes keep their leading zeros.
csp_df = pd.read_csv(csp_filepath, on_bad_lines='warn', sep=',', engine="c", dtype=str)
existing_codes = pd.read_csv(existing_codes_filepath, on_bad_lines='warn', sep=',', engine="c")

In [None]:
# The telephone values are not usable in their current form (the normalisation below is left disabled)
# csp_df['allocataire-telephone'] = csp_df['allocataire-telephone'].str.replace(' ', '')

In [None]:
# Assign code_organisme and the organism label to every CSP row
csp_df = csp_df.assign(**{
    'allocataire-code_organisme': cps_organism_code,
    'organisme': 'MSA',
})

In [None]:
# `astype(str)` turns a missing value into the literal text 'nan' (then 'NAN'
# once upper-cased), which would be exported as if it were real data.
# Both helpers below restore the missing marker after the cast.
def _stringify_keep_missing(column):
    """Cast a column to str while leaving missing entries missing."""
    return column.astype(str).where(column.notna())


def _normalise_name(column):
    """Unaccent, upper-case and strip a name column; missing entries stay missing."""
    cleaned = column.astype(str).apply(unaccent_and_upper).str.strip()
    return cleaned.where(column.notna())


csp_df['adresse-allocataire_code_insee'] = _stringify_keep_missing(csp_df['adresse-allocataire_code_insee'])

# Remove extra white spaces
csp_df['nom'] = _normalise_name(csp_df['nom'])
csp_df['prenom'] = _normalise_name(csp_df['prenom'])
csp_df['genre'] = _stringify_keep_missing(csp_df['genre']).str.upper()

In [None]:
# Countries that are different from France and Miquelon
# (a missing country code also compares unequal, so those rows are blanked too)
birth_country = csp_df['allocataire-code_iso_pays_naissance']
foreign_country_mask = (birth_country != 'FR') & (birth_country != 'PM')

# Birth commune fields are cleared for people born abroad.
# `np.nan` is used because the `np.NaN` alias no longer exists in NumPy 2.
csp_df.loc[foreign_country_mask, ['allocataire-code_insee_commune_naissance', 'allocataire-commune_naissance']] = np.nan

In [None]:
# map to json values for target DB model
## map allocataire json
def to_json_allocataire_without_null(row):
    """Serialise the `allocataire-*` fields of one row as a JSON string.

    Names are unaccented/upper-cased and the birth commune INSEE code is
    formatted by the project helpers. Fields whose value is null are left
    out of the JSON object. Non-ASCII characters are written as-is.
    """
    candidate_fields = [
        ('qualite', row['allocataire-qualite']),
        ('matricule', row['allocataire-matricule']),
        ('code_organisme', row['allocataire-code_organisme']),
        ('telephone', row['allocataire-telephone']),
        ('nom', unaccent_and_upper(row['allocataire-nom'])),
        ('prenom', unaccent_and_upper(row['allocataire-prenom'])),
        ('date_naissance', row['allocataire-date_naissance']),
        ('courriel', row['allocataire-courriel']),
        ('code_insee_commune_naissance', format_insee_or_postal_code(row['allocataire-code_insee_commune_naissance'])),
        ('commune_naissance', row['allocataire-commune_naissance']),
        ('code_iso_pays_naissance', row['allocataire-code_iso_pays_naissance']),
    ]
    # Keep the declared field order, dropping anything pandas considers null.
    payload = {}
    for field_name, field_value in candidate_fields:
        if pd.notna(field_value):
            payload[field_name] = field_value
    return json.dumps(payload, ensure_ascii=False)

# One JSON document per row, stored in the `allocataire` column.
allocataire_json = csp_df.apply(to_json_allocataire_without_null, axis=1)
csp_df['allocataire'] = allocataire_json

In [None]:
# Merge `voie` and `nom_adresse_postal` into a single street line.
# NOTE: this cell overwrites its own input column, so running it twice in the
# same kernel session appends the postal name a second time.
voie = csp_df['adresse-allocataire_voie']
nom_adresse_postal = csp_df['adresse-allocataire_nom_adresse_postal']

has_voie = voie.str.len() > 0  # False for empty strings and for missing values
has_nom_adresse_postal = nom_adresse_postal.notna()

# - both present            -> "<voie> <nom_adresse_postal>"
# - only `voie` present     -> `voie` alone (plain `+` would yield NaN and lose it)
# - `voie` empty or missing -> `nom_adresse_postal` (possibly missing)
csp_df['adresse-allocataire_voie'] = (voie + ' ' + nom_adresse_postal).where(
    has_voie & has_nom_adresse_postal,
    voie.where(has_voie, nom_adresse_postal),
)

# Preview only a few rows: the column holds personal addresses.
csp_df['adresse-allocataire_voie'].head()

In [None]:
## map adresse_allocataire json
def to_json_adresse_without_null(row):
    """Serialise the `adresse-allocataire_*` fields of one row as a JSON string.

    Postal and INSEE codes go through `format_insee_or_postal_code`. Fields
    whose value is null are left out. Non-ASCII characters are written as-is.
    """
    candidate_fields = [
        ('voie', row['adresse-allocataire_voie']),
        ('code_postal', format_insee_or_postal_code(row['adresse-allocataire_code_postal'])),
        ('commune', row['adresse-allocataire_commune']),
        ('code_insee', format_insee_or_postal_code(row['adresse-allocataire_code_insee'])),
        # the source column really is spelled `cptl`, the target key `cplt`
        ('cplt_adresse', row['adresse-allocataire_cptl_adresse']),
    ]
    # Keep the declared field order, dropping anything pandas considers null.
    payload = {}
    for field_name, field_value in candidate_fields:
        if pd.notna(field_value):
            payload[field_name] = field_value
    return json.dumps(payload, ensure_ascii=False)

# One JSON document per row, stored in the `adresse_allocataire` column.
adresse_json = csp_df.apply(to_json_adresse_without_null, axis=1)
csp_df['adresse_allocataire'] = adresse_json

In [None]:
# Parse the beneficiary birth date, truncate it to the day, then set the time to 04:00.
# NOTE(review): the 4-hour shift presumably keeps the calendar day stable across
# timezone conversion downstream - confirm.
# NOTE(review): no explicit `format=` is given; verify the source date layout is
# parsed as intended (day/month order).
parsed_birth_dates = pd.to_datetime(csp_df['date_naissance'])
four_hours = pd.DateOffset(hours=4)
csp_df['date_naissance'] = parsed_birth_dates.dt.floor('D') + four_hours

In [None]:
# Add missing default column needed for target DB model
# A single timestamp (Europe/Paris) is shared by every row of this import.
timestamp_with_custom_tz = pd.Timestamp.now(tz='Europe/Paris')

# NOTE(review): 4 is presumably the id of the current exercice in the target DB - confirm.
csp_df['exercice_id'] = 4
# `np.nan` rather than `np.NaN`: the capitalised alias was removed in NumPy 2.
csp_df['uuid_doc'] = np.nan
csp_df[['zrr', 'qpv', 'a_valider', 'refuser']] = False
csp_df[['updated_at', 'created_at']] = timestamp_with_custom_tz

In [None]:
from datetime import datetime

# A row is treated as AAH when the beneficiary and the allocataire carry the same names.
# NOTE(review): `nom`/`prenom` were unaccented and upper-cased earlier in the notebook,
# whereas `allocataire-nom`/`allocataire-prenom` are compared raw - confirm the source
# file already stores them in that form, otherwise matches are missed.
mask_nom_equal = csp_df['allocataire-nom'] == csp_df['nom']
mask_prenom_equal = csp_df['allocataire-prenom'] == csp_df['prenom']
mask_same_names = mask_nom_equal & mask_prenom_equal

# Keep AAH benef only between 20 and 30 years old
birth_day = csp_df['date_naissance'].dt.date
mask_dob_start = birth_day >= datetime(1995, 1, 1).date()
mask_dob_end = birth_day <= datetime(2005, 12, 31).date()
mask_dob = mask_dob_start & mask_dob_end

mask_aah = mask_same_names & mask_dob
print(f"{len(csp_df[mask_aah])} AAH with prenom_benef = prenom_alloc & nom_benef = nom_alloc and AAH within period")
# Count only same-name rows here, mirroring how the jeunes "outside" count is
# restricted to its own population.
print(f"{len(csp_df[mask_same_names & ~mask_dob])} AAH outside the period ")

# `.copy()` gives an independent frame, so adding a column neither warns
# (SettingWithCopyWarning) nor depends on view/copy semantics.
aah_df = csp_df[mask_aah].copy()
aah_df['situation'] = 'AAH'

In [None]:
# ARS data 14-17 years old
# Bounds are built with `pd.Timestamp` so this cell does not depend on what the
# name `datetime` is bound to (class or module) elsewhere in the notebook.
jeune_birth_day = pd.to_datetime(csp_df['date_naissance']).dt.date
mask_jeune_dob_start = jeune_birth_day >= pd.Timestamp(2008, 1, 1).date()
mask_jeune_dob_end = jeune_birth_day <= pd.Timestamp(2011, 12, 31).date()
mask_jeune_dob = mask_jeune_dob_start & mask_jeune_dob_end

# Jeunes are the rows whose beneficiary differs from the allocataire.
# `.copy()` avoids assigning a column on a slice of `csp_df`.
jeunes_df = csp_df[mask_jeune_dob & ~mask_same_names].copy()
jeunes_df['situation'] = 'jeune'
print(f"{len(jeunes_df)} jeunes within period")
print(f"{len(csp_df[~mask_jeune_dob & ~mask_same_names])} jeunes outside the period ")

In [None]:
# Nobody (matched on prenom + nom) may appear in both populations.
people_in_both = aah_df.merge(jeunes_df, how='inner', on=['prenom', 'nom'])
assert len(people_in_both) == 0
print(f"{len(aah_df)} AAH")
print(f"{len(jeunes_df)} Jeunes")

# Stack both populations into the frame that will be exported.
merged_df = pd.concat([aah_df, jeunes_df])

In [None]:
# Unique codes generation
import random
import string
# Import the class, not the module: another cell imports `datetime` as the class
# and calls `datetime(...)`. Rebinding the name to the module here would break
# those cells when they are re-run in the same kernel.
from datetime import datetime

current_date = datetime.now()
# Two-digit year used as the code prefix, e.g. '25' for 2025.
current_year = str(current_date.year)[-2:]

def get_characters_set(size = 4):
    """Return `size` random uppercase ASCII letters, never 'O' nor 'I'.

    NOTE(review): `random` is not a cryptographic generator; if the codes must
    be unguessable, confirm whether `secrets` is required.
    """
    allowed_letters = list(filter(lambda letter: letter not in 'OI', string.ascii_uppercase))
    picked_letters = random.choices(allowed_letters, k=size)
    return ''.join(picked_letters)

def generate_code():
    """Build one code shaped `<2-digit year>-XXXX-XXXX` from two random 4-letter groups."""
    first_group = get_characters_set(4)
    second_group = get_characters_set(4)
    return "-".join((current_year, first_group, second_group))

# init set of codes with existing
unique_codes = set(existing_codes['code'])

# init current_code count
current_codes_count = len(unique_codes)

# Target is the number of DISTINCT existing codes plus one new code per row.
# Using len(existing_codes) instead would over-generate whenever the existing
# file contains duplicated codes.
target_codes_count = current_codes_count + len(merged_df)

# The set silently absorbs collisions, so loop until enough distinct codes exist.
while len(unique_codes) < target_codes_count:
    unique_codes.add(generate_code())

In [None]:
# Ensure we have generated codes for all the rows
# Compared against the distinct pre-existing count, so that duplicated existing
# codes cannot hide an over- or under-generation.
assert len(unique_codes) == current_codes_count + len(merged_df), (
    "unexpected number of codes: check the existing codes file for duplicates"
)

In [None]:
# Keep only the freshly generated codes: exactly one per row is expected.
already_used_codes = set(existing_codes['code'])
new_codes = unique_codes - already_used_codes
assert len(new_codes) == len(merged_df)

In [None]:
# Assign generated code for production data
# (set iteration order is arbitrary, so the code-to-row pairing is arbitrary too)
merged_df = merged_df.assign(id_psp=list(new_codes))

In [None]:
# Columns written to the export, in this order.
export_columns = [
    'id_psp',
    'date_naissance',
    'genre',
    'nom',
    'prenom',
    'organisme',
    'situation',
    'allocataire',
    'adresse_allocataire',
    'updated_at',
    'exercice_id',
    'uuid_doc',
    'zrr',
    'qpv',
    'a_valider',
    'refuser',
    'created_at',
]
csp_df_column_filtered = merged_df[export_columns]

In [None]:
# Write the export as a semicolon-separated UTF-8 CSV, without the index column
csp_df_column_filtered.to_csv(
    path_or_buf=base_output_filepath,
    sep=';',
    index=False,
    encoding='utf-8',
)