# prediction Pipeline

In [68]:
import time
start_time = time.time()

In [69]:
import spacy
import en_core_web_sm
import re
import numpy as np

from concurrent.futures import ProcessPoolExecutor
from functools import partial
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics.pairwise import cosine_distances

from sentence_transformers import SentenceTransformer

nlp = spacy.load("en_core_web_sm")

import boto3
import pandas as pd
import pyarrow.parquet as pq
# import s3fs
import joblib

import string
import nltk
nltk.data.path.append('../../nltk_data')
from nltk.corpus import stopwords
# nltk.download('stopwords')

# Constants

In [84]:
# Specify the S3 bucket and prefix where the Parquet files are stored
# s3://adl-core-sagemaker-studio/external/IVA/IVA_daily/
# s3://adl-core-sagemaker-studio/external/IVA/Search_daily/
# s3://adl-core-sagemaker-studio/external/Deepali/iva-data(3-apr).csv
# s3://adl-core-sagemaker-studio/external/Deepali/search_data/

# model_path = 'Bert_EC_model/ec_model_v1.pkl'
model_path = 'Bert_CC_model/cc_model_v1.joblib'

bucket_name = 'adl-core-sagemaker-studio'
# prefix = 'external/web_clickstream/clickstream20230403_20230403/'
prefix = 'external/Deepali/search_data/'

# words_3 = ['elder','elder women','elder','Octogenarians','Nonagenarians','Centenarians',
# 'elderly people',
#  'senior assistance',
#  'grey generation',
#  'senior health',
#  'elderly companion',
#  'senior citizen',
#  'elderly',
#  'senior members',
#  'elderly residents',
#  'senior assistance',
#  'grey generation',
#  'elder',
#  'elderly',
#  'elderly people',
#  'elderly residents',
#  'elder',
#  'elder',
#  'senior citizen',
#  'elder generation',
#  'gerontology',
#  'elderly population',
#  'senior members',
#  'retirees',
#  'elderly population',
#  'eldercare',
#  'elder',
# 'eldercae', 'eldercarr', 'eldermann',
# 'eldercre','eldery','elderman','elders','eldercrae']
# words_3 = ['day care', 'creche', 'childcare', 'daycare', 'after school care', 'pre school', 'child', 'baby',
#            'infant', 'girl child', 'play school', 'boy child', 'Adolescent', 'nursery', 'preschool', 'day nursery',
#            'playschool', 'kindergarten', 'childminding', 'babysitting', 'babysitter', 'nanny', 'children supervision', 
#            'toddler care', 'baby sitter', 'children', 'child supervision', 'childs', 'stepchild', 'step daughter', 'step son', 
#            'grandchildren', 'grandchild', 'daughter', 'son', 'stepchildren', 'childhood', 'day cares', 'childrent', 'daycares', 
#            'childplus']

# words_4 = set([word.lower() for word in words_3])
sent_trans_model = SentenceTransformer('all-mpnet-base-v2')
# len(words_4)

# Inferencing Pipeline

In [85]:
# def fetch_data_from_s3(text_col):
#     # Initialize S3 client
#     s3 = boto3.client('s3')
    
#     # List all Parquet files in the bucket with the specified prefix
#     response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
#     if 'Contents' in response:
#         # parquet_files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.parq')]
#         csv_files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.csv')]
#     else:
#         # parquet_files = []
#         csv_files = []

#     # Read Parquet files and concatenate them together
#     dfs = []
#     #s3fs = s3fs.S3FileSystem()

#     # for file in parquet_files:
#     for file in csv_files:
#         # Read the Parquet file into a PyArrow table
#         s3_key = f"{bucket_name}/{file}"
#         # dataset = pq.ParquetDataset(f"s3://{s3_key}", filesystem=s3fs)
#         df = pd.read_csv(f"s3://{s3_key}")
#         # table = dataset.read()

#         # Convert the PyArrow table to a Pandas DataFrame
#         # df = table.to_pandas()
#         # print(df.head())
#         dfs.append(df)

#     # # Concatenate all the DataFrames together
#     # try:
#     #     concatenated_df = pd.concat(dfs, ignore_index=True)
#     #     #print(concatenated_df.head())
#     # except:
#     #     # print("No Parquet files found.")
#     #     print("No csv files found.")
#     #     raise
#     if len(dfs) == 0:
#         raise ValueError("No DataFrame provided.")
#     elif len(dfs) == 1:
#         return dfs[0]
#     else:
#         try:
#             concatenated_df = pd.concat(dfs, ignore_index=True)
#             return concatenated_df
#         except:
#             raise ValueError("Error occurred while concatenating DataFrames.")
        
#     # concatenated_df= concatenated_df[['client_id','person_internal_id','page_name']]
#     concatenated_df= concatenated_df[['client_id','person_internal_id', text_col]]
        
#     return concatenated_df

