# Konfiguration der Google Cloud und Herunterladen der JSON-Datei

In diesem Notebook wird die Google Cloud konfiguriert und eine JSON-Datei heruntergeladen.

In [1]:
# Importieren der benötigten Bibliotheken
from google.cloud import storage
from google.oauth2 import service_account

# Namen des Google Cloud Storage Buckets und der Quelldatei festlegen
bucket_name = 'prod_prototype'
source_blob_name = 'bronze/applicant_data_raw'
local_temp_path = '/tmp/applicant_data_raw.json'

# Dienstkonto-Datei laden
service_account_json = '/Users/Kevin/Documents/GitHub/Transferarbeit/Prototyp_Transferarbeit_Lokal/Setup/prototyp-etl-pipline-d6cbb438aa70.json'
credentials = service_account.Credentials.from_service_account_file(service_account_json)

# Google Cloud Storage Client initialisieren
client = storage.Client(credentials=credentials, project='prototyp-etl-pipline')
bucket = client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)

# Datei aus dem Bucket herunterladen
blob.download_to_filename(local_temp_path)
print(f"Datei erfolgreich heruntergeladen zu {local_temp_path}.")


Datei erfolgreich heruntergeladen zu /tmp/applicant_data_raw.json.


# Initialisiere Spark Session
In diesem Notebook wird die Spark Session vorbereitet.

In [2]:
# Importieren der benötigten Bibliotheken
from pyspark.sql import SparkSession

# Beispiel für das Hinzufügen des GCS Connectors zu einer lokalen Spark-Session
spark = SparkSession.builder \
    .appName("Datatransformation") \
    .config('spark.sql.debug.maxToStringFields', '1000') \
    .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/Users/Kevin/Documents/GitHub/Transferarbeit/Prototyp_Transferarbeit_Lokal/Setup/prototyp-etl-pipline-d6cbb438aa70.json") \
    .getOrCreate()
    
# Überprüfen der SparkSession
spark

24/06/16 20:48:45 WARN Utils: Your hostname, MacBook-Pro-3.local resolves to a loopback address: 127.0.0.1; using 192.168.1.229 instead (on interface en0)
24/06/16 20:48:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/Kevin/.ivy2/cache
The jars for the packages stored in: /Users/Kevin/.ivy2/jars
com.google.cloud.bigdataoss#gcs-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-17d94e45-0489-4a67-9e17-08fc388c898b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.google.cloud.bigdataoss#gcs-connector;hadoop3-2.2.2 in central
	found com.google.api-client#google-api-client-jackson2;1.31.3 in central
	found com.google.api-client#google-api-client;1.31.3 in central
	found com.google.oauth-client#google-oauth-client;1.31.2 in central
	found com.google.http-client#google-http-client;1.39.0 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.14 in central
	found commons-logging#commons-logging;1.2 in central
	found commons-codec#commons-codec;1.15 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.google.guava#guava;30.1-jre in central
	found com.google.guava#failureaccess;1.0.1 in central
	found com.google.guava#listenablefuture;9999.0-empty-to-avoid-conflict-with-guava in central
	found org.checkerframework#checker-qual;3.5.0 in central
	found com.google.errorprone#error_prone_annotations;2.5.1 in central
	found com.google.j2objc#j2objc-annotatio

# Enttferung von Duplikaten
In diesem Notebook werdne alle doppelten Daten entfernt, leere Zellen bleiben leer

In [3]:
# JSON-Datei in ein Spark DataFrame laden
df = spark.read.json('/tmp/applicant_data_raw.json')

# Anzahl der ursprünglichen Datensätze
initial_count = df.count()

# Datenbereinigung: Entfernen von Duplikaten
df_cleaned = df.dropDuplicates()

# Anzahl der bereinigten Datensätze
cleaned_count = df_cleaned.count()

# Anzahl der gelöschten Duplikate
duplicates_removed = initial_count - cleaned_count
print(f"Anzahl der gelöschten Duplikate: {duplicates_removed}")

# Bereinigte Daten in eine neue Parquet-Datei speichern
cleaned_temp_path = '/tmp/applicant_data_cleaned.parquet'
df_cleaned.write.mode('overwrite').parquet(cleaned_temp_path)
print(f"Bereinigte Daten erfolgreich gespeichert zu {cleaned_temp_path}.")

Anzahl der gelöschten Duplikate: 0
Bereinigte Daten erfolgreich gespeichert zu /tmp/applicant_data_cleaned.parquet.


# Standartisierung Telefonnummern
In diesem Notebook werden Telefonnummern vereinheitlich

In [4]:
# Importieren der benötigten Bibliotheken
from pyspark.sql.functions import col, regexp_replace, length

# Lokaler Pfad zur bereinigten Parquet-Datei
cleaned_temp_path = '/tmp/applicant_data_cleaned.parquet'

# Parquet-Datei in ein Spark DataFrame laden
df_cleaned = spark.read.parquet(cleaned_temp_path)

