## Data Preparation

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan, isnull
from pyspark.sql.types import IntegerType

# Create a SparkSession
spark = SparkSession.builder.appName("IndeedJobPostings") \
    .config("spark.driver.memory", "14g") \
    .config("spark.executor.memory", "14g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
    .getOrCreate()

# Assume the data is loaded into a DataFrame called 'df'
# If you need to load the data, uncomment and modify the following line:
df = spark.read.csv("fake_job_postings.csv", header=True, inferSchema=True)

# 1. Encode the label column and delete invalid data
df = df.withColumn("fraudulent", col("fraudulent").cast(IntegerType()))
df = df.filter((col("fraudulent") == 0) | (col("fraudulent") == 1))

# 2. List missing values per attribute and their percentages
def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull() | isnan(col(k))).count()
        percentage = (nullRows / numRows) * 100
        null_columns_counts.append((k, nullRows, percentage))
    return null_columns_counts

null_columns_calc = null_value_calc(df)

print("Null values per attribute:")
for item in null_columns_calc:
    print(f"{item[0]}: {item[1]} ({item[2]:.2f}%)")

# 3. Delete attributes with more than 1% null values
columns_to_drop = [item[0] for item in null_columns_calc if item[2] > 1]
df = df.drop(*columns_to_drop)

print(f"\nColumns dropped due to >1% null values: {columns_to_drop}")

# Show the resulting schema
print("\nResulting DataFrame schema:")
df.printSchema()

# Show the first few rows of the prepared dataset
print("\nFirst few rows of the prepared dataset:")
df.show(5, truncate=False)

# Don't forget to stop the SparkSession when you're done
# spark.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 21:21:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Null values per attribute:
job_id: 0 (0.00%)
title: 0 (0.00%)
location: 337 (1.99%)
department: 11039 (65.07%)
salary_range: 14258 (84.04%)
company_profile: 3206 (18.90%)
description: 0 (0.00%)
requirements: 2571 (15.15%)
benefits: 6949 (40.96%)
telecommuting: 0 (0.00%)
has_company_logo: 0 (0.00%)
has_questions: 0 (0.00%)
employment_type: 3273 (19.29%)
required_experience: 6675 (39.34%)
required_education: 7661 (45.16%)
industry: 4667 (27.51%)
function: 6158 (36.30%)
fraudulent: 0 (0.00%)

Columns dropped due to >1% null values: ['location', 'department', 'salary_range', 'company_profile', 'requirements', 'benefits', 'employment_type', 'required_experience', 'required_education', 'industry', 'function']

Resulting DataFrame schema:
root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = tr

#### Verification code (to be added after the main data preparation code)
1. Encoding the label column and deleting invalid data:
   - The `groupBy("fraudulent").count().show()` command will display the count of rows for each unique value in the "fraudulent" column. We should only see two rows: one for 0 and one for 1. If there are any other values or if the column contains non-integer types, the task wasn't completed correctly.

2. Listing missing values per attribute and their percentages:
   - The `null_value_calc(df)` function calculates this, and the results are printed for each column. We can review these percentages to confirm they've been calculated correctly.

3. Deleting attributes with more than 1% null values:
   - The code prints the final list of columns in the DataFrame. We can compare this with the original list to see which columns were removed.
   - Additionally, it recalculates the null value percentages for the remaining columns. We should see that no column has more than 1% null values. If any do, it will print a warning.

4. Showing the first few rows of the prepared dataset:
   - This allows us to visually inspect the data and confirm that the "fraudulent" column only contains 0 or 1, and that the problematic columns have been removed.

In [2]:
# 1. Verify encoding of label column and deletion of invalid data
print("\nVerification 1: Encoding of label column and deletion of invalid data")
print("Unique values in 'fraudulent' column:")
df.groupBy("fraudulent").count().show()

# 2. Verify listing of missing values
print("\nVerification 2: Listing of missing values")
null_columns_calc = null_value_calc(df)
for item in null_columns_calc:
    print(f"{item[0]}: {item[1]} ({item[2]:.2f}%)")

# 3. Verify deletion of attributes with >1% null values
print("\nVerification 3: Deletion of attributes with >1% null values")
print("Columns in the final DataFrame:")
print(df.columns)

print("\nConfirming no columns have >1% null values:")
new_null_calc = null_value_calc(df)
for item in new_null_calc:
    if item[2] > 1:
        print(f"Warning: {item[0]} still has {item[2]:.2f}% null values")
    else:
        print(f"{item[0]}: {item[2]:.2f}% null values (OK)")

# 4. Show the first few rows of the final prepared dataset
print("\nFirst few rows of the final prepared dataset:")
df.show(5, truncate=False)


Verification 1: Encoding of label column and deletion of invalid data
Unique values in 'fraudulent' column:


+----------+-----+
|fraudulent|count|
+----------+-----+
|         1|  886|
|         0|16080|
+----------+-----+


Verification 2: Listing of missing values
job_id: 0 (0.00%)
title: 0 (0.00%)
description: 0 (0.00%)
telecommuting: 0 (0.00%)
has_company_logo: 0 (0.00%)
has_questions: 0 (0.00%)
fraudulent: 0 (0.00%)

Verification 3: Deletion of attributes with >1% null values
Columns in the final DataFrame:
['job_id', 'title', 'description', 'telecommuting', 'has_company_logo', 'has_questions', 'fraudulent']

