<a href="https://colab.research.google.com/github/KevinYih/BigDataDemo/blob/main/decision_trees_by_Kevin.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 1.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5f4b183db0f71fe31dbaa6954348ce0834cc6017b45b206819e55e2a9f8a594d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

Now we import some of the libraries usually needed by our workload.





In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [None]:
# configure the Spark UI port
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Data Preprocessing

In this Colab, rather than downloading a file from Google Drive, we will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [None]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

For convenience, given that the dataset is small, we first

*   construct a Pandas dataframe
*   convert it into a Spark dataframe
*   and tune the schema, marking every column as non-nullable.

In [None]:
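from pyspark.ml.linalg import Vectors  # used by the RDD map further down

# Build a Pandas dataframe from the scikit-learn arrays, then hand it to Spark
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)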

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, we build the two data structures that we will be using throughout this Colab:


*   ```features```, a dataframe of Dense vectors, containing all the original features in the dataset;
*   ```labels```, a Pandas series of binary labels indicating whether the corresponding set of features belongs to a subject with a malignant tumor (0) or a benign one (1), following the scikit-learn encoding.



In [None]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

features.show(9)

+--------------------+
|            features|
+--------------------+
|[17.99,10.38,122....|
|[20.57,17.77,132....|
|[19.69,21.25,130....|
|[11.42,20.38,77.5...|
|[20.29,14.34,135....|
|[12.45,15.7,82.57...|
|[18.25,19.98,119....|
|[13.71,20.83,90.2...|
|[13.0,21.82,87.5,...|
+--------------------+
only showing top 9 rows



### Your task

If the Setup and Data Preprocessing stages ran successfully, you are ready to use the decision tree library of PySpark ([documentation](https://spark.apache.org/docs/1.5.2/ml-decision-tree.html)) to predict the labels. Look for the Python code example in the documentation.

Start by trying to understand the nature of the dataset you are given. Then investigate the purposes of `StringIndexer`, `VectorIndexer`, and `Pipeline` in the example provided in the documentation. Do you have to use them for your dataset?
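
One way to build intuition for the two indexers is to mimic them in plain Python. The sketch below is not Spark's implementation: `fit_string_indexer` and `is_categorical` are hypothetical helpers that assume the default behaviour of `StringIndexer` (labels ordered by descending frequency, with ties assumed to be broken alphabetically) and of `VectorIndexer` (a feature is treated as categorical when it has at most `maxCategories` distinct values).

```python
from collections import Counter


def fit_string_indexer(labels):
    """Hypothetical helper: map each distinct label to a float index,
    most frequent label first (assumed StringIndexer default ordering)."""
    counts = Counter(str(value) for value in labels)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def is_categorical(column, max_categories=4):
    """Hypothetical helper: a feature counts as categorical when it has
    at most max_categories distinct values."""
    return len(set(column)) <= max_categories


mapping = fit_string_indexer([1, 1, 0, 1, 0])
print(mapping)
print(is_categorical([0, 1, 0, 1]))
print(is_categorical([17.99, 20.57, 19.69, 11.42, 20.29]))
```

Under this ordering the most frequent class receives index 0.0, so `indexedLabel` need not equal the original label. For this dataset the labels are already numeric and the 30 features are continuous measurements, so both indexers are arguably optional here.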


In [None]:
# YOUR CODE HERE
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Convert the pandas Series to a Spark DataFrame. reset_index() keeps the
# original row position in an 'index' column; the values become 'label'.
labels_df = spark.createDataFrame(labels.reset_index().rename(columns={0: 'label'}))

# Give every feature row its position as a consecutive index.
# monotonically_increasing_id() is avoided here: its values are unique but
# not consecutive, so they may not line up across two DataFrames.
features_with_index = (
    features.rdd.zipWithIndex()
    .map(lambda pair: (pair[1], pair[0]["features"]))
    .toDF(["index", "features"])
)

# Join features and labels on the shared index
data = features_with_index.join(labels_df, on="index").drop("index")

# Show the joined DataFrame
data.show(5)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

treeModel = model.stages[2]
print(treeModel)  # summary only

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[14.58,21.53,97.4...|    0|
|[17.57,15.05,115....|    0|
|[14.78,23.94,97.4...|    0|
|[12.77,21.41,82.0...|    1|
|[10.18,17.53,65.1...|    1|
+--------------------+-----+
only showing top 5 rows

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|[8.196,16.84,51.7...|
|       0.0|         0.0|[8.618,11.79,54.3...|
|       0.0|         0.0|[8.671,14.45,54.4...|
|       0.0|         0.0|[9.173,13.86,59.2...|
|       0.0|         0.0|[9.436,18.32,59.8...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.0662651
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_19deb4e5b448, depth=5, numNodes=27, numClasses=2, numFeatures=30
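
A join on a generated row index only lines up correctly when both sides carry the same consecutive indices. The Spark documentation describes `monotonically_increasing_id` as producing unique, increasing, but not consecutive values: the partition ID goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below simulates that layout in plain Python; `simulated_ids` is a hypothetical helper, not a Spark API, and the partition sizes are made up for illustration.

```python
def simulated_ids(partition_sizes):
    """Hypothetical helper: reproduce the documented ID layout
    (partition ID in the upper bits, row number in the lower 33 bits)."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for row_number in range(size):
            ids.append((partition_id << 33) + row_number)
    return ids


# Four rows split evenly over two partitions
even_split = simulated_ids([2, 2])
# The same four rows split unevenly
uneven_split = simulated_ids([3, 1])

print(even_split)
print(uneven_split)
```

The two splits produce different ID sets for the same four rows, so two DataFrames get matching IDs only when their rows are distributed across partitions identically, which Spark does not promise.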



For the next step, use tree ensembles to predict the labels ([documentation](https://spark.apache.org/docs/1.5.2/ml-ensembles.html)).

Compare the accuracy of the two approaches.
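
As a reminder of what is being compared, the sketch below computes accuracy and test error from a handful of (prediction, label) pairs in plain Python. The `accuracy` helper and the example pairs are illustrative assumptions, not output from this notebook.

```python
def accuracy(pairs):
    """Fraction of (prediction, label) pairs that agree."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no predictions to evaluate")
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / len(pairs)


# Made-up (prediction, label) pairs: three of four are correct
example = [(0.0, 0.0), (1.0, 1.0), (0.0, 1.0), (1.0, 1.0)]
acc = accuracy(example)
test_error = 1.0 - acc
print("accuracy = %g, test error = %g" % (acc, test_error))
```

For the comparison to be fair, both models should be evaluated on the same held-out rows, for example by passing the same `seed` to `randomSplit` in both cells.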

In [None]:
# YOUR CODE HERE

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Convert the pandas Series to a Spark DataFrame. reset_index() keeps the
# original row position in an 'index' column; the values become 'label'.
labels_df = spark.createDataFrame(labels.reset_index().rename(columns={0: 'label'}))

# Give every feature row its position as a consecutive index.
# monotonically_increasing_id() is avoided here: its values are unique but
# not consecutive, so they may not line up across two DataFrames.
features_with_index = (
    features.rdd.zipWithIndex()
    .map(lambda pair: (pair[1], pair[0]["features"]))
    .toDF(["index", "features"])
)

# Join features and labels on the shared index
data = features_with_index.join(labels_df, on="index").drop("index")

# Show the joined DataFrame
data.show(5)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[14.58,21.53,97.4...|    0|
|[17.57,15.05,115....|    0|
|[14.78,23.94,97.4...|    0|
|[12.77,21.41,82.0...|    1|
|[10.18,17.53,65.1...|    1|
+--------------------+-----+
only showing top 5 rows

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|[7.691,25.44,48.3...|
|       0.0|         0.0|[9.173,13.86,59.2...|
|       0.0|         0.0|[9.436,18.32,59.8...|
|       0.0|         0.0|[9.465,21.01,60.1...|
|       0.0|         0.0|[9.683,19.34,61.0...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.0337838
RandomForestClassificationModel: uid=RandomForestClassifier_98e62b1e14fc, numTrees=20, numClasses=2, numFeatures=30
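
The summary above reports `numTrees=20`: the forest combines the outputs of 20 trees into one prediction. The sketch below shows the simplest form of that idea, hard majority voting, in plain Python. It is a simplified illustration with a hypothetical `majority_vote` helper; Spark's own aggregation may differ in detail (for example by combining per-tree class probabilities rather than hard votes).

```python
from collections import Counter


def majority_vote(tree_predictions):
    """Hypothetical helper: return the label predicted by the most trees.
    Ties are broken in favour of the smallest label (an arbitrary choice)."""
    votes = Counter(tree_predictions)
    best_count = max(votes.values())
    return min(label for label, count in votes.items() if count == best_count)


# Made-up predictions from three trees for a single row
winner = majority_vote([0.0, 1.0, 1.0])
tie_winner = majority_vote([0.0, 1.0])
print(winner, tie_winner)
```

Averaging over many trees trained on different samples tends to reduce the variance of a single tree, which is one plausible reason for the lower test error reported above, although the two cells here use different random splits.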


In [None]:
# upload csv
from google.colab import files

uploaded = files.upload()

Saving Admission_Predict.csv to Admission_Predict.csv


In [None]:
# Option 1: Graduates Admission Prediction

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.util import MLUtils

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
import pandas as pd
import pyspark.sql.functions as F

# get the SparkSession (getOrCreate() returns the one from Setup if it is still running)
spark = SparkSession.builder.appName("Admission_Predict").getOrCreate()

# load CSV to Spark DataFrame
file_path = "Admission_Predict.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

# Specify the feature columns and the label column
feature_columns = ["GRE Score", "TOEFL Score", "University Rating", "SOP", "LOR ", "CGPA", "Research"]
label_column = "Chance of Admit "

# Use VectorAssembler to combine feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the DataFrame
df_transformed = assembler.transform(df)

# Select only the features and label columns
data = df_transformed.select("features", label_column)
data = data.withColumnRenamed(label_column, "label")

data.show(5)

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
print(treeModel)  # summary only

+----------+---------+-----------+-----------------+---+----+----+--------+----------------+
|Serial No.|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+----------+---------+-----------+-----------------+---+----+----+--------+----------------+
|         1|      337|        118|                4|4.5| 4.5|9.65|       1|            0.92|
|         2|      324|        107|                4|4.0| 4.5|8.87|       1|            0.76|
|         3|      316|        104|                3|3.0| 3.5| 8.0|       1|            0.72|
|         4|      322|        110|                3|3.5| 2.5|8.67|       1|             0.8|
|         5|      314|        103|                2|2.0| 3.0|8.21|       0|            0.65|
+----------+---------+-----------+-----------------+---+----+----+--------+----------------+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[337.0,118.0,4.0,...| 0.92|
|[324.0,107.0,4.0,...|

In [None]:
# Option 2: Milk Grade Prediction

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
import pandas as pd
import pyspark.sql.functions as F

# get the SparkSession (getOrCreate() returns the one from Setup if it is still running)
spark = SparkSession.builder.appName("milknew").getOrCreate()

# load CSV to Spark DataFrame
file_path = "milknew.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

# Specify the feature columns and the label column
feature_columns = ["pH", "Temprature", "Taste", "Odor", "Fat ", "Turbidity", "Colour"]
label_column = "Grade"

# Use VectorAssembler to combine feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the DataFrame
df_transformed = assembler.transform(df)

# Select only the features and label columns
data = df_transformed.select("features", label_column)
data = data.withColumnRenamed(label_column, "label")

data.show(5)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

treeModel = model.stages[2]
print(treeModel)  # summary only

+---+----------+-----+----+----+---------+------+------+
| pH|Temprature|Taste|Odor|Fat |Turbidity|Colour| Grade|
+---+----------+-----+----+----+---------+------+------+
|6.6|        35|    1|   0|   1|        0|   254|  high|
|6.6|        36|    0|   1|   0|        1|   253|  high|
|8.5|        70|    1|   1|   1|        1|   246|   low|
|9.5|        34|    1|   1|   0|        1|   255|   low|
|6.6|        37|    0|   0|   0|        0|   255|medium|
+---+----------+-----+----+----+---------+------+------+
only showing top 5 rows

+--------------------+------+
|            features| label|
+--------------------+------+
|[6.6,35.0,1.0,0.0...|  high|
|[6.6,36.0,0.0,1.0...|  high|
|[8.5,70.0,1.0,1.0...|   low|
|[9.5,34.0,1.0,1.0...|   low|
|(7,[0,1,6],[6.6,3...|medium|
+--------------------+------+
only showing top 5 rows

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(