In [None]:
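# Upload the two input images ("1.png" and "2.png" in the data-and-code folder) to the working directory. The code below reads them as "i1.png" and "i2.png", so the uploaded files must carry those names.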

In [1]:
from google.colab import drive

# Attach Google Drive to the Colab VM's filesystem at the standard mount point.
mount_point = '/content/drive'
drive.mount(mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
!pip uninstall imageio -y
!pip install imageio==2.18.0

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u352-ga-1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 5 not upgraded.
Found existing installation: imageio 2.18.0
Uninstalling imageio-2.18.0:
  Successfully uninstalled imageio-2.18.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting imageio==2.18.0
  Using cached imageio-2.18.0-py3-none-any.whl (3.4 MB)
Installing collected packages: imageio
Successfully installed imageio-2.18.0


In [5]:
from imageio import imread, imsave
import numpy as np

# Load both input images as single-channel (grayscale) arrays.
i1 = imread('i1.png', as_gray=True)
i2 = imread('i2.png', as_gray=True)

# A pixel-wise difference only makes sense for images of identical size;
# fail early rather than let NumPy broadcast (or choke on) mismatched shapes.
if i1.shape != i2.shape:
    raise ValueError(f"Image shapes differ: {i1.shape} vs {i2.shape}")

# Absolute pixel-wise difference. np.abs is used instead of the builtin `abs`
# because a later cell runs `from pyspark.sql.functions import *`, which rebinds
# `abs` to the Spark column function and would break this cell when re-run.
diff = np.abs(i2 - i1)

# Save the differential image; the Spark cells below read it back from disk.
imsave('diff.png', diff)



In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [7]:
# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create the session, or reuse the one that is already running. Constructing
# a SparkContext directly fails when this cell is re-run, because only one
# context may be active per process; getOrCreate() keeps the cell idempotent
# and passing `conf` to the builder ensures the session uses the settings above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep `sc` bound to the session's underlying SparkContext.
sc = spark.sparkContext

In [8]:
# Load the difference image through Spark's 'image' data source.
image_reader = spark.read.format('image')
i = image_reader.load('diff.png')

In [9]:
# Convert the Spark image column into a flat ML vector column.
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT

# NOTE(review): this bare attribute access looks like a no-op, but it presumably
# primes ImageSchema's cached field list on the driver before the UDF below is
# shipped to the executors - confirm before removing or reordering it.
ImageSchema.imageFields

# UDF: image struct -> ndarray (via ImageSchema.toNDArray) -> flattened DenseVector.
img2vec = F.udf(lambda x: DenseVector(ImageSchema.toNDArray(x).flatten()), VectorUDT())

# Add the flattened pixels as a 'vectors' column next to the original 'image' column.
df = i.withColumn('vectors', img2vec("image"))
df.show()

+--------------------+--------------------+
|               image|             vectors|
+--------------------+--------------------+
|{file:///content/...|[7.0,12.0,12.0,5....|
+--------------------+--------------------+



In [10]:
from pyspark.ml.feature import PCA

# Project the flattened-image vectors onto their first 3 principal components.
pca = PCA(k=3, inputCol='vectors', outputCol='pcaFeatures')

# Drop any 'pcaFeatures' column left over from a previous run of this cell
# (drop() silently ignores a missing column). Without this, re-running the
# cell fails because the PCA output column already exists on `df`.
pca_input = df.drop('pcaFeatures')
model_pca = pca.fit(pca_input)

# Add the projected features as a new 'pcaFeatures' column.
df = model_pca.transform(pca_input)

In [None]:
# Preview the frame, now including the PCA output column (default row limit and truncation).
df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|               image|             vectors|         pcaFeatures|
+--------------------+--------------------+--------------------+
|{file:///content/...|[7.0,12.0,12.0,5....|[-7.6567582107656...|
+--------------------+--------------------+--------------------+



In [None]:
from pyspark.ml.clustering import KMeans

# Cluster the flattened-image vectors into 3 groups. The seed is fixed so the
# result is reproducible; without an explicit seed the initialisation may
# differ from one session to the next.
# NOTE(review): the show() output above suggests `df` holds a single row (one
# image), so k=3 clustering on it is degenerate - confirm this is intended.
kmeans = KMeans(featuresCol='vectors', k=3, seed=42)
kmeans_fit = kmeans.fit(df)
output = kmeans_fit.transform(df)

In [None]:
# Show up to 10 rows, cutting each cell off at 100 characters.
output.show(n=10, truncate=100)

+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+--------------------------------------------------------+----------+
|                                                                                               image|                                                                                             vectors|                                             pcaFeatures|prediction|
+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+--------------------------------------------------------+----------+
|{file:///content/diff.png, 512, 512, 1, 0, \a\f\f\v\v\b  \r\v\r0@)\r\r\t\a...|[7.0,12.0,12.0,5.0,4.0,11.0,15.0,14.0,11.0,5.0,5.0,8.0,4.0,1.0,1.0,1.0,3.0,3.0,0.0

#### Another attempt: turning the image into a pandas dataframe and then passing it to Spark.
Because processing the image in Spark via the `spark.read.format('image')` method proved extremely difficult, the image is instead converted into a pandas DataFrame with one row per pixel (its coordinates plus the R, G and B channels), which is then turned into a Spark DataFrame.

In [None]:
from PIL import Image

colourImg = Image.open("diff.png")
colourPixels = colourImg.convert("RGB")

# Converting the PIL image directly yields a (height, width, 3) array.
# PIL's `size` attribute is (width, height), so reshaping the flat pixel list
# with it is only correct for square images; taking the shape from the array
# itself works for any aspect ratio (and avoids the slow getdata() round-trip).
colourArray = np.asarray(colourPixels)
height, width = colourArray.shape[:2]

# Per-pixel (row, column) coordinates, shape (height, width, 2).
indicesArray = np.moveaxis(np.indices((height, width)), 0, 2)

# One row per pixel, in row-major order: y, x, r, g, b.
allArray = np.dstack((indicesArray, colourArray)).reshape((-1, 5))

df = pd.DataFrame(allArray, columns=["y", "x", "r","g","b"])

In [None]:
# Hand the per-pixel pandas table over to Spark and preview the first rows.
sdf = spark.createDataFrame(df)
sdf.show(n=5)

+---+---+---+---+---+
|  y|  x|  r|  g|  b|
+---+---+---+---+---+
|  0|  0|  7|  7|  7|
|  0|  1| 12| 12| 12|
|  0|  2| 12| 12| 12|
|  0|  3|  5|  5|  5|
|  0|  4|  4|  4|  4|
+---+---+---+---+---+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the three colour channels into a single 'features' vector column
# and preview the assembled frame.
f_cols = ['r', 'g', 'b']
VA = VectorAssembler(outputCol='features', inputCols=f_cols)
VA_df = VA.transform(sdf)
VA_df.show(n=5)

+---+---+---+---+---+----------------+
|  y|  x|  r|  g|  b|        features|
+---+---+---+---+---+----------------+
|  0|  0|  7|  7|  7|   [7.0,7.0,7.0]|
|  0|  1| 12| 12| 12|[12.0,12.0,12.0]|
|  0|  2| 12| 12| 12|[12.0,12.0,12.0]|
|  0|  3|  5|  5|  5|   [5.0,5.0,5.0]|
|  0|  4|  4|  4|  4|   [4.0,4.0,4.0]|
+---+---+---+---+---+----------------+
only showing top 5 rows



In [None]:
# Split the pixels into two clusters by colour. The seed is fixed so that the
# clustering - and in particular which cluster receives label 0 and which
# label 1 - is reproducible; otherwise the labels can swap between sessions.
kmeans = KMeans(k=2, featuresCol='features', seed=42)
kmeans_fit = kmeans.fit(VA_df)

# Append the cluster label of every pixel as a 'prediction' column.
output = kmeans_fit.transform(VA_df)
output.show()

+---+---+---+---+---+----------------+----------+
|  y|  x|  r|  g|  b|        features|prediction|
+---+---+---+---+---+----------------+----------+
|  0|  0|  7|  7|  7|   [7.0,7.0,7.0]|         0|
|  0|  1| 12| 12| 12|[12.0,12.0,12.0]|         0|
|  0|  2| 12| 12| 12|[12.0,12.0,12.0]|         0|
|  0|  3|  5|  5|  5|   [5.0,5.0,5.0]|         0|
|  0|  4|  4|  4|  4|   [4.0,4.0,4.0]|         0|
|  0|  5| 11| 11| 11|[11.0,11.0,11.0]|         0|
|  0|  6| 15| 15| 15|[15.0,15.0,15.0]|         0|
|  0|  7| 14| 14| 14|[14.0,14.0,14.0]|         0|
|  0|  8| 11| 11| 11|[11.0,11.0,11.0]|         0|
|  0|  9|  5|  5|  5|   [5.0,5.0,5.0]|         0|
|  0| 10|  5|  5|  5|   [5.0,5.0,5.0]|         0|
|  0| 11|  8|  8|  8|   [8.0,8.0,8.0]|         0|
|  0| 12|  4|  4|  4|   [4.0,4.0,4.0]|         0|
|  0| 13|  1|  1|  1|   [1.0,1.0,1.0]|         0|
|  0| 14|  1|  1|  1|   [1.0,1.0,1.0]|         0|
|  0| 15|  1|  1|  1|   [1.0,1.0,1.0]|         0|
|  0| 16|  3|  3|  3|   [3.0,3.0,3.0]|         0|


In [None]:
# Bring the clustered pixels back to the driver. Spark does not guarantee row
# order without an explicit sort, and the reshape that follows assumes
# row-major pixel order, so sort by (y, x) before collecting.
df = output.orderBy('y', 'x').toPandas()
pred = df['prediction'].values

In [None]:
# Rebuild the 2-D label image. The shape is derived from the pixel coordinates
# in `df` instead of being hard-coded to 512x512, so other image sizes work too.
n_rows = int(df['y'].max()) + 1
n_cols = int(df['x'].max()) + 1
img = np.reshape(pred, (n_rows, n_cols))

In [None]:
# Cluster labels are small integers. Stretch them explicitly over the 8-bit
# range (0 -> 0, highest label -> 255) and convert to uint8, rather than
# handing an integer-mode PIL image to imageio and relying on its implicit
# dtype conversion.
label_max = max(int(img.max()), 1)  # guard against division by zero when every label is 0
img_uint8 = (img.astype(np.int64) * 255 // label_max).astype(np.uint8)

final_image = Image.fromarray(img_uint8)
final_image.save('final.png')



In [None]:
# The PCA step was not carried through to a final result; this exploratory
# code is kept for reference.

from pyspark.ml.feature import PCA

# Fit a 3-component PCA on the per-pixel colour vectors and display the
# principal-component matrix.
pca = PCA(outputCol='pcaFeatures', inputCol='features', k=3)
model_pca = pca.fit(VA_df)
model_pca.pc

DenseMatrix(3, 3, [-0.5774, -0.5774, -0.5774, -0.5774, -0.2113, 0.7887, -0.5774, 0.7887, -0.2113], 0)