Confirming no columns have >1% null values:
job_id: 0.00% null values (OK)
title: 0.00% null values (OK)
description: 0.00% null values (OK)
telecommuting: 0.00% null values (OK)
has_company_logo: 0.00% null values (OK)
has_questions: 0.00% null values (OK)
fraudulent: 0.00% null values (OK)

First few rows of the final prepared dataset:
+------+-----------------------------------------+-------------------------------------------------------------------------------------------------

### Clean the datasets: remove anything that is not a letter, remove multiple spaces, lower case everything.

In [None]:
from pyspark.sql.functions import col, when, count, isnan, isnull, regexp_replace, lower, trim


# 4. Clean the text columns: remove non-letter characters, multiple spaces, and convert to lowercase
text_columns = [
    "title", "location", "department", "company_profile", "description", 
    "requirements", "benefits", "employment_type", "required_experience", 
    "required_education", "industry", "function"
]

for column in text_columns:
    if column in df.columns:
        df = df.withColumn(
            column,
            lower(
                trim(
                    regexp_replace(col(column), "[^a-zA-Z\\s]", "")
                )
            )
        )
        df = df.withColumn(column, regexp_replace(col(column), "\\s+", " "))

# Show the resulting schema
print("\nResulting DataFrame schema:")
df.printSchema()

# Show the first few rows of the cleaned dataset
print("\nFirst few rows of the cleaned dataset:")
df.show(5, truncate=False)



Resulting DataFrame schema:
root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: integer (nullable = true)


First few rows of the cleaned dataset:
+------+---------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Verification of data cleaning

Check the output of the verification steps. 

1. We should see zero counts for non-letter characters, multiple spaces, and uppercase letters in all columns.

2. Examine the sample of cleaned data. All text should be lowercase, with only single spaces between words, and no special characters or numbers.

3. Look at the column data types. All text columns should be of type 'string'.
Review the summary statistics. This will show us the minimum, maximum, and average lengths of the text in each column, which can help identify any unexpected values.

In [4]:
from pyspark.sql.functions import regexp_extract,length

# Get the list of columns that actually exist in the DataFrame
existing_columns = [col for col in text_columns if col in df.columns]

print(f"\nColumns present in the DataFrame: {existing_columns}")

# 1. Check for non-letter characters
def check_non_letter(column_name):
    return df.filter(regexp_extract(col(column_name), "[^a-z\\s]", 0) != "").count()

# 2. Check for multiple spaces
def check_multiple_spaces(column_name):
    return df.filter(regexp_extract(col(column_name), "\\s{2,}", 0) != "").count()

# 3. Check for uppercase letters
def check_uppercase(column_name):
    return df.filter(regexp_extract(col(column_name), "[A-Z]", 0) != "").count()

for column in existing_columns:
    print(f"\nChecking column: {column}")
    print(f"  Rows with non-letter characters: {check_non_letter(column)}")
    print(f"  Rows with multiple spaces: {check_multiple_spaces(column)}")
    print(f"  Rows with uppercase letters: {check_uppercase(column)}")

# Show a sample of cleaned data
print("\nSample of cleaned data:")
df.select(existing_columns).show(5, truncate=False)

# Print column data types
print("\nColumn data types:")
df.printSchema()

# Print summary statistics for string columns
print("\nSummary statistics for string columns:")
df.select([length(col(c)).alias(c) for c in existing_columns]).summary().show()



Columns present in the DataFrame: ['title', 'description']

Checking column: title


  Rows with non-letter characters: 0
  Rows with multiple spaces: 0
  Rows with uppercase letters: 0

Checking column: description


                                                                                

  Rows with non-letter characters: 0


                                                                                

  Rows with multiple spaces: 0


                                                                                

  Rows with uppercase letters: 0

Sample of cleaned data:
+---------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------



+-------+------------------+-----------------+
|summary|             title|      description|
+-------+------------------+-----------------+
|  count|             16966|            16966|
|   mean|27.376340917128374|1136.124720028292|
| stddev|12.778408105827449|828.9325114404571|
|    min|                 3|                0|
|    25%|                18|              556|
|    50%|                25|              952|
|    75%|                33|             1501|
|    max|               134|            14292|
+-------+------------------+-----------------+



                                                                                

### Balance the dataset by undersampling the majority class

In [6]:
# First, let's check the class distribution
class_counts = df.groupBy("fraudulent").count().collect()
total_count = sum(row['count'] for row in class_counts)
print("\nClass distribution before balancing:")
for row in class_counts:
    print(f"Class {row['fraudulent']}: {row['count']} ({row['count']/total_count*100:.2f}%)")

# Identify the minority class
minority_class = min(class_counts, key=lambda x: x['count'])['fraudulent']
minority_count = min(class_counts, key=lambda x: x['count'])['count']

# Calculate the fraction for each class
fractions = {
    0: minority_count / df.filter(col("fraudulent") == 0).count(),
    1: minority_count / df.filter(col("fraudulent") == 1).count()
}

# Use sampleBy to balance the dataset
balanced_df = df.sampleBy("fraudulent", fractions, seed=42)

# Check the new class distribution
new_class_counts = balanced_df.groupBy("fraudulent").count().collect()
new_total_count = sum(row['count'] for row in new_class_counts)
print("\nClass distribution after balancing:")
for row in new_class_counts:
    print(f"Class {row['fraudulent']}: {row['count']} ({row['count']/new_total_count*100:.2f}%)")

