In [1]:
import os

import boto3
import cv2
import findspark
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import spark
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.sql.functions import lit

In [2]:
# NOTE(review): findspark.init() is normally called *before* pyspark is imported, because
# its purpose is to make a local Spark installation importable. pyspark was already
# imported in the previous cell, so this call is likely redundant here. Confirm, and
# move it ahead of the pyspark imports.
findspark.init()

In [3]:
def param_s3(bucket_name='opc-p8-data', region='eu-west-3'):
    """Return S3 handles for the project bucket.

    Credentials are not hard-coded. boto3's standard credential chain resolves
    them, e.g. the ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` environment
    variables or ``~/.aws/credentials``.
    NOTE(review): the access key that used to be written in this function is
    exposed in the notebook's history and should be revoked/rotated.

    :param bucket_name: name of the S3 bucket (default kept from the original).
    :param region: AWS region of the bucket. It is also exported as
        ``AWS_DEFAULT_REGION``, so that clients created elsewhere without an
        explicit region use it.
    :return: tuple ``(bucket, s3_client, bucket_name)``: a boto3 ``Bucket``
        resource, a low-level S3 client, and the bucket name.
    """
    os.environ["AWS_DEFAULT_REGION"] = region
    # A single session backs both handles, so the resource and the client always
    # share the same credentials and region. Previously the client only worked
    # because of env vars that this function set as a side effect.
    session = boto3.Session(region_name=region)
    bucket = session.resource('s3').Bucket(bucket_name)
    s3_client = session.client('s3')
    return bucket, s3_client, bucket_name

In [4]:
# S3 handles, plus the Spark entry points used by the rest of the notebook.
bucket, s3_client, name_bckt = param_s3()
# getOrCreate() keeps this cell re-runnable. Calling SparkContext() a second time in
# the same kernel raises a ValueError, because only one SparkContext may be active.
sc = SparkContext.getOrCreate()
# NOTE(review): this name shadows the `spark` module imported in the first cell.
spark = SparkSession.builder.getOrCreate()
# Arrow accelerates the pandas -> Spark conversions done with spark.createDataFrame(pd_df).
# NOTE(review): Spark 3.x deprecates this key in favour of
# "spark.sql.execution.arrow.pyspark.enabled". Confirm the Spark version in use.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sc.setLogLevel("ERROR")

