<a href="https://colab.research.google.com/github/Monisha2604/Final_Year_Project/blob/master/Classification_of_Random_Forest_Pyspark_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer

In [None]:
conf = SparkConf().setAppName('RandomForest').setMaster('local[*]')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

In [None]:
data=sql_context.read.csv("/content/drive/MyDrive/classification.csv",inferSchema=True,header=True)


In [None]:
data=data.withColumnRenamed("class","label")

In [None]:
data = data.withColumn("label",data.label.cast('double'))

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.


In [None]:
data.count()

712281

In [None]:
data.show()


+---+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---+------+-----+------------------+-----+--------------------+-----+-----+-------+
|_c0|                ra|               dec|                 u|                 g|                 r|                 i|                 z|run|camcol|field|         specobjid|class|            redshift|plate|  mjd|fiberid|
+---+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---+------+-----+------------------+-----+--------------------+-----+-----+-------+
|  0|  241.342916969633|1.1743006658508799|          19.51355|          19.20316|19.455779999999997|          19.71678|          19.94081|745|     6|  552|387465727008860160|    2|-2.75988499999999...|  344|51693|    568|
|  1|  241.453356927647|  1.10397579028334|          19.27841|          18.15512|          17.91325|          17

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler=VectorAssembler(
    inputCols=[
               'ra',
               'dec',
               'u',
               'g',
               'r', 
              'i', 
               'z', 
               'run', 
               'camcol', 
               'field', 
               'specobjid', 
               'redshift', 
              'plate', 
               'mjd', 
               'fiberid'],
    outputCol="features"
)

In [None]:
data=assembler.transform(data)

In [None]:
output=data.select("features","label")

In [None]:
#featureIndexer=VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [None]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(output)

In [None]:
(train_df,test_df)=output.randomSplit([0.8,0.2])

In [None]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features",numTrees=10)

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[rf])

In [None]:
import time

In [None]:
output.show(3),train_df.show(2)

