In [1]:
from pyspark.sql.functions import col, lit, when, regexp_replace, sha2, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os
import sys
from pyspark.sql.functions import *
import psycopg2

In [2]:
# Tell the JVM launcher which JDK to use before any Spark object is created.
# NOTE(review): this is a machine-specific absolute path; adjust per workstation.
os.environ.update(
    {"JAVA_HOME": "C:/Users/User/AppData/Local/Programs/Eclipse Adoptium/jdk-11.0.25.9-hotspot"}
)

In [3]:
# Spark settings for this notebook: application name, a local master, and
# every jar under C:/jars added to the driver classpath.
conf = (
    SparkConf()
    .setAppName("ETLPipeline")
    .setMaster("local")
    .set("spark.driver.extraClassPath", "C:/jars/*")
)

In [6]:
# Obtain the session through the builder, which is the documented entry point:
# it reuses an already-running session when this cell is re-executed instead of
# constructing a second SparkSession object around the same context.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep the `sc` handle available for any cell that still wants the raw context.
sc = spark.sparkContext

In [7]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [11]:

# Explicit schema for customers.csv. Every field is nullable; `id` is read as
# a string and `birth_date` is parsed as a date.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("id", StringType()),
            ("first_name", StringType()),
            ("last_name", StringType()),
            ("email", StringType()),
            ("phone_number", StringType()),
            ("birth_date", DateType()),
            ("country", StringType()),
        )
    ]
)


# Load the customer file with the explicit schema; the first line of the file
# is treated as the header row.
raw_df = spark.read.schema(schema).option("header", "true").csv("customers.csv")

In [13]:
# Preview the first 20 rows (long cell values are truncated).
# NOTE(review): this prints the raw email and phone_number columns into the
# saved notebook output.
raw_df.show(n=20, truncate=True)

+----+----------+---------+------------------+-------------+----------+--------+
|  id|first_name|last_name|             email| phone_number|birth_date| country|
+----+----------+---------+------------------+-------------+----------+--------+
|0001|      Jane|  Ivanova| user1@example.com|+359872837997|1983-10-20| Germany|
|0002|      Jane|  Ivanova| user2@example.com|+359873576092|1985-03-09|     USA|
|0003|      Anna|      Doe| user3@example.com|+359875550487|1983-04-22|     USA|
|0004|      John|   Koleva| user4@example.com|+359870819793|1999-02-23| Belgium|
|0005|      Ivan|   Petrov| user5@example.com|+359871869307|1991-07-04| Germany|
|0006|      John| Georgiev| user6@example.com|+359876716412|1983-05-26| Belgium|
|0007|      Ivan|   Petrov| user7@example.com|+359879017692|1983-07-14| Germany|
|0008|     Maria|    Smith| user8@example.com|+359874956900|1999-06-08| Ireland|
|0009|      Jane|   Koleva| user9@example.com|+359876360207|1990-09-03| Belgium|
|0010|      Ivan|    Smith|u

In [14]:
# 4. GDPR pseudonymization - hashing of personal data
#
# A plain SHA-256 of an e-mail address or phone number can be reversed by
# hashing candidate values and comparing digests, so a secret "pepper" is
# mixed into the input when one is configured. Set the PII_HASH_PEPPER
# environment variable to enable it; when it is unset the pepper is the empty
# string and the digests equal a plain sha2(column, 256).
PII_HASH_PEPPER = os.environ.get("PII_HASH_PEPPER", "")


def hash_pii(column_name, pepper=PII_HASH_PEPPER):
    """Return a Column holding the SHA-256 hex digest of `pepper + value`.

    Args:
        column_name: name of the string column to hash.
        pepper: secret string prepended to every value before hashing.

    Returns:
        A Column expression; rows where the source value is NULL stay NULL
        (a `when` without `otherwise` yields NULL), matching plain `sha2`.
    """
    value = col(column_name)
    # concat_ws skips NULL inputs, so the NULL check is done explicitly;
    # otherwise a missing value would hash to the digest of the pepper alone.
    return when(value.isNotNull(), sha2(concat_ws("", lit(pepper), value), 256))


# Replace the contact columns with their digests and drop the clear-text ones.
# NOTE(review): first_name, last_name and birth_date are not touched by this
# step - confirm that is acceptable for the intended data consumers.
anonymized_df = (
    raw_df
    .withColumn("email_hash", hash_pii("email"))
    .withColumn("phone_hash", hash_pii("phone_number"))
    .drop("email", "phone_number")
)

In [16]:
# Preview the first 20 rows after hashing (long digests are truncated).
anonymized_df.show(n=20, truncate=True)