# Show the resulting schema
print("\nResulting DataFrame schema:")
balanced_df.printSchema()

# Show the first few rows of the balanced dataset
print("\nFirst few rows of the balanced dataset:")
balanced_df.show(5, truncate=False)


Class distribution before balancing:
Class 1: 886 (5.22%)
Class 0: 16080 (94.78%)

Class distribution after balancing:
Class 1: 886 (50.66%)
Class 0: 863 (49.34%)

Resulting DataFrame schema:
root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: integer (nullable = true)


First few rows of the balanced dataset:
+------+---------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Split text into words, remove stopwords, and convert text into vectors (your choice of encoding). Help: feature extractors and feature transformers .

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql.functions import col, udf, concat_ws
from pyspark.sql.types import StringType

# Define the columns we want to process
text_columns = ["title", "description", "requirements"]

# Check which columns actually exist in the DataFrame
existing_columns = [c for c in text_columns if c in balanced_df.columns]

if not existing_columns:
    raise ValueError("None of the specified text columns exist in the DataFrame")

# Concatenate the existing text columns
balanced_df = balanced_df.withColumn("text", concat_ws(" ", *existing_columns))

# Create a pipeline for text processing and feature extraction
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features", vocabSize=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, idf])

# Fit and transform the data
model = pipeline.fit(balanced_df)
processed_df = model.transform(balanced_df)

# Select relevant columns
final_df = processed_df.select("job_id", "features", "fraudulent")

# Show the resulting schema
print("\nResulting DataFrame schema:")
final_df.printSchema()

# Show the first few rows of the processed dataset
print("\nFirst few rows of the processed dataset:")
final_df.show(5, truncate=False)

# Get vocabulary for reference
vocabulary = model.stages[2].vocabulary
print("\nTop 20 words in vocabulary:")
print(vocabulary[:20])

                                                                                


Resulting DataFrame schema:
root
 |-- job_id: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- fraudulent: integer (nullable = true)


First few rows of the processed dataset:
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Perform a random split (70%,30%) of the data into training and test.

In [8]:
# Set a seed for reproducibility
seed = 42

# Perform the random split
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=seed)

# Cache the DataFrames for faster processing
train_df.cache()
test_df.cache()

# Print the sizes of the resulting datasets
print(f"Full dataset size: {final_df.count()}")
print(f"Training dataset size: {train_df.count()}")
print(f"Test dataset size: {test_df.count()}")

# Show a few examples from each dataset
print("\nSample from training dataset:")
train_df.show(3, truncate=False)

print("\nSample from test dataset:")
test_df.show(3, truncate=False)

## Optionally, you can save these DataFrames for future use
# train_df.write.parquet("dataset/train_data")
# test_df.write.parquet("dataset/test_data")

Full dataset size: 1749
Training dataset size: 1218
Test dataset size: 531

Sample from training dataset:
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [9]:
# Set log level to ERROR to suppress WARN messages
spark.sparkContext.setLogLevel("ERROR")

### Ugh final

In [10]:
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pandas as pd
from datetime import datetime
import os

# Function to create parameter grid for each model
def create_param_grid(model_type):
    if isinstance(model_type, LogisticRegression):
        return ParamGridBuilder()\
            .addGrid(model_type.regParam, [0.01, 0.1, 0.3])\
            .addGrid(model_type.elasticNetParam, [0.0, 0.5, 1.0])\
            .addGrid(model_type.maxIter, [10, 50, 100])\
            .build()
            
    elif isinstance(model_type, LinearSVC):
        return ParamGridBuilder()\
            .addGrid(model_type.regParam, [0.01, 0.1, 0.3])\
            .addGrid(model_type.maxIter, [10, 50, 100])\
            .build()
            
    elif isinstance(model_type, RandomForestClassifier):
        return ParamGridBuilder()\
            .addGrid(model_type.numTrees, [10, 50, 100])\
            .addGrid(model_type.maxDepth, [5, 10, 15])\
            .addGrid(model_type.maxBins, [16, 32])\
            .build()
            
    elif isinstance(model_type, MultilayerPerceptronClassifier):
        return ParamGridBuilder()\
            .addGrid(model_type.layers, [[len(train_df.select("features").first()[0]), 20, 10, 2],
                                       [len(train_df.select("features").first()[0]), 40, 20, 2]])\
            .addGrid(model_type.maxIter, [50, 100])\
            .addGrid(model_type.blockSize, [64, 128])\
            .build()

# Define models to evaluate
models = {
    'LogisticRegression': LogisticRegression(labelCol="fraudulent", featuresCol="features"),
    'LinearSVC': LinearSVC(labelCol="fraudulent", featuresCol="features"),
    'RandomForest': RandomForestClassifier(labelCol="fraudulent", featuresCol="features"),
    'MultilayerPerceptron': MultilayerPerceptronClassifier(labelCol="fraudulent", featuresCol="features")
}

# Evaluators
# binary_evaluator = BinaryClassificationEvaluator(labelCol="fraudulent")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="fraudulent")

# Initialize results list for this run
current_results = []