# Überprüfen der vorhandenen Spalten
df_cleaned.printSchema()

# Sicherstellen, dass die Spalte 'phone' existiert
if 'Telefonnummer' not in df_cleaned.columns:
    raise ValueError("Die Spalte 'phone' existiert nicht im DataFrame")

# Zählen der Telefonnummern vor der Transformation
initial_phone_count = df_cleaned.filter(length(col("Telefonnummer")) == 10).count()

# Umwandeln der Telefonnummern in das gewünschte Format: +1-245-345-7426
df_cleaned = df_cleaned.withColumn("phone", 
                                   regexp_replace(
                                       col("Telefonnummer"), 
                                       r"(\d{1})(\d{3})(\d{3})(\d{4})", 
                                       r"+1-$2-$3-$4"
                                   ))

# Zählen der Telefonnummern nach der Transformation
transformed_phone_count = df_cleaned.filter(col("Telefonnummer").like("+1-%-%-%")).count()

# Anzahl der durchgeführten Transformationen
transformations_done = transformed_phone_count
print(f"Anzahl der durchgeführten Transformationen: {transformations_done}")

# Bereinigte Daten in eine neue Parquet-Datei speichern
transformed_temp_path = '/tmp/applicant_data_transformed.parquet'
df_cleaned.write.mode('overwrite').parquet(transformed_temp_path)
print(f"Transformierte Daten erfolgreich gespeichert zu {transformed_temp_path}.")

