<img src="../images/AzPTravel_PPM.png">

## Source File Consolidation

#### This script fetches the current data collection submission files, consolidates them into a combined csv file, and outputs that to the network.
- All source files stored in the path are read, summarized, and then consolidated into a single dataframe named 'df'.
- This process assumes that the multiple sources are in no way duplicates of each other.
- df is then output in the data collection subfolder of the "Production" folder.


### User Variables
- These are overwritten if inherited from run_control.ipynb.
- Feel free to reset them for a manual run if you like.
- Do not save without percode = "-f"

In [1]:
# User-editable run settings. The markdown above states these are overwritten
# when values are inherited from run_control.ipynb.
commit_message = "Development and testing."
# Give a brief reason for the run.

run_control = 1
# run_control = 0 - Lite run with no reporting, not recommended.
# run_control = 1 - Lite run with normal reporting, default setting.
# run_control = 2 - Heavy run with full reporting, available for audits and troubleshooting.
# run_control = 5 - A default setting. Indicates the script is being run by an outside process without an inherited value
# NOTE(review): the visible cells only test `run_control == 5` and `run_control > 0`;
# no visible cell distinguishes 1 from 2 - confirm where the "heavy" mode is used.

percode = "2021.Q1"
# Data Collection Code, this controls file paths and output names
# "-f" is the value indicating a bad inheritance from run with arg

s_format = "p"
# denotes the source data format: x == Excel, j == json, p == parquet

#----------
# do not edit - this either inherits the full instance timestamp from the papermill book or captures the run time of this script.
# The captured timestamp is formatted as MMDDYYYYHHMMSS.
from datetime import datetime
inst_datetime = datetime.now().strftime("%m%d%Y%H%M%S")

In [2]:
# Parameters
# These assignments override the user variables set in the cell above
# (presumably injected by papermill from run_control.ipynb - confirm).
# NOTE(review): this cell sets `source_type`, but the visible cells select the
# file format with `s_format`; confirm which name the caller should pass.
run_control = 1
percode = "2021.Q1"
commit_message = "Edited data for Austraila, json 2 parquet and add comments from email. Begin general evaluation of data submissions."
inst_datetime = "05242021183245"
source_type = "p"


#### Notebook display options

In [3]:
# Ask IPython to display the value of every expression statement in a cell,
# not only the last one; later report cells rely on this to show several tables.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

#### import packages

In [4]:
#### Packages used

import os # file-system paths, directory listing, file moves
import sys # command-line arguments (sys.argv)

from datetime import datetime # timestamp formatting
import warnings # custom warnings

import getpass # standard-library user/password prompts; not referenced in the cells shown here
import json # json file read/write

import matplotlib.pyplot as plt #Plots and Graphs
import openpyxl # Excel operations
import numpy as np # NaN handling
import pandas as pd #DataFrame and math





#### Default Variables, these govern logic, do not edit.

In [5]:
# Fallback data collection code, used when no code can be read from the command line.
default_dc = "20XX.QX"
# Run mode substituted when run_control arrives as the sentinel value 5.
default_rc = 0 #extra lite mode
# NOTE(review): dummy_perc is not referenced by any cell visible here - confirm it is still needed.
dummy_perc = "33Q3" # bad inheritance

#### Script determining run context ie, manual, run_control.ipynb, or other.

In [6]:
# The sentinel 5 means "started by an outside process without an inherited
# value"; swap it for the default run mode and leave any other value alone.
if run_control == 5:
    run_control = default_rc

# Take the data collection code from the first command-line argument.
# "-f" marks a bad inheritance, in which case the percode already set is kept.
# With no argument at all (or no percode defined), fall back to the default.
try:
    first_arg = sys.argv[1]
    percode = percode if first_arg == "-f" else first_arg
except (IndexError, NameError):
    percode = default_dc


#### Make paths for the source folder

