# Running SQL queries on DataFrames


## ✅ Running SQL Queries on DataFrames

PySpark allows you to write **SQL queries directly** on DataFrames by registering them as **temporary views**. This is especially useful if you're familiar with SQL and want to run queries on large distributed data.



## 🔄 Steps to Run SQL on DataFrames

### 1. **Create or Load a DataFrame**

```python
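from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # reuses an existing session (e.g. in a notebook)
data = [("Ahmad", 22), ("Raza", 25), ("Ali", 25)]
df = spark.createDataFrame(data, ["Name", "Age"])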
```



### 2. **Register as a Temporary View**

```python
df.createOrReplaceTempView("people")
```

Now you can use SQL queries on `people`.



### 3. **Run SQL Queries**

```python
result = spark.sql("SELECT Name FROM people WHERE Age > 22")
result.show()
```

📤 Output:

```
+-----+
| Name|
+-----+
| Raza|
|  Ali|
+-----+
```



## 📘 SQL Functions & Types of Queries You Can Run

### 🔹 SELECT Statements

```python
spark.sql("SELECT * FROM people").show()
```

### 🔹 WHERE clause

```python
spark.sql("SELECT * FROM people WHERE Age = 25").show()
```

### 🔹 GROUP BY

```python
spark.sql("SELECT Age, COUNT(*) as Count FROM people GROUP BY Age").show()
```

### 🔹 ORDER BY

```python
spark.sql("SELECT * FROM people ORDER BY Age DESC").show()
```

### 🔹 JOINS

```python
dept = [("Ahmad", "CS"), ("Raza", "IT")]
dept_df = spark.createDataFrame(dept, ["Name", "Dept"])
dept_df.createOrReplaceTempView("departments")

joined = spark.sql("""
    SELECT p.Name, p.Age, d.Dept
    FROM people p
    JOIN departments d ON p.Name = d.Name
""")
joined.show()
```



## 🔧 SQL-Specific Functions in PySpark

| Function                              | Description                                       |
| ------------------------------------- | ------------------------------------------------- |
| `df.createOrReplaceTempView("name")`  | Creates (or replaces) a temporary SQL view        |
| `spark.sql(query)`                    | Executes a SQL query string, returns a DataFrame  |
| `spark.catalog.cacheTable("table")`   | Caches a table or view in memory                  |
| `spark.catalog.uncacheTable("table")` | Removes a table or view from the cache            |
| `spark.catalog.isCached("table")`     | Checks whether a table or view is cached          |

---

## ✅ Example: Using SQL Functions

```python
from pyspark.sql.functions import avg

df.agg(avg("Age").alias("avg_age")).show()
```

Equivalent in SQL:

```python
spark.sql("SELECT AVG(Age) AS avg_age FROM people").show()
```



## 🔄 Global vs Temporary Views

| View Type                | Scope                                  | Notes                                                             |
| ------------------------ | -------------------------------------- | ----------------------------------------------------------------- |
| `createTempView()`       | Current SparkSession                   | Disappears when the SparkSession ends                             |
| `createGlobalTempView()` | All SparkSessions in the same Spark application | Dropped when the application ends; access with `global_temp.<view_name>` |



### 📝 Best Practice:

Use SQL for complex filtering, joining, and aggregation — especially if collaborating with analysts or SQL-savvy team members.



# Temporary Views and Global Views



## ✅ 1. What is a View in PySpark?

A **View** is like a **virtual table** created from a DataFrame. It lets you **query the DataFrame using SQL syntax**.

There are two main types of views:

| View Type             | Scope                                   | Accessible In                                      |
|-----------------------|-----------------------------------------|----------------------------------------------------|
| Temporary View        | Current session only                    | Current SparkSession                               |
| Global Temporary View | All sessions in the same Spark application | Any SparkSession of that application, via the `global_temp` database |



## 🧩 2. Temporary Views

### ▶ Description:
- Registered for the **current session only**
- Removed automatically when the session ends

### ✅ Creating a Temporary View

```python
df = spark.createDataFrame([("Ahmad", 22), ("Raza", 25)], ["Name", "Age"])

df.createOrReplaceTempView("people")
```

### ✅ Running SQL Query

```python
spark.sql("SELECT * FROM people WHERE Age > 22").show()
```

📤 Output:
```
+-----+---+
| Name|Age|
+-----+---+
| Raza| 25|
+-----+---+
```



## 🌐 3. Global Temporary Views

### ▶ Description:
- **Shared across all SparkSessions** within the same Spark application
- Stored in the system database `global_temp`
- Dropped when the Spark application ends

### ✅ Creating a Global Temp View

```python
df.createOrReplaceGlobalTempView("people")
```

### ✅ Accessing Global Temp View

```python
spark.sql("SELECT * FROM global_temp.people").show()
```

> You must prefix the table name with `global_temp.`



## ⚙️ Comparison Table

| Feature               | Temporary View                        | Global Temp View                           |
|-----------------------|---------------------------------------|--------------------------------------------|
| Scope                 | Current SparkSession                  | All SparkSessions in the same application  |
| Database Namespace    | None (referenced by view name alone)  | Must use the `global_temp.` prefix         |
| Lifecycle             | Ends with the session                 | Ends with the Spark application            |
| Use Case              | Session-limited operations            | Shared access across sessions              |



## ✅ Functions to Manage Views

| Function                                 | Description                                                        |
|------------------------------------------|--------------------------------------------------------------------|
| `createTempView(name)`                   | Creates a temporary view (fails if it already exists)              |
| `createOrReplaceTempView(name)`          | Creates or replaces a temp view                                    |
| `createGlobalTempView(name)`             | Creates a global temp view                                         |
| `createOrReplaceGlobalTempView(name)`    | Creates or replaces a global temp view                             |
| `spark.catalog.dropTempView(name)`       | Drops a temp view                                                  |
| `spark.catalog.dropGlobalTempView(name)` | Drops a global temp view                                           |
| `spark.catalog.listTables()`             | Lists tables and views in the current database, incl. temp views   |



### 📝 Best Practice:
- Use **Temporary Views** for ad-hoc, session-specific SQL.
- Use **Global Temp Views** when several SparkSessions in the same Spark application need to share a view (for example, notebooks sharing a cluster in environments like Databricks).


# SQL Functions



# ✅ PySpark SQL Functions List with Examples

To use SQL functions:

```python
from pyspark.sql.functions import *
```



## 🔹 1. **Aggregate Functions**

| Function  | Description   | Example                    |
| --------- | ------------- | -------------------------- |
| `count()` | Count rows    | `df.select(count("*"))`    |
| `sum()`   | Sum values    | `df.select(sum("salary"))` |
| `avg()`   | Average value | `df.select(avg("age"))`    |
| `min()`   | Minimum value | `df.select(min("age"))`    |
| `max()`   | Maximum value | `df.select(max("age"))`    |

```python
df.groupBy("department").agg(avg("salary"), max("salary")).show()
```



## 🔹 2. **String Functions**

| Function           | Description                    | Example                                         |
| ------------------ | ------------------------------ | ----------------------------------------------- |
| `lower()`          | Convert to lowercase           | `df.select(lower("name"))`                      |
| `upper()`          | Convert to uppercase           | `df.select(upper("name"))`                      |
| `length()`         | String length                  | `df.select(length("name"))`                     |
| `substring()`      | Substring (1-based start)      | `df.select(substring("name", 1, 3))`            |
| `concat()`         | Concatenate strings            | `df.select(concat(col("fname"), col("lname")))` |
| `trim()`           | Remove leading/trailing spaces | `df.select(trim("name"))`                       |
| `regexp_replace()` | Replace using regex            | `df.select(regexp_replace("name", "a", "@"))`   |

```python
df.select(upper("name").alias("NAME"), length("name")).show()
```



## 🔹 3. **Date & Time Functions**

| Function              | Description                | Example                                               |
| --------------------- | -------------------------- | ----------------------------------------------------- |
| `current_date()`      | Current date               | `df.select(current_date())`                           |
| `current_timestamp()` | Current timestamp          | `df.select(current_timestamp())`                      |
| `datediff()`          | Difference between 2 dates | `df.select(datediff(col("end"), col("start")))`       |
| `months_between()`    | Months between 2 dates     | `df.select(months_between(col("end"), col("start")))` |
| `add_months()`        | Add months to date         | `df.select(add_months(col("start"), 2))`              |
| `date_add()`          | Add days to date           | `df.select(date_add(col("start"), 5))`                |
| `date_sub()`          | Subtract days from date    | `df.select(date_sub(col("start"), 3))`                |

```python
df = spark.createDataFrame([("2024-01-01",)], ["start"])
df.select(current_date(), datediff(current_date(), col("start"))).show()
```



## 🔹 4. **Null Handling Functions**

| Function / Method        | Description                 | Example                             |
| ------------------------ | --------------------------- | ----------------------------------- |
| `Column.isNull()`        | Check for null values       | `df.filter(col("age").isNull())`    |
| `Column.isNotNull()`     | Check for non-null values   | `df.filter(col("age").isNotNull())` |
| `DataFrame.fillna()`     | Replace nulls with a value  | `df.fillna(0)`                      |
| `DataFrame.dropna()`     | Drop rows containing nulls  | `df.dropna()`                       |
| `DataFrame.na.replace()` | Replace specific values     | `df.na.replace("NA", None)`         |



## 🔹 5. **Conditional Functions**

| Function | Description              | Example                                                        |
| -------- | ------------------------ | -------------------------------------------------------------- |
| `when()` | SQL CASE WHEN equivalent | `df.select(when(col("age") > 18, "Adult").otherwise("Minor"))` |
| `expr()` | Run SQL expression       | `df.select(expr("age * 2"))`                                   |
| `col()`  | Reference column         | `df.select(col("age") + 1)`                                    |

```python
df.select("name", when(col("age") >= 18, "Adult").otherwise("Minor").alias("status")).show()
```



## 🔹 6. **Math Functions**

| Function  | Description       | Example                         |
| --------- | ----------------- | ------------------------------- |
| `round()` | Round number      | `df.select(round("salary", 2))` |
| `sqrt()`  | Square root       | `df.select(sqrt("salary"))`     |
| `abs()`   | Absolute value    | `df.select(abs("salary"))`      |
| `exp()`   | Exponential       | `df.select(exp("age"))`         |
| `log()`   | Natural logarithm | `df.select(log("salary"))`      |
| `pow()`   | Power             | `df.select(pow("age", 2))`      |



## 🔹 7. **Array & Collection Functions**

| Function    | Description             | Example                                    |
| ----------- | ----------------------- | ------------------------------------------ |
| `split()`   | Split string into array | `df.select(split("name", " "))`            |
| `explode()` | Flatten array into rows | `df.select(explode(split("skills", ",")))` |
| `array()`   | Create array            | `df.select(array("col1", "col2"))`         |
| `size()`    | Size of array           | `df.select(size(split("skills", ",")))`    |



## 🔹 8. **Window Functions** (used with `Window` spec)

| Function       | Description        | Example                              |
| -------------- | ------------------ | ------------------------------------ |
| `rank()`       | SQL Rank           | `rank().over(windowSpec)`            |
| `dense_rank()` | Dense rank         | `dense_rank().over(windowSpec)`      |
| `row_number()` | Row number         | `row_number().over(windowSpec)`      |
| `lag()`        | Previous row value | `lag("salary", 1).over(windowSpec)`  |
| `lead()`       | Next row value     | `lead("salary", 1).over(windowSpec)` |

```python
from pyspark.sql.window import Window

w = Window.partitionBy("department").orderBy("salary")
df.withColumn("rank", rank().over(w)).show()
```



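## ✅ Bonus: Use `selectExpr()` and `expr()` to Run SQL Inside the DataFrame API

```python
df.selectExpr("name", "age * 2 AS double_age").show()
# Equivalent: expr() inside a regular select()
df.select("name", expr("age * 2").alias("double_age")).show()
```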






# ✅ `groupBy()`, `agg()`, and Advanced Aggregations in PySpark

These operations are used to **group data** and perform **aggregate computations** just like SQL's `GROUP BY`.



## 🔹 1. `groupBy()`

Groups rows based on column(s).

### ✅ Example:

```python
df.groupBy("department").count().show()
```

📤 Output:

```
+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|        HR|    2|
+----------+-----+
```



## 🔹 2. `agg()` – Perform Multiple Aggregations

```python
from pyspark.sql.functions import avg, max, min

df.groupBy("department").agg(
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
).show()
```

📤 Output:

```
+----------+----------+----------+
|department|avg_salary|max_salary|
+----------+----------+----------+
|     Sales|   45000.0|     60000|
|        HR|   35000.0|     40000|
+----------+----------+----------+
```



## 🔹 3. Group by Multiple Columns

```python
df.groupBy("department", "gender").count().show()
```



## 🔹 4. Window Aggregations (Advanced)

Useful when you want to keep row-level details **with aggregated metrics**.

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

windowSpec = Window.partitionBy("department")

df.withColumn("avg_salary", avg("salary").over(windowSpec)).show()
```



## 🔹 5. Pivot Table (Pivoting Data)

```python
df.groupBy("department").pivot("gender").agg(avg("salary")).show()
```



## 🔹 Common Aggregation Functions

| Function         | Description                   |
| ---------------- | ----------------------------- |
| `count()`        | Number of rows                |
| `sum()`          | Sum of values                 |
| `avg()`          | Average value                 |
| `min()`          | Minimum value                 |
| `max()`          | Maximum value                 |
| `mean()`         | Mean value (alias of avg)     |
| `collect_list()` | Aggregates values into a list |
| `collect_set()`  | Aggregates distinct values    |



### ✅ Example: `collect_list` and `collect_set`

```python
df.groupBy("department").agg(collect_list("name")).show()
```

📤 Output:

```
+----------+-------------------+
|department| collect_list(name)|
+----------+-------------------+
|        HR|      [John, Alice]|
|     Sales|[Bob, Eve, Charlie]|
+----------+-------------------+
```



#  Window functions



# ✅ Window Functions in PySpark

**Window functions** allow you to perform **aggregations and calculations across a window (group of rows)** related to the current row **without collapsing the rows** like `groupBy()` does.



## 🔸 Why Use Window Functions?

* To compute **rankings**, **running totals**, **percentiles**, **previous/next row comparisons** (like SQL's `LEAD`/`LAG`)
* To **aggregate over partitions** of data while preserving row-level detail



## 🔸 Step-by-Step Usage

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Define a window spec
windowSpec = Window.partitionBy("department").orderBy("salary")
```



## 🔸 Common Window Functions

| Function         | Description                             |
| ---------------- | --------------------------------------- |
| `row_number()`   | Unique row number in a window partition |
| `rank()`         | Ranking with gaps                       |
| `dense_rank()`   | Ranking without gaps                    |
| `lag()`          | Previous row value                      |
| `lead()`         | Next row value                          |
| `ntile(n)`       | Bucket rows into `n` equal groups       |
| `sum()`, `avg()` | Running totals/averages (with `orderBy`) or partition-wide aggregates |
| `min()`, `max()` | Running or partition-wide min/max       |

---

## 🔹 Example DataFrame

```python
data = [
    ("Ahmad", "Sales", 60000),
    ("Raza", "Sales", 50000),
    ("Ali", "Sales", 40000),
    ("Sara", "HR", 45000),
    ("John", "HR", 50000),
]

df = spark.createDataFrame(data, ["name", "department", "salary"])
```



## 🔹 1. `row_number()` Example

```python
windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())

df.withColumn("row_num", row_number().over(windowSpec)).show()
```

📤 Output:

```
+-----+----------+------+-------+
| name|department|salary|row_num|
+-----+----------+------+-------+
|Ahmad|     Sales| 60000|      1|
| Raza|     Sales| 50000|      2|
|  Ali|     Sales| 40000|      3|
| John|        HR| 50000|      1|
| Sara|        HR| 45000|      2|
+-----+----------+------+-------+
```



## 🔹 2. `rank()` and `dense_rank()`

```python
(
    df.withColumn("rank", rank().over(windowSpec))
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .show()
)
```



## 🔹 3. `lag()` and `lead()` – Previous & Next Row

```python
(
    df.withColumn("prev_salary", lag("salary", 1).over(windowSpec))
    .withColumn("next_salary", lead("salary", 1).over(windowSpec))
    .show()
)
```



## 🔹 4. Rolling Aggregation (sum, avg)

```python
windowSpecRows = (
    Window.partitionBy("department")
    .orderBy("salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("cumulative_salary", sum("salary").over(windowSpecRows)).show()
```



## 🔹 5. `ntile(n)` – Divide rows into `n` buckets

```python
df.withColumn("bucket", ntile(2).over(windowSpec)).show()
```



## ✅ Summary: Creating and Using a Window Function

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Create window specification
windowSpec = Window.partitionBy("department").orderBy("salary")

# Apply window function
df.withColumn("rank", rank().over(windowSpec)).show()
```

