# Start a SparkSession

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

# Read Data

In [3]:
df = spark.read.csv('./Desktop/SMSSpamCollection',
                    sep='\t', inferSchema=True, header=False)

In [4]:
df.show(5,truncate=False)

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0 |_c1                                                                                                                                                        |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham |Ok lar... Joking wif u oni...                                                                                                                              |
|spam|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham |U dun say so ear

# Rename Columns

In [5]:
df = df.withColumnRenamed('_c0', 'status').withColumnRenamed('_c1', 'message')
df.show(5, truncate=False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|status|message                                                                                                                                                    |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham   |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham   |Ok lar... Joking wif u oni...                                                                                                                              |
|spam  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|ham   |U 

# Change the status column to numeric

In [6]:
df.createOrReplaceTempView('temp')
df = spark.sql(
    'select case status when "ham" then 1.0  else 0 end as label, message from temp')
df.show(5, truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|1.0  |Ok lar... Joking wif u oni...                                                                                                                              |
|0.0  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|1.0  |U dun say

# Tokenize the message

In [7]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show()

+-----+--------------------+--------------------+
|label|             message|               words|
+-----+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|
|  1.0|U dun say so earl...|[u, dun, say, so,...|
|  1.0|Nah I don't think...|[nah, i, don't, t...|
|  0.0|FreeMsg Hey there...|[freemsg, hey, th...|
|  1.0|Even my brother i...|[even, my, brothe...|
|  1.0|As per your reque...|[as, per, your, r...|
|  0.0|WINNER!! As a val...|[winner!!, as, a,...|
|  0.0|Had your mobile 1...|[had, your, mobil...|
|  1.0|I'm gonna be home...|[i'm, gonna, be, ...|
|  0.0|SIX chances to wi...|[six, chances, to...|
|  0.0|URGENT! You have ...|[urgent!, you, ha...|
|  1.0|I've been searchi...|[i've, been, sear...|
|  1.0|I HAVE A DATE ON ...|[i, have, a, date...|
|  0.0|XXXMobileMovieClu...|[xxxmobilemoviecl...|
|  1.0|Oh k...i'm watchi...|[oh, k...i'm, wat...|


# Apply CountVectorizer

In [9]:
wordsData.schema

StructType(List(StructField(label,DecimalType(11,1),true),StructField(message,StringType,true),StructField(words,ArrayType(StringType,true),true)))

In [None]:
from pyspark.ml.feature import CountVectorizer
count = CountVectorizer(inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show()

In [None]:
featurizedData.schema

# Apply term frequency-inverse document frequency(TF-IDF)

In [None]:
from pyspark.ml.feature import IDF

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
# We want only the label and features columns for our machine learning models
rescaledData.select("label", "features").show()

# Split data into training(80%) and testing(20%)

In [None]:
seed = 0  # set seed for reproducibility
trainDF, testDF = rescaledData.randomSplit([0.8, 0.2], seed)

In [None]:
trainDF.count()

In [None]:
testDF.count()

# Logistic Regression Classifier

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
lr = LogisticRegression(maxIter=10)

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, np.linspace(0.3, 0.01, 10)) \
    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
    .build()
crossval_lr = CrossValidator(estimator=lr,
                             estimatorParamMaps=paramGrid_lr,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=5)
cvModel_lr = crossval_lr.fit(trainDF)
best_model_lr = cvModel_lr.bestModel.summary
best_model_lr.predictions.columns

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_lr = BinaryClassificationEvaluator(
    rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_lr.evaluate(best_model_lr.predictions)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(
    predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(best_model_lr.predictions)

In [None]:
my_mc_lr = MulticlassClassificationEvaluator(
    predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(best_model_lr.predictions)

In [None]:
train_fit_lr = best_model_lr.predictions.select('label', 'prediction')
train_fit_lr.groupBy('label', 'prediction').count().show()

# Predict using the test data and evaluate the predictions

In [None]:
predictions_lr = cvModel_lr.transform(testDF)

In [None]:
predictions_lr.show(5)

# Show sample predictions

In [None]:
predictions_lr.select('label', 'prediction').show(5)

In [None]:
predictions_lr.groupBy('label', 'prediction').count().show()

In [None]:
my_eval_lr.evaluate(predictions_lr)

# Accuracy with the test data

In [None]:
my_mc_lr = MulticlassClassificationEvaluator(
    predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(predictions_lr)

# Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, np.linspace(0.3, 10, 10)) \
    .build()
crossval_nb = CrossValidator(estimator=nb,
                             estimatorParamMaps=paramGrid_nb,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=5)
cvModel_nb = crossval_nb.fit(trainDF)

In [None]:
cvModel_nb.avgMetrics

# Make predictions

In [None]:
predictions_nb = cvModel_nb.transform(testDF)

In [None]:
predictions_nb.select('label', 'prediction').show(5)

In [None]:
predictions_nb.groupBy('label', 'prediction').count().show()

# Extreme Gradient Boosting

# Import Python libraries

In [None]:
import xgboost as xgb
import pandas as pd
import numpy as np
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# Read the data into a Pandas dataframe

In [None]:
power_plant = pd.read_excel('/home/ian/Downloads/CCPP/Folds5x2_pp.xlsx')

In [None]:
power_plant.head()

In [None]:
# Create training and test datasets
X = power_plant.drop('PE', axis=1)
y = power_plant['PE'].values
y = y.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

# Convert the training and testing sets into DMatrixes

In [None]:
DM_train = xgb.DMatrix(data=X_train,
                       label=y_train)
DM_test = xgb.DMatrix(data=X_test,
                      label=y_test)

# Parameters for grid search

In [None]:
gbm_param_grid = {
    'colsample_bytree': np.linspace(0.5, 0.9, 5),
    'n_estimators': [100, 200],
    'max_depth': [10, 15, 20, 25]
}

In [None]:
gbm = xgb.XGBRegressor()

# perform 5 fold cross-validation using mean square error as a scoring method.

In [None]:
grid_mse = GridSearchCV(estimator=gbm, param_grid=gbm_param_grid,
                        scoring='neg_mean_squared_error', cv=5, verbose=1)

# Fit grid_mse to the data, get best parameters and best score (lowest RMSE)

In [None]:
grid_mse.fit(X_train, y_train)
print("Best parameters found: ", grid_mse.best_params_)
print("Lowest RMSE found: ", np.sqrt(np.abs(grid_mse.best_score_)))

# Predict using the test data

In [None]:
pred = grid_mse.predict(X_test)
print("Root mean square error for test dataset: {}".format(
    np.round(np.sqrt(mean_squared_error(y_test, pred)), 2)))

In [None]:
test = pd.DataFrame({"prediction": pred, "observed": y_test.flatten()})
lowess = sm.nonparametric.lowess
z = lowess(pred.flatten(), y_test.flatten())
test.plot(figsize=[14, 8],
          x="prediction", y="observed", kind="scatter", color='darkred')
plt.title("Extreme Gradient Boosting: Prediction Vs Test Data",
          fontsize=18, color="darkgreen")
plt.xlabel("Predicted Power Output", fontsize=18)
plt.ylabel("Observed Power Output", fontsize=18)
plt.plot(z[:, 0], z[:, 1], color="blue", lw=3)
plt.show()