<a href="https://colab.research.google.com/github/Vasugi2003/Big-Data-Analytics/blob/main/Modelling_without_with_MLpipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=38b7b289e642af121600faf2c6bbd33c58dbe6c2fc53bea1e991e72685e870dc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Applying the StringIndexer transformation to the DataFrame.
## indexer.fit(df): This fits (trains) the StringIndexer on the DataFrame df. It computes the mapping from the distinct values in the "price" column to unique numerical indices.
## .transform(df): After fitting the StringIndexer, the transform method applies that mapping to the DataFrame df. It looks up the numerical index of each value in the "price" column and stores the result in a new column named "price_index"; the original "price" column is left unchanged.
### The transformed DataFrame is assigned to the variable train_df. It contains the original data from the training split along with the newly added "price_index" column.

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# In a single feature vector, each component represents the value of a feature. For example, if you have a dataset of houses with features like "square footage," "number of bedrooms," and "number of bathrooms," a single feature vector for a house might look like this: [2000, 3, 2], where the first value represents square footage, the second value represents the number of bedrooms, and the third value represents the number of bathrooms.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize SparkSession
spark = SparkSession.builder.appName("HousePriceClassification").getOrCreate()

# Load data (inferSchema=True asks Spark to detect column types from the file)
filepath = "/content/houseprice.csv"
df = spark.read.csv(filepath, header=True, inferSchema=True)
# df.show()
df.printSchema()

# Preprocess data
# Binarize "price" ONCE, before splitting: 1 when the price is above 600000,
# otherwise 0. Both splits inherit the same labels, so the threshold must not
# be applied again later (re-applying it to 0/1 values would turn every label
# into 0).
df = df.withColumn(
    "price",
    F.when(F.col("price") > 600000, 1).otherwise(0).cast(IntegerType()),
)

# Cast selected columns to IntegerType
int_columns = ['bathrooms', 'bedrooms', 'sqft_living', 'sqft_lot', 'floors',
               'sqft_basement']

for col_name in int_columns:
    df = df.withColumn(col_name, df[col_name].cast(IntegerType()))
df.printSchema()

# Split data (fixed seed so the split is reproducible)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=11)
print(f"Training rows: {train_df.count()}")
print(f"Test rows: {test_df.count()}")

# Label encoding
#   price_indexer.fit(train_df)
#       computes the mapping of the distinct values in the "price" column
#       to numerical indices, using the training data only.
#   price_indexer_model.transform(...)
#       adds a "price_index" column holding the index of each "price" value.
price_indexer = StringIndexer(inputCol="price", outputCol="price_index")
price_indexer_model = price_indexer.fit(train_df)
train_df = price_indexer_model.transform(train_df)

# Feature vector assembly
input_cols = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'sqft_basement']
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
train_df = vector_assembler.transform(train_df)
train_df.show()

# Create and train Decision Tree Classifier model
dt_classifier = DecisionTreeClassifier(labelCol="price_index", featuresCol="features")
dt_model = dt_classifier.fit(train_df)

# Evaluate the model: name the metric explicitly so the printed label matches
# what is actually computed (the evaluator's default metric is F1, not accuracy).
evaluator = MulticlassClassificationEvaluator(
    labelCol="price_index",
    predictionCol="prediction",
    metricName="accuracy",
)

# Transform and predict on test data.
# Reuse the indexer fitted on the training data so that train and test share
# one label mapping; "price" is already binarized, so it is not touched again.
test_df = price_indexer_model.transform(test_df)
test_df = vector_assembler.transform(test_df)
test_predictions = dt_model.transform(test_df)

# Display evaluation results
accuracy = evaluator.evaluate(test_predictions)
print(f"Accuracy: {accuracy}")


root
 |-- date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- statezip: string (nullable = true)
 |-- country: string (nullable = true)

root
 |-- date: timestamp (nullable = true)
 |-- price: integer (nullable = false)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: 

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

# Initialize SparkSession
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Read data from CSV (no inferSchema here; the columns used below are cast explicitly)
filepath = "/content/houseprice.csv"
df1 = spark.read.csv(filepath, header=True)

# Convert "price" column to numeric based on condition and cast to IntegerType:
# 1 when the price is above 600000, otherwise 0.
df1 = df1.withColumn("price", when(F.col("price") > 600000, 1).otherwise(0).cast(IntegerType()))

# Define input columns and output column for VectorAssembler
inputColumns = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'sqft_basement']
outputColumn = "features"

# Cast the selected input columns to IntegerType
for col_name in inputColumns:
    df1 = df1.withColumn(col_name, df1[col_name].cast(IntegerType()))

# Create a StringIndexer for "price" column.
# alphabetAsc ordering maps "0" -> 0.0 and "1" -> 1.0 regardless of class
# frequencies, so "priceIndex" always carries the same class ids as "price"
# and predictions can be compared against either column.
price_indexer = StringIndexer(
    inputCol="price",
    outputCol="priceIndex",
    stringOrderType="alphabetAsc",
)
# Create a VectorAssembler for input features
vector_assembler = VectorAssembler(inputCols=inputColumns, outputCol=outputColumn)
# Create a DecisionTreeClassifier trained on the indexed label, so the
# StringIndexer stage of the pipeline is actually consumed.
dt_model = DecisionTreeClassifier(labelCol="priceIndex", featuresCol=outputColumn)

# Define the pipeline stages
stages = [price_indexer, vector_assembler, dt_model]
# Create a pipeline
pipeline = Pipeline(stages=stages)
# Split data into train and test (fixed seed so the split is reproducible)
(train_df2, test_df2) = df1.randomSplit([0.8, 0.2], seed=11)

# Fit the pipeline on the training data
final_pipeline = pipeline.fit(train_df2)
# Make predictions on the test data
test_predictions_from_pipeline = final_pipeline.transform(test_df2)

# Show the first 5 rows of predictions
test_predictions_from_pipeline.select("price", "prediction").show(5)

# Evaluate the model using accuracy (or other appropriate metric for multi-class classification)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Define the evaluator against the same label column the classifier was trained on
evaluator = MulticlassClassificationEvaluator(labelCol="priceIndex", predictionCol="prediction", metricName="accuracy")
# Calculate the accuracy
accuracy = evaluator.evaluate(test_predictions_from_pipeline)
print(f"Accuracy: {accuracy}")



+-----+----------+
|price|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows

Accuracy: 0.7871396895787139


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

# Score the pipeline's test predictions with Spark's accuracy metric
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="price",
)
accuracy = evaluator.evaluate(test_predictions_from_pipeline)

# Pull only the label and prediction columns into a pandas DataFrame
test_predictions_pd = (
    test_predictions_from_pipeline
    .select("price", "prediction")
    .toPandas()
)

# Actual and predicted classes as plain Python lists
true_labels = test_predictions_pd["price"].to_list()
predicted_labels = test_predictions_pd["prediction"].to_list()

# Build the per-class precision / recall / F1 summary and display it
report = classification_report(true_labels, predicted_labels)
print(report)


              precision    recall  f1-score   support

           0       0.81      0.91      0.85       624
           1       0.71      0.52      0.60       278

    accuracy                           0.79       902
   macro avg       0.76      0.71      0.73       902
weighted avg       0.78      0.79      0.78       902