# Train and evaluate each model
for model_name, model in models.items():
    print(f"\nTraining {model_name}...")
    
    # Create parameter grid
    param_grid = create_param_grid(model)
    
    # Create CrossValidator
    cv = CrossValidator(
        estimator=model,
        estimatorParamMaps=param_grid,
        evaluator=multi_evaluator.setMetricName("f1"),
        numFolds=10,
        seed=42
    )
    
    # Fit CrossValidator
    cv_model = cv.fit(train_df)
    
     # Get predictions on test set
    test_predictions = cv_model.transform(test_df)
    
    # Calculate metrics
    f1_score = multi_evaluator.setMetricName("f1").evaluate(test_predictions)
    accuracy = multi_evaluator.setMetricName("accuracy").evaluate(test_predictions)
    
    current_results.append({
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'model': model_name,
        'f1_score': f1_score,
        'accuracy': accuracy
    })

# Convert current results to DataFrame
current_df = pd.DataFrame(current_results)

# Find best models
best_f1_model = current_df.loc[current_df['f1_score'].idxmax()]
best_accuracy_model = current_df.loc[current_df['accuracy'].idxmax()]

# Add summary row
summary_row = pd.DataFrame([{
    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'model': f"BEST_F1_{best_f1_model['model']}",
    'f1_score': best_f1_model['f1_score'],
    'accuracy': best_f1_model['accuracy']
}])


# Combine results
final_df = pd.concat([current_df, summary_row])

# Save/update results
results_file = 'model_results.csv'
if os.path.exists(results_file):
    # Append to existing file
    final_df.to_csv(results_file, mode='a', header=False, index=False)
else:
    # Create new file
    final_df.to_csv(results_file, index=False)

# Print current results
print("\nCurrent Run Results:")
print(current_df)
print("\nBest Models:")
print(f"Best F1 Score: {best_f1_model['model']} (F1: {best_f1_model['f1_score']:.4f}, Accuracy: {best_f1_model['accuracy']:.4f})")
print(f"Best Accuracy: {best_accuracy_model['model']} (F1: {best_accuracy_model['f1_score']:.4f}, Accuracy: {best_accuracy_model['accuracy']:.4f})")


Training LogisticRegression...

Training LinearSVC...

Training RandomForest...


                                                                                


Training MultilayerPerceptron...

Current Run Results:
             timestamp                 model  f1_score  accuracy
0  2024-10-22 21:53:12    LogisticRegression  0.841744  0.841808
1  2024-10-22 22:01:31             LinearSVC  0.838041  0.838041
2  2024-10-22 22:06:43          RandomForest  0.821092  0.821092
3  2024-10-22 22:16:11  MultilayerPerceptron  0.832405  0.832392

Best Models:
Best F1 Score: LogisticRegression (F1: 0.8417, Accuracy: 0.8418)
Best Accuracy: LogisticRegression (F1: 0.8417, Accuracy: 0.8418)


### Excel tracking of Models

In [15]:
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd
from datetime import datetime
import os

class ExcelExperimentTracker:
    def __init__(self, excel_path="model_performance_tracking.xlsx"):
        self.excel_path = excel_path
        self.current_run_results = []
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        
        # Load existing Excel file if it exists, otherwise create new DataFrame
        if os.path.exists(excel_path):
            self.existing_df = pd.read_excel(excel_path)
        else:
            self.existing_df = pd.DataFrame(columns=[
                'Run_DateTime', 'Model_Name', 'Hyperparameters', 
                'Accuracy', 'F1_Score', 'Is_Best_Accuracy', 'Is_Best_F1'
            ])
    
    def _format_hyperparameters(self, param_map):
        """Convert parameter map to readable string"""
        params = []
        for param in param_map.keys():
            params.append(f"{param.name}:{param_map[param]}")
        return ", ".join(params)
    
    def log_experiment(self, model_name, best_model, predictions):
        """Log a single experiment results"""
        evaluator_accuracy = MulticlassClassificationEvaluator(
            labelCol="fraudulent", metricName="accuracy")
        evaluator_f1 = MulticlassClassificationEvaluator(
            labelCol="fraudulent", metricName="f1")
        
        accuracy = evaluator_accuracy.evaluate(predictions)
        f1_score = evaluator_f1.evaluate(predictions)
        
        result = {
            'Run_DateTime': self.timestamp,
            'Model_Name': model_name,
            'Hyperparameters': self._format_hyperparameters(best_model.extractParamMap()),
            'Accuracy': accuracy,
            'F1_Score': f1_score,
            'Is_Best_Accuracy': False,  # Will be updated later
            'Is_Best_F1': False        # Will be updated later
        }
        
        self.current_run_results.append(result)
        
    def save_results(self):
        """Save all results to Excel file"""
        if not self.current_run_results:
            return
        
        # Convert current run results to DataFrame
        current_df = pd.DataFrame(self.current_run_results)
        
        # Find best models for current run
        best_accuracy_idx = current_df['Accuracy'].idxmax()
        best_f1_idx = current_df['F1_Score'].idxmax()
        
        current_df.loc[best_accuracy_idx, 'Is_Best_Accuracy'] = True
        current_df.loc[best_f1_idx, 'Is_Best_F1'] = True
        
        # Combine with existing results
        updated_df = pd.concat([self.existing_df, current_df], ignore_index=True)
        
        # Save to Excel
        updated_df.to_excel(self.excel_path, index=False)
        print(f"Results saved to {self.excel_path}")
