<a href="https://colab.research.google.com/github/ci-cd-binu/skills-introduction-to-github/blob/main/BasicSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=b5b9c2706043bcfe7ea7d1252ef72f3424c4f561e8deb9a7149ddbb692274c1f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [4]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise create it under
# the application name "InsuranceML".
spark = (
    SparkSession.builder
    .appName("InsuranceML")
    .getOrCreate()
)

# Read the insurance CSV, taking column names from the header row and letting
# Spark infer each column's type.
# NOTE(review): the path sits under /content/drive, presumably a mounted Google
# Drive; no visible cell mounts it, so confirm that happens before this runs.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/datasets/insurance_data.csv")
)


In [5]:
# Print the first two rows as a quick check of the loaded columns and values.
data.show(2)

+---------+---+------+------------+--------------+----------------------+---------------+
|policy_id|age|gender|vehicle_type|policy_premium|no_of_claims_last_year|claim_next_year|
+---------+---+------+------------+--------------+----------------------+---------------+
|        1| 56|  Male|       Truck|           633|                     1|              0|
|        2| 69|  Male|       Truck|          3638|                     1|              0|
+---------+---+------+------------+--------------+----------------------+---------------+
only showing top 2 rows



In [9]:
# Print per-column summary statistics (count, mean, stddev, min, percentiles, ...).
data.summary().show()

+-------+-----------------+-----------------+------+------------+------------------+----------------------+-------------------+
|summary|        policy_id|              age|gender|vehicle_type|    policy_premium|no_of_claims_last_year|    claim_next_year|
+-------+-----------------+-----------------+------+------------+------------------+----------------------+-------------------+
|  count|             1000|             1000|  1000|        1000|              1000|                  1000|               1000|
|   mean|            500.5|            50.38|  NULL|        NULL|          2731.883|                 2.413|               0.18|
| stddev|288.8194360957494|18.37866631111668|  NULL|        NULL|1267.1110716029502|    1.7173760160140772|0.38437969216355306|
|    min|                1|               18|Female|        Bike|               506|                     0|                  0|
|    25%|              250|               35|  NULL|        NULL|              1633|                    

**Feature Engineering:**
Convert categorical columns to numeric using One-Hot Encoding.
Create a new feature, `is_high_risk`, which is 1 if `no_of_claims_last_year > 3` and 0 otherwise.

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# A policy is flagged high risk when last year's claim count exceeds this value.
HIGH_RISK_CLAIM_THRESHOLD = 3

# Columns produced by this cell. They are dropped first so the cell can be
# re-run safely: `data` is rebound to the transformed frame below, and a second
# run would otherwise ask the pipeline stages to write columns that already
# exist. Dropping a column name that is not present is a no-op.
DERIVED_COLUMNS = [
    "is_high_risk",
    "genderIndex",
    "vehicleIndex",
    "genderVec",
    "vehicleVec",
    "features",
]
data = data.drop(*DERIVED_COLUMNS)

# Map the string categories to numeric indices ...
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
vehicle_indexer = StringIndexer(inputCol="vehicle_type", outputCol="vehicleIndex")

# ... then one-hot encode those indices into sparse vectors.
encoder = OneHotEncoder(inputCols=["genderIndex", "vehicleIndex"],
                        outputCols=["genderVec", "vehicleVec"])

# Create 'is_high_risk' column: 1 when last year's claims exceed the threshold, else 0.
data = data.withColumn(
    "is_high_risk",
    (data["no_of_claims_last_year"] > HIGH_RISK_CLAIM_THRESHOLD).cast("int"),
)

# Combine the numeric, encoded and engineered columns into one feature vector.
assembler = VectorAssembler(inputCols=["age", "policy_premium", "no_of_claims_last_year", "genderVec", "vehicleVec", "is_high_risk"],
                            outputCol="features")

# NOTE(review): the pipeline is fitted on the full dataset, before the
# train/test split performed in the modeling cell.
pipeline = Pipeline(stages=[gender_indexer, vehicle_indexer, encoder, assembler])
data = pipeline.fit(data).transform(data)


**Modeling:**
Train a Logistic Regression model.

In [12]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Fixed seed so the train/test split and the internal train/validation split
# are the same on every run; without it the selected model and the reported
# metric change from run to run.
SEED = 42

# Hold out 20% of the rows for the final evaluation.
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=SEED)

lr = LogisticRegression(labelCol="claim_next_year", featuresCol="features")

