<a href="https://colab.research.google.com/github/DOJO-Smart-Ways/DOJO-Beam-Transforms/blob/pbi-footprint/pbi_footprint/pbi_pfx_nsap_pipeline_to_trusted.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install git+https://github.com/DOJO-Smart-Ways/DOJO-Beam-Transforms.git@pbi-footprint#egg=dojo-beam-transforms
#!pip install git+https://github.com/DOJO-Smart-Ways/DOJO-Beam-Transforms.git#egg=dojo-beam-transforms

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

import os

from google.cloud import bigquery

# Google Auth: authenticate the current Colab user.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

# GCP Project: expose the project id through the GOOGLE_CLOUD_PROJECT environment variable.
os.environ["GOOGLE_CLOUD_PROJECT"]= 'nidec-ga'

Authenticated


In [None]:
#from pipeline_components.input_file import read_excels_union
from pipeline_components import data_enrichment as de
from pipeline_components import data_cleaning as dc
from pipeline_components import data_understand as du

In [None]:
# Options for the pipeline: project, runner, region, the staging/temp/template
# locations in GCS, and the SDK container image used by the workers.
# NOTE(review): with 'template_location' set, the Dataflow runner presumably
# stages a template instead of executing the job directly — confirm.
pipeline_options = {
    'project':'nidec-ga',
    'runner':'DataflowRunner',
    'region':'us-central1',
    'staging_location':'gs://nidec-ga-temp/data-flow-pipelines/pbi-footprint/staging',
    'temp_location':'gs://nidec-ga-temp/data-flow-pipelines/pbi-footprint/temp',
    'template_location':'gs://nidec-ga-temp/data-flow-pipelines/pbi-footprint/template/pbi-pfx-nsap',
    'sdk_container_image': 'us-central1-docker.pkg.dev/nidec-ga/dojo-beam/dojo_beam',
    'sdk_location': 'container'
}

# Alternative minimal options (project only), kept disabled inside a string literal.
"""
pipeline_options = {
    'project':'nidec-ga'
}

"""


# Rebind the name to a PipelineOptions object built from the dict above, then
# create the module-level pipeline that the build_* functions attach to.
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
pipeline = beam.Pipeline(options=pipeline_options)

# Aux Classes

In [None]:
class DeduplicateByColumnFn(beam.DoFn):
    """Emit only the first element seen for each combination of column values.

    For dictionary elements the deduplication key is the tuple of values of
    ``columns`` (missing columns count as ``None``); any other element is used
    as its own key.

    NOTE(review): the set of seen keys lives on the DoFn instance, so
    duplicates are presumably only detected among elements handled by the same
    instance — confirm this is acceptable when running on several workers.

    Args:
        columns (list of str): Column names that make up the deduplication key.
    """

    def __init__(self, columns):
        self.seen = set()
        self.columns = columns

    def process(self, element):
        """Yield ``element`` unless its key has already been observed."""
        if isinstance(element, dict):
            # Key built from the configured columns, in order.
            dedup_key = tuple(element.get(name, None) for name in self.columns)
        else:
            # Non-dict elements are compared as a whole.
            dedup_key = (element,)

        if dedup_key in self.seen:
            return

        self.seen.add(dedup_key)
        yield element

class TrimValues(beam.DoFn):
    """Strip leading and trailing whitespace from selected string columns."""

    def __init__(self, columns):
        """Store the columns to trim.

        Args:
            columns (list of str): Names of the columns whose values are stripped.
        """
        self.columns = columns

    def process(self, element):
        """Strip the configured columns in place and yield the element.

        Columns that are absent, or whose value is not a ``str``, are left
        untouched.

        Args:
            element (dict): Record keyed by column name.
        """
        for name in self.columns:
            if name not in element:
                continue
            current = element[name]
            if isinstance(current, str):
                element[name] = current.strip()
        yield element


class NormalDistributedSampler(beam.DoFn):
    """A DoFn implementation that samples rows based on normally distributed random numbers."""

    def __init__(self, mean, std_dev, threshold):
        """Initializer.

        Args:
            mean (float): The mean of the normal distribution.
            std_dev (float): The standard deviation of the normal distribution.
            threshold (float): Rows whose generated value is less than or equal
                to this threshold are kept.
        """
        self.mean = mean
        self.std_dev = std_dev
        self.threshold = threshold

    def process(self, element):
        """Keep or drop the element based on one normally distributed draw.

        Args:
            element: The input element to process.

        Yields:
            The input element if the drawn value is less than or equal to the
            threshold.
        """
        # Imported here (after the docstring, so the docstring is a real one)
        # rather than at module level, as in the original code.
        import numpy as np

        # One independent draw per element.
        random_value = np.random.normal(self.mean, self.std_dev)

        if random_value <= self.threshold:
            yield element


class ProbabilisticSampler(beam.DoFn):
    """A DoFn implementation that probabilistically samples rows."""

    def __init__(self, probability):
        """Initializer.

        Args:
            probability (float): The probability of keeping a row. Should be between 0 and 1.
        """
        self.probability = probability

    def process(self, element):
        """Randomly decide whether to keep the element.

        Args:
            element: The input element to process.

        Yields:
            The input element when a uniform draw from ``random.random()`` is
            strictly below ``self.probability``.
        """
        # Imported after the docstring so the docstring is attached to the method.
        import random

        if random.random() < self.probability:
            yield element



class GetFirstSCharactersForColumnFn(beam.DoFn):
    """Truncate the value of one column to its first ``length`` characters.

    Args:
        column_name (str): Key of the column to truncate.
        length (int): Number of leading characters to keep. Defaults to 10,
            the value that was previously hard-coded.
    """

    def __init__(self, column_name, length=10):
        self.column_name = column_name
        self.length = length

    def process(self, element):
        """Slice the configured column in place and yield the element.

        Elements that do not contain the column are yielded unchanged. The
        value must support slicing (e.g. a string).

        Args:
            element (dict): Record keyed by column name.
        """
        if self.column_name in element:
            # Keep only the leading ``length`` characters of the value.
            element[self.column_name] = element[self.column_name][:self.length]
        yield element


class ExtractAndCheckDistinctValuesFn(beam.DoFn):
    """Project each element onto the value of a single column.

    Args:
        column_name (str): Key whose value is emitted for every element.
    """

    def __init__(self, column_name):
        self.column_name = column_name

    def process(self, element):
        """Yield the column's value, or ``None`` when the key is absent.

        Elements lacking the key are not skipped: a ``None`` placeholder is
        emitted for them instead.
        """
        has_column = self.column_name in element
        yield element[self.column_name] if has_column else None


class EnsureSchemaOrderDoFn(beam.DoFn):
    """Rebuild each record so its keys follow the order given by a schema string."""

    def __init__(self, schema_str):
        """Extract the ordered field names from the schema text.

        Args:
            schema_str (str): Multiline schema, one ``NAME:TYPE`` entry per
                line. Only the text before the first ``:`` of each line is kept.
        """
        field_names = []
        for schema_line in schema_str.strip().split("\n"):
            name_part, _, _ = schema_line.partition(":")
            field_names.append(name_part.strip())
        self.schema_fields = field_names

    def process(self, element):
        """Yield a new dictionary containing exactly the schema fields, in order.

        Args:
            element (dict): A single record.

        Yields:
            dict: One entry per schema field; fields missing from ``element``
            are set to ``None`` and keys not in the schema are dropped.
        """
        reordered = {}
        for name in self.schema_fields:
            reordered[name] = element.get(name, None)
        yield reordered



class DateTimeToString(beam.DoFn):
    """Replace a datetime-like column value with its ``YYYY-MM-DD`` string.

    Args:
        column_name (str): Key of the column to convert.
    """

    def __init__(self, column_name):
        # Name of the column to update
        self.column_name = column_name

    def process(self, element):
        """Format the configured column in place and yield the element.

        Values that do not provide ``strftime`` (a missing column, ``None``, or
        a value that is already a string) are left untouched instead of raising
        ``AttributeError``.

        Args:
            element (dict): Record keyed by column name.
        """
        datetime_value = element.get(self.column_name)

        # Only datetime-like objects can be formatted; anything else passes through.
        if hasattr(datetime_value, 'strftime'):
            element[self.column_name] = datetime_value.strftime('%Y-%m-%d')

        yield element


class GenerateStatistics(beam.PTransform):
    """Batch the input elements and compute TFDV statistics for each batch."""

    def expand(self, pcollection):
        """Apply batching followed by a per-batch statistics computation.

        Each output element is a one-item list holding the statistics that
        ``tfdv.generate_statistics_from_dataframe`` returned for one batch.
        """

        def compute_tfdv_statistics(elements):
            # Turn the batch into a DataFrame and generate statistics with TFDV.
            import pandas as pd
            import tensorflow_data_validation as tfdv
            batch_frame = pd.DataFrame(elements)
            return [tfdv.generate_statistics_from_dataframe(batch_frame)]

        batches = pcollection | "CollectIntoBatch" >> beam.BatchElements()
        return batches | "ComputeTFDVStatistics" >> beam.Map(compute_tfdv_statistics)


class ProcessExcelFiles(beam.DoFn):
    """Read one matched Excel file and emit its rows as dictionaries.

    Rows go to the main output; any exception raised while reading or
    transforming the file is emitted on the ``'error'`` tagged output as
    ``{'path': ..., 'error': ...}`` instead of failing the bundle.

    Args:
        skip_rows: Forwarded to ``pandas.read_excel`` as ``skiprows``.
        tab: Sheet to read (name or zero-based index). ``None`` means the
            first sheet (index 0).
        column_date: Optional iterable of column names to normalise to
            ``YYYY-MM-DD`` strings.
        parse_dates: Forwarded to ``pandas.read_excel``.
        converters: Forwarded to ``pandas.read_excel``.
    """

    def __init__(self, skip_rows=None, tab=None, column_date=None, parse_dates=False, converters=None):
        self.skip_rows = skip_rows
        self.tab = 0 if tab is None else tab
        self.column_date = column_date
        self.parse_dates = parse_dates
        self.converters = converters

    def process(self, file):
        """Parse the workbook behind ``file`` and yield one dict per row.

        Every row gets a ``SOURCE`` column of the form ``<path>/<sheet name>``.

        Args:
            file: A readable file match exposing ``metadata.path``.
        """
        import pandas as pd
        from apache_beam.io.filesystems import FileSystems
        import apache_beam as beam

        try:
            # Open the workbook once through Beam's FileSystems and reuse the
            # same ExcelFile both for reading the sheet and for resolving its
            # name, instead of re-opening the path directly with pandas.
            with FileSystems.open(file.metadata.path) as f, pd.ExcelFile(f) as workbook:
                df = pd.read_excel(
                    workbook,
                    skiprows=self.skip_rows,
                    sheet_name=self.tab,
                    parse_dates=self.parse_dates,
                    converters=self.converters,
                )
                if isinstance(self.tab, int):
                    sheet_name = workbook.sheet_names[self.tab]
                else:
                    sheet_name = self.tab

            for col in self.column_date if self.column_date is not None else []:
                # Skip columns that are not in the sheet to avoid a KeyError.
                if col in df.columns:
                    # Parse as datetime, then format as a date string.
                    df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d')

            # Drop rows where every cell is empty.
            df = df.dropna(how='all')

            df['SOURCE'] = f"{file.metadata.path}/{sheet_name}"

            yield from df.to_dict('records')

        except Exception as e:
            # Deliberate catch-all: a bad file is reported, not fatal.
            yield beam.pvalue.TaggedOutput('error', {'path': file.metadata.path, 'error': str(e)})