+----+----------+---------+----------+--------+--------------------+--------------------+
|  id|first_name|last_name|birth_date| country|          email_hash|          phone_hash|
+----+----------+---------+----------+--------+--------------------+--------------------+
|0001|      Jane|  Ivanova|1983-10-20| Germany|b36a83701f1c3191e...|a325ef9b3e751a3ed...|
|0002|      Jane|  Ivanova|1985-03-09|     USA|2b3b2b9ce842ab8b6...|f503ab29434ee000e...|
|0003|      Anna|      Doe|1983-04-22|     USA|898628e28890f937b...|f80350dfd9b2e2e19...|
|0004|      John|   Koleva|1999-02-23| Belgium|40d71d3f998c168e7...|e3fb8f506b7abfe4c...|
|0005|      Ivan|   Petrov|1991-07-04| Germany|4d8f4dd97e0c7b6fe...|659cbbe71279ae9a8...|
|0006|      John| Georgiev|1983-05-26| Belgium|b430419a8a3fa1ce5...|dae098253612e288f...|
|0007|      Ivan|   Petrov|1983-07-14| Germany|38121022af9b425b5...|dcdacfab6a9562d90...|
|0008|     Maria|    Smith|1999-06-08| Ireland|675657c179a97bde8...|7f63b2656ffc159d6...|
|0009|    

In [17]:
# 5. Transformations and new columns
# Build `full_name` from the first and last name separated by a space, then
# drop the two source columns.
transformed_df = (
    anonymized_df
    .withColumn("full_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
)

In [19]:
# Preview the first 20 rows with the combined name column.
transformed_df.show(n=20, truncate=True)

+----+----------+--------+--------------------+--------------------+-------------+
|  id|birth_date| country|          email_hash|          phone_hash|    full_name|
+----+----------+--------+--------------------+--------------------+-------------+
|0001|1983-10-20| Germany|b36a83701f1c3191e...|a325ef9b3e751a3ed...| Jane Ivanova|
|0002|1985-03-09|     USA|2b3b2b9ce842ab8b6...|f503ab29434ee000e...| Jane Ivanova|
|0003|1983-04-22|     USA|898628e28890f937b...|f80350dfd9b2e2e19...|     Anna Doe|
|0004|1999-02-23| Belgium|40d71d3f998c168e7...|e3fb8f506b7abfe4c...|  John Koleva|
|0005|1991-07-04| Germany|4d8f4dd97e0c7b6fe...|659cbbe71279ae9a8...|  Ivan Petrov|
|0006|1983-05-26| Belgium|b430419a8a3fa1ce5...|dae098253612e288f...|John Georgiev|
|0007|1983-07-14| Germany|38121022af9b425b5...|dcdacfab6a9562d90...|  Ivan Petrov|
|0008|1999-06-08| Ireland|675657c179a97bde8...|7f63b2656ffc159d6...|  Maria Smith|
|0009|1990-09-03| Belgium|b1e700bec7b4c7c38...|2e26f42d072b80b3e...|  Jane Koleva|
|001

In [23]:
# 6. Filter the data (as an example, keep EU countries only)
filtered_df = transformed_df.where(
    col("country").isin(["Belgium", "Bulgaria", "Ireland", "Germany"])
)

In [24]:
# Preview the first 20 rows that passed the country filter.
filtered_df.show(n=20, truncate=True)

+----+----------+--------+--------------------+--------------------+-------------+
|  id|birth_date| country|          email_hash|          phone_hash|    full_name|
+----+----------+--------+--------------------+--------------------+-------------+
|0001|1983-10-20| Germany|b36a83701f1c3191e...|a325ef9b3e751a3ed...| Jane Ivanova|
|0004|1999-02-23| Belgium|40d71d3f998c168e7...|e3fb8f506b7abfe4c...|  John Koleva|
|0005|1991-07-04| Germany|4d8f4dd97e0c7b6fe...|659cbbe71279ae9a8...|  Ivan Petrov|
|0006|1983-05-26| Belgium|b430419a8a3fa1ce5...|dae098253612e288f...|John Georgiev|
|0007|1983-07-14| Germany|38121022af9b425b5...|dcdacfab6a9562d90...|  Ivan Petrov|
|0008|1999-06-08| Ireland|675657c179a97bde8...|7f63b2656ffc159d6...|  Maria Smith|
|0009|1990-09-03| Belgium|b1e700bec7b4c7c38...|2e26f42d072b80b3e...|  Jane Koleva|
|0010|1990-12-27| Germany|1cc95683bbb5c4811...|9064f3c7e930708a7...|   Ivan Smith|
|0011|1983-07-08| Ireland|69e6267c53626874a...|49322bd766fdbd251...| Jane Ivanova|
|001

In [None]:
# Export the filtered rows as JSON, replacing whatever the target directory
# already holds.
# NOTE(review): the destination is a machine-specific absolute path.
filtered_df.write.json(
    "C:/Users/User/Desktop/jupyter-etl-spark/customers/",
    mode="overwrite",
)


In [31]:
# Shut the Spark session down at the end of the pipeline; cells above need a
# new session before they can be run again.
spark.stop()