In [34]:
import json

import matplotlib.pyplot as plt
import numpy as np
import requests
from bs4 import BeautifulSoup
from scrapy import Selector

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean as _mean, lit
from pyspark.sql.functions import col, lit, row_number, rand, when, isnan, count, udf, sum
from pyspark.sql.types import IntegerType, FloatType, DoubleType, LongType, StringType
from pyspark.sql.types import ArrayType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics

# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("Heart Disease Analysis")
    .getOrCreate()
)

# Read the CSV with a header row, letting Spark infer the column types
df = spark.read.csv("data/heart_disease.csv", header=True, inferSchema=True)

# Total number of rows that were read
row_count = df.count()

# How many rows remain once two rows are excluded
rows_to_keep = row_count - 2

# Keep only the first `rows_to_keep` rows of the DataFrame
df = df.limit(rows_to_keep)

print(5)

5


In [35]:
# Columns kept for the rest of the analysis
selected_columns = [
    'age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps',
    'smoke', 'fbs', 'prop', 'nitr', 'pro', 'diuretic',
    'thaldur', 'thalach', 'exang', 'oldpeak', 'slope', 'target',
]

# Project the DataFrame down to exactly those columns
df = df.select(selected_columns)

In [36]:
binary_attributes = [
    'painloc',      # Chest pain location (1 = substernal, 0 = otherwise)
    'painexer',     # Whether pain is provoked by exertion (1 = yes, 0 = no)
    'fbs',          # Fasting blood sugar > 120 mg/dL (1 = true, 0 = false)
    'prop',         # Beta blocker used during exercise ECG (1 = yes, 0 = no)
    'nitr',         # Nitrates used during exercise ECG (1 = yes, 0 = no)
    'pro',          # Calcium channel blocker used during exercise ECG (1 = yes, 0 = no)
    'diuretic',     # Diuretic used during exercise ECG (1 = yes, 0 = no)
    'exang'
]


def _most_frequent_valid(frame, column, valid_values):
    """Return the most frequent value of `column` among `valid_values`.

    Only rows whose value is one of `valid_values` are counted, so nulls and
    invalid codes can never be chosen. Returns None when no row qualifies.
    """
    top_row = (
        frame.where(col(column).isin(*valid_values))
        .groupBy(column)
        .count()
        .orderBy('count', ascending=False)
        .first()
    )
    return None if top_row is None else top_row[0]


def _fill_nulls_with_mean(frame, column):
    """Replace nulls in `column` with the column mean.

    Returns a `(new_frame, mean)` pair; the mean is None when the column has
    no non-null values, in which case the nulls stay null.
    """
    column_mean = frame.agg(_mean(col(column)).alias('mean')).first()['mean']
    filled = frame.withColumn(
        column,
        when(col(column).isNull(), column_mean).otherwise(col(column)),
    )
    return filled, column_mean


for column in binary_attributes:
    # Take the mode over legitimate 0/1 entries only. Counting every value
    # would let null (or an invalid code) win and be used as the replacement.
    mode_value = _most_frequent_valid(df, column, (0, 1))

    # Replace null and non-binary values with the mode. If the column holds no
    # valid entry at all, mode_value is None and those cells become null.
    df = df.withColumn(column, when((col(column).isNull()) | (~col(column).isin(0, 1)), mode_value).otherwise(col(column)))

# Replace missing values in each continuous column with that column's average
df, thaldur_average = _fill_nulls_with_mean(df, 'thaldur')
df, thalach_average = _fill_nulls_with_mean(df, 'thalach')
df, trestbps_average = _fill_nulls_with_mean(df, 'trestbps')

# 'oldpeak' is only accepted in the range [0, 4]. The average is taken over
# in-range values so the entries being replaced do not skew their replacement.
oldpeak_in_range = (col('oldpeak') >= 0) & (col('oldpeak') <= 4)
average_oldpeak = df.where(oldpeak_in_range).agg(_mean(col('oldpeak')).alias('mean')).first()['mean']