def read_excels_union(pipeline, input_pattern, identifier='', skiprows=None, tab=None, column_date=None, parse_dates=False, converters=None):
    """Match Excel files by pattern and read them all into one PCollection.

    Args:
        pipeline: Pipeline (or root) the read steps are attached to.
        input_pattern: File pattern passed to ``MatchFiles``.
        identifier: Suffix appended to every step label to keep labels unique.
        skiprows, tab, column_date, parse_dates, converters: Forwarded to
            ``ProcessExcelFiles``.

    Returns:
        tuple: ``(rows, errors)`` — the main output and the ``'error'`` tagged
        output of the Excel-reading step.
    """
    excel_reader = ProcessExcelFiles(
        skip_rows=skiprows,
        tab=tab,
        column_date=column_date,
        parse_dates=parse_dates,
        converters=converters,
    )

    matched = pipeline | f'Match Files {identifier}' >> beam.io.fileio.MatchFiles(input_pattern)
    readable = matched | f'Read Matches {identifier}' >> beam.io.fileio.ReadMatches()
    outputs = readable | f'Process Excel Files {identifier}' >> beam.ParDo(excel_reader).with_outputs('error', main='main')

    return outputs.main, outputs.error


class ChangeDateFormatError(beam.DoFn):
    """Reformat date columns, routing elements that fail to parse to an ``'error'`` output."""

    def __init__(self, date_columns, input_format='%Y%m%d', output_format='%Y-%m-%d %H:%M:%S'):
        """
        Initialize the DoFn class.

        Parameters:
        - date_columns: The names of the columns containing the date strings.
        - input_format: The strftime format string for the input date format.
        - output_format: The strftime format string for the output date format.
        """
        self.date_columns = date_columns
        self.input_format = input_format
        self.output_format = output_format

    def process(self, element):
        """
        Process each element to format the dates in the specified columns.

        Falsy values (missing column, ``None``, empty string) are skipped. On
        the first column that fails to parse, the element is emitted on the
        ``'error'`` tagged output together with the error text and the column
        name, and nothing is emitted on the main output.

        Parameters:
        - element: The input element to process.
        """
        import pandas as pd
        import apache_beam as beam

        for date_column in self.date_columns:
            date_str = element.get(date_column)
            if not date_str:
                continue
            try:
                # Parse with the configured input format, then re-format.
                parsed_date = pd.to_datetime(date_str, format=self.input_format)
                element[date_column] = parsed_date.strftime(self.output_format)
            except Exception as e:
                yield beam.pvalue.TaggedOutput('error', {'element': element, 'error': str(e), 'column': date_column})
                return  # Stop here to avoid multiple error entries for the same element
        yield element


class VerifyStrictDateFormat(beam.DoFn):
    """Check that a date column matches an expected format, normalising it to a string."""

    def __init__(self, date_column, expected_format='%Y-%m-%d'):
        """
        Initialize the DoFn class.

        Parameters:
        - date_column: The name of the column containing the date values.
        - expected_format: The strftime format string for the expected date format.
        """
        self.date_column = date_column
        self.expected_format = expected_format

    def process(self, element):
        """
        Verify that the date in the configured column is in the expected format.

        ``datetime`` values are formatted with ``expected_format``; strings are
        parsed with that format and written back. Values of any other type, or
        strings that do not parse, send the element to the ``'error'`` tagged
        output instead of the main output.

        Parameters:
        - element: The input element to process.
        """
        from datetime import datetime
        import pandas as pd
        import apache_beam as beam

        date_value = element.get(self.date_column)

        try:
            if isinstance(date_value, datetime):
                # Already a datetime: just render it in the expected format.
                element[self.date_column] = date_value.strftime(self.expected_format)
            elif isinstance(date_value, str):
                # errors='coerce' turns unparseable strings into NaT, which is
                # then reported explicitly below.
                datetime_obj = pd.to_datetime(date_value, format=self.expected_format, errors='coerce')
                if pd.isnull(datetime_obj):
                    raise ValueError(f"Invalid date format for '{date_value}'")
                element[self.date_column] = datetime_obj.strftime(self.expected_format)
            else:
                # Unsupported value type: route the element to the error output.
                yield beam.pvalue.TaggedOutput('error', {'element':element, 'error':f"Invalid type: {type(date_value)} for date in column '{self.date_column}'"})
                return

            yield element
        except Exception as e:
            # Any conversion failure is reported rather than raised.
            yield beam.pvalue.TaggedOutput('error', {'element': element, 'error': str(e)})

def log_error(element):
    """Write ``element`` to the log at ERROR level, prefixed with ``Error: ``.

    ``element`` is expected to carry the error information to report; it is
    rendered with its default string formatting.
    """
    import logging
    message = f"Error: {element}"
    logging.error(message)


class MaxDate(beam.DoFn):
    """Key each element by selected columns and pair it with its date value.

    Args:
        keys (list): Column names whose values form the grouping key.
        date_column (str): Name of the column holding the date.
    """

    def __init__(self, keys, date_column):
        self.keys = keys
        self.date_column = date_column

    def process(self, element):
        """Yield ``(key_tuple, (date, element))`` for the given element."""
        # Grouping key: the values of the configured key columns, in order.
        group_key = tuple(element[name] for name in self.keys)
        # Payload: the date next to the full record.
        payload = (element[self.date_column], element)
        yield (group_key, payload)


class FilterMaxDate(beam.DoFn):
    """Keep, for each key, the record that carries the greatest date.

    Input elements are ``(key, values)`` pairs where ``values`` is an iterable
    of ``(date, record)`` tuples — presumably the grouped output of
    ``MaxDate``; verify against the caller.
    """

    def process(self, element):
        """Yield the record with the maximum date, annotated with that date.

        The winning record gets a ``maxdate_LASTUPDATEON`` entry holding the
        maximum date. Only the date is compared: when several records share
        the maximum date the first one encountered wins, instead of falling
        back to comparing the record dictionaries (which raises ``TypeError``).
        """
        _key, values = element
        max_date, max_element = max(values, key=lambda pair: pair[0])
        max_element['maxdate_LASTUPDATEON'] = max_date
        yield max_element


class ExtractDateYYYYMMDD(beam.DoFn):
    """Extract a ``YYYYMMDD`` date embedded in a text column.

    The first run of eight consecutive digits found in the column is written
    to a new ``EXTRACT_DATE`` key as a ``YYYY-MM-DD`` string (not a datetime
    object). When no such run exists, ``EXTRACT_DATE`` is set to
    ``'1970-01-01'``.

    Args:
        column_name (str): Key of the column whose text is searched.
    """

    def __init__(self, column_name):
        self.column_name = column_name

    def process(self, element):
        """Add ``EXTRACT_DATE`` to the element and yield it."""
        import re

        # A missing or None value is searched as an empty string, so it falls
        # through to the default date instead of crashing re.search.
        text = element.get(self.column_name) or ''
        match = re.search(r'(\d{4})(\d{2})(\d{2})', text)

        if match:
            # Groups are year, month, day.
            element['EXTRACT_DATE'] = f'{match.group(1)}-{match.group(2)}-{match.group(3)}'
        else:
            # No date found: fall back to a fixed default.
            element['EXTRACT_DATE'] = '1970-01-01'

        yield element


class CheckMissingValuesDoFn(beam.DoFn):
    """Split elements by whether any required column holds a null value.

    Args:
        column_list (list of str): Columns that must not be null.
    """

    def __init__(self, column_list):
        self.column_list = column_list

    def process(self, element):
        """Yield complete elements; send incomplete ones to ``'missing_values_error'``.

        A column counts as missing when ``pandas.isnull`` is true for
        ``element.get(column)``. The error record holds the element and a
        message listing the missing columns.
        """
        import pandas as pd
        import apache_beam as beam

        absent = []
        for name in self.column_list:
            if pd.isnull(element.get(name)):
                absent.append(name)

        if not absent:
            yield element
            return

        message = f"Missing values in columns: {', '.join(absent)}"
        yield beam.pvalue.TaggedOutput('missing_values_error', {'element': element, 'error': message})

# Get Path

In [None]:
def make_path(*args: str) -> str:
    """Join the given segments with ``/`` behind a ``gs://`` prefix.

    Returns an empty string when no segments are given.
    """
    if not args:
        return ""
    joined = "/".join(args)
    return f"gs://{joined}"

In [None]:
# Shared location of the input files: gs://<bucket>/<root>/...
_TRANSIENT_BUCKET = "nidec-ga-transient"
_FOOTPRINT_ROOT = "pbi-supplier-footprint"
_SPEND_DATA_DIR = "spend-data"


def _spend_data_path(*parts: str) -> str:
    """Build a path under the spend-data directory from the trailing segments."""
    return make_path(_TRANSIENT_BUCKET, _FOOTPRINT_ROOT, _SPEND_DATA_DIR, *parts)


def get_cimd_in_path() -> str:
    """Pattern for CIMD IN spend files directly under spend-data."""
    return _spend_data_path("*CIMD IN Spend Data*.xlsx")


def get_cimd_folder_in_path() -> str:
    """Pattern for CIMD IN spend files inside the "CIMD IN" folder."""
    return _spend_data_path("CIMD IN", "*CIMD IN Spend Data*.xlsx")


def get_wet_mx_path() -> str:
    """Pattern for WET MX spend files directly under spend-data."""
    return _spend_data_path("*WET MX Spend Data*.xlsx")


def get_wet_folder_mx_path() -> str:
    """Pattern for WET MX spend files inside the "WET NAR" folder."""
    return _spend_data_path("WET NAR", "*WET MX Spend Data*.xlsx")


def get_hvac_mx_path() -> str:
    """Pattern for HVAC MX spend files directly under spend-data."""
    return _spend_data_path("*HVAC MX Spend Data*.xlsx")


def get_hvac_folder_mx_path() -> str:
    """Pattern for HVAC MX spend files inside the "HVAC MX" folder."""
    return _spend_data_path("HVAC MX", "*HVAC MX Spend Data*.xlsx")


def get_cimd_nar_path() -> str:
    """Pattern for CIMD NAR spend files directly under spend-data."""
    return _spend_data_path("*CIMD NAR Spend Data*.xlsx")


def get_cimd_folder_nar_path() -> str:
    """Pattern for CIMD NAR spend files inside the "CIMD NAR" folder."""
    return _spend_data_path("CIMD NAR", "*CIMD NAR Spend Data*.xlsx")


def get_wet_cn_path() -> str:
    """Pattern for WET CN spend files directly under spend-data."""
    return _spend_data_path("*WET CN Spend Data*.xlsx")


def get_wet_folder_cn_path() -> str:
    """Pattern for WET CN spend files inside the "WET CN" folder."""
    return _spend_data_path("WET CN", "*WET CN Spend Data*.xlsx")


def get_wet_ro_path() -> str:
    """Pattern for WET RO spend files directly under spend-data."""
    return _spend_data_path("*WET RO Spend Data*.xlsx")


def get_wet_folder_ro_path() -> str:
    """Pattern for WET RO spend files inside the "WET EMEA" folder."""
    return _spend_data_path("WET EMEA", "*WET RO Spend Data*.xlsx")


def get_wet_ro_intracompany_path() -> str:
    """Pattern for WET RO intracompany spend files directly under spend-data."""
    return _spend_data_path("*WET RO Intracompany Spend Data*.xlsx")


def get_wet_mx_intracompany_path() -> str:
    """Pattern for WET MX intracompany spend files directly under spend-data."""
    return _spend_data_path("*WET MX Intracompany Spend Data*.xlsx")


def get_wet_folder_mx_intracompany_path() -> str:
    """Pattern for WET MX intracompany spend files inside the "WET NAR IC" folder."""
    return _spend_data_path("WET NAR IC", "*WET MX Intracompany Spend Data*.xlsx")


def get_hvac_cn_path() -> str:
    """Pattern for HVAC CN spend files directly under spend-data."""
    return _spend_data_path("*HVAC CN Spend Data*.xlsx")


def get_hvac_folder_cn_path() -> str:
    """Pattern for HVAC CN spend files inside the "HVAC CN" folder."""
    return _spend_data_path("HVAC CN", "*HVAC CN Spend Data*.xlsx")


def get_cimd_folder_emea_path() -> str:
    """Pattern for CIMD EMEA spend files inside the "CIMD EMEA" folder."""
    return _spend_data_path("CIMD EMEA", "*CIMD EMEA Spend Data*.xlsx")


