In [None]:
import os
import urllib.request
import json
import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint

# Permet la génération de word
from docx import Document
from docx.shared import Pt
from docxcompose.composer import Composer
from docxtpl import DocxTemplate
from docx.enum.style import WD_STYLE_TYPE

In [None]:
def import_json_to_dict(url) :
    response = urllib.request.urlopen(url)
    my_dict = json.loads(response.read())
    return my_dict

In [None]:
def mkdir_ifnotexist(path) :
    if not os.path.isdir(path) :
        os.mkdir(path)

In [None]:
def format_amount(indic, valeur):
    if "Montant" in indic:
        f_valeur = float(valeur)
        if f_valeur > 1000000:
            return str(round(f_valeur/1000000, 1)) + ' M€'
        elif f_valeur > 10000:
            return str(round(f_valeur/1000, 1)) + ' k€'
        else:
            return str(f_valeur)
    else:
        try:
            return int(valeur.split(".")[0])
        except ValueError as err:
            print(f"L'indicateur {indic} possède des valeurs invalides : {err}")

In [None]:
mailles = ["national", "regional", "departemental"]

In [None]:
taxo_dep_df = pd.read_csv('refs/taxo_deps.csv', dtype={'dep':str, 'reg':str})
taxo_dep_df['dep'] = taxo_dep_df['dep'].apply(lambda x: x.zfill(2))
taxo_dep_df['reg'] = taxo_dep_df['reg'].apply(lambda x: x.zfill(2))
dep_list = list(taxo_dep_df['dep'].unique())
print('{} departements.'.format(len(dep_list)))

taxo_reg_df = pd.read_csv('refs/taxo_regions.csv', dtype={'reg':str})
taxo_reg_df['reg'] = taxo_reg_df['reg'].apply(lambda x: x.zfill(2))
reg_list = list(taxo_reg_df['reg'].unique())
print('{} regions.'.format(len(reg_list)))

In [None]:
pp_dep = pd.read_csv("pp_dep.csv", sep=";", dtype={"reg":str, "dep":str})

In [None]:
list_mesure_indic = list(pp_dep.pivot_table(index=["short_mesure", "short_indic"], values="valeur").index)

In [None]:
short_mesures_to_keep =set([
'AAP Efficacité énergétique',
'Assurance prospection',
"Ma Prime Rénov'",
"Prime à l'embauche des jeunes",
"Prime à l'embauche pour les travailleurs handicapés",
"Apprentissage",
"Bonus électrique",
"Contrats Initiatives Emploi (CIE) Jeunes",
'Contrats de professionnalisation',
'France Num : aide à la numérisation des TPE,PME,ETI',
'Garantie jeunes',
"AAP industrie : Modernisation des filières auto et aéro",
"Parcours emploi compétences (PEC) Jeunes",
'Prime à la conversion des véhicules légers',
"AAP Industrie : Soutien aux projets industriels territoires",
"AAP Industrie : Sécurisation approvisionnements critiques",
"Renforcement subventions Business France",
'Rénovation des bâtiments Etats (marchés notifiés)',
"Service civique",
'Soutien recherche aéronautique civil'
])
#print(list_mesure_indic)
#print(list(x[0] for x in list_mesure_indic if not(x[0] in short_mesures_to_keep)))
list_mesure_indic = [x for x in list_mesure_indic if x[0] in short_mesures_to_keep]

In [None]:
def check_pp_reg(pp_reg):
    assert sorted(pp_reg['reg'].unique()) == sorted(taxo_reg_df['reg'])
    assert sorted(pp_reg['region'].unique()) == sorted(taxo_reg_df['libelle'])
    assert sorted(pp_reg['mesure'].unique()) == sorted(pp_dep['mesure'].unique())
    assert sorted(pp_reg['short_mesure'].unique()) == sorted(pp_dep['short_mesure'].unique())
    

def check_pp_nat(pp_nat):
    assert sorted(pp_nat['mesure'].unique()) == sorted(pp_nat['mesure'].unique())
    assert sorted(pp_nat['short_mesure'].unique()) == sorted(pp_nat['short_mesure'].unique())

