# Data Optimization - Johnson & Johnson

In [None]:
import glob as glob
import csv
import pandas as pd
import col_types

files_processed = []
deltas_processed = []
if glob.glob('./data/files_processed.txt'):
    with open('./data/files_processed.txt', 'r') as file:
        reader = csv.reader(file, delimiter='\n')
        for row in reader:
            files_processed += row

if glob.glob('./data/deltas_processed.txt'):
    with open('./data/deltas_processed.txt', 'r') as file:
        reader = csv.reader(file, delimiter='\n')
        for row in reader:
            deltas_processed += row
            
files_full = [file for file in glob.glob('./data/full/*.txt') if file not in files_processed]
files_delta = [file for file in glob.glob('./data/delta/cvtJnJVisionDelta09*.txt') if file not in deltas_processed]
files = files_full + files_delta

# Raw data

## Leitura dos arquivos

In [None]:
full_df_vec = []

for file in files_full:
    full_df_vec.append(pd.read_csv(file, sep='|', dtype=col_types.types_dict, parse_dates=col_types.parse_dates))

if full_df_vec:
    full_df = pd.concat(full_df_vec)
    full_df = full_df.reset_index(drop=True)

## Agrupando dados

Requisitos:

Group rows by

- poNumber
- costCenter
- primaryInternalOrder
- profitCenter
- generalLedgerAccount
- needByDate
- poEndDate
- poStartDate
- receivableIndicator
- projectWbs
- matOrSrc
- accountingActivityCode

In [None]:
if not full_df.empty:
    full_df = full_df.reset_index(names='id')
    full_df['delta'] = -1

    accrual = full_df.groupby(['poNumber', 'costCenter', 'primaryInternalOrder',
    'profitCenter', 'generalLedgerAccount', 'needByDate',
    'poEndDate', 'poStartDate', 'receivableIndicator',
    'projectWbs', 'matOrSrc'], dropna=False)

    full_df['id'] = accrual['id'].transform('min')

No momento de agrupar, é necessário somar os campos que serão mesclados.

`SUM(poValueInGlobalCurrency)`

`SUM(poValueInLocalCurrency)`

`SUM(poValueInDocCurrency)`

`SUM(gdsReceiptValueInGlobalCurrency)`

`SUM(gdsReceiptValueInlocalCurrency)`

`SUM(gdsReceiptValueInDocCurrency)`

`SUM(invoiceReceiptValueInGlobalCurrency)`

`SUM(invoiceReceiptValueInLocalCurrency)`

`SUM(invoiceReceiptValueInDocCurrency)`

`SUM(deliverTo)`

Fazer persistir colunas que somem após operação de soma, fazer como:
`'columnName': 'first'` na propriedade do aggreggate `.agg()`

In [None]:
if not full_df.empty:
    accrued_df = accrual.agg({
    'id': 'min',
    'delta': 'max',

    'poNumber': 'first',
    'costCenter': 'first',
    'primaryInternalOrder': 'first',
    'profitCenter': 'first',
    'generalLedgerAccount': 'first',
    'needByDate': 'first',
    'poEndDate': 'first',
    'poStartDate': 'first',
    'receivableIndicator': 'first',
    'projectWbs': 'first',
    'matOrSrc': 'first',

    'poName': 'first',
    'poRequisitionerWwid': 'first',
    'poRequisitionerWwid': 'first',
    'poRequisitionerName': 'first',
    'poPreparerWwid': 'first',
    'poPreparerName': 'first',
    'costCenterDesc': 'first',
    'generalLedgerAccountDesc': 'first',
    'projectWbs': 'first',
    'supplierNumber': 'first',
    'supplierName': 'first',
    'supplierEmailAddress': 'first',
    'poType': 'first',
    'poStatus': 'first',
    'poCloseStatus': 'first',
    'poCreationDate': 'first',
    'receiptDates': 'first',
    'invoiceDates': 'first',
    'invoicePaidStatus': 'first',
    'transactionDate': 'first',
    'clearingDocumentRef': 'first',
    'clearingDateReference': 'first',
    'localCurrencyForPoValue': 'first',
    'documentCurrencyForPoValue': 'first',
    'localCurrencyForGoodsReceipt': 'first',
    'documentCurrencyForGoodsReceipt': 'first',
    'localCurrencyForInvoiceReceipt': 'first',
    'docCurrencyForInvoiceReceipt': 'first',
    'poValueInGlobalCurrency': 'first',
    'poValueInLocalCurrency': 'first',
    'poValueInDocCurrency': 'first',
    'gdsReceiptValueInGlobalCurrency': 'first',
    'gdsReceiptValueInlocalCurrency': 'first',
    'gdsReceiptValueInDocCurrency': 'first',
    'invoiceReceiptValueInGlobalCurrency': 'first',
    'invoiceReceiptValueInLocalCurrency': 'first',
    'invoiceReceiptValueInDocCurrency': 'first',
    'aribaBu': 'first',
    'mrc': 'first',
    'companyCode': 'first',
    'legalEntity': 'first',
    'fsid': 'first',
    'region': 'first',
    'businessArea': 'first',
    'shipTo': 'first',
    'deliverTo': 'first',
    'commodityType': 'first',
    'excludeDownpaymentRequestsForPayments': 'first',
    'sourceSystemApprovableId': 'first',
    'requisitionNumber': 'first',
    'receivableIndicator': 'first',
    'poLineNumber': 'first',
    'splitLineNumber': 'first',
    
    'poValueInGlobalCurrency': 'sum',
    'poValueInLocalCurrency': 'sum',
    'poValueInDocCurrency': 'sum',
    'gdsReceiptValueInGlobalCurrency': 'sum',
    'gdsReceiptValueInlocalCurrency': 'sum',
    'gdsReceiptValueInDocCurrency': 'sum',
    'invoiceReceiptValueInGlobalCurrency': 'sum',
    'invoiceReceiptValueInLocalCurrency': 'sum',
    'invoiceReceiptValueInDocCurrency': 'sum',
    'deliverTo': 'sum'
    }).reset_index(drop=True)

    accrued_df['delta'] = accrued_df['delta'] + 1

