# DP2 - Project
## Team:
- Bauer Fabian (h12308122)
- Bruckner-Hrubesch Sebastian (h)
- Rivalta Florian (h12319658)
- Zährer Raphael (h12311217)

---
### Creating a spark session
- importing all libraries
- setting up a spark session

In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator, MulticlassClassificationEvaluator



# Create (or reuse) the local Spark session used by every later cell.
# NOTE: if this block fails, `spark` and `sc` stay undefined and later cells
# will raise NameError; the printed cause below tells you why.
try:
    # Point findspark at the Spark installation before building the session.
    findspark.init("/usr/local/spark/")

    spark = SparkSession.builder \
       .master("local") \
       .appName("project_breast_cancer") \
       .config("spark.executor.memory", "1gb") \
       .getOrCreate()

    sc = spark.sparkContext

# Catch Exception rather than using a bare `except:` so that
# KeyboardInterrupt / SystemExit are not silently swallowed.
except Exception as exc:
    print("There was an issue when creating a Spark session")
    # Surface the underlying error instead of hiding it.
    print(f"Cause: {type(exc).__name__}: {exc}")

---
### Reading in the CSV and transforming it to an RDD
- splitting the "row-strings" by comma

In [2]:
def reading_file(filename):
    """Read a comma-separated text file into an RDD of field lists.

    Every line of the file is split on "," so each RDD element is a list of
    strings. Relies on the module-level SparkContext ``sc``.

    Parameters
    ----------
    filename : str
        Path of the file to read.

    Returns
    -------
    RDD or None
        The RDD of split lines, or ``None`` when building the RDD raised an
        error (a message and the cause are printed in that case).
    """
    try:
        rdd = sc.textFile(f"{filename}", minPartitions=4)
        # NOTE(review): Spark evaluates lazily, so a missing file may only
        # fail at the first action on the returned RDD, not here - confirm.
        return rdd.map(lambda line: line.split(","))

    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt / SystemExit still propagate.
    except Exception as exc:
        print(f"Unable to read in the file with filePath: {filename}")
        # Surface the underlying error instead of hiding it.
        print(f"Cause: {type(exc).__name__}: {exc}")
        return None

In [3]:
# Load the CSV (path is relative to the notebook's working directory)
# as an RDD of comma-split field lists.
rdd_bc = reading_file("breast_cancer_wisconsin.csv")

---
### Converting the RDD to a DataFrame
- filtering out the first row (the header line), because it contains no data
- transforming the columns to the right dtype

In [4]:
def convert_to_df(rdd):
    """Convert the RDD of split CSV lines into a typed Spark DataFrame.

    The first record of ``rdd`` is treated as the header and every record
    equal to it is dropped. The remaining records are mapped positionally onto
    the fixed column names below and then cast in a single projection:

    * ``ID``         -> integer
    * ``diagnosis``  -> integer, 1 where the raw value is "M", otherwise 0
    * all others     -> float

    Parameters
    ----------
    rdd : RDD
        RDD whose elements are lists of strings, one entry per CSV field.

    Returns
    -------
    DataFrame
        DataFrame with the 32 named and typed columns.
    """
    columns = ["ID", "diagnosis", "radius_mean", "texture_mean", "perimeter_mean", "area_mean",
               "smoothness_mean", "compactness_mean", "concavity_mean", "concave_points_mean",
               "symmetry_mean", "fractal_dimension_mean", "radius_se", "texture_se",
               "perimeter_se", "area_se", "smoothness_se", "compactness_se", "concavity_se",
               "concave_points_se", "symmetry_se", "fractal_dimension_se", "radius_worst",
               "texture_worst", "perimeter_worst", "area_worst", "smoothness_worst",
               "compactness_worst", "concavity_worst", "concave_points_worst",
               "symmetry_worst", "fractal_dimension_worst"]

    # Drop the header line (and any later record identical to it).
    rdd_header = rdd.first()
    rdd = rdd.filter(lambda x: x != rdd_header)

    # All fields are still strings at this point.
    df = rdd.map(lambda line: Row(**{columns[i]: line[i] for i in range(len(columns))})).toDF()

    def _typed(name):
        """Return the cast expression for one column, keeping its name."""
        if name == "ID":
            return col(name).cast(IntegerType()).alias(name)
        if name == "diagnosis":
            return when(col(name) == "M", 1).otherwise(0).cast(IntegerType()).alias(name)
        return col(name).cast(FloatType()).alias(name)

    # One select instead of a withColumn loop: each withColumn call adds
    # another projection to the plan, which grows needlessly for 32 columns.
    # Iterating df.columns keeps the column order exactly as produced by toDF().
    return df.select([_typed(name) for name in df.columns])


