# Iteration 4:  Marine Life Distribution

Author: Chinchien Lin
UPI: clin864
Email: clin864@aucklanduni.ac.nz
Student ID: 938149604
GitHub: https://github.com/LIN810116/INFOSYS-722---Iteration-4

### Import

In [None]:
# Spark bootstrap cell: run this first in every new notebook.
# findspark.init() is pointed at the local Spark install and is called before
# the pyspark imports below; keep that order.
# If you're using VirtualBox, change the path to '/home/user/spark-2.1.1-bin-hadoop2.7'.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
# The app name is still the template default 'basics' -- TODO: rename it for this project.
spark = SparkSession.builder.appName('basics').getOrCreate()
import pandas
from pyspark.sql.functions import lit
# NOTE: this `abs` is Spark's column function; from here on it replaces
# Python's builtin abs() in the notebook namespace.
from pyspark.sql.functions import abs
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import regexp_replace

# Load the marine-life CSV into `df`.
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to infer each column's type from the data.
df = spark.read.csv('marineLife(I4).csv',header=True,inferSchema=True)

# Data understanding

In [None]:
# Size of the raw dataset, followed by a preview of its rows.
row_total = df.count()
print("Number of Row:", row_total)
df.show()

In [None]:
# Print the column names and data types of the raw DataFrame.
df.printSchema()

In [None]:
# Summary statistics for the raw DataFrame.
overall_stats = df.describe()
overall_stats.show()

In [None]:
# Summary statistics restricted to the two longitude columns.
longitude_cols = ['WMostLong', 'EMostLong']
df.select(*longitude_cols).describe().show()

In [None]:
# Number of distinct values in each taxonomic column.
# Each entry pairs the printed label with the column it counts, so the label
# cannot drift from the column (the Family count was previously printed under
# the label "Order:").
print("Number of unique value:")
taxonomy_columns = [
    ("Class", 'Class'),
    ("Order", 'Order'),
    ("Family", 'Family'),
    ("Genus", 'Genus'),
    ("Species", 'Species'),
    ("Scientific names", 'Scientific names'),
]
for label, column in taxonomy_columns:
    print(label + ":", df.select(column).distinct().count())

In [None]:
# Example breakdown: number of rows per Class value.
class_counts = df.groupBy('Class').count()
class_counts.show()

In [None]:
# Count missing (null) values per column using pandas.
# toPandas() brings the whole Spark DataFrame to the driver as a pandas DataFrame.
# NOTE: the result is stored under the name `pd`, which is the conventional
# alias for the pandas module; in this notebook pandas is imported under its
# full name, so nothing is shadowed here.
pd = df.toPandas()
pd.isnull().sum()

In [None]:
# Count rows whose longitude value is the literal string 'NA'.
NA_W = df.where("WMostLong == 'NA'").count()
print("Number of NA values in WMostLong: ", NA_W)

NA_E = df.where("EMostLong == 'NA'").count()
print("Number of NA values in EMostLong: ", NA_E)

In [None]:
# Count rows where both the westmost and the eastmost longitude are 'NA'.
both_missing = df.where("WMostLong == 'NA' and EMostLong == 'NA'")
NA_WE = both_missing.count()
print("Number of NA values in WMostLong and EMostLong: ", NA_WE)

In [None]:
# Longitudes outside [-180, 180] are counted as outliers.
outliersW = df.where("WMostLong > 180 or WMostLong < -180").count()
print("Outliers of WMostLong:", outliersW)

outliersE = df.where("EMostLong > 180 or EMostLong < -180").count()
print("Outliers of EMostLong:", outliersE)

# Data Preparation

In [None]:
# Keep only the taxonomy columns and the two longitude columns.
useful_columns = ['Class', 'Order', 'Family', 'WMostLong', 'EMostLong']
df_pre = df.select(*useful_columns)
df_pre.show(10)

In [None]:
# Drop rows whose westmost or eastmost longitude is empty or 'NA'.
df_pre = (
    df_pre
    .where("WMostLong != '' and WMostLong != 'NA'")
    .where("EMostLong != '' and EMostLong != 'NA'")
)
# Replace missing taxonomy values with the placeholder 'Unknown'.
df_pre = df_pre.fillna('Unknown', subset=['Class', 'Order', 'Family'])

print("Number of Row:", df_pre.count())

# Verify with pandas that no nulls remain.
pd_pre = df_pre.toPandas()
print("Check null values:")
null_counts = pd_pre.isnull().sum()
display(null_counts)

