In [83]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, when, count, lit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("GCSFilesRead") \
    .getOrCreate()

# file path 
file_location = "/home/bx2051/glucose.csv"

def read_csv_with_inferred_schema(file_path):
    """
    Reads a CSV file with inferred schema.
    transfer string data type to factor

    Args:
        file_path (str): The path to the CSV file.

    Returns:
        pyspark.sql.DataFrame: The DataFrame with inferred schema.
    """
    # Read the CSV file with inferred schema
    data = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(file_path)
    
    
    return data


df = read_csv_with_inferred_schema(file_location)



In [84]:
limited_df = df.limit(100)

In [85]:

def train_logistic_regression_model(data, max_iter=100, reg_param=0.02, elastic_net_param=0.8):
    """
    Trains a logistic regression model using the given dataset, feature columns, and label column.

    Args:
        data (pyspark.sql.DataFrame): The input DataFrame containing the features and label.
        max_iter (int): Maximum number of iterations (default: 100).
        reg_param (float): Regularization parameter (default: 0.02).
        elastic_net_param (float): ElasticNet mixing parameter, in range [0, 1] (default: 0.8).

    Returns:
        pyspark.ml.classification.LogisticRegressionModel: The trained logistic regression model.
    """
    train_model_dataset = data.withColumn("diabet",
                                          when(data["glucose"] > 6.1, 1).otherwise(0))

    # Drop the original 'glucose' column if you don't need it anymore
    train_model_dataset = train_model_dataset.drop("glucose")

    assembler = VectorAssembler(inputCols=train_model_dataset.columns[:-1], outputCol="features")
    train_model_dataset = assembler.transform(train_model_dataset)

    tri = LogisticRegression(maxIter=max_iter,
                             regParam=reg_param,
                             elasticNetParam=elastic_net_param,
                             featuresCol="features",
                             labelCol="diabet")

    lr_model = tri.fit(train_model_dataset)

    return lr_model

In [86]:
## Set the seed and seperate the train and test data 
seed = 5043
trainingData, testData = limited_df.randomSplit([0.7, 0.3], seed=seed)


# # Assuming your DataFrame is named 'test_model_dataset'
# # Add a new column 'glucose_binary' based on the condition
# test_model_dataset = trainingData.withColumn("diabet", 
#                                                    when(trainingData["glucose"] > 6.1, 1).otherwise(0))

# # Drop the original 'glucose' column if you don't need it anymore
# test_model_dataset = test_model_dataset.drop("glucose")

# # Print the first few rows to verify the change
# # test_model_dataset.show()

# # test_model_dataset.printSchema()
# assembler = VectorAssembler(inputCols= test_model_dataset.columns, outputCol="features")
# test_model_dataset = assembler.transform(test_model_dataset)

# tri=LogisticRegression(maxIter=10,
#                        regParam=0.01,
#                        featuresCol="features",
#                        labelCol="diabet")
# lr_model = tri.fit(test_model_dataset)

In [87]:
## Getting the train model 
lr_model = train_logistic_regression_model(trainingData)

In [88]:
def predict_logistic_regression_result(model, testData):
    """
    Makes predictions using the trained model on the testing data.

    Args:
        model (pyspark.ml.classification.LogisticRegressionModel): The trained logistic regression model.
        test_data (pyspark.sql.DataFrame): The testing data.

    Returns:
        pyspark.sql.DataFrame: The DataFrame with predicted results and actual labels.
    """
    
    testData = testData.withColumn("diabet", 
                                            when(testData["glucose"] > 6.1, 1).otherwise(0))

    testData = testData.drop("glucose")
    
    # Check if 'features' column exists and drop it
    if 'features' in testData.columns:
        testData = testData.drop('features')

    # Assuming 'actual_outcome' is the column in the original data used as the label
    # Make sure it's included when transforming testData
    testData = testData.withColumn("label", testData["diabet"])
    testData = testData.drop("diabet")
    
    assembler = VectorAssembler(inputCols=[col for col in testData.columns if col != "label"], outputCol="features")
    testData = assembler.transform(testData)

        # Make predictions using the trained model
    predictions = model.transform(testData)
    predictions = predictions.select("prediction", "label")
    # Transfer the prediction double type to integer type 
    predictions = predictions.withColumn("prediction", col("prediction").cast("integer"))
    # Example
    # ['prediction',  'label']
       
    return predictions