def read_dataframe_from_s3(file_path):
    if file_path.endswith(".parquet"):
        # Initialize S3 client
        s3 = boto3.client('s3')
        s3fs = s3fs.S3FileSystem()

        # Read the Parquet file into a PyArrow table
        dataset = pq.ParquetDataset(f"s3://{file_path}", filesystem=s3fs)
        table = dataset.read()

        # Convert the PyArrow table to a Pandas DataFrame
        df = table.to_pandas()
        return df
    elif file_path.endswith(".csv"):
        # Initialize S3 client
        s3 = boto3.client('s3')
        
        # Read the CSV file into a DataFrame
        df = pd.read_csv(f"s3://{file_path}")
        return df
    elif file_path.endswith(".xlsx"):
        # Initialize S3 client
        s3 = boto3.client('s3')
        
        # Read the Excel file into a DataFrame
        df = pd.read_excel(f"s3://{file_path}")
        return df
    else:
        raise ValueError("Unsupported file type. Only .parquet, .csv, and .xlsx are supported.")

def fetch_data_from_s3(bucket_name, prefix, text_col):
    # Initialize S3 client
    s3 = boto3.client('s3')
    
    # List all Parquet, CSV, and Excel files in the bucket with the specified prefix
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

    # Check if the response contains any objects
    if 'Contents' not in response:
        print("No DataFrame provided.")
        return None

    file_paths = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith(('.parquet', '.csv', '.xlsx'))]

    # Read files and concatenate them together
    dfs = []
    for file_path in file_paths:
        try:
            df = read_dataframe_from_s3(file_path)
            dfs.append(df)
        except:
            print(f"Failed to read {file_path}. Skipping.")

    if len(dfs) == 0:
        print("No DataFrame provided.")
        return None
    elif len(dfs) == 1:
        return dfs[0]
    else:
        try:
            concatenated_df = pd.concat(dfs, ignore_index=True)
            concatenated_df = concatenated_df[['client_id', 'person_internal_id', text_col]]
            return concatenated_df
        except:
            raise ValueError("Error occurred while concatenating DataFrames.")


def clean_text(df, text_cols):
    if df is None or df.empty:
        print("No DataFrame provided.")
        return pd.DataFrame()
    # Create a new dataframe to hold the cleaned text columns
    cleaned_df = pd.DataFrame()
    
    # Define the list of stopwords
    stop_words = set(stopwords.words('english'))
    
    # Clean each text column and add it to the cleaned dataframe
    for text_col in text_cols:
        text_list = df[text_col].tolist()
        text_list = [str(text) for text in text_list]
        text_list = [text if text.strip() and not
                     set(text).issubset(set(string.punctuation + string.whitespace)) else '' 
                     for text in text_list]
        text_list = [x.lower() for x in text_list]
        translator = str.maketrans(string.punctuation + string.digits + "_", " " * len(
            string.punctuation + string.digits + "_"))
        cleaned_list = []
        for text in text_list:
            cleaned_text = text.translate(translator)
            cleaned_text = ' '.join(cleaned_text.split())
            cleaned_text = ' '.join([word for word in cleaned_text.split() if word not in stop_words])
            cleaned_list.append(cleaned_text)
        cleaned_df[text_col] = cleaned_list
    
    # Add the non-text columns to the cleaned dataframe
    for col in df.columns:
        if col not in text_cols:
            cleaned_df[col] = df[col]
    
    return cleaned_df

def predict(df, text_col, category_name, threshold):
    # Load the model
    classifier = joblib.load(model_path)
    
    # Encode the text samples from the Parquet file
    encodings = sent_trans_model.encode(df[text_col].fillna(' ').tolist())

    # Make predictions for the encoded samples
    # predictions = loaded_model.predict(encodings)
    probabilities = classifier.predict_proba(encodings)
    
    # Apply threshold and assign labels
    df['predicted_label'] = np.where(probabilities[:, 0] > threshold, category_name, 'Other')
    df['prediction'] = df['predicted_label'].map({category_name:1, 'Other':0})
    
    df['pred_probab'] = probabilities[:, 0]
    
    # Save the predictions in an Excel file
    df[['client_id','person_internal_id', text_col, 'predicted_label', 'prediction', 'pred_probab']].to_csv(f'{category_name}_{text_col}_predictions.csv', index=False)

    elapsed_time = time.time() - start_time
    print(elapsed_time)


# Main

In [86]:
def run_prediction_pipeline(text_col, category_name, threshold):    
    df =  fetch_data_from_s3(bucket_name, prefix, text_col)

    # cleaned_concatenated_df = clean_text(concatenated_df, text_cols=['page_name'])
    cleaned_concatenated_df = clean_text(df, text_cols=[text_col])
    
    predict(cleaned_concatenated_df, text_col, category_name, threshold)

In [87]:
#threshold for cc for f1 score =1 is 0.999
#threshold for ec for f1 score =0.97913 is 0.485
if __name__ == "__main__":
    run_prediction_pipeline(text_col = 'search_text', 
                            category_name='Child care' , threshold= 0.5)

Failed to read external/Deepali/search_data/search-data(3-apr).csv. Skipping.
No DataFrame provided.
No DataFrame provided.


KeyError: 'search_text'