# Null comparisons evaluate to null, so missing values fall through to the
# `otherwise` branch together with values below 0 or above 4.
df = df.withColumn('oldpeak', when(oldpeak_in_range, col('oldpeak')).otherwise(average_oldpeak))

# Allowed codes for the multi-valued categorical columns
valid_categories = {
    'cp': {1, 2, 3, 4},
    'slope': {1, 2, 3},
}

# Function to impute invalid values with mode
def impute_with_mode(df, column, valid_set):
    """Replace null or invalid entries of `column` with the most common valid value.

    Args:
        df: Spark DataFrame containing `column`.
        column: name of the categorical column to clean.
        valid_set: collection of values considered valid for the column.

    Returns:
        A new DataFrame in which every value outside `valid_set` (including
        null) is replaced by the mode of the valid values. If the column has
        no valid value at all, those entries become null.
    """
    is_valid = col(column).isin(*valid_set)

    # Rank only the valid values: ranking every value would allow null or an
    # invalid code to be picked as the "mode" and written back unchanged.
    top_row = (
        df.where(is_valid)
        .groupBy(column)
        .count()
        .orderBy('count', ascending=False)
        .first()
    )
    mode_value = top_row[0] if top_row is not None else None

    # A null value makes `is_valid` null, so it also takes the otherwise branch.
    return df.withColumn(column, when(is_valid, col(column)).otherwise(mode_value))

# Clean every categorical column listed in valid_categories, one at a time
for column in valid_categories:
    valid_set = valid_categories[column]
    df = impute_with_mode(df, column, valid_set)

print(5)

df.show()

5
+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
|age|sex|painloc|painexer| cp|trestbps|smoke|fbs|prop|nitr|pro|diuretic|thaldur|thalach|exang|oldpeak|slope|target|
+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
| 63|  1|      1|       1|  1|   145.0| null|  1|   0|   0|  0|       0|   10.5|  150.0|    0|    2.3|    3|     0|
| 67|  1|      1|       1|  4|   160.0| null|  0|   1|   0|  0|       0|    9.5|  108.0|    1|    1.5|    2|     1|
| 67|  1|      1|       1|  4|   120.0| null|  0|   1|   0|  0|       0|    8.5|  129.0|    1|    2.6|    2|     1|
| 37|  1|      1|       1|  3|   130.0| null|  0|   1|   0|  0|       0|   13.0|  187.0|    0|    3.5|    3|     0|
| 41|  0|      1|       1|  2|   130.0| null|  0|   0|   0|  0|       0|    7.0|  172.0|    0|    1.4|    1|     0|
| 56|  1|      1|       1|  2|   120.0| null|  0|   0|   0|  0|       

In [37]:
# Set 'smoke' to 1 where a per-row uniform random draw exceeds 0.5, else 0.
# NOTE(review): this overwrites every existing 'smoke' value, not only the
# missing ones — confirm that is intended.
df = df.withColumn(
    'smoke',
    when(rand() > 0.5, 1).otherwise(0),
)

df.show()

+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
|age|sex|painloc|painexer| cp|trestbps|smoke|fbs|prop|nitr|pro|diuretic|thaldur|thalach|exang|oldpeak|slope|target|
+---+---+-------+--------+---+--------+-----+---+----+----+---+--------+-------+-------+-----+-------+-----+------+
| 63|  1|      1|       1|  1|   145.0|    0|  1|   0|   0|  0|       0|   10.5|  150.0|    0|    2.3|    3|     0|
| 67|  1|      1|       1|  4|   160.0|    1|  0|   1|   0|  0|       0|    9.5|  108.0|    1|    1.5|    2|     1|
| 67|  1|      1|       1|  4|   120.0|    0|  0|   1|   0|  0|       0|    8.5|  129.0|    1|    2.6|    2|     1|
| 37|  1|      1|       1|  3|   130.0|    0|  0|   1|   0|  0|       0|   13.0|  187.0|    0|    3.5|    3|     0|
| 41|  0|      1|       1|  2|   130.0|    0|  0|   0|   0|  0|       0|    7.0|  172.0|    0|    1.4|    1|     0|
| 56|  1|      1|       1|  2|   120.0|    1|  0|   0|   0|  0|       0|

