In [0]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [0]:
# Import the Row class from pyspark.sql
# Row is used to represent a single row of data in PySpark DataFrames
from pyspark.sql import Row  

# Create a Row object with two values: 1 and 'divya'
# Since no field names are provided, PySpark assigns default field names: _1 and _2
r1 = Row(1, 'divya')  

# Print the Row object
# Output will display as Row(_1=1, _2='divya') because default field names are used
print(r1)  


<Row(1, 'divya')>


In [0]:
# The print() function outputs the specified message to the console.
# In this case, it will print the string 'divya'.
print('divya')


divya


In [0]:
# The type() function returns the type of the specified object.
# In this case, it will return the data type of the variable 'r1'.
# If 'r1' is a Row object (from pyspark.sql), the output will be <class 'pyspark.sql.types.Row'>.
print(type(r1))


<class 'pyspark.sql.types.Row'>


In [0]:
# Import the Row class from pyspark.sql
# Row is used to represent a single row of data in PySpark, similar to a record in a DataFrame
from pyspark.sql import Row  

# Create a Row object with two fields: 'id' and 'name'
# 'id' is assigned the value 1, and 'name' is assigned the value 'sanket'
r1 = Row(id=1, name="sanket")  

# Display the Row object
# In an interactive environment like databricks,Jupyter Notebook, simply typing 'r1' displays its contents
# The output will show: Row(id=1, name='sanket')
r1  


Out[11]: Row(id=1, name='sanket')

In [0]:
r1['id'], r1['name']

Out[12]: (1, 'sanket')

In [0]:

'''
The code provided works in Databricks because Databricks automatically initializes a SparkSession object called spark. However, in Jupyter Notebook, you need to explicitly create the SparkSession.
'''

# Import all classes and functions from pyspark.sql module
# This includes SparkSession, Row, DataFrame, and various SQL functions
from pyspark.sql import *  

# Create Row objects representing individual records with 'id' and 'name' fields
r1 = Row(id=1, name='Sanket')   # Row with id=1 and name='Sanket'
r2 = Row(id=2, name='Akash')    # Row with id=2 and name='Akash'
r3 = Row(id=3, name='Rajesh')   # Row with id=3 and name='Rajesh'
r4 = Row(id=4, name='divya')    # Row with id=4 and name='divya'

# Combine all Row objects into a list (this will be used to create the DataFrame)
data1 = [r1, r2, r3, r4]  

# Create a DataFrame from the list of Row objects using the SparkSession object 'spark'
# Note: In Databricks, 'spark' is available by default. In Jupyter Notebook, you must create it using SparkSession.builder.
df = spark.createDataFrame(data1)  

# Display the contents of the DataFrame in a tabular format
df.show()  

# Print the schema of the DataFrame to show column names and data types
df.printSchema()  


+---+------+
| id|  name|
+---+------+
|  1|Sanket|
|  2| Akash|
|  3|Rajesh|
|  4| divya|
+---+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



Both codes create a DataFrame using `Row` in PySpark, but the difference is:  

- **Code 1:(above code)** Defines each row with named fields (`id`, `name`) directly.  
- **Code 2:(below code)** Uses a **schema-based Row** (`student = Row('id','Name')`) to define column names, providing a reusable structure.

In [0]:
# Import the Row class from pyspark.sql
# Row is used to represent a record with fields that can later be converted into a DataFrame
from pyspark.sql import Row  

# Define a Row class with the specified column names: 'id' and 'Name'
# This will act as the schema when creating the DataFrame
student = Row('id', 'Name')  

# Create Row objects representing individual student records with 'id' and 'Name' fields
r1 = Row(1, 'Sanket')   # Row with id=1 and Name='Sanket'
r2 = Row(2, 'Akash')    # Row with id=2 and Name='Akash'
r3 = Row(3, 'Rajesh')   # Row with id=3 and Name='Rajesh'
r4 = Row(4, 'divya')    # Row with id=4 and Name='divya'

# Create a DataFrame from the list of Row objects using the defined schema 'student'
# The 'schema=student' argument assigns the column names 'id' and 'Name' to the DataFrame
df1 = spark.createDataFrame([r1, r2, r3, r4], schema=student)  

# Display the contents of the DataFrame in a tabular format
df1.show()  

# Print the schema of the DataFrame to show column names and data types
df1.printSchema()  


+---+------+
| id|  Name|
+---+------+
|  1|Sanket|
|  2| Akash|
|  3|Rajesh|
|  4| divya|
+---+------+

root
 |-- id: long (nullable = true)
 |-- Name: string (nullable = true)



In [0]:
# Importing the Row class from pyspark.sql to create Row objects
# Row allows the creation of records similar to a dictionary or struct
from pyspark.sql import Row

# Creating a list named 'data' that contains Row objects
# Each Row represents a record with two fields: 'name' and 'game'
# The 'game' field itself is a nested Row containing 'g1' (game name) and 'rank' (player's rank)
data = [
    Row(name='nisha', game=Row(g1='chess', rank=2)),     # First record: name='nisha', plays 'chess' with rank=2
    Row(name='nina', game=Row(g1='chess', rank=3)),      # Second record: name='nina', plays 'chess' with rank=3
    Row(name='nila', game=Row(g1='football', rank=10))   # Third record: name='nila', plays 'football' with rank=10
]

# Creating a DataFrame named 'df' from the list of Row objects
# The spark.createDataFrame() function takes the 'data' list and automatically infers the schema
# The 'game' column will be interpreted as a struct type because it contains nested fields ('g1' and 'rank')
df = spark.createDataFrame(data)

# Displaying the contents of the DataFrame in a tabular format
# The show() function by default displays the first 20 rows
# The 'game' column will show nested data as {g1_value, rank_value}
df.show()

# Printing the inferred schema of the DataFrame
# The printSchema() function shows the structure of the DataFrame, including data types and nullability
# The output will show that 'game' is a struct type with two fields: 'g1' (string) and 'rank' (long)
df.printSchema()


+-----+--------------+
| name|          game|
+-----+--------------+
|nisha|    {chess, 2}|
| nina|    {chess, 3}|
| nila|{football, 10}|
+-----+--------------+

root
 |-- name: string (nullable = true)
 |-- game: struct (nullable = true)
 |    |-- g1: string (nullable = true)
 |    |-- rank: long (nullable = true)



1️⃣ **Difference in Row Creation:**  
- **First code:** Directly uses `Row()` with named parameters (`Row(name='nisha', game=Row(...))`).  
- **Second code:** Uses a **Row factory** (`sg = Row('name', 'game')`) to create rows with a predefined structure.  

2️⃣ **Schema Definition:**  
- **First code:** Schema inferred from direct `Row` definitions.  
- **Second code:** Schema is implicitly defined by the **Row factory (`sg`)**, ensuring a uniform structure across all rows.