def get_cimd_emea_path() -> str:
    """Pattern for CIMD EMEA spend files directly under spend-data."""
    return _spend_data_path("*CIMD EMEA Spend Data*.xlsx")


def get_fir_sc_path() -> str:
    """Pattern for FIR subcontracting spend files inside the "FIR" folder."""
    return _spend_data_path("FIR", "*FIR Spend Data Subcontracting*.xlsx")


def get_fir_path() -> str:
    """Pattern for FIR "NF" spend files inside the "FIR" folder."""
    return _spend_data_path("FIR", "*FIR Spend Data NF * *.xlsx")


def get_file_org_path() -> str:
    """Path of the "PFX MD.xlsx" workbook, which sits above spend-data."""
    return make_path(_TRANSIENT_BUCKET, _FOOTPRINT_ROOT, "PFX MD.xlsx")


# Aux Lists

In [None]:
# Mapping from source spreadsheet column headers to the target column names.
# The original literal repeated six keys ('Quantity', 'Currency Code',
# 'Item Number', 'Item Description', 'Item'); in a dict literal the last
# occurrence wins, so the duplicates (including a dead 'Quantity':'TRY_QTY'
# entry) are removed here while keeping the effective values and key order.
rename_columns = {
    'Posting Date': 'PERIOD',
    'Vendor Name': 'VENDOR_NAME_LOCAL',
    'Item No.': 'MATERIAL',
    'Commodity Code': 'CATEGORY_LOCAL',
    'Description': 'MATERIAL_DESCRIPTION',
    'Total Amount': 'SPEND_PO_TC',
    'Quantity': 'TRX_QTY',
    'Currency Code': 'TRANSACTION_CURRENCY',
    'Payment Method Code': 'PAYMENT_TERM',
    'Shipment Method Code': 'INCOTERM',
    'Source No.': 'VENDOR',
    'Org': 'ORG',
    'Purchase Order': 'PURCHASE_ORDER',
    'Item Number': 'MATERIAL',
    'Trx Qty': 'TRX_QTY',
    'Extended PO Price': 'SPEND_PO_LC',
    'Supplier Number': 'VENDOR',
    'Extended Standard Cost': 'SPEND_STANDARD_LC',
    'Transaction Unit Of Measure': 'UOM',
    'DRI Code': 'DRI_CODE',
    'Commodity Family': 'CATEGORY_LOCAL',
    'Item Description': 'MATERIAL_DESCRIPTION',
    'Supplier Name': 'VENDOR_NAME_LOCAL',
    'Date': 'PERIOD',
    'Item': 'MATERIAL',
    'Amount': 'TRX_QTY',
    'Supplier Code': 'VENDOR',
    'Total Cost': 'SPEND_PO_TC',
    'Supplier': 'VENDOR_NAME_LOCAL',
    'Division ID': 'ORG',
    'PO_NUMBER': 'PURCHASE_ORDER',
    'VENDOR_ID': 'VENDOR',
    'ITM_GLMOCO': 'MATERIAL',
    'QTY_RECVD': 'TRX_QTY',
    'Unit Price': 'UNIT_COST',
    'RECV_DATE': 'PERIOD',
    'VEND_NAME': 'VENDOR_NAME_LOCAL',
    'COMMODITY FAMILY': 'CATEGORY_LOCAL',
    'DRI CODE': 'DRI_CODE',
    'Unit': 'UOM',
    'Sup': 'VENDOR',
    'Company': 'ORG',
    'Gross TO': 'SPEND_PO_LC',
    'Pu Qty': 'TRX_QTY',
    'TransactionCurrency': 'TRANSACTION_CURRENCY',
    'INT_EXT': 'VENDOR_TYPE',
    'Cm_Descr': 'CATEGORY_LOCAL',
    'Item Descr': 'MATERIAL_DESCRIPTION',
    'Vendor ID': 'VENDOR',
    'PONumber': 'PURCHASE_ORDER',
    'POReceipt Local Amount': 'SPEND_PO_LC',
    'Foreign Invoice Amount': 'SPEND_PO_TC',
    'Item Description.1': 'CATEGORY_LOCAL',
    'Vendor EN_Name': 'VENDOR_NAME_LOCAL',
    'Transaction Date': 'PERIOD',
}

# strftime/strptime patterns for the date and datetime layouts accepted as input.
# The day-first (European) variants are kept below as disabled entries.
COMMON_DATE_AND_TIME_FORMATS = [
    "%Y-%m-%d",               # ISO 8601 date, e.g. 2024-03-28
    "%m/%d/%Y",               # U.S. date, e.g. 03/28/2024
    # "%d/%m/%Y",             # European date, e.g. 28/03/2024
    "%Y-%m-%dT%H:%M:%S",      # ISO 8601 datetime, e.g. 2024-03-28T14:00:00
    "%Y-%m-%d %H:%M:%S",      # ISO 8601 datetime with a space, e.g. 2024-03-28 14:00:00
    "%m/%d/%Y %H:%M:%S",      # U.S. datetime, e.g. 03/28/2024 14:00:00
    # "%d/%m/%Y %H:%M:%S",    # European datetime, e.g. 28/03/2024 14:00:00
    "%Y%m%d%H%M%S",           # Compact datetime, e.g. 20240328140000
    "%Y-%m-%d %H:%M:%S.%f",   # ISO 8601 with microseconds, e.g. 2024-03-28 14:00:00.123456
    "%m/%d/%Y %I:%M:%S %p",   # U.S. 12-hour clock, e.g. 03/28/2024 02:00:00 PM
    # "%d/%m/%Y %I:%M:%S %p", # European 12-hour clock, e.g. 28/03/2024 02:00:00 PM
]

# Target schema as comma-separated "NAME:TYPE" entries, one per line.
# EnsureSchemaOrderDoFn takes the text before ":" on each line as the ordered
# list of output columns.
schema_pbi_nsap = """
ORG:STRING,
PERIOD:DATETIME,
PURCHASE_ORDER:STRING,
MATERIAL:STRING,
MATERIAL_DESCRIPTION:STRING,
UOM:STRING,
CATEGORY_LOCAL:STRING,
FAMILY_LOCAL:STRING,
DRI_CODE:STRING,
VENDOR:STRING,
VENDOR_NAME_LOCAL:STRING,
VENDOR_COUNTRY:STRING,
VENDOR_TYPE:STRING,
TRX_QTY:FLOAT,
SPEND_PO_TC:FLOAT,
TRANSACTION_CURRENCY:STRING,
SPEND_PO_LC:FLOAT,
SPEND_STANDARD_LC:FLOAT,
LOCAL_CURRENCY:STRING,
INCOTERM:STRING,
PAYMENT_TERM:STRING,
SOURCE:STRING"""

# Build Pipeline

In [None]:
def build_fir() -> beam.PCollection:
  """Build the FIR branch of the pipeline and return its staged PCollection.

  Attaches to the module-level ``pipeline``. Reads the FIR subcontracting and
  FIR Excel files, cleans each stream, unions them, joins the result with the
  ``FIR_ORG`` tab of the org workbook on (DEPOSITO, CENTER) and reorders the
  columns according to ``schema_pbi_nsap``. Read and date-format errors are
  sent to ``log_error``.

  NOTE(review): the exact behavior of the ``de``/``dc`` transforms is defined
  in ``pipeline_components`` and is not visible here; the step comments below
  follow their names and labels.
  """

  # --- FIR subcontracting files ---
  input_fir_sc, error_read_fir_sc = read_excels_union(pipeline, get_fir_sc_path(), identifier='fir sc', converters={'articolo':str, 'cod_forn':str})
  error_read_fir_sc | 'Handle Format Errors Read FIR SC' >> beam.Map(log_error)

  # The last step has two outputs: rows with a valid PERIOD and the 'error' output.
  clean_fir_sc, error_fir_sc = (input_fir_sc
      | 'Trim columns fir sc' >> beam.ParDo(TrimValues(['deposito', 'data_reg', 'cod_forn', 'ragsoc1', 'articolo', 'descrizione', 'qta_mov_mag', 'costo_lavoro', 'numero_mov_mag', 'azienda', 'riga_mov_mag']))
      | 'Derive Purchase Order' >> beam.ParDo(de.MergeColumnsFn([(['numero_mov_mag','riga_mov_mag'], 'PURCHASE_ORDER', '-')]))
      | 'Keep Columns fir sc' >> beam.ParDo(dc.KeepColumns(['PURCHASE_ORDER','SOURCE','deposito', 'data_reg', 'cod_forn', 'ragsoc1', 'articolo', 'descrizione', 'qta_mov_mag', 'costo_lavoro', 'azienda']))
      | 'Rename Columns fir sc' >> beam.ParDo(dc.RenameColumns({'deposito':'DEPOSITO', 'data_reg':'PERIOD', 'cod_forn':'VENDOR', 'ragsoc1':'VENDOR_NAME_LOCAL', 'articolo':'MATERIAL', 'descrizione':'MATERIAL_DESCRIPTION', 'qta_mov_mag':'TRX_QTY', 'costo_lavoro':'SPEND_PO_LC', 'azienda':'CENTER'}))
      | 'Replace Values in SOURCE  fir sc' >> beam.ParDo(dc.ReplaceValues([('VENDOR', ' ', ''), ('MATERIAL', ' ', ''),  ('PURCHASE_ORDER', 'NONE', ''), ('SOURCE','gs://nidec-ga-transient/','')]))
      | 'Convert Columns to Float fir sc' >> beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY','SPEND_PO_LC']))
      | 'Convert Columns to String fir sc' >> beam.ParDo(de.ColumnsToStringConverter(['VENDOR', 'SOURCE', 'PURCHASE_ORDER', 'DEPOSITO', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'MATERIAL_DESCRIPTION', 'CENTER']))
      #| 'Replace Patters Material startswith 0 for ""  FIR SC' >> beam.ParDo(ReplacePatterns(columns=['MATERIAL'],pattern=r'^0+', replacement=''))
      | 'Convert String Columns to fir sc' >> beam.ParDo(de.ConvertToUpperCase(['VENDOR', 'SOURCE', 'PURCHASE_ORDER', 'DEPOSITO', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'MATERIAL_DESCRIPTION', 'CENTER']))
      | 'Verify Column Date Format FIR SC' >> beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  )
  error_fir_sc | 'Handle Format Errors FIR SC' >> beam.Map(log_error)

  # --- FIR files ---
  input_fir, error_read_fir = read_excels_union(pipeline, get_fir_path(), identifier='fir', converters={'codart':str, 'codcliforn':str})
  # Some files name the column 'dep_proprieta'; align it to 'depproprieta'.
  input_fir_rnm = (input_fir | 'Rename Column depproprieta to match in all files' >> beam.ParDo(dc.RenameColumns({'dep_proprieta': 'depproprieta'})))
  error_read_fir | 'Handle Format Errors Read FIR' >> beam.Map(log_error)

  clean_fir, error_fir = (input_fir_rnm
      | 'TRim columns fir' >> beam.ParDo(TrimValues(['rigadoc','deposito', 'datareg', 'numerodoc', 'codart', 'descrart', 'categ01', 'categ04', 'codcliforn', 'ragsoc', 'qta', 'importonetto', 'azienda','depproprieta','naturamov']))
      | 'Filter rows to have only Y' >> beam.ParDo(dc.KeepColumnValues('depproprieta',['Y']))
      | 'Filter rows to have naturamov INBOUND_MAT or PURCHASE_RETURN' >> beam.ParDo(dc.KeepColumnValues('naturamov',['INBOUND_MAT','PURCHASE_RETURN']))
      | 'Derive Purchase Order fir' >> beam.ParDo(de.MergeColumnsFn([(['numerodoc','rigadoc'], 'PURCHASE_ORDER', '-')]))
      | 'Keep Columns fir' >> beam.ParDo(dc.KeepColumns(['deposito', 'datareg', 'codart', 'descrart', 'categ01', 'categ04', 'codcliforn', 'ragsoc', 'qta', 'importonetto', 'azienda','SOURCE','PURCHASE_ORDER']))
      | 'Rename Columns fir' >> beam.ParDo(dc.RenameColumns({'deposito':'DEPOSITO', 'datareg':'PERIOD', 'codart':'MATERIAL', 'descrart':'MATERIAL_DESCRIPTION', 'categ01':'FAMILY_LOCAL', 'categ04':'CATEGORY_LOCAL', 'codcliforn':'VENDOR', 'ragsoc':'VENDOR_NAME_LOCAL', 'qta':'TRX_QTY', 'importonetto':'SPEND_PO_LC', 'azienda':'CENTER'}))
      | 'Replace Values in FIR' >> beam.ParDo(dc.ReplaceValues([('VENDOR', ' ', ''), ('MATERIAL', ' ', ''),  ('PURCHASE_ORDER', 'NONE', ''), ('SOURCE','gs://nidec-ga-transient/','')]))
      | 'Convert Columns to Float fir' >> beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY','SPEND_PO_LC']))
      | 'Convert Columns to String fir' >> beam.ParDo(de.ColumnsToStringConverter(['VENDOR', 'SOURCE', 'PURCHASE_ORDER', 'DEPOSITO', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'MATERIAL_DESCRIPTION', 'FAMILY_LOCAL', 'CATEGORY_LOCAL', 'CENTER']))
      | 'Convert String Columns to fir' >> beam.ParDo(de.ConvertToUpperCase(['VENDOR', 'SOURCE', 'PURCHASE_ORDER', 'DEPOSITO', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'MATERIAL_DESCRIPTION', 'FAMILY_LOCAL', 'CATEGORY_LOCAL', 'CENTER']))
      #| 'Replace Patters Material startswith 0 for ""  FIR' >> beam.ParDo(ReplacePatterns(columns=['MATERIAL'],pattern=r'^0+', replacement=''))
      | 'Verify Column Date Format FIR' >> beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  )

  error_fir | 'Handle Format Errors FIR' >> beam.Map(log_error)
  # Combine both cleaned FIR streams into a single PCollection.
  union_firs = ((clean_fir,clean_fir_sc) | 'Union PCollections firs' >> beam.Flatten())

  # --- ORG lookup from the 'FIR_ORG' tab of the org workbook ---
  input_fir_org, error_read_fir_org = read_excels_union(pipeline, get_file_org_path(), identifier='fir org', tab='FIR_ORG')
  error_read_fir_org | 'Handle Format Errors Read FIR ORG' >> beam.Map(log_error)

  clean_fir_org = (input_fir_org
      | 'Keep Columns fir org' >> beam.ParDo(dc.KeepColumns(['DEPOSITO', 'ORG', 'CENTER']))
      | 'Convert String Columns to fir org' >> beam.ParDo(de.ConvertToUpperCase(['DEPOSITO', 'ORG', 'CENTER']))
  )

  # Key both sides by (DEPOSITO, CENTER) and left-join the lookup onto the FIR rows.
  group_clean_fir_org = de.key_transform(clean_fir_org, ['DEPOSITO','CENTER'], identifier='fir org grouped')

  group_union_firs = de.key_transform(union_firs, ['DEPOSITO','CENTER'], identifier='union fir fir_sc grouped')

  stage_fir = de.join(group_union_firs, group_clean_fir_org, method='left_join')

  # Presumably fills a missing ORG with 'FIR' (per the helper's name and label),
  # then reorders the columns according to the target schema.
  stage_fir = (stage_fir
      | 'replace missvalue for fir in org' >> beam.ParDo(de.ReplaceMissingValues('ORG','FIR'))
      | "Ensure Schema Order FIR" >> beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))
  )

  return stage_fir