In [14]:
# Set the webpage URL for fetching data
data_url = "https://www.abs.gov.au/statistics/health/health-conditions-and-risks/smoking-and-vaping/latest-release"
# Send a GET request to the URL; bound the wait and fail fast on HTTP errors
# so a bad response is not parsed as if it were the expected page
web_response = requests.get(data_url, timeout=30)
web_response.raise_for_status()
# Parse the HTML content using BeautifulSoup
html_content = BeautifulSoup(web_response.content, 'html.parser')
# Specify a key phrase from the chart caption to locate the right data
search_caption = "Proportion of people 15 years and over who were current daily smokers by age, 2011"
# Initialize variable to store the desired div
target_div = None

# Loop through all div elements with the specified class
for container in html_content.find_all('div', {'class': 'chart-data-wrapper'}):
    # Some wrappers may have no caption element; skip those instead of crashing
    caption_tag = container.find('pre', {'class': 'chart-caption'})
    if caption_tag is not None and search_caption in caption_tag.text:
        target_div = container
        break

if target_div is None:
    raise ValueError(f"No chart whose caption contains {search_caption!r} was found at {data_url}")

# Parse and extract chart data from JSON format
chart_data_tag = target_div.find('pre', {'class': 'chart-data'})
if chart_data_tag is None:
    raise ValueError("The matching chart wrapper has no 'chart-data' element")
chart_data = json.loads(chart_data_tag.text)
# NOTE(review): index 7 is a fixed position within the chart's JSON payload —
# confirm it still points at the intended series if the page changes
desired_values = chart_data[7]

# Smoking rates by age group as extracted (nested lists flattened into one list)
smoking_rates = [item for sublist in desired_values for item in sublist]

# Define age bins corresponding to the age groups in the rate table.
# Every entry after the first is the inclusive upper age of one group.
bins = [0, 17, 24, 34, 44, 54, 64, 74, 120]
labels = [0, 1, 2, 3, 4, 5, 6, 7]

# Function to assign each age to an age group
def assign_age_group(age, _upper_bounds=tuple(bins[1:]), _labels=tuple(labels)):
    """Return the age-group label for `age`.

    The group is the first one whose inclusive upper bound is >= `age`, so an
    age equal to a boundary (17, 24, 34, ...) stays in the group it closes.
    Ages above the last bound map to the last label; a missing age yields None.

    `_upper_bounds` and `_labels` are bound when the function is defined so a
    later reassignment of the module-level `bins` / `labels` cannot change the
    result; callers are not expected to pass them.
    """
    if age is None:
        return None
    for label, upper_bound in zip(_labels, _upper_bounds):
        if age <= upper_bound:
            return label
    return _labels[-1]

# Expose assign_age_group to Spark as a UDF producing an integer label
assign_age_group_udf = udf(assign_age_group, returnType=IntegerType())

# Derive 'age_group_ABS' from the 'age' column via that UDF
df = df.withColumn(
    'age_group_ABS',
    assign_age_group_udf(col('age')),
)

# Function to impute NaN based on smoking probability
def impute_smoking(abs_smoke, age_group):
    """Return `abs_smoke`, or a random 0/1 draw when it is missing.

    A present value is passed through unchanged. When `abs_smoke` is None the
    rate for `age_group` is read from `smoking_rates` (treated as a
    percentage) and 1 is returned with that probability, otherwise 0.
    """
    if abs_smoke is not None:
        return abs_smoke

    rate = smoking_rates[int(age_group)]
    draw = np.random.rand()
    return int(draw < rate / 100)

# Expose impute_smoking to Spark as a UDF producing an integer
impute_smoking_udf = udf(impute_smoking, returnType=IntegerType())

# Build 'ABS_smoke' from the current 'smoke' value and the ABS age group
df = df.withColumn(
    'ABS_smoke',
    impute_smoking_udf(col('smoke'), col('age_group_ABS')),
)