In [0]:
# Importing the Row class from pyspark.sql to create Row objects
from pyspark.sql import Row

# Creating a Row factory 'sg' with two fields: 'name' and 'game'
# This allows creating Row objects using 'sg' with the specified field names
sg = Row('name', 'game')

# Creating the first Row 'r1' using the Row factory 'sg'
# 'name' field is assigned 'Nisha'
# 'game' field is a nested Row containing 'g1' (game name = 'Chess') and 'rank' (rank = 2)
r1 = sg('Nisha', Row(g1='Chess', rank=2))

# Creating the second Row 'r2' with 'name' as 'Nisha' and 'game' as nested Row with rank 3
r2 = sg('Nisha', Row(g1='Chess', rank=3))

# Creating the third Row 'r3' with the same values as 'r1' for demonstration
r3 = sg('Nisha', Row(g1='chess', rank=2))

# Creating a DataFrame 'df2' from the list of Row objects [r1, r2, r3]
# PySpark will infer the schema automatically, including the nested structure for 'game'
df2 = spark.createDataFrame([r1, r2, r3])
df2.show()
# Printing the schema of the DataFrame to display the structure and data types of all columns
# The 'game' column will appear as a struct with two fields: 'g1' (string) and 'rank' (long)
df2.printSchema()


+-----+----------+
| name|      game|
+-----+----------+
|Nisha|{Chess, 2}|
|Nisha|{Chess, 3}|
|Nisha|{chess, 2}|
+-----+----------+

root
 |-- name: string (nullable = true)
 |-- game: struct (nullable = true)
 |    |-- g1: string (nullable = true)
 |    |-- rank: long (nullable = true)



In [0]:
# Importing the Row class from pyspark.sql to create Row objects
from pyspark.sql import Row

# Creating a Row factory 'rank' with two fields: 'g1' (game) and 'rank' (player rank)
# This allows creating nested Row objects for the 'rank' column with the specified structure
rank = Row('g1', 'rank')

# Creating another Row factory 'student' with three fields: 'id', 'name', and 'rank'
# The 'rank' field here will store nested Row objects created using the 'rank' factory above
student = Row('id', 'name', 'rank')

# Creating the first Row 'r1' using the 'student' Row factory
# 'id' = 1, 'name' = 'Sanket', 'rank' = nested Row with 'g1' = 'sketing' and 'rank' = '3'
r1 = student(1, 'Sanket', rank('sketing', '3'))

# Creating the second Row 'r2' with 'id' = 2, 'name' = 'divya', and 'rank' = ('sketing', '2')
r2 = student(2, 'divya', rank('sketing', '2'))

# Creating the third Row 'r3' with 'id' = 3, 'name' = 'wade', and 'rank' = ('sketing', '1')
r3 = student(3, 'wade', rank('sketing', '1'))

# Printing the 'id' and 'name' fields of the first Row (r1)
# Output will be: 1 Sanket
print(r1.id, r1.name)

# Printing the entire 'rank' nested Row for r1
# Output will show: Row(g1='sketing', rank='3')
print(r1.rank)

# Creating a list 'data1' containing all three Row objects [r1, r2, r3]
data1 = [r1, r2, r3]

# Creating a DataFrame 'df' from the list of Row objects
# PySpark automatically infers the schema, including the nested struct for 'rank'
df = spark.createDataFrame(data1)

# Displaying the contents of the DataFrame in tabular form
df.show()

# Printing the inferred schema of the DataFrame
# The 'rank' field will be shown as a struct with fields 'g1' (string) and 'rank' (string)
df.printSchema()


1 Sanket
Row(g1='sketing', rank='3')
+---+------+------------+
| id|  name|        rank|
+---+------+------------+
|  1|Sanket|{sketing, 3}|
|  2| divya|{sketing, 2}|
|  3|  wade|{sketing, 1}|
+---+------+------------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: struct (nullable = true)
 |    |-- g1: string (nullable = true)
 |    |-- rank: string (nullable = true)



### ✅ **Differences between the three codes in 3 lines:**

1️⃣ **Row Creation Method:**  
- **Code 1:** Directly uses **`Row()` with nested fields** for each record.  
- **Code 2:** Uses a **Row factory (`sg`)** for a consistent schema across rows.  
- **Code 3:** Defines **multiple Row factories (`rank`, `student`)** to create a **nested struct** with an additional `id` field.

---

2️⃣ **Data Structure & Schema:**  
- **Code 1:** **Nested struct** (`game`) with varying values for `g1` and `rank`.  
- **Code 2:** **Uniform nested struct** (`game`) across all rows with same `name`.  
- **Code 3:** **Deeper nested struct** (`rank`) inside the `student` row, adding an **extra `id` field**.

---

3️⃣ **Data Variability:**  
- **Code 1:** **Different names** and **different games** (`chess`, `football`).  
- **Code 2:** **Same name** (`Nisha`), **same game** (`Chess`), but **different ranks**.  
- **Code 3:** **Different IDs**, **same game** (`sketing`), **different ranks** for each student.

# COL operation


### ✅ **Explanation of Commonly Used PySpark Functions**  

These functions are imported from `pyspark.sql.functions` and are widely used for **DataFrame transformations**. Let’s break down the **syntax**, **usage**, and **examples** for each:  

---

### 1️⃣ **`lit()` – Literal Value Function**  
#### 🔹 **Purpose:**  
- Adds a **constant (literal) value** to a column for all rows in a DataFrame.  
- Commonly used with **`withColumn()`**.

#### 🔹 **Syntax:**  
```python
lit(value)
```
- `value`: The **constant value** (string, number, boolean, etc.) you want to add.

#### 🔹 **Example:**
```python
from pyspark.sql.functions import lit

df = df.withColumn("Country", lit("India"))  # Adds a new column 'Country' with 'India' for all rows
df.show()
```
#### 🔹 **Output:**
```
+---+------+---------+
| id|  name|  Country|
+---+------+---------+
|  1|Sanket|    India|
|  2|  Wade|    India|
+---+------+---------+
```
---

### 2️⃣ **`col()` – Column Reference Function**  
#### 🔹 **Purpose:**  
- Refers to a **column** in a DataFrame by name.  
- Mostly used in **expressions**, **filters**, and **select** statements.

#### 🔹 **Syntax:**  
```python
col("column_name")
```
- `"column_name"`: The **name of the column** you want to reference.

#### 🔹 **Example:**
```python
from pyspark.sql.functions import col

df.select(col("name"), col("id")).show()  # Selects only 'name' and 'id' columns from the DataFrame
```
#### 🔹 **Output:**
```
+------+---+
|  name| id|
+------+---+
|Sanket|  1|
|  Wade|  2|
+------+---+
```
---