In [None]:
def build_cimd_in() -> beam.PCollection:
  """Build the CIMD IN branch of the pipeline and return its staged PCollection.

  Attaches to the module-level ``pipeline``. Reads the CIMD IN Excel files
  from both the spend-data root and the "CIMD IN" folder, merges them, derives
  and renames columns, reorders them according to ``schema_pbi_nsap`` and
  checks the PERIOD date format. Read and date-format errors are sent to
  ``log_error``.

  NOTE(review): the exact behavior of the ``de``/``dc`` transforms is defined
  in ``pipeline_components`` and is not visible here.
  """

  input_cimd_in, errors_read_cimd_in = read_excels_union(pipeline, get_cimd_in_path(), identifier='cimd_in', column_date=['Posting Date'], converters={'Item No.':str, 'Source No.':str})
  input_folder_cimd_in, errors_read_folder_cimd_in = read_excels_union(pipeline, get_cimd_folder_in_path(), identifier='cimd_in_folder', column_date=['Posting Date'], converters={'Item No.':str, 'Source No.':str})

  errors_read_cimd_in | 'Handle Format Errors Read CIMD IN' >> beam.Map(log_error)
  errors_read_folder_cimd_in | 'Handle Format Errors Read Folder CIMD IN' >> beam.Map(log_error)

  # Combine the files from both locations into a single PCollection.
  merged_cimd_in = ((input_folder_cimd_in,input_cimd_in) | 'Merge PCollections CIMD IN' >> beam.Flatten())

  # The last step has two outputs: rows with a valid PERIOD and the 'error' output.
  stage_cimd_in, errors_stage_cimd_in = (merged_cimd_in
      | 'Derive PURCHASE_ORDER by MERGING CIMD IN' >> beam.ParDo(de.MergeColumnsFn([(['PO No','PO Line No'], 'PURCHASE_ORDER', '-')]))
      | 'Replace Patters , for ""  CIMD' >> beam.ParDo(dc.ReplaceValues([('Total Amount', ',', ''), ('Quantity', ',', ''), ('UOM', 'NO', 'UN')]))
      # SPEND_PO_LC = Total Amount / Currency Factor, or 0 when the factor is falsy.
      | 'Derive SPEND_PO_LC with Total Amount / Currency_Factor' >> beam.ParDo(de.GenericArithmeticOperation([{'operands':['Total Amount','Currency Factor'],'result_column':'SPEND_PO_LC','formula': lambda c1,c2: c1 / c2 if c2 else 0}]))
      | 'Rename Columns CIMD' >> beam.ParDo(dc.RenameColumns(rename_columns))
      | 'Derive ORG CIMD IN' >> beam.ParDo(de.GenericDeriveCondition(column=None, map=None, new_column='ORG', default='IN01'))
      | 'Derive LOCAL_CURRENCY' >> beam.ParDo(de.GenericDeriveCondition(column=None, map=None, new_column='LOCAL_CURRENCY', default='INR'))
      | 'Trim CIMD IN' >> beam.ParDo(TrimValues(['MATERIAL', 'VENDOR_NAME_LOCAL', 'VENDOR', 'PURCHASE_ORDER', 'MATERIAL_DESCRIPTION']))
      # NOTE(review): the ('TRANSACTION_CURRENCY', 'INR', 'LOCAL_CURRENCY') rule appears twice,
      # and it replaces 'INR' with the literal text 'LOCAL_CURRENCY' — confirm both are intended.
      | 'Replace Values in CIMD IN' >> beam.ParDo(dc.ReplaceValues([('VENDOR', ' ', ''),  ('MATERIAL', ' ', ''), ('PURCHASE_ORDER', 'NONE', ''), ('UOM', 'NO', 'UN'), ('TRANSACTION_CURRENCY', 'INR', 'LOCAL_CURRENCY'), ('TRANSACTION_CURRENCY', 'INR', 'LOCAL_CURRENCY'), ('SOURCE','gs://nidec-ga-transient/','')]))
      | 'Convert Columns to Float CIMD' >> beam.ParDo(de.ColumnsToFloatConverter(['SPEND_PO_LC','TRX_QTY']))
      | 'Convert Columns to String CIMD' >> beam.ParDo(de.ColumnsToStringConverter(['UOM', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'CATEGORY_LOCAL', 'MATERIAL_DESCRIPTION', 'TRANSACTION_CURRENCY', 'PAYMENT_TERM', 'INCOTERM', 'PURCHASE_ORDER', 'SOURCE', 'LOCAL_CURRENCY', 'ORG', 'VENDOR']))
      | 'Convert String Columns to UPPERCASE CIMD' >> beam.ParDo(de.ConvertToUpperCase(['UOM', 'VENDOR_NAME_LOCAL', 'MATERIAL', 'CATEGORY_LOCAL', 'MATERIAL_DESCRIPTION', 'TRANSACTION_CURRENCY', 'PAYMENT_TERM', 'INCOTERM', 'PURCHASE_ORDER', 'SOURCE', 'LOCAL_CURRENCY', 'ORG', 'VENDOR']))
      #| 'Replace Patters Material startswith 0 for ""  CIMD IN' >> beam.ParDo(ReplacePatterns(columns=['MATERIAL'],pattern=r'^0+', replacement=''))
      | "Ensure Schema Order CIMD IN" >> beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))
      | 'Verify Column Date Format CIMD IN' >> beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  )

  errors_stage_cimd_in | 'Handle Format Errors Cleaner CIMD IN' >> beam.Map(log_error)

  return stage_cimd_in