In [None]:
# Fill missing taxonomy values with 'Unknown', then re-count nulls per column
# on a pandas copy of the prepared data.
df_pre = df_pre.fillna('Unknown', subset=['Class', 'Order', 'Family'])
pd_pre = df_pre.toPandas()
pd_pre.isnull().sum()

In [None]:
# Keep only rows whose longitudes lie strictly inside (-180, 180).
# NOTE(review): the bounds are exclusive, so rows sitting exactly on +/-180 are
# dropped as well, although the outlier check only counts values beyond
# +/-180 -- confirm this is intended.
df_pre = df_pre.where(
    "WMostLong < 180 and WMostLong > -180"
).where(
    "EMostLong < 180 and EMostLong > -180"
)

### Create new columns

In [None]:
# Cast both longitude columns to double so they can be used in arithmetic.
for lon_name in ("EMostLong", "WMostLong"):
    df_pre = df_pre.withColumn(lon_name, df_pre[lon_name].cast(DoubleType()))

# Coverage: absolute difference between the eastmost and westmost longitude.
# (`abs` here is the Spark column function imported at the top of the notebook.)
df_pre = df_pre.withColumn("Coverage", abs(df_pre.EMostLong - df_pre.WMostLong))

# Level: Coverage / 30, cast to integer.
df_pre = df_pre.withColumn("Level", (df_pre.Coverage / 30).cast(IntegerType()))

# isWide: Coverage / 180, cast to integer.
df_pre = df_pre.withColumn("isWide", (df_pre.Coverage / 180).cast(IntegerType()))

print("Number of Row:", df_pre.count())
df_pre.show()

In [None]:
# Print the column names and data types of the prepared DataFrame.
df_pre.printSchema()

# Data transformation

In [None]:
# Keep the three taxonomy predictors and the two derived targets.
model_columns = ['Class', 'Order', 'Family', 'Level', 'isWide']
df_pre = df_pre.select(*model_columns)
df_pre.show()

In [None]:
# Summary statistics of the prepared data, then the row count per isWide value.
prepared_stats = df_pre.describe()
prepared_stats.show()

iswide_counts = df_pre.groupBy('isWide').count()
iswide_counts.show()

### Subset 1: Level

In [None]:
# Subset for the Level target: taxonomy predictors plus Level.
level_columns = ['Class', 'Order', 'Family', 'Level']
df_Level = df_pre.select(*level_columns)
df_Level.show(10)

### Subset 2: isWide

In [None]:
# Subset for the isWide target: taxonomy predictors plus isWide.
iswide_columns = ['Class', 'Order', 'Family', 'isWide']
df_isWide = df_pre.select(*iswide_columns)
df_isWide.show(10)

### Test design

In [None]:
# 70/30 random split of each subset. No seed is passed, so the split differs
# from run to run.
split_weights = [0.7, 0.3]
trainData_Level, testData_Level = df_Level.randomSplit(split_weights)
trainData_isWide, testData_isWide = df_isWide.randomSplit(split_weights)

In [None]:
# Show summary statistics (including the row count) for the Level training
# split and then for the Level test split.
for level_split in (trainData_Level, testData_Level):
    level_split.describe().show()

### Mining

### Multiclass:   Level

In [None]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)

# Each categorical predictor gets two stages: a StringIndexer that assigns a
# numeric index to every category, and a OneHotEncoder that turns that index
# into a vector column.
Class_indexer = StringIndexer(inputCol='Class', outputCol='ClassIndex')
Class_encoder = OneHotEncoder(inputCol='ClassIndex', outputCol='ClassVec')

Order_indexer = StringIndexer(inputCol='Order', outputCol='OrderIndex')
Order_encoder = OneHotEncoder(inputCol='OrderIndex', outputCol='OrderVec')

Family_indexer = StringIndexer(inputCol='Family', outputCol='FamilyIndex')
Family_encoder = OneHotEncoder(inputCol='FamilyIndex', outputCol='FamilyVec')

# Target: index the Level values into the 'label' column.
Level_indexer = StringIndexer(inputCol='Level', outputCol='label')

# Concatenate the three encoded vectors into a single 'features' column.
assembler = VectorAssembler(
    inputCols=['ClassVec', 'OrderVec', 'FamilyVec'],
    outputCol="features",
)

In [None]:
from pyspark.ml import Pipeline

# Stage order matters: every indexer runs before the encoder that reads its
# output column, and the assembler runs last.
level_stages = [
    Class_indexer, Order_indexer, Family_indexer, Level_indexer,
    Class_encoder, Order_encoder, Family_encoder,
    assembler,
]
pipeline_Level = Pipeline(stages=level_stages)