def create_models_and_params():
    # 1. Logistic Regression
    lr = LogisticRegression(labelCol="fraudulent", featuresCol="features")
    lr_param_grid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
        .addGrid(lr.maxIter, [10, 50, 100]) \
        .build()

    # 2. Linear SVC
    svc = LinearSVC(labelCol="fraudulent", featuresCol="features")
    svc_param_grid = ParamGridBuilder() \
        .addGrid(svc.regParam, [0.01, 0.1, 1.0]) \
        .addGrid(svc.maxIter, [10, 50, 100]) \
        .addGrid(svc.threshold, [0.3, 0.5, 0.7]) \
        .build()

    # 3. Random Forest
    rf = RandomForestClassifier(labelCol="fraudulent", featuresCol="features")
    rf_param_grid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [10, 50, 100]) \
        .addGrid(rf.maxDepth, [5, 10, 15]) \
        .addGrid(rf.impurity, ['gini', 'entropy']) \
        .build()

    # 4. Multilayer Perceptron
    # We know the feature size is 10000 from the data
    layers = [10000, 20, 10, 2]  # Input layer -> 20 nodes -> 10 nodes -> 2 nodes (output)
    
    mlp = MultilayerPerceptronClassifier(labelCol="fraudulent", 
                                        featuresCol="features",
                                        layers=layers)
    mlp_param_grid = ParamGridBuilder() \
        .addGrid(mlp.maxIter, [50, 100]) \
        .addGrid(mlp.blockSize, [64, 128]) \
        .addGrid(mlp.stepSize, [0.01, 0.03, 0.05]) \
        .build()

    return [
        ("LogisticRegression", lr, lr_param_grid),
        ("LinearSVC", svc, svc_param_grid),
        ("RandomForestClassifier", rf, rf_param_grid),
        ("MultilayerPerceptronClassifier", mlp, mlp_param_grid)
    ]

def run_all_experiments(train_df, test_df):
    # Initialize the Excel tracker
    tracker = ExcelExperimentTracker()
    
    # Create models and parameter grids
    models = create_models_and_params()
    
    # Create evaluators for accuracy and F1 score
    evaluator_accuracy = MulticlassClassificationEvaluator(
        labelCol="fraudulent",
        predictionCol="prediction",
        metricName="accuracy"
    )
    
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="fraudulent",
        predictionCol="prediction",
        metricName="f1"
    )
    
    for model_name, model, param_grid in models:
        print(f"\nTraining {model_name}")
        
        try:
            # Create CrossValidator
            cv = CrossValidator(
                estimator=model,
                estimatorParamMaps=param_grid,
                evaluator=evaluator_accuracy,
                numFolds=10,
                parallelism=4
            )
            
            # Fit the model
            cvModel = cv.fit(train_df)
            
            # Make predictions on train and test sets
            train_predictions = cvModel.transform(train_df)
            test_predictions = cvModel.transform(test_df)
            
            # Calculate and print metrics
            train_accuracy = evaluator_accuracy.evaluate(train_predictions)
            train_f1 = evaluator_f1.evaluate(train_predictions)
            test_accuracy = evaluator_accuracy.evaluate(test_predictions)
            test_f1 = evaluator_f1.evaluate(test_predictions)
            
            print("\nModel Performance Metrics:")
            print(f"Training Accuracy: {train_accuracy:.4f}")
            print(f"Training F1 Score: {train_f1:.4f}")
            print(f"Test Accuracy: {test_accuracy:.4f}")
            print(f"Test F1 Score: {test_f1:.4f}")
            print(f"Best Parameters: {cvModel.bestModel.extractParamMap()}")
            
            # Log the experiment
            tracker.log_experiment(
                model_name=model_name,
                best_model=cvModel.bestModel,
                predictions=test_predictions
            )
            
        except Exception as e:
            print(f"Error training {model_name}: {str(e)}")
            continue
    
    # Save all results to Excel
    tracker.save_results()

# Run the experiments
run_all_experiments(train_df, test_df)


Training LogisticRegression

