In [1]:
# Import to find Spark on PC
import findspark
findspark.init()
findspark.find()

# Importing pyspark and Spark Session
import pyspark
from pyspark.sql import SparkSession

# Importing time module
import os, psutil
import datetime
from datetime import datetime

# Importing .csv module to write to file
import csv
from csv import writer
# Lists for variable to be written to the data files
time_list = []
memory_list = []
features_list = []

In [2]:
for i in range(1):
    # Getting the process id
    process = psutil.Process(os.getpid())
    
    # Creating a pyspark instance
    spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

    # Start time variable
    time_start = datetime.now()

    ########### Starting transformation code ###########
    # Loading data file into dataframe
    df = spark.read.csv("ecg.csv", header=False, inferSchema=True)
    # Additional data set (~12 GB)
    #df = spark.read.csv("dataset_aug_500_full.csv/dataset_aug_500_full.csv", header=False, inferSchema=True)
    # Converting dataframe to rdd so we can use map()
    rdd = df.rdd
    # Use map() function to transform
    rdd2 = rdd.map(lambda x: (x[0:-2], float(x[-1])))
    ########### End transformation code ###########

    # Calculate time
    end_time = datetime.now() - time_start
    #print("Time of the program was:", end_time.total_seconds(), "seconds.")
    # Calculate memory of the process used
    memory_used = process.memory_info().rss / 1000000
    #print("Data used in MB:", memory_used, "MB")

    ########### Writing to data files ###########
    # Opening/creating Time and Memoery data files
    time_file = open('Masters_Project_Time.csv', 'a')
    memory_file = open('Masters_Project_Memory.csv', 'a')

    # Creating csv readers
    time_reader = csv.reader(time_file)
    memory_reader = csv.reader(memory_file)
    #next_time_row = time_reader.next()

    # Creating .csv writer object for time_file and memory_file
    time_writer_obj = writer(time_file)
    memory_writer_obj = writer(memory_file)

    # Adding data to the list for insertion to .csv file
    time_list.append(end_time.total_seconds())
    memory_list.append(memory_used)

    # Sending lists objects to the object writer
    #time_reader.append(time_list)

    time_writer_obj.writerow(time_list)
    memory_writer_obj.writerow(memory_list)

    # Closing files
    time_file.close()
    memory_file.close()

    #Resetting list for new run
    time_list = []
    memory_list = []
    
    spark.stop

# Closing the files
time_file.close()
memory_file.close()

In [3]:
#df22 = rdd.toDF()
#df22.show(1)

In [3]:
# Extra code to add values to the columns and print
# Not neccessary for research
# Will not be counted for time or process
#df3 = rdd2.toDF()
#df3 = df3.withColumnRenamed("_1","Values")
#df3 = df3.withColumnRenamed("_2","Designation")
#print(df3.show(20))

In [4]:
#print((f"{rdd.count():,}"))

In [5]:
#print((f"{4998 * 141:,}"))

In [3]:
######################### TEST CODE #########################

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, FloatType
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

for field in df.schema.fields:
    features_list.append(field.name)

assembler = VectorAssembler(inputCols = features_list, outputCol='features')
dfTest = assembler.transform(df)

In [6]:
#dfTest = dfTest.select('features','_c140')
dfTest = dfTest.select('_c140','features')
dfTest = dfTest.withColumnRenamed("_c140","label")

