In [1]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages jakac:spark-python-knn:0.0.3"
)


In [3]:
!export SPARK_HOME=~/.virtualenvs/pyspark/lib/python3.8/site-packages/pyspark 

In [5]:
# from gaussalgo.knn import compute_neighbors

In [6]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import re

import tensorflow as tf

from pyspark.sql.functions import lower, col, udf, isnan, when, count, row_number
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from  pyspark.sql.types import StringType
import numpy as np
from pyspark.sql.functions import monotonically_increasing_id
import cv2

from pyspark.ml.linalg import Vectors, VectorUDT
from sklearn.neighbors import NearestNeighbors


In [7]:
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

In [8]:
#Defining the path to the images and properties
PATH = '../datasets/'
MODEL_PATH = "./models/efficientnetb5_notop.h5"

image_path = PATH +"train_images/"
IMG_SIZE = 456 


In [9]:
train_rdd = spark.read.format("csv").option("header","true").load(PATH + "train.csv")
train_rdd = train_rdd.withColumn("id", monotonically_increasing_id())
train_rdd.printSchema()

root
 |-- posting_id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- image_phash: string (nullable = true)
 |-- title: string (nullable = true)
 |-- label_group: string (nullable = true)
 |-- id: long (nullable = false)



In [10]:
train_rdd.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_rdd.columns]).show()



+----------+-----+-----------+-----+-----------+---+
|posting_id|image|image_phash|title|label_group| id|
+----------+-----+-----------+-----+-----------+---+
|         0|    0|          0|    0|          4|  0|
+----------+-----+-----------+-----+-----------+---+



                                                                                

In [11]:
train_rdd.select("label_group").take(1)

[Row(label_group='249114794')]

In [12]:
train_rdd = train_rdd.na.fill(value="")

In [13]:
train_rdd.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_rdd.columns]).show()

+----------+-----+-----------+-----+-----------+---+
|posting_id|image|image_phash|title|label_group| id|
+----------+-----+-----------+-----+-----------+---+
|         0|    0|          0|    0|          0|  0|
+----------+-----+-----------+-----+-----------+---+



                                                                                

In [14]:
# tmp = train_rdd.groupby('label_group').posting_id.agg('unique').to_dict()
# train['target'] = train.label_group.map(tmp)
# traindf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   # ).show()

In [15]:
udf_replace_multispace_by_space = udf(lambda text: re.sub('\s+', ' ',StringType()))

In [16]:
train_rdd = train_rdd.withColumn('title', lower(col('title')))

In [17]:
# tokenize
tokenizer = Tokenizer().setInputCol("title").setOutputCol("words")
train_rdd = tokenizer.transform(train_rdd)

# vectorize
vectorizer = CountVectorizer(inputCol='words', outputCol='vectorizer').fit(train_rdd)
train_rdd = vectorizer.transform(train_rdd)

# calculate scores
idf = IDF(inputCol="vectorizer", outputCol="tfidf_features")
idf_model = idf.fit(train_rdd)
train_rdd = idf_model.transform(train_rdd)

                                                                                

In [18]:
train_rdd = train_rdd.drop(col("words"))
train_rdd= train_rdd.drop(col("vectorizer"))
train_rdd.printSchema()

root
 |-- posting_id: string (nullable = false)
 |-- image: string (nullable = false)
 |-- image_phash: string (nullable = false)
 |-- title: string (nullable = false)
 |-- label_group: string (nullable = false)
 |-- id: long (nullable = false)
 |-- tfidf_features: vector (nullable = true)



In [22]:
class DataGenerator(tf.keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, df, img_size=IMG_SIZE, batch_size=32, path=''): 
        self.df = df
        self.img_size = img_size
        self.batch_size = batch_size
        self.path = path
        self.indexes = np.arange( self.df.count() )
        
    def __len__(self):
        'Denotes the number of batches per epoch'
        ct = self.df.count() // self.batch_size
        ct += int(( (self.df.count()) % self.batch_size)!=0)
        return ct

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        X = self.__data_generation(indexes)
        return X

    def __data_generation(self, indexes):
        'Generates data containing batch_size samples'
        X = np.zeros((len(indexes),self.img_size,self.img_size,3),dtype='float32')
        start = int(min(indexes))
        end = int(max(indexes))
        df = self.df.where(col("id").between(start, end))
        for i, row in enumerate(df.select("image").collect()):
            img = cv2.imread(self.path + row.image)
            X[i,] = cv2.resize(img,(self.img_size,self.img_size))
        return X

In [20]:
def make_prediction(data_partition):
    WGT = "/home/joker/Workspace/Playground/retail_duplicate/deep_deduplicate/efficient_knn/trained_models/efficientnetb5_notop.h5"
    model = tf.keras.applications.efficientnet.EfficientNetB5(weights=WGT, input_shape=None, include_top=False,
                                                                pooling="avg",
                                                                drop_connect_rate=0.2)

    data = DataGenerator(data_partition, path=image_path)

    image_embedding = model.predict(data)
    return image_embedding

In [21]:
res = make_prediction(train_rdd.where(col("id").between(0, 1000)))

2022-07-13 22:54:41.271312: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-13 22:54:41.295129: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-13 22:54:41.295827: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:975] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-13 22:54:41.296909: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

DataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
DataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
DataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]


2022-07-13 22:54:59.897247: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8101
2022-07-13 22:55:01.047890: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 2.28GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-07-13 22:55:04.159130: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 3.43GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-07-13 22:55:04.348010: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 3.61GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains

 1/32 [..............................] - ETA: 4:15DataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
 2/32 [>.............................] - ETA: 3s  DataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
 3/32 [=>............................] - ETA: 29sDataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
 4/32 [==>...........................] - ETA: 36sDataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
 5/32 [===>..........................] - ETA: 39sDataFrame[posting_id: string, image: string, image_phash: string, title: string, label_group: string, id: bigint, tfidf_features: vector]
 6/32 [====>.........................] - ETA: 39sDataFrame[post

2022-07-13 22:56:03.174051: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 3.41GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-07-13 22:56:03.279803: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 3.46GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.




In [32]:
def add_image_features(indx):
    print(indx)
    return Vectors.dense(res[indx-1]) # since row num begins from 1
ud_f = F.udf(add_image_features,VectorUDT())
df_image = train_rdd.where(col("id").between(0, 999)).withColumn("image_features",ud_f("id"))

In [33]:
df_image.printSchema()

root
 |-- posting_id: string (nullable = false)
 |-- image: string (nullable = false)
 |-- image_phash: string (nullable = false)
 |-- title: string (nullable = false)
 |-- label_group: string (nullable = false)
 |-- id: long (nullable = false)
 |-- tfidf_features: vector (nullable = true)
 |-- image_features: vector (nullable = true)



In [46]:
df_image.select("image").take(1)

[Row(image='0000a68812bc7e98c42888dfb1c07da0.jpg')]

In [35]:
def euclidian_distance(a, b):
    np.sqrt(np.sum((a - b) ** 2, axis=1))

In [36]:
def inter(row):
    print(row)
    distances = df_image.map(lambda x: euclidian_distance(row, x))
    print(distances)
    

In [58]:
text_features = df_image.select("tfidf_features").toPandas()

In [78]:
tfeatures = text_features["tfidf_features"].apply(lambda x : np.array(x.toArray())).values

In [52]:
knn_model = NearestNeighbors(n_neighbors=6, metric="cosine")

In [81]:
np.asarray(tfeatures)[0]

array([0., 0., 0., ..., 0., 0., 0.])

In [79]:
knn_model.fit(tfeatures)

ValueError: setting an array element with a sequence.