In [89]:
# testData= testData.drop("glucose")

testData.columns
testData = testData.withColumn("diabet", 
                                                   when(testData["glucose"] > 6.1, 1).otherwise(0))
testData = testData.drop("glucose")


In [90]:
# Check if 'features' column exists and drop it
if 'features' in testData.columns:
    testData = testData.drop('features')

# Assuming 'actual_outcome' is the column in the original data used as the label
# Make sure it's included when transforming testData
testData = testData.withColumn("label", testData["diabet"])
testData = testData.drop("diabet")
    
assembler = VectorAssembler(inputCols=[col for col in testData.columns if col != "label"], outputCol="features")
testData = assembler.transform(testData)

    # Make predictions using the trained model
predictions = lr_model.transform(testData)


In [91]:
# predictions = predictions.select("prediction", "label")
# # Transfer the prediction double type to integer type 
# predictions = predictions.withColumn("prediction", col("prediction").cast("integer"))


# predictions= predict_logistic_regression_result(lr_model, testData)

+----------+-----+
|prediction|label|
+----------+-----+
|         0|    0|
|         0|    0|
|         0|    1|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
+----------+-----+
only showing top 10 rows



AnalysisException: Cannot resolve column name "glucose" among (age, fy_diastolic_blood_pressure_low, fy_systolic_blood_pressure_high, fy_hemameba, fy_neutrophilic_granulocyte_percentage, fy_percentage_of_lymphocytes, fy_percentage_of_monocytes, fy_absolute_neutrophil_count, fy_lymphocyte_number, fy_absolute_value_of_monocytes, fy_absolute_eosinophil_count, fy_absolute_value_of_basophils, fy_erythrocyte, fy_hemoglobin, fy_mean_corpuscular_volume, fy_mean_corpuscular_hemoglobin, fy_mean_corpuscular_hemoglobin_concentration, fy_red_blood_cell_distribution_width_CV, fy_red_blood_cell_distribution_width_SD, fy_blood_platelet, fy_platelet_distribution_width, fy_mean_platelet_volume, fy_large_platelet_ratio, fy_pct, fy_whole_blood_viscosity_values_1pas_the_shear_rate_200s, fy_whole_blood_viscosity_values_2Pas_shear_rate30s, fy_whole_blood_viscosity_values_3Pas_shear_rate5s, fy_whole_blood_viscosity_values_4Pas_shear_rate1s, fy_ESR_blood_sedimentation, fy_hematocrit56, fy_red_cell_assembling_index, fy_the_K_value_of_the_ESR_equation, fy_glutamic_pyruvic_transaminase, fy_glutamic_oxalacetic_transaminase, fy_total_protein, fy_albumin, fy_total_bilirubin, fy_direct_bilirubin, fy_alkaline_phosphatase, fy_r_glutamyl_transpeptidase, fy_glucose, fy_carbamide, fy_creatinine, fy_trioxypurine, fy_total_cholesterol, fy_triglyceride, fy_high_density_lipoprotein_cholesterol, fy_low_density_lipoprotein_cholesterin, fy_creatine_phosphate_kinase, fy_sreatine_kinase_isoenzyme, fy_lactic_dehydrogenase, fy_a_hydroxybutyrate_dehydrogenase, fy_calcium, sy_phosphorus, fy_millet_grass_millet_c, fy_globulin, fy_albumin_globulin, fy_high_density_low_density, fy_apolipoprotein_ai, fy_apolipoprotein_B100, fy_carcino_embryonic_antigen, fy_alpha_fetoprotein, fy_free_prostate_specific_antigen, fy_total_prostate_specific_antigen, fy_proportion, fy_ph_value, fy_thyroid_stimulating_hormone, fy_free_T3, fy_free_T4, fy_500_left, fy_1000_left, fy_2000_left, fy_3000_left, fy_4000_left, fy_6000_left, fy_500_right, fy_1000_right, fy_2000_right, fy_3000_right, fy_4000_right, fy_6000_right, sy_diastolic_blood_pressure_low, sy_systolic_blood_pressure_high, sy_hemameba, sy_neutrophilic_granulocyte_percentage, sy_percentage_of_lymphocytes, sy_absolute_neutrophil_count, sy_lymphocyte_number, sy_absolute_value_of_monocytes, sy_absolute_eosinophil_count, sy_absolute_value_of_basophils, sy_erythrocyte, sy_hemoglobin, sy_mean_corpuscular_volume, sy_mean_corpuscular_hemoglobin, sy_mean_corpuscular_hemoglobin_concentration, sy_red_blood_cell_distribution_width_CV, sy_red_blood_cell_distribution_width_SD, sy_blood_platelet, sy_platelet_distribution_width, sy_mean_platelet_volume, sy_large_platelet_ratio, sy_pct, sy_whole_blood_viscosity_values_1pas_the_shear_rate_200s, sy_whole_blood_viscosity_values_2Pas_shear_rate30s, sy_whole_blood_viscosity_values_3Pas_shear_rate5s, sy_whole_blood_viscosity_values_4Pas_shear_rate1s, sy_ESR_blood_sedimentation, sy_hematocrit56, sy_red_cell_assembling_index, sy_the_K_value_of_the_ESR_equation, sy_glutamic_pyruvic_transaminase, sy_glutamic_oxalacetic_transaminase, sy_total_protein, sy_albumin, sy_total_bilirubin, sy_direct_bilirubin, sy_alkaline_phosphatase, sy_r_glutamyl_transpeptidase, sy_glucose, sy_carbamide, sy_creatinine, sy_trioxypurine, sy_total_cholesterol, sy_triglyceride, sy_high_density_lipoprotein_cholesterol, sy_low_density_lipoprotein_cholesterin, sy_creatine_phosphate_kinase, sy_sreatine_kinase_isoenzyme, sy_lactic_dehydrogenase, sy_a_hydroxybutyrate_dehydrogenase, sy_calcium, sy_millet_grass_millet_c, sy_globulin, sy_albumin_globulin, sy_high_density_low_density, sy_apolipoprotein_ai, sy_apolipoprotein_B100, sy_carcino_embryonic_antigen, sy_alpha_fetoprotein, sy_free_prostate_specific_antigen, sy_total_prostate_specific_antigen, sy_proportion, sy_ph_value, sy_thyroid_stimulating_hormone, sy_free_T3, sy_free_T4, sy_500_left, sy_1000_left, sy_2000_left, sy_3000_left, sy_4000_left, sy_6000_left, sy_500_right, sy_1000_right, sy_2000_right, sy_3000_right, sy_4000_right, sy_6000_right, label, features)