In [None]:
pp_reg = pd.pivot_table(pp_dep, index=["mesure","short_mesure", "reg","region", "Date", "period_date", "short_indic"], values="valeur", aggfunc=np.sum)
pp_reg.rename(columns={"reg":"libelle"}, inplace=True)
pp_reg.reset_index(inplace=True)
check_pp_reg(pp_reg)

In [None]:
pp_nat = pd.pivot_table(pp_reg, index=["mesure", "short_mesure", "Date","period_date", "short_indic"], values="valeur", aggfunc=np.sum)
pp_nat.reset_index(inplace=True)
check_pp_nat(pp_nat)

In [None]:
dict_mesure_indic = {}
for x in list_mesure_indic:
    if x[0] in dict_mesure_indic:
        dict_mesure_indic[x[0]].append(x[1])
    else:
        dict_mesure_indic[x[0]] = [x[1]]

In [None]:
dict_mesure_indic['AAP Industrie : Soutien aux projets industriels territoires'] = ['Nombre de TPE,PME,ETI bénéficiaires']
dict_mesure_indic['AAP Industrie : Sécurisation approvisionnements critiques'] = ['Nombre de TPE,PME,ETI bénéficiaires']
dict_mesure_indic['AAP Efficacité énergétique'] = ["Nombre d'entreprises ayant reçu l'aide"]
dict_mesure_indic['AAP industrie : Modernisation des filières auto et aéro'] = ['Nombre de PME']

In [None]:
# On ne veut pas afficher les lignes de Prime Rénov nulles
pp_dep = pp_dep.loc[(pp_dep.short_mesure != "Ma Prime Rénov'") | (pp_dep.valeur != 0) ]
pp_reg = pp_reg.loc[(pp_reg.short_mesure != "Ma Prime Rénov'") | (pp_reg.valeur != 0) ]
pp_nat = pp_nat.loc[(pp_nat.short_mesure != "Ma Prime Rénov'") | (pp_nat.valeur != 0) ]

assert pp_dep[(pp_dep['valeur'] == 0) & (pp_dep.short_mesure == "Ma Prime Rénov'")].shape[0] == 0
assert pp_reg[(pp_reg['valeur'] == 0) & (pp_reg.short_mesure == "Ma Prime Rénov'")].shape[0] == 0
assert pp_nat[(pp_nat['valeur'] == 0) & (pp_nat.short_mesure == "Ma Prime Rénov'")].shape[0] == 0

In [None]:
pp_dep.valeur = pp_dep.valeur.astype(str)
pp_dep.valeur = pp_dep.apply(lambda x: format_amount(x["short_indic"], x["valeur"]), axis=1)

In [None]:
pp_reg.valeur = pp_reg.valeur.astype(str)
pp_reg.valeur = pp_reg.apply(lambda x: format_amount(x["short_indic"], x["valeur"]), axis=1)

In [None]:
pp_nat.valeur = pp_nat.valeur.astype(str)
pp_nat.valeur = pp_nat.apply(lambda x: format_amount(x["short_indic"], x["valeur"]), axis=1)

In [None]:
#Building the folders structures

# For PDF
img_dir_path = './img/'
reports_dir_path = './reports/'
mkdir_ifnotexist(reports_dir_path)

# For words
word_dir_path = "reports_word"
word_gen_dir_path = "reports_word/Generation_p2p"
mkdir_ifnotexist(word_dir_path)
mkdir_ifnotexist(word_gen_dir_path)

In [None]:
all_charts_as_df = {"departemental": {dep: {} for dep in dep_list},
                    "national": {'France': {}},
                    "regional": {reg: {} for reg in reg_list}}
    