In [7]:
# Root folder of this data collection period on the network share.
rt_path = '//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/' + str(percode)
# The live submission files sit in a sub-folder of the period root.
ls_path = os.path.join(rt_path, 'live_sources')


#### Get a list of only source files in the path that start with "us_dat".
#### Logic determines the source file types.

#### User instructions:
- Make sure that you have 1 file per source in this folder.
    -  For instance, do not have two files for Portugal. If there is an update, archive the old one.
- Do not overwrite files in the archive.
    - Rename newly archived files, no strict convention, we keep track of these by the modified date.
- It is ok to have multiple sources in one file.

In [8]:
# List the live sources folder; names are lower-cased so the prefix/suffix
# tests below are case-insensitive.
# NOTE(review): the lower-cased names are later joined back into paths, which
# assumes a case-insensitive file system - confirm for non-Windows hosts.
files = [entry.lower() for entry in os.listdir(ls_path)]

# Keep only the files that match the selected source format.
if s_format == "x":
    # Excel workbooks (.xlsx / .xlsm / .xls); no "us_dat" prefix is required here.
    files_sour = [f for f in files if f.endswith(('lsx', 'lsm', 'xls'))]

elif s_format == "j":
    files_sour = [f for f in files if f.startswith('us_dat') and f.endswith('.json')]

elif s_format == "p":
    files_sour = [f for f in files if f.startswith('us_dat') and f.endswith('.parquet')]

else:
    # Without this guard files_sour would be undefined (or left over from an
    # earlier run of the cell) and the failure would surface later and unclearly.
    raise ValueError(f"Unsupported s_format {s_format!r}; expected 'x', 'j' or 'p'.")


#### Create a list 'pathfiles' that has every source file with the full path.

In [9]:
# Full path of every selected source file, in the same order as files_sour.
pathfiles = [os.path.join(str(ls_path), str(source_name)) for source_name in files_sour]


#### File Check

- create file information Data.Frame "file_info_df" with name, size, and modified date of each source file
- import previous source file list for comparison
- print all new files, store as a list "newfiles"
- print all dropped files, store as a list "dropped_files"

In [10]:
# File check: snapshot the current source files, compare them with the snapshot
# saved by the previous run, report new / changed / dropped files, and then save
# the current snapshot for the next run.
cols = ["Filename", "Size", "Last Modified"]

# Collect one record per source file and build the frame once
# (row-by-row DataFrame.append is quadratic and deprecated in pandas).
file_records = []
for f, p in zip(files_sour, pathfiles):
    size_bytes = os.path.getsize(p)
    # os.path.getctime is the creation time on Windows and the time of the
    # last metadata change on Unix; it is stored under "Last Modified".
    stamp = datetime.fromtimestamp(os.path.getctime(p)).strftime('%Y-%m-%d %H:%M:%S')
    file_records.append([f, size_bytes, stamp])

file_info_df = pd.DataFrame(file_records, columns=cols)

file_list = os.path.join(rt_path, 'Current_Source_Files.csv')
try:
    prev_files = pd.read_csv(file_list)
except FileNotFoundError:
    # First run for this period: no previous snapshot exists yet, so every
    # current file is reported as new instead of the cell crashing.
    prev_files = pd.DataFrame(columns=cols)

# A file counts as unchanged only if name, size and timestamp all match.
fl_comp = file_info_df["Filename"] + file_info_df["Size"].astype(str) + file_info_df["Last Modified"].astype(str)

prev_comp = prev_files["Filename"] + prev_files["Size"].astype(str) + prev_files["Last Modified"].astype(str)

known_keys = set(prev_comp)
known_names = set(prev_files["Filename"])

# New or changed files, stored with their full path.
newfiles = []

for fname, stamp, key, path in zip(file_info_df["Filename"], file_info_df["Last Modified"], fl_comp, pathfiles):
    if key in known_keys:
        continue

    # Same name as before but different size/timestamp means "changed".
    label = "Changed" if fname in known_names else "New"
    print("")
    print(f"{label} data file found: {fname}, last update {stamp}.")
    newfiles.append(path)

