In [None]:
#### https://www.youtube.com/watch?v=r7FTCuTl84g&t=02h22m20s

<hr>

# 🔄 What is Shuffle in Apache Spark?

In Apache Spark, **shuffle** refers to the process of **redistributing data** across partitions. It happens when Spark needs to **group, join,** or **aggregate** data that is spread across different nodes.

---

## 📦 Why Does Shuffle Happen?

Shuffle occurs when operations require data from **multiple partitions** to be **reorganized**. This is common in:

- `groupByKey()`
- `reduceByKey()`
- `join()`
- `distinct()`
- `repartition()`

---

## 🔧 How Shuffle Works

1. **Map Phase**: Spark processes data and prepares it for shuffling.
2. **Shuffle Phase**: Data is moved across the network to the correct partitions.
3. **Reduce Phase**: Spark performs the final computation on the shuffled data.
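
The three phases above can be imitated in plain Python. This is only a sketch of the idea, not Spark's actual shuffle implementation: the helper names (`stable_partition`, `map_phase`, `shuffle_phase`, `reduce_phase`) are made up for illustration, and `crc32` stands in for whatever hash function Spark uses internally.

```python
from collections import defaultdict
from zlib import crc32


def stable_partition(key: str, num_partitions: int) -> int:
    """Pick a target partition from a deterministic hash of the key."""
    return crc32(key.encode("utf-8")) % num_partitions


def map_phase(partition, num_partitions):
    """Bucket one input partition's records by their target partition."""
    buckets = defaultdict(list)
    for key, value in partition:
        buckets[stable_partition(key, num_partitions)].append((key, value))
    return buckets


def shuffle_phase(all_buckets, num_partitions):
    """Move every bucket to the output partition it is addressed to."""
    outputs = [[] for _ in range(num_partitions)]
    for buckets in all_buckets:
        for target, records in buckets.items():
            outputs[target].extend(records)
    return outputs


def reduce_phase(partition):
    """Sum the values of each key inside one shuffled partition."""
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)


input_partitions = [[("A", 1), ("B", 2)], [("A", 3), ("C", 4)]]
num_out = 2

bucketed = [map_phase(p, num_out) for p in input_partitions]
shuffled = shuffle_phase(bucketed, num_out)
reduced = [reduce_phase(p) for p in shuffled]

# Merge the per-partition results only to display them in one place.
result = {k: v for part in reduced for k, v in part.items()}
print(sorted(result.items()))  # [('A', 4), ('B', 2), ('C', 4)]
```

After the shuffle step, every record for a given key sits in exactly one output partition, which is what lets the reduce step work locally.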

---

## ⚠️ Impacts of Shuffling

| Impact            | Description |
|-------------------|-------------|
| ⏱ Performance     | Shuffling is expensive and can slow down jobs. |
| 🔌 Network I/O     | Data is transferred between nodes, increasing network traffic. |
| 💾 Disk Usage      | Intermediate data may be written to disk. |
| 🧠 Memory Pressure | Can cause out-of-memory errors if not managed well. |

---

## 📊 Diagram: Shuffle in Action

```plaintext
Before Shuffle:             After Shuffle:
+---------+                +---------+
| Part 1  |----\     /---->| Part A  |
| (A, 1)  |     \   /      | (A, 1)  |
| (B, 2)  |      \ /       | (A, 3)  |
+---------+       X        +---------+
| Part 2  |      / \       | Part B  |
| (A, 3)  |     /   \      | (B, 2)  |
| (C, 4)  |----/     \---->| (C, 4)  |
+---------+                +---------+
```

## ✅ How to Minimize Shuffle

|	Strategy			|	Benefit					|
|-----------------------|---------------------------|
|	Use `reduceByKey()`	|	Combines data before shuffling	|
|	Use `broadcast()`		|	Avoids shuffling large datasets during joins	|
|	Use `partitionBy()`	|	Controls data distribution	|
|	Avoid `groupByKey()`	|	Use `reduceByKey()` or `aggregateByKey()` instead	|
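
To see why `reduceByKey()` is usually preferred over `groupByKey()`, it helps to count how many records would cross the shuffle boundary in each case. The sketch below is a plain-Python approximation with made-up sample data and hypothetical function names; it models only the map-side combine, not Spark itself.

```python
from collections import defaultdict

partitions = [
    [("A", 1), ("A", 1), ("B", 1)],
    [("A", 1), ("B", 1), ("B", 1)],
]


def records_shuffled_group_by_key(parts):
    """groupByKey-style: every record crosses the shuffle boundary."""
    return sum(len(p) for p in parts)


def records_shuffled_reduce_by_key(parts):
    """reduceByKey-style: each partition pre-sums its keys, then ships one record per key."""
    shipped = 0
    for p in parts:
        local = defaultdict(int)
        for key, value in p:
            local[key] += value
        shipped += len(local)
    return shipped


print(records_shuffled_group_by_key(partitions))   # 6
print(records_shuffled_reduce_by_key(partitions))  # 4
```

The saving grows with the number of repeated keys per partition; with mostly unique keys the two approaches ship roughly the same amount of data.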

### 💡 Code Example (Scala)

```scala
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("A", 3), ("C", 4)), 2)
// This causes a shuffle
val result = data.reduceByKey(_ + _)
result.collect()
// Output (order may vary): Array(("A", 4), ("B", 2), ("C", 4))
```

------------------------------------------------------------------------------------------------------

# 🚀 Deeper Dive: Broadcast Joins & Partitioning Strategies in Spark

---

## 📡 Broadcast Joins

A **broadcast join** is a special type of join in Spark where a **small dataset** is sent to **all worker nodes**. This avoids shuffling the large dataset across the network.

### ✅ When to Use

- One dataset is **very small** (can fit in memory)
- Joining with a **large dataset**
- Want to **avoid shuffle** for faster performance

### ⚡ Benefits

| Benefit           | Description |
|-------------------|-------------|
| 🚫 No shuffle     | The large dataset stays where it is; only the small one is copied to each executor |
| ⚡ Faster joins    | Reduces network traffic |
| 💾 Low overhead    | Efficient for small lookup tables that fit in executor memory |
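
The mechanism can be sketched without Spark: each partition of the large dataset gets its own full copy of the small table and does a local lookup. The function name `broadcast_join` and the `"Unknown"` default are illustrative assumptions, and the `dict(...)` copy merely stands in for the broadcast step.

```python
def broadcast_join(large_partitions, small_table, default="Unknown"):
    """Join each large partition against a full local copy of the small table."""
    joined_partitions = []
    for partition in large_partitions:
        lookup = dict(small_table)  # stands in for the copy shipped to each executor
        joined_partitions.append(
            [(key, value, lookup.get(key, default)) for key, value in partition]
        )
    return joined_partitions


small = {1: "A", 2: "B"}
large_partitions = [[(1, 100), (2, 200)], [(3, 300)]]

result = broadcast_join(large_partitions, small)
print(result)  # [[(1, 100, 'A'), (2, 200, 'B')], [(3, 300, 'Unknown')]]
```

The records of the large dataset never change partition, which is the whole point of the technique.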

---

## 🧭 Partitioning Strategies

**Partitioning** controls how data is split across Spark's workers. Good partitioning helps Spark process data **efficiently** and **avoid unnecessary shuffles**.

### 🔧 Types of Partitioning

| Strategy         | Description |
|------------------|-------------|
| `HashPartitioner` | Uses hash of key to assign partitions |
| `RangePartitioner`| Splits data based on key ranges |
| Custom `Partitioner` | User-defined subclass of `Partitioner` with its own partitioning logic |

### ✅ Why It Matters

- Ensures **related data is together**
- Reduces **shuffle during joins or aggregations**
- Improves **parallelism and performance**
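
As a rough illustration of the difference between hash and range partitioning, the sketch below assigns keys to partitions in plain Python. The function names, the sample keys, and the split points in `bounds` are all hypothetical; Spark's own partitioners pick their hash function and range boundaries differently.

```python
from bisect import bisect_left
from zlib import crc32


def hash_partition(key: str, num_partitions: int) -> int:
    """Assign a partition from a deterministic hash of the key."""
    return crc32(key.encode("utf-8")) % num_partitions


def range_partition(key: str, upper_bounds: list) -> int:
    """Assign a partition by finding the first range whose upper bound is >= key."""
    return bisect_left(upper_bounds, key)


keys = ["apple", "kiwi", "mango", "zebra"]
bounds = ["h", "p"]  # hypothetical split points: up to "h", up to "p", everything after

print([range_partition(k, bounds) for k in keys])  # [0, 1, 1, 2]
print(hash_partition("apple", 4) == hash_partition("apple", 4))  # True
```

Range partitioning keeps neighbouring keys together (useful for sorted output), while hash partitioning spreads keys without regard to order.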

---

## 🆚 Broadcast Join vs Partitioning

| Feature             | Broadcast Join         | Partitioning Strategy     |
|---------------------|------------------------|----------------------------|
| Use case            | Small + large dataset  | Large datasets             |
| Shuffle avoided     | ✅ Yes                 | ✅ If partitioned well     |
| Memory usage        | 🧠 Small dataset only   | 💾 Depends on partition size |
| Setup complexity    | 🟢 Simple               | 🔴 May require tuning       |

---

## 💡 Code Examples (Scala)

```scala
// Broadcast Join Example
val small = sc.parallelize(Seq((1, "A"), (2, "B")))
val large = sc.parallelize(Seq((1, 100), (2, 200), (3, 300)))

val broadcastSmall = sc.broadcast(small.collectAsMap())
val joined = large.map { case (id, value) =>
  (id, value, broadcastSmall.value.getOrElse(id, "Unknown"))
}
```

```scala
// Partitioning Example
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("A", 3), ("C", 4)))
val partitioned = data.partitionBy(new org.apache.spark.HashPartitioner(2))
```

<hr>

# Understanding Narrow and Wide Transformations in Apache Spark 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In Apache Spark, transformations form the backbone of how data is processed across distributed clusters. A transformation in Spark is an operation that produces a new RDD (*Resilient Distributed Dataset*) or DataFrame from an existing one, setting the stage for distributed data processing. Broadly, these transformations fall into two categories — **narrow transformations** and **wide transformations** — each impacting data flow, efficiency, and the overall performance of Spark applications.

In this article, we’ll delve into both types of transformations, explain their distinctions, and demonstrate each with code examples in RDDs and DataFrames.

**1\. Narrow Transformations**
------------------------------

A **narrow transformation** is one where each output partition depends solely on a single input partition. This means that the data needed to generate an output partition resides within the same partition as the input data. As a result, Spark can process these transformations without moving data between nodes, minimizing network overhead and improving processing speed.

**Key Characteristics of Narrow Transformations:**
--------------------------------------------------

*   No data shuffle between nodes.
*   Localized computation within partitions.
*   Faster execution due to minimized network I/O.

## **Examples of Narrow Transformations**

#### **map()**
---------

The map() function applies a given function to each element of an RDD, without requiring any data exchange between partitions. PySpark DataFrames have no map() method; the equivalent is a column expression such as withColumn().

**RDD Example**

```python
# Assuming SparkContext (sc) is already initialized
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]
```

**DataFrame Example**

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("NarrowTransformationExample").getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["value"])
mapped_df = df.withColumn("doubled_value", col("value") * 2)
mapped_df.show()  # Output shows original and doubled values
```

#### **filter()**
------------

The filter() transformation retains elements that satisfy a certain condition, processing each partition individually.

**RDD Example**

```python
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect())  # Output: [2, 4]
```

**DataFrame Example**

```python
filtered_df = df.filter(col("value") % 2 == 0)
filtered_df.show()  # Output shows rows with even values
```

#### **flatMap()**
-------------

Similar to map(), but flatMap() allows each input element to produce zero, one, or multiple output elements, still operating within each partition independently.

**RDD Example**

```python
rdd = sc.parallelize(["hello world", "spark transformations"])
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flat_mapped_rdd.collect())  # Output: ['hello', 'world', 'spark', 'transformations']
```

**DataFrame Example**

DataFrames do not directly support flatMap(), but we can achieve similar results using explode(), especially when working with arrays.

```python
from pyspark.sql.functions import explode, split
df = spark.createDataFrame([("hello world",), ("spark transformations",)], ["sentence"])
flat_mapped_df = df.withColumn("word", explode(split(col("sentence"), " ")))
flat_mapped_df.show()  # Output shows each word as a separate row
```

**2\. Wide Transformations**
----------------------------

A **wide transformation** requires data from multiple partitions to create a single output partition. This often triggers a **shuffle**, a process where Spark redistributes data across the cluster, resulting in increased network I/O. Wide transformations are generally more resource-intensive and can impact performance due to the coordination needed across nodes.

**Key Characteristics of Wide Transformations:**
------------------------------------------------

*   Data shuffle across nodes, increasing network and computation overhead.
*   Inter-partition dependencies.
*   Generally slower due to data redistribution.

## **Examples of Wide Transformations**

#### **groupByKey()**
----------------

The groupByKey() transformation groups data based on a key. It collects all values associated with each key across partitions, triggering a shuffle.

**RDD Example**

```python
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])
grouped_rdd = rdd.groupByKey().mapValues(list)
print(grouped_rdd.collect())  # Output: [('a', [1, 3]), ('b', [2, 4])]
```

**DataFrame Example**

```python
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3), ("b", 4)], ["key", "value"])
grouped_df = df.groupBy("key").agg({"value": "collect_list"})
grouped_df.show()  # Output shows key-value pairs with grouped values
```

#### **reduceByKey()**
-----------------

The reduceByKey() transformation works like groupByKey() but applies a reducing function while shuffling data, optimizing data movement by aggregating values locally before the shuffle.

**RDD Example**

```python
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())  # Output: [('a', 4), ('b', 6)]
```

**DataFrame Example**

```python
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum()
reduced_df = df.groupBy("key").agg(F.sum("value").alias("sum_value"))
reduced_df.show()  # Output shows the sum of values for each key
```

#### **join()**
----------

The join() transformation merges two datasets based on a key, necessitating a shuffle to align matching keys across partitions.

**RDD Example**

```python
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output: [('a', (1, 3)), ('b', (2, 4))]
```

**DataFrame Example**

```python
df1 = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value1"])
df2 = spark.createDataFrame([("a", 3), ("b", 4)], ["key", "value2"])
joined_df = df1.join(df2, "key")
joined_df.show()  # Output shows joined DataFrame based on key
```

> While typical joins in Spark trigger costly shuffles across partitions, a broadcast join optimizes performance by copying the smaller dataset to every executor. Each executor can then join locally without shuffling the larger dataset, which makes a broadcast join behave much like a narrow transformation.

------------------------------------------------------------------------------------------------------------------------------------------

**Key Difference: Data Shuffling in Narrow vs. Wide Transformations**
---------------------------------------------------------------------

The primary distinction between narrow and wide transformations lies in **data shuffling**. In narrow transformations, Spark processes data locally within each partition, avoiding network traffic. However, wide transformations require data to be redistributed across partitions, resulting in network I/O. For instance:

*   A `map()` operation applies functions independently within each partition, while `reduceByKey()` necessitates a shuffle to ensure values with the same key are processed together.
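
One way to make the distinction concrete is to list, for each output partition, which input partitions it has to read. The following plain-Python sketch does that for a `map()`-style and a `reduceByKey()`-style operation; the function names are invented for illustration and `crc32` is only a stand-in for Spark's key hashing.

```python
from zlib import crc32

input_partitions = [[("a", 1), ("b", 2)], [("a", 3), ("b", 4)]]


def narrow_dependencies(parts):
    """map-style: output partition i is built from input partition i only."""
    return {i: {i} for i in range(len(parts))}


def wide_dependencies(parts, num_out):
    """reduceByKey-style: an output partition needs every input partition holding its keys."""
    deps = {i: set() for i in range(num_out)}
    for src, part in enumerate(parts):
        for key, _ in part:
            deps[crc32(key.encode("utf-8")) % num_out].add(src)
    return deps


print(narrow_dependencies(input_partitions))  # {0: {0}, 1: {1}}
wide = wide_dependencies(input_partitions, 2)
print(max(len(sources) for sources in wide.values()))  # 2
```

Because both keys appear in both input partitions, at least one output partition has to pull data from two sources, and that cross-partition read is the shuffle.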

**Conclusion**
--------------

In Spark, understanding the nature of transformations — whether narrow or wide — is crucial for optimizing distributed data processing. While narrow transformations enhance performance through localized computation, wide transformations are essential for operations that require data aggregation or alignment across partitions. By strategically combining both types of transformations, you can build efficient, scalable Spark applications that make the most of distributed data processing.

<hr>

# 🧠 Stateless Transformation in Apache Spark

In Apache Spark, **stateless transformations** are operations that do **not depend on the previous state** of data. Each input element or micro-batch is processed **independently** of any earlier or later elements or batches, so the transformation needs no "memory" or "state" from past computations to produce its output.


## 🔍 Key Characteristics
- **Independence**: Each input record or micro-batch is processed in isolation. The result of processing one piece of data does not depend on the results of processing any other piece of data, whether from the same batch or a previous one.
- **No retained state**: Stateless transformations do not maintain or store any information across different processing units (e.g., RDDs in Spark Streaming DStreams or micro-batches in Spark Structured Streaming).
- **Simplicity and efficiency**: Due to their independent nature, stateless transformations are often simpler to implement and can be highly efficient, as they do not incur the overhead of state management or consistency guarantees across time.
- **Parallelizable**: Ideal for distributed processing across nodes.
- **Deterministic**: Given the same input, produces the same output, provided the applied function is itself deterministic.

## ⚙️ Common Stateless Transformations

| Transformation | Description |
|----------------|-------------|
| `map()`        | Applies a function to each element in the RDD or DataFrame. |
| `filter()`     | Selects elements that satisfy a predicate. |
| `flatMap()`    | Similar to `map()`, but can return multiple output elements for each input. |
| `union()`      | Combines two RDDs or DataFrames. |
| `distinct()`   | Removes duplicate elements within a single batch (this needs a shuffle; deduplicating across streaming batches is stateful). |
| `select()`   | (In DataFrames/Datasets) Selects specific columns from a dataset. |
| `where()` or `filter()`   | (In DataFrames/Datasets) Filters rows based on a condition. |

In contrast, ***stateful*** transformations, such as `reduceByKeyAndWindow` in Spark Streaming or aggregations with `groupBy` and `agg` in Structured Streaming that involve time windows, require maintaining state across batches to compute results based on historical data.
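
The contrast can be sketched in plain Python by treating a stream as a list of micro-batches. The per-batch count needs nothing but the current batch, while the windowed count has to keep earlier batches around. The function names and sample batches are illustrative only and are not Spark APIs.

```python
from collections import Counter, deque

batches = [["a", "b"], ["a"], ["b", "b"], ["a", "c"]]


def stateless_counts(batch):
    """Count words using only the current micro-batch."""
    return dict(Counter(batch))


def windowed_counts(stream, window_size):
    """Count words over the last `window_size` micro-batches, which must be retained as state."""
    window = deque(maxlen=window_size)
    results = []
    for batch in stream:
        window.append(batch)  # the oldest batch drops out automatically
        merged = Counter()
        for kept in window:
            merged.update(kept)
        results.append(dict(merged))
    return results


print([stateless_counts(b) for b in batches])
print(windowed_counts(batches, window_size=2))
```

The `deque` is the "state" here: lose it between batches and the windowed result can no longer be computed.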


## 📦 Example in Scala

```scala
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val squared = data.map(x => x * x)
squared.collect() // Output: Array(1, 4, 9, 16, 25)
```


<hr>

# 🔄 Stateless vs Stateful Streaming in Apache Spark

Apache Spark Streaming supports two types of data processing: **stateless** and **stateful**. These define how Spark handles data across micro-batches in a stream.

---

## 📌 Stateless Streaming

In **stateless streaming**, each micro-batch is processed **independently**. Spark does **not retain any information** from previous batches.

### ✅ Characteristics

- ❌ No memory of past data
- ⚡ Fast and efficient
- 🟢 Easy to scale and parallelize
- 🔧 Suitable for simple transformations

### 🔧 Common Stateless Operations

| Operation   | Description |
|-------------|-------------|
| `map()`     | Applies a function to each element |
| `filter()`  | Filters elements based on a condition |
| `flatMap()` | Returns multiple outputs per input |
| `reduceByKey()` | Aggregates data within a single batch |

### 📦 Example

```scala
val stream = ssc.socketTextStream("localhost", 9999)
val words = stream.flatMap(_.split(" "))
val filtered = words.filter(_.length > 3)
```

---

## 📌 Stateful Streaming

In **stateful streaming**, Spark maintains **state information** across batches. This enables complex operations that depend on historical data.

### ✅ Characteristics

- ✅ Maintains memory of previous data
- 🧠 Enables advanced analytics (e.g., trends, aggregates)
- 🔴 Higher resource usage
- 🛠 Requires checkpointing for fault tolerance

### 🔧 Common Stateful Operations

| Operation             | Description |
|------------------------|-------------|
| `updateStateByKey()`   | Maintains running state per key |
| `mapWithState()`       | Efficient state tracking |
| `window()`             | Aggregates data over time windows |
| `reduceByKeyAndWindow()` | Combines keys over a sliding window |


### 📦 Example

```scala
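// Stateful operations need checkpointing: call ssc.checkpoint(...) with a directory before starting.
val stream = ssc.socketTextStream("localhost", 9999)
val pairs = stream.flatMap(_.split(" ")).map(word => (word, 1)) // split lines into words first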
val runningCounts = pairs.updateStateByKey((newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
)
```
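
The update function in the example above receives the new values for a key in the current batch plus the previous state for that key. A plain-Python imitation of that contract might look like the following; `update_state_by_key` here is a hypothetical helper that mimics the idea, not the DStream API, and the sample words are invented.

```python
def update_function(new_values, state):
    """Mirror of the Scala update function: add this batch's values to the previous total."""
    return (state or 0) + sum(new_values)


def update_state_by_key(batches, update):
    """Apply `update` per key for each micro-batch, carrying state between batches."""
    state = {}
    snapshots = []
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # Keys already in the state are updated even if the batch has no new values for them.
        for key in set(state) | set(grouped):
            state[key] = update(grouped.get(key, []), state.get(key))
        snapshots.append(dict(state))
    return snapshots


batches = [[("spark", 1), ("flink", 1)], [("spark", 1)], [("spark", 1), ("kafka", 1)]]
for snapshot in update_state_by_key(batches, update_function):
    print(sorted(snapshot.items()))
```

The `state` dictionary is what a real job would have to checkpoint in order to survive a restart.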

---

## 🆚 Comparison Table

| Feature               | Stateless Streaming        | Stateful Streaming         |
|-----------------------|----------------------------|----------------------------|
| Memory of past data   | ❌ No                      | ✅ Yes                     |
| Complexity            | 🟢 Simple                  | 🔴 Complex                 |
| Use cases             | Basic filtering, mapping   | Aggregations, sessions     |
| Performance           | ⚡ Fast                    | 🐢 Slower (more overhead)  |
| Fault tolerance       | Easier to manage           | Requires checkpointing     |

---

## 🧠 When to Use What?

- Use **stateless** for lightweight, scalable tasks.
- Use **stateful** when tracking trends or maintaining cumulative metrics over time.


<hr>

# ⚙️ Batch Output Modes in Spark

Batch jobs process **static data** (already available). When writing results to a sink (like a file or database), Spark offers different **save modes**, set with `.mode(...)`:

| Mode      | Description                                                                 | Behavior Example                          |
|-----------|-----------------------------------------------------------------------------|-------------------------------------------|
| `append`  | ➕ Adds new data to the existing output                                      | Adds rows to a file without deleting old ones |
| `overwrite`| 🔄 Replaces existing output with new data                                  | Deletes old file and writes new results   |
| `errorifexists` (alias `error`, the default) | ❌ Fails with an error if the output already exists | Prevents accidentally clobbering earlier results |
| `ignore`  | 🙈 Skips the write if the output already exists                              | Leaves existing data untouched; no error is raised |

## 📊 Diagram: Batch Output Modes

```plaintext
[Input Data] --> [Spark Batch Job] --> [Output Sink]
                             |
                             ├─ append        ➝ add to existing
                             ├─ overwrite     ➝ replace existing
                             ├─ errorifexists ➝ fail if output exists (default)
                             └─ ignore        ➝ skip if output exists
```

# 🧪 Code Example

## ✅ Batch Mode Example

```scala
df.write
  .mode("append") // or "overwrite", "ignore", "errorifexists"
  .csv("/path/to/output")
```
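
The semantics of the four save modes (`append`, `overwrite`, `ignore`, and `errorifexists`, the default) can be mimicked with a dictionary standing in for storage. This is a sketch under that assumption: the `write` function and the `/out` path are hypothetical, and real sinks differ in how they detect existing data.

```python
def write(sink: dict, path: str, rows: list, mode: str = "errorifexists") -> None:
    """Apply Spark-style save-mode semantics to a dict that stands in for storage."""
    exists = path in sink
    if mode == "append":
        sink.setdefault(path, []).extend(rows)
    elif mode == "overwrite":
        sink[path] = list(rows)
    elif mode == "ignore":
        if not exists:
            sink[path] = list(rows)  # silently skipped when data already exists
    elif mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"path {path} already exists")
        sink[path] = list(rows)
    else:
        raise ValueError(f"unknown save mode: {mode}")


sink = {}
write(sink, "/out", [1, 2])                    # first write succeeds
write(sink, "/out", [3], mode="append")        # contents are now [1, 2, 3]
write(sink, "/out", [9], mode="ignore")        # no change, no error
write(sink, "/out", [7, 8], mode="overwrite")  # contents are now [7, 8]
print(sink["/out"])  # [7, 8]
```

Calling `write(sink, "/out", [0])` again with the default mode would raise, which is the behaviour that protects existing output.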

---------------------------------------

# 🔄 Stream Processing Output Modes in Spark

Stream jobs process **real-time data** (continuous input). Spark supports these **streaming output modes**:

| Mode       | Description                                                                 | Use Case Example                          |
|------------|-----------------------------------------------------------------------------|-------------------------------------------|
| `append`   | ➕ Writes only rows newly added to the result table since the last trigger   | Logging new events                        |
| `update`   | 🔄 Writes only rows that changed since last trigger                         | Updating metrics or counters              |
| `complete` | 🧹 Writes the full result table every time (aggregation queries only)        | Full aggregation (e.g., word count)       |

## 🔁 Diagram: Stream Output Modes
 
```plaintext
[Streaming Source] --> [Spark Structured Streaming] --> [Output Sink]
                                 |
                                 ├─ append   ➝ new rows only
                                 ├─ update   ➝ changed rows only
                                 └─ complete ➝ entire result table
```

# 🧪 Code Example

## 🔄 Streaming Mode Example

```scala
val query = df.writeStream
  .outputMode("append") // or "update", "complete"
  .format("console")
  .start()
```
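
To compare what `update` and `complete` would emit for a running word count, the sketch below replays a few micro-batches in plain Python. `append` is left out on purpose, since a running aggregation keeps revising rows it has already produced. The function name and sample batches are illustrative assumptions, not Spark APIs.

```python
from collections import Counter


def run_word_count(batches, output_mode):
    """Return what a running word count would emit after each trigger."""
    totals = Counter()
    emitted = []
    for batch in batches:
        before = dict(totals)
        totals.update(batch)
        if output_mode == "complete":
            rows = dict(totals)  # the whole result table
        elif output_mode == "update":
            rows = {w: c for w, c in totals.items() if before.get(w) != c}  # changed rows only
        else:
            raise ValueError(f"unsupported mode in this sketch: {output_mode}")
        emitted.append(rows)
    return emitted


batches = [["a", "b"], ["a"], ["c"]]
print(run_word_count(batches, "update"))    # [{'a': 1, 'b': 1}, {'a': 2}, {'c': 1}]
print(run_word_count(batches, "complete"))  # [{'a': 1, 'b': 1}, {'a': 2, 'b': 1}, {'a': 2, 'b': 1, 'c': 1}]
```

`complete` output grows with the result table, so `update` is usually the lighter choice when the sink can handle row-level upserts.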


<hr>

#### • Streaming refers to the continuous processing of incoming data in real-time or near-real-time. Unlike batch processing, which works on a fixed dataset, streaming deals with infinite or unbounded data (e.g., logs, sensor data, or user clicks) that keeps arriving.
#### • Think of it like water flowing through a pipe: you don't wait for all the water to arrive before acting; you work with it as it flows.
### Stream = unbounded Table
#### • A stream is treated as a table that is constantly growing.
#### • New data is like new rows being appended to the table.
#### • Structured Streaming lets you query this growing table as if it were a static table.
#### • It keeps updating the results as new data arrives.
#### • Internally, it maintains the state, figures out what changed, and updates what's needed.

In [1]:
#
import pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql import Row
from pyspark.sql import SparkSession
#
#
_builder =	(	SparkSession.builder.master('local[1]') \
				.appName('pyspark-deltalake-local-testing') \
				.config(	'spark.sql.extensions'
						,	'io.delta.sql.DeltaSparkSessionExtension')
				.config(	'spark.sql.catalog.spark_catalog'
						,	'org.apache.spark.sql.delta.catalog.DeltaCatalog'))
#
_spark = configure_spark_with_delta_pip(_builder).enableHiveSupport().getOrCreate()
#

In [2]:
_spark
#
# try to stop?
### _spark.stop()
#

### READ JSON DATA (BATCH)

In [5]:
_df = _spark.read.format('json') \
			.option('inferschema', True) \
			.option('multiLine',True) \
			.load('file:///C:/Users/Administrator/Documents/Code/Python/PySpark/Streaming/day1.json')

In [6]:
_df.show(10, truncate=False)

+-------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------+--------+----------------------+--------------------+
|customer                                                     |items                                                              |metadata                                      |order_id|payment               |timestamp           |
+-------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------+--------+----------------------+--------------------+
|{{Toronto, Canada, M5H 2N2}, 501, john@example.com, John Doe}|[{I100, 25.99, Wireless Mouse, 2}, {I101, 15.49, USB-C Adapter, 1}]|[{campaign, back_to_school}, {channel, email}]|ORD1001 |{Credit Card, TXN7890}|2025-06-01T10:15:00Z|
+-------------------------------------------------------------+---------

In [7]:
_df.printSchema()

root
 |-- customer: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |-- customer_id: long (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- product_name: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- payment: struct (nullable = true)
 |    |-- method: string (nullable = true)
 |    |-- transaction_id: string (nullable = true)


### Let's parse the inferred schema so we can create our own

In [27]:
import json
inferred_schema = _df.schema
schema_json_string = inferred_schema.json()
## ## print(json.dumps(json.loads(schema_json_string), indent=2))
#
from pyspark.sql.types import StructType
# Assuming schema_json_string holds the JSON representation of your schema
recreated_schema = StructType.fromJson(json.loads(schema_json_string))
print(recreated_schema)

StructType([StructField('customer', StructType([StructField('address', StructType([StructField('city', StringType(), True), StructField('country', StringType(), True), StructField('postal_code', StringType(), True)]), True), StructField('customer_id', LongType(), True), StructField('email', StringType(), True), StructField('name', StringType(), True)]), True), StructField('items', ArrayType(StructType([StructField('item_id', StringType(), True), StructField('price', DoubleType(), True), StructField('product_name', StringType(), True), StructField('quantity', LongType(), True)]), True), True), StructField('metadata', ArrayType(StructType([StructField('key', StringType(), True), StructField('value', StringType(), True)]), True), True), StructField('order_id', StringType(), True), StructField('payment', StructType([StructField('method', StringType(), True), StructField('transaction_id', StringType(), True)]), True), StructField('timestamp', StringType(), True)])


In [12]:
_df.select('customer.customer_id','customer.name','customer.address.city','customer.address.country','order_id','timestamp').show(truncate=False)

+-----------+--------+-------+-------+--------+--------------------+
|customer_id|name    |city   |country|order_id|timestamp           |
+-----------+--------+-------+-------+--------+--------------------+
|501        |John Doe|Toronto|Canada |ORD1001 |2025-06-01T10:15:00Z|
+-----------+--------+-------+-------+--------+--------------------+



In [None]:
from pyspark.sql.functions import col, explode_outer
#
# explode_outer() emits one row per array element and keeps rows whose array is null or empty
_df.select('items','customer.customer_id','customer.name','customer.address.city','customer.address.country','order_id','timestamp') \
	.withColumn('items',explode_outer(col('items'))) \
	.withColumn('item_id', col('items.item_id')) \
	.withColumn('item_price', col('items.price')) \
	.show(truncate=False)

+--------------------------------+-----------+--------+-------+-------+--------+--------------------+-------+----------+
|items                           |customer_id|name    |city   |country|order_id|timestamp           |item_id|item_price|
+--------------------------------+-----------+--------+-------+-------+--------+--------------------+-------+----------+
|{I100, 25.99, Wireless Mouse, 2}|501        |John Doe|Toronto|Canada |ORD1001 |2025-06-01T10:15:00Z|I100   |25.99     |
|{I101, 15.49, USB-C Adapter, 1} |501        |John Doe|Toronto|Canada |ORD1001 |2025-06-01T10:15:00Z|I101   |15.49     |
+--------------------------------+-----------+--------+-------+-------+--------+--------------------+-------+----------+



In [23]:
from pyspark.sql.functions import col, explode_outer
#
# Explode the metadata array into one row per key/value pair, then keep only the flat columns
_df.withColumn('metadata',explode_outer(col('metadata'))) \
	.withColumn('metadata_key', col('metadata.key')) \
	.withColumn('metadata_value', col('metadata.value')) \
	.select('order_id','timestamp','metadata_key','metadata_value') \
	.show(truncate=False)

+--------+--------------------+------------+--------------+
|order_id|timestamp           |metadata_key|metadata_value|
+--------+--------------------+------------+--------------+
|ORD1001 |2025-06-01T10:15:00Z|campaign    |back_to_school|
|ORD1001 |2025-06-01T10:15:00Z|channel     |email         |
+--------+--------------------+------------+--------------+
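
The behaviour of `explode_outer` can be imitated on plain dictionaries, which also shows what happens when two array columns are exploded one after the other: the row counts multiply. The helper below is a hypothetical stand-in, not the PySpark function, and the order `ORD1002` with an empty item list is invented for illustration.

```python
def explode_outer(rows, column):
    """Emit one row per array element; keep the row with None if the array is missing or empty."""
    out = []
    for row in rows:
        elements = row.get(column) or [None]
        for element in elements:
            out.append({**row, column: element})
    return out


order = {
    "order_id": "ORD1001",
    "items": [{"item_id": "I100"}, {"item_id": "I101"}],
    "metadata": [{"key": "campaign"}, {"key": "channel"}],
}

by_item = explode_outer([order], "items")
by_item_and_metadata = explode_outer(by_item, "metadata")

print(len(by_item))               # 2
print(len(by_item_and_metadata))  # 4
print(len(explode_outer([{"order_id": "ORD1002", "items": []}], "items")))  # 1
```

Two items times two metadata entries gives four rows, so any flattening that explodes both arrays and then drops the metadata columns will show each item twice.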



# Let's Read Streaming Data 😁

In [22]:
## instead of using .option('inferschema', True), we'll use:
_spark.conf.set('spark.sql.streaming.schemaInference', True)

In [37]:
#
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
#
_jsonSchema = \
StructType([StructField('customer'
			,StructType([StructField('address'
				,StructType([StructField('city',StringType(),True)
							,StructField('country',StringType(),True)
							,StructField('postal_code',StringType(),True)]),True)
				,StructField('customer_id',LongType(),True)
				,StructField('email',StringType(),True)
				,StructField('name',StringType(),True)]),True)
			,StructField('items'
					,ArrayType(StructType(
								[	StructField('item_id',StringType(),True)
									,StructField('price',DoubleType(),True)
									,StructField('product_name',StringType(),True)
									,StructField('quantity',LongType(),True)]),True),True)
			,StructField('metadata'
					,ArrayType(StructType(
								[	StructField('key',StringType(),True)
									,StructField('value',StringType(),True)]),True),True)
			,StructField('order_id',StringType(),True)
			,StructField('payment',StructType(
										[StructField('method',StringType(),True)
										,StructField('transaction_id',StringType(),True)]),True)
			,StructField('timestamp',StringType(),True)])
#
_df = _spark.read.format('json') \
			.option('inferschema', True) \
			.option('multiLine',True) \
			.load('file:///C:/Users/Administrator/Documents/Code/Python/PySpark/Streaming/day1.json')
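#
# Note: exploding both 'items' and 'metadata' yields a cross product, so each item row
# repeats once per metadata entry (visible as duplicate rows in the output below).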
_dfToWrite = _df.select('*') \
	.withColumn('items',explode_outer(col('items'))) \
	.withColumn('item_id', col('items.item_id')) \
	.withColumn('item_price', col('items.price')) \
	.withColumn('product_name', col('items.product_name')) \
	.withColumn('quantity', col('items.quantity')) \
	.drop(col('items')) \
	.withColumn('metadata',explode_outer(col('metadata'))) \
	.withColumn('metadata_key', col('metadata.key')) \
	.withColumn('metadata_value', col('metadata.value')) \
	.drop(col('metadata')) \
	.withColumn('payment_method', col('payment.method')) \
	.withColumn('transaction_id', col('payment.transaction_id')) \
	.drop(col('payment')) \
	.select(	[
						'customer.address.city'
					,	'customer.address.country'
					,	'customer.address.postal_code'
					,	'customer.customer_id'
					,	'customer.email'
					,	'customer.name'
					,	'item_id'
					,	'item_price'
					,	'product_name'
					,	'quantity'
					,	'order_id'
					,	'payment_method'
					,	'transaction_id'
					,	'timestamp'
				])
#
_dfToWrite.show(truncate=False)
#

+-------+-------+-----------+-----------+----------------+--------+-------+----------+--------------+--------+--------+--------------+--------------+--------------------+
|city   |country|postal_code|customer_id|email           |name    |item_id|item_price|product_name  |quantity|order_id|payment_method|transaction_id|timestamp           |
+-------+-------+-----------+-----------+----------------+--------+-------+----------+--------------+--------+--------+--------------+--------------+--------------------+
|Toronto|Canada |M5H 2N2    |501        |john@example.com|John Doe|I100   |25.99     |Wireless Mouse|2       |ORD1001 |Credit Card   |TXN7890       |2025-06-01T10:15:00Z|
|Toronto|Canada |M5H 2N2    |501        |john@example.com|John Doe|I100   |25.99     |Wireless Mouse|2       |ORD1001 |Credit Card   |TXN7890       |2025-06-01T10:15:00Z|
|Toronto|Canada |M5H 2N2    |501        |john@example.com|John Doe|I101   |15.49     |USB-C Adapter |1       |ORD1001 |Credit Card   |TXN7890    

#### For this error:

```plaintext
[INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED] Trigger type ProcessingTime is not supported for this cluster type.
Use a different trigger type e.g. AvailableNow, Once. SQLSTATE: 0A000
```

#### You need to use an alternative trigger type that is compatible with your cluster. The error message specifically suggests `AvailableNow` or `Once`.

- `AvailableNow`: This trigger processes all data that is available at the start of the query, in one or more batches, and then stops. Data that arrives later is not processed until the query is started again with `AvailableNow`. This suits incremental batch processing, where each run picks up whatever has accumulated since the previous run.

```python
.trigger(availableNow=True)
```

- `Once`: Similar to `AvailableNow`, but it processes everything that is available in a single batch and then stops. It is deprecated in favor of `AvailableNow` in recent Spark releases, but it can still be used if a single-batch execution is explicitly required.

```python
.trigger(once=True)
```
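
The practical difference between the two run-and-stop triggers is how the backlog is split into batches. The following is a toy model in plain Python, not Spark code: `plan_batches` is a hypothetical helper, `day2.json` and `day3.json` are made-up file names, and the model assumes a file source with a rate limit such as `maxFilesPerTrigger`, which `Once` ignores and `AvailableNow` honours.

```python
from typing import List, Optional


def plan_batches(files: List[str], mode: str,
                 max_files_per_trigger: Optional[int] = None) -> List[List[str]]:
    """Toy model of the micro-batches produced by a run-and-stop trigger."""
    if mode == "once":
        # Once: everything available goes into a single batch; rate limits are ignored.
        return [list(files)] if files else []
    if mode == "availableNow":
        # AvailableNow: the same data is drained, but in chunks that respect the limit.
        size = max_files_per_trigger or len(files) or 1
        return [files[i:i + size] for i in range(0, len(files), size)]
    raise ValueError(f"unknown mode: {mode!r}")


# Hypothetical backlog of three files waiting in the source directory.
files = ["day1.json", "day2.json", "day3.json"]
print(plan_batches(files, "once", max_files_per_trigger=2))
print(plan_batches(files, "availableNow", max_files_per_trigger=2))
```

With a limit of two files per batch, the `once` plan is one batch of three files, while the `availableNow` plan is two smaller batches. Smaller batches keep memory use per batch bounded when the backlog is large.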

In [None]:
"""
#
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
#
_jsonSchema = \
StructType([StructField('customer'
			,StructType([StructField('address'
				,StructType([StructField('city',StringType(),True)
							,StructField('country',StringType(),True)
							,StructField('postal_code',StringType(),True)]),True)
				,StructField('customer_id',LongType(),True)
				,StructField('email',StringType(),True)
				,StructField('name',StringType(),True)]),True)
			,StructField('items'
					,ArrayType(StructType(
								[	StructField('item_id',StringType(),True)
									,StructField('price',DoubleType(),True)
									,StructField('product_name',StringType(),True)
									,StructField('quantity',LongType(),True)]),True),True)
			,StructField('metadata'
					,ArrayType(StructType(
								[	StructField('key',StringType(),True)
									,StructField('value',StringType(),True)]),True),True)
			,StructField('order_id',StringType(),True)
			,StructField('payment',StructType(
										[StructField('method',StringType(),True)
										,StructField('transaction_id',StringType(),True)]),True)
			,StructField('timestamp',StringType(),True)])
#
#
_df = spark.readStream.format('json') \
			.option('multiLine',True) \
			.schema(_jsonSchema) \
			.load('/Volumes/workspace/stream/streamingvolume/jsonsource')
#
_dfToWrite = _df.select('*') \
	.withColumn('items',explode_outer(col('items'))) \
	.withColumn('item_id', col('items.item_id')) \
	.withColumn('item_price', col('items.price')) \
	.withColumn('product_name', col('items.product_name')) \
	.withColumn('quantity', col('items.quantity')) \
	.drop(col('items')) \
	.withColumn('metadata',explode_outer(col('metadata'))) \
	.withColumn('metadata_key', col('metadata.key')) \
	.withColumn('metadata_value', col('metadata.value')) \
	.drop(col('metadata')) \
	.withColumn('payment_method', col('payment.method')) \
	.withColumn('transaction_id', col('payment.transaction_id')) \
	.drop(col('payment')) \
	.select(	[
						'customer.address.city'
					,	'customer.address.country'
					,	'customer.address.postal_code'
					,	'customer.customer_id'
					,	'customer.email'
					,	'customer.name'
					,	'item_id'
					,	'item_price'
					,	'product_name'
					,	'quantity'
					,	'order_id'
					,	'payment_method'
					,	'transaction_id'
					,	'timestamp'
				])
#
_dfToWrite.writeStream.format('delta') \
			.trigger(once=True) \
			.outputMode('append') \
			.option('path','/Volumes/workspace/stream/streamingvolume/jsonsink/Data') \
			.option('checkpointLocation','/Volumes/workspace/stream/streamingvolume/jsonsink/Checkpoint') \
			.start()
#
			
			
			

"""

```sql
%sql
SELECT * FROM delta.`/Volumes/workspace/stream/streamingvolume/jsonsink/Data/`
```
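
The same check can be done from a Python cell with the DataFrame reader. This is a minimal sketch: `read_delta_sink` is a hypothetical helper name, and `spark` is assumed to be the `SparkSession` that the notebook already provides.

```python
# Path of the Delta sink written by the streaming query above.
SINK_PATH = "/Volumes/workspace/stream/streamingvolume/jsonsink/Data"


def read_delta_sink(spark, path=SINK_PATH):
    """Return a batch DataFrame over the Delta table stored at `path`."""
    # `spark` is passed in rather than created here, so the helper works
    # with whatever session the notebook or job already has.
    return spark.read.format("delta").load(path)
```

In a notebook, `read_delta_sink(spark).show(truncate=False)` would display the rows that the stream has written so far.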

<hr>

## 🔥 PySpark `writeStream.trigger()` — Simple Guide with All Trigger Types
### 🧠 What is `trigger` in `writeStream`?

In PySpark Structured Streaming, a **trigger** controls **when** or **how often** the streaming job checks for new data and processes it.

Think of it like setting a timer ⏱️:
- Every few seconds, PySpark checks for new data and processes it.
- You decide how often PySpark should wake up and do its job.
- You control this timing using `.trigger(...)`.
- Different triggers = different behaviors.


## 🔁 Trigger Types

| Trigger Type        | Description                                                                                      | Stops Automatically? | Example                                     |
|---------------------|--------------------------------------------------------------------------------------------------|-----------------------|---------------------------------------------|
| `ProcessingTime`    | Starts a micro-batch at fixed intervals (e.g. every 10 seconds)                                  | ❌                    | Every 10 seconds                            |
| `Once`              | Processes all available data in **one** batch and stops (deprecated in favor of `AvailableNow`)  | ✅                    | Good for batch-like streaming               |
| `Continuous`        | Processes records one at a time with low latency (experimental); the interval sets how often progress is checkpointed | ❌ | Checkpoint every 1 second (near real-time) |
| `AvailableNow`      | Processes all available data in one or more batches, then stops                                  | ✅                    | Incremental batch runs                      |

## 📊 Diagram: How Trigger Works

```plaintext
+-------------------+       +-------------------+       +-------------------+
| New Data Arrives  | --->  | Trigger Fires ⏰  | ---> | Process & Write 🔄|
+-------------------+       +-------------------+       +-------------------+
        ↑                          ↑                            ↑
        |                          |                            |
     Every X sec               Based on Trigger           Output Sink (e.g. file)
```

## 🐍 Python Examples

### 1. ⏱️ Trigger Every 10 Seconds

```python
query = df.writeStream \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()
```

### 2. 🧼 Trigger Once (like batch)

```python
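# File sinks need a checkpoint location; "/output/checkpoint" is a placeholder path.
query = df.writeStream \
    .format("parquet") \
    .trigger(once=True) \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/output/checkpoint") \
    .start()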
```

### 3. ⚡ Continuous Trigger (experimental)

```python
query = df.writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()
```

### 4. 📦 AvailableNow Trigger

```python
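# File sinks need a checkpoint location; "/output/checkpoint" is a placeholder path.
query = df.writeStream \
    .format("parquet") \
    .trigger(availableNow=True) \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/output/checkpoint") \
    .start()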
```

## ✅ Summary

- `trigger` controls **when** streaming jobs run.
- Use `.trigger(processingTime="X")` for regular intervals.
- Use `.trigger(once=True)` for one-time execution.
- Use `.trigger(continuous="X")` for record-by-record, low-latency streaming (experimental); `X` is the checkpoint interval.
- Use `.trigger(availableNow=True)` to process all current data and stop.
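
When the trigger type comes from a job parameter rather than being hard-coded, a small mapping keeps the choice in one place. This is a sketch only: `trigger_kwargs` is a hypothetical helper, and it relies on `trigger()` accepting the keyword arguments `processingTime`, `once`, `continuous`, and `availableNow` used in the examples above.

```python
def trigger_kwargs(mode: str, interval: str = "") -> dict:
    """Map a configuration value to keyword arguments for writeStream.trigger()."""
    if mode in ("processingTime", "continuous"):
        # Both interval-based triggers take a duration string.
        if not interval:
            raise ValueError(f"{mode} needs an interval such as '10 seconds'")
        return {mode: interval}
    if mode in ("once", "availableNow"):
        # Run-and-stop triggers are switched on with a boolean flag.
        return {mode: True}
    raise ValueError(f"unknown trigger mode: {mode!r}")


print(trigger_kwargs("processingTime", "10 seconds"))
print(trigger_kwargs("availableNow"))

# Usage with a streaming DataFrame `df` (not executed here):
# df.writeStream.format("console").trigger(**trigger_kwargs("availableNow")).start()
```

Switching a job from `processingTime` to `availableNow` on a cluster that rejects infinite triggers then becomes a configuration change rather than a code change.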


<hr>

# 🧱 What Is a Boundary Stage in Spark?

In Apache Spark, a **stage boundary** (informally, a **boundary stage**) is the point where one stage ends and the next begins. It is created by a **wide dependency**, that is, an operation that requires ***shuffling data across partitions***. This typically happens during **wide transformations**, where data from multiple partitions must be exchanged or grouped. Stage boundaries are performance-critical because the shuffle involves disk I/O, network transfer, and often sorting, which makes them much more expensive than narrow transformations.

### ⚡ Spark Operations: Boundary vs Non-Boundary Stages

| **Spark Operation**     | **Boundary Stage?** | **Type**             | **Simple Explanation**                                                                 |
|-------------------------|---------------------|----------------------|----------------------------------------------------------------------------------------|
| `map()`                 | ❌ No               | Narrow Transformation | Applies a function to each element. No data movement between partitions.               |
| `filter()`              | ❌ No               | Narrow Transformation | Keeps elements that match a condition. No shuffle.                                     |
| `flatMap()`             | ❌ No               | Narrow Transformation | Similar to `map`, but can return multiple outputs per input. No shuffle.               |
| `union()`               | ❌ No               | Narrow Transformation | Combines two datasets. No shuffle unless followed by other wide ops.                   |
| `sample()`              | ❌ No               | Narrow Transformation | Randomly samples elements. No shuffle.                                                 |
| `coalesce()`            | ❌ No               | Narrow Transformation | Reduces number of partitions **without shuffle**. Efficient for merging partitions.    |
| `repartition()`         | ✅ Yes              | Wide Transformation   | Redistributes data across partitions **with shuffle**. Costly but useful for balancing.|
| `groupByKey()`          | ✅ Yes              | Wide Transformation   | Groups data by key. Requires shuffle to bring same keys together.                      |
| `reduceByKey()`         | ✅ Yes              | Wide Transformation   | Combines values by key **with pre-aggregation**, reducing shuffle cost.                |
| `join()`                | ✅ Yes              | Wide Transformation   | Combines datasets by key. Requires shuffle to align keys.                              |
| `distinct()`            | ✅ Yes              | Wide Transformation   | Removes duplicates. Requires shuffle to compare across partitions.                     |
| `sortBy()`              | ✅ Yes              | Wide Transformation   | Sorts data. Requires shuffle to order globally.                                        |
| `aggregateByKey()`      | ✅ Yes              | Wide Transformation   | Aggregates values by key. Similar to `reduceByKey`, but more flexible.                 |
| `cogroup()`             | ✅ Yes              | Wide Transformation   | Groups multiple RDDs by key. Requires shuffle.                                         |
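
The "pre-aggregation" noted for `reduceByKey()` can be illustrated without a cluster. The sketch below is plain Python, not Spark: the two lists stand in for partitions, the records are made up, and the functions only count how many records would have to cross the shuffle under each strategy.

```python
from collections import defaultdict

# Two toy partitions of (key, value) records; the data is illustrative.
partitions = [
    [("A", 1), ("B", 2), ("A", 5)],
    [("A", 3), ("C", 4), ("C", 6)],
]


def shuffled_without_combine(parts):
    """groupByKey-style: every record is sent across the shuffle."""
    return sum(len(p) for p in parts)


def shuffled_with_combine(parts):
    """reduceByKey-style: values are summed per key inside each partition
    first, so each partition sends at most one record per key."""
    total = 0
    for p in parts:
        local = defaultdict(int)
        for key, value in p:
            local[key] += value
        total += len(local)
    return total


print(shuffled_without_combine(partitions))  # 6 records shuffled
print(shuffled_with_combine(partitions))     # 4 records shuffled
```

The saving grows with the number of repeated keys per partition; if every key appears only once per partition, both strategies shuffle the same number of records.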

---

### 🧠 Quick Tip:
- **Narrow transformations**: Data stays in its partition. Fast and efficient.
- **Wide transformations**: Data moves across partitions. Triggers a new stage and shuffle.
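
As a rough mental model, the stages of a linear pipeline can be found by cutting it at every wide transformation. The sketch below is plain Python under simplifying assumptions: `split_into_stages` is a hypothetical helper, the pipeline is a flat list of operation names from the table above, and the map-side work of a wide operation (which really runs in the earlier stage) is ignored.

```python
# Wide transformations from the table above; each one introduces a shuffle.
WIDE_OPS = {
    "repartition", "groupByKey", "reduceByKey", "join",
    "distinct", "sortBy", "aggregateByKey", "cogroup",
}


def split_into_stages(ops):
    """Group a linear list of operation names into stages.

    Narrow operations stay in the current stage; a wide operation
    closes the current stage and opens a new one.
    """
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS:
            stages.append([])  # shuffle boundary: start a new stage
        stages[-1].append(op)
    return stages


pipeline = ["map", "filter", "reduceByKey", "map", "sortBy"]
for number, stage in enumerate(split_into_stages(pipeline)):
    print(number, stage)
```

For this pipeline the model yields three stages: `map` and `filter` run together, `reduceByKey` starts the second stage and is pipelined with the following `map`, and `sortBy` starts the third.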


<hr>

### ⚙️ Spark Runtime Processes & Behaviors

| **Process / Behavior**     | **Type**              | **Triggers / Causes**                                      | **Generates Boundary Stage?** | **Simple Explanation**                                                                 |
|----------------------------|-----------------------|-------------------------------------------------------------|-------------------------------|----------------------------------------------------------------------------------------|
| Shuffling                  | Performance Overhead  | Wide transformations like `groupByKey`, `join`, `sortBy`    | ✅ Yes                        | Redistributes data across partitions. Costly but necessary for certain operations.     |
| Caching / Persistence      | Optimization          | Manual call to `cache()` or `persist()`                     | ❌ No                         | Stores intermediate results in memory or disk to avoid recomputation.                 |
| Execution Failure          | Error State           | Code bugs, data issues, resource limits                     | ❌ No                         | Task or stage fails during execution. May retry depending on fault tolerance.         |
| Job Delegation             | Scheduling Mechanism  | SparkContext submits job to DAG Scheduler                   | ❌ No                         | Breaks job into stages and tasks, then delegates to Task Scheduler for execution.     |
| Application Failure        | Critical Error        | Driver crash, out-of-memory, unhandled exceptions           | ❌ No                         | Entire Spark app stops. Requires restart or debugging.                                |
| Task Retry                 | Fault Tolerance       | Transient errors, executor loss                             | ❌ No                         | Spark retries failed tasks up to a configured limit.                                   |
| Speculative Execution      | Optimization          | Straggler tasks detected                                    | ❌ No                         | Launches duplicate tasks to speed up slow-running ones.                               |
| Stage Completion           | Execution Milestone   | All tasks in a stage finish successfully                    | ❌ No                         | The boundary itself comes from the shuffle; completion lets dependent stages start reading the shuffle output. |
| Job Completion             | Finalization          | All stages of a job complete                                | ❌ No                         | Results are returned to driver or written to storage.                                 |
| Resource Allocation        | Cluster Management    | YARN, Mesos, or Kubernetes assigns executors                | ❌ No                         | Determines how many resources (CPU, memory) Spark can use.                            |
| Broadcast Variables        | Optimization          | Used in joins or lookups with small datasets                | ❌ No                         | Sends read-only data to all executors to avoid repeated transmission.                 |
| Accumulators               | Monitoring / Metrics  | Used for counters or debugging                              | ❌ No                         | Variables that aggregate values across tasks. Not reliable for logic control.         |

---

### 🧠 Quick Tip:
- Boundary stages are triggered by **shuffles**, which are expensive—so minimize them when possible.
- Use **caching** and **broadcast variables** to optimize performance and reduce recomputation.
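
Task retry and speculative execution from the table above are controlled through Spark configuration properties. The sketch below lists the relevant keys and renders them as `spark-submit` flags; the values are illustrative rather than recommendations, and `to_submit_flags` is a hypothetical helper.

```python
# Property names are Spark configuration keys; the values are examples only.
fault_tolerance_conf = {
    "spark.task.maxFailures": "4",          # attempts per task before the job is failed
    "spark.speculation": "true",            # enable speculative execution
    "spark.speculation.multiplier": "1.5",  # how much slower than the median counts as a straggler
    "spark.speculation.quantile": "0.75",   # fraction of tasks that must finish before speculating
}


def to_submit_flags(conf):
    """Render a config dict as a flat list of --conf arguments."""
    flags = []
    for key, value in sorted(conf.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


print(" ".join(to_submit_flags(fault_tolerance_conf)))
```

The same dictionary could also be applied when building a session, one `.config(key, value)` call per entry.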