# Fit the preprocessing stages on the Level subset, apply them to the same
# data, and keep only the two columns the classifiers need.
pipeline_model = pipeline_Level.fit(df_Level)
pipe_df = pipeline_model.transform(df_Level).select('label', 'features')

In [None]:
from pyspark.ml import Pipeline

# Preprocessing pipeline for the Level target: index the three predictors and
# the target, one-hot encode the predictors, then assemble the feature vector.
pipeline_Level = Pipeline(stages=[
    Class_indexer,
    Order_indexer,
    Family_indexer,
    Level_indexer,
    Class_encoder,
    Order_encoder,
    Family_encoder,
    assembler,
])

# Fit on the Level subset and transform that same subset.
pipeline_model = pipeline_Level.fit(df_Level)
encoded_level = pipeline_model.transform(df_Level)

# Only 'label' and 'features' are needed downstream.
pipe_df = encoded_level.select('label', 'features')

### RandomForestClassifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Both indexers are fitted on the full dataset (not just the training split)
# so that every label value is present in the index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(pipe_df)
# maxCategories=4: features with more than 4 distinct values are treated as
# continuous, the rest as categorical.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(pipe_df)

# Hold out 30% of the rows for testing.
(trainingData, testData) = pipe_df.randomSplit([0.7, 0.3])

# Random forest with 10 trees.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10,
)

# Maps the numeric prediction back to the original label value.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Fitting the pipeline trains the forest on the training split.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)

# Predict on the held-out rows and show a few of them.
predictions = model.transform(testData)
predictions.select("predictedLabel", "label", "features").show(5)

# Test error is reported as 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

# Stage 2 of the fitted pipeline is the forest itself; printing gives a summary only.
rfModel = model.stages[2]
print(rfModel)

### Decision tree

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Label and feature indexers, both fitted on the full dataset so that all
# label values are included in the index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(pipe_df)
# Features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(pipe_df)

# 70% training / 30% test.
(trainingData, testData) = pipe_df.randomSplit([0.7, 0.3])

# Decision tree chained after the two indexers.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train on the training split, then predict the held-out rows.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Report the test error (1 - accuracy).
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g " % test_error)

# The fitted tree is the third pipeline stage; printing gives a summary only.
treeModel = model.stages[2]
print(treeModel)

### LogisticRegression

In [None]:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hold out 30% of the encoded Level data for testing.
(training, testData) = pipe_df.randomSplit([0.7, 0.3])

# Level is a multiclass target, so the multinomial family is requested
# explicitly. (The unused estimator that was also built here has been dropped.)
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

# Predict the held-out rows.
result = mlrModel.transform(testData)

# Score the predictions the same way as the other classifiers in this
# notebook: test error = 1 - accuracy. The label column here is 'label'
# because no extra label indexer is applied in this cell.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(result)
print("Test Error = %g" % (1.0 - accuracy))

### Two-class: isWide

In [None]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)

# Per predictor: a StringIndexer assigns a numeric index to every category and
# a OneHotEncoder converts that index into a vector column.
Class_indexer = StringIndexer(inputCol='Class', outputCol='ClassIndex')
Class_encoder = OneHotEncoder(inputCol='ClassIndex', outputCol='ClassVec')

Order_indexer = StringIndexer(inputCol='Order', outputCol='OrderIndex')
Order_encoder = OneHotEncoder(inputCol='OrderIndex', outputCol='OrderVec')

Family_indexer = StringIndexer(inputCol='Family', outputCol='FamilyIndex')
Family_encoder = OneHotEncoder(inputCol='FamilyIndex', outputCol='FamilyVec')

# Target for this section: isWide, indexed into the 'label' column.
isWide_indexer = StringIndexer(inputCol='isWide', outputCol='label')

# Combine the three encoded vectors into one 'features' column.
assembler = VectorAssembler(
    inputCols=['ClassVec', 'OrderVec', 'FamilyVec'],
    outputCol="features",
)

In [None]:
from pyspark.ml import Pipeline

# Indexers first, then the encoders that read their output, then the assembler.
iswide_stages = [
    Class_indexer, Order_indexer, Family_indexer, isWide_indexer,
    Class_encoder, Order_encoder, Family_encoder,
    assembler,
]
pipeline = Pipeline(stages=iswide_stages)

# Fit the preprocessing stages on the isWide subset and apply them to it.
pipeline_model = pipeline.fit(df_isWide)
encoded_iswide = pipeline_model.transform(df_isWide)

