In [0]:
# Smoke-test cell: prints a greeting to confirm the notebook can execute code.
print("Hello")

#PYSPARK INTERVIEW QUESTIONS from Ansh

###Q1 While ingesting customer data from an external source, you notice duplicate entries. How would you remove duplicates and retain only the latest entry based on a timestamp column?

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
data = [("101", "2023-12-01", 100), ("101", "2023-12-02", 150), 
        ("102", "2023-12-01", 200), ("102", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df = df.withColumn('date', col('date').cast(DateType()))

In [0]:
df.withColumn('date',to_date('date')).display()

In [0]:
df.display()

In [0]:
df.dropDuplicates(subset=['product_id']).display()

In [0]:
df.display()

In [0]:
df = df.orderBy(['product_id','date'], ascending=[1,0]).dropDuplicates(['product_id'])

# df = df.orderBy(['product_id', 'date'], ascending=[True, False])\
    #    .dropDuplicates(['product_id'])

In [0]:
df.display()

# 2. While processing data from multiple files with inconsistent schemas, you need to merge them into a single DataFrame. How would you handle this inconsistency in PySpark?

In [0]:
df = spark.read.format('parquet')\
    .option('mergeSchema',True)\
    .load('/Volumes/datasets/practice/internal_files/output/')

In [0]:
df.display()

#MapReduce
Writes intermediate results to disk between each stage (e.g., between Mapper and Reducer).

The Reducer reads data from disk, which makes the process time-consuming.

It is primarily designed for batch processing.

Due to frequent disk I/O, MapReduce is slower than in-memory frameworks like Spark.

#Apache Spark
Performs most computations in memory, writing to disk only during final write operations or when memory overflows.

Faster compared to MapReduce due to reduced disk I/O.

Less time-consuming for iterative and complex computations.

Supports both batch and streaming processing.

#4. You are working with a real-time data pipeline, and you notice missing values in your streaming data Column - Category. How would you handle null or missing values in such a scenario?

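Example of a streaming read (the cells below use a static batch read of a CSV file to demonstrate the same null handling):

```python
df_stream = spark.readStream.schema("id INT, value STRING").csv("path/to/stream")
```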

In [0]:
df_stream = spark.read.format('csv')\
    .option('InferSchema',True)\
    .option('Header','True')\
    .load('/Volumes/datasets/practice/internal_files/BigMart Sales.csv')

In [0]:
df_stream.display()

In [0]:
df_st = df_stream.fillna({'Item_Weight':100})

In [0]:
df_st.display()

##5. You need to calculate the total number of actions performed by users in a system. How would you calculate the top 5 most active users based on this information?

In [0]:
data = [("user1", 5), ("user2", 8), ("user3", 2), ("user4", 10), ("user2", 3)]
columns = ["user_id", "actions"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.groupBy('user_id').agg(sum('actions').alias('Total_Actions')).orderBy('Total_Actions',ascending=False).limit(5).display()

#6. While processing sales transaction data, you need to identify the most recent transaction for each customer. How would you approach this task?

In [0]:
data = [("cust1", "2023-12-01", 100), ("cust2", "2023-12-02", 150),
        ("cust1", "2023-12-03", 200), ("cust2", "2023-12-04", 250)]
columns = ["customer_id", "transaction_date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.groupBy('customer_id').agg(max('transaction_date').alias('Last_Transaction')).display()
# df.groupBy('customer_id').agg(sum('sales').alias('Total_Sales')).display()

In [0]:
from pyspark.sql.window import Window

In [0]:
df = df.withColumn('dense_rank', dense_rank().over(Window.partitionBy('customer_id').orderBy(col('transaction_date').desc()))).filter(col('dense_rank')==1).drop('dense_rank')

In [0]:
df.display()

#7. You need to identify customers who haven’t made any purchases in the last 30 days. How would you filter such customers?

In [0]:
data = [("cust1", "2025-12-01"), ("cust2", "2024-11-20"), ("cust3", "2024-11-25")]
columns = ["customer_id", "last_purchase_date"]

df = spark.createDataFrame(data, columns)

df.display()

In [0]:
df2 =df.withColumn('last_purchase_date', col('last_purchase_date').cast(DateType()))

In [0]:
df2.display()

In [0]:
df2.withColumn('gap',date_diff(current_date(), 'last_purchase_date')).filter(col('gap')>30).display()

In [0]:
from pyspark.sql.functions import current_date, to_date, col, date_sub

# Convert string date to DateType if not already
df = df.withColumn("last_purchase_date", to_date(col("last_purchase_date")))

# Filter customers whose last purchase date is older than 30 days ago
inactive_customers = df.filter(col("last_purchase_date") < date_sub(current_date(), 30))

inactive_customers.display()


In [0]:
df.filter(col('last_purchase_date') > date_sub(current_date(),90)).display()

#8. While analyzing customer reviews, you need to identify the most frequently used words in the feedback. How would you implement this?

In [0]:
data = [("customer1", "The product is great"), ("customer2", "Great product, fast delivery"), ("customer3", "Not bad, could be better")]
columns = ["customer_id", "feedback"]

df = spark.createDataFrame(data, columns)

df.display()

In [0]:
df1 = df.withColumn('lists', split(col('feedback'), ' '))
df1.display()

In [0]:
df1 = df1.withColumn('Explode_col',explode('lists'))
df1.display()


In [0]:
df1.withColumn('Explode_col',trim(col('Explode_col'))).display()

In [0]:
df2 = df1.withColumn('Explode_col',regexp_replace(col('Explode_col'),',',''))

In [0]:
df2.groupBy('Explode_col').agg(count('Explode_col').alias('Count')).filter(col('Count')>1).display()

#9. You need to calculate the cumulative sum of sales over time for each product. How would you approach this?

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-02", 200),
        ("product1", "2023-12-03", 150), ("product2", "2023-12-04", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.withColumn('Cum_sum', sum('sales').over(Window.partitionBy('product_id').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow))).display()

In [0]:
df.withColumn('Cum_sum', sum('sales').over(Window.partitionBy('product_id').orderBy('date'))).display()

#10. While preparing a data pipeline, you notice some duplicate rows in a dataset. How would you remove the duplicates without affecting the original order?

In [0]:
# (name, age) rows; ("John", 25) is duplicated on purpose.
data = [
    ("John", 25),
    ("Jane", 30),
    ("John", 25),
    ("Alice", 22),
]
columns = ["name", "age"]
df = spark.createDataFrame(data, schema=columns)
df.display()

In [0]:
# Remove exact duplicates while keeping the original row order.
# A window shuffles the data, so the incoming order must be captured first:
# monotonically_increasing_id() tags each row with an id that increases with
# its current position. The first occurrence of each (name, age) is kept and
# the result is sorted back by that id.
first_seen = Window.partitionBy('name', 'age').orderBy('_row_idx')
(
    df.withColumn('_row_idx', monotonically_increasing_id())
    .withColumn('drop_dup', row_number().over(first_seen))
    .filter(col('drop_dup') == 1)
    .orderBy('_row_idx')
    .drop('drop_dup', '_row_idx')
    .display()
)

In [0]:
# One row per name: number each name's rows by ascending age and keep the first.
youngest_per_name = Window.partitionBy('name').orderBy('age')
(
    df.withColumn('dedup', row_number().over(youngest_per_name))
    .where(col('dedup') == 1)
    .drop('dedup')
    .display()
)


#11. You are working with user activity data and need to calculate the average session duration per user. How would you implement this?

In [0]:
data = [("user1", "2023-12-01", 50), ("user1", "2023-12-02", 60), 
        ("user2", "2023-12-01", 45), ("user2", "2023-12-03", 75)]
columns = ["user_id", "session_date", "duration"]
df = spark.createDataFrame(data, columns)

df.display()

In [0]:
df.groupBy('user_id').agg(avg('duration').alias('avg_duration')).display()

#12. While analyzing sales data, you need to find the product with the highest sales for each month. How would you accomplish this?

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-01", 150), 
        ("product1", "2023-12-02", 200), ("product2", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df3 = df.withColumn('date_col', to_date('date'))\
    .withColumn('month',month('date_col'))

In [0]:
df3.display()

In [0]:
# Product with the highest total sales in each month.
# Step 1: total sales per (month, product).
# Step 2: rank the products inside each month by that total and keep rank 1.
# Sorting the totals alone would list every product, not the winner per month.
# rank() keeps ties, so two products with the same top total are both shown.
# NOTE(review): `month` is the month number only; if the data spans several
# years, group by year as well.
monthly_totals = df3.groupBy('month', 'product_id').agg(sum('sales').alias('highest_sales'))
best_in_month = Window.partitionBy('month').orderBy(col('highest_sales').desc())
(
    monthly_totals
    .withColumn('_rank', rank().over(best_in_month))
    .filter(col('_rank') == 1)
    .drop('_rank')
    .orderBy('month')
    .display()
)

#13. You are working with a large Delta table that is frequently updated by multiple users. The data is stored in partitions, and sometimes updates can cause inconsistent reads due to concurrent transactions. How would you ensure ACID compliance and avoid data corruption in PySpark?

In [0]:
df = spark.read.format('parquet').load('/Volumes/datasets/practice/internal_files/output/data.parquet/')

In [0]:
df.display()

In [0]:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, '/Volumes/datasets/practice/internal_files/output/data.parquet/')

In [0]:
# Upsert `df` into the Delta table, matching rows on id:
#   - matching ids are updated with all source columns
#   - ids missing from the target are inserted
# The builder chain only describes the merge; nothing is written until
# .execute() is called, which runs it as a single Delta transaction.
(
    deltaTable.alias('trg')
    .merge(df.alias('src'), 'trg.id = src.id')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

#14. You need to process a large dataset stored in PARQUET format and ensure that all columns have the right schema (Almost). How would you do this?

In [0]:
df = spark.read.format('parquet')\
    .option('inferSchem',True)\
    .load('/Volumes/datasets/practice/internal_files/output/data.parquet/')

In [0]:
df.display()

#15. You are reading a CSV file and need to handle corrupt records gracefully by skipping them. How would you configure this in PySpark?

In [0]:
df = spark.read.format('csv')\
    .option('inferSchema',True)\
    .option('mode','DROPMALFORMED')\
    .load('/Volumes/datasets/practice/internal_files/order_data.csv')

In [0]:
df.display()

#Difference between RDD and DATAFRAMES

| Feature                  | RDD (Resilient Distributed Dataset)         | DataFrame                                               |
| ------------------------ | ------------------------------------------- | ------------------------------------------------------- |
| **Abstraction Level**    | Low-level (object-oriented)                 | High-level (tabular like SQL)                           |
| **Ease of Use**          | More complex (requires more code)           | Easy (SQL-like, less code)                              |
| **Data Format**          | Distributed collection of objects           | Distributed table with named columns                    |
| **Optimization**         | No automatic optimization                   | Uses Catalyst optimizer & Tungsten engine               |
| **Performance**          | Slower (manual tuning needed)               | Faster (optimized execution plan)                       |
| **Schema Support**       | No schema                                   | Schema-aware                                            |
| **Type Safety**          | Type-safe (in Scala/Java)                   | Partially type-safe (better in Scala than Python)       |
| **Use Case**             | Complex data operations, custom processing  | Structured data, ETL, SQL operations                    |
| **Supported Operations** | Low-level transformations (`map`, `filter`) | High-level ops (`select`, `groupBy`, `join`, etc.)      |
| **API Style**            | Functional programming                      | Declarative/SQL-style                                   |
| **Best For**             | Fine-grained data transformations           | Standard ETL, analytics, performance-critical workflows |


| Feature                 | DataFrame                                      | Dataset                                               |
| ----------------------- | ---------------------------------------------- | ----------------------------------------------------- |
| **Abstraction Level**   | High-level (tabular data, like SQL table)      | Type-safe, object-oriented + tabular                  |
| **Language Support**    | Supported in **Scala, Java, Python, R**        | **Scala and Java only**                               |
| **Type Safety**         | **No** compile-time type checking              | **Yes**, with compile-time type safety                |
| **Compile-Time Errors** | Detected only at runtime                       | Detected at compile time (for Scala/Java)             |
| **Ease of Use**         | Easier to use (less boilerplate)               | Requires defining case classes or custom types        |
| **Performance**         | Similar performance (uses Catalyst + Tungsten) | Similar performance (optimized query plan)            |
| **Serialization**       | Uses Tungsten's off-heap binary format         | Uses Encoders (slightly more overhead)                |
| **Data Handling**       | Best for **structured data, ETL**              | Best for **type-safe transformations, complex logic** |
| **Interoperability**    | Easily converted to/from RDD and Dataset       | Can be converted to/from DataFrame and RDD            |


#✅ What is Query Optimization in Spark?
Query Optimization in Spark is the process of transforming user-written logical plans into efficient physical plans that run faster with fewer resources.

This optimization is done automatically by the Catalyst Optimizer when you write Spark SQL or DataFrame queries.

| Stage                       | Description                                                                       |
| --------------------------- | --------------------------------------------------------------------------------- |
| **1. Analysis**             | Resolves the **unresolved logical plan** built from the SQL/DataFrame code into a resolved logical plan using catalog/schema info |
| **2. Logical Optimization** | Applies **rules** like constant folding, predicate pushdown, etc.                 |
| **3. Physical Planning**    | Generates one or more **physical plans** (e.g., hash join vs sort-merge join)     |
| **4. Code Generation**      | Uses **Tungsten engine** to generate optimized bytecode (JVM) for execution       |


🔧 Optimization Techniques (Automatically Done)
| Optimization Name                  | What It Does                                            |
| ---------------------------------- | ------------------------------------------------------- |
| **Predicate Pushdown**             | Moves filters closer to the data source                 |
| **Constant Folding**               | Evaluates constant expressions at compile time          |
| **Projection Pruning**             | Selects only needed columns (avoids full data scans)    |
| **Join Reordering**                | Picks the best join order to reduce shuffling           |
| **Filter Pushdown in Parquet/ORC** | Applies filters while reading from disk to minimize I/O |


In [0]:
# Print the extended plan output for `df` (extended=True, not just the physical plan).
df.explain(extended=True)

#🔥 Apache SparkSession – Explained Clearly
A SparkSession is the entry point to programming with Spark using the DataFrame and Dataset API in Spark 2.x and above.

Before Spark 2.0, you had to use:

SparkContext (for core Spark features)

SQLContext (for SQL features)

But now, SparkSession replaces them all.

✅ What is SparkSession?
It's the gateway to all Spark functionalities.

Used to:

Read data from sources (CSV, JSON, Parquet, Hive, etc.)

Create DataFrames

Run SQL queries

Configure Spark settings

#🧠 What are Transformations in Spark?
In Spark, transformations are operations that return a new RDD or DataFrame from an existing one.
They are of two types:

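| Type   | Meaning                                                            |
| ------ | ------------------------------------------------------------------ |
| Narrow | Data required to compute the result comes from a single partition  |
| Wide   | Data required comes from multiple partitions → causes a shuffle    |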



| Feature                       | **Narrow Transformation**            | **Wide Transformation**                            |
| ----------------------------- | ------------------------------------ | -------------------------------------------------- |
| **Data Movement**             | No data movement across partitions   | Requires data shuffle across partitions            |
| **Dependency Type**           | One-to-one or few-to-one             | Many-to-one or many-to-many                        |
| **Performance**               | Faster, more efficient               | Slower due to shuffle overhead                     |
| **Examples**                  | `map`, `filter`, `union`, `coalesce` | `groupByKey`, `reduceByKey`, `join`, `repartition` |
| **Shuffle**                   | ❌ No shuffle                         | ✅ Causes shuffle                                   |
| **Task Scheduling**           | Simple linear execution              | Requires barrier/shuffle stages                    |
| **Fault Tolerance (Lineage)** | Easier to recompute                  | Requires recomputation from multiple sources       |


### Narrow vs. Wide Transformations (summary)

| Feature                       | **Narrow Transformation**            | **Wide Transformation**                            |
| ----------------------------- | ------------------------------------ | -------------------------------------------------- |
| **Data Movement**             | No data movement across partitions   | Requires data shuffle across partitions            |
| **Dependency Type**           | One-to-one or few-to-one             | Many-to-one or many-to-many                        |
| **Performance**               | Faster, more efficient               | Slower due to shuffle overhead                     |
| **Examples**                  | `map`, `filter`, `union`, `coalesce` | `groupByKey`, `reduceByKey`, `join`, `repartition` |
| **Shuffle**                   | ❌ No shuffle                         | ✅ Causes shuffle                                   |
| **Task Scheduling**           | Simple linear execution              | Requires barrier/shuffle stages                    |
| **Fault Tolerance (Lineage)** | Easier to recompute                  | Requires recomputation from multiple sources       |


#🧠 What Are coalesce() and repartition()?
Both coalesce() and repartition() are used to change the number of partitions in a DataFrame or RDD.

| Function         | Purpose                                           |
| ---------------- | ------------------------------------------------- |
| `coalesce(n)`    | **Decrease** the number of partitions             |
| `repartition(n)` | **Increase or decrease** the number of partitions |


#✅ Difference Between coalesce() vs repartition()
| Feature             | `coalesce()`                              | `repartition()`                                     |
| ------------------- | ----------------------------------------- | --------------------------------------------------- |
| Purpose             | Reduce partitions (narrow transformation) | Increase or re-distribute (wide transformation)     |
| Shuffle             | ❌ No full shuffle                         | ✅ Causes full shuffle                               |
| Use Case            | Optimize performance when writing to disk | Ensure better parallelism before joins/aggregations |
| Performance         | Faster (less data movement)               | Slower (due to shuffling)                           |
| Best When           | Reducing partitions (e.g., from 200 → 10) | Rebalancing or increasing partitions                |
| Transformation Type | Narrow                                    | Wide                                                |


#⚠️ Best Practices
Use **coalesce()** when:

You are only decreasing partitions.

You want less data movement.

You are writing fewer output files.

Use **repartition()** when:

You need better data distribution.

You're performing joins or groupBy on large data.

You want more parallelism.

#💾 Spark cache() vs persist() – Detailed Explanation with Table
Both cache() and persist() in Apache Spark are used to store the intermediate results of DataFrames or RDDs in memory (and/or disk) for reuse — to avoid recomputation and improve performance in iterative or multi-step pipelines.


#✅ What is persist()?
persist() gives you more control over how and where the data is stored.

You can choose various StorageLevels, like:

MEMORY_ONLY

MEMORY_AND_DISK

DISK_ONLY

MEMORY_ONLY_SER (serialized form)

etc.

#📊 cache() vs persist() – Comparison Table
| Feature                   | `cache()`                                   | `persist()`                                     |
| ------------------------- | ------------------------------------------- | ----------------------------------------------- |
| **Storage Level**         | Fixed default: `MEMORY_AND_DISK` for DataFrames, `MEMORY_ONLY` for RDDs | Customizable (`MEMORY_ONLY`, `DISK_ONLY`, etc.) |
| **Control**               | Limited                                     | Full control over storage strategy              |
| **Ease of Use**           | Very simple (no arguments)                  | Optionally takes a `StorageLevel`; with no argument it uses the same default as `cache()` |
| **Use Case**              | General caching when defaults are fine      | Specific tuning (e.g., memory-constrained jobs) |
| **Serialization Options** | Not available                               | Can use serialized formats (`MEMORY_ONLY_SER`)  |
| **Reusability**           | Both support reuse across multiple actions  | Both support reuse across multiple actions      |
| **Unpersisting**          | Can use `.unpersist()` to remove from cache | Same with `.unpersist()`                        |


#🔍 What "Memory" and "Disk" Mean in Databricks / Spark
When you call .cache() or .persist() on a DataFrame/RDD in Databricks (Spark), you're asking Spark to store the data in memory (RAM) or on local disk — on the worker nodes.

| Term   | Meaning in Databricks (Spark) Context                             |
| ------ | ----------------------------------------------------------------- |
| Memory | The RAM available on the worker nodes in your Databricks cluster  |
| Disk   | The local disk space on the same worker nodes (not S3/DBFS)       |

In [0]:
# Print the column names and data types of whichever DataFrame `df` currently refers to.
df.printSchema()

#✅ Why Partitioning Matters in PySpark
| Benefit                          | How Partitioning Helps                                              |
| -------------------------------- | ------------------------------------------------------------------- |
| ⚡ Faster Execution               | Enables **parallel processing** across worker nodes                 |
| 🔁 Reduced Shuffle               | If partitioned properly (e.g., on join keys), reduces data movement |
| 💽 Better Memory Usage           | Each partition fits into executor memory; avoids out-of-memory      |
| 📊 Scalable Joins & Aggregations | Partitioned data reduces skew and improves load balancing           |
| 🔁 Reuse Cached Data             | Partitions stay in memory for reuse if cached/persisted             |


#22. You have a dataset containing the names of employees and their departments. You need to find the department with the most employees.

In [0]:
data = [("Alice", "HR"), ("Bob", "Finance"), ("Charlie", "HR"), ("David", "Engineering"), ("Eve", "Finance")]
columns = ["employee_name", "department"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
df.groupBy('department').agg(count('employee_name').alias('Total_Employees')).sort('Total_Employees', ascending=False).display()

#23. While processing sales data, you need to classify each transaction as either 'High' or 'Low' based on its amount. How would you achieve this using a when condition

In [0]:
data = [("product1", 100), ("product2", 300), ("product3", 50)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.withColumn('Classify_Flag', 
              when(col('sales')>=300,'High')
             .when((col('sales')>=100) & (col('sales')<300),'Medium')
             .otherwise('Low')).display()

# df.withColumn('Classify_Flag',
#     when(col('sales') >= 300, 'High')
#     .when((col('sales') >= 100) & (col('sales') < 300), 'Medium')
#     .otherwise('Low')
# ).display()

#24. While analyzing a large dataset, you need to create a new column that holds a timestamp of when the record was processed. How would you implement this and what can be the best USE CASE?

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.withColumn('processing_Time',current_timestamp()).display()

#25. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it. How would you achieve this?

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.createTempView('My_View_Table')

In [0]:
%sql
select * from My_View_Table;

#26. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it (FROM DIFFERENT NOTEBOOKS AS WELL)?

In [0]:
# Commented out: nothing is registered when this cell runs.
# If enabled, the DataFrame would be registered as a global temporary view
# named Table_global. Global temp views are queried through the `global_temp`
# database (e.g. global_temp.Table_global).
# NOTE(review): confirm cross-notebook visibility on the target cluster setup.
#df.createOrReplaceGlobalTempView('Table_global')

#27. You need to query data from a PySpark DataFrame using SQL, but the data includes a nested structure. How would you flatten the data for easier querying?

In [0]:
data = [("product1", {"price": 100, "quantity": 2}), 
        ("product2", {"price": 200, "quantity": 3})]
columns = ["product_id", "product_info"]

df = spark.createDataFrame(data, columns)
df.display()

In [0]:
df.select('product_id', "product_info.price", "product_info.quantity").display()

In [0]:
df.select('product_id', "product_info.price", "product_info.quantity").createTempView('flat_view')

In [0]:
%sql
select * from flat_view;

#28. You are ingesting data from an external API in JSON format where the schema is inconsistent. How would you handle this situation to ensure a robust pipeline?

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Step 1: schema limited to the core fields we rely on; all are nullable.
schema = (
    StructType()
    .add("id", StringType(), True)
    .add("name", StringType(), True)
    .add("age", IntegerType(), True)
)

# Step 2: load each line as an untyped string (single column named "value").
raw_df = spark.read.text("s3://path/to/json")

# Step 3: parse the string with the schema, then expand the struct's fields
# into top-level columns.
json_struct = from_json(col("value"), schema)
parsed_df = raw_df.withColumn("json_data", json_struct).select("json_data.*")


#29. While reading data from Parquet, you need to optimize performance by partitioning the data based on a column. How would you implement this?

In [0]:
df= spark.read.format('csv').option('inferSchema',True).option('header', True).load('/Volumes/datasets/practice/internal_files/BigMart Sales.csv')

In [0]:
df.display()

In [0]:
df.write.format('parquet').mode('append').partitionBy('Outlet_Size').save('/Volumes/datasets/practice/internal_files/output2/')

In [0]:
df2 = spark.read.format('parquet').load('/Volumes/datasets/practice/internal_files/output2/Outlet_Size=High/')

In [0]:
df2.display()

#30. You are working with a large dataset in Parquet format and need to ensure that the data is written in an optimized manner with proper compression. How would you accomplish this?

In [0]:
# Append snappy-compressed parquet, one sub-folder per Outlet_Size value.
(
    df.write
    .format('parquet')
    .option('compression', 'snappy')
    .partitionBy('Outlet_Size')
    .mode('append')
    .save('/Volumes/datasets/practice/internal_files/output4/')
)

#31. Your company uses a large-scale data pipeline that reads from Delta tables and processes data using complex aggregations. However, performance is becoming an issue due to the growing dataset size. How would you optimize the performance of the pipeline?

#✅ 1. Optimize Delta Table Storage
Partitioning: Re-evaluate your partitioning strategy.

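Use partition columns with low-to-moderate cardinality and an even distribution (e.g., date or region); keep high-cardinality columns for Z-Ordering instead.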

Avoid over-partitioning which can lead to many small files (too many metadata operations).

Z-Ordering: Apply OPTIMIZE ... ZORDER BY on frequently filtered or joined columns.

Improves data skipping and reduces scan time.

File Compaction: Periodically run OPTIMIZE to compact small files.

Reduces file-listing, file-open and I/O overhead during reads.

#✅ 2. Query Optimization Techniques
Push Down Predicates: Ensure filters are pushed down early (e.g., df.filter() before joins/aggregations).

Use Broadcast Join (if one side is small): Reduces shuffle and improves join performance.

```python
df_large.join(broadcast(df_small), on="key")
```

Cache Intermediate Results: If intermediate DataFrames are reused, persist/cache them.

```python
df.persist(StorageLevel.MEMORY_AND_DISK)
```

#✅ Why Are Broadcast Variables Used?
In distributed computing, if you use a variable (e.g., a lookup dictionary) inside transformations like .map(), it gets shipped with every task, potentially causing network overhead and redundancy.

Broadcast variables solve this by:

Sending the variable only once to each executor.

Storing it in-memory, making access fast and avoiding repeated transmission.

| Feature       | Description                                                       |
| ------------- | ----------------------------------------------------------------- |
| Type          | Read-only shared variable                                         |
| When to Use   | Small data used across many transformations (e.g., lookup tables) |
| Benefit       | Reduces network traffic & improves job performance                |
| Access Syntax | `broadcast_var.value`                                             |


#🔍 df.show() vs df.collect() — Key Differences:

| Feature            | `df.show()`                                       | `df.collect()`                                       |
| ------------------ | ------------------------------------------------- | ---------------------------------------------------- |
| **Purpose**        | Displays a **preview** (default: 20 rows) of data | Collects **all rows** into driver memory             |
| **Return Type**    | **None** (just prints output)                     | Returns a **list of Row objects**                    |
| **Performance**    | Lightweight — good for **previewing data**        | Heavy — risky for large datasets                     |
| **Memory Usage**   | Minimal (prints limited rows)                     | High — can crash driver if data is too big           |
| **Common Use**     | Debugging, quick look at data                     | When you actually need to access full data in driver |
| **Syntax Example** | `df.show(10)`                                     | `df.collect()`                                       |


#🔍 What is Lazy Evaluation in PySpark?
Lazy Evaluation means that PySpark does not immediately execute your transformations (like select, filter, map, withColumn, etc.).
Instead, it builds a logical plan (a DAG – Directed Acyclic Graph) and waits until an action (like show(), collect(), count(), etc.) is called to actually run the computation.

```python
df = spark.read.csv("sales.csv", header=True)

# These are transformations (lazy)
df_filtered = df.filter(df["amount"] > 1000)
df_grouped = df_filtered.groupBy("region").sum("amount")

# This is an action (triggers execution)
df_grouped.show()
```


| Benefit               | Explanation                                                                                |
| --------------------- | ------------------------------------------------------------------------------------------ |
| ✅ **Optimization**    | Spark can analyze the entire DAG and **optimize** the execution plan (Catalyst Optimizer). |
| ✅ **Efficiency**      | Avoids unnecessary computations — unused transformations aren’t run.                       |
| ✅ **Fault Tolerance** | Keeps lineage info so it can recompute lost partitions on failure.                         |


```python
df = spark.read.csv("sales.csv", header=True)

# These are transformations (lazy)
df_filtered = df.filter(df["amount"] > 1000)
df_grouped = df_filtered.groupBy("region").sum("amount")

# This is an action (triggers execution)
df_grouped.show()
```


#Transformations(Lazy)
These define a computation, but don’t trigger execution. They return a new DataFrame or RDD.
| Transformation              | Description                                 |
| --------------------------- | ------------------------------------------- |
| `select()`                  | Select specific columns                     |
| `filter()` / `where()`      | Filter rows based on a condition            |
| `withColumn()`              | Add or modify a column                      |
| `drop()`                    | Drop columns                                |
| `groupBy()`                 | Group rows based on a column                |
| `agg()`                     | Apply aggregations on grouped data          |
| `orderBy()` / `sort()`      | Sort rows                                   |
| `join()`                    | Join two DataFrames                         |
| `distinct()`                | Remove duplicate rows                       |
| `dropDuplicates()`          | Remove duplicates based on specific columns |
| `limit()`                   | Limit number of rows                        |
| `repartition()`             | Change number of partitions (full shuffle)  |
| `coalesce()`                | Reduce number of partitions                 |
| `union()` / `unionByName()` | Combine two DataFrames                      |
| `explode()`                 | Flatten array or map columns                |
| `alias()`                   | Rename columns (often used in `select()`)   |
| `cache()` / `persist()`     | Mark a DataFrame to be cached for reuse     |


#⚡ Actions (Trigger Execution)
These trigger execution of the transformations and return results or write output.
| Action               | Description                                               |
| -------------------- | --------------------------------------------------------- |
| `show()`             | Displays first 20 rows (default)                          |
| `collect()`          | Returns all rows as a list (use cautiously)               |
| `count()`            | Returns number of rows                                    |
| `take(n)`            | Returns first `n` rows                                    |
| `first()`            | Returns the first row                                     |
| `head()`             | Returns the first row; `head(n)` behaves like `take(n)`   |
| `foreach()`          | Applies a function to each row (no return)                |
| `foreachPartition()` | Applies function to each partition                        |
| `write.save()`       | Saves the DataFrame to storage (CSV, JSON, Parquet, etc.) |
| `saveAsTable()`      | Saves DataFrame as a Hive or Delta table                  |
| `toPandas()`         | Converts to Pandas DataFrame (only for small datasets)    |
| `reduce()`           | Aggregates all elements using a function (RDD)            |


#✅ Advantages of Delta Lake over Traditional File Formats
| Feature                        | Delta Lake 🟢                                         | Traditional Formats (CSV, JSON, Parquet) 🔴 |
| ------------------------------ | ----------------------------------------------------- | ------------------------------------------- |
| **ACID Transactions**          | ✅ Yes (ensures data reliability and consistency)      | ❌ No transactional guarantees               |
| **Schema Evolution**           | ✅ Supports automatic schema updates (`mergeSchema`)   | ❌ Schema must be manually managed           |
| **Time Travel**                | ✅ Access data as of a version or timestamp            | ❌ Not supported                             |
| **Data Versioning**            | ✅ Built-in (every change tracked)                     | ❌ Not available                             |
| **Upserts & Deletes**          | ✅ Easy with `MERGE`, `UPDATE`, `DELETE`               | ❌ Not natively supported                    |
| **Unified Batch & Streaming**  | ✅ Same table supports both (via Structured Streaming) | ❌ Streaming needs separate handling         |
| **Data Quality (Constraints)** | ✅ Supports `NOT NULL`, `CHECK`, custom constraints    | ❌ No constraint enforcement                 |
| **Performance (Z-Ordering)**   | ✅ Data skipping & faster reads                        | ❌ Slower queries, no intelligent indexing   |
| **Scalability & Reliability**  | ✅ Excellent for big data pipelines                    | ❌ Harder to manage at scale                 |
| **Vacuum & Compaction**        | ✅ Built-in file cleanup and optimization              | ❌ Manual cleanup needed                     |


| Error Message                                 | Likely Cause                              |
| --------------------------------------------- | ----------------------------------------- |
| `java.lang.OutOfMemoryError: Java heap space` | Insufficient memory for executor task     |
| `GC overhead limit exceeded`                  | Too much time spent on garbage collection |
| `ExecutorLostFailure`                         | Executor crashed due to memory pressure   |
| `Python worker failed to connect back`        | Worker crash due to OOM                   |


#🔍 What is AQE in PySpark?
AQE (Adaptive Query Execution) is a feature in Spark (from version 3.0 onwards) that dynamically optimizes query plans at runtime, based on the actual data statistics collected during query execution — rather than relying solely on static compile-time plans.

| AQE Optimization                               | Description                                                                 |
| ---------------------------------------------- | --------------------------------------------------------------------------- |
| ✅ **Dynamic Partition Pruning**                | Skips unnecessary partitions at runtime during joins (a separate Spark 3.0 runtime optimization, not part of AQE itself) |
| ✅ **Dynamic Join Strategy Switching**          | Chooses **Broadcast Join** or **Sort-Merge Join** based on actual data size |
| ✅ **Dynamic Coalescing of Shuffle Partitions** | Merges small shuffle partitions at runtime to avoid many tiny tasks         |
| ✅ **Handling Data Skew**                       | Detects skewed partitions and splits them                                   |


# Data Skew
Handling skewed data is a critical part of optimizing PySpark performance, especially for joins, groupBy, and aggregations. **Skewed data means that some keys have a disproportionately large number of records, causing task imbalance, longer job runtimes, and possible out-of-memory errors.**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, concat, lit
from pyspark.sql.types import StructType, StructField, StringType

# spark = SparkSession.builder.appName("SkewedJoinHandling").getOrCreate()

# Deliberately skewed orders sample: five of the seven rows share the key "India".
orders_data = [("India", f"A{n}") for n in range(1, 6)] + [("US", "B1"), ("UK", "C1")]

# Explicit schema: two nullable string columns.
orders_schema = (
    StructType()
    .add("country", StringType(), True)
    .add("order", StringType(), True)
)

df_orders = spark.createDataFrame(orders_data, schema=orders_schema)
df_orders.show()


In [0]:
# Lookup table: one row per country with the region it belongs to.
country_info_data = list(zip(("India", "US", "UK"), ("Asia", "America", "Europe")))

# Explicit schema: two nullable string columns.
country_info_schema = (
    StructType()
    .add("country", StringType(), True)
    .add("region", StringType(), True)
)

df_country_info = spark.createDataFrame(country_info_data, schema=country_info_schema)
df_country_info.show()


#❌ 3. Problem with Normal Join (skew issue)


If "India" had millions of rows, this join would be heavily skewed and inefficient.

In [0]:
# Baseline: inner join on the raw (unsalted) `country` key.
df_skewed_join = df_orders.join(df_country_info, on=["country"], how="inner")
df_skewed_join.show()

#✅ 4. Solution – Apply Salting Technique
🔄 Step 1: Add Salt to df_orders

In [0]:
# Build a salted join key "<country>_<bucket>" where bucket is a pseudo-random
# integer in 0..4, so rows that share one country are spread over five keys.
# The fixed seed makes the salt assignment repeatable when the cell is re-run
# on the same data layout (unseeded rand() yields different buckets each run).
df_orders_salted = df_orders.withColumn(
    "country_salt",
    concat(col("country"), lit("_"), (rand(seed=42) * 5).cast("int")),
)
df_orders_salted.show(truncate=False)


#🔄 Step 2: Duplicate df_country_info with Matching Salt

In [0]:
# Replicate the lookup table once per salt value (0 to 4), tagging each replica
# with the key "<country>_<salt>".
from functools import reduce

salted_info_dfs = [
    df_country_info.withColumn(
        "country_salt",
        concat(col("country"), lit(f"_{salt}")),
    )
    for salt in range(5)
]

# Stack the five replicas into a single DataFrame.
df_country_info_salted = reduce(
    lambda stacked, replica: stacked.union(replica),
    salted_info_dfs,
)
df_country_info_salted.show(truncate=False)


#🔄 Step 3: Join on country_salt

In [0]:
# Join on the salted key. The lookup side's own `country` column is dropped
# first: both inputs carry `country`, and keeping both would leave the result
# with two identically named columns that cannot be referenced unambiguously.
df_final = df_orders_salted.join(
    df_country_info_salted.drop("country"),
    on="country_salt",
)
df_final.display()
df_final.select("country_salt", "order", "region").show()


#🔍 What is a Broadcast Join in PySpark?
A broadcast join is an optimized join strategy in Spark where the smaller DataFrame is sent (broadcasted) to all executors.
This avoids shuffling the large dataset, resulting in much faster joins, especially in skewed or unbalanced joins.

#🧠 When to Use Broadcast Join?
✅ When one of the DataFrames is small (typically < 10 MB to 100 MB)
✅ When joining a large fact table with a small dimension/lookup table
✅ When you want to prevent data shuffle during join

#✅ Benefits of Broadcast Join:
| Advantage               | Description                                  |
| ----------------------- | -------------------------------------------- |
| ⚡ Fast performance      | No shuffle of large dataset                  |
| 💾 Memory efficient     | Broadcasted dataset stored once per executor |
| 🛠 Works well with skew | Avoids skew issues in many-to-one joins      |


In [0]:
from pyspark.sql.functions import broadcast

# large_df.join(broadcast(small_df), on="key")


In [0]:
# Orders sample for the broadcast-join demo; column types are inferred from the data.
orders_data = [
    ("India", "A1"), ("India", "A2"),
    ("US", "B1"),
    ("UK", "C1"),
]

df_orders = spark.createDataFrame(data=orders_data, schema=["country", "order"])
df_orders.show()


In [0]:
# Country-to-region lookup rows; column types are inferred from the data.
country_info_data = list({"India": "Asia", "US": "America", "UK": "Europe"}.items())

df_country_info = spark.createDataFrame(data=country_info_data, schema=["country", "region"])
df_country_info.show()


In [0]:
# Inner join with an explicit broadcast hint on the lookup table.
(
    df_orders
    .join(broadcast(df_country_info), on=['country'], how='inner')
    .display()
)

#🔍 What is Spill in Spark?
Spill in Spark refers to the process where intermediate data is temporarily written to disk when it cannot fit into memory during operations like shuffle, aggregation, sort, or join.

This is a safety mechanism to prevent OutOfMemory errors, but it can slow down performance since disk I/O is much slower than memory access.

#📦 When Does Spill Happen?
Spill typically occurs in the following scenarios:
| Operation                         | Reason for Spill                                |
| --------------------------------- | ----------------------------------------------- |
| `groupBy`, `reduceByKey`, `agg()` | Intermediate data exceeds executor memory       |
| `sort()`, `orderBy()`             | Sorting large datasets can exceed memory limits |
| `join()`                          | Large shuffles or joins exceed memory           |
| `shuffle`                         | Shuffle data too large to keep in-memory        |


#⏳ What is Delta Lake’s Time Travel Feature?
Time Travel in Delta Lake allows you to query, restore, or compare previous versions of a Delta table — like going back in time!
This is possible because Delta Lake maintains a transaction log (_delta_log) that tracks all changes to the table.



#43. You are processing sales data. Group by product categories and create a list of all product names in each category.

In [0]:
# Sample (category, product) pairs for Q43.
columns = ["category", "product"]
data = [
    ("Electronics", "Laptop"),
    ("Electronics", "Smartphone"),
    ("Furniture", "Chair"),
    ("Furniture", "Table"),
]
df = spark.createDataFrame(data, schema=columns)
df.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Collect every product name per category into an array column.
# The aggregate is aliased so the output column gets a plain name instead of
# the auto-generated `collect_list(product)`.
df.groupBy('category').agg(collect_list('product').alias('products')).display()

#44. You are analyzing orders. Group by customer IDs and list all unique product IDs each customer purchased.

In [0]:
# Sample (customer_id, product_id) purchases for Q44; one pair is repeated.
columns = ["customer_id", "product_id"]
data = [
    (101, "P001"),
    (101, "P002"),
    (102, "P001"),
    (101, "P001"),
]
df = spark.createDataFrame(data, schema=columns)
df.display()

In [0]:
# Per customer, gather the distinct product IDs into an array column.
(
    df
    .groupBy('Customer_id')
    .agg(collect_set(col('product_id')).alias('Product_ID'))
    .display()
)

#45. For customer records, combine first and last names only if the email address exists.

In [0]:
# Sample customers for Q45; the second record has no email.
columns = ["first_name", "last_name", "email"]
data = [
    ("John", "Doe", "john.doe@example.com"),
    ("Jane", "Smith", None),
]
df = spark.createDataFrame(data, schema=columns)
df.display()

In [0]:
# Keep only the rows that have an email; note this rebinds `df` to the filtered frame.
df = df.where(col("email").isNotNull())
df.display()

In [0]:
# Add the full name as "<first_name> <last_name>".
(
    df
    .withColumn(
        'Full_Name',
        concat(col('first_name'), lit(' '), col('last_name')),
    )
    .display()
)

In [0]:
# Conditional variant: rows without an email get a single-space placeholder,
# all other rows get "<first_name> <last_name>".
(
    df
    .withColumn(
        'Full_name',
        when(col('email').isNull(), lit(' '))
        .otherwise(concat(col('first_name'), lit(' '), col('last_name'))),
    )
    .display()
)

#46. You have a DataFrame containing customer IDs and a list of their purchased product IDs. Calculate the number of products each customer has purchased.

In [0]:
# Sample customers for Q46, each with an array of purchased product IDs.
myschema = "customer_id INT ,product_ids array<STRING>"
data = list(
    {
        1: ["prod1", "prod2", "prod3"],
        2: ["prod4"],
        3: ["prod5", "prod6"],
    }.items()
)

df = spark.createDataFrame(data, schema=myschema)
df.display()

In [0]:
# Append the length of each customer's product array as a new column.
df.select('*', size(col('product_ids')).alias('Number_of_Products')).display()

#47. You have employee IDs of varying lengths. Ensure all IDs are 6 characters long by padding with leading zeroes.

In [0]:
# Sample employee IDs of differing lengths for Q47 (single-column rows).
schema = ["employee_id"]
data = [(emp_id,) for emp_id in ("1", "123", "4567")]

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# Left-pad each ID with '0' up to a width of 6 characters.
(
    df
    .withColumn('Employee_id', lpad('employee_id', 6, '0'))
    .display()
)

In [0]:
# Right-padded variant: zeros are appended after the ID instead of before it
# (presumably shown for contrast with the lpad cell).
(
    df
    .withColumn('Employee_id', rpad('employee_id', 6, '0'))
    .display()
)

#48. You need to validate phone numbers by checking if they start with "91"

In [0]:
# Sample phone numbers for Q48 (single-column rows).
schema = ["phone_number"]
data = [(number,) for number in ("911234567890", "811234567890", "912345678901")]

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# Keep only the numbers whose first two characters are "91".
df.where(col('phone_number').startswith('91')).display()

#49. You have a dataset with courses taken by students. Calculate the average number of courses per student.

In [0]:
# Sample students for Q49, each with the list of courses taken.
schema = ["student_id", "courses"]
data = list(
    {
        1: ["Math", "Science"],
        2: ["History"],
        3: ["Art", "PE", "Biology"],
    }.items()
)

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# First attach the per-student course count (length of the `courses` array) ...
df = df.withColumn("Course_count", size("courses"))

# ... then reduce that column to a single overall average.
df.select(avg(col("Course_count")).alias("Course_avg")).show()

#50. You have a dataset with primary and secondary contact numbers. Use the primary number if available; otherwise, use the secondary number.

In [0]:
# Sample contacts for Q50: primary missing, secondary missing, and both present.
schema = ["primary_contact", "secondary_contact"]
data = list(
    zip(
        (None, "9876543210", "7894561230"),
        ("1234567890", None, "4567891230"),
    )
)

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# Fall back to the secondary number only when the primary one is null.
(
    df
    .withColumn(
        'Phone_number',
        when(col('primary_contact').isNull(), col('secondary_contact'))
        .otherwise(col('primary_contact')),
    )
    .display()
)

In [0]:
# Same fallback expressed with coalesce: the first non-null of primary, secondary.
(
    df
    .withColumn('Contact', coalesce('primary_contact', 'secondary_contact'))
    .display()
)

In [0]:
# Replace a missing primary number with the literal string 'Null'.
df.select('*', coalesce(col('primary_contact'), lit('Null')).alias("Coalesc_Col")).display()

#DoneDoneDone

#
#51. You are categorizing product codes based on their lengths. If the length is 5, label it as "Standard"; otherwise, label it as "Custom".

In [0]:
# Sample product codes for Q51 (single-column rows).
schema = ["product_code"]
data = [(code,) for code in ("prod1", "prd234", "pr9876")]

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# Label each code by its length in a separate column. The label column must
# not be named `Product_code`: Spark resolves column names case-insensitively
# by default, so that name would overwrite the original `product_code` values
# and the output would no longer show which code received which label.
df.withColumn(
    'Product_type',
    when(length(col('product_code')) == 5, 'Standard').otherwise('Custom'),
).display()