In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell''

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data cleaning").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [88]:
#Importing libraries
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek, date_format,when,monotonically_increasing_id
from pyspark.sql.functions import sin, cos, lit, floor
import pyspark.sql.functions as F
from math import pi
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer

In [4]:
df = spark.read.csv("/user/hive/warehouse/insurance/Insurance Premium Prediction.csv", header=True, inferSchema=True)

In [5]:
df.show()

+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|   Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|Self-Employ

In [6]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Annual Income: double (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Number of Dependents: double (nullable = true)
 |-- Education Level: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Health Score: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- Policy Type: string (nullable = true)
 |-- Previous Claims: double (nullable = true)
 |-- Vehicle Age: double (nullable = true)
 |-- Credit Score: double (nullable = true)
 |-- Insurance Duration: double (nullable = true)
 |-- Policy Start Date: timestamp (nullable = true)
 |-- Customer Feedback: string (nullable = true)
 |-- Smoking Status: string (nullable = true)
 |-- Exercise Frequency: string (nullable = true)
 |-- Property Type: string (nullable = true)
 |-- Premium Amount: double (nullable = true)



In [7]:
df.describe()

DataFrame[summary: string, id: string, Age: string, Gender: string, Annual Income: string, Marital Status: string, Number of Dependents: string, Education Level: string, Occupation: string, Health Score: string, Location: string, Policy Type: string, Previous Claims: string, Vehicle Age: string, Credit Score: string, Insurance Duration: string, Customer Feedback: string, Smoking Status: string, Exercise Frequency: string, Property Type: string, Premium Amount: string]

In [15]:
# Get the data types of each column
column_types = df.dtypes

# Initialize lists for categorical and numerical columns
categorical_features = []
numerical_features = []

# Classify columns based on their data types
for cols in column_types:
    column=cols[0]
    dtype=cols[1]
    if dtype in ['string', 'boolean','timestamp']:  # Categorical types
        categorical_features.append(column)
    elif dtype in ['int', 'bigint', 'float', 'double', 'decimal']:  # Numerical types
        numerical_features.append(column)

# Display the results
print("Categorical Columns:", categorical_features)
print("Numerical Columns:", numerical_features)


Categorical Columns: ['Gender', 'Marital Status', 'Education Level', 'Occupation', 'Location', 'Policy Type', 'Policy Start Date', 'Customer Feedback', 'Smoking Status', 'Exercise Frequency', 'Property Type']
Numerical Columns: ['id', 'Age', 'Annual Income', 'Number of Dependents', 'Health Score', 'Previous Claims', 'Vehicle Age', 'Credit Score', 'Insurance Duration', 'Premium Amount']


In [17]:
#Printing unique values in each categorical col
for col in categorical_features:
    if(col!='Policy Start Date'):
        print(col," with ",df.select(col).distinct().count()," unique values")

Gender  with  2  unique values
Marital Status  with  4  unique values
Education Level  with  4  unique values
Occupation  with  4  unique values
Location  with  3  unique values
Policy Type  with  3  unique values
Customer Feedback  with  4  unique values
Smoking Status  with  2  unique values
Exercise Frequency  with  4  unique values
Property Type  with  3  unique values


In [18]:
for col in categorical_features:
    if(col!='Policy Start Date'):
        print(df.groupby(col).count().show())

+------+------+
|Gender| count|
+------+------+
|Female|597429|
|  Male|602571|
+------+------+

None
+--------------+------+
|Marital Status| count|
+--------------+------+
|          null| 18529|
|       Married|394316|
|      Divorced|391764|
|        Single|395391|
+--------------+------+

None
+---------------+------+
|Education Level| count|
+---------------+------+
|    High School|289441|
|            PhD|303507|
|     Bachelor's|303234|
|       Master's|303818|
+---------------+------+

None
+-------------+------+
|   Occupation| count|
+-------------+------+
|     Employed|282750|
|         null|358075|
|Self-Employed|282645|
|   Unemployed|276530|
+-------------+------+

None
+--------+------+
|Location| count|
+--------+------+
|   Urban|397511|
|Suburban|401542|
|   Rural|400947|
+--------+------+

None
+-------------+------+
|  Policy Type| count|
+-------------+------+
|      Premium|401846|
|Comprehensive|399600|
|        Basic|398554|
+-------------+------+

None
+----

In [23]:
#Counting null values in data col
df.where(df['Policy Start Date'].isNull()).count()

0

In [24]:
#Checking for null values in target col
df.where(df['Premium Amount'].isNull()).count()

0

In [26]:
def date_transform(df):
    # Convert to date type
    df = df.withColumn('Policy Start Date', F.to_date(col('Policy Start Date')))
    # Extract date components
    df = (df
        .withColumn('Year', year(col('Policy Start Date')))
        .withColumn('Day', dayofmonth(col('Policy Start Date')))
        .withColumn('Month', month(col('Policy Start Date')))
        .withColumn('Month_name', date_format(col('Policy Start Date'), 'MMMM'))
        .withColumn('Day_of_week', date_format(col('Policy Start Date'), 'EEEE'))
        .withColumn('Week', F.weekofyear(col('Policy Start Date')))
    )
    
    # Calculate min and max year
    min_year = df.select(F.min('Year')).first()[0]
    max_year = df.select(F.max('Year')).first()[0]
    
    # Cyclical encoding
    df = (df
        .withColumn('Year_sin', 
            sin(2 * pi * (col('Year') - lit(min_year)) / (max_year - min_year))
        )
        .withColumn('Year_cos', 
            cos(2 * pi * (col('Year') - lit(min_year)) / (max_year - min_year))
        )
        .withColumn('Month_sin', sin(2 * pi * col('Month') / 12))
        .withColumn('Month_cos', cos(2 * pi * col('Month') / 12))
        .withColumn('Day_sin', sin(2 * pi * col('Day') / 31))
        .withColumn('Day_cos', cos(2 * pi * col('Day') / 31))
        .withColumn('Group', 
            (col('Year') - lit(2020)) * 48 + 
            col('Month') * 4 + 
            floor(col('Day') / 7)
        )
    )
    
    # Drop original date column
    df = df.drop('Policy Start Date')
    
    return df

# Usage
df = date_transform(df)


In [65]:
target_column="Premium Amount"
X=df.drop(target_column, 'id', 'Group', 'Year', 'Month', 'Day', 'Week')
y=df.select(target_column)

In [66]:
X.dtypes

[('Age', 'double'),
 ('Gender', 'string'),
 ('Annual Income', 'double'),
 ('Marital Status', 'string'),
 ('Number of Dependents', 'double'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Health Score', 'double'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Previous Claims', 'double'),
 ('Vehicle Age', 'double'),
 ('Credit Score', 'double'),
 ('Insurance Duration', 'double'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'string'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Month_name', 'string'),
 ('Day_of_week', 'string'),
 ('Year_sin', 'double'),
 ('Year_cos', 'double'),
 ('Month_sin', 'double'),
 ('Month_cos', 'double'),
 ('Day_sin', 'double'),
 ('Day_cos', 'double')]

In [67]:
# Get the data types of each column
column_types = X.dtypes

# Initialize lists for categorical and numerical columns
categorical_features = []
numerical_features = []

# Classify columns based on their data types
for cols in column_types:
    column=cols[0]
    dtype=cols[1]
    if dtype in ['string', 'boolean','timestamp']:  # Categorical types
        categorical_features.append(column)
    elif dtype in ['int', 'bigint', 'float', 'double', 'decimal']:  # Numerical types
        numerical_features.append(column)

# Display the results
print("Categorical Columns:", categorical_features)
print("Numerical Columns:", numerical_features)

Categorical Columns: ['Gender', 'Marital Status', 'Education Level', 'Occupation', 'Location', 'Policy Type', 'Customer Feedback', 'Smoking Status', 'Exercise Frequency', 'Property Type', 'Month_name', 'Day_of_week']
Numerical Columns: ['Age', 'Annual Income', 'Number of Dependents', 'Health Score', 'Previous Claims', 'Vehicle Age', 'Credit Score', 'Insurance Duration', 'Year_sin', 'Year_cos', 'Month_sin', 'Month_cos', 'Day_sin', 'Day_cos']


In [68]:
#Categorical data Imputing
X_cat = X.select([
    when(col(c).isNull(), "Unknown")
    .otherwise(col(c))
    .alias(c) for c in categorical_features
])

In [82]:
#Selecting numerical features from data
X_num = X.select(*numerical_features)

In [83]:
X_num.dtypes

[('Age', 'double'),
 ('Annual Income', 'double'),
 ('Number of Dependents', 'double'),
 ('Health Score', 'double'),
 ('Previous Claims', 'double'),
 ('Vehicle Age', 'double'),
 ('Credit Score', 'double'),
 ('Insurance Duration', 'double'),
 ('Year_sin', 'double'),
 ('Year_cos', 'double'),
 ('Month_sin', 'double'),
 ('Month_cos', 'double'),
 ('Day_sin', 'double'),
 ('Day_cos', 'double')]

In [69]:
X_cat.dtypes

[('Gender', 'string'),
 ('Marital Status', 'string'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'string'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Month_name', 'string'),
 ('Day_of_week', 'string')]

In [86]:
#OneHotEncoder
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_features]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded") for col in categorical_features]

# Create a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Fit and transform the data
model = pipeline.fit(X_cat)
X_cat = model.transform(X_cat)

In [87]:
X_cat.dtypes

[('Gender', 'string'),
 ('Marital Status', 'string'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'string'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Month_name', 'string'),
 ('Day_of_week', 'string'),
 ('Gender_index', 'double'),
 ('Marital Status_index', 'double'),
 ('Education Level_index', 'double'),
 ('Occupation_index', 'double'),
 ('Location_index', 'double'),
 ('Policy Type_index', 'double'),
 ('Customer Feedback_index', 'double'),
 ('Smoking Status_index', 'double'),
 ('Exercise Frequency_index', 'double'),
 ('Property Type_index', 'double'),
 ('Month_name_index', 'double'),
 ('Day_of_week_index', 'double'),
 ('Gender_encoded', 'vector'),
 ('Marital Status_encoded', 'vector'),
 ('Education Level_encoded', 'vector'),
 ('Occupation_encoded', 'vector'),
 ('Location_encoded', 'vector'),
 ('Policy Type_encoded', 'vector'),
 ('Customer 

In [84]:
# Create Imputer
imputer = Imputer(
    inputCols=numerical_features,
    outputCols=numerical_features,
    strategy='median'
)

# Fit and transform
X_num = imputer.fit(X_num).transform(X_num)

In [85]:
X_num.columns

['Age',
 'Annual Income',
 'Number of Dependents',
 'Health Score',
 'Previous Claims',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Year_sin',
 'Year_cos',
 'Month_sin',
 'Month_cos',
 'Day_sin',
 'Day_cos']

In [112]:
#Combining numerical and categorical cols horizontally and making clean df
X_num_id = X_num.withColumn('id', monotonically_increasing_id())
X_cat_id = X_cat.withColumn('id', monotonically_increasing_id())
y_df=y.withColumn('id',monotonically_increasing_id())
# Perform inner join
horizontal_df = X_num_id.join(X_cat_id, 'id').join(y_df,'id')
# Optional: Drop the ID column
clean_df = horizontal_df.drop('id')

In [113]:
clean_df.dtypes

[('Age', 'double'),
 ('Annual Income', 'double'),
 ('Number of Dependents', 'double'),
 ('Health Score', 'double'),
 ('Previous Claims', 'double'),
 ('Vehicle Age', 'double'),
 ('Credit Score', 'double'),
 ('Insurance Duration', 'double'),
 ('Year_sin', 'double'),
 ('Year_cos', 'double'),
 ('Month_sin', 'double'),
 ('Month_cos', 'double'),
 ('Day_sin', 'double'),
 ('Day_cos', 'double'),
 ('Gender', 'string'),
 ('Marital Status', 'string'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'string'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Month_name', 'string'),
 ('Day_of_week', 'string'),
 ('Gender_index', 'double'),
 ('Marital Status_index', 'double'),
 ('Education Level_index', 'double'),
 ('Occupation_index', 'double'),
 ('Location_index', 'double'),
 ('Policy Type_index', 'double'),
 ('Customer Feedback_index', 'double'),
 ('Smoking Status

In [114]:
def clean_column_name(col_name):
    # Remove or replace spaces and special characters
    return ''.join(c if c.isalnum() or c in ['_', '-'] else '_' for c in col_name)

df_cleaned = clean_df.toDF(*[clean_column_name(col) for col in clean_df.columns])

In [115]:
df_cleaned.count(),len(df_cleaned.columns)

(1200000, 51)

In [116]:
#Saving cleaned file in parquet format
path=!pwd
path="file://"+path[0]+"/df_cleaned"

In [117]:
df_cleaned.coalesce(1).write.format("parquet").save(path)