# Extract & Load *Indice des prix* 

**TODO** : 
- Fix offset from years and dimensions merge

## Setup

In [1]:
import os
import pathlib
import warnings
import re

from hydra import compose, initialize
from omegaconf import dictconfig
from omegaconf import OmegaConf
import pandas as pd

### Project Variables

In [2]:
CONFIG_NAME = "indice_des_prix"
SOURCE_SETTINGS_PATH = pathlib.Path('/dataplatform_lab', 'lab', 'notebooks', 'sources')
DATA_PATH = pathlib.Path('/dataplatform_lab', 'lab', 'dwh_data')
EXTRACTS_PATH = pathlib.Path(DATA_PATH, 'extracts')
LOAD_PATH = pathlib.Path(DATA_PATH, 'raw')

### Functions

In [3]:
def get_config() -> dictconfig.DictConfig:
    config_path = SOURCE_SETTINGS_PATH.relative_to(os.getcwd()).as_posix()
    with initialize(
        version_base=None, 
        config_path=config_path
    ):
        return compose(config_name=CONFIG_NAME)


def get_available_year(historic_years, year):
    return list(range(year-1, year-(historic_years+1), -1))


def get_years_to_load(historic_years, years):
    years.sort(reverse=True)
    dc_available_years = {
        y:get_available_year(historic_years, y) for y in years
    }
    dc_years = {}
    for year in years:
        dc = {
            ay:year for ay in dc_available_years[year] if ay not in dc_years
        }
        dc_years.update(dc)
    return dc_years


def get_excel_file(folder_path: pathlib.Path, excel_file_name: str) -> pd.ExcelFile:
    excel_file_path = pathlib.Path(folder_path, excel_file_name)
    return pd.ExcelFile(excel_file_path)


def process_column_names(
    df,
    dimension_cols = None,
    dimension_cols_rename = None
):
    rename_dc = {}
    value_cols = df.columns.difference(dimension_cols)
    if dimension_cols_rename:
        rename_dc.update(
            {
                old:new for old, new in zip(
                    dimension_cols, dimension_cols_rename
                )
            }
        )
        
    for col in value_cols:
        renamed_col = ' '.join([x for x in re.findall(r"[^\W]*",col) if x])
        rename_dc.update({col: renamed_col})

    return (
        df
        .rename(columns=rename_dc)
    )


def merge_multiline_labels(
        df: pd.DataFrame,
        dimension_cols: list
    ) -> pd.DataFrame:
    col_years=list(df.columns.difference(dimension_cols))
    ls_produits = []
    previous = {}
    for i, row in df.iterrows():
        row_dc = row.to_dict()
        actual = {col:row_dc[col] for col in dimension_cols}
        elem = row_dc[col_years[0]]
        if pd.isna(elem):
            previous = actual
        elif previous:
            ls_produits.append(
                {
                    col:previous.get(col).strip() + "" + actual.get(col).strip() for col in dimension_cols
                }
            )
            previous = {}
        else:
            ls_produits.append(
                {
                    col:actual.get(col).strip() for col in dimension_cols
                }
            )
            previous = ""

    df_produits = pd.DataFrame(ls_produits)
    
    # clean values
    df_values = (
        df[col_years]
        .dropna(
            axis=0,
            how='any',
        )
        .reset_index(drop=True)
    )
    
    return pd.concat([df_produits, df_values], axis=1)


def prepare_df(version, df:pd.DataFrame) -> pd.DataFrame:

    dimension_cols = df.columns[version.config.dimension_cols_index]
    clean_col_df = process_column_names(
        df, 
        dimension_cols=dimension_cols,
        dimension_cols_rename=version.config.dimension_cols_rename
    )
    dimension_cols = clean_col_df.columns[version.config.dimension_cols_index]
    clean_df = merge_multiline_labels(clean_col_df, dimension_cols) 

    return clean_df


def save_df(df, version):
    path = pathlib.Path(
        LOAD_PATH,
        version.dataset.source.schema,
        f"{version.fqtn}.parquet"
    )
    if not path.parent.exists():
        path.parent.mkdir(parents=True)
    
    df.to_parquet(
        path,
        index = None
    )

### Classes