In [None]:
#Sinaliza qual PO sofreu aggregate e qual é raw (original)
full_df['isRaw'] = True
accrued_df['isRaw'] = False

final_df = pd.concat([full_df, accrued_df])
#Sinaliza que todas POs são válidas, nenhuma sofreu edição ainda
final_df['isValid'] = True
#Salva id único da PO
final_df['poId'] = final_df['poId'] = final_df['poNumber'] + final_df['poLineNumber'] + final_df['splitLineNumber']

final_df = final_df.reset_index(drop=True)

## Salvando arquivos

In [None]:
if not final_df.empty:
    final_df.to_csv('./data/accruedDataJnJ.csv', index=False, sep='|')

    with open("./data/files_processed.txt", "w") as txt_file:
        for line in files_full:
            txt_file.write(line + "\n")

# Processando Delta

## Leitura de arquivos Delta

In [None]:
delta_df_vec = []

#Lê arquivos delta e armazena num vetor
for file in files_delta:
    df = pd.read_csv(file, sep='|', dtype=col_types.types_dict, parse_dates=col_types.parse_dates)
    df['delta'] = -2
    df['poId'] = df['poNumber'] + df['poLineNumber'] + df['splitLineNumber']
    delta_df_vec.append(df)


#Lê arquivos já accruados e seleciona as POs raws (que não são frutos de um aggregate)
accruedDataJnJ_df = pd.read_csv('./data/accruedDataJnJ.csv', sep='|', dtype=col_types.types_dict, parse_dates=col_types.parse_dates)
###
#accruedDataJnJ_df['isValid'] = True #Apagar depois...
#accruedDataJnJ_df['poId'] = accruedDataJnJ_df['poNumber'] + accruedDataJnJ_df['poLineNumber'] + accruedDataJnJ_df['splitLineNumber']
###
raw_df = accruedDataJnJ_df.loc[(accruedDataJnJ_df['isRaw'] == True) & (accruedDataJnJ_df['isValid'] == True)]

#Preenche as POs do delta com novos IDs (começando do último)
last_id = accruedDataJnJ_df['id'].max()
delta_df = pd.concat(delta_df_vec)
delta_df.insert(0, 'id', range(last_id + 1, last_id + len(delta_df) + 1))

delta_raw_df_vec = [delta_df, raw_df]

#Concatena todos data frames e cria coluna de poId para identificar qual PO foi editada ou se é PO nova
if delta_df_vec:
    delta_raw_df = pd.concat(delta_raw_df_vec)
    delta_raw_df = delta_raw_df.reset_index(drop=True)
    delta_raw_df['poId'] = delta_raw_df['poNumber'] + delta_raw_df['poLineNumber'] + delta_raw_df['splitLineNumber']

In [None]:
if delta_df_vec:

    accrual = delta_raw_df.groupby(['poNumber', 'poLineNumber', 'splitLineNumber'], dropna=False)

    #Conta quantas vezes a PO se repete para ver se é nova ou editada
    delta_raw_df['count'] = accrual['poId'].transform('count')

    #POs novas
    new_po_df = delta_raw_df.loc[(delta_raw_df['count'] == 1) & (delta_raw_df['delta'] == -2)].copy(deep=True)
    new_po_df['delta'] = -1
    new_po_df['isRaw'] = True
    new_po_df['isValid'] = True
    del new_po_df['poId']
    del new_po_df['count']

    #POs editadas
    edited_po_df = delta_raw_df.loc[(delta_raw_df['count'] == 2) & (delta_raw_df['delta'] == -2)].copy(deep=True)
    edited_po_df['delta'] = -1
    edited_po_df['isRaw'] = True
    del edited_po_df['count']