### 3️⃣ **`when()` – Conditional Logic Function**  
#### 🔹 **Purpose:**  
- Similar to **IF-ELSE** logic in SQL.  
- Returns a **new column** based on **conditional statements**.  
- Often used with **`otherwise()`** for the else condition.

#### 🔹 **Syntax:**  
```python
when(condition, value).otherwise(value)
```
- `condition`: A **boolean expression** (e.g., `col('rank') > 2`).  
- `value`: The **value** to assign if the condition is **true** or **false**.

#### 🔹 **Example:**
```python
from pyspark.sql.functions import when, col

df = df.withColumn("Performance", when(col("rank") > 2, "Average").otherwise("Good"))
df.show()
```
#### 🔹 **Output:**
```
+---+------+----+-----------+
| id|  name|rank|Performance|
+---+------+----+-----------+
|  1|Sanket|   3|    Average|
|  2|  Wade|   2|        Good|
+---+------+----+-----------+
```
---

### ⚡ **Summary of Syntax & Usage:**  
| Function | Syntax                   | Purpose                      | Example Usage                      |
|----------|--------------------------|------------------------------|-------------------------------------|
| **`lit()`**  | `lit(value)`               | Adds **constant value** to a column | `df.withColumn('Country', lit('India'))` |
| **`col()`**  | `col('column_name')`       | **References a column** by name     | `df.select(col('name'))`            |
| **`when()`** | `when(condition, value)`   | Adds **conditional logic** to a column | `df.withColumn('Status', when(col('score') > 50, 'Pass').otherwise('Fail'))` |

---

### 🎯 **Real-World Example Combining All Functions:**
```python
from pyspark.sql.functions import col, lit, when

# Sample DataFrame
data = [(1, "Sanket", 3), (2, "Wade", 2)]
columns = ["id", "name", "rank"]
df = spark.createDataFrame(data, columns)

# Applying lit(), col(), and when() together
df = df.withColumn("Country", lit("India")) \
       .withColumn("Performance", when(col("rank") > 2, "Average").otherwise("Good"))

df.show()
```

#### 🔹 **Output:**
```
+---+------+----+-------+-----------+
| id|  name|rank|Country|Performance|
+---+------+----+-------+-----------+
|  1|Sanket|   3|  India|    Average|
|  2|  Wade|   2|  India|        Good|
+---+------+----+-------+-----------+
```

---

### 💡 **Key Takeaways:**
- **`lit()`** is best for **constant values** across all rows.  
- **`col()`** allows for **dynamic column referencing**, crucial for transformations.  
- **`when()`** enables **conditional transformations**, adding **decision-making logic** to columns.  




In [0]:
# Importing all functions from the pyspark.sql.functions module
# This includes functions like lit(), col(), when(), etc., used for DataFrame transformations
from pyspark.sql.functions import *

# Adding a new column 'School' to the DataFrame 'df' using withColumn()
# The 'lit()' function is used to assign a constant value ('Symbosis') to the new column for all rows
df1 = df.withColumn('School', lit('Symbosis'))

# Displaying the updated DataFrame with the newly added 'School' column
df1.show()

# Printing the updated schema of the DataFrame to confirm that 'School' has been added as a string field
df1.printSchema()


+---+------+------------+--------+
| id|  name|        rank|  School|
+---+------+------------+--------+
|  1|Sanket|{sketing, 3}|Symbosis|
|  2| divya|{sketing, 2}|Symbosis|
|  3|  wade|{sketing, 1}|Symbosis|
+---+------+------------+--------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: struct (nullable = true)
 |    |-- g1: string (nullable = true)
 |    |-- rank: string (nullable = true)
 |-- School: string (nullable = false)



In [0]:
from pyspark.sql.functions import when

# Sample data
data = [     
    (1, "Alice", 85),     
    (2, "Bob", 45),     
    (3, "Charlie", 60),     
    (4, "David", 30) 
] 

# Define schema and create DataFrame 
df = spark.createDataFrame(data, ["ID", "Name", "Score"])

# Show the original DataFrame
df.show()

# Apply conditional transformations on 'Score' column
df1 = df.select(df.ID, df.Name, 
                when(df.Score == 85, 90).otherwise(df.Score))  # Correct the syntax for the 'when' function

df1.show()


+---+-------+-----+
| ID|   Name|Score|
+---+-------+-----+
|  1|  Alice|   85|
|  2|    Bob|   45|
|  3|Charlie|   60|
|  4|  David|   30|
+---+-------+-----+

+---+-------+---------------------------------------------+
| ID|   Name|CASE WHEN (Score = 85) THEN 90 ELSE Score END|
+---+-------+---------------------------------------------+
|  1|  Alice|                                           90|
|  2|    Bob|                                           45|
|  3|Charlie|                                           60|
|  4|  David|                                           30|
+---+-------+---------------------------------------------+



 ### `when(df.Score == 85, 90).otherwise(df.Score)`:

- **`when()`**: This function is used to define conditional logic. It works like an **IF-ELSE** statement in programming.
  - **`df.Score == 85`**: This checks if the value in the **Score** column is **85**.
  - **`90`**: If the condition is **True** (i.e., if the score is **85**), the value will be replaced with **90**.

- **`otherwise(df.Score)`**: This specifies what happens when the condition in `when()` is **False**.
  - If the score is **not** 85, then it keeps the original value of **Score** from the DataFrame (i.e., `df.Score`).


In [0]:
from pyspark.sql.functions import when


data = [     (1, "Alice", 85),     (2, "Bob", 45),     (3, "Charlie", 60),     (4, "David", 30) ] 
# Define schema and create DataFrame 
df = spark.createDataFrame(data, ["ID", "Name", "Score"])

df.show()

df1 = df.select(df.ID, df.Name, 
                when(df.Score == 85, 90)
                .when(df.Score == 45, 50)  # second condition for when the score is 45
                .otherwise(df.Score))  # keep the original score for all other cases
df1.show()

+---+-------+-----+
| ID|   Name|Score|
+---+-------+-----+
|  1|  Alice|   85|
|  2|    Bob|   45|
|  3|Charlie|   60|
|  4|  David|   30|
+---+-------+-----+

+---+-------+-----------------------------------------------------------------------+
| ID|   Name|CASE WHEN (Score = 85) THEN 90 WHEN (Score = 45) THEN 50 ELSE Score END|
+---+-------+-----------------------------------------------------------------------+
|  1|  Alice|                                                                     90|
|  2|    Bob|                                                                     50|
|  3|Charlie|                                                                     60|
|  4|  David|                                                                     30|
+---+-------+-----------------------------------------------------------------------+