# Fetch data from CDC website
source_url = "https://www.cdc.gov/tobacco/data_statistics/fact_sheets/adult_data/cig_smoking/index.htm"
server_response = requests.get(source_url, timeout=30)
if server_response.status_code != 200:
    # Stop here: parsing an error page would only fail later with an
    # unrelated-looking IndexError
    raise RuntimeError(f"Failed to retrieve data (HTTP {server_response.status_code}) from {source_url}")

html_data = server_response.content
selector = Selector(text=html_data)
target_div = selector.xpath("//div[@class='row '][3]")

# NOTE(review): the leading '//' makes this query search from the document
# root rather than only inside target_div ('.//' would scope it) — confirm
# which one is intended before changing it.
list_selector = target_div.xpath("//ul[@class='block-list']")
if len(list_selector) < 2:
    raise ValueError(f"Expected at least two 'block-list' lists at {source_url}, found {len(list_selector)}")

# First list is read as the by-sex rates, second as the by-age rates
gender_data = list_selector[0].xpath(".//li/text()").getall()
age_data = list_selector[1].xpath(".//li/text()").getall()


def _percent_in_parens(text):
    """Return the number between the first '(' and the following '%)' as a float."""
    return float(text.split("(")[1].split("%)")[0])


male_rate = _percent_in_parens(gender_data[0])
female_rate = _percent_in_parens(gender_data[1])

# Map (lower_age, upper_age) -> rate; an open-ended group gets upper bound inf
age_rates = {}
for item in age_data:
    age_range = item.split("aged ")[1].split(" years")[0]
    rate = _percent_in_parens(item)
    if "–" in age_range:
        age_limits = age_range.split("–")
        age_rates[(int(age_limits[0]), int(age_limits[1]))] = rate
    else:
        age_rates[(int(age_range), float('inf'))] = rate

# Scale each age-group rate by the male/female ratio
adjusted_male_rates = {key: value * (male_rate / female_rate) for key, value in age_rates.items()}

print("Adjusted male smoking rates by age:", adjusted_male_rates)
# NOTE(review): age_rates holds the rates parsed from the by-age list, yet it
# is printed under a "Female" label — confirm the label matches the data.
print("Female smoking rates by age:", age_rates)

# Every entry after the first is the inclusive upper age of one CDC group;
# labels are the matching (lower, upper) pairs.
bins = [18, 24, 44, 64, float('inf')]
labels = [(18, 24), (25, 44), (45, 64), (65, float('inf'))]

# Function to assign each age to a CDC age group
def assign_cdc_age_group(age, _upper_bounds=tuple(bins[1:]), _labels=tuple(labels)):
    """Return the CDC age-group label, a (lower, upper) tuple, for `age`.

    The group is the first one whose inclusive upper bound is >= `age`, so 24
    maps to (18, 24), 44 to (25, 44) and 64 to (45, 64). Ages below 18 fall
    into the first group because no lower bound is checked. A missing age
    yields None.

    `_upper_bounds` and `_labels` are bound when the function is defined so a
    later reassignment of the module-level `bins` / `labels` cannot change the
    result; callers are not expected to pass them.
    """
    if age is None:
        return None
    for label, upper_bound in zip(_labels, _upper_bounds):
        if age <= upper_bound:
            return label
    return _labels[-1]

def _cdc_age_group_bounds(age):
    """Return the CDC age-group bounds for `age` as a list of floats (or None).

    assign_cdc_age_group returns a (lower, upper) pair mixing ints with
    float('inf'); the elements are converted to float so they match the
    array<double> type declared for the UDF.
    """
    group = assign_cdc_age_group(age)
    return None if group is None else [float(bound) for bound in group]


# The function yields a pair of bounds, so the UDF is declared as an array of
# doubles. Declaring it as IntegerType does not match the value it returns.
assign_cdc_age_group_udf = udf(_cdc_age_group_bounds, ArrayType(DoubleType()))

# Apply the UDF to create the CDC age group column
df = df.withColumn('age_group_CDC', assign_cdc_age_group_udf(col('age')))


