#### Question 1 (Complex Aggregation)
##### Given a DataFrame of employee details with their salaries, find the average salary for each department and the count of employees in each department.

**Input data** 
```
+----+-------+---------+------+
| ID | Name  | Dept    |Salary|
+----+-------+---------+------+
| 1  | John  | IT      | 5000 |
| 2  | Alice | HR      | 6000 |
| 3  | Bob   | IT      | 7000 |
| 4  | David | HR      | 5500 |
| 5  | Eve   | IT      | 6500 |
+----+-------+---------+------+

```
**Output data**

```
+---------+-------------+----------------+
| Dept    | Avg_Salary  | Employee_Count |
+---------+-------------+----------------+
| HR      | 5750.0      | 2              |
| IT      | 6166.67     | 3              |
+---------+-------------+----------------+

```

In [0]:
# round is aliased so it does not shadow Python's built-in round()
from pyspark.sql.functions import avg, count, round as sql_round

data = [
    {'ID': 1, 'Name': 'John', 'Dept': 'IT', 'Salary': 5000},
    {'ID': 2, 'Name': 'Alice', 'Dept': 'HR', 'Salary': 6000},
    {'ID': 3, 'Name': 'Bob', 'Dept': 'IT', 'Salary': 7000},
    {'ID': 4, 'Name': 'David', 'Dept': 'HR', 'Salary': 5500},
    {'ID': 5, 'Name': 'Eve', 'Dept': 'IT', 'Salary': 6500}
]

df = spark.createDataFrame(data)

# Average salary (rounded to 2 decimals) and employee count per department
final_df = df.groupBy('Dept').agg(
    sql_round(avg('Salary'), 2).alias('Avg_Salary'),
    count('Name').alias('Employee_Count')
)

display(final_df)

Dept,Avg_Salary,Employee_Count
IT,6166.67,3
HR,5750.0,2


#### Question 2 (Window Functions with Partitioning and Ordering)
##### Given a DataFrame of sales transactions, calculate the running total of sales for each region, ordered by the transaction date.

**Input data** 
```
+--------+-------+-------+------------+
| Product| Region| Sales | Date       |
+--------+-------+-------+------------+
| A      | North | 100   | 2024-01-01 |
| A      | North | 200   | 2024-01-02 |
| B      | South | 150   | 2024-01-01 |
| B      | South | 100   | 2024-01-03 |
| C      | North | 300   | 2024-01-02 |
| C      | South | 400   | 2024-01-04 |
+--------+-------+-------+------------+


```
**Output data**

```
+--------+-------+-------+------------+-------------+
| Product| Region| Sales | Date       | Running_Sum |
+--------+-------+-------+------------+-------------+
| A      | North | 100   | 2024-01-01 | 100         |
| A      | North | 200   | 2024-01-02 | 300         |
| C      | North | 300   | 2024-01-02 | 600         |
| B      | South | 150   | 2024-01-01 | 150         |
| B      | South | 100   | 2024-01-03 | 250         |
| C      | South | 400   | 2024-01-04 | 650         |
+--------+-------+-------+------------+-------------+


```

In [0]:
from pyspark.sql.functions import col,sum as fsum 
from pyspark.sql.window import Window
data = [
    ( 'A', 'North', 100, '2024-01-01'),
    ( 'A', 'North', 200, '2024-01-02'),
    ( 'B', 'South', 150, '2024-01-01'),
    ( 'B', 'South', 100, '2024-01-03'),
    ( 'C', 'North', 300, '2024-01-02'),
    ( 'C', 'South', 400, '2024-01-04')
]
columns = ['Product','Region','Sales','Date']

df = spark.createDataFrame(data,schema=columns)

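# Running total per region in date order.
# Product is a tie-breaker so rows sharing a date are summed in a stable order.
running_window = (
    Window.partitionBy('Region')
    .orderBy('Date', 'Product')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

final_df = df.withColumn('Running_Sum', fsum(col('Sales')).over(running_window))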

display(final_df)

Product,Region,Sales,Date,Running_Sum
A,North,100,2024-01-01,100
A,North,200,2024-01-02,300
C,North,300,2024-01-02,600
B,South,150,2024-01-01,150
B,South,100,2024-01-03,250
C,South,400,2024-01-04,650


#### Question 3 (Complex Joins)
##### Given two DataFrames, one with employee details and another with department details, perform a left join to include all employees, even if they don’t have a corresponding department.

**Input data (Employee DataFrame)**
```
+----+-------+---------+
| ID | Name  | Dept_ID |
+----+-------+---------+
| 1  | John  | 101     |
| 2  | Alice | 102     |
| 3  | Bob   | null    |
| 4  | David | 104     |
+----+-------+---------+


```
**Input data (Department DataFrame)** 
```
+--------+------------+
| Dept_ID| Dept_Name  |
+--------+------------+
| 101    | IT         |
| 102    | HR         |
| 103    | Sales      |
+--------+------------+


```
**Output data**

```
+----+-------+---------+------------+
| ID | Name  | Dept_ID | Dept_Name  |
+----+-------+---------+------------+
| 1  | John  | 101     | IT         |
| 2  | Alice | 102     | HR         |
| 3  | Bob   | null    | null       |
| 4  | David | 104     | null       |
+----+-------+---------+------------+



```

In [0]:
employee_data = [
    {'ID': 1, 'Name': 'John', 'Dept_ID': 101},
    {'ID': 2, 'Name': 'Alice', 'Dept_ID': 102},
    {'ID': 3, 'Name': 'Bob', 'Dept_ID': None},
    {'ID': 4, 'Name': 'David', 'Dept_ID': 104}
]

department_data = [
    {'Dept_ID': 101, 'Dept_Name': 'IT'},
    {'Dept_ID': 102, 'Dept_Name': 'HR'},
    {'Dept_ID': 103, 'Dept_Name': 'Sales'}
]

employee_df = spark.createDataFrame(employee_data)

department_df = spark.createDataFrame(department_data)

final_df = employee_df.join(department_df,on = 'Dept_ID', how='left')

display(final_df.select('ID','Name','Dept_ID','Dept_Name'))



ID,Name,Dept_ID,Dept_Name
1,John,101.0,IT
2,Alice,102.0,HR
3,Bob,,
4,David,104.0,


#### Question 4 (DataFrame UDFs (User-Defined Functions))
##### Given a DataFrame of student names, write a UDF to convert their names to uppercase.

**Input data** 
```
+----+--------+
| ID | Name   |
+----+--------+
| 1  | John   |
| 2  | Alice  |
| 3  | Bob    |
| 4  | David  |
+----+--------+

```
**Output data**

```
+----+--------+
| ID | Name   |
+----+--------+
| 1  | JOHN   |
| 2  | ALICE  |
| 3  | BOB    |
| 4  | DAVID  |
+----+--------+
```

In [0]:
data = [
    {'ID': 1, 'Name': 'John'},
    {'ID': 2, 'Name': 'Alice'},
    {'ID': 3, 'Name': 'Bob'},
    {'ID': 4, 'Name': 'David'}
]
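
A possible solution, sketched under a few assumptions: a `SparkSession` named `spark` exists as in the earlier cells, and the helper names `to_upper` and `with_upper_names` are hypothetical. A plain Python function is wrapped with `udf` and applied to the `Name` column. Outside of an exercise, the built-in `upper` function in `pyspark.sql.functions` would usually be the better choice, since Python UDFs are generally slower than built-ins.

```python
def to_upper(name):
    """Return the name in uppercase; nulls (None) pass through unchanged."""
    return name.upper() if name is not None else None


def with_upper_names(df):
    """Replace the Name column of a Spark DataFrame with its uppercase form."""
    # Imported inside the function so to_upper can be exercised without Spark installed.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    upper_udf = udf(to_upper, StringType())
    return df.withColumn('Name', upper_udf(col('Name')))
```

In the notebook this might be called as `display(with_upper_names(spark.createDataFrame(data)).select('ID', 'Name'))`. The explicit `None` check matters because Spark passes nulls to a Python UDF as `None`, and calling `.upper()` on `None` would fail.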


#### Question 5 (Handling Nested Data)
##### Given a DataFrame with nested JSON data, flatten the nested structure.

**Input data** 
```
+----+---------------------------+
| ID | Info                      |
+----+---------------------------+
| 1  | {"Name":"John", "Age":28} |
| 2  | {"Name":"Alice", "Age":34}|
| 3  | {"Name":"Bob", "Age":32}  |
| 4  | {"Name":"David", "Age":25}|
+----+---------------------------+


```
**Output data**

```
+----+------+-----+
| ID | Name | Age |
+----+------+-----+
| 1  | John | 28  |
| 2  | Alice| 34  |
| 3  | Bob  | 32  |
| 4  | David| 25  |
+----+------+-----+

```

In [0]:
data = [
    {'ID': 1, 'Info': {'Name': 'John', 'Age': 28}},
    {'ID': 2, 'Info': {'Name': 'Alice', 'Age': 34}},
    {'ID': 3, 'Info': {'Name': 'Bob', 'Age': 32}},
    {'ID': 4, 'Info': {'Name': 'David', 'Age': 25}}
]
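
One way to flatten `Info`, sketched with assumptions: `spark` is an existing `SparkSession`, and the names `flatten_record` and `flatten_info` are hypothetical. The schema is given explicitly because schema inference on nested Python dicts can produce a map column rather than a struct, depending on the Spark version and configuration. With a struct column, `Info.*` expands every nested field into its own top-level column.

```python
def flatten_record(record):
    """Plain-Python mirror of the flattening, useful for stating expected rows."""
    return {'ID': record['ID'], **record['Info']}


def flatten_info(spark, data):
    """Build a DataFrame with Info as a struct, then expand it into columns."""
    # Imported inside the function so flatten_record works without Spark installed.
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    schema = StructType([
        StructField('ID', IntegerType()),
        StructField('Info', StructType([
            StructField('Name', StringType()),
            StructField('Age', IntegerType()),
        ])),
    ])
    df = spark.createDataFrame(data, schema=schema)
    # 'Info.*' selects all fields of the struct: ID, Name, Age
    return df.select('ID', 'Info.*')
```

In the notebook, `display(flatten_info(spark, data))` should show the three columns `ID`, `Name`, and `Age`.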

#### Question 6 (Recursive Data Processing)
##### Given a DataFrame representing a hierarchy of employees, compute the total salary for each manager, including the salaries of their direct and indirect reports.

**Input data** 
```
+----+-------+--------+------+
| ID | Name  | Manager|Salary|
+----+-------+--------+------+
| 1  | John  | null   | 1000 |
| 2  | Alice | 1      | 800  |
| 3  | Bob   | 1      | 600  |
| 4  | David | 2      | 400  |
| 5  | Eve   | 3      | 500  |
+----+-------+--------+------+


```
**Output data**

```
+-------+---------------+
| Name  | Total_Salary  |
+-------+---------------+
| John  | 3300          |
| Alice | 1200          |
| Bob   | 1100          |
| David | 400           |
| Eve   | 500           |
+-------+---------------+

```

In [0]:
data = [
    {'ID': 1, 'Name': 'John', 'Manager': None, 'Salary': 1000},
    {'ID': 2, 'Name': 'Alice', 'Manager': 1, 'Salary': 800},
    {'ID': 3, 'Name': 'Bob', 'Manager': 1, 'Salary': 600},
    {'ID': 4, 'Name': 'David', 'Manager': 2, 'Salary': 400},
    {'ID': 5, 'Name': 'Eve', 'Manager': 3, 'Salary': 500}
]
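
A simple approach, sketched under two stated assumptions: the hierarchy is small enough to collect to the driver, and it contains no cycles. The helper names `total_salaries` and `manager_totals` are hypothetical. Each employee's total is their own salary plus the totals of their direct reports, computed recursively with memoization. For hierarchies too large to collect, an iterative self-join would be needed instead.

```python
from collections import defaultdict


def total_salaries(rows):
    """Map each employee ID to own salary plus salaries of all direct and indirect reports."""
    salary = {r['ID']: r['Salary'] for r in rows}
    reports = defaultdict(list)  # manager ID -> IDs of direct reports
    for r in rows:
        if r['Manager'] is not None:
            reports[r['Manager']].append(r['ID'])

    totals = {}

    def subtree_total(emp_id):
        # Memoized: each employee's subtree is summed only once.
        if emp_id not in totals:
            totals[emp_id] = salary[emp_id] + sum(
                subtree_total(child) for child in reports[emp_id]
            )
        return totals[emp_id]

    for emp_id in salary:
        subtree_total(emp_id)
    return totals


def manager_totals(spark, df):
    """Collect the hierarchy, roll salaries up in Python, and return a new DataFrame."""
    rows = df.collect()  # assumes the hierarchy fits in driver memory
    totals = total_salaries(rows)
    return spark.createDataFrame(
        [(r['Name'], totals[r['ID']]) for r in rows],
        schema=['Name', 'Total_Salary'],
    )
```

In the notebook, `display(manager_totals(spark, spark.createDataFrame(data)))` should reproduce the expected table. `total_salaries` accepts either Spark `Row` objects or plain dicts, since both support lookup by column name.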

#### Question 7 (Advanced Window Functions)
##### Given a DataFrame of stock prices, calculate the moving average for each stock over a window of 3 days.

**Input data** 
```
+------------+-------+-------+-----+
| Date       | Stock | Price | Day |
+------------+-------+-------+-----+
| 2024-01-01 | AAPL  | 150   | 1   |
| 2024-01-02 | AAPL  | 155   | 2   |
| 2024-01-03 | AAPL  | 160   | 3   |
| 2024-01-04 | AAPL  | 165   | 4   |
| 2024-01-05 | AAPL  | 170   | 5   |
| 2024-01-01 | GOOG  | 1200  | 1   |
| 2024-01-02 | GOOG  | 1220  | 2   |
| 2024-01-03 | GOOG  | 1250  | 3   |
| 2024-01-04 | GOOG  | 1300  | 4   |
| 2024-01-05 | GOOG  | 1350  | 5   |
+------------+-------+-------+-----+


```
**Output data**

```
+------------+-------+-------+------------+
| Date       | Stock | Price | Moving_Avg |
+------------+-------+-------+------------+
| 2024-01-03 | AAPL  | 160   | 155.0      |
| 2024-01-04 | AAPL  | 165   | 160.0      |
| 2024-01-05 | AAPL  | 170   | 165.0      |
| 2024-01-03 | GOOG  | 1250  | 1223.33    |
| 2024-01-04 | GOOG  | 1300  | 1256.67    |
| 2024-01-05 | GOOG  | 1350  | 1300.0     |
+------------+-------+-------+------------+

```

In [0]:
data = [
    {'Date': '2024-01-01', 'Stock': 'AAPL', 'Price': 150, 'Day': 1},
    {'Date': '2024-01-02', 'Stock': 'AAPL', 'Price': 155, 'Day': 2},
    {'Date': '2024-01-03', 'Stock': 'AAPL', 'Price': 160, 'Day': 3},
    {'Date': '2024-01-04', 'Stock': 'AAPL', 'Price': 165, 'Day': 4},
    {'Date': '2024-01-05', 'Stock': 'AAPL', 'Price': 170, 'Day': 5},
    {'Date': '2024-01-01', 'Stock': 'GOOG', 'Price': 1200, 'Day': 1},
    {'Date': '2024-01-02', 'Stock': 'GOOG', 'Price': 1220, 'Day': 2},
    {'Date': '2024-01-03', 'Stock': 'GOOG', 'Price': 1250, 'Day': 3},
    {'Date': '2024-01-04', 'Stock': 'GOOG', 'Price': 1300, 'Day': 4},
    {'Date': '2024-01-05', 'Stock': 'GOOG', 'Price': 1350, 'Day': 5}
]
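
A possible solution, sketched with assumptions: each stock has exactly one row per day with no gaps (so a row-based window of the current row and the two before it covers three days), and the names `trailing_averages` and `with_moving_avg` are hypothetical. Rows without a full three-day window are dropped, matching the output table, which starts on day 3. As a check on the arithmetic, the GOOG average on 2024-01-03 is (1200 + 1220 + 1250) / 3, which rounds to 1223.33.

```python
def trailing_averages(prices, window=3):
    """Plain-Python reference: average of each full trailing window, rounded to 2 decimals."""
    return [
        round(sum(prices[i - window + 1:i + 1]) / window, 2)
        for i in range(window - 1, len(prices))
    ]


def with_moving_avg(df, window=3):
    """Add Moving_Avg per stock and keep only rows that have a full window."""
    # Imported inside the function so trailing_averages works without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = (
        Window.partitionBy('Stock')
        .orderBy('Day')
        .rowsBetween(-(window - 1), Window.currentRow)
    )
    return (
        df.withColumn('Moving_Avg', F.round(F.avg('Price').over(w), 2))
        .withColumn('_rows_in_window', F.count('Price').over(w))
        .filter(F.col('_rows_in_window') == window)  # drop partial windows
        .drop('_rows_in_window')
        .select('Date', 'Stock', 'Price', 'Moving_Avg')
    )
```

In the notebook, `display(with_moving_avg(spark.createDataFrame(data)))` applies this to the sample data. If days could be missing, a range-based window over a numeric day column would be a closer match to "3 days" than a row-based one.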

#### Question 8 (Handling Skewed Data with Salting)
##### Given a highly skewed DataFrame with many transactions for certain products, use salting to repartition the data and balance the load across partitions.

**Input data** 
```
+--------+-------+------+
| Product| Region| Sales|
+--------+-------+------+
| A      | North | 100  |
| A      | North | 200  |
| A      | North | 150  |
| B      | South | 300  |
| C      | North | 400  |
| C      | South | 500  |
| C      | South | 600  |
+--------+-------+------+

```
**Expected output** (the data is repartitioned so that rows are spread evenly across partitions)



In [0]:
data = [
    {'Product': 'A', 'Region': 'North', 'Sales': 100},
    {'Product': 'A', 'Region': 'North', 'Sales': 200},
    {'Product': 'A', 'Region': 'North', 'Sales': 150},
    {'Product': 'B', 'Region': 'South', 'Sales': 300},
    {'Product': 'C', 'Region': 'North', 'Sales': 400},
    {'Product': 'C', 'Region': 'South', 'Sales': 500},
    {'Product': 'C', 'Region': 'South', 'Sales': 600}
]
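
A minimal sketch of salting, with assumptions: `Product` is the skewed key, the values `num_salts=4` and `num_partitions=4` are arbitrary illustrations, and the helper names are hypothetical. A random integer salt is appended to each row, so a hot key such as `A` is split into up to `num_salts` sub-keys, and the DataFrame is then repartitioned on the pair (`Product`, `salt`). On a seven-row sample the effect is not meaningful; the pattern only pays off when a few keys dominate a large dataset.

```python
import random
from collections import Counter


def salted_key_counts(rows, num_salts=4, seed=0):
    """Plain-Python illustration: count rows per (Product, salt) sub-key."""
    rng = random.Random(seed)
    return Counter((r['Product'], rng.randrange(num_salts)) for r in rows)


def salted_repartition(df, num_salts=4, num_partitions=4):
    """Add a random salt in [0, num_salts) and repartition on (Product, salt)."""
    # Imported inside the function so salted_key_counts works without Spark installed.
    from pyspark.sql import functions as F

    salted = df.withColumn('salt', (F.rand() * num_salts).cast('int'))
    return salted.repartition(num_partitions, 'Product', 'salt')


def rows_per_partition(df):
    """Count rows in each physical partition to inspect how balanced the data is."""
    from pyspark.sql import functions as F

    return df.groupBy(F.spark_partition_id().alias('partition')).count()
```

In the notebook, `display(rows_per_partition(salted_repartition(spark.createDataFrame(data))))` shows the resulting distribution. Any later aggregation by `Product` has to be done in two steps (first by `Product` and `salt`, then by `Product` alone) so that the salt does not change the result.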