if not newfiles:
    print("No new or changed data files found in live sources folder.")

# Files present in the previous snapshot but missing from the folder now.
current_names = set(file_info_df["Filename"])
dropped_files = []

for r in prev_files["Filename"]:
    if r in current_names:
        continue

    print("")
    print(f"File {r} has been archived or removed from live sources folder.")
    dropped_files.append(os.path.join(ls_path, r))

# Persist the current snapshot as the baseline for the next run.
file_info_df.to_csv(file_list, index=False)




New new or changed data files found in live sources folder.


#### Business Unit Check

- import existing Business Unit and raw file list
- ensure there is a 1 to 1 relationship
- throw error if not


In [11]:
# Build bu_x_file: one row per (Business Unit, source file) pair found in the
# raw submissions, then a styled, sorted view of it for display.
cols = ["Business Unit", "File"]

# Column converters shared by both Excel sheet layouts.
excel_converters = {
    'Business Partner Name': str,
    'Type of Business': str,
    'Type of Account': str,
    'Distribution Type': str,
    'LOB': str,
    'Distribution Channel': str,
    'Sub LOB': str,
    'Business Partner ID Number': str,
    'Product Name': str,
    'Product ID Number': str,
    'Product Family': str,
    'Standard Product': str,
    'Total Expenses': float,
}

bu_records = []

for p, f in zip(pathfiles, files_sour):

    if s_format == "x":
        # Workbooks use one of two sheet names; try the first and fall back to
        # the second. `except Exception` (not a bare except) keeps Ctrl-C working.
        try:
            data = pd.read_excel(p, sheet_name='Portfolio_Monitoring', na_values=[0], header=3, converters=excel_converters)
        except Exception:
            data = pd.read_excel(p, sheet_name='Ptf_Monitoring_GROSS_Reins', na_values=[0], header=3, converters=excel_converters)

    elif s_format == "j":
        data = pd.read_json(p, orient="table")

    elif s_format == "p":
        data = pd.read_parquet(p, engine="pyarrow")

    else:
        # Otherwise `data` would be undefined or silently reused from a previous file.
        raise ValueError(f"Unsupported s_format {s_format!r}; expected 'x', 'j' or 'p'.")

    bus = data["Business Unit"].unique()
    bu_records.extend({"Business Unit": i, "File": f} for i in bus)

# Build the frame once instead of appending row by row (DataFrame.append is
# quadratic and deprecated in pandas).
bu_x_file = pd.DataFrame(bu_records, columns=cols)

#This is a list of BU's that are included in the raw data files, and the file name(s) they are contained in.
bu_styler = bu_x_file.sort_values(by='Business Unit').style
# Styler.hide_index() was replaced by Styler.hide(axis="index") in newer pandas;
# use whichever the installed version provides.
bu_filelist = bu_styler.hide(axis="index") if hasattr(bu_styler, "hide") else bu_styler.hide_index()




In [12]:

# Critical control: every Business Unit must appear in exactly one source file.

# Number of files each Business Unit appears in, largest first.
bucount = bu_x_file.groupby("Business Unit").count().sort_values(by='File', ascending=False)

# Number of Business Units contained in each file.
filecount = bu_x_file.groupby("File").count().sort_values(by='File')

# Kept as a flag for any later consumer: 1 = control passed, 0 = control failed.
err_check = 1

# The threshold is 1 again: the earlier value of 4 was a temporary testing
# setting that let duplicated Business Units through.
if (bucount["File"] > 1).any():
    err_check = 0
    warnings.warn("\n \nThere is a Business Unit with data in multiple source files.\nReview the file counts and filenames to resolve the issue.\nThis is a critical control.")
    print("Execution halted due to multiple source file per one or more Business Units.")
    # Actually stop the run; previously the message was printed but the
    # remaining cells still executed and consolidated the duplicated data.
    raise RuntimeError("One or more Business Units appear in multiple source files.")