Here's the short version without the output:

```markdown
### Sorting the DataFrame

1. **Original DataFrame (Unsorted)**:
```python
df.show()
```

2. **Sort by 'id' in Ascending Order (Default)**:
```python
df.sort(df.id).show()  # Sorts from lowest to highest 'id'.(1,2,3)
```

3. **Sort by 'id' in Descending Order**:
```python
from pyspark.sql.functions import desc
df.sort(desc("id")).show()  # Sorts from highest to lowest 'id'.(3,2,1)
```


In [0]:
from pyspark.sql.functions import desc

#### SORTING THE DATA

# Define a list of tuples containing data (ID, Name, Salary)
data1 = [(3, "Sanket", "60000"), (2, "divya", "50000"), (1, "vicky", "20000")]

# Define the column names for the DataFrame
columns = ["id", "name", "salary"]

# Create a DataFrame `df` from the list of tuples `data1` using the specified column names `columns`
df = spark.createDataFrame(data=data1, schema=columns)

# Show the original DataFrame (unsorted)
df.show() 
# Sort the DataFrame based on the 'id' column in ascending order and show the result
df.sort(df.id).show() #when you use the sort(df.id) method, the default behavior is ascending (lowest to highest).

# Sort the DataFrame based on the 'id' column in descending order and show the result
df.sort(desc("id")).show() #( dec:- from highest to lowest).


+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  1| vicky| 20000|
|  2| divya| 50000|
|  3|Sanket| 60000|
+---+------+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



In [0]:
# Select 'id', 'name', and 'salary' columns from the original DataFrame and cast 'salary' to an integer type
df1 = df.select(df.id, df.name, df.salary.cast('int'))

# Print the schema of df1 to check the data types of its columns
print(df1.printSchema())

# Print the schema of df to check the data types of its columns before casting
print(df.printSchema())

# Show the data of df1 after the type casting operation
df1.show()

# Show the data of the original DataFrame df (before type casting)
df.show()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)

None
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)

None
+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



### Code Explanation:

1. **`df.select(df.id, df.name, df.salary.cast('int'))`**  
   - Selects the `id` and `name` columns, and casts the `salary` column to an **integer** type using `.cast('int')`.  
   - This operation does not modify the original `df` DataFrame but creates a new DataFrame (`df1`) where `salary` is of type integer.

2. **`df1.printSchema()`**  
   - Prints the schema of the DataFrame `df1`, showing the column names and their **data types** after the type cast operation.  
   - You will see that the `salary` column is now of type `int`.

3. **`df.printSchema()`**  
   - Prints the schema of the original DataFrame `df`, showing the column names and their **data types** before the type cast operation.  
   - The `salary` column will remain in its original data type (e.g., `string` or `float`).

4. **`df1.show()`**  
   - Displays the data in the `df1` DataFrame, where the `salary` column has been **cast to integers**.  
   - The other columns (`id` and `name`) remain unchanged.

5. **`df.show()`**  
   - Displays the data in the original `df` DataFrame, where the `salary` column remains in its **original format** (e.g., string or float, as it was before the cast).


# Filter and where

In [0]:
# Filter the DataFrame `df` to select only rows where the `name` column contains the letter 'a'.
# The `like('%a%')` function is used to perform a case-sensitive pattern match.
# The `%` symbols are wildcards that allow for any characters before or after the 'a'.
df.filter(df.name.like('%a%')).show()  # Display the filtered rows


+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
+---+------+------+



In [0]:

df.where((df.salary == 50000) & ~df.name.like('%t%')).show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  2|divya| 50000|
+---+-----+------+



In [0]:
# Filter rows where salary is exactly 50000 and the `name` does not contain 't'.
df.where((df.salary == 50000) & ~(df.name.like('%t%'))).show()
#correct ouput divya in sanket contain  "t" and in vicky| 20000 sal is low they rejected
# and (&) required both condiation true
#  ~ (not operator)

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  2|divya| 50000|
+---+-----+------+



### `like('%t%')` Explanation

The `like('%t%')` expression is used to filter rows in PySpark where the string in a particular column contains the character `t`.

#### Breakdown:
- **`%` (Percent Sign)**: The percent sign is a wildcard in SQL-like queries that matches zero or more characters.
- **`t`**: This is the character you want to check within the column values.

#### Example:
- `df.name.like('%t%')` will match any row where the `name` column contains the letter 't' at any position in the string. For example, it would match 'Sanket', 'Vicky', 'Martha', etc., because 't' is present in those names.
  
#### Negation:
If you use negation (`~`), like `~(df.name.like('%t%'))`, it will **exclude** rows where the `name` contains 't'.

#### Example:

```python
# Include rows where the 'name' column contains 't'
df.where(df.name.like('%t%')).show()

# Exclude rows where the 'name' column contains 't'
df.where(~(df.name.like('%t%'))).show()


In [0]:

# Filter rows where salary is greater than or equal to 60000 and the `name` does not contain 'sanket'.
df.where((df.salary >= 60000) & ~(df.name.like('%Sanket%'))).show()
# and (&) required both condiation true
#  ~ (not operator)
df.where((df.salary >= 60000) & ~(df.name.like('%sanket%'))).show() # s is lower case in orignal data s is capital

+---+----+------+
| id|name|salary|
+---+----+------+
+---+----+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
+---+------+------+



In [0]:
# Filter rows where salary is less than 50000 and the `name` does not contain 'san'.
df.where((df.salary < 50000) & ~(df.name.like('%san%'))).show() 
#The expression like('%san%') filters rows where the column contains the substring 'san' at any position in the string.
# and (&) required both condiation true
#  ~ (not operator)

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|vicky| 20000|
+---+-----+------+



In [0]:
# Filter rows where salary is greater than or equal to 50000 or the `name` does not contain 't'.
df.where((df.salary >= 50000) | ~(df.name.like('%t%'))).show()
# or (|) required only one condiation true
#  ~ (not operator)
#The expression `like('%t%')` filters rows where the column contains the letter 't' at any position in the string.

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



Sure! Here's the explanation in Markdown format:

### Why "Sanket" is included:

For **"Sanket"**:
- **`df.salary >= 50000`**: The salary is 60000, so this condition **evaluates to true**.
- **`~(df.name.like('%t%'))`**: The name contains "t", so this condition **evaluates to false**.

Since the **OR (`|`)** operator requires **only one condition to be true**, the row is included in the result because the first condition (**salary >= 50000**) is true.

Let me know if you need further clarification!

In [0]:
# Filter rows where salary is greater than or equal to 50000 
# or where salary is less than 50000 and the name does not contain 't'.
df.where((df.salary >= 50000) | ((df.salary < 50000) & ~(df.name.like('%t%')))).show()
#The expression `like('%t%')` filters rows where the column contains the letter 't' at any position in the string.

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



