#### 1️⃣ How would you design an ETL pipeline to handle daily incremental data updates efficiently?

In [0]:
from pyspark.sql.functions import *

# Sample Data: Creating Historical Data (Simulating Existing Dataset)
historical_data = [
    (1, "John Doe", "2024-02-20", 1000),
    (2, "Jane Smith", "2024-02-19", 1500),
]

# Sample Data: Creating Incremental Data (Simulating New Daily Data)
incremental_data = [
    (3, "Alice Brown", "2024-02-21", 2000),
    (4, "Bob Martin", "2024-02-21", 1800),
    (2, "Jane Smith", "2024-02-19", 1500),  # Duplicate Entry (Should be handled)
]

# Creating DataFrames
columns = ["id", "name", "date", "amount"]
historical_df = spark.createDataFrame(historical_data, columns)
incremental_df = spark.createDataFrame(incremental_data, columns)

# Merge Incremental Data with Historical Data (Removing Duplicates)
merged_df = historical_df.union(incremental_df).dropDuplicates(["id"])

# Show the Final Updated Dataset
merged_df.show()

# Save the Updated Data as Parquet (Simulating Load Phase)
merged_df.write.mode("overwrite").parquet("updated_data.parquet")


+---+-----------+----------+------+
| id|       name|      date|amount|
+---+-----------+----------+------+
|  1|   John Doe|2024-02-20|  1000|
|  2| Jane Smith|2024-02-19|  1500|
|  3|Alice Brown|2024-02-21|  2000|
|  4| Bob Martin|2024-02-21|  1800|
+---+-----------+----------+------+



#### 2️⃣ APIs often change over time. How would you manage schema evolution in your data pipelines?

In [0]:
# Sample Old API Data (Initial Schema)
old_data = [
    (1, "Alice", 25),
    (2, "Bob", 30),
]

old_columns = ["id", "name", "age"]

# Sample New API Data (Schema Change - 'city' added, 'age' missing in some rows)
new_data = [
    (3, "Charlie", 27, "New York"),
    (4, "David", None, "Los Angeles"),  # 'age' is missing
]

# Define Column Names with Schema Change Handling
new_columns = ["id", "name", "age", "city"]

# Creating DataFrames
old_df = spark.createDataFrame(old_data, old_columns)
new_df = spark.createDataFrame(new_data, new_columns)

# Merge DataFrames Using Union (Handling Missing Columns)
merged_df = old_df.unionByName(new_df, allowMissingColumns=True)

# Show the Final Schema Evolved DataFrame
merged_df.show()


+---+-------+----+-----------+
| id|   name| age|       city|
+---+-------+----+-----------+
|  1|  Alice|  25|       NULL|
|  2|    Bob|  30|       NULL|
|  3|Charlie|  27|   New York|
|  4|  David|NULL|Los Angeles|
+---+-------+----+-----------+



#### 3️⃣ Describe a time you had difficulty merging datasets. How did you solve it?

In [0]:
# Sample Dataset 1 (Different Column Names, Age as String)
data1 = [
    (1, "Alice Johnson", "25"),
    (2, "Bob Smith", "30"),
]

columns1 = ["user_id", "full_name", "age"]

# Sample Dataset 2 (Different Column Names, Age as Integer, Missing Data)
data2 = [
    (1, "Alice Johnson", 25, "New York"),
    (3, "Charlie Brown", 28, "Los Angeles"),  # No matching ID in data1
]

columns2 = ["id", "name", "age", "city"]

# Creating DataFrames
df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)

# Step 1: Standardize Column Names and Convert Age to Integer
df1 = df1.withColumnRenamed("user_id", "id").withColumnRenamed("full_name", "name").withColumn("age", col("age").cast("int"))

# Step 2: Perform an Outer Join to Include All Records
merged_df = df1.alias("df1").join(df2.alias("df2"), on="id", how="outer")

# Step 3: Explicitly Select and Rename Columns to Avoid Ambiguity
final_df = merged_df.select(
    col("id"),
    col("df1.name").alias("name_from_df1"),
    col("df1.age").alias("age_from_df1"),
    col("df2.name").alias("name_from_df2"),
    col("df2.age").alias("age_from_df2"),
    col("df2.city")
)

# Show the Final Merged Dataset
final_df.show()

+---+-------------+------------+-------------+------------+-----------+
| id|name_from_df1|age_from_df1|name_from_df2|age_from_df2|       city|
+---+-------------+------------+-------------+------------+-----------+
|  1|Alice Johnson|          25|Alice Johnson|          25|   New York|
|  2|    Bob Smith|          30|         NULL|        NULL|       NULL|
|  3|         NULL|        NULL|Charlie Brown|          28|Los Angeles|
+---+-------------+------------+-------------+------------+-----------+