# Function to impute smoking based on CDC data
def impute_cdc_smoking(smoke, age_group, sex):
    """Return `smoke`, or a random 0/1 draw from the CDC rates when it is missing.

    Args:
        smoke: existing smoking flag, or None when missing.
        age_group: pair of age bounds used as the key into the rate tables,
            or None when unknown.
        sex: 1 selects `adjusted_male_rates`; any other value selects
            `age_rates`.

    Returns:
        `smoke` unchanged when it is present. Otherwise 1 with probability
        rate / 100 and 0 otherwise, where an unknown or missing age group has
        rate 0 and therefore always yields 0.
    """
    if smoke is not None:
        return smoke

    # A missing age group is treated like an unmatched key (rate 0) rather
    # than crashing in tuple(None).
    if age_group is None:
        return 0

    rates = adjusted_male_rates if sex == 1 else age_rates
    rate = rates.get(tuple(age_group), 0)
    return 1 if np.random.rand() < rate / 100 else 0

# Expose impute_cdc_smoking to Spark as a UDF producing an integer
impute_cdc_smoking_udf = udf(impute_cdc_smoking, returnType=IntegerType())


# Build 'CDC_smoke' from the current 'smoke' value, the CDC age group and sex
df = df.withColumn(
    'CDC_smoke',
    impute_cdc_smoking_udf(col('smoke'), col('age_group_CDC'), col('sex')),
)


# Function to impute the 'smoke' column based on ABS and CDC smoke columns
def impute_smoke(smoke, abs_smoke, cdc_smoke):
    """Combine the ABS and CDC imputations into a final smoking flag.

    A present `smoke` value is returned as is. When it is None the result is
    0 only if both `abs_smoke` and `cdc_smoke` equal 0; in every other case
    the result is 1.
    """
    if smoke is not None:
        return smoke

    both_zero = abs_smoke == 0 and cdc_smoke == 0
    return 0 if both_zero else 1

# Expose impute_smoke to Spark as a UDF producing an integer
impute_smoke_udf = udf(impute_smoke, returnType=IntegerType())

# Rewrite 'smoke' from its current value plus the ABS and CDC columns
df = df.withColumn(
    'smoke',
    impute_smoke_udf(col('smoke'), col('ABS_smoke'), col('CDC_smoke')),
)


Adjusted male smoking rates by age: {(18, 24): 6.874257425742575, (25, 44): 16.342574257425742, (45, 64): 19.325742574257426, (65, inf): 10.765346534653467}
Female smoking rates by age: {(18, 24): 5.3, (25, 44): 12.6, (45, 64): 14.9, (65, inf): 8.3}


In [33]:
# Preview the data. show() prints the first 20 rows; printing df.count() rows
# would dump the entire DataFrame into the notebook output.
df.show()

# Check for null values in each column
# (`sum` here is pyspark.sql.functions.sum, imported at the top of the notebook)
null_checks = [sum(col(column).isNull().cast("int")).alias(column) for column in df.columns]

# Aggregate results to get the count of nulls in each column
null_counts = df.agg(*null_checks).collect()[0].asDict()

# Check if there are any null values (an aggregate over zero rows is None)
has_nulls = any((value or 0) > 0 for value in null_counts.values())

# Print the results
print(f"Null counts per column: {null_counts}")
print(f"DataFrame has null values: {has_nulls}")

# Fill the 'smoke' column with 0s and 1s based on a random number generator
# NOTE(review): this overwrites every existing 'smoke' value — confirm that is intended.
df = df.withColumn('smoke', when(rand() > 0.5, 1).otherwise(0))

df.show()

# Split the columns into features and target. The helper age-group columns
# are bookkeeping for the smoking imputation and are not used as features.
target_column = 'target'
excluded_columns = {target_column, 'age_group_ABS', 'age_group_CDC'}
feature_columns = [column for column in df.columns if column not in excluded_columns]

# Spark ML estimators read a single vector column, so the feature columns are
# assembled into 'features'. `df` itself is left without that column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
assembled_df = assembler.transform(df)

