In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

In [2]:
print(pyspark.__version__)

3.5.5


In [3]:
spark = SparkSession.builder \
    .appName("online_courses") \
    .master("local[*]") \
    .getOrCreate()

25/05/14 12:33:14 WARN Utils: Your hostname, Ameys-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface en1)
25/05/14 12:33:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/14 12:33:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
students_df = spark.read.csv("students.csv", header=True, inferSchema=True)
courses_df = spark.read.csv("courses.csv", header=True, inferSchema=True)
instructors_df = spark.read.csv("instructors.csv", header=True, inferSchema=True)
enrollments_df = spark.read.csv("enrollments.csv", header=True, inferSchema=True)
reviews_df = spark.read.csv("reviews.csv", header=True, inferSchema=True)

In [5]:
students_df.createOrReplaceTempView("students")
courses_df.createOrReplaceTempView("courses")
instructors_df.createOrReplaceTempView("instructors")
enrollments_df.createOrReplaceTempView("enrollments")
reviews_df.createOrReplaceTempView("reviews")

In [6]:
students_df.printSchema()

root
 |-- student_id: integer (nullable = true)
 |-- student_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- country: string (nullable = true)



In [7]:
courses_df.printSchema()

root
 |-- course_id: integer (nullable = true)
 |-- course_title: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- instructor_id: integer (nullable = true)



In [8]:
instructors_df.printSchema()