In [115]:
predictions.show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|         0|    0|
|         0|    0|
|         1|    1|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
|         0|    0|
+----------+-----+
only showing top 10 rows



In [19]:
from pyspark.sql.functions import col, when, count

def calculate_sensitivity_specificity(predictions):
    """
    Calculate sensitivity and specificity from a DataFrame containing labels and predictions.
    
    Args:
    predictions (DataFrame): A DataFrame with two columns: 'label' and 'prediction'.
    
    Returns:
    tuple: Returns a tuple containing sensitivity (True Positive Rate) and specificity (True Negative Rate).
    """
    
    # Calculate confusion matrix components
    metrics = predictions.withColumn("TP", when((col("label") == 1) & (col("prediction") == 1), 1).otherwise(0)) \
                          .withColumn("FN", when((col("label") == 1) & (col("prediction") == 0), 1).otherwise(0)) \
                          .withColumn("FP", when((col("label") == 0) & (col("prediction") == 1), 1).otherwise(0)) \
                          .withColumn("TN", when((col("label") == 0) & (col("prediction") == 0), 1).otherwise(0)) \
                          .agg(
                              count(when(col("TP") == 1, True)).alias("TP"),
                              count(when(col("FN") == 1, True)).alias("FN"),
                              count(when(col("FP") == 1, True)).alias("FP"),
                              count(when(col("TN") == 1, True)).alias("TN")
                          ).collect()[0]
    
    TP, FN, FP, TN = metrics["TP"], metrics["FN"], metrics["FP"], metrics["TN"]

    # Calculate sensitivity (True Positive Rate)
    TPR = TP / (TP + FN) if (TP + FN) != 0 else 0
    # Calculate specificity (True Negative Rate)
    TNR = TN / (TN + FP) if (TN + FP) != 0 else 0

    print("Confusion Matrix")
    print("\t\tPredicted Positive\tPredicted Negative\tSensitivity")
    print("Actual Positive\t", TP, "\t\t\t\t", FN, "\t\t", f"{TPR:.3f}")
    print("Actual Negative\t", FP, "\t\t\t\t", TN, "\t\t", f"{TNR:.3f}")    
    
    return TPR, TNR