# Keep only the columns the classifiers consume.
pipe_df = encoded_iswide.select('label', 'features')

### LogisticRegression

In [None]:
from pyspark.ml.classification import LogisticRegression

# 70/30 split of the encoded isWide data, with the size of each part printed.
train_data, test_data = pipe_df.randomSplit([0.7, 0.3])
split_report = (
    ("Training Dataset Count: ", train_data),
    ("Test Dataset Count: ", test_data),
)
for caption, part in split_report:
    print(caption + str(part.count()))

# Build the estimator and fit it on the training split; `lr_model` ends up
# holding the fitted model.
lr_model = LogisticRegression(featuresCol='features', labelCol='label').fit(train_data)

# Score the held-out rows.
results = lr_model.transform(test_data)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Plot the fitted logistic-regression coefficients in ascending order.
beta = np.sort(lr_model.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
# Summary object attached to the fitted logistic-regression model.
training_summary = lr_model.summary

# ROC points as a pandas DataFrame for plotting.
ROC = training_summary.roc.toPandas()

# False positive rate on x, true positive rate on y.
plt.plot(ROC['FPR'], ROC['TPR'])
plt.title('ROC Curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

# Area under the ROC curve from the same summary.
auc_text = str(training_summary.areaUnderROC)
print('Area Under the Curve: ' + auc_text)

In [None]:
# Precision-recall points from the model summary, as a pandas DataFrame.
pr = training_summary.pr.toPandas()

# Recall on x, precision on y.
plt.plot(pr['recall'], pr['precision'])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()

### Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Indexers are fitted on the full isWide dataset so every label value is indexed.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(pipe_df)
# Features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(pipe_df)

# 30% of the rows are held out for testing.
(trainingData, testData) = pipe_df.randomSplit([0.7, 0.3])

# Ten-tree random forest.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10,
)

# Converts the numeric prediction back to the original label value.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Train on the training split and predict the test split.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("predictedLabel", "label", "features").show(5)

# Test error = 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

# The fitted forest is pipeline stage 2; printing gives a summary only.
rfModel = model.stages[2]
print(rfModel)

### GBTClassifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Label and feature indexers, fitted on the full dataset so that all label
# values are part of the index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(pipe_df)
# Features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(pipe_df)

# 70% training / 30% test.
(trainingData, testData) = pipe_df.randomSplit([0.7, 0.3])

# Gradient-boosted trees, 10 iterations, chained after the indexers.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train, then predict the held-out rows and preview them.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Test error = 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

# The fitted GBT model is pipeline stage 2; printing gives a summary only.
gbtModel = model.stages[2]
print(gbtModel)

### Multiple iteration

#### "Class" VS "isWide"

In [None]:
# Working copy of the taxonomy columns plus isWide for this iteration.
iteration_columns = ['Class', 'Order', 'Family', 'isWide']
df_isWide2 = df_pre.select(*iteration_columns)

In [None]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)

# Single predictor in this iteration: Class is indexed to a number and then
# one-hot encoded into a vector column.
Class_indexer = StringIndexer(inputCol='Class', outputCol='ClassIndex')
Class_encoder = OneHotEncoder(inputCol='ClassIndex', outputCol='ClassVec')

# Target: isWide, indexed into the 'label' column.
isWide_indexer = StringIndexer(inputCol='isWide', outputCol='label')

# The feature vector consists of the encoded Class only.
assembler = VectorAssembler(inputCols=['ClassVec'], outputCol="features")

In [None]:
from pyspark.ml import Pipeline

# Index Class and isWide, encode Class, then assemble the feature vector.
class_only_stages = [Class_indexer, isWide_indexer, Class_encoder, assembler]
pipeline = Pipeline(stages=class_only_stages)

# Fit the preprocessing stages on the subset and apply them to it.
pipeline_model = pipeline.fit(df_isWide2)
encoded_class_only = pipeline_model.transform(df_isWide2)

# Keep only the label and the feature vector.
pipe_df = encoded_class_only.select('label', 'features')

In [None]:
# Label and feature indexers, fitted on the full Class-only dataset so that
# every label value is part of the index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(pipe_df)
# Features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(pipe_df)

# Hold out 30% of the rows for testing.
(trainingData, testData) = pipe_df.randomSplit([0.7, 0.3])

# Gradient-boosted trees with 10 iterations, chained after the indexers.
gbt = GBTClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    maxIter=10,
)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train on the training split and predict the test split.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)

# Test error = 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

# The fitted GBT model is pipeline stage 2; printing gives a summary only.
gbtModel = model.stages[2]
print(gbtModel)