In [None]:
def build_oracle_nar() -> beam.PCollection:
  """Stage the Oracle NAR receipts (CIMD NAR, HVAC MX and WET MX workbooks).

  Eight Excel sources are read with ``read_excels_union`` (a regular path and
  a "folder" path for CIMD NAR, HVAC MX and each of the two WET MX tabs),
  flattened into one PCollection and then renamed, filtered, converted and
  reordered to ``schema_pbi_nsap``. Every error output produced along the way
  is mapped through ``log_error``.

  Returns:
    The ``main`` output of the final ``VerifyStrictDateFormat`` step.
  """

  # --- CIMD NAR -------------------------------------------------------------
  input_cimd_nar, errors_read_cimd_nar = read_excels_union(
      pipeline,
      get_cimd_nar_path(),
      identifier='cimd_nar',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
  )
  errors_read_cimd_nar | 'Handle Format Errors Read CIMD NAR' >> beam.Map(log_error)

  input_folder_cimd_nar, errors_read_folder_cimd_nar = read_excels_union(
      pipeline,
      get_cimd_folder_nar_path(),
      identifier='cimd_nar_folder',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
  )
  errors_read_folder_cimd_nar | 'Handle Format Errors Read Folder CIMD NAR' >> beam.Map(log_error)

  # --- HVAC MX --------------------------------------------------------------
  # These workbooks name the vendor column 'Supplier No'; it is renamed right
  # after reading so that all sources share 'Supplier Number' before merging.
  input_hvac_mx, errors_read_hvac_mx = read_excels_union(
      pipeline,
      get_hvac_mx_path(),
      identifier='hvac_mx',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier No': str},
  )
  input_hvac_mx_rnm = (
      input_hvac_mx
      | 'Rename Column Vendor Not Folder' >> beam.ParDo(dc.RenameColumns({'Supplier No': 'Supplier Number'}))
  )
  errors_read_hvac_mx | 'Handle Format Errors Read HVAC MX' >> beam.Map(log_error)

  input_folder_hvac_mx, errors_read_folder_hvac_mx = read_excels_union(
      pipeline,
      get_hvac_folder_mx_path(),
      identifier='hvac_mx_folder',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier No': str},
  )
  input_folder_hvac_mx_rnm = (
      input_folder_hvac_mx
      | 'Rename Column Vendor in Folder' >> beam.ParDo(dc.RenameColumns({'Supplier No': 'Supplier Number'}))
  )
  errors_read_folder_hvac_mx | 'Handle Format Errors Read Folder HVAC MX' >> beam.Map(log_error)

  # --- WET MX, tab 'A01 Paragould' -----------------------------------------
  input_wet_mx_1, error_read_wet_mx_1 = read_excels_union(
      pipeline,
      get_wet_mx_path(),
      identifier='wet_mx_1',
      tab='A01 Paragould',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
  )
  error_read_wet_mx_1 | 'Handle Format Errors Read WET MX Sheet 1' >> beam.Map(log_error)

  input_folder_wet_mx_1, error_read_folder_wet_mx_1 = read_excels_union(
      pipeline,
      get_wet_folder_mx_path(),
      identifier='folder_wet_mx_1',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
      tab='A01 Paragould',
  )
  error_read_folder_wet_mx_1 | 'Handle Format Errors Read Folder WET MX Sheet 1' >> beam.Map(log_error)

  # --- WET MX, tab 'A02 Reynosa' -------------------------------------------
  input_wet_mx_2, error_read_wet_mx_2 = read_excels_union(
      pipeline,
      get_wet_mx_path(),
      identifier='wet_mx_2',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
      tab='A02 Reynosa',
  )
  error_read_wet_mx_2 | 'Handle Format Errors Read WET MX' >> beam.Map(log_error)

  input_folder_wet_mx_2, error_read_folder_wet_mx_2 = read_excels_union(
      pipeline,
      get_wet_folder_mx_path(),
      identifier='folder_wet_mx_2',
      column_date=['Transaction Date'],
      converters={'Item Number': str, 'Supplier Number': str},
      tab='A02 Reynosa',
  )
  error_read_folder_wet_mx_2 | 'Handle Format Errors Read Folder WET MX' >> beam.Map(log_error)

  merged_oracle_nar = ((
      input_cimd_nar,
      input_folder_cimd_nar,
      input_hvac_mx_rnm,
      input_folder_hvac_mx_rnm,
      input_wet_mx_1,
      input_folder_wet_mx_1,
      input_wet_mx_2,
      input_folder_wet_mx_2) | 'Merge PCollections ORACLE NAR' >> beam.Flatten())

  # Source column -> target column. Each source column appears exactly once:
  # 'Transaction Unit Of Measure' feeds UOM, while ORG comes from 'Org'.
  oracle_rename_map = {
      'Transaction Unit Of Measure': 'UOM',
      'Org': 'ORG',
      'Transaction Date': 'PERIOD',
      'Purchase Order': 'PURCHASE_ORDER',
      'Item Number': 'MATERIAL',
      'Trx Qty': 'TRX_QTY',
      'Extended PO Price': 'SPEND_PO_LC',
      'Supplier Number': 'VENDOR',
      'Extended Standard Cost': 'SPEND_STANDARD_LC',
      'DRI Code': 'DRI_CODE',
      'Commodity Family': 'CATEGORY_LOCAL',
      'Item Description': 'MATERIAL_DESCRIPTION',
      'Supplier Name': 'VENDOR_NAME_LOCAL',
  }

  stage_oracle_nar, error_oracle_nar = (merged_oracle_nar
      # Any incoming 'PERIOD' column is discarded; PERIOD is rebuilt from
      # 'Transaction Date' by the rename below.
      | 'Drop Period' >> beam.ParDo(dc.DropColumns(['PERIOD']))
      | 'Rename Columns ORACLE NAR' >> beam.ParDo(dc.RenameColumns(oracle_rename_map))
      | 'Keep Columns ORACLE NAR' >> beam.ParDo(dc.KeepColumns(['ORG','PERIOD','PURCHASE_ORDER','MATERIAL','TRX_QTY','SPEND_PO_LC','VENDOR','SPEND_STANDARD_LC','UOM','DRI_CODE','CATEGORY_LOCAL','MATERIAL_DESCRIPTION','VENDOR_NAME_LOCAL','SOURCE']))
      # Presumably removes repeated header rows (ORG == 'ORG') — confirm
      # against dc.FilterColumnValues.
      | 'Filter Col ORG with ORG ORACLE NAR' >> beam.ParDo(dc.FilterColumnValues(column='ORG', values_to_filter=['ORG']))
      | 'Convert Columns to String ORACLE' >> beam.ParDo(de.ColumnsToStringConverter(['ORG', 'PURCHASE_ORDER', 'MATERIAL', 'VENDOR', 'SOURCE', 'VENDOR_NAME_LOCAL', 'MATERIAL_DESCRIPTION', 'UOM', 'DRI_CODE', 'CATEGORY_LOCAL']))
      | 'Convert String Columns to UPPERCASE ORACLE' >> beam.ParDo(de.ConvertToUpperCase(['ORG', 'PURCHASE_ORDER', 'MATERIAL', 'VENDOR', 'SOURCE', 'VENDOR_NAME_LOCAL', 'MATERIAL_DESCRIPTION', 'UOM', 'DRI_CODE', 'CATEGORY_LOCAL']))
      | 'Remove None wet mx' >> beam.ParDo(dc.ReplaceValues([('PURCHASE_ORDER', 'NONE', '')]))
      # NOTE: the step that strips leading zeros from MATERIAL
      # (ReplacePatterns with r'^0+') is currently disabled.
      | 'Convert Columns to Float ORACLE' >> beam.ParDo(de.ColumnsToFloatConverter(['SPEND_STANDARD_LC','SPEND_PO_LC','TRX_QTY']))
      | 'Replace Values in SOURCE ORACLE' >> beam.ParDo(
          dc.ReplaceValues(
              [
                  # SOURCE was upper-cased above, hence the upper-case prefix.
                  ('SOURCE', 'GS://NIDEC-GA-TRANSIENT/', ''), ('SPEND_STANDARD_LC','$   ', ''), ('SPEND_PO_LC','$   ', ''),
                  # TRX_QTY, SPEND_PO_LC and SPEND_STANDARD_LC all get the
                  # same '(' / ')' / ',' / ' ' rules.
                  ('TRX_QTY',r'\(',''), ('SPEND_PO_LC', r'\(',''), ('SPEND_STANDARD_LC', r'\(',''),
                  ('TRX_QTY',r'\)',''), ('SPEND_PO_LC',r'\)',''), ('SPEND_STANDARD_LC',r'\)',''),
                  ('TRX_QTY',',',''), ('SPEND_PO_LC', ',', ''), ('SPEND_STANDARD_LC', ',', ''),
                  ('TRX_QTY',' ',''), ('SPEND_PO_LC', ' ', ''), ('SPEND_STANDARD_LC', ' ', ''),
                  ('MATERIAL_DESCRIPTION', ' ', ''), ('VENDOR_NAME_LOCAL', ' ', '')
              ])
        )
      | 'Clean NaN to write in BQ Oracle' >> beam.ParDo(dc.CleanNaN())
      | "Ensure Schema Order ORACLE NAR" >> beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))
      | 'Verify Column Date Format ORACLE NAR' >> beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  )
  error_oracle_nar | 'Handle Format Errors Oracle Nar' >> beam.Map(log_error)

  return stage_oracle_nar

In [None]:
def build_wet_mx_intracompany() -> beam.PCollection:
  """Stage the WET MX intracompany receipts.

  Reads the regular and the "folder" workbook sets with ``read_excels_union``,
  flattens them, derives PURCHASE_ORDER from 'Order Number' and 'Line',
  assigns the constants ORG='A02' and TRANSACTION_CURRENCY='USD', copies
  SPEND_PO_TC into SPEND_PO_LC and reorders the rows to ``schema_pbi_nsap``.

  Returns:
    The ``main`` output of the final ``VerifyStrictDateFormat`` step; its
    ``error`` output is mapped through ``log_error``.
  """
  file_rows, file_read_errors = read_excels_union(
      pipeline,
      get_wet_mx_intracompany_path(),
      identifier='wet_mx_ic',
      skiprows=1,
      column_date=['Date'],
      converters={'Item': str},
  )
  file_read_errors | 'Handle Format Errors Read WET MX Intracompany' >> beam.Map(log_error)

  folder_rows, folder_read_errors = read_excels_union(
      pipeline,
      get_wet_folder_mx_intracompany_path(),
      identifier='folder_wet_mx_ic',
      skiprows=1,
      column_date=['Date'],
      converters={'Item': str},
  )
  folder_read_errors | 'Handle Format Errors Read Folder WET MX Intracompany' >> beam.Map(log_error)

  combined = (file_rows, folder_rows) | 'Merge PCollections WET MX IC' >> beam.Flatten()

  # Columns that are first cast to string and then upper-cased.
  text_columns = ('ORG', 'PURCHASE_ORDER', 'MATERIAL', 'VENDOR', 'SOURCE', 'VENDOR_NAME_LOCAL', 'UOM', 'TRANSACTION_CURRENCY')

  # Ordered (label, transform) pairs; applied one after another below.
  # NOTE: the step that strips leading zeros from MATERIAL
  # (ReplacePatterns with r'^0+') is currently disabled.
  steps = [
      ('Clean NaN wet mx ic',
       beam.ParDo(dc.CleanNaN())),
      ('Drop Missing Value in Transaction Type',
       beam.ParDo(dc.FilterColumnValues(column='Transaction Type', values_to_filter=[None, '']))),
      ('Convert Columns Order Number & Line to Integer WET MX',
       beam.ParDo(de.ColumnsToIntegerConverter(['Order Number', 'Line']))),
      ('Derive PURCHASE_ORDER by MERGING WET MX IC',
       beam.ParDo(de.MergeColumnsFn([(['Order Number', 'Line'], 'PURCHASE_ORDER', '-')]))),
      ('Drop Columns WET MC IC',
       beam.ParDo(dc.DropColumns([
           'Receipt',
           'Packing Slip',
           'Bill of Lading',
           'Order Number',
           'Line',
           'Cost',
           'Transaction Type',
           'Unnamed: 0',
       ]))),
      ('Rename Columns WET MX IC',
       beam.ParDo(dc.RenameColumns(rename_columns))),
      ('Trim WET MX IC',
       beam.ParDo(TrimValues(['MATERIAL', 'VENDOR_NAME_LOCAL', 'VENDOR', 'PURCHASE_ORDER']))),
      ('Replace Values in SOURCE  WET MX IC',
       beam.ParDo(dc.ReplaceValues([
           ('MATERIAL', ' ', ''),
           ('VENDOR', ' ', ''),
           ('PURCHASE_ORDER', 'NONE', ''),
           ('SOURCE', 'gs://nidec-ga-transient/', ''),
       ]))),
      ('Derive ORG wet mx',
       beam.ParDo(de.GenericDeriveCondition(column=None, map=None, new_column='ORG', default='A02'))),
      ('Derive TRANSACTION_CURRENCY',
       beam.ParDo(de.GenericDeriveCondition(column=None, map=None, new_column='TRANSACTION_CURRENCY', default='USD'))),
      ('Convert Columns to Float WET MX IC',
       beam.ParDo(de.ColumnsToFloatConverter(['SPEND_PO_TC', 'TRX_QTY']))),
      ('Convert Columns to String WET MX IC',
       beam.ParDo(de.ColumnsToStringConverter(list(text_columns)))),
      ('Convert String Columns to UPPERCASE WET MX IC',
       beam.ParDo(de.ConvertToUpperCase(list(text_columns)))),
      ('Derive SPEND_PO_LC from SPEND_PO_TC',
       beam.ParDo(de.ColumnCopy('SPEND_PO_TC', 'SPEND_PO_LC'))),
      ("Ensure Schema Order wet mx in",
       beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))),
  ]

  rows = combined
  for label, transform in steps:
    rows = rows | label >> transform

  date_check = beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  staged, date_errors = rows | 'Verify Column Date Format WET MX IC' >> date_check
  date_errors | 'Handle Format Errors WET MX Intracompany' >> beam.Map(log_error)

  return staged