# Random split, roughly 90% train / 10% test. This is a plain random split,
# not a stratified one; the seed makes the assignment repeatable.
stratified_df = assembled_df.withColumn('rand', rand(seed=42))
train_df = stratified_df.where(col('rand') >= 0.1).drop('rand')
test_df = stratified_df.where(col('rand') < 0.1).drop('rand')


def _missing_flag(field):
    """Return a Column that is true where `field` is null, or NaN for float/double fields."""
    missing = col(field.name).isNull()
    if isinstance(field.dataType, (DoubleType, FloatType)):
        missing = missing | isnan(col(field.name))
    return missing


# Count NaNs / nulls in each column (isnan is only applied to floating-point columns)
nan_counts = df.select([count(when(_missing_flag(field), field.name)).alias(field.name) for field in df.schema.fields])

evaluator = MulticlassClassificationEvaluator(labelCol=target_column, metricName='accuracy')

# Setting up the logistic regression with hyperparameter grid
log_reg = LogisticRegression(labelCol=target_column, featuresCol='features')
log_reg_param_grid = ParamGridBuilder() \
    .addGrid(log_reg.regParam, [0.01, 0.1, 1, 10, 100]) \
    .build()

# Setting up cross-validation
crossval_log_reg = CrossValidator(estimator=log_reg,
                                  estimatorParamMaps=log_reg_param_grid,
                                  evaluator=evaluator,
                                  numFolds=5)

# Fit logistic regression model
log_reg_model = crossval_log_reg.fit(train_df)
best_log_reg_model = log_reg_model.bestModel

# avgMetrics has one entry per grid point; the best model's score is the
# maximum, not the first entry.
log_reg_cv_accuracy = max(log_reg_model.avgMetrics)

print("Best parameters for Logistic Regression:", best_log_reg_model.extractParamMap())
print("Cross-validated accuracy:", log_reg_cv_accuracy)

# Setting up the random forest classifier with hyperparameter grid
rf = RandomForestClassifier(labelCol=target_column, featuresCol='features')
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 20, 30]) \
    .build()

# Setting up cross-validation
crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=rf_param_grid,
                             evaluator=evaluator,
                             numFolds=5)

# Fit random forest model
rf_model = crossval_rf.fit(train_df)
best_rf_model = rf_model.bestModel
rf_cv_accuracy = max(rf_model.avgMetrics)

print("Best parameters for Random Forest:", best_rf_model.extractParamMap())
print("Cross-validated accuracy:", rf_cv_accuracy)

# Compare the best cross-validated score of each model family
if log_reg_cv_accuracy > rf_cv_accuracy:
    final_model = best_log_reg_model
    print("Selected Logistic Regression as the final model.")
else:
    final_model = best_rf_model
    print("Selected Random Forest as the final model.")

# Final evaluation on the test data
predictions = final_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)

print("Performance on the test set:")
print("Accuracy:", accuracy)

+---+---+-------+--------+---+------------------+-----+---+----+----+---+--------+-----------------+------------------+-----+------------------+-----+------+
|age|sex|painloc|painexer| cp|          trestbps|smoke|fbs|prop|nitr|pro|diuretic|          thaldur|           thalach|exang|           oldpeak|slope|target|
+---+---+-------+--------+---+------------------+-----+---+----+----+---+--------+-----------------+------------------+-----+------------------+-----+------+
| 63|  1|      1|       1|  1|             145.0|    0|  1|   0|   0|  0|       0|             10.5|             150.0|    0|               2.3|    3|     0|
| 67|  1|      1|       1|  4|             160.0|    0|  0|   1|   0|  0|       0|              9.5|             108.0|    1|               1.5|    2|     1|
| 67|  1|      1|       1|  4|             120.0|    0|  0|   1|   0|  0|       0|              8.5|             129.0|    1|               2.6|    2|     1|
| 37|  1|      1|       1|  3|             130.0|   

AttributeError: __provides__