In [4]:
class DatasetVersion:

    def __init__(self, dataset, version_dict, config={}):
        self.dataset = dataset
        version_config = version_dict.get('config')
        self.config = OmegaConf.unsafe_merge(config, version_config)
        if not self.config_is_valid():
            warnings.warn('The configuration is incorrect for this dataset')
        self.version_num = version_dict["version"]
        self.historic_years = self.config.historic_years
        self.years_dc = version_dict["years"]
        self.years = [y["year"] for y in self.years_dc]
        self.files_to_load = self.get_files_to_load_per_year()

    @property
    def fqn(self):
        dataset_name = self.dataset.name
        return f"{self.dataset.name} v{self.version_num}"
    
    @property
    def fqtn(self):
        return f"{self.dataset.table_name}_v{self.version_num}"
    
    @property
    def files_to_load_ls(self):
        return list(set(self.files_to_load.values()))
    
    @property
    def available_years(self):
        return list(self.files_to_load.keys())
    

    def __repr__(self):
        return self.fqn
    

    def get_files_to_load_per_year(self):
        years_to_load_dc = get_years_to_load(
            self.historic_years, 
            self.years
        )
        file_names_dc = self.dataset.source.file_names_dc
        return {k:file_names_dc[v] for k,v in years_to_load_dc.items()}
    
    def get_years_to_load_per_file(self):
        years_to_load_dc = get_years_to_load(
            self.historic_years, 
            self.years
        )
        dc_years_per_file = {y:[] for y in years_to_load_dc.values()}
        for col_year, file_year in years_to_load_dc.items():
            dc_years_per_file[file_year].append(str(col_year))
        
        return dc_years_per_file

    
    def config_is_valid(self):
        ls_required_keys = [
            "sheet_name",
            "historic_years",
            "type",
            "base_year_cell",
            "header_row",
            "data_start_row",
            "data_end_row"
        ]
        for key in ls_required_keys:
            if key not in self.config:
                return False

        if (
            self.dataset.type_data == "index" and
            "base_year_cell" not in self.config
        ):
            return False
        
        return True
            


class Dataset:
    def __init__(self, source, dataset_dict):
        self.source = source
        self.name = dataset_dict.name
        self.table_name = dataset_dict.table_name
        self.config = dataset_dict.get('config', {})
        self.type_data = self.config.get('type')
        self.versions_ls = dataset_dict["versions"]
        self.versions = []
        self.process_versions()


    def process_versions(self):
        for version_dict in self.versions_ls:
            version = DatasetVersion(self, version_dict, self.config)
            self.versions.append(version)
            


class Source:

    def __init__(self, source_config):
        self.name = source_config.name
        self.file_names_dc = source_config.file_names
        self.schema = source_config.schema
        self.datasets_ls = source_config.datasets
        self.datasets = []
        self.process_datasets()

    def __str__(self):
        return self.name
    
    def __repr__(self):
        return self.name

    
    def process_datasets(self):
        for dataset_dict in self.datasets_ls:
            dataset = Dataset(self, dataset_dict)
            self.datasets.append(dataset)






## Data Extraction

In [5]:
config = get_config()
ipp_source = Source(config)
dataset = ipp_source.datasets[0]
version = dataset.versions[0]

In [6]:
dc_df = {}
dc_years_file = version.get_years_to_load_per_file()
for file_name in version.files_to_load_ls:
    file_year = re.findall(r"\d{4}", file_name)[-1]
    folder_path = pathlib.Path(EXTRACTS_PATH, f"Annuaire Statistique {file_year}")
    excel_file = get_excel_file(folder_path, file_name)
    df = pd.read_excel(
        excel_file,
        version.config.sheet_name,
        header=0,
        skiprows=version.config.skiprows,
        skipfooter=version.config.skipfooter
    )
    prep_df = prepare_df(version, df)
    keep_cols = dc_years_file[int(file_year)]
    dc_df[file_year] = prep_df[keep_cols]
    dimensions_df = prep_df[version.config.dimension_cols_rename]


ls_dfs = [dimensions_df, *[df for df in dc_df.values()]]

df = pd.concat(ls_dfs,  axis=1)
    

In [7]:
for col in df.columns:
    print(col)

libelle_fr
libelle_ar
2016
2019
2018
2017


## Load to DuckDB raw database

In [8]:
import duckdb

%load_ext sql

%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

In [9]:
conn = duckdb.connect(f"{os.getenv('DUCKDB_DATA')}/raw_duck.db")
%sql conn

In [10]:
load_path = pathlib.Path(
    LOAD_PATH,
    version.dataset.source.schema,
    f"{version.fqtn}.parquet"
)

if not load_path.parent.exists():
    load_path.parent.mkdir(parents=True)

load_path

PosixPath('/dataplatform_lab/lab/dwh_data/raw/indice_des_prix/ipp_v1.parquet')

In [11]:
%sql COPY df TO '{{ load_path.as_posix() }}' (FORMAT PARQUET);


Unnamed: 0,Count
0,31


In [12]:
conn.close()