In [None]:
def build_wet_cn() -> beam.PCollection:
  """Stage the WET CN receipts.

  Reads the regular and the "folder" workbook sets with ``read_excels_union``,
  flattens them, derives SPEND_PO_TC (UNIT_COST * TRX_QTY) and SPEND_PO_LC
  (SPEND_PO_TC / EXCH_RATE1, or 0 when the rate is falsy), maps ORG through
  {'GA': 'B01', 'COM': 'B00'} with default 'MISSING', and reorders the rows to
  ``schema_pbi_nsap``.

  Returns:
    The ``main`` output of the final ``VerifyStrictDateFormat`` step; its
    ``error`` output is mapped through ``log_error``.
  """
  file_rows, file_read_errors = read_excels_union(
      pipeline,
      get_wet_cn_path(),
      identifier='wet_cn',
      parse_dates=['RECV_DATE'],
      converters={'ITM_GLMOCO': str, 'VENDOR_ID': str},
  )
  file_read_errors | 'Handle Format Errors Read WET CN' >> beam.Map(log_error)

  folder_rows, folder_read_errors = read_excels_union(
      pipeline,
      get_wet_folder_cn_path(),
      identifier='wet_folder_cn',
      parse_dates=['RECV_DATE'],
      converters={'ITM_GLMOCO': str, 'VENDOR_ID': str},
  )
  folder_read_errors | 'Handle Format Errors Read Folder WET CN' >> beam.Map(log_error)

  combined = (file_rows, folder_rows) | 'Merge PCollections WET CN' >> beam.Flatten()

  # Columns that get upper-cased; the string cast additionally covers PERIOD.
  upper_columns = ('ORG', 'PURCHASE_ORDER', 'MATERIAL', 'CATEGORY_LOCAL', 'DRI_CODE', 'VENDOR', 'VENDOR_NAME_LOCAL', 'TRANSACTION_CURRENCY', 'SOURCE')

  # Ordered (label, transform) pairs; applied one after another below.
  # NOTE: the step that strips leading zeros from MATERIAL
  # (ReplacePatterns with r'^0+') is currently disabled.
  steps = [
      ('Rename Columns WET CN',
       beam.ParDo(dc.RenameColumns(rename_columns))),
      ('Derive SPEND_PO_TC with UNIT_COST * TRX_QTY',
       beam.ParDo(de.GenericArithmeticOperation([{
           'operands': ['UNIT_COST', 'TRX_QTY'],
           'result_column': 'SPEND_PO_TC',
           'formula': lambda c1, c2: c1 * c2,
       }]))),
      # Despite its label, this step writes SPEND_PO_LC (SPEND_PO_TC / EXCH_RATE1).
      ('Derive SPEND_PO_TC with UNIT_COST / EXCH_RATE1',
       beam.ParDo(de.GenericArithmeticOperation([{
           'operands': ['SPEND_PO_TC', 'EXCH_RATE1'],
           'result_column': 'SPEND_PO_LC',
           'formula': lambda c1, c2: c1 / c2 if c2 else 0,
       }]))),
      # Validation against the reference table is left for a later stage.
      ('Derive ORG wet cn',
       beam.ParDo(de.GenericDeriveCondition(column='ORG', map={'GA': 'B01', 'COM': 'B00'}, new_column='ORG', default='MISSING'))),
      ('Clean NaN to write in wet cn',
       beam.ParDo(dc.CleanNaN())),
      ('Drop Missing Value in PERIOD',
       beam.ParDo(dc.FilterColumnValues(column='PERIOD', values_to_filter=[None, '']))),
      ('Trim WET CN',
       beam.ParDo(TrimValues(['MATERIAL', 'VENDOR_NAME_LOCAL', 'VENDOR', 'PURCHASE_ORDER']))),
      ('Replace Values in  WET CN',
       beam.ParDo(dc.ReplaceValues([
           ('MATERIAL', ' ', ''),
           ('VENDOR', ' ', ''),
           ('SOURCE', 'gs://nidec-ga-transient/', ''),
       ]))),
      ('Convert Columns to Float WET CN',
       beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY', 'UNIT_COST', 'SPEND_PO_TC', 'SPEND_PO_LC', 'EXCH_RATE1']))),
      ('Convert Columns to String WET CN',
       beam.ParDo(de.ColumnsToStringConverter(['PERIOD'] + list(upper_columns)))),
      ('Convert String Columns to UPPERCASE WET CN',
       beam.ParDo(de.ConvertToUpperCase(list(upper_columns)))),
      ('Keep Columns WET CN',
       beam.ParDo(dc.KeepColumns([
           'ORG',
           'PERIOD',
           'PURCHASE_ORDER',
           'MATERIAL',
           'VENDOR',
           'TRX_QTY',
           'SPEND_PO_TC',
           'SPEND_PO_LC',
           'TRANSACTION_CURRENCY',
           'SOURCE',
           'VENDOR_NAME_LOCAL',
           'CATEGORY_LOCAL',
           'DRI_CODE',
       ]))),
      ("Ensure Schema Order WET CN",
       beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))),
      ('Drop Missing Value in PERIOD WET CN',
       beam.ParDo(dc.FilterColumnValues(column='PERIOD', values_to_filter=[None, '', ' ']))),
  ]

  rows = combined
  for label, transform in steps:
    rows = rows | label >> transform

  date_check = beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  staged, date_errors = rows | 'Verify Column Date Format WET CN' >> date_check
  date_errors | 'Handle Format Errors WET CN' >> beam.Map(log_error)

  return staged

In [None]:
def build_wet_ro() -> beam.PCollection:
  """Stage the WET RO receipts (regular, "folder" and intracompany workbooks).

  Reads three Excel sources with ``read_excels_union``, flattens them, builds
  PERIOD as 'Year-MTH-day' with the constant day '01', renames the source
  columns to the target names, multiplies TRX_QTY and SPEND_PO_LC by 1000
  and reorders the rows to ``schema_pbi_nsap``.

  Returns:
    The ``main`` output of the final ``VerifyStrictDateFormat`` step; its
    ``error`` output is mapped through ``log_error``.
  """

  input_wet_ro, error_read_wet_ro = read_excels_union(
      pipeline,
      get_wet_ro_path(),
      identifier='wet_ro',
      skiprows=3,
      converters={'Item': str, 'Sup': str},
  )
  error_read_wet_ro | 'Handle Format Errors Read WET RO' >> beam.Map(log_error)

  input_wet_ro_ic, error_read_wet_ro_ic = read_excels_union(
      pipeline,
      get_wet_ro_intracompany_path(),
      identifier='wet_ro_i',
      skiprows=3,
      converters={'Item': str, 'Sup': str},
  )
  error_read_wet_ro_ic | 'Handle Format Errors Read WET RO Intracompany' >> beam.Map(log_error)

  input_folder_wet_ro, error_read_folder_wet_ro = read_excels_union(
      pipeline,
      get_wet_folder_ro_path(),
      identifier='wet_folder_ro',
      skiprows=3,
      converters={'Item': str, 'Sup': str},
  )
  error_read_folder_wet_ro | 'Handle Format Errors Read Folder WET RO' >> beam.Map(log_error)

  merged_wet_ro = ((input_wet_ro, input_folder_wet_ro, input_wet_ro_ic) | 'Merge PCollections WET RO' >> beam.Flatten())

  stage_wet_ro, error_wet_ro = (merged_wet_ro
      | 'Clean NaN Wet RO' >> beam.ParDo(dc.CleanNaN())
      | 'Drop Missing Value in INT_EXT' >> beam.ParDo(dc.FilterColumnValues(column='INT_EXT', values_to_filter=[None, '']))
      # The value to drop is passed as a one-element list, like every other
      # FilterColumnValues call in this file. A bare string would turn an
      # ``in`` membership test into a substring match (e.g. 'INT' in 'INT_EXT').
      # Presumably this removes repeated header rows — confirm against
      # dc.FilterColumnValues.
      | 'Filter Col INT_EXT with INT_EXT value' >> beam.ParDo(dc.FilterColumnValues(column='INT_EXT', values_to_filter=['INT_EXT']))
      # Adds the constant day component used to build PERIOD below.
      | 'Derive Test Cooling wet ro' >> beam.ParDo(de.ColumnValueAssignment(value='01', new_column='day'))
      | 'Convert Columns to integer before convert to string WET RO' >> beam.ParDo(de.ColumnsToIntegerConverter(['Year','MTH']))
      | 'Convert Columns to String WET RO DATE' >> beam.ParDo(de.ColumnsToStringConverter(['Year','MTH','day']))
      | 'Derive Period wet ro' >> beam.ParDo(de.MergeColumnsFn([(['Year','MTH','day'],'PERIOD','-')]))
      | 'Keep Columns WET RO' >> beam.ParDo(dc.KeepColumns(['ORG','PERIOD', 'Item', 'Sup', 'Company', 'Gross TO', 'Pu Qty', 'TransactionCurrency', 'INT_EXT', 'Cm_Descr', 'Item Descr','SOURCE', 'Supplier','TO local currency']))
      # 'Gross TO' and 'TO local currency' both map to SPEND_PO_LC.
      | 'Rename Columns WET RO' >> beam.ParDo(dc.RenameColumns({
          'INT_EXT': 'VENDOR_TYPE',
          'Cm_Descr': 'CATEGORY_LOCAL',
          'Item': 'MATERIAL',
          'Item Descr': 'MATERIAL_DESCRIPTION',
          'Sup': 'VENDOR',
          'Supplier': 'VENDOR_NAME_LOCAL',
          'Company': 'ORG',
          'Gross TO': 'SPEND_PO_LC',
          'TO local currency': 'SPEND_PO_LC',
          'Pu Qty': 'TRX_QTY',
          'TransactionCurrency': 'TRANSACTION_CURRENCY',
      }))
      | 'Trim WET RO' >> beam.ParDo(TrimValues(['MATERIAL','MATERIAL_DESCRIPTION', 'VENDOR']))
      | 'Replace Values in SOURCE  WET RO' >> beam.ParDo(dc.ReplaceValues([('MATERIAL', ' ', ''), ('VENDOR', ' ', ''), ('SPEND_PO_LC', ',', ''), ('TRX_QTY', ',', ''), ('SOURCE', 'gs://nidec-ga-transient/', '')]))
      | 'Convert Columns to Float WET RO' >> beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY', 'SPEND_PO_LC']))
      | 'Multiply TRX and SPEND by 1000' >> beam.ParDo(de.MultiplyColumns(columns=['TRX_QTY', 'SPEND_PO_LC'],factor=1000.0))
      | 'Convert Columns to String WET RO' >> beam.ParDo(de.ColumnsToStringConverter(['ORG', 'MATERIAL', 'VENDOR', 'TRANSACTION_CURRENCY', 'SOURCE', 'VENDOR_TYPE', 'CATEGORY_LOCAL', 'MATERIAL_DESCRIPTION', 'VENDOR_NAME_LOCAL']))
      | 'Convert String Columns to UPPERCASE WET RO' >> beam.ParDo(de.ConvertToUpperCase(['ORG', 'MATERIAL', 'VENDOR', 'TRANSACTION_CURRENCY', 'SOURCE', 'VENDOR_TYPE', 'CATEGORY_LOCAL', 'MATERIAL_DESCRIPTION', 'VENDOR_NAME_LOCAL']))
      # NOTE: the step that strips leading zeros from MATERIAL
      # (ReplacePatterns with r'^0+') is currently disabled.
      | "Ensure Schema Order WET RO" >> beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))
      | 'Drop Missing Value in PERIOD WET RO' >> beam.ParDo(dc.FilterColumnValues(column='PERIOD', values_to_filter=[None, '', ' ']))
      | 'Verify Column Date Format WET RO' >> beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  )
  error_wet_ro | 'Handle Format Errors WET RO' >> beam.Map(log_error)

  return stage_wet_ro