### Condition 1: `df.salary >= 50000`

- **Sanket** has a salary of 60000, which satisfies this condition because 60000 is greater than or equal to 50000.
- Since **Sanket** satisfies this condition, the row will be included regardless of the second condition.

### Condition 2: `((df.salary < 50000) & ~(df.name.like('%t%')))`

- This condition applies only when the salary is less than 50000. It also checks if the name **does not** contain the letter 't'.
- Since **Sanket**'s salary is 60000, this condition is **not evaluated** for him (because of the OR condition, as it is already included by the first condition).

### Key Point:
- **OR Condition**: The **OR (`|`)** operator means that if **either of the two conditions** is **true**, the row is included. Since **Sanket**'s salary is >= 50000, the row is included regardless of the second condition.

### Conclusion:
- **Sanket** is included because he meets the first condition (`df.salary >= 50000`), and the second condition is not needed in this case due to the OR operator.


In [0]:
# Filter rows where salary is greater than or equal to 50000 or the `name` does not contain 'divya'.
df.where((df.salary >= 50000) | ~(df.name.like('%divya%'))).show()
#here or operator use so it check 1st condition is true then not check 2nd condition so in ouput we get divya
# in case of vickey 1st condition false so check 2nd is true so vickey in output

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



In [0]:
# Filter rows where salary is exactly 50000 and the `name` does not contain 't'.
df.where((df.salary == 50000) & ~(df.name.like('%t%'))).show() 

# Filter rows where salary is greater than or equal to 60000 and the `name` does not contain 'sanket'.
df.where((df.salary >= 60000) & ~(df.name.like('%sanket%'))).show()

# Filter rows where salary is less than 50000 and the `name` does not contain 'san'.
df.where((df.salary < 50000) & ~(df.name.like('%san%'))).show()

# Filter rows where salary is greater than or equal to 50000 or the `name` does not contain 't'.
df.where((df.salary >= 50000) | ~(df.name.like('%t%'))).show()

# Filter rows where salary is greater than or equal to 50000 or the `name` does not contain 'divya'.
df.where((df.salary >= 50000) | ~(df.name.like('%divya%'))).show()



+---+-----+------+
| id| name|salary|
+---+-----+------+
|  2|divya| 50000|
+---+-----+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
+---+------+------+

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|vicky| 20000|
+---+-----+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



In [0]:
df.where( (df.salary >50000) & ~(df.name.like('%v%'))).show()

+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
+---+------+------+



In [0]:
df.dropDuplicates(['name']).show()


+---+------+------+
| id|  name|salary|
+---+------+------+
|  3|Sanket| 60000|
|  2| divya| 50000|
|  1| vicky| 20000|
+---+------+------+



In [0]:
# Importing the necessary PySpark libraries
import pyspark
from pyspark.sql import SparkSession

# Creating a SparkSession, which is the entry point for using Spark SQL
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Sample data as a list of tuples with employee information
simpleData = [(1,"James","Sales","NY",90000,34,10000), \
    (2,"Michael","Sales","NY",86000,56,20000), \
    (3,"Robert","Sales","CA",81000,30,23000), \
    (4,"Maria","Finance","CA",90000,24,23000) \
  ]

# Defining the column names for the DataFrame
columns = ["id","employee_name","department","state","salary","age","bonus"]

# Creating a DataFrame using the sample data and column names
df = spark.createDataFrame(data = simpleData, schema = columns)

# Printing the schema of the DataFrame to show the column types
df.printSchema()

# Displaying the DataFrame in a tabular format with no truncation
df.show(truncate=False)