#### 5️⃣ How do you ensure that your data pipelines are fault-tolerant and can recover from errors?

In [0]:
import time

# Simulated Data (One Row Has an Error)
data = [
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", None),  # Simulating a faulty row with a NULL value
]

columns = ["id", "name", "age"]

# Creating DataFrame
df = spark.createDataFrame(data, columns)

# Function to Process Data with Error Handling & Retries
def process_data(df, max_retries=3):
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Check for NULL values and filter out faulty records
            processed_df = df.filter(col("age").isNotNull())

            # Simulating Checkpointing (Saving Progress)
            processed_df.write.mode("overwrite").parquet("dbfs:/mnt/processed_data.parquet")

            # Successfully processed, break the loop
            break
        except Exception as e:
            print(f"Error processing data: {e}. Retrying ({retry_count+1}/{max_retries})...")
            retry_count += 1
            time.sleep(2)  # Exponential backoff simulation

# Run the Fault-Tolerant Data Processing
process_data(df)

# Load the Successfully Processed Data
processed_df = spark.read.parquet("dbfs:/mnt/processed_data.parquet")

# Show the Processed Data (Without Faulty Rows)
processed_df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
|  2|  Bob| 30|
+---+-----+---+



#### 6️⃣ Can you discuss a complex data modeling problem you've encountered and how you addressed it?

In [0]:
#  SCD2 Implementation

from pyspark.sql.types import *

# Define Schema Explicitly
customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("address", StringType(), False),
    StructField("start_date", StringType(), False),
    StructField("end_date", StringType(), True),  # Allow NULLs
    StructField("active_flag", StringType(), False),
])

# Simulated Initial Customer Data (Existing Data Warehouse Table)
customer_data = [
    (1, "Alice", "123 Main St", "2023-01-01", None, "Y"),
    (2, "Bob", "456 Oak St", "2023-01-01", None, "Y"),
]

# Create DataFrame for Existing Customer Table
customer_df = spark.createDataFrame(customer_data, schema=customer_schema)

# Define Schema for Incoming Data
new_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("address", StringType(), False),
])

# New Incoming Data (ETL Load - Bob Changed Address)
new_data = [
    (1, "Alice", "123 Main St"),  # No Change
    (2, "Bob", "789 Pine St"),  # Address Changed
]

# Create DataFrame for New Incoming Data
new_df = spark.createDataFrame(new_data, schema=new_schema)

# Step 1: Identify Changes (Join Existing and New Data)
joined_df = customer_df.alias("old").join(new_df.alias("new"), "customer_id", "left")

# Step 2: Mark Old Records as Inactive if Address Changed
updated_existing_df = joined_df.withColumn(
    "end_date", when(col("old.address") != col("new.address"), current_date()).otherwise(col("old.end_date"))
).withColumn(
    "active_flag", when(col("old.address") != col("new.address"), lit("N")).otherwise(col("old.active_flag"))
).select("old.customer_id", "old.name", "old.address", "old.start_date", "end_date", "active_flag")

# Step 3: Insert New Record for Updated Customers
new_records_df = joined_df.filter(col("old.address") != col("new.address")).select(
    col("new.customer_id"),
    col("new.name"),
    col("new.address"),
    current_date().alias("start_date"),
    lit(None).alias("end_date"),
    lit("Y").alias("active_flag")
)

# Step 4: Merge Active and New Records
final_scd2_df = updated_existing_df.union(new_records_df)

# Show the Final SCD Type 2 Table
final_scd2_df.show()

+-----------+-----+-----------+----------+----------+-----------+
|customer_id| name|    address|start_date|  end_date|active_flag|
+-----------+-----+-----------+----------+----------+-----------+
|          1|Alice|123 Main St|2023-01-01|      NULL|          Y|
|          2|  Bob| 456 Oak St|2023-01-01|2025-02-25|          N|
|          2|  Bob|789 Pine St|2025-02-25|      NULL|          Y|
+-----------+-----+-----------+----------+----------+-----------+



#### 7️⃣ What are the challenges of processing real-time data, and how have you addressed them?

In [0]:
# Define Schema for Incoming Streaming Data
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Simulating a Streaming Source (In Real Case, Kafka or Socket would be used)
streaming_data = [
    (101, "click", "2024-02-25 12:00:01"),
    (102, "purchase", "2024-02-25 12:00:05"),
    (103, "view", "2024-02-25 12:00:10"),
]