+--------------------+-----+
|            features|class|
+--------------------+-----+
|[241.342916969633...|    2|
|[241.453356927647...|    2|
|[241.766474543154...|    0|
+--------------------+-----+
only showing top 3 rows

+--------------------+-----+
|            features|class|
+--------------------+-----+
|[0.00279526841154...|    2|
|[0.00309191408801...|    0|
+--------------------+-----+
only showing top 2 rows



(None, None)

In [None]:
#train_df=train_df.withColumnRenamed("class","label")

In [None]:
train_df.show(3),
test_df.show(2)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.00279526841154...|  2.0|
|[0.00309191408801...|  0.0|
|[0.00448112711910...|  2.0|
+--------------------+-----+
only showing top 3 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.01344820589838...|  0.0|
|[0.01915507271166...|  0.0|
+--------------------+-----+
only showing top 2 rows



In [None]:
import time
start=time.process_time()
model = pipeline.fit(train_df)
stop=time.process_time()
print("time:",(stop-start)/30," mins")

time: 0.006358269400000023  mins


In [None]:
#test_df=test_df.select("features","redshift")
test_df=test_df.withColumnRenamed("class","label")

In [None]:
predictions = model.transform(test_df)

In [None]:
predictions.show()

+--------------------+-----+-------------------+
|            features|label|         prediction|
+--------------------+-----+-------------------+
|[0.01389003301186...|  0.0|0.05246883758878203|
|[0.01848314781506...|  2.0| 1.9713502345263971|
|[0.01915507271166...|  0.0|0.05246883758878203|
|[0.02308862428230...|  0.0|0.06924576277631547|
|[0.02501491786887...|  2.0| 1.9731640999606292|
|[0.03927184173510...|  1.0| 0.9536174731583718|
|[0.04818390721116...|  2.0| 1.8941832065074338|
|[0.04966387809702...|  0.0|0.18931773977412655|
|[0.05067354869208...|  0.0|0.09463353698435087|
|[0.05440928948416...|  0.0|0.09463353698435087|
|[0.05478375381056...|  1.0| 0.9542273222411559|
|[0.06548326601921...|  2.0| 1.9624852589645456|
|[0.07189364143016...|  2.0| 1.9713502345263971|
|[0.08036979244657...|  0.0|0.05246883758878203|
|[0.09437692644340...|  0.0|0.33311793311216753|
|[0.09534082381657...|  2.0| 1.9816801050689847|
|[0.09985036570731...|  2.0| 1.9713502345263971|
|[0.10017152706370..

In [None]:
for col in data.dtypes:
    
  # printing the column and datatype 
  # of that column
  print(col[0],",",col[1])

_c0 , int
ra , double
dec , double
u , double
g , double
r , double
i , double
z , double
run , int
camcol , int
field , int
specobjid , decimal(20,0)
class , int
redshift , double
plate , int
mjd , int
fiberid , int
features , vector


In [None]:
from pyspark.sql.functions import round, col,lit
pred=predictions.select("*", round(col('prediction'))).show()

+--------------------+-----+-------------------+--------------------+
|            features|label|         prediction|round(prediction, 0)|
+--------------------+-----+-------------------+--------------------+
|[0.01344820589838...|  0.0|0.08817806196675945|                 0.0|
|[0.01915507271166...|  0.0|0.10115858592227456|                 0.0|
|[0.02501491786887...|  2.0|  1.992356562457423|                 2.0|
|[0.03258568849634...|  0.0| 0.0474916057656629|                 0.0|
|[0.03603019516873...|  2.0|  1.956482711900437|                 2.0|
|[0.04818390721116...|  2.0| 1.9112773475290656|                 2.0|
|[0.06660064528279...|  2.0| 1.9912493325316192|                 2.0|
|[0.06874226460303...|  2.0| 1.9785840229820253|                 2.0|
|[0.07242495804791...|  1.0| 0.8670958148858787|                 1.0|
|[0.09985036570731...|  2.0| 1.9693981362404518|                 2.0|
|[0.10261409071642...|  2.0| 1.8345331312776954|                 2.0|
|[0.11223983612956..

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

In [None]:
df=spark.createDataFrame(pred)

TypeError: ignored

In [None]:
import pyspark.sql.functions as func

In [None]:
df2 = predictions.withColumn("label",predictions.select("*", round(predictions['prediction'])))

AssertionError: ignored

In [None]:
df2.show()

+--------------------+-----+-------------------+
|            features|label|         prediction|
+--------------------+-----+-------------------+
|[0.01344820589838...|  0.0|0.08817806196675945|
|[0.01915507271166...|  0.0|0.10115858592227456|
|[0.02501491786887...|  2.0|  1.992356562457423|
|[0.03258568849634...|  0.0| 0.0474916057656629|
|[0.03603019516873...|  2.0|  1.956482711900437|
|[0.04818390721116...|  2.0| 1.9112773475290656|
|[0.06660064528279...|  2.0| 1.9912493325316192|
|[0.06874226460303...|  2.0| 1.9785840229820253|
|[0.07242495804791...|  1.0| 0.8670958148858787|
|[0.09985036570731...|  2.0| 1.9693981362404518|
|[0.10261409071642...|  2.0| 1.8345331312776954|
|[0.11223983612956...|  2.0| 1.9912493325316192|
|[0.11446859748440...|  0.0|0.08672828166479887|
|[0.12894869948218...|  0.0|0.08672828166479887|
|[0.13071255308467...|  0.0|0.08817806196675945|
|[0.14014638289393...|  2.0| 1.9693981362404518|
|[0.15220081575182...|  2.0| 1.9912493325316192|
|[0.16382725496430..

In [None]:


# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (accuracy))



+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
|0.05246883758878203|  0.0|[0.01389003301186...|
| 1.9713502345263971|  2.0|[0.01848314781506...|
|0.05246883758878203|  0.0|[0.01915507271166...|
|0.06924576277631547|  0.0|[0.02308862428230...|
| 1.9731640999606292|  2.0|[0.02501491786887...|
+-------------------+-----+--------------------+
only showing top 5 rows

Test Error = 0


In [None]:
pandas_df = predictions.select("*").toPandas()

In [None]:
pandas_df['prediction']=pandas_df['prediction'].round(0)

In [None]:
pandas_df.head()

Unnamed: 0,features,label,prediction
0,"[0.0134482058983849, -1.11302619848276, 18.696...",0.0,0.0
1,"[0.0191550727116692, -10.9763042314022, 19.035...",0.0,0.0
2,"[0.025014917868873, 24.9795121086878, 18.20867...",2.0,2.0
3,"[0.03258568849634, -0.0405799798589323, 16.882...",0.0,0.0
4,"[0.0360301951687347, 15.2210707527229, 18.2129...",2.0,2.0


In [None]:
df=spark.createDataFrame(pandas_df)

In [None]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(df)
print("Accuracy = %g" % (accuracy))


Accuracy = 0.976363


In [None]:
 from sklearn.metrics import precision_score,recall_score

In [None]:
predictions.show(5)

+--------------------+-----+-------------------+
|            features|label|         prediction|
+--------------------+-----+-------------------+
|[0.00309191408801...|    0|0.04027367679185538|
|[0.01606872091895...|    2| 1.9460147633740337|
|[0.01975416749075...|    0|0.04027367679185538|
|[0.02308862428230...|    0|0.04027367679185538|
|[0.03146774942922...|    2| 1.8080214551390046|
+--------------------+-----+-------------------+
only showing top 5 rows