In [None]:
def build_hvac_cn() -> beam.PCollection:
  """Stage the HVAC CN receipts.

  Reads the regular and the "folder" workbook sets with ``read_excels_union``,
  renames 'Order Qty' to 'Quantity' in each so both share one quantity
  column, flattens them, assigns the constant ORG='B02', builds PERIOD as
  'Year-Period-day' and reorders the rows to ``schema_pbi_nsap``.

  Returns:
    The ``main`` output of the final ``VerifyStrictDateFormat`` step; its
    ``error`` output is mapped through ``log_error``.
  """
  file_rows, file_read_errors = read_excels_union(
      pipeline,
      get_hvac_cn_path(),
      identifier='hvac_cn',
      converters={'Item Number': str, 'Vendor ID': str},
  )
  file_rows_renamed = (
      file_rows
      | 'Rename Column Order Qty to match in all input_hvac_cn'
      >> beam.ParDo(dc.RenameColumns({'Order Qty': 'Quantity'}))
  )
  file_read_errors | 'Handle Format Errors Read HVAC CN' >> beam.Map(log_error)

  folder_rows, folder_read_errors = read_excels_union(
      pipeline,
      get_hvac_folder_cn_path(),
      identifier='hvac_folder_cn',
      converters={'Item Number': str, 'Vendor ID': str},
  )
  folder_rows_renamed = (
      folder_rows
      | 'Rename Column Order Qty to match in all input_folder_hvac_cn'
      >> beam.ParDo(dc.RenameColumns({'Order Qty': 'Quantity'}))
  )
  folder_read_errors | 'Handle Format Errors Read Folder HVAC CN' >> beam.Map(log_error)

  combined = (file_rows_renamed, folder_rows_renamed) | 'Merge PCollections HVAC CN' >> beam.Flatten()

  # The three components that are joined with '-' into PERIOD.
  date_parts = ('Year', 'Period', 'day')
  # Columns that are first cast to string and then upper-cased.
  text_columns = ('ORG', 'MATERIAL', 'VENDOR', 'TRANSACTION_CURRENCY', 'SOURCE', 'CATEGORY_LOCAL', 'MATERIAL_DESCRIPTION', 'VENDOR_NAME_LOCAL')

  # Ordered (label, transform) pairs; applied one after another below.
  # NOTE: the step that strips leading zeros from MATERIAL
  # (ReplacePatterns with r'^0+') is currently disabled.
  steps = [
      ('Clean NaN HVAC CN',
       beam.ParDo(dc.CleanNaN())),
      ('Drop Missing Value in Year HVAC CN',
       beam.ParDo(dc.FilterColumnValues(column='Year', values_to_filter=[None, '']))),
      ('Derive Day HVAC CN',
       beam.ParDo(de.ColumnValueAssignment(value='01', new_column='day'))),
      ('Derive ORG HVAC CN',
       beam.ParDo(de.GenericDeriveCondition(column=None, map=None, new_column='ORG', default='B02'))),
      ('Convert Columns to integer before convert to string HVAC CN',
       beam.ParDo(de.ColumnsToIntegerConverter(list(date_parts)))),
      ('Convert Columns to String HVAC CN date',
       beam.ParDo(de.ColumnsToStringConverter(list(date_parts)))),
      ('Derive Period',
       beam.ParDo(de.MergeColumnsFn([(list(date_parts), 'PERIOD', '-')]))),
      ('Keep Columns HVAC CN',
       beam.ParDo(dc.KeepColumns([
           'ORG',
           'PONumber',
           'Vendor ID',
           'Item Number',
           'PERIOD',
           'Quantity',
           'Foreign Invoice Amount',
           'Currency Code',
           'POReceipt Local Amount',
           'Vendor EN_Name',
           'Item Description',
           'Item Description.1',
           'SOURCE',
       ]))),
      ('Rename Columns HVAC CN',
       beam.ParDo(dc.RenameColumns(rename_columns))),
      ('Trim HVAC CN',
       beam.ParDo(TrimValues(['MATERIAL', 'INCOTERM', 'PAYMENT_TERM', 'VENDOR', 'PURCHASE_ORDER']))),
      ('Replace Values in SOURCE  HVAC CN',
       beam.ParDo(dc.ReplaceValues([
           ('MATERIAL', ' ', ''),
           ('VENDOR', ' ', ''),
           ('SOURCE', 'gs://nidec-ga-transient/', ''),
       ]))),
      ('Convert Columns to Float HVAC CN',
       beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY', 'SPEND_PO_LC', 'SPEND_PO_TC']))),
      ('Convert Columns to String HVAC CN',
       beam.ParDo(de.ColumnsToStringConverter(list(text_columns)))),
      ('Convert String Columns to UPPERCASE HVAC CN',
       beam.ParDo(de.ConvertToUpperCase(list(text_columns)))),
      ("Ensure Schema Order HVAC CN",
       beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))),
      ('Drop Missing Value in PERIOD HVAC CN',
       beam.ParDo(dc.FilterColumnValues(column='PERIOD', values_to_filter=[None, '', ' ']))),
  ]

  rows = combined
  for label, transform in steps:
    rows = rows | label >> transform

  date_check = beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
  staged, date_errors = rows | 'Verify Column Date Format HVAC CN' >> date_check
  date_errors | 'Handle Format Errors HVAC CN' >> beam.Map(log_error)

  return staged

In [None]:
def build_cimd_emea() -> beam.PCollection:
    """Stage the CIMD EMEA receipts enriched with material and vendor master data.

    The 'Receipt Book' tab is read from the regular and the "folder" workbook
    sets, flattened and cleaned. The result is left-joined on MATERIAL with
    ``build_cimd_emea_md_material()`` and then on VENDOR with
    ``build_cimd_emea_md_vendor()``, and finally reordered to
    ``schema_pbi_nsap``.

    Returns:
        The joined PCollection after the ``EnsureSchemaOrderDoFn`` step.
    """
    receipt_rows, receipt_read_errors = read_excels_union(
        pipeline,
        get_cimd_emea_path(),
        identifier='cimd_emea dados transacionais',
        skiprows=2,
        tab='Receipt Book',
        column_date=['Receipt Date'],
        converters={'Part Number': str, 'Supplier ID': str},
    )
    receipt_read_errors | 'Handle Format Errors Read CIMD EMEA DADOS TRANSACIONAIS' >> beam.Map(log_error)

    folder_receipt_rows, folder_receipt_read_errors = read_excels_union(
        pipeline,
        get_cimd_folder_emea_path(),
        identifier='folder_cimd_emea dados transacionais',
        skiprows=2,
        tab='Receipt Book',
        column_date=['Receipt Date'],
        converters={'Part Number': str, 'Supplier ID': str},
    )
    folder_receipt_read_errors | 'Handle Format Errors Read Folder CIMD EMEA DADOS TRANSACIONAIS' >> beam.Map(log_error)

    combined = (
        (receipt_rows, folder_receipt_rows)
        | 'Merge PCollections CIMD EMEA dados transacionais' >> beam.Flatten()
    )

    # Columns brought in by the two left joins; they are dropped from the
    # transactional rows beforehand.
    material_join_columns = ['MATERIAL_DESCRIPTION', 'UOM', 'CATEGORY_LOCAL', 'FAMILY_LOCAL', 'DRI_CODE']
    vendor_join_columns = ['VENDOR_NAME_LOCAL', 'VENDOR_COUNTRY', 'VENDOR_TYPE']

    # Columns that get upper-cased; the string cast additionally covers PERIOD.
    upper_columns = ('ORG', 'PURCHASE_ORDER', 'MATERIAL', 'VENDOR', 'TRANSACTION_CURRENCY', 'LOCAL_CURRENCY', 'PAYMENT_TERM', 'INCOTERM', 'SOURCE')

    # Ordered (label, transform) pairs; applied one after another below.
    # NOTE: the step that strips leading zeros from MATERIAL
    # (ReplacePatterns with r'^0+') is currently disabled.
    steps = [
        ('Drop Columns CIMD EMEA dados transacionais',
         beam.ParDo(dc.DropColumns(['Unnamed: 12', 'Unnamed: 13']))),
        ('Rename Columns CIMD EMEA dados transacionais',
         beam.ParDo(dc.RenameColumns({
             'OU Code (Legal entity)': 'ORG',
             'Receipt Date': 'PERIOD',
             'Supplier ID': 'VENDOR',
             'PO Number': 'PURCHASE_ORDER',
             'Part Number': 'MATERIAL',
             'Receipt Quantity (LS UOM)': 'TRX_QTY',
             'Receipt Amount (PO currency)': 'SPEND_PO_TC',
             'PO Currency': 'TRANSACTION_CURRENCY',
             'Receipt Amount (Local Currency)': 'SPEND_PO_LC',
             'Incoterm Code': 'INCOTERM',
             'Payment Terms (SUPP)': 'PAYMENT_TERM',
             'Local Currency': 'LOCAL_CURRENCY',
         }))),
        ('Filter Col ORG with ORG and LS08 value',
         beam.ParDo(dc.FilterColumnValues(column='ORG', values_to_filter=['LS08', 'ORG']))),
        ('Convert Columns to String CIMD EMEA dados transacionais',
         beam.ParDo(de.ColumnsToStringConverter(['PERIOD'] + list(upper_columns)))),
        ('Trim CIMD EMEA MD DT',
         beam.ParDo(TrimValues(['MATERIAL', 'INCOTERM', 'PAYMENT_TERM', 'VENDOR', 'PURCHASE_ORDER']))),
        ('Replace Values Spaces in MATERIAL CIMD EMEA dados transacionais',
         beam.ParDo(dc.ReplaceValues([
             ('MATERIAL', ' ', ''),
             ('SPEND_PO_LC', ',', '.'),
             ('SOURCE', 'gs://nidec-ga-transient/', ''),
         ]))),
        ('Convert Columns to Float CIMD EMEA dados transacionais',
         beam.ParDo(de.ColumnsToFloatConverter(['TRX_QTY', 'SPEND_PO_LC', 'SPEND_PO_TC']))),
        ('Convert String Columns to UPPERCASE CIMD EMEA dados transacionais',
         beam.ParDo(de.ConvertToUpperCase(list(upper_columns)))),
        ('Clean NaN dados transacionais',
         beam.ParDo(dc.CleanNaN())),
        ('Drop Missing Value in MATERIAL MD dados transacionais',
         beam.ParDo(dc.FilterColumnValues(column='MATERIAL', values_to_filter=[None, '']))),
        ('Drop Missing Value in PERIOD CIMD EMEA',
         beam.ParDo(dc.FilterColumnValues(column='PERIOD', values_to_filter=[None, '', ' ']))),
        ('Drop columns CIMD EMEA dados transionais out left join',
         beam.ParDo(dc.DropColumns(vendor_join_columns + material_join_columns))),
    ]

    rows = combined
    for label, transform in steps:
        rows = rows | label >> transform

    date_check = beam.ParDo(VerifyStrictDateFormat(date_column='PERIOD')).with_outputs('error', main='main')
    clean_receipts, date_errors = rows | 'Verify Column Date Format CIMD EMEA' >> date_check
    date_errors | 'Handle Format Errors CIMD EMEA DADOS TRANSACIONAIS' >> beam.Map(log_error)

    # --- Left join with the material master data, keyed by MATERIAL ---------
    material_master = build_cimd_emea_md_material()

    receipts_by_material = de.key_transform(
        clean_receipts, ['MATERIAL'], identifier='dados transacionais para materiais')
    material_master_by_material = de.key_transform(
        material_master, ['MATERIAL'], identifier='master data materiais')

    receipts_with_material = de.join(
        receipts_by_material,
        material_master_by_material,
        method='left_join',
        columns_to_include=material_join_columns,
    )

    # --- Left join with the vendor master data, keyed by VENDOR -------------
    vendor_master = build_cimd_emea_md_vendor()

    receipts_by_vendor = de.key_transform(
        receipts_with_material, ['VENDOR'], identifier='dados transacionais para vendor')
    vendor_master_by_vendor = de.key_transform(
        vendor_master, ['VENDOR'], identifier='dados transacionais')

    receipts_with_vendor = de.join(
        receipts_by_vendor,
        vendor_master_by_vendor,
        method='left_join',
        columns_to_include=vendor_join_columns,
    )

    return receipts_with_vendor | "Ensure Schema Order CIMD EMEA" >> beam.ParDo(EnsureSchemaOrderDoFn(schema_pbi_nsap))

