## **Python Interview Questions for Data Engineers – Part 1**

In [0]:
print("Python Interview Questions for Data Engineers – Part 1")

1. Large File Ingestion – Generators vs Lists

**Interview Question (Context + Question Together)**

You are ingesting a 50GB application log file in Python and you only need to filter **ERROR** records before loading them into the Bronze layer.
If you read the entire file into a list, the job crashes due to memory issues.
How would you solve this in Python and why?

In [0]:
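def read_logs(path):
    # Iterating a file object yields one line at a time, so memory use stays flat.
    with open(path, "r") as f:
        for line in f:
            yield line

# Generator expression: nothing is read until the loop below pulls from it.
error_logs = (log for log in read_logs("app.log") if "ERROR" in log)

for log in error_logs:
    print(log, end="")  # each line already ends with a newline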
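
The generator approach works because a file object is iterated lazily, so only one line is held in memory at a time. The following is a minimal sketch of the same idea that streams matches to an output file instead of printing them. The helper names `iter_error_lines` and `filter_errors`, and the tiny sample file standing in for the 50GB log, are hypothetical and used only for illustration.

```python
import os
import tempfile

def iter_error_lines(path):
    """Yield ERROR lines one at a time; the file is never fully loaded."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if "ERROR" in line:
                yield line

def filter_errors(src_path, dst_path):
    """Stream ERROR lines from src_path to dst_path and return how many were written."""
    count = 0
    with open(dst_path, "w", encoding="utf-8") as out:
        for line in iter_error_lines(src_path):
            out.write(line)
            count += 1
    return count

# Tiny stand-in for the large log so the sketch runs anywhere (assumption).
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "app.log")
dst = os.path.join(workdir, "errors.log")
with open(src, "w", encoding="utf-8") as f:
    f.write("INFO start\nERROR disk full\nWARN slow\nERROR timeout\n")

error_count = filter_errors(src, dst)
print(error_count)
```

Building a list with `f.readlines()` would instead allocate memory proportional to the whole file, which is what causes the crash described in the question.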

2. Schema Builder Bug – Mutable Default Arguments

**Interview Question**

You are writing a reusable Python function to dynamically build a list of columns for transformation.
This function is called multiple times in the same pipeline run.
However, columns from previous calls keep appearing unexpectedly.
What is wrong in this code and how do you fix it?

In [0]:
# BUG: the default list is created once, at definition time, and shared by every call
def add_column(col, cols=[]):
    cols.append(col)
    return cols


In [0]:
add_column("order_id")
print(add_column("customer_id"))

In [0]:
# FIX: use None as the default and create a fresh list inside the function
def add_column(col, cols=None):
    if cols is None:
        cols = []
    cols.append(col)
    return cols

In [0]:
add_column("order_id")
print(add_column("customer_id"))
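
One way to see the cause is to inspect the function object itself: Python evaluates default argument values once, when the `def` statement runs, and stores them on the function. The sketch below compares a buggy and a corrected variant side by side. The names `add_column_buggy` and `add_column_fixed` are hypothetical and chosen only so that both versions can coexist in one cell.

```python
def add_column_buggy(col, cols=[]):
    # The default list is created once and reused by every call that omits cols.
    cols.append(col)
    return cols

def add_column_fixed(col, cols=None):
    # None acts as a sentinel, so a fresh list is created on each call.
    if cols is None:
        cols = []
    cols.append(col)
    return cols

first = add_column_buggy("order_id")
second = add_column_buggy("customer_id")
print(first is second)                # both names refer to the same default list
print(add_column_buggy.__defaults__)  # the stored default now holds both columns

fixed_first = add_column_fixed("order_id")
fixed_second = add_column_fixed("customer_id")
print(fixed_first, fixed_second)      # two independent lists
```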


3. Bronze to Silver Transformation – Shallow Copy Issue

**Interview Question**

You load records into the Bronze layer, copy them, and enrich them for the Silver layer.
After transformation, you notice Bronze data has also changed.
Here is the code. Why is this happening?

In [0]:
records = [{"id": 1, "tags": ["new"]}]
silver = records.copy()
silver[0]["tags"].append("processed")


In [0]:
records

In [0]:
import copy

records = [{"id": 1, "tags": ["new"]}]
silver = copy.deepcopy(records)  # deep copy duplicates the nested dicts and lists too
silver[0]["tags"].append("processed")

In [0]:
records

In [0]:
silver
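
The difference between the two copies can be checked directly with identity comparisons. In this short sketch the variable names `shallow` and `deep` are illustrative only: `list.copy()` creates a new outer list that still points at the same inner dictionaries, while `copy.deepcopy` recreates the nested objects as well.

```python
import copy

records = [{"id": 1, "tags": ["new"]}]

shallow = records.copy()        # new outer list, same inner dict objects
deep = copy.deepcopy(records)   # new outer list and new nested objects

shares_inner_shallow = shallow[0] is records[0]
shares_inner_deep = deep[0] is records[0]
print(shares_inner_shallow)  # True
print(shares_inner_deep)     # False

# Mutating the deep copy leaves the original Bronze records untouched.
deep[0]["tags"].append("processed")
print(records[0]["tags"])    # ['new']
```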

4. Generic Pipeline Execution – `*args` & `**kwargs`

**Interview Question**

You are building a Python pipeline framework where different transformation steps receive different parameters.
How do you design a generic executor that supports flexible inputs?

**Answer Explanation**

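`*args` collects extra positional arguments into a tuple and `**kwargs` collects extra keyword arguments into a dict. An executor that accepts both can forward any step's parameters unchanged, which keeps pipelines configurable and reusable.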

In [0]:
def run_step(step, *args, **kwargs):
    return step(*args, **kwargs)

def transform(value, multiplier=2):
    return value * multiplier

run_step(transform, 10, multiplier=5)
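
The same executor can drive a whole sequence of steps when each step is paired with its own parameters. The following is a sketch under stated assumptions: the step functions `multiply` and `add_offset`, the `pipeline` list of `(callable, kwargs)` pairs, and the `run_pipeline` helper are all hypothetical names introduced for illustration.

```python
def run_step(step, *args, **kwargs):
    # Forward whatever was received, unchanged, to the step.
    return step(*args, **kwargs)

def multiply(value, multiplier=2):
    return value * multiplier

def add_offset(value, offset=0):
    return value + offset

# Hypothetical pipeline definition: each entry is (callable, keyword arguments).
pipeline = [
    (multiply, {"multiplier": 5}),
    (add_offset, {"offset": 3}),
]

def run_pipeline(value, steps):
    # Feed the output of each step into the next one.
    for step, params in steps:
        value = run_step(step, value, **params)
    return value

result = run_pipeline(10, pipeline)
print(result)  # 53
```

Because the parameters live in plain dictionaries, a definition like `pipeline` could be loaded from a config file without changing the executor.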


5. Retry Failure – Idempotent ETL Design

**Interview Question**

Your Airflow task fails after writing partial data and then retries.
How do you design your Python ETL so that retries don’t create duplicate records?

In [0]:
from pathlib import Path

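def write_data(path, rows):
    final = Path(path)
    # Write to a temporary file first so a crash never leaves a half-written target.
    tmp = final.with_name(final.name + ".tmp")

    with open(tmp, "w") as f:
        for r in rows:
            f.write(str(r) + "\n")

    # Swap the finished file into place; a retry overwrites it instead of appending.
    tmp.replace(final)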
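
The write-then-rename pattern is idempotent because every attempt rebuilds the full output and replaces the target, rather than appending to it. The sketch below simulates a retry by calling the writer twice. The temporary directory and the file name `part-0000.txt` are hypothetical, and the rename is assumed to be atomic only when both paths are on the same filesystem.

```python
import tempfile
from pathlib import Path

def write_data(path, rows):
    final = Path(path)
    tmp = final.with_name(final.name + ".tmp")
    # Write the full output to a temporary file first.
    with open(tmp, "w", encoding="utf-8") as f:
        for r in rows:
            f.write(str(r) + "\n")
    # Replace the target in one step; readers never see a partial file.
    tmp.replace(final)

target = Path(tempfile.mkdtemp()) / "part-0000.txt"  # hypothetical output file
rows = [1, 2, 3]

write_data(target, rows)  # first attempt
write_data(target, rows)  # simulated retry of the same task

content = target.read_text(encoding="utf-8")
print(content.splitlines())  # ['1', '2', '3']
```

An append-mode writer (`open(path, "a")`) would have produced six lines after the retry, which is the duplication the question describes.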


6. Date Partition Bug – Timezone Handling

**Interview Question**

Your pipeline partitions data by date, but records appear in the wrong partition depending on server timezone.
How do you fix this in Python?

In [0]:
from datetime import datetime, timezone

# An explicit UTC timestamp does not depend on the server's local timezone.
datetime.now(timezone.utc)
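
The same principle applies to event timestamps: converting to UTC before deriving the partition key gives the same partition regardless of where the code runs. The following is a minimal sketch; the `partition_date` helper, the `%Y-%m-%d` key format, and the sample event in a UTC+05:30 zone are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

def partition_date(event_time):
    """Return the UTC calendar date used as the partition key."""
    if event_time.tzinfo is None:
        # Naive datetimes are ambiguous, so reject them instead of guessing.
        raise ValueError("event_time must be timezone-aware")
    return event_time.astimezone(timezone.utc).strftime("%Y-%m-%d")

# Sample event: 02:00 on 1 January 2024 in a UTC+05:30 zone.
local_zone = timezone(timedelta(hours=5, minutes=30))
event = datetime(2024, 1, 1, 2, 0, tzinfo=local_zone)

key = partition_date(event)
print(key)  # 2023-12-31
```

The event falls on 1 January locally but at 20:30 on 31 December in UTC, so a partition key computed from local time would differ between servers in different zones.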