In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [2]:
from PIL import Image, ImageDraw
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT

In [3]:
df = spark.read.format("image").option("dropInvalid", True).load("AlzheimersDataset/Negative")
df.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
df_neg = df.withColumn("label",lit(0))

df2 = spark.read.format("image").option("dropInvalid", True).load("AlzheimersDataset/Positive")
df2.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
df_pos = df2.withColumn("label",lit(1))

dataframe = df_neg.unionAll(df_pos)

In [5]:
dataframe.first()

Row(image=Row(origin='file:///C:/Users/baili/Documents/~Spring2023/6430/Project/archive/AlzheimersDataset/Negative/nonDem1872.jpg', height=208, width=176, nChannels=1, mode=0, data=bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0

In [4]:
ImageSchema.imageFields

img2vec = F.udf(lambda x: DenseVector(ImageSchema.toNDArray(x).flatten()), VectorUDT())

df = dataframe.withColumn('features', img2vec("image"))
df = df.select("features", "label")
df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,0.0,0.0,...|    0|
|[0.0,0.0,0.0,0.0,...|    0|
|[0.0,0.0,0.0,0.0,...|    0|
|[0.0,0.0,0.0,0.0,...|    0|
|[0.0,0.0,0.0,0.0,...|    0|
+--------------------+-----+
only showing top 5 rows



In [6]:
df.first()

Row(features=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 52209)


In [5]:
df.count()

6400

In [6]:
#image_row = 40
#spark_single_img = df_pos.select("image").collect()[image_row]
#(spark_single_img.image.origin, spark_single_img.image.mode, spark_single_img.image.nChannels )

#Image.frombytes(mode="L", data=bytes(spark_single_img.image.data), size=[spark_single_img.image.width,spark_single_img.image.height]).show()

In [7]:
#df.first()

In [8]:
#from pyspark.ml.feature import MinMaxScaler
#from pyspark.ml.linalg import Vectors

#scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
#scalerModel = scaler.fit(df)

# rescale each feature to range [min, max].
#scaledData = scalerModel.transform(df)
#print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
#scaledData.select("label", "scaledFeatures").show(5)

In [9]:
#scaledData = scaledData.select("scaledFeatures", "label")
#scaledData.show(5)

In [10]:
#finalDF = scaledData.withColumnRenamed("scaledFeatures", "features")
#finalDF.show(5)

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Load training data
(trainingData, testData) = df.randomSplit([0.7, 0.3])

lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8)

print("fitting...")
lrModel = lr.fit(trainingData)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

fitting...
Coefficients: (36608,[],[])
Intercept: -0.004992068456550528


In [12]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

objectiveHistory:
0.6931440654762148
+---+---+
|FPR|TPR|
+---+---+
|0.0|0.0|
|1.0|1.0|
|1.0|1.0|
+---+---+

areaUnderROC: 0.5


LogisticRegression_40cee23458e0

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions.
predictions = lrModel.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    0|[0.0,0.0,0.0,0.0,...|
|       0.0|    0|[0.0,0.0,0.0,0.0,...|
|       0.0|    0|[0.0,0.0,0.0,0.0,...|
|       0.0|    0|[0.0,0.0,0.0,0.0,...|
|       0.0|    0|[0.0,0.0,0.0,0.0,...|
+----------+-----+--------------------+
only showing top 5 rows



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\spark-3.3.1-bin-hadoop3\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\spark-3.3.1-bin-hadoop3\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\baili\anaconda3\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 59693)
Traceback (most recent call last):
  File "C:\Users\baili\anaconda3\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\baili\anaconda3\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\baili\anaconda3\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\baili\anaconda3\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "C:\spark-3.3.1-bin-hadoop3\python\pyspark\accumulators.py", line 281, in handle
    poll(accum_updates)
  File "C:\spark-3.3.1-bin-hadoop3\python\pyspark\accumulators.py", line 253, in poll
    if func():
  File "C:\spark-3.3.1-bin-hadoop3\python\pyspark\accumulators.py", line 257, in accum_updates
    num_updates = read_int(se