root
 |-- id: long (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+---+-------------+----------+-----+------+---+-----+
|id |employee_name|department|state|salary|age|bonus|
+---+-------------+----------+-----+------+---+-----+
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Michael      |Sales     |NY   |86000 |56 |20000|
|3  |Robert       |Sales     |CA   |81000 |30 |23000|
|4  |Maria        |Finance   |CA   |90000 |24 |23000|
+---+-------------+----------+-----+------+---+-----+



### Explanation:

#### **Import Libraries:**

- **`import pyspark`**: Import the `pyspark` module to interact with Spark.
- **`from pyspark.sql import SparkSession`**: Import `SparkSession`, which provides the entry point for using Spark SQL.

#### **Creating a SparkSession:**

- **`spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()`**: This creates a `SparkSession` with the application name `'SparkByExamples.com'`. The `getOrCreate()` method either retrieves an existing `SparkSession` or creates a new one.

#### **Sample Data:**

- **`simpleData`**: A list of tuples containing employee data. Each tuple represents a row in the DataFrame.
  - The columns in the data are:
    - **`id`**: Employee ID
    - **`employee_name`**: Name of the employee
    - **`department`**: Department the employee works in
    - **`state`**: Location of the employee's office
    - **`salary`**: Employee's salary
    - **`age`**: Age of the employee
    - **`bonus`**: Bonus of the employee

#### **Defining Column Names:**

- **`columns`**: A list that defines the column names corresponding to the data.

#### **Creating a DataFrame:**

- **`df = spark.createDataFrame(data = simpleData, schema = columns)`**: This creates a PySpark DataFrame from the provided data (`simpleData`) and schema (`columns`).

#### **Printing Schema:**

- **`df.printSchema()`**: This method prints the schema of the DataFrame, showing the column names and their data types.

#### **Displaying Data:**

- **`df.show(truncate=True)`**: This displays the contents of the DataFrame in a tabular format, but with truncation enabled. If any column contains long text or values, it will show only the first 20 characters. This is useful to prevent data from being cut off in case of long strings.

### Key Points:

- **`show(truncate=True)`**: This limits the display of each column value to 20 characters by default. If you want to see the entire content of each column, you can set `truncate=False`.
  
- **`printSchema()`**: Helps in understanding the structure of the DataFrame by showing column names and their respective data types.


In [0]:
# Create DataFrame2
# Define a list of tuples containing employee data for the second DataFrame
simpleData2 = [(1,"James","Sales","NY",90000,34,10000), \
    (2,"Maria","Finance","CA",90000,24,23000), \
    (3,"Jen","Finance","NY",79000,53,15000), \
    (4,"Jeff","Marketing","CA",80000,25,18000), \
    (5,"Kumar","Marketing","NY",91000,50,21000) \
  ]

# Define column names corresponding to the data
columns2 = ["id", "employee_name", "department", "state", "salary", "age", "bonus"]

# Create the second DataFrame (df2) using the provided data and schema
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

# Print the schema of the DataFrame (shows the column names and their data types)
df2.printSchema()

# Show the contents of the DataFrame without truncating column values
df2.show(truncate=False)


root
 |-- id: long (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+---+-------------+----------+-----+------+---+-----+
|id |employee_name|department|state|salary|age|bonus|
+---+-------------+----------+-----+------+---+-----+
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Maria        |Finance   |CA   |90000 |24 |23000|
|3  |Jen          |Finance   |NY   |79000 |53 |15000|
|4  |Jeff         |Marketing |CA   |80000 |25 |18000|
|5  |Kumar        |Marketing |NY   |91000 |50 |21000|
+---+-------------+----------+-----+------+---+-----+



In [0]:
# union() to merge two DataFrames
unionDF = df.union(df2)
unionDF.show(truncate=False)


+---+-------------+----------+-----+------+---+-----+
|id |employee_name|department|state|salary|age|bonus|
+---+-------------+----------+-----+------+---+-----+
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Michael      |Sales     |NY   |86000 |56 |20000|
|3  |Robert       |Sales     |CA   |81000 |30 |23000|
|4  |Maria        |Finance   |CA   |90000 |24 |23000|
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Maria        |Finance   |CA   |90000 |24 |23000|
|3  |Jen          |Finance   |NY   |79000 |53 |15000|
|4  |Jeff         |Marketing |CA   |80000 |25 |18000|
|5  |Kumar        |Marketing |NY   |91000 |50 |21000|
+---+-------------+----------+-----+------+---+-----+



In [0]:
# unionAll() to merge two DataFrames
unionAllDF = df.unionAll(df2)
unionAllDF.show(truncate=False)

+---+-------------+----------+-----+------+---+-----+
|id |employee_name|department|state|salary|age|bonus|
+---+-------------+----------+-----+------+---+-----+
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Michael      |Sales     |NY   |86000 |56 |20000|
|3  |Robert       |Sales     |CA   |81000 |30 |23000|
|4  |Maria        |Finance   |CA   |90000 |24 |23000|
|1  |James        |Sales     |NY   |90000 |34 |10000|
|2  |Maria        |Finance   |CA   |90000 |24 |23000|
|3  |Jen          |Finance   |NY   |79000 |53 |15000|
|4  |Jeff         |Marketing |CA   |80000 |25 |18000|
|5  |Kumar        |Marketing |NY   |91000 |50 |21000|
+---+-------------+----------+-----+------+---+-----+



### **Difference Between `UNION` and `UNION ALL` in PySpark**

---

### 📌 **1. Definition**  
- **`UNION`**:  
  Combines the result sets of two DataFrames and removes any duplicate rows, returning only distinct records.  
- **`UNION ALL`**:  
  Combines the result sets of two DataFrames **without** removing duplicates, meaning all records (including duplicates) are returned.

---

### 🛠️ **2. Syntax in PySpark**  
```python
# UNION - Removes duplicates
df_union = df1.union(df2).distinct()

# UNION ALL - Keeps duplicates
df_union_all = df1.union(df2)
```

---

### ⚡ **3. Key Differences**

| Feature            | `UNION`                         | `UNION ALL`                      |
|--------------------|---------------------------------|-----------------------------------|
| **Duplicates**     | Removes duplicates              | Includes duplicates               |
| **Performance**    | Slower (due to duplicate check) | Faster (no duplicate check)       |
| **Usage Scenario** | When unique records are needed  | When all records (including duplicates) are needed |
| **Memory Usage**   | Higher (due to extra processing)| Lower (no extra processing)       |

---

### 🎯 **4. Example**

#### 📋 **DataFrames Example**  
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnionExample").getOrCreate()

data1 = [(1, "James"), (2, "Maria")]
data2 = [(2, "Maria"), (3, "Jen")]

columns = ["id", "name"]

df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# UNION (removes duplicates)
df_union = df1.union(df2).distinct()
df_union.show()

# UNION ALL (includes duplicates)
df_union_all = df1.union(df2)
df_union_all.show()
```

#### 📊 **Output**

- **UNION** (Distinct results):
```
+---+-----+
| id| name|
+---+-----+
|  1|James|
|  2|Maria|
|  3|  Jen|
+---+-----+
```

- **UNION ALL** (All results including duplicates):
```
+---+-----+
| id| name|
+---+-----+
|  1|James|
|  2|Maria|
|  2|Maria|
|  3|  Jen|
+---+-----+
```

---

### 💡 **5. Performance Considerations**  
- Use **`UNION ALL`** when duplicates are acceptable to gain better performance.  
- Use **`UNION`** when you need a unique set of records but be aware it may affect performance due to the distinct operation.

---

### 📝 **6. Summary**  
- ✅ **`UNION`** ensures unique results but can be slower.  
- 🚀 **`UNION ALL`** performs faster but includes duplicates.  
- 🎯 Choose based on the **data integrity** requirements and **performance** needs of your application.

In [0]:
in_join = df.join(df2,df.id == df2.id, 'inner')
display(in_join)

id,employee_name,department,state,salary,age,bonus,id.1,employee_name.1,department.1,state.1,salary.1,age.1,bonus.1
1,James,Sales,NY,90000,34,10000,1,James,Sales,NY,90000,34,10000
2,Michael,Sales,NY,86000,56,20000,2,Maria,Finance,CA,90000,24,23000
3,Robert,Sales,CA,81000,30,23000,3,Jen,Finance,NY,79000,53,15000
4,Maria,Finance,CA,90000,24,23000,4,Jeff,Marketing,CA,80000,25,18000


In [0]:
r_join = df.join(df2,df.id == df2.id, 'right')
display(r_join)

id,employee_name,department,state,salary,age,bonus,id.1,employee_name.1,department.1,state.1,salary.1,age.1,bonus.1
1.0,James,Sales,NY,90000.0,34.0,10000.0,1,James,Sales,NY,90000,34,10000
2.0,Michael,Sales,NY,86000.0,56.0,20000.0,2,Maria,Finance,CA,90000,24,23000
3.0,Robert,Sales,CA,81000.0,30.0,23000.0,3,Jen,Finance,NY,79000,53,15000
4.0,Maria,Finance,CA,90000.0,24.0,23000.0,4,Jeff,Marketing,CA,80000,25,18000
,,,,,,,5,Kumar,Marketing,NY,91000,50,21000


In [0]:
l_join = df.join(df2,df.id == df2.id, 'left')
display(l_join)

id,employee_name,department,state,salary,age,bonus,id.1,employee_name.1,department.1,state.1,salary.1,age.1,bonus.1
1,James,Sales,NY,90000,34,10000,1,James,Sales,NY,90000,34,10000
2,Michael,Sales,NY,86000,56,20000,2,Maria,Finance,CA,90000,24,23000
3,Robert,Sales,CA,81000,30,23000,3,Jen,Finance,NY,79000,53,15000
4,Maria,Finance,CA,90000,24,23000,4,Jeff,Marketing,CA,80000,25,18000


In [0]:
f_join = df.join(df2,df.id == df2.id, 'full')
display(f_join)

id,employee_name,department,state,salary,age,bonus,id.1,employee_name.1,department.1,state.1,salary.1,age.1,bonus.1
1.0,James,Sales,NY,90000.0,34.0,10000.0,1,James,Sales,NY,90000,34,10000
2.0,Michael,Sales,NY,86000.0,56.0,20000.0,2,Maria,Finance,CA,90000,24,23000
3.0,Robert,Sales,CA,81000.0,30.0,23000.0,3,Jen,Finance,NY,79000,53,15000
4.0,Maria,Finance,CA,90000.0,24.0,23000.0,4,Jeff,Marketing,CA,80000,25,18000
,,,,,,,5,Kumar,Marketing,NY,91000,50,21000


# PySpark Joins Explained (With Real-World Banking Examples)

## 1. **Inner Join**
### Description:
Returns rows that have matching values in both DataFrames.

### Real-World Example:
*Finding customers with loan details available in both customer and loan DataFrames.*

### Code Example:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark Joins").getOrCreate()

data_customers = [(1, "James"), (2, "Maria"), (3, "Jen")]
data_loans = [(1, "Personal Loan"), (2, "Home Loan"), (4, "Car Loan")]

columns_customers = ["id", "name"]
columns_loans = ["id", "loan_type"]

df_customers = spark.createDataFrame(data_customers, columns_customers)
df_loans = spark.createDataFrame(data_loans, columns_loans)

inner_join_df = df_customers.join(df_loans, on="id", how="inner")
inner_join_df.show()
```

### Output:
```
+---+-----+-------------+
| id| name|    loan_type|
+---+-----+-------------+
|  1|James|Personal Loan|
|  2|Maria|   Home Loan |
+---+-----+-------------+
```

### Explanation:
- Only customers who have loan records in the loan DataFrame are returned.

---

## 2. **Left Outer Join**
### Description:
Returns all records from the left DataFrame, with matched records from the right DataFrame. Nulls for non-matches.

### Real-World Example:
*List all customers with their loan details, showing null where loans are not available.*

### Code Example:
```python
left_outer_join_df = df_customers.join(df_loans, on="id", how="left")
left_outer_join_df.show()
```

### Output:
```
+---+-----+-------------+
| id| name|    loan_type|
+---+-----+-------------+
|  1|James|Personal Loan|
|  2|Maria|   Home Loan |
|  3|  Jen|         null |
+---+-----+-------------+
```

### Explanation:
- Shows all customers even if they don’t have loan records.

---

## 3. **Right Outer Join**
### Description:
Returns all records from the right DataFrame, with matched records from the left DataFrame. Nulls for non-matches.

### Real-World Example:
*Display all loan types, including those with no associated customer.*

### Code Example:
```python
right_outer_join_df = df_customers.join(df_loans, on="id", how="right")
right_outer_join_df.show()
```

### Output:
```
+---+-----+-------------+
| id| name|    loan_type|
+---+-----+-------------+
|  1|James|Personal Loan|
|  2|Maria|   Home Loan |
|  4| null|     Car Loan|
+---+-----+-------------+
```

### Explanation:
- Displays all loans, even those not assigned to any customer.

---

## 4. **Full Outer Join**
### Description:
Returns all records when there’s a match in either DataFrame, filling nulls where no match exists.

### Real-World Example:
*Combine all customer and loan records to show full data availability.*

### Code Example:
```python
full_outer_join_df = df_customers.join(df_loans, on="id", how="outer")
full_outer_join_df.show()
```

### Output:
```
+---+-----+-------------+
| id| name|    loan_type|
+---+-----+-------------+
|  1|James|Personal Loan|
|  2|Maria|   Home Loan |
|  3|  Jen|         null |
|  4| null|     Car Loan|
+---+-----+-------------+
```

### Explanation:
- Combines the complete data from both DataFrames.

---

## 5. **Left Semi Join**
### Description:
Returns only rows from the left DataFrame with a match in the right DataFrame.

### Real-World Example:
*Find customers who have at least one loan.*

### Code Example:
```python
left_semi_join_df = df_customers.join(df_loans, on="id", how="left_semi")
left_semi_join_df.show()
```

### Output:
```
+---+-----+
| id| name|
+---+-----+
|  1|James|
|  2|Maria|
+---+-----+
```

### Explanation:
- Only returns customers who have loan entries.

---

## 6. **Left Anti Join**
### Description:
Returns only rows from the left DataFrame with no match in the right DataFrame.

### Real-World Example:
*Find customers who do not have any loans.*

### Code Example:
```python
left_anti_join_df = df_customers.join(df_loans, on="id", how="left_anti")
left_anti_join_df.show()
```

### Output:
```
+---+----+
| id|name|
+---+----+
|  3| Jen|
+---+----+
```

### Explanation:
- Shows customers who have no associated loans.

---

## 7. **Cross Join**
### Description:
Returns the Cartesian product of both DataFrames (all combinations).

### Real-World Example:
*Generate all possible combinations of customers and loan types for offer planning.*

### Code Example:
```python
cross_join_df = df_customers.crossJoin(df_loans)
cross_join_df.show()
```

### Output:
```
+---+-----+---+-------------+
| id| name| id|    loan_type|
+---+-----+---+-------------+
|  1|James|  1|Personal Loan|
|  1|James|  2|   Home Loan |
|  1|James|  4|     Car Loan|
|  2|Maria|  1|Personal Loan|
|  2|Maria|  2|   Home Loan |
|  2|Maria|  4|     Car Loan|
|  3|  Jen|  1|Personal Loan|
|  3|  Jen|  2|   Home Loan |
|  3|  Jen|  4|     Car Loan|
+---+-----+---+-------------+
```

### Explanation:
- Generates all possible customer-loan pairings.

---

## 8. **Broadcast Join**
### Description:
An optimized join where the smaller DataFrame is broadcast to all nodes, reducing shuffle operations.

### Real-World Example:
*Quickly join a large transaction DataFrame with a small loan type DataFrame for efficient querying.*

### Code Example:
```python
from pyspark.sql.functions import broadcast

broadcast_join_df = df_customers.join(broadcast(df_loans), on="id", how="inner")
broadcast_join_df.show()
```

### Output:
```
+---+-----+-------------+
| id| name|    loan_type|
+---+-----+-------------+
|  1|James|Personal Loan|
|  2|Maria|   Home Loan |
+---+-----+-------------+
```

### Explanation:
- `broadcast()` ensures the smaller DataFrame is available on all nodes, improving join performance.

---

## Key Takeaways:
- **Inner Join:** Match records in both DataFrames.
- **Left/Right Outer Joins:** Keep all records from one DataFrame with matched data from the other.
- **Full Outer Join:** All records from both DataFrames with nulls for missing matches.
- **Left Semi Join:** Only left DataFrame records with a match.
- **Left Anti Join:** Left DataFrame records without a match.
- **Cross Join:** All possible row combinations (Cartesian product).
- **Broadcast Join:** Efficient for joining large DataFrames with small ones, reducing shuffle.

These join strategies, paired with real-world banking examples, enable flexible and efficient data analysis using PySpark, ensuring optimized performance in distributed environments.



### 📖 **All Joins in PySpark - Detailed Explanation**

---

### 🔗 **1. Inner Join**  
- **Definition**: Returns records that have matching values in both DataFrames based on the given condition.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "inner")
  ```  
- **Example**:
  ```python
  +---+-----+---+-----+
  | id| name| id|dept |
  +---+-----+---+-----+
  |  1|John |  1|Sales|
  |  2|Mike |  2|HR   |
  +---+-----+---+-----+
  ```
- **Key Point**: Only matching rows from both DataFrames are returned.

---

### 🌐 **2. Left Outer Join (Left Join)**  
- **Definition**: Returns all rows from the **left** DataFrame and matching rows from the right DataFrame. Missing matches return `null`.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "left")
  ```  
- **Example**:
  ```python
  +---+-----+-----+
  | id| name|dept |
  +---+-----+-----+
  |  1|John |Sales|
  |  2|Mike |HR   |
  |  3|Sara |null |
  +---+-----+-----+
  ```
- **Key Point**: All records from the left DataFrame are preserved.

---

### 🌍 **3. Right Outer Join (Right Join)**  
- **Definition**: Returns all rows from the **right** DataFrame and matching rows from the left DataFrame.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "right")
  ```  
- **Example**:
  ```python
  +---+-----+-----+
  | id| name|dept |
  +---+-----+-----+
  |  1|John |Sales|
  |  2|Mike |HR   |
  |  4|null |Admin|
  +---+-----+-----+
  ```
- **Key Point**: All records from the right DataFrame are included.

---

### 🌟 **4. Full Outer Join (Full Join)**  
- **Definition**: Returns all rows from both DataFrames. If there is no match, returns `null` on the side without a match.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "outer")
  ```  