# 3 x 3 grid over regularisation strength and the elastic-net mixing parameter.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Each grid point is fitted on 80% of train_data and scored on the remaining
# 20% with the evaluator below.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=BinaryClassificationEvaluator(labelCol="claim_next_year"),
                           trainRatio=0.8,
                           seed=SEED)

model = tvs.fit(train_data)


**Inference:**
Predict on the test set and evaluate the model's performance.

In [13]:
# Score the held-out rows with the fitted model.
predictions = model.transform(test_data)

# areaUnderROC is the evaluator's default metric; it is spelled out here so the
# printed label and the computed metric visibly agree.
evaluator = BinaryClassificationEvaluator(
    labelCol="claim_next_year",
    metricName="areaUnderROC",
)
roc_auc = evaluator.evaluate(predictions)
print(f"ROC-AUC: {roc_auc}")


ROC-AUC: 0.650188323917137


In [17]:
# Print five scored rows, including the rawPrediction, probability and prediction columns.
predictions.show(5)

+---------+---+------+------------+--------------+----------------------+---------------+------------+-----------+------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|policy_id|age|gender|vehicle_type|policy_premium|no_of_claims_last_year|claim_next_year|is_high_risk|genderIndex|vehicleIndex|    genderVec|   vehicleVec|            features|       rawPrediction|         probability|prediction|
+---------+---+------+------------+--------------+----------------------+---------------+------------+-----------+------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|        1| 56|  Male|       Truck|           633|                     1|              0|           0|        0.0|         2.0|(1,[0],[1.0])|    (2,[],[])|[56.0,633.0,1.0,1...|[1.63650189145399...|[0.83705838701544...|       0.0|
|        3| 46|Female|        Bike|          4419|                     1|       

In [18]:
# `predictions` already carries every test_data column plus the model outputs,
# so dropping the two score columns yields "test_data + prediction" directly.
# Joining test_data back onto predictions by policy_id would add a shuffle and
# would multiply rows if a policy_id were ever repeated.
output_data = predictions.drop("rawPrediction", "probability")
output_data.show(4)
# Save the resulting DataFrame to CSV
# NOTE(review): this frame still holds vector columns (genderVec, vehicleVec,
# features), which the CSV writer presumably cannot serialise; confirm, and
# select only scalar columns before enabling the line below.
#output_data.write.csv("path_to_save_predictions.csv", header=True)


+---------+---+------+------------+--------------+----------------------+---------------+------------+-----------+------------+-------------+-------------+--------------------+----------+
|policy_id|age|gender|vehicle_type|policy_premium|no_of_claims_last_year|claim_next_year|is_high_risk|genderIndex|vehicleIndex|    genderVec|   vehicleVec|            features|prediction|
+---------+---+------+------------+--------------+----------------------+---------------+------------+-----------+------------+-------------+-------------+--------------------+----------+
|        1| 56|  Male|       Truck|           633|                     1|              0|           0|        0.0|         2.0|(1,[0],[1.0])|    (2,[],[])|[56.0,633.0,1.0,1...|       0.0|
|        3| 46|Female|        Bike|          4419|                     1|              0|           0|        1.0|         0.0|    (1,[],[])|(2,[0],[1.0])|[46.0,4419.0,1.0,...|       0.0|
|        4| 32|Female|         Car|           572|          

In [19]:
# The raw input columns, followed by the model's predicted label.
original_columns = [
    'policy_id',
    'age',
    'gender',
    'vehicle_type',
    'policy_premium',
    'no_of_claims_last_year',
    'claim_next_year',
    'prediction',
]

# Keep just those columns, leaving out the intermediate encoding and score columns.
output_data = predictions.select(original_columns)
output_data.show(4)
# Save the resulting DataFrame to CSV
#output_data.write.csv("path_to_save_predictions.csv", header=True)


+---------+---+------+------------+--------------+----------------------+---------------+----------+
|policy_id|age|gender|vehicle_type|policy_premium|no_of_claims_last_year|claim_next_year|prediction|
+---------+---+------+------------+--------------+----------------------+---------------+----------+
|        1| 56|  Male|       Truck|           633|                     1|              0|       0.0|
|        3| 46|Female|        Bike|          4419|                     1|              0|       0.0|
|        4| 32|Female|         Car|           572|                     2|              1|       0.0|
|       23| 19|  Male|        Bike|          4723|                     4|              0|       0.0|
+---------+---+------+------------+--------------+----------------------+---------------+----------+
only showing top 4 rows



In [1]:
# Print the shell's current working directory.
# NOTE(review): this looks like a leftover environment check; its execution
# count (1) is out of order with the cells above.
!pwd

/content