1.0

In [13]:

# Show the three control tables. Each bare name below is an expression
# statement whose value the notebook displays under the preceding message.

# Business Units per file.
print("This is a count of how many BUs appear in each file. It is fine to submit multiple BUs in one source.")
filecount


# Files per Business Unit.
print("\nThis is a count of how many files each BU appears in. In all cases the count values should be one (1). The process as desgined, should have a single submission and source for each BU.")
bucount

# Full Business Unit to file listing, sorted by Business Unit.
print("This is a list of BU's that are included in the raw data files, and the file name(s) they are contained in.")   

bu_filelist

This is a count of how many BUs appear in each file. It is fine to submit multiple BUs in one source.


Unnamed: 0_level_0,Business Unit
File,Unnamed: 1_level_1
us_dat_au_02172021174434.parquet,1
us_dat_ca_05202021134242.parquet,1
us_dat_ch_05182021192455.parquet,1
us_dat_es_05202021105141.parquet,1
us_dat_it_05242021111520.parquet,1
us_dat_pt_05202021121056.parquet,1



This is a count of how many files each BU appears in. In all cases the count values should be one (1). The process as desgined, should have a single submission and source for each BU.


Unnamed: 0_level_0,File
Business Unit,Unnamed: 1_level_1
AU,1
CA,1
CH,1
ES,1
IT,1
PT,1


This is a list of BU's that are included in the raw data files, and the file name(s) they are contained in.


Business Unit,File
AU,us_dat_au_02172021174434.parquet
CA,us_dat_ca_05202021134242.parquet
CH,us_dat_ch_05182021192455.parquet
ES,us_dat_es_05202021105141.parquet
IT,us_dat_it_05242021111520.parquet
PT,us_dat_pt_05202021121056.parquet


- Read each source file (Excel, json, or parquet, depending on `s_format`) into the temp DataFrame "data".
- Prep actions - Strip (trim) leading and trailing spaces values in string columns, remove rows without business units.
- Append each file's output into one DataFrame "df".
- Process Cleanup, the indices restart with each append, reset and drop the previous.

In [14]:
# Load every source file into df_dict, keyed by the file's full path.
df_dict = {}

# Column converters shared by both Excel sheet layouts.
excel_converters = {
    'Business Partner Name': str,
    'Type of Business': str,
    'Type of Account': str,
    'Distribution Type': str,
    'LOB': str,
    'Distribution Channel': str,
    'Sub LOB': str,
    'Business Partner ID Number': str,
    'Product Name': str,
    'Product ID Number': str,
    'Product Family': str,
    'Standard Product': str,
    'Total Expenses': float,
}

for f in pathfiles:

    if s_format == "x":
        # Workbooks use one of two sheet names; try the first and fall back to
        # the second. `except Exception` (not a bare except) keeps Ctrl-C working.
        try:
            data = pd.read_excel(f, sheet_name='Portfolio_Monitoring', na_values=[0], header=3, converters=excel_converters)
        except Exception:
            data = pd.read_excel(f, sheet_name='Ptf_Monitoring_GROSS_Reins', na_values=[0], header=3, converters=excel_converters)

    elif s_format == "j":
        data = pd.read_json(f, orient="table")

    elif s_format == "p":
        data = pd.read_parquet(f, engine="pyarrow")

    else:
        # Otherwise `data` would be undefined or silently reused from a previous file.
        raise ValueError(f"Unsupported s_format {s_format!r}; expected 'x', 'j' or 'p'.")

    df_dict[f] = data

## Raw file Summaries

#### First 5 rows of each file.

In [15]:
# Preview the first rows of each raw source file unless reporting is switched off.
if not run_control > 0:
    print("Default Report 2 Skipped")
else:
    divider = "-" * 100
    for source_path, frame in df_dict.items():
        print(f"{source_path} - First 5 Samples:")
        print(divider)
        print(frame.head())

