<a href="https://colab.research.google.com/github/Vasugi2003/House-price-prediction-using-Pyspark---BIG-DATA-ANALYTICS/blob/main/House_price_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=cb1ded51cbe3b9374db9d81e3711bb4853cdd8eac5ea8e1d355ddbf5746fd19d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Create the Spark session used by every DataFrame below, or reuse one that
# is already running in this kernel.
spark = (
    SparkSession.builder
    .appName("HousePriceClassification")
    .getOrCreate()
)

In [None]:
# Location of the raw listings file on the Colab runtime.
filepath = "/content/houseprice.csv"

# The first row holds the column names; Spark infers each column's type.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(filepath)
)
df.show()

+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+----------------+--------+-------+
|               date|    price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|sqft_above|sqft_basement|yr_built|yr_renovated|              street|            city|statezip|country|
+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+----------------+--------+-------+
|2014-05-02 00:00:00| 313000.0|     3.0|      1.5|       1340|    7912|   1.5|         0|   0|        3|      1340|            0|    1955|        2005|18810 Densmore Ave N|       Shoreline|WA 98133|    USA|
|2014-05-02 00:00:00|2384000.0|     5.0|      2.5|       3650|    9050|   2.0|         0|   4|        5|      3370|          280|    1921|           0|     709 W Blaine St|

In [None]:
# Inspect the schema Spark inferred from the CSV.
df.printSchema()

# Replace the raw price with a binary label: 1 when it exceeds 600000, else 0.
price_label = F.when(F.col("price") > 600000, 1).otherwise(0)
df = df.withColumn("price", price_label.cast(IntegerType()))

# Columns that a later cell casts to IntegerType.
int_columns = [
    'bathrooms',
    'bedrooms',
    'sqft_living',
    'sqft_lot',
    'floors',
    'sqft_basement',
]

root
 |-- date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- statezip: string (nullable = true)
 |-- country: string (nullable = true)



In [None]:
# Cast every column named in int_columns to integer; all other columns and the
# column order are left untouched.
# Note: the cast drops the fractional part (e.g. 1.5 bathrooms becomes 1).
df = df.select([
    F.col(name).cast(IntegerType()).alias(name) if name in int_columns else F.col(name)
    for name in df.columns
])
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- price: integer (nullable = false)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- statezip: string (nullable = true)
 |-- country: string (nullable = true)



In [None]:
# "price" was already turned into a 0/1 label in the cell that first printed
# the schema. Applying the `> 600000` threshold a second time would compare
# 0/1 against 600000 and collapse every label to 0, so it is deliberately not
# repeated here.

# Columns to cast to IntegerType
int_columns = ['bathrooms', 'bedrooms', 'sqft_living', 'sqft_lot', 'floors', \
               'sqft_basement']

In [None]:
# The int_columns were already cast to IntegerType in an earlier cell, so the
# cast is not repeated here; just confirm the schema before splitting.
df.printSchema()

# Split data: weights 0.8 / 0.2 with a fixed seed.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=11)

# Print both sizes explicitly: a bare expression is only displayed when it is
# the last line of a cell, so the training count would otherwise be lost.
print(f"train rows: {train_df.count()}")
print(f"test rows: {test_df.count()}")

root
 |-- date: timestamp (nullable = true)
 |-- price: integer (nullable = false)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- statezip: string (nullable = true)
 |-- country: string (nullable = true)



902

In [None]:
# Map the "price" label to a numeric "price_index" column. The mapping is
# learned from the training rows and then applied to those same rows.
price_indexer = StringIndexer(outputCol="price_index", inputCol="price")

price_indexer_model = price_indexer.fit(train_df)
train_df = price_indexer_model.transform(train_df)

In [None]:
# Feature columns, listed in the order they occupy in the assembled vector.
input_cols = [
    'bedrooms',
    'bathrooms',
    'sqft_living',
    'sqft_lot',
    'floors',
    'sqft_basement',
]

# Pack the feature columns into a single "features" vector column.
vector_assembler = VectorAssembler(outputCol="features", inputCols=input_cols)
train_df = vector_assembler.transform(train_df)
train_df.show()

+-------------------+-----+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+-------------+--------+-------+-----------+--------------------+
|               date|price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|sqft_above|sqft_basement|yr_built|yr_renovated|              street|         city|statezip|country|price_index|            features|
+-------------------+-----+--------+---------+-----------+--------+------+----------+----+---------+----------+-------------+--------+------------+--------------------+-------------+--------+-------+-----------+--------------------+
|2014-05-02 00:00:00|    0|       2|        1|        800|    4850|     1|         0|   0|        4|       800|            0|    1944|           0|4801-4899 6th Ave NW|      Seattle|WA 98107|    USA|        0.0|[2.0,1.0,800.0,48...|
|2014-05-02 00:00:00|    0|       2|        1|        850|    6174| 

In [None]:
# Configure the decision tree on the assembled features and the indexed label,
# then fit it on the training rows. `dt_model` ends up holding the fitted model.
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol="price_index")
dt_model = dt_classifier.fit(train_df)

In [None]:
# No metricName is given, so this evaluator reports its library default (F1).
evaluator = MulticlassClassificationEvaluator(labelCol="price_index")

# Transform and predict on test data.
# "price" in test_df is already a 0/1 label (it was derived before the split),
# so the `> 600000` threshold must not be applied again: doing so would turn
# every test label into 0.
# The label mapping is learned from the training rows only and then applied to
# the test rows, so both sets share one mapping. Only the "price" column is
# passed to fit() because train_df already carries the indexer's output column.
price_index_model = price_indexer.fit(train_df.select("price"))
test_df = price_index_model.transform(test_df)
test_df = vector_assembler.transform(test_df)
test_predictions = dt_model.transform(test_df)

In [None]:
# Request accuracy explicitly for this call. Without the override the
# multiclass evaluator reports its default metric (F1), and it does not compute
# an area under the ROC curve at all, so the old label was wrong.
accuracy = evaluator.evaluate(test_predictions, {evaluator.metricName: "accuracy"})
print(f"Test accuracy: {accuracy}")

Area under ROC curve: 1.0