## POs editadas

Primeiramente identifica-se quais POs serão editadas (to_edit_po) e depois identifica as edições que serão feitas (edited_po). Ordena-se os dois data frames para que os IDs sejam mantidos

In [None]:
#POs to be edited
to_edit_po_df = delta_raw_df.loc[(delta_raw_df['count'] == 2) & (delta_raw_df['isRaw'] == True)].copy(deep=True)
del to_edit_po_df['count']

edited_po_df.sort_values(by=['poId'], inplace=True)
to_edit_po_df.sort_values(by=['poId'], inplace=True)

edited_po_df.reset_index(drop=True, inplace=True)
edited_po_df['id'] = to_edit_po_df['id'].reset_index(drop=True)

#Invalida POs antigas, valida POs editadas
#to_edit_po_df['isValid'] = False
#edited_po_df['isValid'] = True

Torna as POs antigas inválidas e as editadas válidas. Concatena as editadas no dataframe original.

In [None]:
accruedDataJnJ_df.loc[(accruedDataJnJ_df.poId.isin(to_edit_po_df.poId)) & (accruedDataJnJ_df.isRaw == True),'isValid'] = False
edited_po_df['isValid'] = True
accruedDataJnJ_df = pd.concat([accruedDataJnJ_df, edited_po_df])
accruedDataJnJ_df.reset_index(drop=True)

## POs novas

Agora é necessário processar as novas POs que vieram nos arquivos de delta (POs criadas recentemente). Para isso utilizamos as POs resultantes do último processamento e reprocessa-se as POs antigas com as novas.

In [None]:
#POs que já existiam
old_po_df = accruedDataJnJ_df.loc[(accruedDataJnJ_df['delta'] == accruedDataJnJ_df['delta'].max()) &
(accruedDataJnJ_df['isValid'] == True)]

#POs que já existiam concatenadas com POs novas
to_accrual_df = pd.concat([old_po_df, new_po_df])

#Agrupando as POs
new_accrual = to_accrual_df.groupby(['poNumber', 'costCenter', 'primaryInternalOrder',
    'profitCenter', 'generalLedgerAccount', 'needByDate',
    'poEndDate', 'poStartDate', 'receivableIndicator',
    'projectWbs', 'matOrSrc'], dropna=False)

#Salva o menor id dos grupos nas novas POs
to_accrual_df['id'] = new_accrual['id'].transform('min')
new_po_df = to_accrual_df.loc[to_accrual_df['delta'] == -1].copy(deep=True)

Agrega os grupos, selecionando o menor ID de cada grupo e o maior delta (talvez não seja necessário?)

In [None]:
new_accrued_df = new_accrual.agg({
    'id': 'min',
    'isRaw': 'first',
    'delta': 'max',
    'isValid': 'first',

    'poNumber': 'first',
    'costCenter': 'first',
    'primaryInternalOrder': 'first',
    'profitCenter': 'first',
    'generalLedgerAccount': 'first',
    'needByDate': 'first',
    'poEndDate': 'first',
    'poStartDate': 'first',
    'receivableIndicator': 'first',
    'projectWbs': 'first',
    'matOrSrc': 'first',

    'poName': 'first',
    'poRequisitionerWwid': 'first',
    'poRequisitionerWwid': 'first',
    'poRequisitionerName': 'first',
    'poPreparerWwid': 'first',
    'poPreparerName': 'first',
    'costCenterDesc': 'first',
    'generalLedgerAccountDesc': 'first',
    'projectWbs': 'first',
    'supplierNumber': 'first',
    'supplierName': 'first',
    'supplierEmailAddress': 'first',
    'poType': 'first',
    'poStatus': 'first',
    'poCloseStatus': 'first',
    'poCreationDate': 'first',
    'receiptDates': 'first',
    'invoiceDates': 'first',
    'invoicePaidStatus': 'first',
    'transactionDate': 'first',
    'clearingDocumentRef': 'first',
    'clearingDateReference': 'first',
    'localCurrencyForPoValue': 'first',
    'documentCurrencyForPoValue': 'first',
    'localCurrencyForGoodsReceipt': 'first',
    'documentCurrencyForGoodsReceipt': 'first',
    'localCurrencyForInvoiceReceipt': 'first',
    'docCurrencyForInvoiceReceipt': 'first',
    'poValueInGlobalCurrency': 'first',
    'poValueInLocalCurrency': 'first',
    'poValueInDocCurrency': 'first',
    'gdsReceiptValueInGlobalCurrency': 'first',
    'gdsReceiptValueInlocalCurrency': 'first',
    'gdsReceiptValueInDocCurrency': 'first',
    'invoiceReceiptValueInGlobalCurrency': 'first',
    'invoiceReceiptValueInLocalCurrency': 'first',
    'invoiceReceiptValueInDocCurrency': 'first',
    'aribaBu': 'first',
    'mrc': 'first',
    'companyCode': 'first',
    'legalEntity': 'first',
    'fsid': 'first',
    'region': 'first',
    'businessArea': 'first',
    'shipTo': 'first',
    'deliverTo': 'first',
    'commodityType': 'first',
    'excludeDownpaymentRequestsForPayments': 'first',
    'sourceSystemApprovableId': 'first',
    'requisitionNumber': 'first',
    'receivableIndicator': 'first',
    'poLineNumber': 'first',
    'splitLineNumber': 'first',
    
    'poValueInGlobalCurrency': 'sum',
    'poValueInLocalCurrency': 'sum',
    'poValueInDocCurrency': 'sum',
    'gdsReceiptValueInGlobalCurrency': 'sum',
    'gdsReceiptValueInlocalCurrency': 'sum',
    'gdsReceiptValueInDocCurrency': 'sum',
    'invoiceReceiptValueInGlobalCurrency': 'sum',
    'invoiceReceiptValueInLocalCurrency': 'sum',
    'invoiceReceiptValueInDocCurrency': 'sum',
    'deliverTo': 'sum'
    })

