**Scenario:** You have a DataFrame with columns `user_id`, `product_id`, and `purchase_date`.

**Question:** Write PySpark code to count the distinct users who made purchases.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType

spark = SparkSession.builder.appName("practice").getOrCreate()

# Explicit schema so purchase_date is parsed as a date rather than a string
schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("purchase_date", DateType())
])
path = "dbfs:/raw/data/products.csv"
df = spark.read.format("csv").schema(schema).option("header", "true").load(path)

# Keep only user_id, drop duplicate values, and count what remains
distinct_users = df.select("user_id").distinct().count()
print(distinct_users)
```
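One detail worth knowing: `distinct().count()` treats a null `user_id` as one more distinct value, whereas `countDistinct("user_id")` from `pyspark.sql.functions` skips nulls, following SQL `COUNT(DISTINCT ...)` semantics. The plain-Python sketch below (no Spark needed; the sample rows are hypothetical) mirrors both behaviours so the difference is easy to see.

```python
# Hypothetical purchase rows: (user_id, product_id); one row has a missing user_id
purchases = [
    ("u1", "p1"),
    ("u2", "p1"),
    ("u1", "p2"),
    (None, "p3"),
]

# Mirrors df.select("user_id").distinct().count(): None is kept as its own value
distinct_with_null = len({user for user, _ in purchases})

# Mirrors df.select(countDistinct("user_id")): nulls are ignored
distinct_without_null = len({user for user, _ in purchases if user is not None})

print(distinct_with_null, distinct_without_null)  # 3 2
```

If rows with a missing user should not count as a purchaser, either filter them first with `df.filter(col("user_id").isNotNull())` or use `countDistinct`.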

**Scenario:** You have a DataFrame with user transactions:
```python
# +-------+----------+--------+
# |user_id|product_id| amount |
# +-------+----------+--------+
# |  101  |   501    |  50.0  |
# |  102  |   502    |  75.0  |
# |  101  |   503    |  30.0  |
# |  103  |   501    |  60.0  |
# |  102  |   504    |  45.0  |
# +-------+----------+--------+
```

**Question:** Write PySpark code to:
1. Calculate the total amount spent by each user
2. Sort the results by total amount in descending order
3. Show only the top 3 users

**Expected Output:**
```
+-------+------------+
|user_id|total_amount|
+-------+------------+
|  102  |  120.0     |
|  101  |   80.0     |
|  103  |   60.0     |
+-------+------------+
```


```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import sum as agg_sum, col

spark = SparkSession.builder.appName("practice").getOrCreate()

schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("amount", FloatType())
])
path = "dbfs:/raw/data/users.csv"
df = spark.read.format("csv").schema(schema).option("header", "true").load(path)

# 1. Total amount spent by each user
totals = df.groupBy("user_id").agg(agg_sum("amount").alias("total_amount"))
# 2-3. Highest spenders first, keep only the top 3
top_users = totals.orderBy(col("total_amount").desc()).limit(3)
top_users.show()
```
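To sanity-check the expected output by hand, the sketch below repeats the same aggregation in plain Python on the five sample rows from the scenario table (no Spark involved). Sorting by the total in descending order puts user 102 (120.0) ahead of 101 (80.0) and 103 (60.0).

```python
from collections import defaultdict

# Sample transactions from the scenario table: (user_id, product_id, amount)
transactions = [
    ("101", "501", 50.0),
    ("102", "502", 75.0),
    ("101", "503", 30.0),
    ("103", "501", 60.0),
    ("102", "504", 45.0),
]

# Equivalent of groupBy("user_id").agg(sum("amount"))
totals = defaultdict(float)
for user_id, _, amount in transactions:
    totals[user_id] += amount

# Equivalent of orderBy(col("total_amount").desc()).limit(3)
top_users = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:3]
for user_id, total in top_users:
    print(user_id, total)
```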





**Question:** Write PySpark code to find employees whose salary is **above the average salary of their department**.

**Expected Output:**
```
+------+-----------+----------+--------+--------------+
|emp_id|   name    |department| salary |dept_avg_sal  |
+------+-----------+----------+--------+--------------+
| 3    |  Charlie  |    IT    | 90000  |  85000.0     |
| 4    |  David    |    HR    | 65000  |  62500.0     |
| 5    |  Eve      |  Finance | 75000  |  72500.0     |
+------+-----------+----------+--------+--------------+
```

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import round as spark_round, avg, col

spark = SparkSession.builder.appName("practice").getOrCreate()
df_schema = StructType([
    StructField("emp_id", IntegerType()),
    StructField("name", StringType()),
    StructField("department", StringType()),
    StructField("salary", FloatType())
])
path = "dbfs:/raw/data/employee.csv"
df = spark.read.format("csv").schema(df_schema).option("header", "true").load(path)

# No orderBy, so the window frame is the whole department
avg_window_spec = Window.partitionBy("department")
df = df.withColumn("dept_avg", avg(col("salary")).over(avg_window_spec))
# Compare against the unrounded average; round only the displayed column
df = (df.filter(col("salary") > col("dept_avg"))
        .withColumn("dept_avg_sal", spark_round(col("dept_avg"), 2))
        .drop("dept_avg"))
df.show()
```

**Question:** Write PySpark code to calculate a **7-day moving average of revenue** for each user, ordered by date.

**Requirements:**
1. Calculate the moving average including the current day and previous 6 days (7 days total)
2. If there are fewer than 7 days available, calculate the average with available days
3. Round the result to 2 decimal places
4. Order by user_id and date

**Expected Output (example):**
```
+-------+------------+--------+------------------+
|user_id|   date     |revenue |moving_avg_7day   |
+-------+------------+--------+------------------+
|  101  | 2024-01-01 |  100   |     100.00       |
|  101  | 2024-01-02 |  150   |     125.00       |
|  101  | 2024-01-03 |  200   |     150.00       |
|  101  | 2024-01-05 |  180   |     157.50       |
|  102  | 2024-01-01 |   80   |      80.00       |
|  102  | 2024-01-02 |  120   |     100.00       |
|  103  | 2024-01-01 |   90   |      90.00       |
+-------+------------+--------+------------------+
```

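```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
from pyspark.sql.functions import col, round as spark_round, avg, datediff, lit, to_date

spark = SparkSession.builder.appName("practice").getOrCreate()
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("date", DateType()),
    StructField("revenue", IntegerType())
])
path = "dbfs:/raw/data/revenue.csv"
df = spark.read.format("csv").schema(schema).option("header", "true").load(path)

# rowsBetween(-6, 0) would take the previous 6 *rows*, which spans more than
# 7 calendar days whenever a user has missing dates. Ordering by an integer
# day number and using rangeBetween(-6, 0) keeps the current day plus the 6 days before.
df = df.withColumn("day_num", datediff(col("date"), to_date(lit("1970-01-01"))))
window_spec = Window.partitionBy("user_id").orderBy("day_num").rangeBetween(-6, 0)
df = df.withColumn("moving_avg_7day", spark_round(avg("revenue").over(window_spec), 2))
df = df.drop("day_num").orderBy(col("user_id"), col("date"))
df.show()
```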
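The difference between a frame of 7 rows (`rowsBetween(-6, 0)`) and a frame of 7 calendar days (`rangeBetween(-6, 0)` over a day number) only shows up when a user has gaps. The plain-Python sketch below (no Spark; the 2024-01-20 row is hypothetical) computes both for user 101: on the sample dates they agree, but after a 15-day gap the row-based frame still averages in old revenue while the day-based frame does not.

```python
from datetime import date, timedelta

# User 101's rows from the expected output (sorted by date),
# plus one hypothetical row on 2024-01-20
rows = [
    (date(2024, 1, 1), 100),
    (date(2024, 1, 2), 150),
    (date(2024, 1, 3), 200),
    (date(2024, 1, 5), 180),
    (date(2024, 1, 20), 60),  # hypothetical, added to show a long gap
]

def moving_avg_by_days(series, days=7):
    """Average over rows dated within the current day and the previous days-1 days."""
    result = []
    for current_date, _ in series:
        start = current_date - timedelta(days=days - 1)
        in_frame = [rev for d, rev in series if start <= d <= current_date]
        result.append(round(sum(in_frame) / len(in_frame), 2))
    return result

def moving_avg_by_rows(series, n=7):
    """Average over the current row and up to n-1 preceding rows."""
    result = []
    for i in range(len(series)):
        in_frame = [rev for _, rev in series[max(0, i - n + 1): i + 1]]
        result.append(round(sum(in_frame) / len(in_frame), 2))
    return result

by_days = moving_avg_by_days(rows)
by_rows = moving_avg_by_rows(rows)
print(by_days)  # [100.0, 125.0, 150.0, 157.5, 60.0]
print(by_rows)  # [100.0, 125.0, 150.0, 157.5, 138.0]
```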

**Scenario:** You have a DataFrame of user login events:
```python
# +-------+---------------------+
# |user_id|     login_time      |
# +-------+---------------------+
# |  101  | 2024-01-01 09:00:00 |
# |  101  | 2024-01-01 09:05:00 |
# |  101  | 2024-01-01 09:40:00 |
# |  101  | 2024-01-02 10:00:00 |
# |  102  | 2024-01-01 08:00:00 |
# |  102  | 2024-01-01 08:45:00 |
# |  102  | 2024-01-01 09:50:00 |
# +-------+---------------------+
```

**Question:** Identify **user sessions** where a new session starts if there's a gap of **more than 30 minutes** between consecutive logins for the same user.

**Requirements:**
1. Assign a unique `session_id` to each session
2. Calculate the `session_start` time (first login of the session)
3. Calculate the `session_end` time (last login of the session)
4. Count the number of logins in each session

**Expected Output:**
```
+-------+----------+---------------------+---------------------+-------------+
|user_id|session_id|    session_start    |     session_end     |login_count  |
+-------+----------+---------------------+---------------------+-------------+
|  101  |    1     | 2024-01-01 09:00:00 | 2024-01-01 09:05:00 |      2      |
|  101  |    2     | 2024-01-01 09:40:00 | 2024-01-01 09:40:00 |      1      |
|  101  |    3     | 2024-01-02 10:00:00 | 2024-01-02 10:00:00 |      1      |
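|  102  |    1     | 2024-01-01 08:00:00 | 2024-01-01 08:00:00 |      1      |
|  102  |    2     | 2024-01-01 08:45:00 | 2024-01-01 08:45:00 |      1      |
|  102  |    3     | 2024-01-01 09:50:00 | 2024-01-01 09:50:00 |      1      |
+-------+----------+---------------------+---------------------+-------------+
```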

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from pyspark.sql.functions import (col, lag, unix_timestamp, when, count,
                                   sum as agg_sum, min as agg_min, max as agg_max)

spark = SparkSession.builder.appName("practice").getOrCreate()
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("login_time", TimestampType())
])
path = "dbfs:/raw/data/login_times.csv"
df = spark.read.format("csv").schema(schema).option("header", "true").load(path)

window_spec = Window.partitionBy("user_id").orderBy("login_time")
# Minutes since the same user's previous login (null for the first login)
df = df.withColumn("prev_login_time", lag("login_time", 1).over(window_spec))
df = df.withColumn("time_diff_min",
                   (unix_timestamp(col("login_time")) - unix_timestamp(col("prev_login_time"))) / 60)
# Flag the first login and any login more than 30 minutes after the previous one
df = df.withColumn("is_new_session",
                   when((col("time_diff_min") > 30) | col("time_diff_min").isNull(), 1).otherwise(0))
# A running total of the flags numbers each user's sessions 1, 2, 3, ...
running = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("session_id", agg_sum("is_new_session").over(running))
df = (df.groupBy("user_id", "session_id")
        .agg(agg_min("login_time").alias("session_start"),
             agg_max("login_time").alias("session_end"),
             count("login_time").alias("login_count"))
        .orderBy("user_id", "session_id"))
df.show()
```
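The gaps-and-islands logic can be checked by hand with the plain-Python sketch below (no Spark; it reuses the seven sample logins). User 102's second login comes 45 minutes after the first, so under the strict "more than 30 minutes" rule every one of that user's logins starts its own session, giving three sessions for 102.

```python
from datetime import datetime, timedelta
from itertools import groupby

# Login events from the scenario table: (user_id, login_time)
logins = [
    (101, "2024-01-01 09:00:00"),
    (101, "2024-01-01 09:05:00"),
    (101, "2024-01-01 09:40:00"),
    (101, "2024-01-02 10:00:00"),
    (102, "2024-01-01 08:00:00"),
    (102, "2024-01-01 08:45:00"),
    (102, "2024-01-01 09:50:00"),
]
GAP = timedelta(minutes=30)

# Each entry: [user_id, session_id, session_start, session_end, login_count]
sessions = []
events = sorted((u, datetime.strptime(t, "%Y-%m-%d %H:%M:%S")) for u, t in logins)
for user_id, group in groupby(events, key=lambda e: e[0]):
    session_id, prev = 0, None
    for _, ts in group:
        # Same rule as is_new_session: first login, or a gap strictly greater than 30 minutes
        if prev is None or ts - prev > GAP:
            session_id += 1
            sessions.append([user_id, session_id, ts, ts, 0])
        sessions[-1][3] = ts   # extend session_end
        sessions[-1][4] += 1   # count this login
        prev = ts

for s in sessions:
    print(*s)
```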

**Scenario:** You have an orders DataFrame and a customers DataFrame:
```python
# orders
# +--------+-----------+------------+--------+
# |order_id|customer_id| order_date | amount |
# +--------+-----------+------------+--------+
# |  1001  |    101    | 2024-01-15 |  250.0 |
# |  1002  |    102    | 2024-01-16 |  180.0 |
# |  1003  |    101    | 2024-01-17 |  320.0 |
# |  1004  |    999    | 2024-01-18 |  150.0 |  # Orphan record
# |  1005  |    103    | 2024-01-19 |  400.0 |
# |  1006  |    101    | 2024-01-20 |  275.0 |
# +--------+-----------+------------+--------+

# customers
# +-----------+---------+----------+
# |customer_id|  name   | segment  |
# +-----------+---------+----------+
# |    101    |  Alice  | Premium  |
# |    102    |  Bob    | Standard |
# |    103    | Charlie | Premium  |
# |    104    |  David  | Standard |  # No orders
# +-----------+---------+----------+
```

**Question:** Write PySpark code to:

1. **Identify and report data quality issues:**
   - Orders with no matching customer (orphan orders)
   - Customers with no orders
   
2. **Create a clean dataset** that includes:
   - All valid orders with customer information
   - Calculate total order amount per customer
   - Calculate number of orders per customer
   - Rank customers by total amount within each segment

3. **Output two DataFrames:**
   - `data_quality_report`: Shows all data quality issues
   - `clean_customer_summary`: Final clean dataset with rankings

**Expected Output 1 - Data Quality Report:**
```
+-------------+----------+-------------+
|  issue_type |entity_id | entity_type |
+-------------+----------+-------------+
| orphan_order|   1004   |   order     |
| no_orders   |   104    |  customer   |
+-------------+----------+-------------+
```

**Expected Output 2 - Clean Customer Summary:**
```
+-----------+---------+----------+-----------+------------+------------+
|customer_id|  name   | segment  |order_count|total_amount|segment_rank|
+-----------+---------+----------+-----------+------------+------------+
|    101    |  Alice  | Premium  |     3     |    845.0   |      1     |
|    103    | Charlie | Premium  |     1     |    400.0   |      2     |
|    102    |   Bob   | Standard |     1     |    180.0   |      1     |
+-----------+---------+----------+-----------+------------+------------+
```
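The logic behind both expected outputs can be checked with the plain-Python sketch below (no Spark; it uses the sample tables from the scenario). In PySpark the same steps would roughly map to `join(..., how="left_anti")` in each direction for the orphan and no-order checks, an inner join plus `groupBy("customer_id").agg(...)` with `count` and `sum` for the summary, and `rank()` over `Window.partitionBy("segment").orderBy(col("total_amount").desc())` for `segment_rank`; treat that mapping as a starting point rather than a finished solution.

```python
# Sample data from the scenario tables
orders = [  # (order_id, customer_id, order_date, amount)
    (1001, 101, "2024-01-15", 250.0),
    (1002, 102, "2024-01-16", 180.0),
    (1003, 101, "2024-01-17", 320.0),
    (1004, 999, "2024-01-18", 150.0),
    (1005, 103, "2024-01-19", 400.0),
    (1006, 101, "2024-01-20", 275.0),
]
customers = {  # customer_id -> (name, segment)
    101: ("Alice", "Premium"),
    102: ("Bob", "Standard"),
    103: ("Charlie", "Premium"),
    104: ("David", "Standard"),
}

# Data quality: anti-join in both directions
ordering_customers = {cust for _, cust, _, _ in orders}
data_quality_report = (
    [("orphan_order", oid, "order") for oid, cust, _, _ in orders if cust not in customers]
    + [("no_orders", cust, "customer") for cust in customers if cust not in ordering_customers]
)

# Clean summary: keep only orders with a matching customer, then count and sum
stats = {}
for _, cust, _, amount in orders:
    if cust in customers:
        n, total = stats.get(cust, (0, 0.0))
        stats[cust] = (n + 1, total + amount)

summary = []
for cust, (n, total) in stats.items():
    name, segment = customers[cust]
    summary.append([cust, name, segment, n, total])

# rank() semantics: 1 + number of customers in the same segment with a larger total
for row in summary:
    higher = [r for r in summary if r[2] == row[2] and r[4] > row[4]]
    row.append(1 + len(higher))
clean_customer_summary = sorted(summary, key=lambda r: (r[2], r[5]))

for row in data_quality_report:
    print(row)
for row in clean_customer_summary:
    print(row)
```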