Welcome to the collaborative Spark environment in ZHAW. You are not yet connected to Sparky by default. However, the necessary code template makes this a quick process. Keep in mind that you are sharing both the Jupyter environment and the Sparky cluster with others.

In [None]:
import sparky
import pyspark
import pyspark.sql
import shutil
import os
from pyspark import SparkContext, SparkConf
sc = sparky.connect("sparknotebook-...", 2)
spark = pyspark.sql.SparkSession.builder.getOrCreate()


In [None]:
# Installing Deepface and Numpy

In [None]:
%pip install Deepface
%pip install numpy
%pip install plotly

In [None]:
import pandas as pd
import csv
# Lesen Sie die Textdatei in ein DataFrame
df = pd.read_csv('list_identity_celeba.txt', sep="   ", header=0) 
df = df.rename(columns={'  identity_name': 'identity_name'})
df['image_id'].to_csv('image_id.csv', index=False, header=True)


# "Install dependencies for deepface and numpy"

In [None]:
def installdeps1(ignore_arg):
    import os
    import sys
    pkg="numpy"
    pkgpath = "worker-packages"
    if not os.path.isdir(pkgpath):
        print("Setting up virtual environment for packages... please be patient", file=sys.stderr, flush=True)
        import venv
        venv.create(pkgpath, with_pip=True)
    pip = os.path.join(pkgpath, "bin", "pip")
    os.system(f"{pip} install '{pkg}'")
    if not pkgpath in sys.path:
        major = sys.version_info.major
        minor = sys.version_info.minor
        sys.path.append(os.path.join(pkgpath, "lib", f"python{major}.{minor}", "site-packages"))
    return 1

def installdeps2(ignore_arg):
    import os
    import sys
    pkg="deepface"
    pkgpath = "worker-packages"
    if not os.path.isdir(pkgpath):
        print("Setting up virtual environment for packages... please be patient", file=sys.stderr, flush=True)
        import venv
        venv.create(pkgpath, with_pip=True)
    pip = os.path.join(pkgpath, "bin", "pip")
    os.system(f"{pip} install '{pkg}'")
    if not pkgpath in sys.path:
        major = sys.version_info.major
        minor = sys.version_info.minor
        sys.path.append(os.path.join(pkgpath, "lib", f"python{major}.{minor}", "site-packages"))
    return 1




In [None]:
import os
# Partitionen erzeugen und einsammeln
liste = range(16)
rdd = sc.parallelize(liste)

print(rdd.collect())
print(rdd.glom().collect())

In [None]:
# Hier müssen alle Partitionen gefüllt sein mit mindestens einem Wert
# Wir können das auch automatisiert prüfen (mind. 16 auf Azure-Konfiguration):
if len(list(filter(lambda x: x == [], rdd.glom().collect()))):
    raise SystemExit("Nicht gut - einige Worker bleiben ohne Softwareinstallation.")

In [None]:
rdd.map(installdeps1).collect()
rdd.map(installdeps2).collect()

# Parallele Gesichtsmerkmalsextraktion und Geschlechtsanalyse auf einer Bildersammlung

In [None]:
# Importieren der notwendigen Module und Bibliotheken
import os
import shutil
import pandas as pd
from deepface import DeepFace
import numpy as np
import ast
import time
import csv
import json


# Definition einer Funktion, um jedes Bild zu verarbeiten
def process_image(img_file):
    import os
    import sys
    pkgpath = "worker-packages"

    # Überprüfen, ob der Paketpfad bereits im Systempfad vorhanden ist
    if not pkgpath in sys.path:
        major = sys.version_info.major
        minor = sys.version_info.minor
        # Wenn nicht, fügen Sie es zum Systempfad hinzu
        sys.path.append(os.path.join(pkgpath, "lib", f"python{major}.{minor}", "site-packages"))
    from deepface import DeepFace   
    img_path = '/home/ubuntu/work/datascience-fs23/Bader_Marc_Nevio/archive3/img_align_celeba/' + img_file
    # Extrahieren der Gesichtsmerkmale mit DeepFace
    facial_vec = DeepFace.represent(img_path, detector_backend="mtcnn", enforce_detection=False, model_name="Facenet")
    # Durchführen einer Geschlechtsanalyse mit DeepFace
    gender_result = DeepFace.analyze(img_path=img_path, actions=['gender'], enforce_detection=False)
    gender = gender_result[0]["dominant_gender"]    
    return (img_file, list(facial_vec[0]['embedding']), gender)

# Zeitmessung für die Ausführung beginnen
start_time = time.time()
# Lesen der 'image_id.csv'-Datei und Extrahieren der Bildnamen
with open('image_id.csv', 'r') as f:
    reader = csv.DictReader(f)
    img_files = [row['image_id'] for row in reader]
img_files=img_files[0:10000]

# Konvertieren der Liste der Bild-Dateinamen in ein RDD (Resilient Distributed Dataset)
img_files_rdd = sc.parallelize(img_files)

# Anwenden der Verarbeitungsfunktion auf jedes Element des RDD
data_rdd = img_files_rdd.map(process_image)

# Sammeln der Ergebnisse und Konvertieren in ein Numpy-Array
data_array = list(data_rdd.collect())

# Zeitmessung für die Ausführung beenden
end_time = time.time()

# Berechnen und Ausgeben der verstrichenen Zeit
features_extraction_spark = end_time - start_time

# Speichern der Daten in einer CSV-Datei
with open('my_data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['img_file', 'facial_vec', 'gender'])
    for row in data_array:
        writer.writerow(row)