In [None]:
def build_cimd_emea_md_material() -> beam.PCollection:
    """Build the CIMD EMEA material master data used for the MATERIAL join.

    The 'Material' tab is read from the regular and the "folder" workbook
    sets, flattened, cleaned and deduplicated on (MATERIAL, EXTRACT_DATE).
    Rows are then keyed by MATERIAL via ``MaxDate``, grouped, and passed
    through ``FilterMaxDate`` (presumably keeping the latest EXTRACT_DATE per
    material — confirm against those DoFns).

    Returns:
        The output of the ``FilterMaxDate`` step.
    """
    sheet_rows, sheet_read_errors = read_excels_union(
        pipeline,
        get_cimd_emea_path(),
        identifier='cimd_emea',
        skiprows=2,
        tab='Material',
        converters={'Part Number': str},
    )
    sheet_read_errors | 'Handle Format Errors Read CIMD EMEA DADOS MATERIAL' >> beam.Map(log_error)

    folder_sheet_rows, folder_sheet_read_errors = read_excels_union(
        pipeline,
        get_cimd_folder_emea_path(),
        identifier='folder_cimd_emea',
        skiprows=2,
        tab='Material',
        converters={'Part Number': str},
    )
    folder_sheet_read_errors | 'Handle Format Errors Read Folder CIMD EMEA DADOS MATERIAL' >> beam.Map(log_error)

    combined = (sheet_rows, folder_sheet_rows) | 'Merge PCollections CIMD EMEA' >> beam.Flatten()

    # Columns that are first cast to string and then upper-cased.
    text_columns = ('MATERIAL', 'MATERIAL_DESCRIPTION', 'UOM', 'CATEGORY_LOCAL', 'FAMILY_LOCAL', 'DRI_CODE', 'ORG', 'SOURCE')

    # Ordered (label, transform) pairs; applied one after another below.
    # NOTE: the step that strips leading zeros from MATERIAL
    # (ReplacePatterns with r'^0+') is currently disabled.
    steps = [
        ('Rename Columns CIMD EMEA MD MATERIAL',
         beam.ParDo(dc.RenameColumns({
             'OU Code (Legal entity)': 'ORG',
             'Part Number': 'MATERIAL',
             'Part Long Description': 'MATERIAL_DESCRIPTION',
             'Primary UOM (LS)': 'UOM',
             'Parent ICC Name (ICC)': 'CATEGORY_LOCAL',
             'ICC Name': 'FAMILY_LOCAL',
             'DRI': 'DRI_CODE',
         }))),
        ('Drop Columns CIMD EMEA MD MATERIAL',
         beam.ParDo(dc.DropColumns(['Unnamed: 6', 'ORG']))),
        ('Trim CIMD EMEA MD MATERIAL',
         beam.ParDo(TrimValues(['MATERIAL', 'MATERIAL_DESCRIPTION', 'VENDOR', 'UOM', 'CATEGORY_LOCAL', 'FAMILY_LOCAL', 'DRI_CODE']))),
        ('Replace Values in SOURCE  CIMD EMEA MD MATERIAL',
         beam.ParDo(dc.ReplaceValues([
             ('MATERIAL', ' ', ''),
             ('VENDOR', ' ', ''),
             ('MATERIAL_DESCRIPTION', '.,', '. '),
             ('SOURCE', 'gs://nidec-ga-transient/', ''),
         ]))),
        ('Convert Columns to String CIMD EMEA MD MATERIAL',
         beam.ParDo(de.ColumnsToStringConverter(list(text_columns)))),
        ('Convert String Columns to UPPERCASE CIMD EMEA  MD MATERIAL',
         beam.ParDo(de.ConvertToUpperCase(list(text_columns)))),
        ('Drop Missing Value in MATERIAL MD MATERIAL',
         beam.ParDo(dc.FilterColumnValues(column='MATERIAL', values_to_filter=[None, '']))),
        ("Extract Date in YYYYMMDD material",
         beam.ParDo(ExtractDateYYYYMMDD(column_name="SOURCE"))),
        ('Keep columns to left in Material',
         beam.ParDo(dc.KeepColumns([
             'MATERIAL',
             'MATERIAL_DESCRIPTION',
             'UOM',
             'CATEGORY_LOCAL',
             'FAMILY_LOCAL',
             'DRI_CODE',
             'EXTRACT_DATE',
             'SOURCE',
         ]))),
        ('deduplicate material',
         beam.ParDo(DeduplicateByColumnFn(['MATERIAL', 'EXTRACT_DATE']))),
    ]

    cleaned = combined
    for label, transform in steps:
        cleaned = cleaned | label >> transform

    keyed = cleaned | 'Parse Date and Key Material' >> beam.ParDo(MaxDate(keys=['MATERIAL'], date_column='EXTRACT_DATE'))
    grouped = keyed | 'Group by Material' >> beam.GroupByKey()

    return grouped | 'Select Max Date Row Material' >> beam.ParDo(FilterMaxDate())

In [None]:
def build_cimd_emea_md_vendor() -> beam.PCollection:
    """Build the CIMD EMEA vendor master data used for the VENDOR join.

    The 'Vendor' tab is read from the regular and the "folder" workbook sets,
    flattened, cleaned and deduplicated on (VENDOR, EXTRACT_DATE). Rows are
    then keyed by VENDOR via ``MaxDate``, grouped, and passed through
    ``FilterMaxDate`` (presumably keeping the latest EXTRACT_DATE per vendor —
    confirm against those DoFns).

    Returns:
        The output of the ``FilterMaxDate`` step.
    """
    sheet_rows, sheet_read_errors = read_excels_union(
        pipeline,
        get_cimd_emea_path(),
        identifier='cimd_emea_vendor',
        skiprows=2,
        tab='Vendor',
        converters={'Supplier ID': str},
    )
    sheet_read_errors | 'Handle Format Errors Read CIMD EMEA VENDOR' >> beam.Map(log_error)

    folder_sheet_rows, folder_sheet_read_errors = read_excels_union(
        pipeline,
        get_cimd_folder_emea_path(),
        identifier='folder_cimd_emea_vendor',
        skiprows=2,
        tab='Vendor',
        converters={'Supplier ID': str},
    )
    folder_sheet_read_errors | 'Handle Format Errors Read Folder CIMD EMEA VENDOR' >> beam.Map(log_error)

    combined = (sheet_rows, folder_sheet_rows) | 'Merge PCollections CIMD EMEA _vendor' >> beam.Flatten()

    # Columns that are first cast to string and then upper-cased.
    text_columns = ('ORG', 'VENDOR', 'VENDOR_NAME_LOCAL', 'VENDOR_COUNTRY', 'VENDOR_TYPE', 'SOURCE')

    # Ordered (label, transform) pairs; applied one after another below.
    # NOTE: the step that strips leading zeros from MATERIAL
    # (ReplacePatterns with r'^0+') is currently disabled.
    steps = [
        # The 'Supplier Category ' source header carries a trailing space.
        ('Rename Columns CIMD EMEA MD vendor',
         beam.ParDo(dc.RenameColumns({
             'OU Code (Legal entity)': 'ORG',
             'ORG CODE': 'ORG',
             'Supplier ID': 'VENDOR',
             'Supplier Name': 'VENDOR_NAME_LOCAL',
             'Country Code (SUPP)': 'VENDOR_COUNTRY',
             'Supplier Category ': 'VENDOR_TYPE',
         }))),
        ('Drop Columns CIMD EMEA MD _vendor',
         beam.ParDo(dc.DropColumns(['Unnamed: 2', 'Unnamed: 5', '  ', 'Unnamed: 6', 'ORG']))),
        ('Trim CIMD EMEA VENDOR',
         beam.ParDo(TrimValues(['VENDOR', 'MATERIAL']))),
        ('Replace Values Spaces in MATERIAL CIMD EMEA MD _vendor',
         beam.ParDo(dc.ReplaceValues([
             ('MATERIAL', ' ', ''),
             ('VENDOR', ' ', ''),
             ('VENDOR_NAME_LOCAL', '.,', '. '),
         ]))),
        ('Replace Values in SOURCE  CIMD EMEA MD _vendor',
         beam.ParDo(dc.ReplaceValues([('SOURCE', 'gs://nidec-ga-transient/', '')]))),
        ('Convert Columns to String CIMD EMEA MD _vendor',
         beam.ParDo(de.ColumnsToStringConverter(list(text_columns)))),
        ('Convert String Columns to UPPERCASE CIMD EMEA  MD _vendor',
         beam.ParDo(de.ConvertToUpperCase(list(text_columns)))),
        ('Clean NaN MD VENDOR',
         beam.ParDo(dc.CleanNaN())),
        ('Drop Missing Value in VENDOR MD VENDOR',
         beam.ParDo(dc.FilterColumnValues(column='VENDOR', values_to_filter=[None, '']))),
        ("Extract Date in YYYYMMDD VENDOR",
         beam.ParDo(ExtractDateYYYYMMDD(column_name="SOURCE"))),
        ('Keep Columns to left in vendor',
         beam.ParDo(dc.KeepColumns(['VENDOR', 'VENDOR_NAME_LOCAL', 'VENDOR_COUNTRY', 'VENDOR_TYPE', 'EXTRACT_DATE']))),
        ('deduplicate VENDOR',
         beam.ParDo(DeduplicateByColumnFn(['VENDOR', 'EXTRACT_DATE']))),
    ]

    cleaned = combined
    for label, transform in steps:
        cleaned = cleaned | label >> transform

    keyed = cleaned | 'Parse Date and Key Vendor' >> beam.ParDo(MaxDate(keys=['VENDOR'], date_column='EXTRACT_DATE'))
    grouped = keyed | 'Group by Vendor' >> beam.GroupByKey()

    return grouped | 'Select Max Date Row Vendor' >> beam.ParDo(FilterMaxDate())

In [None]:
def build_pbi_pfx_nsap(temp_location, output_pbi_pfx_nsap):
  """Union every staged source, write the result to BigQuery and run the pipeline.

  Args:
    temp_location: Passed to ``WriteToBigQuery`` as ``custom_gcs_temp_location``.
    output_pbi_pfx_nsap: Destination table passed to ``WriteToBigQuery``.

  The table is created if needed and truncated on write. The function calls
  ``pipeline.run()`` and then ``wait_until_finish()`` on the result.
  """
  # Each builder attaches its own branch to the shared module-level pipeline.
  staged_sources = (
      build_wet_mx_intracompany(),
      build_wet_ro(),
      build_wet_cn(),
      build_hvac_cn(),
      build_cimd_emea(),
      build_fir(),
      build_cimd_in(),
      build_oracle_nar(),
  )

  unified = staged_sources | 'Union PCollections for pbi nsap' >> beam.Flatten()
  cleaned = unified | 'Clean' >> beam.ParDo(dc.CleanNaN())
  without_manutord = cleaned | 'Drop MANUTORD MATERIAL' >> beam.ParDo(
      dc.FilterColumnValues(column='MATERIAL', values_to_filter=['MANUTORD']))
  without_misc = without_manutord | 'Drop rows with CATEGORY_LOCAL == oth MISCELLANEOUS' >> beam.ParDo(
      dc.FilterColumnValues('CATEGORY_LOCAL', ['OTH MISCELLANEOUS LS']))

  # Rows tagged 'error' by the missing-value check are mapped through log_error.
  required_columns = ['MATERIAL', 'VENDOR', 'PERIOD', 'TRX_QTY', 'SPEND_PO_LC']
  missing_check = beam.ParDo(CheckMissingValuesDoFn(column_list=required_columns)).with_outputs('error', main='main')
  pbi_rows, missing_value_errors = without_misc | 'Verify Missing Values' >> missing_check
  missing_value_errors | 'Got some missing values in final table' >> beam.Map(log_error)

  bigquery_sink = beam.io.WriteToBigQuery(
      table=output_pbi_pfx_nsap,
      schema=schema_pbi_nsap,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
      custom_gcs_temp_location=temp_location,
  )
  pbi_rows | 'Write To BigQuery -> PBI PFX NSAP' >> bigquery_sink

  run_result = pipeline.run()
  run_result.wait_until_finish()


In [None]:
if __name__ == '__main__':
  # Destination BigQuery table for the unified PBI PFX NSAP rows.
  output_pbi_pfx_nsap = 'nidec-ga:bq_trusted.PBI_PFX_NSAP_DF'

  # Temp location assembled by make_path from these segments
  # (presumably a GCS path — confirm against make_path).
  temp_location = make_path("nidec-ga-temp", "data-flow-pipelines", "pbi-footprint", "temp")

  build_pbi_pfx_nsap(
      temp_location=temp_location,
      output_pbi_pfx_nsap=output_pbi_pfx_nsap,
  )