In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a Spark session
spark = SparkSession.builder \
    .appName("Retail Sales Analysis") \
    .getOrCreate()
file_path = "sales.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 15:32:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df.schema

StructType(List(StructField(TransactionID,IntegerType,true),StructField(Date,StringType,true),StructField(StoreID,StringType,true),StructField(ProductID,IntegerType,true),StructField(ProductName,StringType,true),StructField(Quantity,IntegerType,true),StructField(Price,DoubleType,true),StructField(TotalSale,DoubleType,true),StructField(CustomerID,StringType,true)))

In [5]:
df.show(5)

+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
|TransactionID|      Date|StoreID|ProductID|ProductName|Quantity|Price|TotalSale|CustomerID|
+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
|            1|2025-01-01|   S001|      101|      Apple|       3| 0.99|     2.97|      C001|
|            2|2025-01-02|   S002|      102|     Banana|       2| 0.59|     1.18|      C002|
|            3|2025-01-03|   S001|      103|     Orange|       5| 1.29|     6.45|      C003|
|            4|2025-01-03|   S003|      104|      Mango|       1| 1.99|     1.99|      C004|
|            5|2025-01-04|   S002|      105|  Blueberry|      10| 2.49|     24.9|      C005|
+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
only showing top 5 rows



With inferSchema=True, Spark scans the data to infer column types and then reads it again to load it, so the file is typically read twice. The fraction of rows scanned for inference is controlled by samplingRatio; for CSV the default is 1.0 (all rows), which is expensive on large datasets. An explicit schema avoids the inference pass and is therefore the better choice for performance.
Best Practices
1. Use an explicit schema for production jobs.
2. Lower samplingRatio when inferring the schema of large datasets.
3. Validate the inferred schema before relying on it.
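
The third practice can be automated with a small check. The sketch below is plain Python and does not start Spark: it assumes you hand it the list of (column, type) pairs that a DataFrame exposes as df.dtypes. The helper name schema_mismatches and the expected_types mapping are hypothetical, chosen only for this illustration; the sample list mirrors the inferred schema printed above.

```python
# Hypothetical helper: compare inferred column types with the ones we expect.
def schema_mismatches(dtypes, expected):
    """Return {column: (expected_type, actual_type)} for every disagreement."""
    actual = dict(dtypes)
    problems = {}
    for column, wanted in expected.items():
        found = actual.get(column)  # None if the column is missing entirely
        if found != wanted:
            problems[column] = (wanted, found)
    return problems


# Stand-in for df.dtypes after inferSchema=True (same types as the schema above).
inferred_dtypes = [
    ("TransactionID", "int"), ("Date", "string"), ("StoreID", "string"),
    ("ProductID", "int"), ("ProductName", "string"), ("Quantity", "int"),
    ("Price", "double"), ("TotalSale", "double"), ("CustomerID", "string"),
]

# Types we want downstream (an assumption made for this example).
expected_types = {"Date": "date", "ProductID": "string", "Quantity": "int"}

mismatches = schema_mismatches(inferred_dtypes, expected_types)
print(mismatches)
```

With a live session you would pass df.dtypes instead of the hard-coded list and fail the job (or fall back to an explicit schema) when the result is non-empty.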

In [None]:
spark.stop()

# Schema-on-Write


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Retail Sales Analysis") \
    .getOrCreate()

file_path = "sales.csv"

# Define explicit schema
custom_schema = StructType([
    StructField("TransactionID", IntegerType()),
    StructField("Date", DateType()),
    StructField("StoreID", StringType()),
    StructField("ProductID", StringType()),
    StructField("ProductName", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("Price", DoubleType()),
    StructField("TotalSale", DoubleType()),
    StructField("CustomerID", StringType())
])

df = spark.read.csv(file_path, header=True, schema=custom_schema)
df.printSchema()  # Verify schema

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/08 11:29:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- TransactionID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- TotalSale: double (nullable = true)
 |-- CustomerID: string (nullable = true)



In [2]:
df.show(5)

+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
|TransactionID|      Date|StoreID|ProductID|ProductName|Quantity|Price|TotalSale|CustomerID|
+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
|            1|2025-01-01|   S001|      101|      Apple|       3| 0.99|     2.97|      C001|
|            2|2025-01-02|   S002|      102|     Banana|       2| 0.59|     1.18|      C002|
|            3|2025-01-03|   S001|      103|     Orange|       5| 1.29|     6.45|      C003|
|            4|2025-01-03|   S003|      104|      Mango|       1| 1.99|     1.99|      C004|
|            5|2025-01-04|   S002|      105|  Blueberry|      10| 2.49|     24.9|      C005|
+-------------+----------+-------+---------+-----------+--------+-----+---------+----------+
only showing top 5 rows



                                                                                

In [None]:
spark.stop()

Steps performed internally
- Schema Binding: Spark binds custom_schema to the data source, so each column is parsed with its declared type.
- Data Parsing: Each row is parsed and cast to the schema’s types.
- InternalRow Creation: Data is stored in Tungsten’s binary format.
    For a CSV row: "123,2023-10-01,STR001,PROD045,Laptop,2,899.99,1799.98,CUST1001"
        Step 1: The CSV reader splits the row into individual fields based on delimiters.
        Step 2: Each field is converted to the specified data type in the schema:
            "123" → IntegerType (parsed as 123).
            "2023-10-01" → DateType (stored internally as the number of days since 1970-01-01; returned to Python as a datetime.date).
            "STR001" → StringType (preserved as-is).
            "899.99" → DoubleType (parsed as 899.99).
        Step 3: The parsed data is stored as an InternalRow object (Spark’s binary-optimized in-memory format).
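- Logical Plan: Catalyst builds a plan using the schema for optimizations.
    Predicate Pushdown: Filters (e.g., Quantity > 0) are pushed toward the data source. For CSV the file is still read in full, but recent Spark versions can drop non-matching rows during parsing; columnar formats such as Parquet can also skip data on disk.
    Column Pruning: Only required columns (e.g., TransactionID, TotalSale) are parsed. For a row-oriented text format like CSV the bytes are still read from disk, whereas columnar formats can skip unneeded columns entirely.
    Type-Specific Optimizations: Operations like arithmetic (TotalSale = Quantity * Price) run on native types (e.g., DoubleType), avoiding string-to-number casts at query time.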
- Memory and Storage Efficiency:
  Data is stored in a compact binary format optimized for the explicit schema.
    Example: an IntegerType value is a fixed-width 4-byte integer, whereas the same digits kept as StringType are stored as variable-length UTF-8 bytes.
    No Type Guessing: Eliminates the overhead of the inference pass and of casting columns after loading.
- Error Handling and Data Validation
    Spark enforces the schema when the data is parsed:
    Mode-Specific Handling (set via the mode option of spark.read.csv):
        PERMISSIVE (default): Fields that cannot be parsed (e.g., a non-integer TransactionID) are set to null; the raw record can be kept in a corrupt-record column (columnNameOfCorruptRecord) if the schema includes one.
        DROPMALFORMED: Discards invalid rows.
        FAILFAST: Throws an error on the first invalid record.
    Early Failure: Because Spark reads lazily, malformed records surface when an action runs. With FAILFAST the job stops at the first one, instead of letting wrongly typed values surface later as they can with schema inference.
- Avoid Double Data Read
    Unlike inferSchema=True (which typically scans the data twice), an explicit schema requires only one pass over the data:
    Single Read: Data is parsed and converted to the specified types in a single pass.
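
The parsing steps and the three read modes described above can be imitated in a few lines of plain Python. This is a toy model, not Spark's implementation: the SCHEMA list, the parse_rows function, and the shortened five-column rows are all made up for illustration, and real Spark produces InternalRow objects rather than tuples.

```python
import csv
from datetime import date

# Toy schema: (column name, converter). Shortened to five columns for brevity.
SCHEMA = [
    ("TransactionID", int),
    ("Date", date.fromisoformat),
    ("StoreID", str),
    ("Quantity", int),
    ("Price", float),
]


def parse_rows(lines, mode="PERMISSIVE"):
    """Split each CSV line, cast every field, then apply the chosen mode."""
    parsed = []
    for fields in csv.reader(lines):            # Step 1: split on delimiters
        row, malformed = [], False
        for (_name, convert), raw in zip(SCHEMA, fields):
            try:
                row.append(convert(raw))        # Step 2: cast to the schema type
            except ValueError:
                malformed = True
                row.append(None)                # PERMISSIVE: bad field becomes null
        if malformed and mode == "FAILFAST":
            raise ValueError(f"malformed record: {fields}")
        if malformed and mode == "DROPMALFORMED":
            continue                            # skip the whole row
        parsed.append(tuple(row))               # Step 3: keep the typed row
    return parsed


lines = [
    "123,2023-10-01,STR001,2,899.99",
    "abc,2023-10-02,STR002,1,19.99",  # TransactionID is not an integer
]

permissive = parse_rows(lines, "PERMISSIVE")
dropped = parse_rows(lines, "DROPMALFORMED")
```

In PERMISSIVE mode the second row survives with a null TransactionID, DROPMALFORMED keeps only the first row, and FAILFAST raises as soon as it meets the bad record.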

# Schema-on-Read vs Schema-on-Write: Advantages and Disadvantages

## Schema-on-Read

### **Definition**
Schema-on-Read is a data processing approach where the schema is applied dynamically when the data is read. The schema is inferred or defined at query time, allowing flexibility in handling diverse data formats.

### **Advantages**
1. **Flexibility**:
   - Can handle semi-structured or loosely structured data (e.g., JSON, CSV).
   - No need to predefine the schema before ingesting data.
2. **Rapid Data Ingestion**:
   - Data can be ingested quickly without upfront schema design.
   - Ideal for exploratory analysis or prototyping.
3. **Schema Evolution**:
   - Easily adapts to changes in data structure (e.g., new columns or fields).
4. **Cost-Effective**:
   - Reduces the time and effort required for schema design and maintenance.

### **Disadvantages**
1. **Performance Overhead**:
   - Schema inference at read time can be slow, especially for large datasets.
   - May require reading data twice (once to infer the schema, once to load the data).
2. **Data Quality Issues**:
   - Risk of incorrect type inference (e.g., dates inferred as strings).
   - No strict validation during ingestion, leading to potential errors downstream.
3. **Complexity in Querying**:
   - Queries may fail if the inferred schema doesn’t match the actual data.
   - Requires additional checks and transformations in queries.
4. **Resource Intensive**:
   - Sampling and inference can be resource-heavy for large datasets.

---

## Schema-on-Write

### **Definition**
Schema-on-Write is a traditional approach where the schema is defined and enforced when data is written (ingested) into the system. The data must conform to the predefined schema.

### **Advantages**
1. **Data Consistency**:
   - Ensures data adheres to a predefined structure, improving data quality.
   - Strict validation during ingestion prevents invalid data from entering the system.
2. **Performance**:
   - No runtime schema inference, leading to faster query execution.
   - Optimized storage and memory usage due to predefined types.
3. **Query Optimization**:
   - Enables advanced optimizations like predicate pushdown and column pruning.
   - Better performance for structured queries.
4. **Error Detection**:
   - Errors are caught during ingestion, reducing the risk of downstream failures.

### **Disadvantages**
1. **Rigidity**:
   - Schema changes require updates to the ingestion pipeline.
   - Not suitable for rapidly evolving data structures.
2. **Upfront Effort**:
   - Requires time and effort to design and maintain schemas.
   - May delay data ingestion and analysis.
3. **Less Flexible**:
   - Struggles with semi-structured or unstructured data.
   - Not ideal for exploratory analysis or ad-hoc use cases.
4. **Cost of Schema Evolution**:
   - Migrating data to a new schema can be complex and time-consuming.

---

## Comparison Table

| **Aspect**               | **Schema-on-Read**                                  | **Schema-on-Write**                          |
|--------------------------|-----------------------------------------------------|----------------------------------------------|
| **Flexibility**          | High (handles diverse data formats).                | Low (requires predefined schema).            |
| **Performance**          | Slower (runtime schema inference).                  | Faster (no runtime inference).               |
| **Data Quality**         | Lower (risk of incorrect inference).                | Higher (strict validation during ingestion). |
| **Schema Evolution**     | Easier (adapts to changes dynamically).             | Harder (requires schema updates).            |
| **Use Case**             | Exploratory analysis, prototyping.                  | Production systems, structured data.         |
| **Resource Usage**       | Higher (sampling and inference overhead).           | Lower (optimized for predefined schema).     |

---

## When to Use Which?

### **Schema-on-Read**
- Use for:
  - Exploratory data analysis.
  - Prototyping or ad-hoc queries.
  - Semi-structured or unstructured data (e.g., JSON, CSV).
- Avoid for:
  - Production systems with strict data quality requirements.
  - Large datasets where performance is critical.

### **Schema-on-Write**
- Use for:
  - Production systems with structured data.
  - Scenarios requiring high data quality and consistency.
  - Large datasets where performance is critical.
- Avoid for:
  - Rapidly evolving data structures.
  - Exploratory analysis or prototyping.

---

## Conclusion
- **Schema-on-Read** offers flexibility and rapid ingestion but at the cost of performance and data quality.
- **Schema-on-Write** ensures data consistency and performance but requires upfront effort and is less flexible.
- Choose the approach based on your use case: **Schema-on-Read** for exploration and **Schema-on-Write** for production.

# Spark Schema Handling: Schema-on-Read vs Explicit Schema

## How Spark Parses and Optimizes Data

### **Schema-on-Read (`inferSchema=True`)**  
#### Parsing Behavior  
- **Dynamic Parsing**:  
  - Schema is inferred by scanning the data (for CSV, all rows by default; a sample if `samplingRatio` is lowered).  
  - Raw data (e.g., CSV strings) is parsed into Spark types (e.g., `"123"` → `IntegerType`).  
- **Two Data Reads**:  
  1. **Inference Pass**: Reads the data, or a sample of it, to infer column types.  
  2. **Full Read**: Reads the entire dataset again and parses it with the inferred schema.  
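
The two reads can be pictured with a toy reader in plain Python. This is only a sketch of the idea, not Spark's inference code: `infer_type` and `read_with_inference` are hypothetical names, and the type ladder is reduced to `int`, `float`, and `str`.

```python
import csv
import io


def infer_type(values):
    """Pick the narrowest of int, float, str that fits every value."""
    for candidate in (int, float):
        try:
            for value in values:
                candidate(value)
            return candidate
        except ValueError:
            continue
    return str


def read_with_inference(text):
    # Pass 1: scan the rows only to decide each column's type.
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    types = [infer_type(column) for column in zip(*body)]

    # Pass 2: read the data again and cast it with the inferred types.
    rows_again = list(csv.reader(io.StringIO(text)))[1:]
    data = [tuple(t(v) for t, v in zip(types, row)) for row in rows_again]
    return dict(zip(header, types)), data


sample = "TransactionID,ProductName,Price\n1,Apple,0.99\n2,Banana,0.59\n"
schema, data = read_with_inference(sample)
```

Supplying the types up front removes the first pass entirely, which is the saving an explicit schema provides.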

#### Optimization  
- **Catalyst Optimizer**:  
  - Uses the inferred schema for optimizations like predicate pushdown and column pruning.  
  - Less effective if inferred types are incorrect (e.g., `StringType` instead of `DateType`).  
- **Tungsten Binary Format**:  
  - Parsed data is stored in Spark’s optimized in-memory format (similar to explicit schemas).  

#### Key Limitations  
- **Performance Overhead**: Double data read (inference + parsing).  
- **Data Quality Risks**: Incorrect type inference (e.g., numeric-looking IDs inferred as `IntegerType` lose their leading zeros).  
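
The leading-zero problem is easy to reproduce without Spark. The values below are invented for illustration; the point is only that an integer conversion cannot be reversed back to the original identifier.

```python
# Made-up identifiers that look numeric but carry meaningful leading zeros.
raw_ids = ["00123", "04567", "10001"]

# What integer inference effectively does to each value.
as_integers = [int(value) for value in raw_ids]
round_tripped = [str(number) for number in as_integers]

# Keeping the column as a string preserves the identifiers unchanged.
as_strings = list(raw_ids)

print(round_tripped)  # ['123', '4567', '10001']
```

Declaring such a column as `StringType` in an explicit schema, as the notebook does for `ProductID`, avoids the loss.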

---

### **Explicit Schema Definition**  
#### Parsing Behavior  
- **Strict Parsing**:  
  - Data is parsed directly into user-defined types (e.g., `DateType` for a column).  
  - Invalid values (e.g., `"Oct-2023"` for `DateType`) become `null` or raise an error, depending on the read mode.  
- **Single Data Read**: No sampling overhead.  

#### Optimization  
- **Catalyst Optimizer**:  
  - Leverages explicit types for aggressive optimizations:  
    - Predicate pushdown (filters applied at the data source).  
    - Column pruning (only required columns are read).  
- **Tungsten Efficiency**:  
  - Data stored in memory using type-specific encodings (e.g., `IntegerType` uses 4 bytes).  

#### Key Advantages  
- **Performance**: Single read pass, no sampling overhead.  
- **Data Correctness**: Strict validation during ingestion.  

---

## Internal Workflow Comparison  

### **Schema-on-Read Example**  
```python
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
df.filter(df.Price > 100).show()