In [1]:
 # Copyright 2023 Akvelon Inc.
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

In [2]:
import apache_beam as beam
import numpy as np
import pandas as pd

import torch

from apache_beam import dataframe
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [3]:
ib.options.num = '10m'

In [4]:
train = pd.read_csv('../../../data/Salesforce/dtc_opportunity_train.csv')
test = pd.read_csv('../../../data/Salesforce/dtc_opportunity_test.csv')

### Custom wrappers for embedding and clustering models
These custom handlers are used to determine the way the data processed before the input to RunInference:
1. For anomaly detection model
2. For PyTorch model

In [5]:
import category_encoders as ce
import time
hasher = ce.HashingEncoder(n_components=64, max_process=1, max_sample=1)

def encode_and_normalize(bq_row, hasher=hasher, num_fields=None, id_field='Id'):
    if num_fields is None:
        num_fields = ['Amount']

    amount_mean = 1137889.913561848
    amount_std = 1197302.0975264315


    target_list = [getattr(bq_row, feature_name) for feature_name in set(bq_row._fields)-set(num_fields)-set(id_field)]
    start_t = time.time()
    hashed_list = hasher.fit_transform(np.asarray([target_list]))
    print(f'hash took {time.time() - start_t}')
    result = (getattr(bq_row, id_field),
              {'cont_cols': torch.Tensor([(getattr(bq_row, x)-amount_mean)/amount_std for x in num_fields]).reshape(-1, 1),
            'cat_cols': torch.Tensor(hashed_list.to_numpy().reshape(-1, 1))})
    return result

Construct and run the pipeline

In [6]:
def process_with_ptransform(data_csv):
    from pipeline.anomaly_detection.anomaly_detection import AnomalyDetection
    with beam.Pipeline(InteractiveRunner()) as p:
        input_data = p | "Read CSV" >> beam.dataframe.io.read_csv(data_csv)
        # For now dropping dates
        input_data = input_data.drop(['Billing State/Province',
                                      'Created Date',
                                      'Close Date',
                                      'Account Name',
                                      'Product Name',
                                      'Opportunity Name',
                                      'Opportunity Owner',
                                      'Account Owner'], axis=1)
        input_data = input_data.drop('#', axis=1)
        input_data = input_data.replace('-', '', regex=True)
        input_data = input_data.replace('/', '', regex=True)
        input_data = input_data.replace(' ', '', regex=True)
        input_data = input_data.rename(columns={x: x.replace(' ', '') for x in input_data.columns})
        input_data['Id'] = input_data.index
        input_data = beam.dataframe.convert.to_pcollection(input_data)

        input_data = input_data | AnomalyDetection(encoder_uri='./pretrained/encoder.pth', model_uri='./pretrained/anomaly_detection.model', params_uri='./pretrained/model.params')
        return ib.collect(input_data)

In [7]:
test_2entries = pd.read_csv('../../../data/Salesforce/dtc_opportunity_test.csv')[:2]
test_2entries.to_csv('../../../data/Salesforce/test.csv', index=False)

In [8]:
data = process_with_ptransform(data_csv='../../../data/Salesforce/test.csv')

<pipeline.anomaly_detection.anomaly_detection.CustomPytorchModelHandlerTensor object at 0x000002146EA60220>
PCollection[Unbatch 'set_column_DataFrame_2286778976096'.None]




Set model config: {'hidden_layers': [[25, 20], [20, 10]], 'epochs': 10, 'lr': 0.0001, 'verbose': 1, 'batch_size': 4}
Model `in_features`: 33




Set model config: {'hidden_layers': [[25, 20], [20, 10]], 'epochs': 10, 'lr': 0.0001, 'verbose': 1, 'batch_size': 4}
Model `in_features`: 33