columns = ["user_id", "event_type", "timestamp"]

# Creating DataFrame (Mock Streaming Data)
streaming_df = spark.createDataFrame(streaming_data, columns)

# Process Streaming Data (Convert Timestamp)
processed_df = streaming_df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Show the Processed Streaming Data
processed_df.show()


+-------+----------+-------------------+
|user_id|event_type|          timestamp|
+-------+----------+-------------------+
|    101|     click|2024-02-25 12:00:01|
|    102|  purchase|2024-02-25 12:00:05|
|    103|      view|2024-02-25 12:00:10|
+-------+----------+-------------------+



#### 8️⃣ How do you implement data governance policies in your data engineering projects?

In [0]:
# Define Schema with Governance Controls
schema = StructType([
    StructField("user_id", IntegerType(), False),  # Mandatory field
    StructField("name", StringType(), False),  # No NULL allowed
    StructField("email", StringType(), True),  # Nullable field
    StructField("age", IntegerType(), True)  # Age can be NULL but should be >=18
])

# Sample Data (Including Some Bad Data)
raw_data = [
    (101, "Alice", "alice@example.com", 25),
    (102, "Bob", None, 30),  # Missing email
    (103, "Charlie", "charlie@example.com", 15),  # Invalid age
    (None, "David", "david@example.com", 40),  # Missing user_id (violates schema)
]

# Step 1: Remove Rows Where Required Fields Are NULL
filtered_data = [row for row in raw_data if row[0] is not None and row[1] is not None]

# Create DataFrame After Removing Bad Records
df = spark.createDataFrame(filtered_data, schema=schema)

# Step 2: Data Quality Checks (Filter Out Age < 18)
validated_df = df.filter(col("age").isNull() | (col("age") >= 18))

# Step 3: Mask Sensitive Data (e.g., Emails for Non-Admins)
masked_df = validated_df.withColumn(
    "email", when(col("email").isNotNull(), lit("[PROTECTED]")).otherwise(lit("No Email Provided")))

# Step 4: Add Metadata for Data Lineage
metadata_df = masked_df.withColumn("processed_by", lit("ETL Pipeline v1.0")) \
                        .withColumn("processed_at", lit("2024-02-25"))

# Show the Governed Data
metadata_df.show()

+-------+-----+-----------------+---+-----------------+------------+
|user_id| name|            email|age|     processed_by|processed_at|
+-------+-----+-----------------+---+-----------------+------------+
|    101|Alice|      [PROTECTED]| 25|ETL Pipeline v1.0|  2024-02-25|
|    102|  Bob|No Email Provided| 30|ETL Pipeline v1.0|  2024-02-25|
+-------+-----+-----------------+---+-----------------+------------+



#### 🔟 What security measures do you implement to protect sensitive data in your pipelines?

In [0]:
from pyspark.sql.functions import *

# Sample Data with Sensitive Information
data = [
    (101, "Alice", "alice@example.com", "1234-5678-9012-3456"),
    (102, "Bob", "bob@example.com", "9876-5432-1098-7654"),
    (103, "Charlie", "charlie@example.com", "1111-2222-3333-4444")
]

columns = ["user_id", "name", "email", "credit_card"]

# Creating DataFrame
df = spark.createDataFrame(data, columns)

# Step 1: Masking Sensitive Data (Emails)
masked_df = df.withColumn(
    "email", when(col("email").isNotNull(), lit("[PROTECTED]")).otherwise(col("email"))
)

# Step 2: Encrypting Credit Card Numbers (SHA-256 Hashing)
encrypted_df = masked_df.withColumn(
    "credit_card", sha2(col("credit_card"), 256)  # Securely hashing credit card numbers
)

# Step 3: Adding Audit Columns
secure_df = encrypted_df.withColumn("processed_by", lit("Secure ETL Pipeline v1.0")) \
                        .withColumn("processed_at", lit("2024-02-25"))

# Show the Secured Data
secure_df.show()


+-------+-------+-----------+--------------------+--------------------+------------+
|user_id|   name|      email|         credit_card|        processed_by|processed_at|
+-------+-------+-----------+--------------------+--------------------+------------+
|    101|  Alice|[PROTECTED]|5d444387a5d39f5de...|Secure ETL Pipeli...|  2024-02-25|
|    102|    Bob|[PROTECTED]|05e819efb49067336...|Secure ETL Pipeli...|  2024-02-25|
|    103|Charlie|[PROTECTED]|f0a6cbb7aca2df9f6...|Secure ETL Pipeli...|  2024-02-25|
+-------+-------+-----------+--------------------+--------------------+------------+