# Example Usage:
# Assuming 'predictions' is a DataFrame with the necessary columns
# TPR, TNR = calculate_sensitivity_specificity(predictions)


# # Example Usage:
# TPR, TNR = calculate_sensitivity_specificity(predictions)



In [15]:
##Trainning logistic model with validation 

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression


def train_with_train_validation_split(data):
    
    
    #trainfer the glucose to binary 1 or 0 based on 6.1 and rename to 
    #dia bet 
    train_model_dataset = data.withColumn("diabet",
                                          when(data["glucose"] > 6.1, 1).otherwise(0))

    # Drop the original 'glucose' column if you don't need it anymore
    train_model_dataset = train_model_dataset.drop("glucose")

    assembler = VectorAssembler(inputCols=train_model_dataset.columns[:-1], outputCol="features")
    train_model_dataset = assembler.transform(train_model_dataset)

    # Create an instance of the LogisticRegression estimator
    undefined_model = LogisticRegression(featuresCol="features", labelCol="diabet")

    # Define the parameter grid
    param_grid = ParamGridBuilder() \
        .addGrid(undefined_model.maxIter, [10, 50, 100]) \
        .addGrid(undefined_model.regParam, [0, 0.01, 0.05, 0.1, 0.5, 1]) \
        .addGrid(undefined_model.elasticNetParam, [0.0, 0.1, 0.5, 0.8, 1]) \
        .build()

    # Create a binary classification evaluator
    evaluator = BinaryClassificationEvaluator(labelCol="diabet", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

    # Create a TrainValidationSplit
    tvs = TrainValidationSplit(estimator=undefined_model, estimatorParamMaps=param_grid, evaluator=evaluator, trainRatio=0.667)

    # Fit the TrainValidationSplit to the training data
    tvs_model = tvs.fit(train_model_dataset)

    # Get the best model
    best_model = tvs_model.bestModel

    return best_model

In [16]:
# limited_df = df.limit(100)
# model = train_with_train_validation_split(limited_df)

24/05/15 04:57:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/05/15 04:57:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/15 04:57:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [20]:
# seed = 5043
# trainingData, testData = limited_df.randomSplit([0.7, 0.3], seed=seed)


In [23]:

# predictions= predict_logistic_regression_result(model, testData)
# # Example Usage:
# TPR, TNR = calculate_sensitivity_specificity(predictions)


Confusion Matrix
		Predicted Positive	Predicted Negative	Sensitivity
Actual Positive	 1 				 0 		 1.000
Actual Negative	 0 				 37 		 1.000


In [None]:
from pyspark.sql.functions import when, col

def combine_predictions(prediction1, prediction2, dataset_number):
    """
    Combines predictions from two models based on the corresponding dataset for each data point.

    Args:
        prediction1 (pyspark.sql.DataFrame): The predictions from the first model, with columns "prediction" and "label".
        prediction2 (pyspark.sql.DataFrame): The predictions from the second model, with columns "prediction" and "label".
        dataset_number (pyspark.sql.DataFrame): The DataFrame indicating the dataset for each data point.

    Returns:
        pyspark.sql.DataFrame: A DataFrame with the combined predictions, with columns "prediction" and "label".
    """
    # Rename the columns in prediction1 and prediction2
    prediction1 = prediction1.withColumnRenamed("prediction", "prediction_1").withColumnRenamed("label", "label_1")
    prediction2 = prediction2.withColumnRenamed("prediction", "prediction_2").withColumnRenamed("label", "label_2")

    # Combine the predictions and dataset_number DataFrames
    combined_predictions = prediction1.join(prediction2, on="label_1", how="inner") \
                                      .join(dataset_number, on="label_1", how="inner")

    # Create a new column 'combined_prediction' based on the comparison of predictions
    combined_predictions = combined_predictions.withColumn(
        "combined_prediction",
        when(col("prediction_1") == col("prediction_2"), col("prediction_1"))
        .otherwise(
            when(col("dataset") == "dataset1", col("prediction_1"))
            .otherwise(col("prediction_2"))
        )
    )

    # Select the required columns and rename them
    combined_predictions = combined_predictions.select(
        col("combined_prediction").alias("prediction"),
        col("label_1").alias("label")
    )

    return combined_predictions

In [60]:
from pyspark.sql.functions import col, when
from pyspark.sql.window import Window
from pyspark.sql import Row

def combine_predictions(prediction1, prediction2, dataset_number):
    from pyspark.sql.functions import row_number
    # Create a window specification without partitioning
    windowSpec = Window.orderBy(lit('A'))  # 'A' is a dummy value to ensure a full scan in order

    # Add a row index to each DataFrame that aligns across both DataFrames
    prediction1 = prediction1.withColumn("id", row_number().over(windowSpec) - 1)
    prediction2 = prediction2.withColumn("id", row_number().over(windowSpec) - 1)

    # Ensure the columns are appropriately named to avoid conflicts and confusion
    prediction1 = prediction1.withColumnRenamed("prediction", "prediction1").drop("label")
    prediction2 = prediction2.withColumnRenamed("prediction", "prediction2")

    # Convert dataset_number list to DataFrame
    ids = spark.createDataFrame([(i, d) for i, d in enumerate(dataset_number)], schema="id int, dataset string")

    # Join predictions with the dataset_number DataFrame using the index
    combined_predictions = prediction1.join(prediction2, "id", "inner").join(ids, "id", "inner")

    # Select predictions based on the dataset
    combined_predictions = combined_predictions.withColumn(
        "combined_prediction",
        when(col("dataset") == "dataset1", col("prediction1"))
        .otherwise(col("prediction2"))
    )

    # Select the required columns and rename them
    combined_predictions = combined_predictions.select(
        col("combined_prediction").alias("prediction"),
        "label"  # Choose label1 or label as needed, assuming label alignment or resolve as needed
    )

    return combined_predictions

In [61]:
# #Tesing 
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import lit
# # Initialize Spark Session
# spark = SparkSession.builder.appName("Combine Predictions Test").getOrCreate()

# # Sample data for prediction1
# data1 = [(1, 1), (0, 1), (1, 0)]
# prediction1 = spark.createDataFrame(data1, ["prediction", "label"])

# # Sample data for prediction2
# data2 = [(1, 1), (1, 1), (0, 0)]
# prediction2 = spark.createDataFrame(data2, ["prediction", "label"])

# # Sample dataset_number as a list
# dataset_number = ["dataset1", "dataset2", "dataset1"]

# # Display DataFrames
# prediction1.show()
# prediction2.show()


+----------+-----+
|prediction|label|
+----------+-----+
|         1|    1|
|         0|    1|
|         1|    0|
+----------+-----+

+----------+-----+
|prediction|label|
+----------+-----+
|         1|    1|
|         1|    1|
|         0|    0|
+----------+-----+



In [62]:
# # Example usage:
# # spark = SparkSession.builder.appName("Example").getOrCreate()
# result_df = combine_predictions(prediction1, prediction2, dataset_number)
# result_df.show()

24/05/15 06:13:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/15 06:13:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-----+
|prediction|label|
+----------+-----+
|         1|    1|
|         1|    0|
|         1|    1|
+----------+-----+