//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_au_02172021174434.parquet - First 5 Samples:
----------------------------------------------------------------------------------------------------
  Business Unit Country Currency Region Reporting Date From Reporting Date To  \
0            AU      AU      AUD   APAC          2020-01-01        2020-12-31   
1            AU      AU      AUD   APAC          2020-01-01        2020-12-31   
2            AU      AU      AUD   APAC          2020-01-01        2020-12-31   
3            AU      AU      AUD   APAC          2020-01-01        2020-12-31   
4            AU      AU      AUD   APAC          2020-01-01        2020-12-31   

  Date of Analysis         Type of Analysis  Analysed Months  Reporting Year  \
0       2021-02-08  Most Recently 12 Months             12.0          2020.0   
1       2021-02-08  Most Recently 12 Months             12.0          

#### Summary Statistics.

In [16]:
# Print describe() statistics for each raw source file unless reporting is switched off.
if not run_control > 0:
    print("Default Report 3 Skipped")
else:
    divider = "-" * 100
    for source_path, frame in df_dict.items():
        print(f"{source_path} - Statistical summary:")
        print(divider)
        print(frame.describe())

//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_au_02172021174434.parquet - Statistical summary:
----------------------------------------------------------------------------------------------------
       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count             85.0            85.0               85.0             85.0   
mean              12.0          2020.0                4.0             12.0   
std                0.0             0.0                0.0              0.0   
min               12.0          2020.0                4.0             12.0   
25%               12.0          2020.0                4.0             12.0   
50%               12.0          2020.0                4.0             12.0   
75%               12.0          2020.0                4.0             12.0   
max               12.0          2020.0                4.0             12.0   

       Num

       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count             17.0            17.0               17.0             17.0   
mean               3.0          2021.0                1.0              3.0   
std                0.0             0.0                0.0              0.0   
min                3.0          2021.0                1.0              3.0   
25%                3.0          2021.0                1.0              3.0   
50%                3.0          2021.0                1.0              3.0   
75%                3.0          2021.0                1.0              3.0   
max                3.0          2021.0                1.0              3.0   

       Number of Products per Row  Number of B-Partners per Row  \
count                        17.0                          17.0   
mean                          0.0                           0.0   
std                           0.0                           0.0   
min                          

       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count           3187.0          3187.0             3187.0           3187.0   
mean              12.0          2021.0                1.0              3.0   
std                0.0             0.0                0.0              0.0   
min               12.0          2021.0                1.0              3.0   
25%               12.0          2021.0                1.0              3.0   
50%               12.0          2021.0                1.0              3.0   
75%               12.0          2021.0                1.0              3.0   
max               12.0          2021.0                1.0              3.0   

       Number of Products per Row  Number of B-Partners per Row  \
count                      3187.0                   3187.000000   
mean                          1.0                     10.396297   
std                           0.0                     33.928521   
min                          

       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count             28.0            28.0               28.0             28.0   
mean               3.0          2021.0                1.0              3.0   
std                0.0             0.0                0.0              0.0   
min                3.0          2021.0                1.0              3.0   
25%                3.0          2021.0                1.0              3.0   
50%                3.0          2021.0                1.0              3.0   
75%                3.0          2021.0                1.0              3.0   
max                3.0          2021.0                1.0              3.0   

       Number of Products per Row  Number of B-Partners per Row  \
count                   28.000000                          28.0   
mean                     2.428571                           1.0   
std                      3.084352                           0.0   
min                      1.00

       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count            100.0           100.0              100.0            100.0   
mean               3.0          2021.0                1.0              3.0   
std                0.0             0.0                0.0              0.0   
min                3.0          2021.0                1.0              3.0   
25%                3.0          2021.0                1.0              3.0   
50%                3.0          2021.0                1.0              3.0   
75%                3.0          2021.0                1.0              3.0   
max                3.0          2021.0                1.0              3.0   

       Number of Products per Row  Number of B-Partners per Row  \