root
 |-- Ablehnungsgrund: string (nullable = true)
 |-- Adresse: string (nullable = true)
 |-- Bewerbungsdatum: string (nullable = true)
 |-- Bewerbungsquelle: string (nullable = true)
 |-- Bewertung Assessment: string (nullable = true)
 |-- Bewertung Erstes Gespräch: string (nullable = true)
 |-- Bewertung Prescreening: string (nullable = true)
 |-- E-Mail: string (nullable = true)
 |-- Effektiver Gehalt: double (nullable = true)
 |-- Einstellungsdatum: string (nullable = true)
 |-- Geburtsdatum: string (nullable = true)
 |-- Gehaltsvorstellungen: string (nullable = true)
 |-- Geschlecht: string (nullable = true)
 |-- Headhunter Firma: string (nullable = true)
 |-- Headhunter Name: string (nullable = true)
 |-- Interviewer: string (nullable = true)
 |-- Job Titel: string (nullable = true)
 |-- Kandidaten-ID: string (nullable = true)
 |-- Nachname: string (nullable = true)
 |-- Standort: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- StellenID: string (nullable = 

24/06/16 20:49:03 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# Auffüllen leerer Adresszellen

In diesem Notebook werden leere Zellen in der Spalte Adresse mit dem Platzhalter "Unknown" gefüllt.


In [None]:
# Importieren der benötigten Bibliotheken
from pyspark.sql.functions import col, when

# Lokaler Pfad zur bereinigten Parquet-Datei
transformed_temp_path = '/tmp/applicant_data_transformed.parquet'

# Parquet-Datei in ein Spark DataFrame laden
df_cleaned = spark.read.parquet(transformed_temp_path)

# Überprüfen der vorhandenen Spalten
df_cleaned.printSchema()

# Sicherstellen, dass die Spalte 'Adresse' existiert
if 'Adresse' not in df_cleaned.columns:
    raise ValueError("Die Spalte 'Adresse' existiert nicht im DataFrame")

# Zählen der leeren Adresszellen vor der Transformation
initial_empty_address_count = df_cleaned.filter(col("Adresse").isNull()).count()

# Auffüllen leerer Zellen in der Spalte 'Adresse' mit 'Unknown'
df_cleaned = df_cleaned.withColumn('Adresse', when(col('Adresse').isNull(), 'Unknown').otherwise(col('Adresse')))

# Zählen der leeren Adresszellen nach der Transformation
final_empty_address_count = df_cleaned.filter(col("Adresse") == 'Unknown').count()

# Anzahl der durchgeführten Auffüllungen
fillings_done = final_empty_address_count
print(f"Anzahl der aufgefüllten Adresszellen: {fillings_done}")

# Bereinigte Daten in eine neue Parquet-Datei speichern
final_temp_path = '/tmp/applicant_data_final.parquet'
df_cleaned.write.mode('overwrite').parquet(final_temp_path)
print(f"Bereinigte Daten erfolgreich gespeichert zu {final_temp_path}.")


In [5]:
# Importieren der benötigten Bibliotheken
from pyspark.sql.functions import col, when

# Lokaler Pfad zur bereinigten Parquet-Datei
transformed_temp_path = '/tmp/applicant_data_transformed.parquet'

# Parquet-Datei in ein Spark DataFrame laden
df_cleaned = spark.read.parquet(transformed_temp_path)

# Überprüfen der vorhandenen Spalten
df_cleaned.printSchema()

# Sicherstellen, dass die Spalte 'Adresse' existiert
if 'Adresse' not in df_cleaned.columns:
    raise ValueError("Die Spalte 'Adresse' existiert nicht im DataFrame")

# Zählen der leeren Adresszellen vor der Transformation
initial_empty_address_count = df_cleaned.filter(col("Adresse").isNull()).count()

# Auffüllen leerer Zellen in der Spalte 'Adresse' mit 'Unknown'
df_cleaned = df_cleaned.withColumn('Adresse', when(col('Adresse').isNull(), 'Unknown').otherwise(col('Adresse')))

# Zählen der leeren Adresszellen nach der Transformation
final_empty_address_count = df_cleaned.filter(col("Adresse") == 'Unknown').count()

# Anzahl der durchgeführten Auffüllungen
fillings_done = final_empty_address_count
print(f"Anzahl der aufgefüllten Adresszellen: {fillings_done}")

# Bereinigte Daten in eine neue Parquet-Datei speichern
final_temp_path = '/tmp/applicant_data_final.parquet'
df_cleaned.write.mode('overwrite').parquet(final_temp_path)
print(f"Bereinigte Daten erfolgreich gespeichert zu {final_temp_path}.")


root
 |-- Ablehnungsgrund: string (nullable = true)
 |-- Adresse: string (nullable = true)
 |-- Bewerbungsdatum: string (nullable = true)
 |-- Bewerbungsquelle: string (nullable = true)
 |-- Bewertung Assessment: string (nullable = true)
 |-- Bewertung Erstes Gespräch: string (nullable = true)
 |-- Bewertung Prescreening: string (nullable = true)
 |-- E-Mail: string (nullable = true)
 |-- Effektiver Gehalt: double (nullable = true)
 |-- Einstellungsdatum: string (nullable = true)
 |-- Geburtsdatum: string (nullable = true)
 |-- Gehaltsvorstellungen: string (nullable = true)
 |-- Geschlecht: string (nullable = true)
 |-- Headhunter Firma: string (nullable = true)
 |-- Headhunter Name: string (nullable = true)
 |-- Interviewer: string (nullable = true)
 |-- Job Titel: string (nullable = true)
 |-- Kandidaten-ID: string (nullable = true)
 |-- Nachname: string (nullable = true)
 |-- Standort: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- StellenID: string (nullable = 

# Hochladen der bereinigten Daten in den Google Cloud Storage

In diesem Notebook werden die bereinigten Daten wieder in den Google Cloud Storage hochgeladen.


In [6]:
# Importieren der benötigten Bibliotheken
import os

# Funktion zum Hochladen eines Verzeichnisses zu Google Cloud Storage
def upload_directory_to_gcs(bucket_name, source_directory, destination_blob_prefix):
    client = storage.Client(credentials=credentials, project='prototyp-etl-pipline')
    bucket = client.bucket(bucket_name)
    
    for root, dirs, files in os.walk(source_directory):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, source_directory)
            blob_path = os.path.join(destination_blob_prefix, relative_path)
            blob = bucket.blob(blob_path)
            blob.upload_from_filename(local_path)
            print(f"File {local_path} uploaded to {blob_path}.")

# Namen des Google Cloud Storage Buckets und der bereinigten Datei festlegen
bucket_name = 'prod_prototype'
cleaned_blob_prefix = 'sensitive/datenbereinigung'
cleaned_temp_path = '/tmp/applicant_data_cleaned.parquet'

# Dienstkonto-Datei laden
service_account_json = '/Users/Kevin/Documents/GitHub/Transferarbeit/Prototyp_Transferarbeit_Lokal/Setup/prototyp-etl-pipline-d6cbb438aa70.json'
credentials = service_account.Credentials.from_service_account_file(service_account_json)

# Hochladen des Parquet-Verzeichnisses
upload_directory_to_gcs(bucket_name, cleaned_temp_path, cleaned_blob_prefix)
print(f"Bereinigte Dateien erfolgreich hochgeladen zu {cleaned_blob_prefix} in bucket {bucket_name}.")


File /tmp/applicant_data_cleaned.parquet/part-00000-5a913d9c-6205-47e6-8ce9-db7c9e3dff06-c000.snappy.parquet uploaded to sensitive/datenbereinigung/part-00000-5a913d9c-6205-47e6-8ce9-db7c9e3dff06-c000.snappy.parquet.
File /tmp/applicant_data_cleaned.parquet/._SUCCESS.crc uploaded to sensitive/datenbereinigung/._SUCCESS.crc.
File /tmp/applicant_data_cleaned.parquet/_SUCCESS uploaded to sensitive/datenbereinigung/_SUCCESS.
File /tmp/applicant_data_cleaned.parquet/.part-00000-5a913d9c-6205-47e6-8ce9-db7c9e3dff06-c000.snappy.parquet.crc uploaded to sensitive/datenbereinigung/.part-00000-5a913d9c-6205-47e6-8ce9-db7c9e3dff06-c000.snappy.parquet.crc.
Bereinigte Dateien erfolgreich hochgeladen zu sensitive/datenbereinigung in bucket prod_prototype.