In [None]:
def make_pp_chart(maille, mesure, short_indics):
    if maille == "departemental":
        df = pp_dep.loc[(pp_dep.short_mesure == mesure)].sort_values(by="period_date", ascending=True).copy()
        deps = taxo_dep_df.dep.unique()  # Liste exhaustive de départements
        
        default = df.groupby(["Date", "period_date"]).sum().sort_values("period_date", ascending=True).reset_index()
        default[short_indics] = '-'
        default = default[["Date"] + short_indics]

        for dep in deps:
            print(f"Plotting {mesure}-{short_indics} : departement {dep}")
            df_dep = df.loc[df.dep == dep]
            if df_dep.shape[0] == 0:
                all_charts_as_df[maille][dep][mesure] = default.T.reset_index().T
            else:
                df_plot = pd.pivot_table(df_dep, index=['period_date', 'Date'], columns=['short_indic'], values='valeur', aggfunc='first')
                df_plot = df_plot.reset_index().drop('period_date', axis=1)
                df_plot = df_plot.rename_axis(None, axis=1)
                df_plot = df_plot.fillna('-')
                cols = set(df_plot.columns).intersection(short_indics)
                if len(cols) != len(short_indics):
                    missing_cols = set(short_indics) - cols
                    for missing_col in missing_cols:
                        df_plot[missing_col] = '-'
                df_plot = df_plot[['Date'] + short_indics]
                all_charts_as_df[maille][dep][mesure] = df_plot.T.reset_index().T
                
            
    elif maille == "regional":
        df = pp_reg.loc[(pp_reg.short_mesure == mesure)].sort_values(by="period_date", ascending=True).copy()
        regs = taxo_dep_df.reg.unique()
        
        default = df.groupby(["Date", "period_date"]).sum().sort_values("period_date", ascending=True).reset_index()
        default[short_indics] = '-'
        default = default[["Date"] + short_indics]

        for reg in regs:
            print(f"Plotting region {mesure}-{short_indics} : {reg}")
            df_reg = df.loc[df.reg == reg]
            if df_reg.shape[0] == 0:
                all_charts_as_df[maille][reg][mesure] = default.T.reset_index().T
            else:
                df_plot = pd.pivot_table(df_reg, index=['period_date', 'Date'], columns=['short_indic'], values='valeur', aggfunc='first')
                df_plot = df_plot.reset_index().drop('period_date', axis=1)
                df_plot = df_plot.rename_axis(None, axis=1)
                df_plot = df_plot.fillna('-')
                cols = set(df_plot.columns).intersection(short_indics)
                if len(cols) != len(short_indics):
                    missing_cols = set(short_indics) - cols
                    for missing_col in missing_cols:
                        df_plot[missing_col] = '-'
                df_plot = df_plot[['Date'] + short_indics]
                all_charts_as_df[maille][reg][mesure] = df_plot.T.reset_index().T
            
    elif maille == "national":
        print(f"Plotting country {mesure}-{short_indics}")
        df_nat = pp_nat.loc[(pp_nat.short_mesure == mesure)].sort_values(by="period_date", ascending=True).copy()
        df_plot = pd.pivot_table(df_nat, index=['period_date', 'Date'], columns=['short_indic'], values='valeur', aggfunc='first')
        df_plot = df_plot.reset_index().drop('period_date', axis=1)
        df_plot = df_plot.rename_axis(None, axis=1)
        df_plot = df_plot.fillna('-')
        df_plot = df_plot[['Date'] + short_indics]
        all_charts_as_df[maille]['France'][mesure] = df_plot.T.reset_index().T


In [None]:
def make_all_charts():
    for short_mesure in dict_mesure_indic:
        short_indics = dict_mesure_indic[short_mesure]
        for maille in mailles :
            make_pp_chart(maille, short_mesure, short_indics)

In [None]:
make_all_charts()

In [None]:
def check_charts_exhaustivity(all_charts_as_df):
    assert sorted(all_charts_as_df['departemental'].keys()) == sorted(taxo_dep_df['dep'])
    assert sorted(all_charts_as_df['regional'].keys()) == sorted(taxo_reg_df['reg'])
    assert sorted(all_charts_as_df['national'].keys()) == ['France']
    
    # Vérifier si des graphiques manquent.
    for dep in taxo_dep_df['dep']:
        assert sorted(all_charts_as_df['departemental'][dep].keys()) == sorted(dict_mesure_indic.keys()), f"{dep}"
    for reg in taxo_reg_df['reg']:
        assert sorted(all_charts_as_df['regional'][reg].keys()) == sorted(dict_mesure_indic.keys())
    
    assert sorted(all_charts_as_df['national']['France'].keys()) == sorted(dict_mesure_indic.keys())

    
