In [6]:
# Task 1: Set Up PySpark
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator




In [7]:
# Create Spark session
spark = SparkSession.builder.appName("PySparkMLlibAssignment").getOrCreate()

In [8]:
# Task 2: Load and Explore Data
# Load Titanic dataset
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
data_path = "titanic.csv"
!wget $url -O $data_path



--2025-01-16 21:56:40--  https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/plain]
Saving to: ‘titanic.csv’


2025-01-16 21:56:40 (6.84 MB/s) - ‘titanic.csv’ saved [60302/60302]



In [4]:
# Read the data
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Show schema and basic information
df.printSchema()
df.show(5)
print(f"Total rows: {df.count()}")

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   

In [9]:
# Task 3: Data Preprocessing
selected_columns = ["Pclass", "Sex", "Age", "Fare", "Survived"]
df = df.select(selected_columns)

# Handle missing values
median_age = df.approxQuantile("Age", [0.5], 0)[0]
df = df.fillna({"Age": median_age, "Fare": df.approxQuantile("Fare", [0.5],0)[0]})
df = df.dropna(subset=["Survived"])

# Convert categorical column "Sex" to numeric
indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
df = indexer.fit(df).transform(df)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["Pclass", "SexIndex", "Age", "Fare"], outputCol="features")
df = assembler.transform(df)

In [10]:
# Keep only features and target column
df = df.select("features", "Survived")

# Task 4: Train-Test Split
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Task 5: Train a Logistic Regression Model and Display model coefficients and intercept
lr = LogisticRegression(featuresCol="features", labelCol="Survived")
lr_model = lr.fit(train_data)

# Display model coefficients and intercept
print(f"Coefficients: {lr_model.coefficients}")
print(f" Intercept: {lr_model.intercept}")


Coefficients: [-1.0485232298342673,2.602193522818644,-0.02450847389465579,0.000866680973489534]
 Intercept: 1.6039307167876704


In [13]:
# Task 6: Model Evaluation
predictions = lr_model.transform(test_data)

#Evaluate Model
evaluator = BinaryClassificationEvaluator(labelCol="Survived", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

#Calculate accuracy
accuracy = predictions.filter(predictions["Survived"] == predictions["prediction"]).count() / predictions.count()/float(test_data.count())
print(f"Accuracy: {accuracy}")



AUC: 0.8476003597416402
Accuracy: 0.0033813912009512483


In [14]:
# Task 7: Hyperparameter Tuning
param_grid = ParamGridBuilder() \
     .addGrid(lr.regParam, [0.01, 0.1, 1.0])\
     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
     .build()

crossval = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

# Best Hyperparameter
print(f"Best RegParam: {best_model._java_obj.getRegParam()}")
print(f"Best ElasticNetParam: {best_model._java_obj.getElasticNetParam()}")

# Evaluate best model on test data
best_predictions = best_model.transform(test_data)
best_auc = evaluator.evaluate(best_predictions)
print(f"Best Model AUC: {best_auc}")


Best RegParam: 0.1
Best ElasticNetParam: 0.0
Best Model AUC: 0.8398332106941377