count                  100.000000                    100.000000   
mean                     2.790000                     34.830000   
std                      7.657168                    102.610478   
min                      1.00

       Analysed Months  Reporting Year  Reporting Quarter  Reporting Month  \
count             39.0            39.0               39.0             39.0   
mean               3.0          2021.0                1.0              3.0   
std                0.0             0.0                0.0              0.0   
min                3.0          2021.0                1.0              3.0   
25%                3.0          2021.0                1.0              3.0   
50%                3.0          2021.0                1.0              3.0   
75%                3.0          2021.0                1.0              3.0   
max                3.0          2021.0                1.0              3.0   

       Number of Products per Row  Number of B-Partners per Row  \
count                        39.0                          39.0   
mean                          1.0                           1.0   
std                           0.0                           0.0   
min                          

#### Null Value Report.

In [17]:
# Print the per-column null counts of each raw source file unless reporting is switched off.
if not run_control > 0:
    print("Default Report 4 Skipped")
else:
    divider = "-" * 100
    for source_path, frame in df_dict.items():
        print(f"{source_path} - Null values in the dataset:")
        print(divider)
        print(frame.isnull().sum())

//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_au_02172021174434.parquet - Null values in the dataset:
----------------------------------------------------------------------------------------------------
Business Unit                     0
Country                           0
Currency                          0
Region                            0
Reporting Date From               0
                                 ..
expsub                            2
Reporting Date From INT           0
Reporting Date To INT             0
Date of Analysis INT              0
Selected Fields for Duplicates    0
Length: 73, dtype: int64
//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_ca_05202021134242.parquet - Null values in the dataset:
--------------------------------------------------------------------------------------------

#### Datatypes by field for each file.

In [18]:
# Print the column dtypes of each raw source file unless reporting is switched off.
if not run_control > 0:
    print("Default Report 5 Skipped")
else:
    divider = "-" * 100
    for source_path, frame in df_dict.items():
        print(f"{source_path} - Datatypes:")
        print(divider)
        print(frame.dtypes)

//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_au_02172021174434.parquet - Datatypes:
----------------------------------------------------------------------------------------------------
Business Unit                             object
Country                                   object
Currency                                  object
Region                                    object
Reporting Date From               datetime64[ns]
                                       ...      
expsub                                   float64
Reporting Date From INT                    int64
Reporting Date To INT                      int64
Date of Analysis INT                       int64
Selected Fields for Duplicates            object
Length: 73, dtype: object
//hecate/Insurance_US/Product Development/Product Management/Global PPM/Reporting/Data Collection/Production/2021.Q1\live_sources\us_dat_ca_05202021134242.par

#### Add the datafile name as a column in each source DataFrame

In [19]:
# Tag every row with the file it came from by inserting the file name as the
# first column of each source DataFrame (modifies the frames in df_dict in place).
for source_path, source_name in zip(pathfiles, files_sour):
    source_frame = df_dict[source_path]
    source_frame.insert(loc=0, column="Submission File", value=source_name)

#### append into a single DataFrame

In [20]:
# Stack every source DataFrame into one frame with a fresh 0..n-1 index.
# A single pd.concat replaces the repeated DataFrame.append calls, which
# re-copied the growing frame on every iteration and are deprecated in pandas.
# pd.concat raises on an empty list, so an empty df_dict still yields an empty frame.
df = pd.concat(list(df_dict.values()), ignore_index=True) if df_dict else pd.DataFrame()

#### Create Temporary csv Output
- Create a direct output of the csv

##### This prep step casts columns that contain nulls or mixed dtypes to a single dtype so they can be written to parquet. It is also a good check that the data is what we expect.

In [21]:

