In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, FloatType
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, PCA
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT
import torch
import torch.nn as nn
from torchvision.models.utils import load_state_dict_from_url
from typing import Type, Any, Callable, Union, List, Dict, Optional, cast
from torch import Tensor
from collections import OrderedDict 
import numpy as np
import pickle
import os
import boto3
from torchvision.models.vgg import *
from torchvision.models.vgg import model_urls, cfgs
import torch
from torch import optim, nn
from torchvision import models, transforms
import cv2

class FeatureExtractor(nn.Module):
    def __init__(self, model):
        super(FeatureExtractor, self).__init__()
        # Extract VGG-16 Feature Layers
        self.features = list(model.features)
        self.features = nn.Sequential(*self.features)
        # Extract VGG-16 Average Pooling Layer
        self.pooling = model.avgpool
        # Convert the image into one-dimensional vector
        self.flatten = nn.Flatten()
        # Extract the first part of fully-connected layer from VGG16
        self.fc = model.classifier[0]
  
    def forward(self, x):
        # It will take the input 'x' until it returns the feature vector called 'out'
        out = self.features(x)
        out = self.pooling(out)
        out = self.flatten(out)
        out = self.fc(out) 
        return out 

# Initialize the model
model = models.vgg16(pretrained=True)
new_model = FeatureExtractor(model)
# Change the device to GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else "cpu")
new_model = new_model.to(device)

transform = transforms.Compose([
  transforms.ToPILImage(),
  transforms.CenterCrop(512),
  transforms.Resize(100),
  transforms.ToTensor()                              
])

def extractfeature(path,transform,new_model):
    path=path[5:]
    img = cv2.imread(path)
    # Transform the image
    img = transform(img)
    # Reshape the image. PyTorch model reads 4-dimensional tensor
    # [batch_size, channels, width, height]
    img = img.reshape(1, 3, 100, 100)
    img = img.to(device)
    # We only extract features, so we don't need gradient
    with torch.no_grad():
        # Extract the feature from the image
        feature = new_model(img)
        # Convert to NumPy Array, Reshape it, and save it to features variable
    temp=feature.cpu().detach().numpy().reshape(-1)
    return temp


In [2]:
spark = (
    SparkSession.builder.master("local").appName("SparkByExamples.com").getOrCreate()
)

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "AKIA3FBVFAGLF7WW6BWD")
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.secret.key", "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"
)
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
)
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider",
)
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "eu-west-3.amazonaws.com")

In [3]:
data_url = "../data/fruits/fruits-360_dataset/fruits-360/Training/Apricot/*"
df = spark.read.format("image").load(data_url)
feat = F.udf(lambda x: DenseVector(extractfeature(x,transform=transform,new_model=new_model)), VectorUDT())
df = df.withColumn("vecs", feat("image.origin"))
data = df.select("vecs")

In [81]:
scaler = MinMaxScaler(inputCol="vecs", outputCol="scaledvecs")
scaledData = scaler.fit(data).transform(data)
pca = PCA(k=10, inputCol="scaledvecs", outputCol="pcavecs")
model = pca.fit(scaledData)
reduced_data = model.transform(scaledData).select("pcavecs")

Py4JJavaError: An error occurred while calling o433.fit.
: java.lang.OutOfMemoryError: Java heap space
	at breeze.linalg.svd$.breeze$linalg$svd$$doSVD_Double(svd.scala:94)
	at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:36)
	at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:35)
	at breeze.generic.UFunc.apply(UFunc.scala:46)
	at breeze.generic.UFunc.apply$(UFunc.scala:45)
	at breeze.linalg.svd$.apply(svd.scala:21)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:481)
	at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:65)
	at org.apache.spark.ml.feature.PCA.fit(PCA.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [5]:
temp=data.toPandas()

In [6]:
temp

Unnamed: 0,vecs
0,"[1.6019713878631592, -3.5101022720336914, 1.05..."
1,"[1.5929287672042847, -3.5260071754455566, 0.96..."
2,"[1.609989881515503, -3.4946999549865723, 1.240..."
3,"[1.6050442457199097, -3.526664972305298, 1.183..."
4,"[1.5339109897613525, -3.5794451236724854, 0.89..."
...,...
487,"[1.7248787879943848, -2.0906875133514404, 0.13..."
488,"[1.6911070346832275, -2.0984740257263184, 0.14..."
489,"[1.6200666427612305, -2.216568946838379, 0.062..."
490,"[1.6160438060760498, -2.1579718589782715, 0.05..."


In [10]:
import pandas as pd
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.read.load("part-00000-5054f0dd-0b92-4932-b2e1-c3ecd52c90b5-c000.snappy.parquet")
pdf = df.toPandas()

In [11]:
pdf

Unnamed: 0,vecs
0,"[1.6019713878631592, -3.510100841522217, 1.050..."
1,"[1.592928409576416, -3.5260071754455566, 0.964..."
2,"[1.6099903583526611, -3.4947004318237305, 1.24..."
3,"[1.6050448417663574, -3.5266647338867188, 1.18..."
4,"[1.5339117050170898, -3.5794458389282227, 0.89..."
...,...
487,"[1.7248785495758057, -2.0906877517700195, 0.13..."
488,"[1.6911072731018066, -2.0984749794006348, 0.14..."
489,"[1.6200671195983887, -2.2165684700012207, 0.06..."
490,"[1.616042971611023, -2.1579718589782715, 0.057..."