Model Performance Metrics:
Training Accuracy: 0.9975
Training F1 Score: 0.9975
Test Accuracy: 0.8493
Test F1 Score: 0.8493
Best Parameters: {Param(parent='LogisticRegression_affcf6045731', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_affcf6045731', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_affcf6045731', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_affcf6045731', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_affcf6045731', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_affcf604

                                                                                


Model Performance Metrics:
Training Accuracy: 0.9491
Training F1 Score: 0.9490
Test Accuracy: 0.8154
Test F1 Score: 0.8155
Best Parameters: {Param(parent='RandomForestClassifier_25c86f6c6b02', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_25c86f6c6b02', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_25c86f6c6b02', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='Rand

24/10/21 23:13:19 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 23:13:19 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 23:13:19 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 23:13:32 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 23:13:32 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 23:13:35 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5



Model Performance Metrics:
Training Accuracy: 1.0000
Training F1 Score: 1.0000
Test Accuracy: 0.8117
Test F1 Score: 0.8115
Best Parameters: {Param(parent='MultilayerPerceptronClassifier_86b315568482', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 128, Param(parent='MultilayerPerceptronClassifier_86b315568482', name='featuresCol', doc='features column name.'): 'features', Param(parent='MultilayerPerceptronClassifier_86b315568482', name='labelCol', doc='label column name.'): 'fraudulent', Param(parent='MultilayerPerceptronClassifier_86b315568482', name='maxIter', doc='max number of iterations (>= 0).'): 50, Param(parent='MultilayerPerceptronClassifier_86b315568482', name='predictionCol', doc='prediction column name.'): 'prediction', Param(parent='MultilayerPerceptronClassifier_86b315568482', name='probabilityCol', doc='Colu

  updated_df = pd.concat([self.existing_df, current_df], ignore_index=True)


ModuleNotFoundError: No module named 'openpyxl'

### JSON Experiment tracker

In [12]:
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time
from datetime import datetime
import json
import os

class ExperimentTracker:
    def __init__(self, base_log_dir="experiment_logs"):
        self.base_log_dir = base_log_dir
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.experiment_count = 0
        
        # Create directory structure
        self.experiment_dir = os.path.join(base_log_dir, f"experiment_{self.timestamp}")
        os.makedirs(self.experiment_dir, exist_ok=True)
        
        # Initialize experiment summary
        self.experiment_summary = {
            "start_time": self.timestamp,
            "experiments": []
        }
    
    def log_experiment(self, model_name, model_params, best_model, cvModel, predictions, training_time):
        """Log a single experiment with its parameters and results"""
        self.experiment_count += 1
        
        # Create evaluators for different metrics
        evaluators = {
            'accuracy': MulticlassClassificationEvaluator(labelCol="fraudulent", metricName="accuracy"),
            'f1_score': MulticlassClassificationEvaluator(labelCol="fraudulent", metricName="f1"),
            'areaUnderROC': BinaryClassificationEvaluator(labelCol="fraudulent", metricName="areaUnderROC")
        }
        
        # Calculate metrics
        test_metrics = {metric: evaluator.evaluate(predictions) 
                       for metric, evaluator in evaluators.items()}
        
        # Collect experiment results
        experiment_results = {
            "experiment_id": f"{model_name}_{self.experiment_count}",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "model_name": model_name,
            "initial_parameters": str(model_params),
            "best_parameters": str(best_model.extractParamMap()),
            "cross_validation": {
                "num_folds": 10,
                "avg_metrics": cvModel.avgMetrics
            },
            "test_performance": test_metrics,
            "training_time_seconds": training_time
        }
        
        # Save individual experiment results
        experiment_file = os.path.join(
            self.experiment_dir, 
            f"{experiment_results['experiment_id']}.json"
        )
        with open(experiment_file, "w") as f:
            json.dump(experiment_results, f, indent=4)
        
        # Update experiment summary
        self.experiment_summary["experiments"].append(experiment_results)
        self._save_summary()
        
        # Print results
        self._print_experiment_results(experiment_results)
        
        return experiment_results
    
    def _save_summary(self):
        summary_file = os.path.join(self.experiment_dir, "experiment_summary.json")
        with open(summary_file, "w") as f:
            json.dump(self.experiment_summary, f, indent=4)
    
    def _print_experiment_results(self, results):
        print(f"\n{'='*50}")
        print(f"Experiment ID: {results['experiment_id']}")
        print(f"Model: {results['model_name']}")
        print(f"Test Performance:")
        for metric, value in results['test_performance'].items():
            print(f"- {metric}: {value:.4f}")
        print(f"Training Time: {results['training_time_seconds']:.2f} seconds")
        print(f"{'='*50}\n")

    def get_best_experiments(self, metric='accuracy', top_n=5):
        sorted_experiments = sorted(
            self.experiment_summary["experiments"],
            key=lambda x: x["test_performance"][metric],
            reverse=True
        )
        return sorted_experiments[:top_n]

def create_models_and_params():
    # 1. Logistic Regression
    lr = LogisticRegression(labelCol="fraudulent", featuresCol="features")
    lr_param_grid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
        .addGrid(lr.maxIter, [10, 50, 100]) \
        .build()

    # 2. Linear SVC
    svc = LinearSVC(labelCol="fraudulent", featuresCol="features")
    svc_param_grid = ParamGridBuilder() \
        .addGrid(svc.regParam, [0.01, 0.1, 1.0]) \
        .addGrid(svc.maxIter, [10, 50, 100]) \
        .addGrid(svc.threshold, [0.3, 0.5, 0.7]) \
        .build()

    # 3. Random Forest
    rf = RandomForestClassifier(labelCol="fraudulent", featuresCol="features")
    rf_param_grid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [10, 50, 100]) \
        .addGrid(rf.maxDepth, [5, 10, 15]) \
        .addGrid(rf.impurity, ['gini', 'entropy']) \
        .build()

    # 4. Multilayer Perceptron
    # We know the feature size is 10000 from the data
    layers = [10000, 20, 10, 2]  # Input layer -> 20 nodes -> 10 nodes -> 2 nodes (output)
    
    mlp = MultilayerPerceptronClassifier(labelCol="fraudulent", 
                                        featuresCol="features",
                                        layers=layers)
    mlp_param_grid = ParamGridBuilder() \
        .addGrid(mlp.maxIter, [50, 100]) \
        .addGrid(mlp.blockSize, [64, 128]) \
        .addGrid(mlp.stepSize, [0.01, 0.03, 0.05]) \
        .build()

    return [
        ("LogisticRegression", lr, lr_param_grid),
        ("LinearSVC", svc, svc_param_grid),
        ("RandomForestClassifier", rf, rf_param_grid),
        ("MultilayerPerceptronClassifier", mlp, mlp_param_grid)
    ]

# Create evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="fraudulent", metricName="accuracy")

# Initialize the experiment tracker
tracker = ExperimentTracker()

def run_all_experiments(train_df, test_df):
    models = create_models_and_params()
    
    for model_name, model, param_grid in models:
        print(f"\n{'='*50}")
        print(f"Training {model_name}")
        print(f"Parameter grid: {param_grid}")
        print(f"{'='*50}")
        
        start_time = time.time()
        
        try:
            # Create CrossValidator
            cv = CrossValidator(estimator=model,
                              estimatorParamMaps=param_grid,
                              evaluator=evaluator,
                              numFolds=10,
                              parallelism=4)
            
            # Fit the model
            cvModel = cv.fit(train_df)
            
            # Make predictions on test set
            predictions = cvModel.transform(test_df)
            
            training_time = time.time() - start_time
            
            # Log the experiment
            tracker.log_experiment(
                model_name=model_name,
                model_params=param_grid,
                best_model=cvModel.bestModel,
                cvModel=cvModel,
                predictions=predictions,
                training_time=training_time
            )
            
        except Exception as e:
            print(f"Error training {model_name}: {str(e)}")
            continue

# Run the experiments
run_all_experiments(train_df, test_df)

# Print summary of best results
print("\nBest models by accuracy:")
best_accuracy = tracker.get_best_experiments(metric='accuracy', top_n=4)
for exp in best_accuracy:
    print(f"{exp['model_name']}: Accuracy = {exp['test_performance']['accuracy']:.4f}")

print("\nBest models by F1 score:")
best_f1 = tracker.get_best_experiments(metric='f1_score', top_n=4)
for exp in best_f1:
    print(f"{exp['model_name']}: F1 = {exp['test_performance']['f1_score']:.4f}")

print("\nBest models by AUC-ROC:")
best_auc = tracker.get_best_experiments(metric='areaUnderROC', top_n=4)
for exp in best_auc:
    print(f"{exp['model_name']}: AUC-ROC = {exp['test_performance']['areaUnderROC']:.4f}")


Training LogisticRegression
Parameter grid: [{Param(parent='LogisticRegression_4d0c780a60d0', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_4d0c780a60d0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_4d0c780a60d0', name='maxIter', doc='max number of iterations (>= 0).'): 10}, {Param(parent='LogisticRegression_4d0c780a60d0', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_4d0c780a60d0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_4d0c780a60d0', name='maxIter', doc='max number of iterations (>= 0).'): 50}, {Param(parent='LogisticRegression_4d0c780a60d0', name='regParam', doc='regul

                                                                                


Experiment ID: RandomForestClassifier_3
Model: RandomForestClassifier
Test Performance:
- accuracy: 0.8154
- f1_score: 0.8155
- areaUnderROC: 0.9058
Training Time: 157.89 seconds


Training MultilayerPerceptronClassifier
Parameter grid: [{Param(parent='MultilayerPerceptronClassifier_cffa22b2b8e6', name='maxIter', doc='max number of iterations (>= 0).'): 50, Param(parent='MultilayerPerceptronClassifier_cffa22b2b8e6', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 64, Param(parent='MultilayerPerceptronClassifier_cffa22b2b8e6', name='stepSize', doc='Step size to be used for each iteration of optimization (>= 0).'): 0.01}, {Param(parent='MultilayerPerceptronClassifier_cffa22b2b8e6', name='maxIter', doc='max number of iterations (>= 0).'): 50, Param(parent='MultilayerPerceptronClassifier_cffa22b2b8e6', name='blockSize', doc='bl

24/10/21 21:53:37 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 21:53:37 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 21:53:37 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 21:53:50 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 21:53:50 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5
24/10/21 21:53:53 ERROR StrongWolfeLineSearch: Encountered bad values in function evaluation. Decreasing step size to 0.5



Experiment ID: MultilayerPerceptronClassifier_4
Model: MultilayerPerceptronClassifier
Test Performance:
- accuracy: 0.8117
- f1_score: 0.8115
- areaUnderROC: 0.8896
Training Time: 269.75 seconds


Best models by accuracy:
LogisticRegression: Accuracy = 0.8493
LinearSVC: Accuracy = 0.8362
RandomForestClassifier: Accuracy = 0.8154
MultilayerPerceptronClassifier: Accuracy = 0.8117

Best models by F1 score:
LogisticRegression: F1 = 0.8493
LinearSVC: F1 = 0.8356
RandomForestClassifier: F1 = 0.8155
MultilayerPerceptronClassifier: F1 = 0.8115

Best models by AUC-ROC:
LogisticRegression: AUC-ROC = 0.9265
RandomForestClassifier: AUC-ROC = 0.9058
LinearSVC: AUC-ROC = 0.9057
MultilayerPerceptronClassifier: AUC-ROC = 0.8896


### Basic Experiment of Models

In [17]:
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorIndexer
from pyspark.sql.functions import col
import time

# Assume we're working with train_df and test_df from the previous step

# Function to log results
def log_results(model_name, best_model, cvModel, test_auc):
    results = f"""
    Model: {model_name}
    Best Model Parameters: {best_model.extractParamMap()}
    Best AUC on CV: {cvModel.avgMetrics[0]}
    AUC on Test Set: {test_auc}
    """
    print(results)
    # You can also write this to a file
    with open(f"{model_name}_results.txt", "w") as f:
        f.write(results)

# Prepare the data
indexer = VectorIndexer(inputCol="features", outputCol="indexed_features", maxCategories=4)
indexed_train = indexer.fit(train_df).transform(train_df)
indexed_test = indexer.fit(test_df).transform(test_df)

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="fraudulent", metricName="areaUnderROC")

# List of models to train
models = [
    ("LogisticRegression", LogisticRegression(labelCol="fraudulent", featuresCol="indexed_features")),
    ("LinearSVC", LinearSVC(labelCol="fraudulent", featuresCol="indexed_features")),
    ("RandomForestClassifier", RandomForestClassifier(labelCol="fraudulent", featuresCol="indexed_features")),
    ("MultilayerPerceptronClassifier", MultilayerPerceptronClassifier(labelCol="fraudulent", featuresCol="indexed_features", layers=[len(indexed_train.select("indexed_features").first()[0]), 5, 4, 2]))
]

# Train and evaluate each model
for model_name, model in models:
    print(f"\nTraining {model_name}...")
    start_time = time.time()

    # Create the parameter grid
    if model_name == "LogisticRegression":
        paramGrid = ParamGridBuilder() \
            .addGrid(model.regParam, [0.01, 0.1, 1.0]) \
            .addGrid(model.elasticNetParam, [0.0, 0.5, 1.0]) \
            .build()
    elif model_name == "LinearSVC":
        paramGrid = ParamGridBuilder() \
            .addGrid(model.regParam, [0.01, 0.1, 1.0]) \
            .addGrid(model.maxIter, [10, 100]) \
            .build()
    elif model_name == "RandomForestClassifier":
        paramGrid = ParamGridBuilder() \
            .addGrid(model.numTrees, [10, 50, 100]) \
            .addGrid(model.maxDepth, [5, 10]) \
            .build()
    else:  # MultilayerPerceptronClassifier
        paramGrid = ParamGridBuilder() \
            .addGrid(model.maxIter, [100, 200]) \
            .addGrid(model.blockSize, [64, 128]) \
            .build()

    # Create the CrossValidator
    cv = CrossValidator(estimator=model,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=3)

    # Fit the model
    cvModel = cv.fit(indexed_train)

    # Make predictions on test data
    predictions = cvModel.transform(indexed_test)

    # Evaluate the model
    test_auc = evaluator.evaluate(predictions)

    # Log the results
    log_results(model_name, cvModel.bestModel, cvModel, test_auc)

    end_time = time.time()
    print(f"Time taken to train and evaluate {model_name}: {end_time - start_time:.2f} seconds")

# Optionally, you can save the best model
# best_model.save("path_to_save_model")

24/10/20 16:04:09 WARN DAGScheduler: Broadcasting large task binary with size 1665.8 KiB



Training LogisticRegression...


24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:04:11 WARN DAGScheduler: Broadcasting larg


    Model: LogisticRegression
    Best Model Parameters: {Param(parent='LogisticRegression_85c8af2c76b1', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_85c8af2c76b1', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_85c8af2c76b1', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_85c8af2c76b1', name='featuresCol', doc='features column name.'): 'indexed_features', Param(parent='LogisticRegression_85c8af2c76b1', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_85c8af2c76b1', name='labelCol', doc='label column name.'): 'fraudulent', Param(parent='LogisticRegression_85c8af

24/10/20 16:09:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:09:14 WARN DAGScheduler: Broadcasting larg


    Model: LinearSVC
    Best Model Parameters: {Param(parent='LinearSVC_31c2aeb5907b', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearSVC_31c2aeb5907b', name='featuresCol', doc='features column name.'): 'indexed_features', Param(parent='LinearSVC_31c2aeb5907b', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearSVC_31c2aeb5907b', name='labelCol', doc='label column name.'): 'fraudulent', Param(parent='LinearSVC_31c2aeb5907b', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.'): 0.0, Param(parent='LinearSVC_31c2aeb5907b', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LinearSVC_31c2aeb5907b', name='predictionCol', doc='predi

24/10/20 16:17:41 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:41 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:41 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:41 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/10/20 16:17:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/10/20 16:17:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/10/20 16:17:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/10/20 16:17:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/10/20 16:17:44 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:44 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:17:44 WARN DAGScheduler: Broadcasting larg


    Model: RandomForestClassifier
    Best Model Parameters: {Param(parent='RandomForestClassifier_9f79596528d1', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_9f79596528d1', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_9f79596528d1', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_9f79596528d1', name='featureSubsetStrategy', doc="The number

24/10/20 16:20:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/10/20 16:20:07 WARN DAGScheduler: Broadcasting larg


    Model: MultilayerPerceptronClassifier
    Best Model Parameters: {Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.'): 64, Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='featuresCol', doc='features column name.'): 'indexed_features', Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='labelCol', doc='label column name.'): 'fraudulent', Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='maxIter', doc='max number of iterations (>= 0).'): 100, Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='predictionCol', doc='prediction column name.'): 'prediction', Param(parent='MultilayerPerceptronClassifier_b13a93b2b4e9', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: No

In [9]:
spark.stop()