new_accrued_df['delta'] = new_accrued_df['delta'].max() + 1

As novas POs que chegaram do delta são marcadas como RAW e o resultado do accruamento é marcado com RAW = False, para indicar que passou por processamento.

In [None]:
new_po_df['isRaw'] = True
new_accrued_df['isRaw'] = False

final_df = pd.concat([accruedDataJnJ_df, new_po_df, new_accrued_df])
final_df = final_df.reset_index(drop=True)


## Salvando arquivos

Os arquivos são salvos novamente no accruedDataJnJ, agora com o processamento do delta. Note que nenhum dado é excluído/deletado, todos são mantidos. Portanto a tendência do arquivo é só aumentar.

In [None]:
if not final_df.empty:
    final_df.to_csv('./data/accruedDataJnJ.csv', index=False, sep='|')

    with open("./data/deltas_processed.txt", "w") as txt_file:
        for line in files_delta:
            txt_file.write(line + "\n")

In [None]:
test = pd.read_csv('./data/accruedDataJnJ.csv', sep='|', dtype=col_types.types_dict, parse_dates=col_types.parse_dates)

In [None]:
test.loc[test['delta'] == 1]

## Gerando amostras de dados

Aqui serão geradas amostras de dados para fins de estudo. Seleciona-se as 10 POs que mais se repetiram e as 10 que menos se repetiram.

if not full_df.empty:
    accruedFrames = [
        accrued_df.sort_values(['poValueInGlobalCurrency'], ascending=False).head(2), 
        accrued_df.sort_values(['poValueInGlobalCurrency'], ascending=True).head(2)
    ]

    rawFrames = []

    for accruedFrame in accruedFrames:
        for index, row in accruedFrame.iterrows():
            isNull = row.isnull();
            rawFrames.append(df.loc[
                ((df['poNumber'] == row['poNumber']) | (isNull['poNumber'] & df['poNumber'].isnull())) &
                ((df['costCenter'] == row['costCenter'])  | (isNull['costCenter'] & df['costCenter'].isnull())) &
                ((df['primaryInternalOrder'] == row['primaryInternalOrder'])  | (isNull['primaryInternalOrder'] & df['primaryInternalOrder'].isnull())) &
                ((df['profitCenter'] == row['profitCenter'])  | (isNull['profitCenter'] & df['profitCenter'].isnull())) &
                ((df['generalLedgerAccount'] == row['generalLedgerAccount'])  | (isNull['generalLedgerAccount'] & df['generalLedgerAccount'].isnull())) &
                ((df['needByDate'] == row['needByDate'])  | (isNull['needByDate'] & df['needByDate'].isnull())) &
                ((df['poEndDate'] == row['poEndDate'])  | (isNull['poEndDate'] & df['poEndDate'].isnull())) &
                ((df['receivableIndicator'] == row['receivableIndicator'])  | (isNull['receivableIndicator'] & df['receivableIndicator'].isnull())) &
                ((df['projectWbs'] == row['projectWbs'])  | (isNull['projectWbs'] & df['projectWbs'].isnull())) &
                ((df['matOrSrc'] == row['matOrSrc'])  | (isNull['matOrSrc'] & df['matOrSrc'].isnull()))
            ])

    pd.concat(rawFrames).to_csv('./data/sampleData-rawPOs.csv', sep='|', index=False)
    pd.concat(accruedFrames).to_csv('./data/sampleData-accruedPOs.csv', sep='|', index=False)
