In [1]:
import sys
from pyspark.sql import *
from pyspark.sql.functions import col

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler

In [2]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

In [3]:
import cv2   
import numpy as np
import urllib.request

%matplotlib inline
from matplotlib import pyplot as plt

In [4]:
import multiprocessing
from joblib import Parallel, delayed
from tqdm import tqdm

In [5]:
# Reuse the running Spark session if one exists; otherwise start one named 'deneme1'.
spark = (
    SparkSession.builder
    .appName('deneme1')
    .getOrCreate()
)

In [6]:
# Small train/test splits. header=true takes column names from the first row;
# no inferSchema option is set, so Spark's default (all columns read as strings) applies.
input_x_small_train_file = "gs://uga-dsp/project2/files/X_small_train.csv"
input_x_small_test_file = "gs://uga-dsp/project2/files/X_small_test.csv"

df_small_train, df_small_test = (
    spark.read.option("header", "true").csv(csv_path)
    for csv_path in (input_x_small_train_file, input_x_small_test_file)
)

In [7]:
# Full training split plus the three test splits (a, b, c), all read with a header row.
input_xtrain_file = "gs://uga-dsp/project2/files/X_train.csv"

input_xatest_file = "gs://uga-dsp/project2/files/Xa_test.csv"
input_xbtest_file = "gs://uga-dsp/project2/files/Xb_test.csv"
input_xctest_file = "gs://uga-dsp/project2/files/Xc_test.csv"

df_training = spark.read.option("header", "true").csv(input_xtrain_file)

dfa_test, dfb_test, dfc_test = (
    spark.read.option("header", "true").csv(test_path)
    for test_path in (input_xatest_file, input_xbtest_file, input_xctest_file)
)


In [16]:
def save_faces(image, folder_name=False, output_dir='/home/ubuntu/data/faces_small/test'):
    """Download one source image, crop a face box, resize it to 80x80 and save it as JPEG.

    Best-effort: any failure for this row (bad coordinates, download error,
    undecodable bytes, empty crop, failed write) is reported with ``print`` and
    swallowed, so one bad row does not abort a bulk download.

    Parameters
    ----------
    image : sequence
        One row indexed as ``(image_name, x1, y1, x2, y2, label, face_id)``.
        Coordinates are passed through ``int`` (they may arrive as strings).
    folder_name : str or False, optional
        Sub-folder of ``output_dir`` to write into; when falsy, the row's label is used.
    output_dir : str, optional
        Root directory for the saved crops (defaults to the previously hard-coded path).

    Returns
    -------
    str
        The source image URL, returned whether or not the save succeeded.
    """
    import os

    path = f'https://storage.googleapis.com/uga-dsp/project2/images/{image[0]}'
    label = image[5]
    face_id = image[6]

    if not folder_name:
        folder_name = label
    try:
        # Parse inside the try so a malformed row is reported, not raised out of the worker.
        # Negative starts would wrap around in numpy slicing; clamp them to the image edge.
        x1 = max(int(image[1]), 0)
        y1 = max(int(image[2]), 0)
        x2 = int(image[3])
        y2 = int(image[4])

        with urllib.request.urlopen(path) as resp:
            raw_bytes = np.asarray(bytearray(resp.read()), dtype="uint8")
        downloaded_image = cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
        if downloaded_image is None:  # imdecode signals bad data by returning None
            raise ValueError('could not decode image bytes')

        cropped = downloaded_image[y1:y2, x1:x2]
        if cropped.size == 0:
            raise ValueError(f'empty crop for box ({x1}, {y1}, {x2}, {y2})')
        resized = cv2.resize(cropped, (80, 80))

        out_folder = os.path.join(output_dir, str(folder_name))
        # cv2.imwrite neither creates directories nor raises; it only returns False.
        os.makedirs(out_folder, exist_ok=True)
        local_path = os.path.join(out_folder, f'{face_id}.jpg')
        if not cv2.imwrite(local_path, resized):
            raise OSError(f'cv2.imwrite failed for {local_path}')
    except Exception as exc:
        print(f'An exception occurred with {image[0]}, face - {face_id}: {exc}')
    return path

In [9]:
def process_images(X_dataframe):
    """Collect, to the driver, the columns ``save_faces`` reads, in the order it indexes them.

    Selects columns by position: 2, 5, 6, 7, 8, the last column, then 1, and
    returns ``select(...).collect()`` of that projection.
    NOTE(review): positional selection assumes a fixed CSV column layout; confirm
    against the file header.
    """
    cols = X_dataframe.columns
    wanted = [cols[idx] for idx in (2, 5, 6, 7, 8, len(cols) - 1, 1)]
    return X_dataframe.select(wanted).collect()

In [18]:
def download_images(images, folder_name=False, n_jobs=None):
    """Crop and save every face row in ``images`` in parallel, one ``save_faces`` call per row.

    Parameters
    ----------
    images : iterable
        Rows in the layout ``save_faces`` expects (e.g. the output of ``process_images``).
    folder_name : str or False, optional
        Forwarded to ``save_faces``; when falsy, each row's label picks its folder.
    n_jobs : int, optional
        Number of joblib workers; defaults to ``multiprocessing.cpu_count()``.

    Returns None; per-row failures are reported by ``save_faces`` itself.
    """
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()
    # No ``__name__ == "__main__"`` guard: inside a function it protects nothing and
    # turned the call into a silent no-op whenever this code was imported from a module.
    # tqdm wraps the task generator, so the bar advances as joblib pulls tasks
    # (it pre-dispatches a few ahead), approximating completion.
    Parallel(n_jobs=n_jobs)(
        delayed(save_faces)(image, folder_name) for image in tqdm(images)
    )

In [11]:
# images = process_images(df_training)
# download_images(images)

In [12]:
# test_a_images = process_images(dfa_test)
# download_images(test_a_images, "test_a")

In [13]:
# test_b_images = process_images(dfb_test)
# download_images(test_b_images, "test_b")

In [14]:
# test_c_images = process_images(dfc_test)
# download_images(test_c_images, "test_c")

100%|██████████| 16612/16612 [08:25<00:00, 32.89it/s]


In [19]:
# Pull the small test split's face rows to the driver, then crop and save each one
# (no folder_name is given, so each row's label chooses its folder).
small_test_images = process_images(df_small_test)
download_images(images=small_test_images)

100%|██████████| 263/263 [00:07<00:00, 34.80it/s]