In [5]:
# Build the typed DataFrame from the raw RDD and print its schema
# to verify the casts.
df_bc = convert_to_df(rdd_bc)
df_bc.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- diagnosis: integer (nullable = false)
 |-- radius_mean: float (nullable = true)
 |-- texture_mean: float (nullable = true)
 |-- perimeter_mean: float (nullable = true)
 |-- area_mean: float (nullable = true)
 |-- smoothness_mean: float (nullable = true)
 |-- compactness_mean: float (nullable = true)
 |-- concavity_mean: float (nullable = true)
 |-- concave_points_mean: float (nullable = true)
 |-- symmetry_mean: float (nullable = true)
 |-- fractal_dimension_mean: float (nullable = true)
 |-- radius_se: float (nullable = true)
 |-- texture_se: float (nullable = true)
 |-- perimeter_se: float (nullable = true)
 |-- area_se: float (nullable = true)
 |-- smoothness_se: float (nullable = true)
 |-- compactness_se: float (nullable = true)
 |-- concavity_se: float (nullable = true)
 |-- concave_points_se: float (nullable = true)
 |-- symmetry_se: float (nullable = true)
 |-- fractal_dimension_se: float (nullable = true)
 |-- radius_worst: float (n

In [6]:
# Compute the dimensions once and reuse them: count() is a Spark action,
# so calling it again inside the print would run a second job.
num_rows = df_bc.count()
num_columns = len(df_bc.columns)

print(f"Number of rows: {num_rows}, Number of columns: {num_columns}")

Number of rows: 569, Number of columns: 32


---
### Removing unnecessary columns
- removing columns that are not important for our logistic regression model

In [7]:
def removing_cols(df, columns=("ID",)):
    """Return ``df`` without the given columns.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame; it is not modified.
    columns : str or iterable of str, optional
        Name(s) of the columns to drop. Defaults to ``("ID",)``, which is the
        original behaviour of this function.

    Returns
    -------
    DataFrame
        New DataFrame without the listed columns.
    """
    # Accept a single name as well; unpacking a bare string would otherwise
    # split it into characters.
    if isinstance(columns, str):
        columns = (columns,)
    return df.drop(*columns)

In [8]:
# Drop the ID column and print the schema to confirm it is gone.
cleaned_df = removing_cols(df_bc)
cleaned_df.printSchema()

root
 |-- diagnosis: integer (nullable = false)
 |-- radius_mean: float (nullable = true)
 |-- texture_mean: float (nullable = true)
 |-- perimeter_mean: float (nullable = true)
 |-- area_mean: float (nullable = true)
 |-- smoothness_mean: float (nullable = true)
 |-- compactness_mean: float (nullable = true)
 |-- concavity_mean: float (nullable = true)
 |-- concave_points_mean: float (nullable = true)
 |-- symmetry_mean: float (nullable = true)
 |-- fractal_dimension_mean: float (nullable = true)
 |-- radius_se: float (nullable = true)
 |-- texture_se: float (nullable = true)
 |-- perimeter_se: float (nullable = true)
 |-- area_se: float (nullable = true)
 |-- smoothness_se: float (nullable = true)
 |-- compactness_se: float (nullable = true)
 |-- concavity_se: float (nullable = true)
 |-- concave_points_se: float (nullable = true)
 |-- symmetry_se: float (nullable = true)
 |-- fractal_dimension_se: float (nullable = true)
 |-- radius_worst: float (nullable = true)
 |-- texture_worst:

In [9]:
# Compute the dimensions once and reuse them: count() is a Spark action,
# so calling it again inside the print would run a second job.
num_rows = cleaned_df.count()
num_columns = len(cleaned_df.columns)

print(f"Number of rows: {num_rows}, Number of columns: {num_columns}")

Number of rows: 569, Number of columns: 31


---
### Check if there are null values

In [10]:
def check_for_null_values(df):
    """Print the number of null entries in every column of ``df``.

    One aggregate expression per column counts the rows where that column is
    null; the single result row is then printed as ``<column>: <count>`` lines.

    Parameters
    ----------
    df : DataFrame
        DataFrame to inspect.
    """
    # when() without otherwise() yields null for non-matching rows, and
    # count() skips nulls, so only the null rows of each column are counted.
    null_exprs = []
    for name in df.columns:
        null_exprs.append(count(when(col(name).isNull(), name)).alias(name))

    first_row = df.select(null_exprs).head()

    for name, n_null in first_row.asDict().items():
        print(f"{name}: {n_null}")

# Report the per-column null counts of the cleaned DataFrame.
check_for_null_values(cleaned_df)

diagnosis: 0
radius_mean: 0
texture_mean: 0
perimeter_mean: 0
area_mean: 0
smoothness_mean: 0
compactness_mean: 0
concavity_mean: 0
concave_points_mean: 0
symmetry_mean: 0
fractal_dimension_mean: 0
radius_se: 0
texture_se: 0
perimeter_se: 0
area_se: 0
smoothness_se: 0
compactness_se: 0
concavity_se: 0
concave_points_se: 0
symmetry_se: 0
fractal_dimension_se: 0
radius_worst: 0
texture_worst: 0
perimeter_worst: 0
area_worst: 0
smoothness_worst: 0
compactness_worst: 0
concavity_worst: 0
concave_points_worst: 0
symmetry_worst: 0
fractal_dimension_worst: 0


---
### Machine Learning - Logistic Regression

In [13]:
def evaluate_logistic_regression_model(df):
    """Train a logistic-regression pipeline on ``df`` and print test metrics.

    All columns except ``diagnosis`` are assembled into a feature vector,
    standardised (zero mean, unit standard deviation) and fed into a logistic
    regression with ``diagnosis`` as the label. The data is split 80/20 with a
    fixed seed; the model is fitted on the 80 % part and evaluated on the rest.

    Printed metrics: area under ROC, accuracy, recall, precision, F1 score,
    and MSE / RMSE between the label and the predicted class.

    Parameters
    ----------
    df : DataFrame
        DataFrame with an integer ``diagnosis`` label column (0/1) and numeric
        feature columns.

    Returns
    -------
    None
        The results are only printed.
    """
    # Use a loop variable that does not reuse the name of the imported
    # pyspark.sql.functions.col helper.
    feature_columns = [name for name in df.columns if name != 'diagnosis']
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    lr = LogisticRegression(featuresCol="scaled_features", labelCol="diagnosis")

    pipeline = Pipeline(stages=[assembler, scaler, lr])

    train_df, test_df = df.randomSplit([0.8, 0.2], seed=1234)
    model = pipeline.fit(train_df)

    # The predictions are scanned by several Spark jobs below; cache them so
    # the pipeline transform is not recomputed for each one.
    predictions = model.transform(test_df).cache()

    try:
        evaluator_auc = BinaryClassificationEvaluator(labelCol="diagnosis", metricName="areaUnderROC")
        auc = evaluator_auc.evaluate(predictions)

        # Build the confusion matrix in one aggregation instead of four
        # separate filter + count jobs.
        confusion = {
            (int(row["diagnosis"]), int(row["prediction"])): row["count"]
            for row in predictions.groupBy("diagnosis", "prediction").count().collect()
        }
        tp = confusion.get((1, 1), 0)  # True Positives
        tn = confusion.get((0, 0), 0)  # True Negatives
        fp = confusion.get((0, 1), 0)  # False Positives
        fn = confusion.get((1, 0), 0)  # False Negatives

        evaluator_mse = RegressionEvaluator(labelCol="diagnosis", predictionCol="prediction", metricName="mse")
        mse = evaluator_mse.evaluate(predictions)

        evaluator_rmse = RegressionEvaluator(labelCol="diagnosis", predictionCol="prediction", metricName="rmse")
        rmse = evaluator_rmse.evaluate(predictions)
    finally:
        # Release the cached predictions even if an evaluation step failed.
        predictions.unpersist()

    # Guard every ratio against an empty denominator (e.g. an empty test split).
    total = tn + tp + fn + fp
    accuracy = (tn + tp) / total if total != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    print(f"Evalutation results:\n"
          f"{30 * '-'}\n"
          f"Test AUC: {auc:.2f}\n"
          f"\nAccuracy: {accuracy:.2f}\n"
          f"Recall: {recall:.2f}\n"
          f"Precision: {precision:.2f}\n"
          f"F1-Score: {f1_score:.2f}\n"
          f"\nMean Squared Error (MSE): {mse:.2f}\n"
          f"Root Mean Squared Error (RMSE): {rmse:.2f}")



# Train and evaluate the model on the cleaned data (ID column removed).
evaluate_logistic_regression_model(cleaned_df)

Evalutation results:
------------------------------
Test AUC: 0.98

Accuracy: 0.92
Recall: 0.88
Precision: 0.91
F1-Score: 0.90

Mean Squared Error (MSE): 0.08
Root Mean Squared Error (RMSE): 0.28