check_charts_exhaustivity(all_charts_as_df)

In [None]:
volet2mesures = {
'Ecologie': ["Ma Prime Rénov'",
		"Bonus électrique",
		'AAP Efficacité énergétique',
        "AAP industrie : Modernisation des filières auto et aéro",
		'Prime à la conversion des véhicules légers',
		'Soutien recherche aéronautique civil', 
		'Rénovation des bâtiments Etats (marchés notifiés)',],

'Compétitivité': ['Assurance prospection', 
		'France Num : aide à la numérisation des TPE,PME,ETI',
		"AAP Industrie : Soutien aux projets industriels territoires",
		"AAP Industrie : Sécurisation approvisionnements critiques",
		"Renforcement subventions Business France",],

'Cohésion': ["Apprentissage",
		"Prime à l'embauche des jeunes",
		"Prime à l'embauche pour les travailleurs handicapés",
		"Contrats Initiatives Emploi (CIE) Jeunes",
		'Contrats de professionnalisation',
		'Garantie jeunes',
		"Parcours emploi compétences (PEC) Jeunes",
		"Service civique",]

}

In [None]:
def get_kpi(dep, short_indic, short_mesure):
    kpi_dep = (pp_dep.loc[(pp_dep.dep == dep) 
                          & (pp_dep.short_mesure == short_mesure) 
                          & (pp_dep.short_indic == short_indic)]
                .sort_values(by="period_date", ascending=False))
    if kpi_dep.shape[0] != 0:
        date= kpi_dep.iloc[0].Date
        valeur = kpi_dep.iloc[0].valeur
    else:
        date = pp_dep.Date.max()
        valeur = 0
    return date, valeur


In [None]:
# Liste des régions pour lesquelles on ne veut pas de fiche. Noms provenant de taxo_regs.csv
L_reg_no_output = ["00"]  # 0 correspond à Etranger


def creation_front_page(nom_departement):
    doc = DocxTemplate("template/template_front_page.docx")
    today = datetime.datetime.today().strftime('%Y-%m-%d')
    context = {'dep': str(nom_departement), 
               'date': today}
    doc.render(context)
    name_file = "reports_word/Generation_p2p/front_page_{}.docx".format(nom_departement)
    doc.save(name_file)
    return name_file


def creation_volet_page(nom_volet, num_volet):
    doc = DocxTemplate("template/template_volet.docx")
    context = {'volet': nom_volet, 
               'num_volet': num_volet}
    doc.render(context)
    name_file = "reports_word/Generation_p2p/{}.docx".format(nom_volet)
    doc.save(name_file)
    return name_file


def delete_paragraph(paragraph):
    p = paragraph._element
    p.getparent().remove(p)
    paragraph._p = paragraph._element = None
    
    
def fusion_word(word1, word2, dep):
    master = Document(word1)
    master.add_page_break()
    composer = Composer(master)
    doc1 = Document(word2)
    composer.append(doc1)
    name_fusion = "reports_word/Suivi_Territorial_plan_relance_{}.docx".format(dep)
    composer.save(name_fusion)
    return name_fusion


def must_add_laureat(volet, mesure):
    target_mesures = ['AAP Efficacité énergétique', 'Soutien recherche aéronautique civil']
    if volet == "Compétitivité" or mesure in target_mesures or mesure.lower().startswith('aap industrie'):
        return True
    else:
        return False