In [7]:
dfTest.show(20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[-0.11252183,-2.8...|
|    1|[-1.1008778,-3.99...|
|    1|[-0.56708802,-2.5...|
|    1|[0.49047253,-1.91...|
|    1|[0.80023202,-0.87...|
|    1|[-1.5076736,-3.57...|
|    1|[-0.297161,-2.766...|
|    1|[0.44676853,-1.50...|
|    1|[0.087630577,-1.7...|
|    1|[-0.83228111,-1.7...|
|    1|[0.084430128,-3.1...|
|    1|[-0.007819138,-2....|
|    1|[-1.0743015,-3.25...|
|    1|[4.0581274,2.0878...|
|    1|[-0.76160326,-2.9...|
|    1|[-0.18649962,-2.6...|
|    1|[0.80393944,-1.10...|
|    1|[-0.92021269,-2.4...|
|    1|[2.7446026,-0.101...|
|    1|[2.4028692,2.0367...|
+-----+--------------------+
only showing top 20 rows



In [8]:
trainDF,testDF=dfTest.randomSplit([0.75,0.25],seed=1)
#Fit with linear regression model:
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol="features", labelCol="label")
model=model.fit(trainDF)
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.000889
r2: 0.999997


In [9]:
pred=model.transform(testDF)
pred.select("prediction","label","features").show(5)

+--------------------+-----+--------------------+
|          prediction|label|            features|
+--------------------+-----+--------------------+
|-0.00259846069431...|    0|[-4.3750582,-5.50...|
|1.735302921425459E-4|    0|[-2.878857,-3.339...|
|8.759735315672149E-4|    0|[-2.8769322,-3.65...|
|0.002250124153678...|    0|[-2.8596916,-2.87...|
|-2.24689530915150...|    0|[-2.7627317,-3.55...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="r2")
print("R2 on test: ", evaluator.evaluate(pred))

R2 on test:  0.9999954100328904


In [None]:
#SMOTE - CORRECTING FOR UNDER SAMPLING
#MAKE SURE THERE IS EVEN DISTROBUTION IN CLASSIFICATIONS (1/0)

#MAKE SURE SPARK INSTALLATION USES GPU

#LOOK UP CLASSIFICATION MODELS
#DECISION TREE, RANDOM FORREST, K-MEANS (HYPERPARRAMATER TUNING), NURAL NETWORK

In [11]:
############################### USING DATASET ###############################
######################## BINOMIAL LOGISTIC REGRESSION #######################
from pyspark.ml.classification import LogisticRegression

# Load training data
#training = spark.read.format("libsvm").load("sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(dfTest)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(dfTest)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Coefficients: (141,[3,132,133,134,135,136,140],[-0.012737507243149178,0.014415660657596847,0.015261412908894597,0.013788004919105316,0.008322931590231902,0.004989129347868306,1.7054845093269158])
Intercept: -0.6344093786656071
Multinomial coefficients: 2 X 141 CSRMatrix
(0,3) 0.007
(0,4) 0.0023
(0,116) 0.0005
(0,117) 0.003
(0,132) -0.0118
(0,133) -0.0124
(0,134) -0.0112
(0,135) -0.0048
(0,136) -0.0032
(0,140) -0.972
(1,3) -0.007
(1,4) -0.0023
(1,116) -0.0005
(1,117) -0.003
(1,132) 0.0118
(1,133) 0.0124
..
..
Multinomial intercepts: [0.38459356725755256,-0.38459356725755256]


In [12]:
############################### USING DATASET ###############################
######################## BINOMIAL LOGISTIC REGRESSION #######################
from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

objectiveHistory:
0.6789566322072351
0.6682085519973944
0.6304624784271434
0.62607547311837
0.6112940244363847
0.600482248875369
0.5948687711137275
0.5858237059885798
0.5793809899404818
0.5767230001273653
0.574016632577398
+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.001370332305584...|
|0.0|0.002740664611168...|
|0.0|0.004110996916752...|
|0.0|0.005481329222336417|
|0.0|0.006851661527920521|
|0.0|0.008221993833504625|
|0.0|0.009592326139088728|
|0.0|0.010962658444672833|
|0.0|0.012332990750256937|
|0.0|0.013703323055841042|
|0.0|0.015073655361425145|
|0.0| 0.01644398766700925|
|0.0|0.017814319972593355|
|0.0|0.019184652278177457|
|0.0|0.020554984583761562|
|0.0|0.021925316889345667|
|0.0|0.023295649194929772|
|0.0|0.024665981500513873|
|0.0| 0.02603631380609798|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 0.9999997528260632


LogisticRegression_bbc1c646324b

In [16]:
############################### USING DATASET ###############################
######################## MULTINOMIAL LOGISTIC REGRESSION #######################
from pyspark.ml.classification import LogisticRegression

# Load training data
#training = spark \
#    .read \
#    .format("libsvm") \
#    .load("data/mllib/sample_multiclass_classification_data.txt")

lr2 = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel2 = lr2.fit(dfTest)

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel2.coefficientMatrix))
print("Intercept: " + str(lrModel2.interceptVector))

trainingSummary = lrModel2.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Coefficients: 
1 X 141 CSRMatrix
(0,3) -0.0127
(0,132) 0.0144
(0,133) 0.0153
(0,134) 0.0138
(0,135) 0.0083
(0,136) 0.005
(0,140) 1.7055
Intercept: [-0.6344093786656071]
objectiveHistory:
0.6789566322072351
0.6682085519973944
0.6304624784271434
0.62607547311837
0.6112940244363847
0.600482248875369
0.5948687711137275
0.5858237059885798
0.5793809899404818
0.5767230001273653
0.574016632577398
False positive rate by label:
label 0: 0.0
label 1: 0.0
True positive rate by label:
label 0: 1.0
label 1: 1.0
Precision by label:
label 0: 1.0
label 1: 1.0
Recall by label:
label 0: 1.0
label 1: 1.0
F-measure by label:
label 0: 1.0
label 1: 1.0
Accuracy: 1.0
FPR: 0.0
TPR: 1.0
F-measure: 1.0
Precision: 1.0
Recall: 1.0


In [20]:
############################### USING DATASET ###############################
########################## DECISION TREE CLASSIFIER #########################
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
#data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dfTest)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dfTest)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = dfTest.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
print("Accuracy:", accuracy)
treeModel = model.stages[2]
# summary only
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[-4.4863068,-5.44...|
|       1.0|         1.0|[-4.3750582,-5.50...|
|       1.0|         1.0|[-3.3947195,-2.36...|
|       1.0|         1.0|[-2.878857,-3.339...|
|       1.0|         1.0|[-2.7604703,-3.28...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0 
Accuracy: 1.0
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5b06329b2be8, depth=1, numNodes=3, numClasses=2, numFeatures=141


In [18]:
############################### USING DATASET ###############################
########################## RANDOM FOREST CLASSIFIER #########################
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
#data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dfTest)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dfTest)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = dfTest.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy:", accuracy)

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             0|    0|[-5.1480379,-5.80...|
|             0|    0|[-3.4477298,-3.74...|
|             0|    0|[-3.3947195,-2.36...|
|             0|    0|[-3.0182078,-2.99...|
|             0|    0|[-2.878857,-3.339...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.00877785
Accuracy: 0.9912221471978393
RandomForestClassificationModel: uid=RandomForestClassifier_eb75e76f873e, numTrees=10, numClasses=2, numFeatures=141


In [19]:
############################### USING DATASET ###############################
###################### GRADIENT-BOOSTED TREE CLASSIFIER #####################
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
#data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dfTest)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dfTest)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = dfTest.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy:", accuracy)
gbtModel = model.stages[2]
print(gbtModel)  # summary only

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[-4.7147993,-4.50...|
|       1.0|         1.0|[-4.4863068,-5.44...|
|       1.0|         1.0|[-3.3947195,-2.36...|
|       1.0|         1.0|[-3.3918306,-3.53...|
|       1.0|         1.0|[-3.0182078,-2.99...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0
Accuracy: 1.0
GBTClassificationModel: uid = GBTClassifier_ddb9ffa4db79, numTrees=10, numClasses=2, numFeatures=141


In [16]:
#PRINCIPLE COMPONENT ANALASIS. (PCA)

from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.ml.feature import PCA as PCAml
from pyspark.ml.linalg import Vectors

#rddTest = dfTest.rdd
#model = PCAmllib(2).fit(rddTest)
#transformed = model.transform(rddTest)

pca = PCAml(k=2, inputCol="features", outputCol="pca")
model = pca.fit(dfTest)
transformed = model.transform(dfTest)

In [17]:
transformed.show(20)

+-----+--------------------+--------------------+
|label|            features|                 pca|
+-----+--------------------+--------------------+
|    1|[-0.11252183,-2.8...|[5.96480266558478...|
|    1|[-1.1008778,-3.99...|[5.99255966425208...|
|    1|[-0.56708802,-2.5...|[5.29932423996655...|
|    1|[0.49047253,-1.91...|[6.48089918483401...|
|    1|[0.80023202,-0.87...|[7.46758441196614...|
|    1|[-1.5076736,-3.57...|[7.6297788551766,...|
|    1|[-0.297161,-2.766...|[6.91009144926080...|
|    1|[0.44676853,-1.50...|[7.26779864417878...|
|    1|[0.087630577,-1.7...|[6.95843468514935...|
|    1|[-0.83228111,-1.7...|[8.56387498185476...|
|    1|[0.084430128,-3.1...|[7.63369989307515...|
|    1|[-0.007819138,-2....|[8.15855206910915...|
|    1|[-1.0743015,-3.25...|[6.82984947353546...|
|    1|[4.0581274,2.0878...|[2.39496239344078...|
|    1|[-0.76160326,-2.9...|[7.19614953841633...|
|    1|[-0.18649962,-2.6...|[7.63097653909742...|
|    1|[0.80393944,-1.10...|[6.13455146397497...|


In [None]:
#UNPACKING PCA DATA?