In [26]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml import Estimator
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, abs, desc, rand
from pyspark.sql.types import TimestampType, LongType
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.pipeline import Pipeline

from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params, TypeConverters, HasOutputCols, HasInputCols

In [27]:
# Report which PySpark build this notebook is running against.
print(f"Pyspark Version: {pyspark.__version__}")

# Build a Spark session for this notebook, or reuse one that already exists.
spark_session = (
    SparkSession.builder
    .appName("financial_complaints_app")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# This line is only reached when getOrCreate() returned without raising.
print("Spark Session created successfully!")

Pyspark Version: 3.5.2
Spark Session created successfully!


In [28]:
# Bare expression: the notebook renders the session object as this cell's output.
spark_session

In [29]:
# Stand-alone Param created outside any class so its repr can be inspected.
# Params._dummy() supplies a placeholder parent; the name and the doc string are
# both "getfrequencyInfo"; values are converted with TypeConverters.toList.
frequencyInfo = Param(
    Params._dummy(),
    "getfrequencyInfo",  # param name
    "getfrequencyInfo",  # param documentation string
    typeConverter=TypeConverters.toList
)

# Bare expression: shows the Param's repr as the cell output.
frequencyInfo

Param(parent='undefined', name='getfrequencyInfo', doc='getfrequencyInfo')

In [30]:
# FrequencyEncoder - An Estimator that calculates the frequency of each unique category in the specified columns

class FrequencyEncoder(Estimator, HasInputCols, HasOutputCols,
                        DefaultParamsReadable, DefaultParamsWritable):
    """Estimator that counts how often each distinct value occurs in every input column.

    ``fit`` returns a ``FrequencyEncoderModel`` carrying, for each input column,
    the collected rows pairing a value with its count. The count field of each row
    is named after the matching output column.
    """

    # The Param name must equal the attribute name. pyspark resolves params with
    # getattr(self, param.name); the previous name "getfrequencyInfo" resolved to
    # the getter method instead of this Param, so getOrDefault()/copy() rejected it.
    frequencyInfo = Param(
        Params._dummy(),
        "frequencyInfo",
        "per input column, the collected (category, count) rows learned by fit",
        typeConverter=TypeConverters.toList
    )

    # Initialization
    def __init__(self, inputCols: list[str], outputCols: list[str]):
        """
        Args:
            inputCols: names of the categorical columns to count.
            outputCols: names of the frequency columns, paired with ``inputCols`` by position.
        """
        super().__init__()
        self._setDefault(frequencyInfo=[])
        self.setParams(inputCols=inputCols, outputCols=outputCols)

    # Sets the frequency info after it is calculated in the _fit method
    def setfrequencyInfo(self, frequencyInfo: list):
        """Store the learned frequency rows and return ``self`` for chaining."""
        return self._set(frequencyInfo=frequencyInfo)

    def getfrequencyInfo(self):
        """Return the stored frequency rows (an empty list until ``fit`` has run)."""
        return self.getOrDefault(self.frequencyInfo)

    def setParams(self, inputCols: list[str], outputCols: list[str]):
        """Set the input and output column names and return ``self``."""
        return self._set(inputCols=inputCols, outputCols=outputCols)

    def _fit(self, dataframe: DataFrame):
        """Count the values of every input column and wrap the result in a model.

        Args:
            dataframe: data whose input columns are counted.

        Returns:
            A ``FrequencyEncoderModel`` with the same column names and the learned
            frequency rows.
        """
        input_columns = self.getInputCols()
        output_columns = self.getOutputCols()
        replace_info = []  # one list of collected rows per input column

        # for each column calculate frequency and collect the data
        for column, new_column in zip(input_columns, output_columns):
            key_name = f"g_{column}"  # alias keeps the key distinct from the source column
            freq = (
                dataframe.select(col(column).alias(key_name))
                .groupBy(key_name)
                .count()
                # Rename instead of copy-then-drop: the old withColumn + drop("count")
                # removed the result entirely when the output column was named "count".
                .withColumnRenamed("count", new_column)
            )
            replace_info.append(freq.collect())

        # Set frequency info in the Estimator
        self.setfrequencyInfo(replace_info)

        return FrequencyEncoderModel(inputCols=input_columns, outputCols=output_columns).setfrequencyInfo(replace_info)
    
    

# FrequencyEncoderModel - A Transformer that uses the frequency info to encode categorical values
class FrequencyEncoderModel(FrequencyEncoder, Transformer):
    """Transformer that adds, for each input column, the frequency learned for its value.

    The frequency rows come from ``getfrequencyInfo()``, one list of rows per input
    column, in the same order as the input columns.
    """

    def __init__(self, inputCols: list[str], outputCols: list[str]):
        """
        Args:
            inputCols: names of the categorical columns to encode.
            outputCols: names of the frequency columns, paired with ``inputCols`` by position.
        """
        super().__init__(inputCols, outputCols)

    # Transform method to encode columns with their calculated frequencies
    def _transform(self, dataframe: DataFrame):
        """Join the learned frequencies onto ``dataframe``.

        Every input row is kept. A value that has no learned frequency gets a
        null in the output column.

        Args:
            dataframe: data containing the input columns.

        Returns:
            ``dataframe`` with one frequency column added per input column.
        """
        input_columns = self.getInputCols()
        output_columns = self.getOutputCols()
        freqInfo = self.getfrequencyInfo()  # retrieve the frequency info

        # Use the session that owns the incoming dataframe rather than a notebook global,
        # so the model does not depend on a module-level variable being defined.
        session = dataframe.sparkSession

        for in_col, out_col, freq_info in zip(input_columns, output_columns, freqInfo):
            frequency_dataframe = session.createDataFrame(freq_info)  # convert list to dataframe
            key_col, count_col = frequency_dataframe.columns[0], frequency_dataframe.columns[1]

            # Left join keeps rows whose value was not seen during fit; the previous inner
            # join silently dropped them. eqNullSafe also matches a null value against the
            # null group that was counted during fit.
            dataframe = dataframe.join(
                frequency_dataframe,
                dataframe[in_col].eqNullSafe(frequency_dataframe[key_col]),
                "left",
            ).drop(key_col)

            # The count column normally already carries the output name; rename only
            # when the model's output column differs from the one used during fit.
            if count_col != out_col:
                dataframe = dataframe.withColumnRenamed(count_col, out_col)

        return dataframe

In [31]:
# DerivedFeatureGenerator - A Transformer that generates new features based on time differences

class DerivedFeatureGenerator(Transformer, HasInputCols, HasOutputCols,
                                DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that derives the absolute gap, in days, between two columns.

    The first two input columns are cast to timestamps in place. The first output
    column receives the absolute difference of their long casts divided by the
    number of seconds in a day.
    """

    def __init__(self, inputCols: list[str] = None, outputCols: list[str] = None):
        """
        Args:
            inputCols: names of the two columns to compare.
            outputCols: list whose first entry names the derived column.
        """
        super().__init__()
        # Divisor that turns a difference in seconds into days.
        self.second_within_day = 24 * 60 * 60
        self.setParams(inputCols=inputCols, outputCols=outputCols)

    def setParams(self, inputCols: list[str], outputCols: list[str]):
        """Set the input and output column names and return ``self``."""
        return self._set(inputCols=inputCols, outputCols=outputCols)

    def _transform(self, dataframe: DataFrame):
        """Cast both input columns to timestamps and append the day difference.

        Args:
            dataframe: data containing the two input columns.

        Returns:
            The dataframe with the inputs cast to timestamp and the derived column added.
        """
        source_names = self.getInputCols()

        # Overwrite each input column with its timestamp cast.
        start_name = source_names[0]
        dataframe = dataframe.withColumn(start_name, col(start_name).cast(TimestampType()))
        end_name = source_names[1]
        dataframe = dataframe.withColumn(end_name, col(end_name).cast(TimestampType()))

        target_name = self.getOutputCols()[0]
        # `abs` here is the pyspark column function imported at the top of the notebook.
        elapsed = abs(col(end_name).cast(LongType()) - col(start_name).cast(LongType()))

        return dataframe.withColumn(target_name, elapsed / self.second_within_day)

In [32]:

# FrequencyImputer - An Estimator that imputes missing values using the most frequent category in each column

class FrequencyImputer(Estimator, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Estimator that learns the most frequent non-null value of each input column.

    ``fit`` returns a ``FrequencyImputerModel`` that uses those values to fill
    missing entries.
    """

    # The Param name must equal the attribute name. pyspark resolves params with
    # getattr(self, param.name); the previous name "getTopCategories" resolved to
    # the getter method instead of this Param, so getOrDefault()/copy() rejected it.
    topCategories = Param(
        Params._dummy(),
        "topCategories",
        "Stores most frequent categories",
        typeConverter=TypeConverters.toList
    )

    def __init__(self, inputCols: list[str], outputCols: list[str]):
        """
        Args:
            inputCols: names of the columns whose most frequent value is learned.
            outputCols: names of the imputed columns, paired with ``inputCols`` by position.
        """
        super().__init__()

        self._setDefault(topCategories=[])
        self.setParams(inputCols=inputCols, outputCols=outputCols)

    def setTopCategories(self, values: list[str]):
        """Store the learned most frequent values and return ``self`` for chaining."""
        return self._set(topCategories=values)

    def getTopCategories(self):
        """Return the stored most frequent values (an empty list until ``fit`` has run)."""
        return self.getOrDefault(self.topCategories)

    def setParams(self, inputCols: list[str], outputCols: list[str]):
        """Set the input and output column names and return ``self``."""
        return self._set(inputCols=inputCols, outputCols=outputCols)

    def _fit(self, dataframe: DataFrame):
        """Find the most frequent non-null value of every input column.

        Args:
            dataframe: data containing the input columns.

        Returns:
            A ``FrequencyImputerModel`` with the same column names and the learned values.

        Raises:
            ValueError: if an input column has no non-null values to learn from.
        """
        # `self.inputCols` is the Param object itself and is not callable;
        # the column names are read through the getter.
        inputCols = self.getInputCols()
        topCategories = []

        for column in inputCols:
            top_row = (
                # Nulls are what gets imputed, so they must not win the vote.
                dataframe.filter(col(column).isNotNull())
                .groupBy(column)
                .count()
                # Secondary sort on the value makes ties resolve the same way on every run.
                .orderBy(desc('count'), col(column))
                .first()
            )
            if top_row is None:
                raise ValueError(
                    f"Column '{column}' has no non-null values; cannot determine its most frequent category"
                )

            topCategories.append(top_row[column])

        self.setTopCategories(topCategories)

        # Return a Transformer (FrequencyImputerModel) with top categories set
        return FrequencyImputerModel(inputCols=self.getInputCols(), outputCols=self.getOutputCols()).setTopCategories(topCategories)


# FrequencyImputerModel - A Transformer that fills missing values with the most frequent categories
class FrequencyImputerModel(FrequencyImputer, Transformer):
    """Transformer that copies each input column to its output column and fills
    missing values there with the stored most frequent category."""

    def __init__(self, inputCols, outputCols):
        """
        Args:
            inputCols: names of the columns to impute.
            outputCols: names of the imputed columns, paired with ``inputCols`` by position.
        """
        super().__init__(inputCols, outputCols)

    # Transform method to fill missing values
    def _transform(self, dataset: DataFrame):
        """Create the output columns and fill their nulls.

        Args:
            dataset: data containing the input columns.

        Returns:
            The dataset with one imputed output column per input column.
        """
        # The getter is spelled getTopCategories; the previous getTopCategorys()
        # does not exist and raised AttributeError.
        topCategories = self.getTopCategories()
        outputCols = self.getOutputCols()
        inputCols = self.getInputCols()

        # Start every output column as a copy of its input column.
        for outputColumn, inputColumn in zip(outputCols, inputCols):
            dataset = dataset.withColumn(outputColumn, col(inputColumn))

        # Map each output column to its replacement value; skip columns for which
        # no replacement value is stored.
        updateMissingValue = {
            outputColumn: category
            for outputColumn, category in zip(outputCols, topCategories)
            if category is not None
        }

        if updateMissingValue:
            dataset = dataset.na.fill(updateMissingValue)

        return dataset

In [33]:
from typing import List
from pyspark.sql.types import TimestampType, StringType, FloatType, StructType, StructField
from finance_complaint.exception import FinanceException
import os, sys

from pyspark.sql import DataFrame
from typing import Dict



class FinanceDataSchema:
    """Single source of truth for column names and the feature-name lists derived from them."""

    def __init__(self):
        # Column names, kept as attributes so the rest of the code never hard-codes them.
        self.col_company_response = 'company_response'
        self.col_consumer_consent_provided = 'consumer_consent_provided'
        self.col_submitted_via = 'submitted_via'
        self.col_timely = 'timely'
        self.col_diff_in_days = 'diff_in_days'
        self.col_company = 'company'
        self.col_issue = 'issue'
        self.col_product = 'product'
        self.col_state = 'state'
        self.col_zip_code = 'zip_code'
        self.col_consumer_disputed = 'consumer_disputed'
        self.col_date_sent_to_company = "date_sent_to_company"
        self.col_date_received = "date_received"
        self.col_complaint_id = "complaint_id"
        self.col_sub_product = "sub_product"
        self.col_complaint_what_happened = "complaint_what_happened"
        self.col_company_public_response = "company_public_response"

    @staticmethod
    def _prefixed(prefix: str, names: List[str]) -> List[str]:
        """Return ``names`` with ``prefix`` and an underscore prepended to each entry."""
        return [f"{prefix}_{name}" for name in names]

    @property
    def dataframe_schema(self) -> StructType:
        """StructType with twelve fields: string columns plus the two timestamp date columns.

        Raises:
            FinanceException: wrapping any error raised while building the schema.
        """
        try:
            leading_strings = (
                self.col_company_response,
                self.col_consumer_consent_provided,
                self.col_submitted_via,
                self.col_timely,
            )
            timestamps = (
                self.col_date_sent_to_company,
                self.col_date_received,
            )
            trailing_strings = (
                self.col_company,
                self.col_issue,
                self.col_product,
                self.col_state,
                self.col_zip_code,
                self.col_consumer_disputed,
            )

            # Field order matters: strings, then the two dates, then the remaining strings.
            fields = [StructField(name, StringType()) for name in leading_strings]
            fields += [StructField(name, TimestampType()) for name in timestamps]
            fields += [StructField(name, StringType()) for name in trailing_strings]
            return StructType(fields)

        except Exception as e:
            raise FinanceException(e, sys) from e

    @property
    def target_column(self) -> str:
        """Name of the target column."""
        return self.col_consumer_disputed

    @property
    def one_hot_encoding_features(self) -> List[str]:
        """Columns that are one-hot encoded."""
        return [
            self.col_company_response,
            self.col_consumer_consent_provided,
            self.col_submitted_via,
        ]

    @property
    def im_one_hot_encoding_features(self) -> List[str]:
        """One-hot columns with the ``im_`` prefix."""
        return self._prefixed("im", self.one_hot_encoding_features)

    @property
    def string_indexer_one_hot_features(self) -> List[str]:
        """One-hot columns with the ``si_`` prefix."""
        return self._prefixed("si", self.one_hot_encoding_features)

    @property
    def tf_one_hot_encoding_features(self) -> List[str]:
        """One-hot columns with the ``tf_`` prefix."""
        return self._prefixed("tf", self.one_hot_encoding_features)

    @property
    def tfidf_features(self) -> List[str]:
        """Single-element list holding the issue column."""
        return [self.col_issue]

    @property
    def derived_input_features(self) -> List[str]:
        """The two date columns, sent-to-company first."""
        return [self.col_date_sent_to_company, self.col_date_received]

    @property
    def derived_output_features(self) -> List[str]:
        """Single-element list holding the day-difference column."""
        return [self.col_diff_in_days]

    @property
    def numerical_columns(self) -> List[str]:
        """Same list as ``derived_output_features``."""
        return self.derived_output_features

    @property
    def im_numerical_columns(self) -> List[str]:
        """Numerical columns with the ``im_`` prefix."""
        return self._prefixed("im", self.numerical_columns)

    @property
    def tfidf_feature(self) -> List[str]:
        """Single-element list holding the issue column."""
        return [self.col_issue]

    @property
    def tf_tfidf_features(self) -> List[str]:
        """``tfidf_feature`` columns with the ``tf_`` prefix."""
        return self._prefixed("tf", self.tfidf_feature)

    @property
    def input_features(self) -> List[str]:
        """``tf_`` one-hot columns, then ``im_`` numerical columns, then ``tf_`` tf-idf columns."""
        return [
            *self.tf_one_hot_encoding_features,
            *self.im_numerical_columns,
            *self.tf_tfidf_features,
        ]

    @property
    def required_columns(self) -> List[str]:
        """Target column followed by the one-hot, tf-idf and the two date columns."""
        return [
            self.target_column,
            *self.one_hot_encoding_features,
            *self.tfidf_features,
            self.col_date_sent_to_company,
            self.col_date_received,
        ]

    @property
    def required_prediction_columns(self) -> List[str]:
        """Same as ``required_columns`` without the target column."""
        return [
            *self.one_hot_encoding_features,
            *self.tfidf_features,
            self.col_date_sent_to_company,
            self.col_date_received,
        ]

    @property
    def unwanted_columns(self) -> List[str]:
        """Complaint id, sub-product and free-text complaint columns."""
        return [
            self.col_complaint_id,
            self.col_sub_product,
            self.col_complaint_what_happened,
        ]

    @property
    def vector_assembler_output(self) -> str:
        """Column name ``va_input_features``."""
        return "va_input_features"

    @property
    def scaled_vector_input_features(self) -> str:
        """Column name ``scaled_input_features``."""
        return "scaled_input_features"

    @property
    def target_indexed_label(self) -> str:
        """Target column name with the ``indexed_`` prefix."""
        return f"indexed_{self.target_column}"

    @property
    def prediction_column_name(self) -> str:
        """Column name ``prediction``."""
        return "prediction"

    @property
    def prediction_label_column_name(self) -> str:
        """Prediction column name joined to the target column name with an underscore."""
        return f"{self.prediction_column_name}_{self.target_column}"

In [34]:
# Module-level schema helper; the pipeline builder and get_scores below read this name.
schema = FinanceDataSchema()
# Bare expression: shows the StructType built by the dataframe_schema property.
schema.dataframe_schema

StructType([StructField('company_response', StringType(), True), StructField('consumer_consent_provided', StringType(), True), StructField('submitted_via', StringType(), True), StructField('timely', StringType(), True), StructField('date_sent_to_company', TimestampType(), True), StructField('date_received', TimestampType(), True), StructField('company', StringType(), True), StructField('issue', StringType(), True), StructField('product', StringType(), True), StructField('state', StringType(), True), StructField('zip_code', StringType(), True), StructField('consumer_disputed', StringType(), True)])

In [35]:
# Inspect the target column name.
schema.target_column

'consumer_disputed'

In [36]:
# Inspect the required columns (target first, then features and the two dates).
schema.required_columns

['consumer_disputed',
 'company_response',
 'consumer_consent_provided',
 'submitted_via',
 'issue',
 'date_sent_to_company',
 'date_received']

In [37]:
# Inspect the required prediction columns (same list without the target column).
schema.required_prediction_columns

['company_response',
 'consumer_consent_provided',
 'submitted_via',
 'issue',
 'date_sent_to_company',
 'date_received']

In [38]:
# Inspect the feature columns passed to the VectorAssembler stage.
schema.input_features

['tf_company_response',
 'tf_consumer_consent_provided',
 'tf_submitted_via',
 'im_diff_in_days',
 'tf_issue']

In [39]:
# Inspect the two date columns used as DerivedFeatureGenerator inputs.
schema.derived_input_features

['date_sent_to_company', 'date_received']

In [40]:
# Inspect the column produced by DerivedFeatureGenerator.
schema.derived_output_features

['diff_in_days']

In [41]:
# Inspect the numerical columns (same list as derived_output_features).
schema.numerical_columns

['diff_in_days']

In [42]:
# Inspect the "im_"-prefixed names used as the Imputer output columns.
schema.im_numerical_columns

['im_diff_in_days']

In [43]:
# Inspect the categorical columns that are one-hot encoded.
schema.one_hot_encoding_features

['company_response', 'consumer_consent_provided', 'submitted_via']

In [44]:
# Inspect the "im_"-prefixed names used as the FrequencyImputer output columns.
schema.im_one_hot_encoding_features

['im_company_response', 'im_consumer_consent_provided', 'im_submitted_via']

In [45]:
# Inspect the "si_"-prefixed names used as the StringIndexer output columns.
schema.string_indexer_one_hot_features

['si_company_response', 'si_consumer_consent_provided', 'si_submitted_via']

In [46]:
# Inspect the text column fed to the Tokenizer stage.
schema.tfidf_features

['issue']

In [47]:
from pyspark.ml.feature import StandardScaler, VectorAssembler, OneHotEncoder, StringIndexer, Imputer, \
    IDF, Tokenizer, HashingTF

In [48]:

def get_data_transformation_pipeline():
    """Assemble the unfitted feature-engineering Pipeline.

    Column names come from the module-level ``schema``. Stages, in order:
    derived date feature, numeric Imputer, FrequencyImputer, one StringIndexer
    per categorical column, OneHotEncoder, Tokenizer, HashingTF, IDF,
    VectorAssembler, StandardScaler.

    Returns:
        A ``Pipeline`` whose stages have not been fitted.
    """
    # Extra column derived from the two date columns.
    date_gap_stage = DerivedFeatureGenerator(
        inputCols=schema.derived_input_features,
        outputCols=schema.derived_output_features,
    )

    # Fill nulls in the numerical columns.
    numeric_imputer = Imputer(
        inputCols=schema.numerical_columns,
        outputCols=schema.im_numerical_columns,
    )

    # Fill nulls in the categorical columns with their most frequent value.
    category_imputer = FrequencyImputer(
        inputCols=schema.one_hot_encoding_features,
        outputCols=schema.im_one_hot_encoding_features,
    )

    # One indexer per imputed categorical column.
    indexer_stages = [
        StringIndexer(inputCol=source_name, outputCol=indexed_name)
        for source_name, indexed_name in zip(
            schema.im_one_hot_encoding_features,
            schema.string_indexer_one_hot_features,
        )
    ]

    # One-hot encode the indexed categorical columns.
    category_encoder = OneHotEncoder(
        inputCols=schema.string_indexer_one_hot_features,
        outputCols=schema.tf_one_hot_encoding_features,
    )

    # Text branch: tokenize -> hashed term frequencies -> IDF weighting.
    word_splitter = Tokenizer(inputCol=schema.tfidf_features[0], outputCol="words")
    term_hasher = HashingTF(
        inputCol=word_splitter.getOutputCol(),
        outputCol="rawFeatures",
        numFeatures=40,
    )
    idf_weighting = IDF(
        inputCol=term_hasher.getOutputCol(),
        outputCol=schema.tf_tfidf_features[0],
    )

    # Gather every feature column into one vector, then scale it.
    feature_assembler = VectorAssembler(
        inputCols=schema.input_features,
        outputCol=schema.vector_assembler_output,
    )
    feature_scaler = StandardScaler(
        inputCol=schema.vector_assembler_output,
        outputCol=schema.scaled_vector_input_features,
    )

    return Pipeline(stages=[
        date_gap_stage,
        numeric_imputer,
        category_imputer,
        *indexer_stages,
        category_encoder,
        word_splitter,
        term_hasher,
        idf_weighting,
        feature_assembler,
        feature_scaler,
    ])

# Run the function to get the pipeline.
# The returned (unfitted) Pipeline is not assigned, so the cell only displays its repr.
get_data_transformation_pipeline()

Pipeline_941a87809752

In [49]:

class FinanceData:
    """Reads the parquet data and builds a class-balanced, shuffled sample of it."""

    def __init__(self, data_path: str, schema=None):
        """
        Args:
            data_path: path passed to ``spark_session.read.parquet``.
            schema: object exposing ``target_column``; a new ``FinanceDataSchema``
                is created when omitted.
        """
        self.data_path = data_path
        # Build the default here rather than in the signature, where it would be
        # created once at definition time and shared by every instance.
        self.schema = FinanceDataSchema() if schema is None else schema

    def read_data(self):
        """Read the parquet data at ``data_path``, print its schema and return the dataframe."""
        dataframe: DataFrame = spark_session.read.parquet(self.data_path)

        dataframe.printSchema()

        return dataframe

    def get_balanced_shuffled_dataframe(self, dataframe: DataFrame, seed: int = None):
        """Down-sample every class towards the size of the smallest one, then shuffle.

        Each class is split with ``randomSplit`` using the fraction
        ``smallest_count / class_count``, so the resulting class sizes are
        approximately, not exactly, equal. Per-class counts are printed/shown
        along the way.

        Args:
            dataframe: data containing the schema's target column.
            seed: optional seed forwarded to ``randomSplit`` and ``rand``.

        Returns:
            The union of the sampled per-class dataframes in random order.

        Raises:
            ValueError: if ``dataframe`` has no rows to balance.
        """
        target = self.schema.target_column

        class_counts = dataframe.groupby(target).count().collect()
        if not class_counts:
            raise ValueError(
                f"Cannot balance an empty dataframe: no values found for target column '{target}'"
            )

        labels = [row[target] for row in class_counts]
        record_counts = [row['count'] for row in class_counts]

        minority_rows = min(record_counts)
        keep_fractions = [minority_rows / n_rows for n_rows in record_counts]

        sampled_frames = []
        for label_value, fraction in zip(labels, keep_fractions):
            print(label_value, fraction)
            # Keep roughly `fraction` of this class; the remainder is discarded.
            kept, _ = dataframe.filter(col(target) == label_value) \
                               .randomSplit([fraction, 1 - fraction], seed=seed)
            sampled_frames.append(kept)

        selected_df: DataFrame = None
        for frame in sampled_frames:
            frame.groupby(target).count().show()
            selected_df = frame if selected_df is None else selected_df.union(frame)

        # Sorting on a random column shuffles the rows.
        selected_df = selected_df.orderBy(rand(seed))

        selected_df.groupby(target).count().show()

        return selected_df

In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [53]:
def get_score(dataframe: DataFrame, metric_name, label_col, prediction_col) -> float:
    """Evaluate one multiclass metric on a dataframe of predictions.

    Args:
        dataframe: data containing the label and prediction columns.
        metric_name: metric name passed to ``MulticlassClassificationEvaluator``.
        label_col: name of the label column.
        prediction_col: name of the prediction column.

    Returns:
        The metric value returned by the evaluator. The value is also printed.

    Raises:
        FinanceException: wrapping any error raised while evaluating.
    """
    try:
        evaluator = MulticlassClassificationEvaluator(
            labelCol=label_col, predictionCol=prediction_col,
            metricName=metric_name)
        score = evaluator.evaluate(dataframe)
        print(f"{metric_name} score: {score}")

        return score

    except Exception as e:
        # Chain explicitly, matching FinanceDataSchema.dataframe_schema, so the
        # original error is recorded as the cause of the FinanceException.
        raise FinanceException(e, sys) from e

In [54]:
from collections import namedtuple

In [58]:
# Immutable record of model-training settings; fields are positional in this order.
ModelTrainerConfig = namedtuple(
    "ModelTrainerConfig",
    [
        "base_accuracy",
        "trained_model_file_path",
        "metric_list",
        "label_indexer_model_dir",
    ],
)

In [59]:

def get_scores(dataframe: DataFrame, metric_names: List[str] = None) -> List[tuple]:
    """Evaluate several multiclass metrics on a dataframe of predictions.

    The label and prediction column names are taken from the module-level ``schema``.

    Args:
        dataframe: data containing the indexed label and prediction columns.
        metric_names: metric names to evaluate; when omitted, ``f1``,
            ``weightedPrecision`` and ``weightedRecall`` are used.

    Returns:
        A list of ``(metric_name, score)`` tuples in the order of ``metric_names``.
    """
    if metric_names is None:
        # The previous fallback, ModelTrainerConfig.metric_list, is the namedtuple
        # class's field descriptor rather than a list of names, so iterating it
        # raised TypeError. Use a concrete default instead.
        metric_names = ["f1", "weightedPrecision", "weightedRecall"]

    scores: List[tuple] = []
    for metric_name in metric_names:
        score = get_score(metric_name=metric_name,
                        dataframe=dataframe,
                        label_col=schema.target_indexed_label,
                        prediction_col=schema.prediction_column_name,
                        )

        scores.append((metric_name, score))

    return scores

In [61]:
from pyspark.ml.feature import StringIndexer, StringIndexerModel
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier

In [62]:
def get_model(self, label_indexer_model: StringIndexerModel) -> Pipeline:
    """Build an unfitted Pipeline: a random forest followed by an index-to-label stage.

    This is a module-level function even though its first parameter is named
    ``self``; the caller must pass an object that exposes a ``schema`` attribute.

    Args:
        self: object whose ``schema`` supplies the column names.
        label_indexer_model: fitted indexer whose ``labels`` map predicted
            indices back to label strings.

    Returns:
        A ``Pipeline`` with the classifier stage followed by the label stage.
    """
    data_schema = self.schema

    # Classifier trained on the scaled feature vector against the indexed target.
    classifier_stage = RandomForestClassifier(
        labelCol=data_schema.target_indexed_label,
        featuresCol=data_schema.scaled_vector_input_features,
    )

    # Converts the numeric prediction back into its original label string.
    label_stage = IndexToString(
        inputCol=data_schema.prediction_column_name,
        outputCol=f"{data_schema.prediction_column_name}_{data_schema.target_column}",
        labels=label_indexer_model.labels,
    )

    return Pipeline(stages=[classifier_stage, label_stage])

In [50]:
#spark_session.stop()