In [5]:
def list_folders(s3_client, bucket_name, prefix=''):
    """Yield the immediate "sub-folder" prefixes found under ``prefix`` in a bucket.

    S3 has no real folders. With ``Delimiter='/'``, keys sharing the next path
    segment are grouped into ``CommonPrefixes`` entries, and those are yielded here.

    A single ``list_objects_v2`` call returns only one page of results (at most
    1000 keys/prefixes). The paginator follows continuation tokens, so no
    folder is silently dropped on large listings.

    :param s3_client: boto3 S3 client (anything exposing ``get_paginator``).
    :param bucket_name: bucket to list.
    :param prefix: key prefix to list under ('' for the bucket root).
    :return: generator of prefix strings, e.g. ``'Training/Apple/'``.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/'):
        for common_prefix in page.get('CommonPrefixes', []):
            yield common_prefix.get('Prefix')

In [6]:
def find_keypoints(image_path, s3_client, bucket_name, n_features=500):
    """Download one image from S3 and return its SIFT descriptors.

    :param image_path: S3 object key of the image.
    :param s3_client: boto3 S3 client used for ``get_object``.
    :param bucket_name: bucket containing ``image_path``.
    :param n_features: maximum number of keypoints kept by SIFT (previously
        hard-coded to 500, which remains the default).
    :return: float32 descriptor array of shape (n_keypoints, descriptor_size).
        When no keypoints are found, OpenCV returns None; an empty
        (0, descriptor_size) array is returned instead, so results can always
        be stacked.
    :raises ValueError: if the object's bytes cannot be decoded as an image.
    """
    sift = cv2.SIFT_create(n_features)
    payload = s3_client.get_object(Bucket=bucket_name, Key=image_path)['Body'].read()
    image = cv2.imdecode(np.frombuffer(payload, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals failure by returning None (e.g. a non-image object under
        # the prefix). Fail here with the key name instead of a cryptic OpenCV error
        # from GaussianBlur.
        raise ValueError(f"Could not decode S3 object {image_path!r} as an image")
    # NOTE(review): a (1, 1) Gaussian kernel leaves the image unchanged. It is kept so
    # results match the original pipeline; enlarge ksize if smoothing is actually wanted.
    blurred = cv2.GaussianBlur(image, (1, 1), 0)
    _, descriptors = sift.detectAndCompute(blurred, None)
    if descriptors is None:
        descriptors = np.empty((0, sift.descriptorSize()), dtype=np.float32)
    return descriptors

In [7]:
def find_all_keypoints(folder_jpg, seed=None):
    """Build bag-of-visual-words histograms for every image under an S3 prefix.

    The pipeline has three steps:
    1. Pool the SIFT descriptors of all images under ``folder_jpg``.
    2. Cluster them with Spark ML ``KMeans``, using k = round(sqrt(#descriptors)).
    3. Describe each image by the normalized histogram of its descriptors'
       cluster ids.

    NOTE(review): a later cell redefines ``find_all_keypoints`` with a different
    return type. Once that cell has run, the later definition is the one in effect.

    :param folder_jpg: S3 key prefix of one class folder, ending with '/'. The
        class name is taken from its second-to-last '/'-separated part.
    :param seed: optional KMeans seed, for reproducible clustering. The default
        None keeps Spark's default seed.
    :return: Spark DataFrame with one row per image that has descriptors. It holds
        histogram columns named ``<fruitname>_col<i>`` plus a ``fruit_name`` column.
    :raises ValueError: if no descriptors were found under ``folder_jpg``.
    """
    bckt, client, name = param_s3()
    descriptors_by_img = [
        find_keypoints(s3_object.key, client, name)
        for s3_object in bckt.objects.filter(Prefix=folder_jpg)
    ]
    # Images without keypoints (None or empty arrays) add nothing to the clustering
    # and cannot be turned into a histogram, so they are skipped.
    descriptors_by_img = [des for des in descriptors_by_img
                          if des is not None and len(des) > 0]
    if not descriptors_by_img:
        raise ValueError(f"No SIFT descriptors found under prefix {folder_jpg!r}")

    # Stacking a plain list avoids np.asarray(..., dtype=object), which builds a 3-D
    # object array (and an object-dtype frame for Spark) when every image happens to
    # have the same number of descriptors.
    all_descriptors = np.vstack(descriptors_by_img)
    training_df = spark.createDataFrame(pd.DataFrame(all_descriptors))

    assembler = VectorAssembler(inputCols=training_df.columns, outputCol="features")
    training_df = assembler.transform(training_df)

    # The row count is known locally, so there is no need for a training_df.count() job.
    n_clust = int(round(np.sqrt(len(all_descriptors)), 0))
    # Only pass seed when given: an explicit seed=None would be stored as a param value.
    kmeans = KMeans(k=n_clust) if seed is None else KMeans(k=n_clust, seed=seed)
    model = kmeans.fit(training_df)

    hist_lst = []
    for image_desc in descriptors_by_img:
        df_image_desc = assembler.transform(spark.createDataFrame(pd.DataFrame(image_desc)))
        hist_lst.append(build_histogram(model, df_image_desc))
    spk_df = spark.createDataFrame(pd.DataFrame(hist_lst))

    fruitname = folder_jpg.split('/')[-2]
    print('renaming columns')
    # One toDF() call renames every column, instead of one projection per column.
    spk_df = spk_df.toDF(*[fruitname + '_col' + str(col) for col in spk_df.columns])
    print('creating new column')
    spk_df = spk_df.withColumn('fruit_name', lit(fruitname))
    return spk_df

In [8]:
def build_histogram(kmeans, des):
    """
    Build the normalized bag-of-visual-words histogram of one image.

    :param kmeans: fitted Spark ML KMeans model. Its ``transform`` adds a
        ``prediction`` column holding each row's cluster id.
    :param des: Spark DataFrame of the image's descriptors, already carrying the
        features column the model was fitted on.
    :return: 1-D float array of length ``len(kmeans.clusterCenters())``. Entry ``c``
        is the fraction of descriptors assigned to cluster ``c``; the array is all
        zeros when ``des`` has no rows.
    """
    rows = kmeans.transform(des).select('prediction').collect()
    n_clusters = len(kmeans.clusterCenters())
    # Read the column by name. The old ``hist[row]`` only worked because a Row is a
    # 1-tuple, which numpy happened to accept as an index.
    predictions = np.array([row['prediction'] for row in rows], dtype=np.intp)
    counts = np.bincount(predictions, minlength=n_clusters).astype(float)
    # len(rows) equals des.count(), without launching a second Spark job.
    return counts / len(rows) if len(rows) else counts

In [9]:
# Dataset layout in the bucket: <root>/Training/<class>/... and <root>/Test/<class>/...
# main_folder_name is the root used when the bucket does not have exactly one top-level prefix.
training_set_folder, testing_set_folder, main_folder_name = 'Training', 'Test', ''

In [10]:
# Use the bucket's single top-level prefix as the dataset root. With zero or
# several top-level prefixes, fall back to main_folder_name.
folder_list = list_folders(s3_client, name_bckt)
fold = list(folder_list)
folder = fold[0] if len(fold) == 1 else main_folder_name

In [11]:
# One S3 prefix per class folder under <root>/Training/.
training_folder = folder + training_set_folder + '/'
folder_list = list_folders(s3_client, name_bckt, prefix=training_folder)
# list() replaces the old `for folder in folder_list` loop. That loop overwrote the
# global `folder` (the dataset root from the previous cell), so re-running this cell
# built a wrong training_folder.
lst_all_folders = list(folder_list)

In [22]:
def find_all_keypoints(folder_jpg):
    """Return the SIFT descriptors of every object stored under ``folder_jpg``.

    NOTE(review): this reuses the name of the earlier ``find_all_keypoints``, which
    returned a Spark DataFrame, and silently replaces it. This version is used as an
    RDD ``map`` function, so it opens its own S3 handles through ``param_s3()``.

    :param folder_jpg: S3 key prefix of one class folder.
    :return: list with one ``find_keypoints`` result per listed object, in
        listing order.
    """
    bckt, client, name = param_s3()
    return [
        find_keypoints(s3_object.key, client, name)
        for s3_object in bckt.objects.filter(Prefix=folder_jpg)
    ]

In [None]:
# Distribute the training class folders. Each RDD element (one folder) is mapped to
# that folder's list of descriptor arrays.
para = sc.parallelize(lst_all_folders)
# NOTE(review): despite its name, the result is a plain Python list (one entry per
# folder), not a DataFrame. collect() brings every descriptor back to the driver.
sift_all_df = (
    para
    .map(find_all_keypoints)
    .collect()
)

In [None]:
#spark_df_out = find_all_keypoints(lst_all_folders[0])

In [21]:
# NOTE(review): this displays the whole collected result, which can be very large;
# consider showing a summary (e.g. lengths/shapes) instead. The saved output below comes
# from execution In [21], which precedes the In [22] redefinition of find_all_keypoints,
# so it may not reflect the current code.
sift_all_df

[array([[ 1.,  0.,  0., ...,  0.,  0.,  5.],
        [ 0.,  0.,  0., ...,  0.,  0., 10.],
        [ 0.,  0.,  0., ...,  0.,  0., 17.],
        ...,
        [30.,  7.,  0., ...,  0.,  0.,  0.],
        [ 7.,  2.,  0., ...,  0.,  0.,  1.],
        [ 9.,  1.,  0., ...,  0.,  0.,  0.]], dtype=float32),
 array([[18.,  1.,  0., ...,  0.,  0.,  6.],
        [ 1.,  0.,  0., ...,  0.,  0.,  0.],
        [ 3.,  5.,  2., ..., 17.,  0.,  0.],
        ...,
        [ 1.,  2.,  1., ...,  0.,  0.,  0.],
        [15.,  1.,  0., ...,  0.,  0.,  1.],
        [ 5.,  1.,  0., ...,  0.,  0.,  0.]], dtype=float32),
 array([[101.,  13.,   0., ...,   0.,   0.,   0.],
        [ 40.,   4.,   0., ...,   0.,   0.,   0.],
        [ 22.,   2.,   0., ...,   0.,   0.,   0.],
        ...,
        [ 29.,   1.,   0., ...,   0.,   0.,   0.],
        [ 99.,  28.,   0., ...,   0.,   0.,   0.],
        [ 34.,   8.,   0., ...,   0.,   0.,   3.]], dtype=float32),
 array([[ 1.,  1.,  0., ...,  0.,  0.,  6.],
        [ 0.,  0., 