def creation_content_page(all_charts_as_df, departement, region, mesure, volet, dep_name, reg_name, num_mesure):
    # Ouverture de template
    doc = DocxTemplate("template/template_content_page.docx")
    # Recuperation des datas pour les 3 scales
    df_nat = all_charts_as_df["national"]["France"][mesure]
    df_reg = all_charts_as_df["regional"][region][mesure]
    df_dep = all_charts_as_df["departemental"][departement][mesure]
    # Recuperation des noms des colonens
    col_labels = df_nat.iloc[0]
    short_indic = dict_mesure_indic[mesure][0]
    date, valeur = get_kpi(departement, short_indic, mesure )
    context = { 'mesure' : mesure,
                'date': date,
                'indicateur': short_indic,
                'indicateur_top': f"{num_mesure} - {short_indic}",
                'val': valeur,            
                'title_table_nat' : "Niveau National", 
                'title_table_reg' : "Niveau Régional", 
                'title_table_dep' : "Niveau Départemental",
                'lib_reg': reg_name, 
                'lib_dep': dep_name,
                'col_labels' : col_labels, 
                # Les 3 lignes suivantes permettent de générer des tabeaux avec uniquement les 3 derniers mois.
                # Prend en compte le cas ou il n'y a pas encore 3 mois de données
                'tbl_contents_nat': [{'cols' : list(df_nat.iloc[-i-1])} for i in range(min(len(df_nat)-1, 3))],
                'tbl_contents_reg': [{'cols' : list(df_reg.iloc[-i-1])} for i in range(min(len(df_reg)-1, 3))],
                'tbl_contents_dep': [{'cols' : list(df_dep.iloc[-(i+1)])} for i in range(min(len(df_dep)-1, 3))],
                'text_laureats': "Exemples de lauréats :" if must_add_laureat(volet, mesure) else "Espace Commentaires :",
                }
    doc.render(context)
    name_file = "reports_word/Generation_p2p/content_page_{}.docx".format(mesure)
    doc.save(name_file)
    return name_file


def creation_fiche(dep):
    #departement: code departement 01:
    #On a les variables volet2mesures, all_charts
    reg = taxo_dep_df[taxo_dep_df['dep'] == dep].iloc[0]['reg']  # Code region
    if reg in L_reg_no_output:
        return False
    num_volet, num_mesure = 1, 1
    reg_name = taxo_reg_df[taxo_reg_df['reg'] == reg].iloc[0]['libelle']  # libelle
    dep_name = taxo_dep_df[taxo_dep_df['dep'] == dep].iloc[0]['libelle']
    name_fusion = creation_front_page(dep_name)
    for volet in list(volet2mesures.keys()):  # 3 itérations, dep_name, reg_name
        name_volet = creation_volet_page(volet, num_volet)
        name_fusion = fusion_word(name_fusion, name_volet, dep_name)
        liste_mesure = volet2mesures[volet]
        num_volet += 1
        for mesure in liste_mesure:
            name_content = creation_content_page(all_charts_as_df, dep, reg, mesure, volet, dep_name, reg_name, num_mesure)
            name_fusion = fusion_word(name_fusion, name_content, dep_name)
            num_mesure += 1
    return name_fusion
            


def create_all_dep():
    list_all_dep = taxo_dep_df["dep"]
    for dep in list_all_dep:
        docx_path = creation_fiche(dep)
        doc = Document(docx_path)

        # Retirer le double espacement après les tableaux
        for paragraph in doc.paragraphs:
            # Retirer les lignes vides après les tableaux en fin de page : elles sont reconnues par la taille 
            # de la police <= 2 où ne possède pas de run.
            if paragraph.text.__len__() == 0 and (any(run.font.size <= Pt(2) for run in paragraph.runs if run.font.size is not None) or 
                                                  (all(run.font.size is None for run in paragraph.runs))):
                delete_paragraph(paragraph)
        doc.save(docx_path)
        
        # Réduire la taille du paragraphe après le dernier tableau de chaque page
        # Cela permet d'éviter de créer une nouvelle page quand le texte devient trop long
        doc = Document(docx_path)
        styles = doc.styles
        style = styles.add_style('Custom_style', WD_STYLE_TYPE.PARAGRAPH)
        style.font.size = Pt(2)
        for paragraph in doc.paragraphs:
            #print(f'-{paragraph.text}-')
            if len(paragraph.text) == 1:
                paragraph.style = doc.styles['Custom_style']
                #print('changed style')
        doc.save(docx_path)

        
# Lance la génération dans le dossier reports_word
create_all_dep()


In [None]:
def check_num_docx_created():
    num_test = len([fn for fn in os.listdir('reports_word') if "Suivi" in fn])
    num_true = taxo_dep_df['dep'].shape[0]
    # num_true-1 car on enlève le département Etranger "00"
    assert num_test == num_true-1, f"{num_test} -- {num_true-1}"
    
check_num_docx_created()