root
 |-- instructor_id: integer (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- specialization: string (nullable = true)
 |-- years_of_experience: integer (nullable = true)



In [9]:
enrollments_df.printSchema()

root
 |-- enrollment_id: integer (nullable = true)
 |-- student_id: integer (nullable = true)
 |-- course_id: integer (nullable = true)
 |-- enrollment_date: date (nullable = true)
 |-- progress_percent: integer (nullable = true)



In [10]:
reviews_df.printSchema()

root
 |-- review_id: integer (nullable = true)
 |-- student_id: integer (nullable = true)
 |-- course_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- comment: string (nullable = true)



### Which students have enrolled in more than 3 courses?

In [11]:
spark.sql(
    """
    SELECT
        e.student_id,
        s.student_name,
        count(distinct e.course_id) as num_courses
    FROM enrollments e
    JOIN students s 
    USING (student_id)
    GROUP BY 1, 2
    HAVING count(distinct course_id) > 3
    """
).show()

+----------+------------------+-----------+
|student_id|      student_name|num_courses|
+----------+------------------+-----------+
|        41|       Mary French|          4|
|        32|       Leah Travis|          4|
|        33|Jennifer Hernandez|          4|
|        48|       Dale Dodson|          5|
+----------+------------------+-----------+



In [12]:
joined = enrollments_df.join(
    students_df,
    on="student_id",
    how="inner"
)

joined.groupBy("student_id", "student_name") \
    .agg(F.countDistinct("course_id").alias("num_courses")) \
    .filter(F.col("num_courses") > 3) \
    .show()


+----------+------------------+-----------+
|student_id|      student_name|num_courses|
+----------+------------------+-----------+
|        41|       Mary French|          4|
|        32|       Leah Travis|          4|
|        33|Jennifer Hernandez|          4|
|        48|       Dale Dodson|          5|
+----------+------------------+-----------+



### Which instructor has the highest average rating across all their courses?

In [13]:
spark.sql(
    """
    SELECT
        c.instructor_id,
        i.instructor_name,
        ROUND(AVG(r.rating), 2) as avg_rating
    FROM reviews r
    JOIN courses c
    USING (course_id)
    JOIN instructors i
    USING (instructor_id)
    GROUP BY 1, 2
    ORDER BY avg_rating DESC
    """
).show()

+-------------+---------------+----------+
|instructor_id|instructor_name|avg_rating|
+-------------+---------------+----------+
|            3| Samantha Price|      4.33|
|            9|  Jean Williams|       4.0|
|            7|   Shane Wilcox|       3.5|
|            1|  Angela Martin|      3.33|
|            4|    Mary Kelley|      3.23|
|           10|    Adam Barton|      3.14|
|            2|     Erica Hill|       3.0|
|            5|   Carmen Mcgee|      2.75|
|            8| Matthew Orozco|       2.5|
+-------------+---------------+----------+



In [14]:
joined = reviews_df.join(
    courses_df,
    on="course_id",
    how="inner"
).join(
    instructors_df,
    on="instructor_id",
    how="inner"
)

joined.groupBy("instructor_id", "instructor_name") \
    .agg(F.round(F.avg("rating"), 2).alias("avg_rating")) \
    .orderBy(F.col("avg_rating").desc()) \
    .show()

+-------------+---------------+----------+
|instructor_id|instructor_name|avg_rating|
+-------------+---------------+----------+
|            3| Samantha Price|      4.33|
|            9|  Jean Williams|       4.0|
|            7|   Shane Wilcox|       3.5|
|            1|  Angela Martin|      3.33|
|            4|    Mary Kelley|      3.23|
|           10|    Adam Barton|      3.14|
|            2|     Erica Hill|       3.0|
|            5|   Carmen Mcgee|      2.75|
|            8| Matthew Orozco|       2.5|
+-------------+---------------+----------+



### What is the average rating for each course category? (Tech, Business, Design)

In [15]:
spark.sql(
    """
    SELECT
        c.category,
        ROUND(AVG(r.rating), 2) AS avg_rating
    FROM reviews r
    JOIN courses c
    USING (course_id)
    GROUP BY 1
    """
).show()

+--------+----------+
|category|avg_rating|
+--------+----------+
|    Tech|      3.32|
|  Design|      3.21|
|Business|      3.09|
+--------+----------+



In [16]:
joined = reviews_df.join(
    courses_df,
    on="course_id",
    how="inner"
)

joined.groupBy("category") \
    .agg(F.round(F.avg("rating"), 2).alias("avg_rating")) \
    .show()

+--------+----------+
|category|avg_rating|
+--------+----------+
|    Tech|      3.32|
|  Design|      3.21|
|Business|      3.09|
+--------+----------+



### Find the top 5 most popular courses (by number of enrollments).

In [17]:
spark.sql(
    """
    SELECT
        e.course_id,
        c.course_title,
        count(distinct e.student_id) as enrollments
    FROM enrollments e
    JOIN courses c
    USING (course_id)
    GROUP BY 1, 2
    ORDER BY enrollments DESC
    """
).show()

+---------+--------------------+-----------+
|course_id|        course_title|enrollments|
+---------+--------------------+-----------+
|        7|Team-oriented nex...|          8|
|        6|Progressive homog...|          7|
|       18|Configurable reci...|          7|
|        4|Switchable neutra...|          6|
|        1|   Enhanced 24/7 hub|          6|
|       17|Cloned optimizing...|          6|
|        8|User-centric comp...|          6|
|       19|Optimized object-...|          6|
|       15|Up-sized stable a...|          5|
|       13|Fully-configurabl...|          5|
|       12|Sharable secondar...|          5|
|        2|Ameliorated 4thge...|          4|
|       14|Streamlined eco-c...|          4|
|       10|Open-architected ...|          4|
|       20|Persevering conte...|          4|
|       11|Multi-tiered exec...|          3|
|       16|User-centric real...|          3|
|        5|Pre-emptive recip...|          3|
|        3|Front-line intera...|          3|
|        9

In [18]:
joined = enrollments_df.join(
    courses_df,
    on="course_id",
    how="inner"
)

joined.groupBy("course_id", "course_title") \
    .agg(F.countDistinct("student_id").alias("enrollments")) \
    .orderBy(F.col("enrollments").desc()) \
    .show()

+---------+--------------------+-----------+
|course_id|        course_title|enrollments|
+---------+--------------------+-----------+
|        7|Team-oriented nex...|          8|
|        6|Progressive homog...|          7|
|       18|Configurable reci...|          7|
|        4|Switchable neutra...|          6|
|        1|   Enhanced 24/7 hub|          6|
|       17|Cloned optimizing...|          6|
|        8|User-centric comp...|          6|
|       19|Optimized object-...|          6|
|       15|Up-sized stable a...|          5|
|       13|Fully-configurabl...|          5|
|       12|Sharable secondar...|          5|
|        2|Ameliorated 4thge...|          4|
|       14|Streamlined eco-c...|          4|
|       10|Open-architected ...|          4|
|       20|Persevering conte...|          4|
|       11|Multi-tiered exec...|          3|
|       16|User-centric real...|          3|
|        5|Pre-emptive recip...|          3|
|        3|Front-line intera...|          3|
|        9

### Which students have completed more than 80% in all the courses they enrolled in?

In [19]:
spark.sql(
    """
    with cte as (
        SELECT
        DISTINCT
            e.student_id,
            s.student_name,
            AVG(e.progress_percent) over (PARTITION BY e.student_id) as avg_progress
        FROM enrollments e
        JOIN students s
        USING (student_id)
    )
    SELECT
        student_id,
        student_name
    FROM cte
    WHERE avg_progress >= 80
    """
).show()

+----------+-------------+
|student_id| student_name|
+----------+-------------+
|         5|  John Hughes|
|        15|Brian Morales|
|        17| Mary Wheeler|
|        18|Judith Church|
|        28|  Julie Smith|
|        31|   Sarah Sims|
+----------+-------------+



In [20]:
window_spec = Window.partitionBy("student_id")

joined = enrollments_df.join(
    students_df,
    on="student_id",
    how="inner"
)

windowed_df = joined.withColumn(
    "avg_progress",
    F.avg("progress_percent").over(window_spec)
).select(
    "student_id",
    "student_name"
)

filtered_df = windowed_df.filter(
    F.col("avg_progress") >= 80
).distinct().show()

+----------+-------------+
|student_id| student_name|
+----------+-------------+
|         5|  John Hughes|
|        15|Brian Morales|
|        17| Mary Wheeler|
|        18|Judith Church|
|        28|  Julie Smith|
|        31|   Sarah Sims|
+----------+-------------+



### List all instructors and the number of unique students they’ve taught.

In [21]:
spark.sql(
    """
    SELECT
        i.instructor_id,
        i.instructor_name,
        COUNT(DISTINCT e.student_id) AS student_count
    FROM instructors i
    JOIN courses c
    USING (instructor_id)
    JOIN enrollments e
    USING (course_id)
    GROUP BY 1, 2
    """
).show()

+-------------+---------------+-------------+
|instructor_id|instructor_name|student_count|
+-------------+---------------+-------------+
|            4|    Mary Kelley|           16|
|            1|  Angela Martin|           14|
|            7|   Shane Wilcox|            7|
|            8| Matthew Orozco|           10|
|            3| Samantha Price|            3|
|           10|    Adam Barton|           14|
|            5|   Carmen Mcgee|            6|
|            9|  Jean Williams|            4|
|            2|     Erica Hill|           15|
+-------------+---------------+-------------+



In [22]:
joined = instructors_df.join(
    courses_df,
    on="instructor_id",
    how="inner"
).join(
    enrollments_df,
    on="course_id",
    how="inner"
)

joined.groupBy("instructor_id", "instructor_name") \
    .agg(F.countDistinct("student_id").alias("student_count")) \
    .show()

+-------------+---------------+-------------+
|instructor_id|instructor_name|student_count|
+-------------+---------------+-------------+
|            4|    Mary Kelley|           16|
|            1|  Angela Martin|           14|
|            7|   Shane Wilcox|            7|
|            8| Matthew Orozco|           10|
|            3| Samantha Price|            3|
|           10|    Adam Barton|           14|
|            5|   Carmen Mcgee|            6|
|            9|  Jean Williams|            4|
|            2|     Erica Hill|           15|
+-------------+---------------+-------------+



### For each country, find the average student progress across all courses.

In [23]:
spark.sql(
    """
    SELECT
        s.country,
        avg(e.progress_percent) as avg_progress
    FROM students s
    LEFT JOIN enrollments e
    USING (student_id)
    GROUP BY 1
    """
).show()

+--------------------+------------------+
|             country|      avg_progress|
+--------------------+------------------+
|            Anguilla|              59.0|
|               Macao|              83.5|
|              Guyana|              75.5|
|      Norfolk Island|              70.0|
|                Fiji|              26.0|
|United States Vir...|              99.0|
|             Germany|              62.0|
|         Afghanistan|             47.25|
|              Rwanda|              66.0|
|               Sudan|              61.0|
|Holy See (Vatican...|              66.0|
|           Sri Lanka|              NULL|
|            Dominica|              35.0|
|   Equatorial Guinea| 77.83333333333333|
|             Algeria|              32.5|
|           Argentina|             59.75|
|             Lesotho|56.333333333333336|
|       New Caledonia|              53.0|
|               Benin|              10.0|
|        Sierra Leone|              NULL|
+--------------------+------------

### Rank courses within each category by average rating using window functions.

In [24]:
spark.sql(
    """
    SELECT
    distinct
        c.course_id,
        c.course_title,
        c.category,
        AVG(r.rating) over(PARTITION BY c.course_id) as avg_rating,
        dense_rank() over(PARTITION BY c.category ORDER BY AVG(r.rating) over(PARTITION BY c.course_id) DESC) as rnk
    FROM courses c
    LEFT JOIN reviews r
    USING (course_id)
    """
).show()

+---------+--------------------+--------+------------------+---+
|course_id|        course_title|category|        avg_rating|rnk|
+---------+--------------------+--------+------------------+---+
|       20|Persevering conte...|Business|               4.0|  1|
|       14|Streamlined eco-c...|Business|               3.5|  2|
|       11|Multi-tiered exec...|Business|3.3333333333333335|  3|
|       12|Sharable secondar...|Business|               2.8|  4|
|        8|User-centric comp...|Business|2.6666666666666665|  5|
|       13|Fully-configurabl...|Business|1.6666666666666667|  6|
|        5|Pre-emptive recip...|  Design| 4.333333333333333|  1|
|        2|Ameliorated 4thge...|  Design|              3.75|  2|
|       15|Up-sized stable a...|  Design|3.6666666666666665|  3|
|       18|Configurable reci...|  Design|               3.5|  4|
|        7|Team-oriented nex...|  Design|              3.25|  5|
|        4|Switchable neutra...|  Design|               3.0|  6|
|        1|   Enhanced 24

In [25]:
window_spec_1 = Window.partitionBy("course_id")
window_spec_2 = Window.partitionBy("category").orderBy(F.avg("rating").over(window_spec_1).desc())

joined = courses_df.join(
    reviews_df,
    on="course_id",
    how="left"
)

joined.select(
    "course_id",
    "course_title",
    "category",
    F.avg("rating").over(window_spec_1).alias("avg_rating"),
    F.dense_rank().over(window_spec_2).alias("rnk")
).distinct().show()

+---------+--------------------+--------+------------------+---+
|course_id|        course_title|category|        avg_rating|rnk|
+---------+--------------------+--------+------------------+---+
|       20|Persevering conte...|Business|               4.0|  1|
|       14|Streamlined eco-c...|Business|               3.5|  2|
|       11|Multi-tiered exec...|Business|3.3333333333333335|  3|
|       12|Sharable secondar...|Business|               2.8|  4|
|        8|User-centric comp...|Business|2.6666666666666665|  5|
|       13|Fully-configurabl...|Business|1.6666666666666667|  6|
|        5|Pre-emptive recip...|  Design| 4.333333333333333|  1|
|        2|Ameliorated 4thge...|  Design|              3.75|  2|
|       15|Up-sized stable a...|  Design|3.6666666666666665|  3|
|       18|Configurable reci...|  Design|               3.5|  4|
|        7|Team-oriented nex...|  Design|              3.25|  5|
|        4|Switchable neutra...|  Design|               3.0|  6|
|        1|   Enhanced 24

### Get the latest review per student per course using ROW_NUMBER.

In [26]:
spark.sql(
    """
    with cte as (
        SELECT
            student_id,
            course_id,
            rating,
            review_date,
            ROW_NUMBER() OVER (PARTITION BY student_id, course_id ORDER BY review_date DESC) as rnk
        FROM reviews
    )
    SELECT
        student_id,
        course_id,
        rating,
        review_date
    FROM cte
    WHERE rnk = 1
    """
).show()

+----------+---------+------+-----------+
|student_id|course_id|rating|review_date|
+----------+---------+------+-----------+
|         1|        4|     2| 2024-11-05|
|         1|        8|     5| 2024-09-12|
|         1|       13|     1| 2023-10-09|
|         1|       20|     4| 2023-07-05|
|         2|        6|     2| 2024-11-03|
|         2|       20|     5| 2024-10-27|
|         4|        1|     4| 2025-02-22|
|         4|        2|     5| 2024-03-08|
|         4|        9|     2| 2024-11-17|
|         4|       14|     1| 2023-05-07|
|         6|        1|     4| 2023-10-03|
|         6|        4|     4| 2024-05-09|
|         6|       18|     4| 2025-01-08|
|         8|        9|     3| 2024-07-31|
|         8|       16|     2| 2024-12-01|
|         9|        4|     3| 2023-11-16|
|         9|        6|     4| 2025-03-18|
|        12|        1|     1| 2025-04-01|
|        12|        8|     2| 2023-08-30|
|        13|       12|     2| 2023-11-11|
+----------+---------+------+-----

In [27]:
window_spec = Window.partitionBy("student_id", "course_id").orderBy(reviews_df["review_date"].desc())

window_df = reviews_df.withColumn(
    "rnk",
    F.row_number().over(window_spec)
)

filtered_df = window_df.filter(
    F.col("rnk") == 1
).select(
    "student_id",
    "course_id",
    "rating",
    "review_date"
)

filtered_df.show()

+----------+---------+------+-----------+
|student_id|course_id|rating|review_date|
+----------+---------+------+-----------+
|         1|        4|     2| 2024-11-05|
|         1|        8|     5| 2024-09-12|
|         1|       13|     1| 2023-10-09|
|         1|       20|     4| 2023-07-05|
|         2|        6|     2| 2024-11-03|
|         2|       20|     5| 2024-10-27|
|         4|        1|     4| 2025-02-22|
|         4|        2|     5| 2024-03-08|
|         4|        9|     2| 2024-11-17|
|         4|       14|     1| 2023-05-07|
|         6|        1|     4| 2023-10-03|
|         6|        4|     4| 2024-05-09|
|         6|       18|     4| 2025-01-08|
|         8|        9|     3| 2024-07-31|
|         8|       16|     2| 2024-12-01|
|         9|        4|     3| 2023-11-16|
|         9|        6|     4| 2025-03-18|
|        12|        1|     1| 2025-04-01|
|        12|        8|     2| 2023-08-30|
|        13|       12|     2| 2023-11-11|
+----------+---------+------+-----

### Find students who have given a 5-star rating to more than one course.

In [28]:
spark.sql(
    """
    with cte as (
        SELECT
            student_id,
            rating
        FROM reviews
    )
    
    SELECT
        student_id,
        COUNT(*) as five_star_count
    FROM cte
    WHERE rating = 5
    GROUP BY 1
    HAVING COUNT(*) > 1
    """
).show()

+----------+---------------+
|student_id|five_star_count|
+----------+---------------+
|        26|              2|
|        23|              2|
+----------+---------------+



In [29]:
filtered_df = reviews_df.filter(
    F.col("rating") == 5
)

filtered_df.groupBy("student_id") \
    .agg(F.count("*").alias("five_star_count")) \
    .filter(F.col("five_star_count") > 1) \
    .show()


+----------+---------------+
|student_id|five_star_count|
+----------+---------------+
|        26|              2|
|        23|              2|
+----------+---------------+



### Calculate the rolling average rating over the last 3 reviews for each course.

In [30]:
spark.sql(
    """
    SELECT
        course_id,
        review_date,
        rating,
        AVG(rating) OVER (PARTITION BY course_id ORDER BY review_date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as rolling_avg
    FROM reviews
    """
).show()

+---------+-----------+------+------------------+
|course_id|review_date|rating|       rolling_avg|
+---------+-----------+------+------------------+
|        1| 2023-09-17|     2|               2.0|
|        1| 2023-10-03|     4|               3.0|
|        1| 2023-10-18|     3|               3.0|
|        1| 2023-11-02|     4|3.6666666666666665|
|        1| 2024-02-27|     1|2.6666666666666665|
|        1| 2024-12-14|     3|2.6666666666666665|
|        1| 2025-02-22|     4|2.6666666666666665|
|        1| 2025-04-01|     1|2.6666666666666665|
|        2| 2023-08-12|     5|               5.0|
|        2| 2023-12-27|     4|               4.5|
|        2| 2024-03-08|     5| 4.666666666666667|
|        2| 2025-03-11|     1|3.3333333333333335|
|        4| 2023-08-16|     4|               4.0|
|        4| 2023-11-16|     3|               3.5|
|        4| 2024-04-14|     4|3.6666666666666665|
|        4| 2024-05-09|     4|3.6666666666666665|
|        4| 2024-08-25|     2|3.3333333333333335|


In [31]:
window_spec = Window.partitionBy("course_id").orderBy("review_date").rowsBetween(-2, 0)

rolling_avg_df = reviews_df.withColumn(
    "rolling_avg",
    F.avg("rating").over(window_spec)
)

rolling_avg_df.select(
    "course_id",
    "review_date",
    "rating",
    "rolling_avg"
).show()

+---------+-----------+------+------------------+
|course_id|review_date|rating|       rolling_avg|
+---------+-----------+------+------------------+
|        1| 2023-09-17|     2|               2.0|
|        1| 2023-10-03|     4|               3.0|
|        1| 2023-10-18|     3|               3.0|
|        1| 2023-11-02|     4|3.6666666666666665|
|        1| 2024-02-27|     1|2.6666666666666665|
|        1| 2024-12-14|     3|2.6666666666666665|
|        1| 2025-02-22|     4|2.6666666666666665|
|        1| 2025-04-01|     1|2.6666666666666665|
|        2| 2023-08-12|     5|               5.0|
|        2| 2023-12-27|     4|               4.5|
|        2| 2024-03-08|     5| 4.666666666666667|
|        2| 2025-03-11|     1|3.3333333333333335|
|        4| 2023-08-16|     4|               4.0|
|        4| 2023-11-16|     3|               3.5|
|        4| 2024-04-14|     4|3.6666666666666665|
|        4| 2024-05-09|     4|3.6666666666666665|
|        4| 2024-08-25|     2|3.3333333333333335|


### Find the most reviewed course and list all students who reviewed it.

In [32]:
spark.sql(
    """
    with review_counts as (
        SELECT
            course_id,
            COUNT(*) as review_count
        FROM reviews
        GROUP BY 1
    ),
    top_course as (
        SELECT
            course_id
        FROM review_counts
        ORDER BY review_count DESC
        LIMIT 1
    )

    SELECT
        r.course_id,
        c.course_title,
        r.student_id,
        s.student_name
    FROM reviews r
    JOIN top_course t
    USING (course_id)
    JOIN courses c
    USING (course_id)
    JOIN students s
    USING (student_id)
    ORDER BY r.student_id
    """
).show()


+---------+--------------------+----------+----------------+
|course_id|        course_title|student_id|    student_name|
+---------+--------------------+----------+----------------+
|        4|Switchable neutra...|         1| Michael Sweeney|
|        4|Switchable neutra...|         6|     Denise Diaz|
|        4|Switchable neutra...|         9|      James King|
|        4|Switchable neutra...|        17|    Mary Wheeler|
|        4|Switchable neutra...|        18|   Judith Church|
|        4|Switchable neutra...|        18|   Judith Church|
|        4|Switchable neutra...|        31|      Sarah Sims|
|        4|Switchable neutra...|        45|     Keith Perez|
|        4|Switchable neutra...|        49|Caroline Preston|
+---------+--------------------+----------+----------------+



In [33]:
group_df = reviews_df.groupBy("course_id") \
    .agg(F.count("*").alias("review_count"))

top_course_df = group_df.orderBy(F.col("review_count").desc()).limit(1)

joined = reviews_df.join(
    top_course_df,
    on="course_id",
    how="inner"
).join(
    courses_df,
    on="course_id",
    how="inner"
).join(
    students_df,
    on="student_id",
    how="inner"
)

joined.select(
    "course_id",
    "course_title",
    "student_id",
    "student_name"
).orderBy("student_id") \
    .show()

+---------+--------------------+----------+----------------+
|course_id|        course_title|student_id|    student_name|
+---------+--------------------+----------+----------------+
|        4|Switchable neutra...|         1| Michael Sweeney|
|        4|Switchable neutra...|         6|     Denise Diaz|
|        4|Switchable neutra...|         9|      James King|
|        4|Switchable neutra...|        17|    Mary Wheeler|
|        4|Switchable neutra...|        18|   Judith Church|
|        4|Switchable neutra...|        18|   Judith Church|
|        4|Switchable neutra...|        31|      Sarah Sims|
|        4|Switchable neutra...|        45|     Keith Perez|
|        4|Switchable neutra...|        49|Caroline Preston|
+---------+--------------------+----------+----------------+



### For each instructor, calculate the average progress of students across all their courses.

In [34]:
courses_df.printSchema()

root
 |-- course_id: integer (nullable = true)
 |-- course_title: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- instructor_id: integer (nullable = true)



In [35]:
enrollments_df.printSchema()

root
 |-- enrollment_id: integer (nullable = true)
 |-- student_id: integer (nullable = true)
 |-- course_id: integer (nullable = true)
 |-- enrollment_date: date (nullable = true)
 |-- progress_percent: integer (nullable = true)



In [36]:
instructors_df.printSchema()

root
 |-- instructor_id: integer (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- specialization: string (nullable = true)
 |-- years_of_experience: integer (nullable = true)



In [37]:
spark.sql(
    """
    SELECT
        c.instructor_id,
        i.instructor_name,
        ROUND(AVG(e.progress_percent), 2) AS avg_student_progress
    FROM enrollments e
    JOIN courses c
    ON e.course_id = c.course_id
    JOIN instructors i
    ON c.instructor_id = i.instructor_id
    GROUP BY c.instructor_id, i.instructor_name
    ORDER BY avg_student_progress DESC, c.instructor_id
    """
).show()

+-------------+---------------+--------------------+
|instructor_id|instructor_name|avg_student_progress|
+-------------+---------------+--------------------+
|            5|   Carmen Mcgee|               59.83|
|            7|   Shane Wilcox|                57.0|
|            4|    Mary Kelley|               56.29|
|            8| Matthew Orozco|               55.09|
|            2|     Erica Hill|               51.94|
|            3| Samantha Price|               50.33|
|            1|  Angela Martin|               47.88|
|           10|    Adam Barton|               46.44|
|            9|  Jean Williams|                45.0|
+-------------+---------------+--------------------+



In [38]:
joined_df = enrollments_df.join(
    courses_df,
    on="course_id",
    how="inner"
).join(
    instructors_df,
    on="instructor_id",
    how="inner"
)

joined_df.groupBy("instructor_id", "instructor_name") \
    .agg(F.round(F.avg("progress_percent"), 2).alias("avg_student_progress")) \
    .orderBy(F.col("avg_student_progress").desc(), "instructor_id") \
    .show()

+-------------+---------------+--------------------+
|instructor_id|instructor_name|avg_student_progress|
+-------------+---------------+--------------------+
|            5|   Carmen Mcgee|               59.83|
|            7|   Shane Wilcox|                57.0|
|            4|    Mary Kelley|               56.29|
|            8| Matthew Orozco|               55.09|
|            2|     Erica Hill|               51.94|
|            3| Samantha Price|               50.33|
|            1|  Angela Martin|               47.88|
|           10|    Adam Barton|               46.44|
|            9|  Jean Williams|                45.0|
+-------------+---------------+--------------------+



### For each course, compare the number of enrolled students vs number of reviewers.

In [39]:
reviews_df.printSchema()

root
 |-- review_id: integer (nullable = true)
 |-- student_id: integer (nullable = true)
 |-- course_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- comment: string (nullable = true)



In [40]:
spark.sql(
    """
    with enrollment_cte as (
        SELECT
            course_id,
            COUNT(DISTINCT student_id) as enrollments
        FROM enrollments
        GROUP BY 1
    ),
    reviewer_cte as(
        SELECT
            course_id,
            COUNT(distinct review_id) as reviewers
        FROM reviews
        GROUP BY 1
    )
    
    SELECT
        c.course_id,
        ec.enrollments,
        rc.reviewers,
        COALESCE(ec.enrollments, 0) - COALESCE(rc.reviewers, 0) AS difference
    FROM courses c
    LEFT JOIN enrollment_cte ec
    USING (course_id)
    LEFT JOIN reviewer_cte rc
    USING (course_id)
    
    """
).show()

+---------+-----------+---------+----------+
|course_id|enrollments|reviewers|difference|
+---------+-----------+---------+----------+
|        1|          6|        8|        -2|
|        2|          4|        4|         0|
|        3|          3|     NULL|         3|
|        4|          6|        9|        -3|
|        5|          3|        3|         0|
|        6|          7|        6|         1|
|        7|          8|        4|         4|
|        8|          6|        3|         3|
|        9|          1|        5|        -4|
|       10|          4|        3|         1|
|       11|          3|        3|         0|
|       12|          5|        5|         0|
|       13|          5|        3|         2|
|       14|          4|        4|         0|
|       15|          5|        3|         2|
|       16|          3|        5|        -2|
|       17|          6|        1|         5|
|       18|          7|        4|         3|
|       19|          6|        2|         4|
|       20

In [41]:
enrollments_cte = enrollments_df.groupBy("course_id") \
    .agg(F.countDistinct(F.col("student_id")).alias("enrollments")) \

reviewers_cte = reviews_df.groupBy("course_id") \
    .agg(F.countDistinct(F.col("review_id")).alias("reviewers")) \

joined_df = courses_df.join(
    enrollments_cte,
    on = "course_id",
    how = "left"
).join(
    reviewers_cte,
    on = "course_id",
    how = "left"
)

joined_df.withColumn(
    "difference",
    F.coalesce(F.col("enrollments"), F.lit(0)) - F.coalesce(F.col("reviewers"), F.lit(0))
).select(
    "course_id",
    F.coalesce(F.col("enrollments"), F.lit(0)).alias("enrollments"),
    F.coalesce(F.col("reviewers"), F.lit(0)).alias("reviewers"),
    "difference"
).show()

+---------+-----------+---------+----------+
|course_id|enrollments|reviewers|difference|
+---------+-----------+---------+----------+
|        1|          6|        8|        -2|
|        2|          4|        4|         0|
|        3|          3|        0|         3|
|        4|          6|        9|        -3|
|        5|          3|        3|         0|
|        6|          7|        6|         1|
|        7|          8|        4|         4|
|        8|          6|        3|         3|
|        9|          1|        5|        -4|
|       10|          4|        3|         1|
|       11|          3|        3|         0|
|       12|          5|        5|         0|
|       13|          5|        3|         2|
|       14|          4|        4|         0|
|       15|          5|        3|         2|
|       16|          3|        5|        -2|
|       17|          6|        1|         5|
|       18|          7|        4|         3|
|       19|          6|        2|         4|
|       20

### List all students who enrolled but never left a review for any course.

In [45]:
spark.sql(
    """
    SELECT DISTINCT
        s.student_id,
        s.student_name
    FROM students s
    JOIN enrollments e
    ON s.student_id = e.student_id
    LEFT JOIN reviews r
    ON s.student_id = r.student_id
    WHERE r.review_id IS NULL
    """
).show()

+----------+------------------+
|student_id|      student_name|
+----------+------------------+
|         7|  Matthew Young MD|
|        29|    Peter Williams|
|        11|     Patricia Kirk|
|        42|     Steven Hughes|
|         3|     Denise George|
|        10|     Melissa Moody|
|        21|     Lisa Caldwell|
|        33|Jennifer Hernandez|
|        15|     Brian Morales|
|         5|       John Hughes|
|        34|     Janice Martin|
|        43|      Maxwell Ward|
+----------+------------------+



In [48]:
students_df.join(
    enrollments_df,
    on = "student_id",
    how = "inner"
).join(
    reviews_df,
    on = "student_id",
    how = "left"
).filter(
    F.col("review_id").isNull()
).select(
    "student_id",
    "student_name"
).distinct().show()

+----------+------------------+
|student_id|      student_name|
+----------+------------------+
|         7|  Matthew Young MD|
|        29|    Peter Williams|
|        11|     Patricia Kirk|
|        42|     Steven Hughes|
|         3|     Denise George|
|        10|     Melissa Moody|
|        21|     Lisa Caldwell|
|        33|Jennifer Hernandez|
|        15|     Brian Morales|
|         5|       John Hughes|
|        34|     Janice Martin|
|        43|      Maxwell Ward|
+----------+------------------+