- **Example**:
  ```python
  +---+-----+-----+
  | id| name|dept |
  +---+-----+-----+
  |  1|John |Sales|
  |  2|Mike |HR   |
  |  3|Sara |null |
  |  4|null |Admin|
  +---+-----+-----+
  ```
- **Key Point**: Combines the effects of both left and right joins.

---

### ⚡ **5. Left Semi Join**  
- **Definition**: Returns rows from the **left** DataFrame where a match exists in the right DataFrame. Only left DataFrame columns are returned.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "left_semi")
  ```  
- **Example**:
  ```python
  +---+-----+
  | id| name|
  +---+-----+
  |  1|John |
  |  2|Mike |
  +---+-----+
  ```
- **Key Point**: Similar to an inner join but returns only columns from the left DataFrame.

---

### 💡 **6. Left Anti Join**  
- **Definition**: Returns only the rows from the **left** DataFrame that **do not** have a match in the right DataFrame.  
- **Syntax**:  
  ```python
  df1.join(df2, df1.id == df2.id, "left_anti")
  ```  
- **Example**:
  ```python
  +---+-----+
  | id| name|
  +---+-----+
  |  3|Sara |
  +---+-----+
  ```
- **Key Point**: Opposite of the left semi join.

---

### 🎯 **7. Cross Join (Cartesian Product)**  
- **Definition**: Returns the Cartesian product of both DataFrames. Every row of the first DataFrame is joined with every row of the second DataFrame.  
- **Syntax**:  
  ```python
  df1.crossJoin(df2).show()
  ```  
- **Example**:
  ```python
  +---+-----+---+-----+
  | id| name| id|dept |
  +---+-----+---+-----+
  |  1|John |  1|Sales|
  |  1|John |  2|HR   |
  |  2|Mike |  1|Sales|
  |  2|Mike |  2|HR   |
  |  3|Sara |  1|Sales|
  |  3|Sara |  2|HR   |
  +---+-----+---+-----+
  ```
- **Key Point**: Be cautious—can produce a large number of rows.

---

### 📝 **8. Self Join**  
- **Definition**: A join where a DataFrame is joined with itself. Useful for comparing rows within the same DataFrame.  
- **Syntax**:  
  ```python
  df.alias("a").join(df.alias("b"), col("a.id") == col("b.manager_id"), "inner")
  ```  
- **Key Point**: Use aliases to differentiate the same DataFrame when joining.

---

### 📊 **9. Natural Join**  
- **Definition**: Joins two DataFrames based on all columns with the same name.  
- **Syntax**:  
  ```python
  df1.join(df2, on=["id"], how="inner")
  ```
- **Key Point**: Can lead to unexpected results if not carefully managed.

---

### 🏃 **Performance Considerations**
- **Broadcast Join**: Use `.hint("broadcast")` when one DataFrame is small enough to fit in memory to optimize join performance.  
  ```python
  df1.join(broadcast(df2), "id").show()
  ```
- **Partitioning**: Ensure data is partitioned optimally for large-scale joins.

---

### 📚 **Summary Table**

| Join Type         | Description                                    | Missing Data Handling  | Performance Impact |
|-------------------|------------------------------------------------|------------------------|--------------------|
| **Inner Join**    | Matching rows from both DataFrames             | Excluded               | Efficient          |
| **Left Join**     | All rows from left, matching from right         | Nulls on right         | Moderate           |
| **Right Join**    | All rows from right, matching from left         | Nulls on left          | Moderate           |
| **Full Join**     | All rows from both DataFrames                  | Nulls on both sides    | High               |
| **Left Semi Join**| Matching rows from left only                   | Excluded               | Efficient          |
| **Left Anti Join**| Non-matching rows from left only               | Excluded               | Efficient          |
| **Cross Join**    | Cartesian product of both DataFrames           | None                   | Very High          |
| **Self Join**     | Join DataFrame with itself                     | N/A                    | Varies             |
| **Natural Join**  | Auto join on columns with same names           | Nulls on missing sides | Moderate           |

---

### 🎉 **Conclusion**  
- Choose **`inner`** for intersection, **`left`**/**`right`** for preservation, and **`outer`** for complete combinations.  
- Use **semi**/**anti** joins for filtering scenarios.  
- Optimize performance with **broadcast joins** when dealing with small datasets.  
- Always consider the **data size** and **join condition** for efficient joins in PySpark.