In [141]:
def pipeline(data):
    """Tune logistic-regression and random-forest pipelines and report test accuracy.

    Steps: drop the helper age-group columns, overwrite 'smoke' with a random
    0/1 flag, split the rows roughly 90/10 into train and test, build an
    index -> impute -> assemble pipeline for each classifier, tune each with
    5-fold cross-validation, keep the one with the higher best
    cross-validated accuracy, and print its accuracy on the test rows.

    Args:
        data: Spark DataFrame containing a 'target' label column; every other
            numeric or string column is used as a feature.

    Returns:
        None. Results are printed.
    """
    # Drop the helper columns; drop() ignores names that are not present, so
    # this also works when only one of them exists.
    data = data.drop('age_group_ABS', 'age_group_CDC')

    # Fill the 'smoke' column with 0s and 1s based on a random number generator
    # NOTE(review): this overwrites every existing 'smoke' value — confirm that is intended.
    data = data.withColumn('smoke', when(rand() > 0.5, 1).otherwise(0))

    # Define target and feature columns
    target_column = 'target'
    feature_columns = [name for name in data.columns if name != target_column]

    # Random split, roughly 90% train / 10% test (not stratified)
    split_data = data.withColumn('rand', rand())
    train_data = split_data.where(col('rand') >= 0.1).drop('rand')
    test_data = split_data.where(col('rand') < 0.1).drop('rand')

    # Identify numeric and string features. Only feature columns are
    # considered: the numeric label must not end up in the feature vector,
    # otherwise the models are trained on the answer.
    numeric_types = (DoubleType, FloatType, IntegerType, LongType)
    feature_fields = [field for field in data.schema.fields if field.name in feature_columns]
    numeric_features = [field.name for field in feature_fields if isinstance(field.dataType, numeric_types)]
    string_features = [field.name for field in feature_fields if isinstance(field.dataType, StringType)]

    # Index string features
    indexed_string_columns = [f"{name}_Index" for name in string_features]
    indexers = [
        StringIndexer(inputCol=name, outputCol=indexed_name, handleInvalid="keep")
        for name, indexed_name in zip(string_features, indexed_string_columns)
    ]

    # Impute missing values
    imputed_string_columns = [f"Imputed_{name}" for name in indexed_string_columns]
    imputers_string = [
        Imputer(inputCol=indexed_name, outputCol=imputed_name, strategy="mode")
        for indexed_name, imputed_name in zip(indexed_string_columns, imputed_string_columns)
    ]
    imputed_numeric_columns = [f"Imputed_{name}" for name in numeric_features]
    imputer_numeric = Imputer(inputCols=numeric_features, outputCols=imputed_numeric_columns, strategy="mean")

    # Assemble feature columns into a single feature vector
    assembler = VectorAssembler(
        inputCols=imputed_numeric_columns + imputed_string_columns,
        outputCol="features"
    )

    # Define classifiers
    log_reg = LogisticRegression(labelCol=target_column, featuresCol="features")
    rf = RandomForestClassifier(labelCol=target_column, featuresCol="features")

    # Set up the parameter grids
    log_reg_param_grid = ParamGridBuilder() \
        .addGrid(log_reg.regParam, [0.01, 0.1, 1, 10, 100]) \
        .build()

    rf_param_grid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [10, 50, 100, 200]) \
        .addGrid(rf.maxDepth, [5, 10, 20, 30]) \
        .build()

    # Set up the cross-validators
    evaluator = MulticlassClassificationEvaluator(labelCol=target_column, metricName="accuracy")
    preprocessing_stages = indexers + imputers_string + [imputer_numeric, assembler]

    log_reg_cv = CrossValidator(
        estimator=Pipeline(stages=preprocessing_stages + [log_reg]),
        estimatorParamMaps=log_reg_param_grid,
        evaluator=evaluator,
        numFolds=5
    )

    rf_cv = CrossValidator(
        estimator=Pipeline(stages=preprocessing_stages + [rf]),
        estimatorParamMaps=rf_param_grid,
        evaluator=evaluator,
        numFolds=5
    )

    # Fit the models
    log_reg_model = log_reg_cv.fit(train_data)
    rf_model = rf_cv.fit(train_data)

    # avgMetrics has one entry per grid point; compare the best score of each
    # model family rather than the score of the first grid point.
    log_reg_cv_accuracy = max(log_reg_model.avgMetrics)
    rf_cv_accuracy = max(rf_model.avgMetrics)

    # Compare the performance and select the best model
    if log_reg_cv_accuracy > rf_cv_accuracy:
        final_model = log_reg_model.bestModel
        print("Selected Logistic Regression as the final model.")
    else:
        final_model = rf_model.bestModel
        print("Selected Random Forest as the final model.")

    # Final evaluation on the test data
    predictions = final_model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)

    print("Performance on the test set:")
    print(f"Accuracy: {accuracy}")