# Columns forced to float. Add any float column that gives a mixed type error.
# 'Units of Risk (Written)' is listed twice; the second pass is a harmless repeat.
v_flo = [
    'Open Claims %',
    'Contribution Margin % on Earned Revenues net of Taxes - BU View',
    'Contribution Margin % on Earned Revenues net of Taxes - HQ View',
    'Loss Ratio',
    'Commission Ratio',
    'Expense Ratio',
    'comsub',
    'expsub',
    '% of IBNR on (OCR + IBNR)',
    'Contribution Margin % on Fixed Costs - BU View',
    'Contribution Margin % on Fixed Costs - HQ View',
    'Units of Risk (Written)',
    'Written Revenues',
    'Number of Policies (Earned)',
    'Units of Risk (Earned)',
    'Earned Revenues',
    'Upfront Cash Payments',
    'Number of Open Claims',
    'Number of Persons Involved in Claims (Paid + OCR + IBNR)',
    'Frequency (Earned)',
    'Units of Risk (Written)',
    'Severity',
    'Risk Premium',
]

# Columns forced to 64-bit integers. Add any integer column that gives a mixed type error.
v_int = ['Number of Products per Row', 'Number of B-Partners per Row']

# String and datetime columns have no coercion list yet.

# For each listed column: blank / whitespace-only strings become NaN, NaN
# becomes 0, and the column is then cast to its target dtype.
for target_dtype, column_names in (('float', v_flo), ('int64', v_int)):
    for column_name in column_names:
        cleaned = df[column_name].replace(r'^\s*$', np.nan, regex=True).fillna(0)
        df[column_name] = cleaned.astype(target_dtype)

In [22]:
try:
    prep_df = df
    %store prep_df

except:
    pass

if run_control > 0:
    
    prepfile = f'{str(percode)}prep.csv'
    prepparq = f'{str(percode)}prep.parquet'
    prephistfile = f'{str(percode)}_{inst_datetime}prep.csv'

    try:
        os.rename(os.path.join(str(rt_path), prepfile), os.path.join(str(rt_path), "logs/raw_data_file_history/" , prephistfile))

    except FileNotFoundError:
        pass

    except FileExistsError:
        os.remove(os.path.join(str(rt_path), "logs/raw_data_file_history/" , prephistfile))
        os.rename(os.path.join(str(rt_path), prepfile), os.path.join(str(rt_path), "logs/raw_data_file_history/" , prephistfile))

    df.to_csv(os.path.join(str(rt_path), prepfile), ',', index=False , encoding='utf-8-sig')
    df.to_parquet(os.path.join(str(rt_path), prepparq),engine = "pyarrow")
    
else:
    pass

Stored 'prep_df' (DataFrame)


## This is considered "pre-pipeline" data prep. The next step is to read the resulting csv into the actual data pipeline.

## If you are not sure what is being loaded, this step provides a safe way to create a reviewable output file while keeping the production data files intact.

#### You can skip this step to speed up the process with run_control == 0, not recommended

In [23]:
# Review table: "Earned Revenues net of Taxes" by country and submission file
# (rows) against reporting period (columns). Missing cells are shown as 0 and
# the values are cast to int.
if run_control > 0:

    revenue_pivot = df.pivot_table(
        index=("Country", "Submission File"),
        columns=("Reporting Date From", "Reporting Date To"),
        values="Earned Revenues net of Taxes",
    )
    dfp = revenue_pivot.fillna(0).astype(int)
    # Bare expression so the notebook displays the table.
    dfp

Unnamed: 0_level_0,Reporting Date From,2020-01-01,2020-04-01,2021-01-01
Unnamed: 0_level_1,Reporting Date To,2020-12-31,2021-03-31,2021-03-31
Country,Submission File,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
AU,us_dat_au_02172021174434.parquet,1270248,0,0
CA,us_dat_ca_05202021134242.parquet,0,0,687067
CH,us_dat_ch_05182021192455.parquet,0,10845,0
ES,us_dat_es_05202021105141.parquet,0,0,26294
IT,us_dat_it_05242021111520.parquet,0,0,10182
PT,us_dat_pt_05202021121056.parquet,0,0,1188


#### Store the DataFrame for other notebooks to use