In [None]:
pip install pyspark



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    ChiSqSelector,
    Imputer,
    OneHotEncoder,
    StandardScaler,
    StringIndexer,
    UnivariateFeatureSelector,
    VectorAssembler,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

In [None]:
# Start a Spark session named "BostonHousing" (getOrCreate reuses one that is
# already running in this kernel); every later cell works through `spark`.
spark = (
    SparkSession.builder
    .appName("BostonHousing")
    .getOrCreate()
)

In [None]:
# Load the housing CSV.
# NOTE(review): with default reader options the numeric columns CRIM, ZN, INDUS,
# CHAS, AGE and LSTAT were inferred as *strings*, which indicates a non-numeric
# missing-value token in the file — presumably "NA"; confirm against the raw data.
# Declaring it as the null marker lets inferSchema type those columns numerically
# and turns the gaps into real nulls for the Imputer, instead of depending on
# string->double casts that raise when ANSI mode is on (the default in Spark 4).
HOUSING_CSV_PATH = "/content/HousingData.csv"  # adjust for your environment
data = spark.read.csv(
    HOUSING_CSV_PATH,
    header=True,
    inferSchema=True,
    nullValue="NA",
)
data.printSchema()

root
 |-- CRIM: string (nullable = true)
 |-- ZN: string (nullable = true)
 |-- INDUS: string (nullable = true)
 |-- CHAS: string (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: string (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: string (nullable = true)
 |-- MEDV: double (nullable = true)



In [None]:
# Feature groups used by the pipeline below:
#   categorical_cols -> null-filled, string-indexed, then one-hot encoded
#   numerical_cols   -> mean-imputed, assembled, then standardized
# MEDV (the target) is deliberately in neither list.
categorical_cols = [
    "CHAS",
    "RAD",
]
numerical_cols = [
    "CRIM", "ZN", "INDUS", "NOX", "RM", "AGE",
    "DIS", "TAX", "PTRATIO", "B", "LSTAT",
]

In [None]:
from pyspark.sql.types import DoubleType

In [None]:
from pyspark.ml.feature import Imputer  # import the Imputer class

# Cast every numeric feature and the target (MEDV) to double in ONE projection.
# Calling withColumn in a loop adds a projection per call and can bloat the query
# plan; a single select performs the same casts and preserves column order.
cast_to_double = set(numerical_cols + ["MEDV"])
missing = cast_to_double.difference(data.columns)
if missing:
    # Fail fast with a clear message rather than a later unresolved-column error.
    raise ValueError(f"Expected columns not found in data: {sorted(missing)}")
data = data.select(
    *[
        col(name).cast(DoubleType()).alias(name) if name in cast_to_double else col(name)
        for name in data.columns
    ]
)

# Mean-impute missing numeric values, overwriting each column in place
# (outputCols == inputCols). The strategy goes in the constructor so the cell
# does not end with setStrategy(), whose return value echoed the Imputer repr.
imputer = Imputer(inputCols=numerical_cols, outputCols=numerical_cols, strategy="mean")

Imputer_2b4b3582d07c

In [None]:
# Handle categorical features: give nulls an explicit "Unknown" level, then
# build one StringIndexer -> OneHotEncoder pair per categorical column.
from pyspark.sql.functions import col, when

for cat_col in categorical_cols:
    data = data.withColumn(
        cat_col,
        when(col(cat_col).isNull(), "Unknown").otherwise(col(cat_col)),
    )

# "<name>_indexed" is the StringIndexer output and the OneHotEncoder input.
indexed_names = [f"{name}_indexed" for name in categorical_cols]
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid="keep")
    for src, dst in zip(categorical_cols, indexed_names)
]
encoders = [
    OneHotEncoder(inputCol=dst, outputCol=f"{src}_encoded")
    for src, dst in zip(categorical_cols, indexed_names)
]


In [None]:
# Scale numerical features: pack the numeric columns into one vector
# ("num_features"), then standardize it to zero mean and unit standard deviation.
assembler_num = VectorAssembler(
    inputCols=numerical_cols,
    outputCol="num_features",
)
scaler = StandardScaler(
    inputCol="num_features",
    outputCol="scaled_num_features",
    withMean=True,
    withStd=True,
)


In [None]:
# Final feature vector: the one-hot vectors of every categorical column, followed
# by the standardized numeric vector, in exactly that order.
encoded_cols = [name + "_encoded" for name in categorical_cols]
assembler = VectorAssembler(
    inputCols=[*encoded_cols, "scaled_num_features"],
    outputCol="features",
)

In [None]:
# Build a binary label and configure feature selection.
# label = 1.0 when MEDV is strictly above its median, else 0.0. relativeError=0.0
# asks Spark for the exact median (Spark warns this can be expensive on large
# data; revisit if the input grows).
median_medv = data.approxQuantile("MEDV", [0.5], 0.0)[0]
data = data.withColumn("label", (col("MEDV") > median_medv).cast("double"))

# ChiSqSelector's chi-squared test assumes categorical features; applied to the
# standardized continuous columns it treats every distinct float as its own
# category. UnivariateFeatureSelector with continuous features and a categorical
# label scores each feature with an ANOVA F-test instead, keeping the top
# NUM_SELECTED_FEATURES. (The one-hot 0/1 columns are scored as continuous too.)
# Requires Spark >= 3.1.1.
NUM_SELECTED_FEATURES = 10
selector = (
    UnivariateFeatureSelector(
        featuresCol="features",
        outputCol="selected_features",
        labelCol="label",
        selectionMode="numTopFeatures",
    )
    .setFeatureType("continuous")
    .setLabelType("categorical")
    .setSelectionThreshold(NUM_SELECTED_FEATURES)
)

In [None]:
# Chain every stage in dependency order: impute -> index -> one-hot -> assemble
# numeric -> scale -> assemble all -> select. Fit on the full dataset and inspect
# the selected vectors. Failures propagate with their full traceback rather than
# being printed and swallowed, which would leave `model`/`result` undefined (or
# stale from an earlier run) while the notebook appeared to succeed.
pipeline = Pipeline(
    stages=[imputer, *indexers, *encoders, assembler_num, scaler, assembler, selector]
)
model = pipeline.fit(data)
result = model.transform(data)
result.select("selected_features", "label").show(5, truncate=False)

# Positions within the assembled "features" vector that the selector kept
# (these are indices, not importance scores).
selected_indices = model.stages[-1].selectedFeatures
print("Selected Feature indices:", selected_indices)


+---------------------------------------------------------------------------------------------------------------------------+-----+
|selected_features                                                                                                          |label|
+---------------------------------------------------------------------------------------------------------------------------+-----+
|(10,[5,6,7,8,9],[0.29614984653077786,-1.3097142836688263,-0.14407485473245793,-0.6659491794887344,-1.4575579672895913])    |1.0  |
|(10,[5,6,7,8,9],[-0.4891544449063988,-0.5991779446787058,-0.7395303607434325,-0.9863533804386955,-0.3027944997494501])     |1.0  |
|(10,[5,6,7,8,9],[-0.4891544449063988,-0.5991779446787058,-0.7395303607434325,-0.9863533804386955,-0.3027944997494501])     |1.0  |
|(10,[3,5,6,7,8,9],[1.0,-0.4891544449063988,-1.3291196878849432,-0.8344580501075004,-1.105021603012755,0.11292034856500006])|1.0  |
|(10,[3,5,6,7,8,9],[1.0,-0.4891544449063988,-1.3291196878849432,-0.834458050

In [None]:
# Shut down the Spark session created at the top of the notebook; any cell run
# after this one needs a fresh session.
spark.stop()