pipeline(df)

24/05/22 06:40:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


AttributeError: __provides__

In [38]:
# Features are every selected column except the label. Leaving 'target' in the
# feature vector would hand the models the answer they are asked to predict.
feature_columns = [name for name in selected_columns if name != "target"]

# Assemble features
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = vector_assembler.transform(df)

# Rename the target column to label
df = df.withColumnRenamed("target", "label")

# Split the DataFrame into training and testing sets
train_df, test_df = df.randomSplit([0.9, 0.1], seed=42)

def evaluate_model(predictions):
    """Return the accuracy of `predictions` ('prediction' column vs 'label' column)."""
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    return evaluator.evaluate(predictions)

# Train and evaluate a logistic regression model
log_reg = LogisticRegression(labelCol='label', featuresCol='features')
log_reg_model = log_reg.fit(train_df)
log_reg_predictions = log_reg_model.transform(test_df)
log_reg_accuracy = evaluate_model(log_reg_predictions)

# Train and evaluate a decision tree model
decision_tree = DecisionTreeClassifier(labelCol='label', featuresCol='features')
decision_tree_model = decision_tree.fit(train_df)
decision_tree_predictions = decision_tree_model.transform(test_df)
decision_tree_accuracy = evaluate_model(decision_tree_predictions)

# Train and evaluate a random forest model
random_forest = RandomForestClassifier(labelCol='label', featuresCol='features')
random_forest_model = random_forest.fit(train_df)
random_forest_predictions = random_forest_model.transform(test_df)
random_forest_accuracy = evaluate_model(random_forest_predictions)

# Determine the best model based on accuracy
# NOTE(review): the model is chosen using the same test split that is then
# reported, so the reported accuracy is optimistic — consider a separate
# validation split for the selection step.
accuracies = {
    "Logistic Regression": log_reg_accuracy,
    "Decision Tree": decision_tree_accuracy,
    "Random Forest": random_forest_accuracy
}
best_model_name, best_model_accuracy = max(accuracies.items(), key=lambda item: item[1])

print(f"The best model is {best_model_name} with an accuracy of {best_model_accuracy:.2f}")

# Look up the winning model and the predictions it already produced on the
# test set instead of transforming and evaluating the same data a second time
fitted_models = {
    "Logistic Regression": (log_reg_model, log_reg_predictions),
    "Decision Tree": (decision_tree_model, decision_tree_predictions),
    "Random Forest": (random_forest_model, random_forest_predictions),
}
best_model, best_model_predictions = fitted_models[best_model_name]

test_set_accuracy = best_model_accuracy
print(f"Test set accuracy: {test_set_accuracy:.2f}")

# Generate classification report (MulticlassMetrics expects an RDD of
# (prediction, label) pairs of floats)
predictions_and_labels = best_model_predictions.select("prediction", "label").rdd.map(lambda row: (float(row[0]), float(row[1])))
metrics = MulticlassMetrics(predictions_and_labels)

print("Classification report on test set:")
print(metrics.confusionMatrix().toArray())

Exception ignored in: <function JavaWrapper.__del__ at 0x7d3f6e14acb0>
Traceback (most recent call last):
  File "/tmp/demos/lib/python3.10/site-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7d3f6e14acb0>
Traceback (most recent call last):
  File "/tmp/demos/lib/python3.10/site-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7d3f6e14acb0>
Traceback (most recent call last):
  File "/tmp/demos/lib/python3.10/site-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' o

AttributeError: __provides__