# **_PySpark_**

PySpark is the Python API for Apache Spark: it lets you drive Spark's distributed engine from Python code.

In [0]:
pip install pyspark


## SparkSession

In [0]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyFirstApp").getOrCreate()
print(spark.version)


- The **spark** variable now holds a **SparkSession object**.
- This **SparkSession** is the **entry point** to use DataFrame API and SQL queries in PySpark.
<br>

🔎 **Relationship between Spark components** <br>
Earlier (before Spark 2.0), you had to use multiple objects separately:

- SparkContext → to work with RDDs
- SQLContext → to work with DataFrames and SQL
- HiveContext → to work with Hive tables

Since Spark 2.0, all of these are wrapped inside a **single object**: SparkSession.
So:

- spark = SparkSession object
- spark.sparkContext → gives you the SparkContext (low-level RDD API)
- spark.sql() → lets you run SQL queries
- spark.read → lets you read data; writing goes through a DataFrame's df.write


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Practice").getOrCreate()

data = [(1, "Pratik"), (2, "Sanket")]
columns = ["id", "name"]

df = spark.createDataFrame(data, columns)
df.show()


## SparkContext

### RDD : Resilient Distributed Dataset

- **SparkContext** is the entry point to the **RDD API**.
- You need a SparkContext to create RDDs and run low-level operations.

🔹**Key Properties of RDDs** <br>
**1) Immutable** → once created, you can’t change it (only create new RDDs).<br>

**2) Lazy Evaluation** → transformations (map, filter) don’t run until an action (collect, count) is called.<br>

**3) Transformations vs Actions** <br>

- Transformations → map, filter, flatMap (return new RDDs)
- Actions → collect, count, reduce (trigger computation and return result)
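
Lazy evaluation can be felt without a cluster. The sketch below is a plain-Python analogy built on generators, not Spark code: building the pipeline does no work, and only the final `list(...)` (playing the role of an action such as `collect`) runs it. The `log` list and the `double` function are made up for illustration.

```python
log = []  # records when work actually happens

def double(x):
    log.append(x)
    return x * 2

numbers = [1, 2, 3, 4, 5]

# "Transformations": build a lazy pipeline; nothing is computed yet.
doubled = map(double, numbers)
big = filter(lambda v: v > 4, doubled)
calls_before_action = len(log)

# "Action": consuming the pipeline triggers the computation.
result = list(big)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
```

Spark goes further than this analogy, since it can also reorder and combine the recorded steps before running them, but the "nothing happens until an action" behaviour is the same idea.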


```
from pyspark import SparkContext

# Reuse the running SparkContext, or start one if none exists
sc = SparkContext.getOrCreate()

# Create an RDD from a Python list
rdd = sc.parallelize([1, 2, 3, 4, 5])

print("RDD:", rdd)
print("Collected data:", rdd.collect())
```

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
        .appName("sparkContext") \
        .getOrCreate()

# Get sparkContext from SparkSession
sc = spark.sparkContext
# On Databricks Serverless compute the line above raises an error, because direct JVM access (including SparkContext) is restricted for security and isolation reasons.

# Test it
print(sc)
print("App name:", sc.appName)


Since you’re on Databricks Serverless, you should **focus on DataFrame API**, which is built on top of RDDs but much more optimized.

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
        .appName("sparkContext") \
        .getOrCreate()

# Create a DataFrame
data = [("Pratik", 21), ("Sanket", 19), ("Modi", 56)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Transformation using DataFrame API
df_filtered = df.filter(df.Age > 30)

df_filtered.show()


## DataFrame & DataFrame API (functions)

In [0]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, avg, count

# Create SparkSession
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# Sample data
data = [
    (1, "Alice", 25, "F"),
    (2, "Bob", 30, "M"),
    (3, "Cathy", 27, "F"),
    (4, "David", 35, "M"),
    (5, "Evelyn", 29, "F")
]

# Define schema
columns = ["id", "name", "age", "gender"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

print("=== Original DataFrame ===")
df.show()

# ---------------- OPERATIONS ---------------- #

# 1. Select specific columns
df_select = df.select("name", "age")
print("=== Select name and age ===")
df_select.show()

# 2. Filter rows (age > 28)
df_filter = df.filter(col("age") > 28)
print("=== Filter age > 28 ===")
df_filter.show()

# 3. Add a new column
df_newcol = df.withColumn("is_adult", when(col("age") >= 18, lit("Yes")).otherwise(lit("No")))
print("=== Add new column is_adult ===")
df_newcol.show()

# 4. GroupBy and aggregate
df_group = df.groupBy("gender").agg(
    avg("age").alias("avg_age"),
    count("id").alias("count")
)
print("=== GroupBy gender with average age and count ===")
df_group.show()

# 5. OrderBy
df_order = df.orderBy(col("age").desc())
print("=== Order by age descending ===")
df_order.show()

# 6. Drop a column
df_drop = df.drop("gender")
print("=== Drop gender column ===")
df_drop.show()

# 7. Rename column
df_rename = df.withColumnRenamed("name", "full_name")
print("=== Rename name to full_name ===")
df_rename.show()

# 8. Collect to Python (action)
result = df.filter(col("gender") == "F").collect()
print("=== Collect rows where gender = F ===")
for row in result:
    print(row)



## Reading/Writing Data

### Reading Data in PySpark

In [0]:
df_csv = spark.read.format("csv") \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("sep", ",") \
        .option("multiLine", True) \
        .load("data/people.csv")


The format API is universal → you can swap "csv" with "json", "parquet", "orc", "jdbc", etc. and just adjust options.

### Writing DataFrames in PySpark

The general pattern is:

```
df.write.format("<format>").option(...).mode(...).save("<path>")
```

Where:

`<format>` → "csv", "json", "parquet", "orc", "delta"

`.mode()` → how to handle existing files: "overwrite", "append", "ignore", "errorifexists"

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("ReadWriteExample").getOrCreate()

# Sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", 25, "F"),
    (2, "Bob", 30, "M"),
    (3, "Cathy", 27, "F"),
    (4, None, 22, "M")
], ["id", "name", "age", "gender"])

# Each chain is wrapped in parentheses so that inline comments are legal;
# a backslash line continuation cannot be followed by a comment.

# ==================== WRITE CSV ==================== #
(
    df.write.format("csv")
    .option("header", True)             # Include column names
    .option("sep", ";")                 # Custom delimiter
    .option("quote", "\"")              # Quote character for strings
    .option("quoteAll", True)           # Quote all fields
    .option("escape", "\\")             # Escape character
    .option("nullValue", "NA")          # How nulls are represented
    .option("compression", "gzip")      # Compress output files
    .mode("overwrite")                  # Overwrite existing data
    .save("output/people_csv")
)

# ==================== WRITE JSON ==================== #
(
    df.write.format("json")
    .option("compression", "gzip")
    .option("dateFormat", "yyyy-MM-dd")
    .mode("overwrite")
    .save("output/people_json")
)

# ==================== WRITE PARQUET with Partition ==================== #
(
    df.write.format("parquet")
    .partitionBy("gender")              # Creates separate folders for each gender value
    .mode("overwrite")
    .save("output/people_parquet_partitioned")
)


## PySpark SQL

🔹 Can we apply SQL directly on a DataFrame?

- No, not directly.
- A DataFrame in Spark is not a SQL table by default.
- To run SQL queries, Spark needs to know about it in the SQL catalog (as a table or view).

### 1️⃣ Register a DataFrame as a Temporary View

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("PySparkSQLExample").getOrCreate()

# Sample DataFrame
data = [
    (1, "Alice", 25, "F"),
    (2, "Bob", 30, "M"),
    (3, "Cathy", 27, "F"),
    (4, "David", 35, "M")
]
columns = ["id", "name", "age", "gender"]

df = spark.createDataFrame(data, columns)

# Register DataFrame as a temporary view
df.createOrReplaceTempView("people")


### 2️⃣ Run SQL Queries

In [0]:
# Select all rows
df_all = spark.sql("SELECT * FROM people")
df_all.show()

# Filter rows
df_filtered = spark.sql("SELECT name, age FROM people WHERE age > 28")
df_filtered.show()

# Aggregate
df_agg = spark.sql("SELECT gender, AVG(age) as avg_age, COUNT(id) as total FROM people GROUP BY gender")
df_agg.show()

# Order by
df_order = spark.sql("SELECT * FROM people ORDER BY age DESC")
df_order.show()


A SQL query can also span multiple lines when it is written as a triple-quoted string:

In [0]:
df_case = spark.sql(
    """
        SELECT name, 
            age,
            CASE WHEN age >= 30 THEN 'Senior' ELSE 'Junior' END AS category
        FROM people
    """
)
df_case.show()


In [0]:
# SQL style
sql_result = spark.sql("SELECT name, age FROM people WHERE age > 28")

# DataFrame style
df_result = df.filter(df.age > 28).select("name", "age")

sql_result.show()
df_result.show()


🔹 **Quick Analogy** <br>
Think of it like this:

- **DataFrame API** = you manipulate data using functions (select, filter, groupBy)
- **Spark SQL** = you manipulate data using SQL strings (SELECT ... WHERE ...)

Both end up producing **DataFrames** because in Spark, DataFrame is the **universal abstraction** for structured data.

🔹 **DataFrame** in PySpark vs **Table** in SQL <br>
- DataFrame ≈ SQL Table (both structured, column-based)
- But where they live is different:
    - SQL table lives inside a database (Postgres, MySQL, etc.)
    - Spark DataFrame is distributed: its data is split into partitions that are processed across the nodes of a Spark cluster.

- What does distributed mean? <br>
A distributed DataFrame means your **table** is cut into smaller pieces (partitions), processed in parallel by many workers, and then combined back, so big data becomes manageable.

⚡ **Quick Note:**
- DataFrame API ≠ row-wise → it’s distributed + optimized.
- SQL and DataFrame API → equally fast, because they share the same Spark engine.
- Only work done row by row in Python (rdd.map, looping over collect(), Python UDFs) is slow.

## PySpark Built-in Functions

### 🔹 Why use built-in functions instead of Python loops?
1️⃣ **Spark works in a distributed way**

- Your DataFrame is split across many partitions (chunks of data). 
- Spark sends tasks to multiple executors (workers).
- Built-in functions (pyspark.sql.functions) are executed in JVM (Java Virtual Machine) directly on those workers in parallel.
- They are vectorized (operate on whole columns at once, like SQL), not row-by-row.

👉 This means operations happen **close to the data, at scale, in parallel**.
<hr>

2️⃣ **Python loops break distribution**

If you try to use a Python for loop or row-wise function like this:

```
for row in df.collect():
    row.age + 10
```


Problems:

- .collect() brings all data from Spark cluster → into driver’s memory (one machine).
- Now Spark isn’t distributed anymore → you’re just using Python locally.
- Slow + memory blowup risk with big data.

<hr>

3️⃣ **Built-in functions are optimized**

- Spark’s **Catalyst Optimizer** can understand built-in functions (like F.col, F.when, F.upper) and optimize execution plans.
- Example: multiple transformations can be combined into one optimized query plan.
- Python loops are opaque to Spark → no optimization possible.

<hr>

4️⃣ **Vectorization vs Row-wise**

👉 Row-wise (Python loop) = **one row at a time** → lots of serialization. <br>
👉 Vectorized (Built-in functions) = **entire column (vector) at once**, across partitions.

<hr>

🔑 **Rule of Thumb**

✅ Use pyspark.sql.functions (native functions) whenever possible.

❌ Avoid df.rdd.map, collect(), or Python UDFs unless you really need custom logic.

If you must use Python, consider Pandas UDFs (`pandas_udf`) or `mapInPandas` → they process data in batches and are usually faster than normal row-at-a-time UDFs.

<hr>

⚡ **In short:**
- Built-in functions = distributed, optimized, vectorized → scale to big data.
- Python loops = local, slow, unoptimized → break Spark’s parallelism.

In [0]:
from pyspark.sql import functions as F

### 1️⃣ String Functions

In [0]:
data = [("Pratik",),("Sanket",),("kavish",)]
df = spark.createDataFrame(data, ["name"])

df.select(
    F.trim(F.col("name")).alias("Trimmed"),
    F.lower("name").alias("lower"),
    F.upper("name").alias("UPPER"),
    F.length("name").alias("Length"),
    F.concat(F.lit("Hello ") ,F.col("name")).alias("Greeting")
).show()

+-------+------+------+------+------------+
|Trimmed| lower| UPPER|Length|    Greeting|
+-------+------+------+------+------------+
| Pratik|pratik|PRATIK|     6|Hello Pratik|
| Sanket|sanket|SANKET|     6|Hello Sanket|
| kavish|kavish|KAVISH|     6|Hello kavish|
+-------+------+------+------+------------+



### 2️⃣ Numeric Functions

In [0]:
data = [(1, 10), (2, 20), (3, 30)]
df = spark.createDataFrame(data, ["id", "value"])

df.select(
    F.col("id"),
    F.col("value"),
    F.sqrt("value").alias("sqrt"),
    F.pow("id", 2).alias("id_squared"),
    F.round(F.col("value")/3, 2).alias("rounded_div")
).show()

+---+-----+------------------+----------+-----------+
| id|value|              sqrt|id_squared|rounded_div|
+---+-----+------------------+----------+-----------+
|  1|   10|3.1622776601683795|       1.0|       3.33|
|  2|   20|  4.47213595499958|       4.0|       6.67|
|  3|   30| 5.477225575051661|       9.0|       10.0|
+---+-----+------------------+----------+-----------+



### 3️⃣ Date & Time Functions

In [0]:
data = [("2025-09-20",), ("2023-01-01",)]
df = spark.createDataFrame(data, ["date_str"])

# Convert the string to date using to_date function
df = df.withColumn("date", F.to_date("date_str", "yyyy-MM-dd"))

df.select(
    "date",
    F.current_date().alias("today"),
    F.datediff(F.current_date(), "date").alias("days_diff"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_add("date", 7).alias("plus_7_days"),
    F.date_sub("date", 7).alias("minus_7_days")
).show()

+----------+----------+---------+----+-----+---+-----------+------------+
|      date|     today|days_diff|year|month|day|plus_7_days|minus_7_days|
+----------+----------+---------+----+-----+---+-----------+------------+
|2025-09-20|2025-09-20|        0|2025|    9| 20| 2025-09-27|  2025-09-13|
|2023-01-01|2025-09-20|      993|2023|    1|  1| 2023-01-08|  2022-12-25|
+----------+----------+---------+----+-----+---+-----------+------------+



### 4️⃣ Conditional Functions

In [0]:
data = [(1,11),(2,24),(3,67),(4,5),(5,32)]
df = spark.createDataFrame(data, ["id","age"])

df.select(
    "id",
    "age",
    F.when(F.col("age") >= 18, "Adult")
     .otherwise("Minor")
     .alias("Age_Category")
).show()

+---+---+------------+
| id|age|Age_Category|
+---+---+------------+
|  1| 11|       Minor|
|  2| 24|       Adult|
|  3| 67|       Adult|
|  4|  5|       Minor|
|  5| 32|       Adult|
+---+---+------------+



### 5️⃣ Aggregate Functions

In [0]:
data = [("M", 20), ("F", 25), ("M", 30), ("F", 40)]
df = spark.createDataFrame(data, ["gender", "age"])

df.groupBy("gender").agg(
    F.count("*").alias("count"),
    F.avg("age").alias("avg_age"),
    F.min("age").alias("min_age"),
    F.max("age").alias("max_age")
).show()

+------+-----+-------+-------+-------+
|gender|count|avg_age|min_age|max_age|
+------+-----+-------+-------+-------+
|     M|    2|   25.0|     20|     30|
|     F|    2|   32.5|     25|     40|
+------+-----+-------+-------+-------+



### 6️⃣ Collection Functions

In [0]:
data = [(1, ["a", "b", "c"]), (2, ["x", "y"])]
df = spark.createDataFrame(data, ["id", "letters"])

df.select(
    "id",
    "letters",
    F.size("letters").alias("size"),
    F.array_contains("letters", "a").alias("has_a"),
    F.concat_ws("-", "letters").alias("joined")
).show()

+---+---------+----+-----+------+
| id|  letters|size|has_a|joined|
+---+---------+----+-----+------+
|  1|[a, b, c]|   3| true| a-b-c|
|  2|   [x, y]|   2|false|   x-y|
+---+---------+----+-----+------+



**`data = [(" Pratik ",), ("Sanket",)]` ---> Why is there a comma?**

In PySpark, when we create a DataFrame from raw data:
- `data` is a list of rows.
- Each row must be represented as a tuple (or list).
- If a row has only one column, it’s a 1-element tuple, so we need the trailing comma.
- Without the comma, `("Pratik")` is just the string `"Pratik"` in parentheses, not a tuple, so `[("Pratik")]` is a list of strings instead of a list of rows.
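
This is plain Python behaviour and can be checked without Spark. The variable names below are made up for illustration.

```python
not_a_tuple = ("Pratik")   # parentheses only group; this is still a str
one_tuple = ("Pratik",)    # the trailing comma makes a 1-element tuple

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple
print(len(one_tuple))              # 1
```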

## Joins & Window Functions

### 🔹 Joins in PySpark

In [0]:
# Data Sample
data1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df1 = spark.createDataFrame(data1, ["id", "name"])

data2 = [(1, "NY"), (2, "CA"), (4, "TX")]
df2 = spark.createDataFrame(data2, ["id", "state"])

In [0]:
# Inner Join → keeps only matching ids
df_inner = df1.join(df2, on="id", how="inner")
df_inner.show()

# Left Join → keeps all from df1, match where possible
df_left = df1.join(df2, on="id", how="left")
df_left.show()

# Right Join → keeps all from df2
df_right = df1.join(df2, on="id", how="right")
df_right.show()

# Outer Join → keeps all from both sides
df_outer = df1.join(df2, on="id", how="outer")
df_outer.show()

# join() also supports how="left_semi" and how="left_anti"
# left_semi = only left rows that have a match (right-side columns are not returned)
# left_anti = only left rows that have no match

+---+-----+-----+
| id| name|state|
+---+-----+-----+
|  1|Alice|   NY|
|  2|  Bob|   CA|
+---+-----+-----+

+---+-----+-----+
| id| name|state|
+---+-----+-----+
|  1|Alice|   NY|
|  2|  Bob|   CA|
|  3|Cathy| NULL|
+---+-----+-----+

+---+-----+-----+
| id| name|state|
+---+-----+-----+
|  1|Alice|   NY|
|  2|  Bob|   CA|
|  4| NULL|   TX|
+---+-----+-----+

+---+-----+-----+
| id| name|state|
+---+-----+-----+
|  1|Alice|   NY|
|  2|  Bob|   CA|
|  3|Cathy| NULL|
|  4| NULL|   TX|
+---+-----+-----+



### 🔹 Window Functions

In [0]:
sales_data = [
    ("Alice", "Electronics", 1000),
    ("Alice", "Clothing", 500),
    ("Bob", "Electronics", 1200),
    ("Bob", "Clothing", 700),
    ("Cathy", "Electronics", 800)
]

df_sales = spark.createDataFrame(sales_data, ["name", "category", "amount"])

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_person = Window.partitionBy("name")
df_sales.withColumn("total_by_person", F.sum("amount").over(window_person)).show()

+-----+-----------+------+---------------+
| name|   category|amount|total_by_person|
+-----+-----------+------+---------------+
|Alice|Electronics|  1000|           1500|
|Alice|   Clothing|   500|           1500|
|  Bob|Electronics|  1200|           1900|
|  Bob|   Clothing|   700|           1900|
|Cathy|Electronics|   800|            800|
+-----+-----------+------+---------------+



In [0]:
window_spec = Window.partitionBy("category").orderBy(F.desc("amount"))

df_sales.withColumn("rank_in_category", F.rank().over(window_spec)).show()

+-----+-----------+------+----------------+
| name|   category|amount|rank_in_category|
+-----+-----------+------+----------------+
|  Bob|   Clothing|   700|               1|
|Alice|   Clothing|   500|               2|
|  Bob|Electronics|  1200|               1|
|Alice|Electronics|  1000|               2|
|Cathy|Electronics|   800|               3|
+-----+-----------+------+----------------+



In [0]:
window_spec = Window.partitionBy("name").orderBy("amount")

df_sales.withColumn(
    "running_total",
    F.sum("amount").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)),
).show()

+-----+-----------+------+-------------+
| name|   category|amount|running_total|
+-----+-----------+------+-------------+
|Alice|   Clothing|   500|          500|
|Alice|Electronics|  1000|         1500|
|  Bob|   Clothing|   700|          700|
|  Bob|Electronics|  1200|         1900|
|Cathy|Electronics|   800|          800|
+-----+-----------+------+-------------+



## Handling Nulls

a) Dropping Nulls

In [0]:
data = [
    (None, "Supermarket Type1"),
    ("Medium", "Supermarket Type2"),
    (None, None),
    ("Small", "Grocery")
]

df = spark.createDataFrame(data, ["Outlet_Size", "Outlet_Type"])

In [0]:
# Drop rows where ALL values are null
df.dropna(how='all').show()

# Drop rows where ANY value is null
df.dropna(how='any').show()

# Drop rows where specific column(s) have null
df.dropna(subset=['Outlet_Size']).show()

+-----------+-----------------+
|Outlet_Size|      Outlet_Type|
+-----------+-----------------+
|       NULL|Supermarket Type1|
|     Medium|Supermarket Type2|
|      Small|          Grocery|
+-----------+-----------------+

+-----------+-----------------+
|Outlet_Size|      Outlet_Type|
+-----------+-----------------+
|     Medium|Supermarket Type2|
|      Small|          Grocery|
+-----------+-----------------+

+-----------+-----------------+
|Outlet_Size|      Outlet_Type|
+-----------+-----------------+
|     Medium|Supermarket Type2|
|      Small|          Grocery|
+-----------+-----------------+



b) Filling Nulls

In [0]:
# Fill nulls with a value in all columns
df.fillna("NotAvailable").show()

# Fill nulls only in selected columns
df.fillna("NotAvailable", subset=['Outlet_Size']).show()

+------------+-----------------+
| Outlet_Size|      Outlet_Type|
+------------+-----------------+
|NotAvailable|Supermarket Type1|
|      Medium|Supermarket Type2|
|NotAvailable|     NotAvailable|
|       Small|          Grocery|
+------------+-----------------+

+------------+-----------------+
| Outlet_Size|      Outlet_Type|
+------------+-----------------+
|NotAvailable|Supermarket Type1|
|      Medium|Supermarket Type2|
|NotAvailable|             NULL|
|       Small|          Grocery|
+------------+-----------------+



## SPLIT and Indexing

In [0]:
from pyspark.sql.functions import split, col

# Split column "Outlet_Type" by space into array
df.withColumn("Outlet_Type", split(col("Outlet_Type"), " ")).show()

# Get 1st element (index 0) from split result
df.withColumn("Outlet_Type", split(col("Outlet_Type"), " ")[0]).show()

+-----------+--------------------+
|Outlet_Size|         Outlet_Type|
+-----------+--------------------+
|       NULL|[Supermarket, Type1]|
|     Medium|[Supermarket, Type2]|
|       NULL|                NULL|
|      Small|           [Grocery]|
+-----------+--------------------+

+-----------+-----------+
|Outlet_Size|Outlet_Type|
+-----------+-----------+
|       NULL|Supermarket|
|     Medium|Supermarket|
|       NULL|       NULL|
|      Small|    Grocery|
+-----------+-----------+



## Explode
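`explode` converts an array column into multiple rows, one row per array element. Rows whose array is NULL or empty produce no output rows, which is why the row with a NULL `Outlet_Type` is missing from the result below.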

In [0]:
from pyspark.sql.functions import explode

# First split into array
df_exp = df.withColumn("Outlet_Type", split(col("Outlet_Type"), " "))

# Explode array into multiple rows
df_exp.withColumn("Outlet_Type", explode(col("Outlet_Type"))).show()

+-----------+-----------+
|Outlet_Size|Outlet_Type|
+-----------+-----------+
|       NULL|Supermarket|
|       NULL|      Type1|
|     Medium|Supermarket|
|     Medium|      Type2|
|      Small|    Grocery|
+-----------+-----------+



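## array_contains
`array_contains` returns true if the array contains the given value, false if it does not, and NULL if the array itself is NULL.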

In [0]:
from pyspark.sql.functions import array_contains
df_exp.withColumn("Type1_flag", array_contains(col("Outlet_Type"), "Type1")).show()

+-----------+--------------------+----------+
|Outlet_Size|         Outlet_Type|Type1_flag|
+-----------+--------------------+----------+
|       NULL|[Supermarket, Type1]|      true|
|     Medium|[Supermarket, Type2]|     false|
|       NULL|                NULL|      NULL|
|      Small|           [Grocery]|     false|
+-----------+--------------------+----------+



## df.show() → full signature

`df.show(n=20, truncate=True, vertical=False)`

- n → number of rows to show (default = 20)
- truncate → if True (the default), truncate strings longer than 20 characters
- truncate=False → no truncation
- truncate=10 → truncate strings longer than 10 characters
- vertical=True → display each row vertically (useful when there are many columns)

## collect_list & collect_set

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

data = [
    ("Pratik", "iPhone"),
    ("Sanket", "MacBook"),
    ("Pratik", "Rolls Royal"),
    ("Kavish", "Bike"),
    ("Sanket", "MacBook")  # duplicate
]

columns = ["name", "items"]

spark = SparkSession.builder.appName("collect").getOrCreate()
df = spark.createDataFrame(data, columns)

df_collect = df.groupBy("name").agg(
    F.collect_list("items").alias("items_list"),
    F.collect_set("items").alias("items_set")
)

df_collect.show(truncate=False)

+------+---------------------+---------------------+
|name  |items_list           |items_set            |
+------+---------------------+---------------------+
|Pratik|[iPhone, Rolls Royal]|[iPhone, Rolls Royal]|
|Sanket|[MacBook, MacBook]   |[MacBook]            |
|Kavish|[Bike]               |[Bike]               |
+------+---------------------+---------------------+

