In [None]:
# Update the package list to ensure you have the latest information
!apt-get update

# Install OpenJDK 8 (Java Development Kit) headless version silently
# Headless means it doesn't have a GUI, which is suitable for servers or cloud environments
!apt-get install openjdk-8-jdk-headless -qq

# Download Apache Spark version 3.1.2 with Hadoop version 3.2
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

# Extract the downloaded Spark archive
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

# Install the findspark library using pip
!pip install findspark


Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease [18.1 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,305 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [1,599 kB]
Get:13 http://security.ubuntu.com/ubuntu jammy-securi

In [None]:
# Attach Google Drive to the Colab filesystem so later cells can read the dataset files
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=c3c3dc61a06017e02aa247dea04ba7235b6477f4d6c287bb590a28063777ffb8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by this notebook
spark = (
    SparkSession.builder
    .appName("PenguinAnalysis")
    .getOrCreate()
)

# Load the CSV, treating the first row as column names and letting Spark infer column types.
# NOTE(review): despite its name, `penguins_df` holds the contents of courses.csv.
csv_options = {"header": True, "inferSchema": True}
penguins_df = spark.read.csv(
    "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/courses.csv",
    **csv_options,
)

# Print a preview of the loaded rows
penguins_df.show()


+-----------+-----------------+--------------------------+
|code_module|code_presentation|module_presentation_length|
+-----------+-----------------+--------------------------+
|        AAA|            2013J|                       268|
|        AAA|            2014J|                       269|
|        BBB|            2013J|                       268|
|        BBB|            2014J|                       262|
|        BBB|            2013B|                       240|
|        BBB|            2014B|                       234|
|        CCC|            2014J|                       269|
|        CCC|            2014B|                       241|
|        DDD|            2013J|                       261|
|        DDD|            2014J|                       262|
|        DDD|            2013B|                       240|
|        DDD|            2014B|                       241|
|        EEE|            2013J|                       268|
|        EEE|            2014J|                       26

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by this notebook
spark = SparkSession.builder.appName("example").getOrCreate()

# Source CSV for every table, keyed by the name it is exposed under (both as a Python
# variable and as a Spark SQL temp view). Insertion order is the load order.
csv_sources = {
    "studentInfo": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/studentInfo.csv",
    "studentRegistration": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/studentRegistration.csv",
    "courses": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/courses.csv",
    "studentAssessment": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/studentAssessment.csv",
    "assessments": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/assessments.csv",
    "vle": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/vle.csv",
    "studentVle": "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/studentVle.csv",
}

# Read each CSV with a header row and inferred column types
tables = {
    table_name: spark.read.csv(csv_path, header=True, inferSchema=True)
    for table_name, csv_path in csv_sources.items()
}

# Keep the individual DataFrame variables available to the rest of the notebook
studentInfo = tables["studentInfo"]
studentRegistration = tables["studentRegistration"]
courses = tables["courses"]
studentAssessment = tables["studentAssessment"]
assessments = tables["assessments"]
vle = tables["vle"]
studentVle = tables["studentVle"]

# Expose every DataFrame to Spark SQL under its table name
for table_name, table_df in tables.items():
    table_df.createOrReplaceTempView(table_name)

# Spark SQL text that builds one row per student registration with assessment and
# VLE-activity features. It reads the temp views studentInfo, studentRegistration, courses,
# assessments, studentAssessment, vle and studentVle.
#
# Each measure is aggregated once in a CTE and LEFT JOINed to the student rows. This avoids
# constructs Spark SQL rejects: correlated scalar subqueries that are not aggregated,
# references to the outer row from subqueries nested inside derived tables, and the
# two-argument T-SQL form of ISNULL. It also avoids recomputing the same aggregate for
# several output columns.
#
# Gotchas:
#   * assessment_type is compared case-sensitively; the exam type is spelled 'Exam'.
#   * Counts are COALESCEd to 0 when nothing matches; sums stay NULL when nothing matches.
#   * Total_Mark is NULL for a student with no scored non-exam assessment, because only the
#     exam half is defaulted to 0.
#   * Average_Total_clicks is NULL (not 0) when the student has no clicks.
query = """
WITH assessment_counts AS (
    -- Number of assessments of each type defined for a module presentation
    SELECT
        code_module,
        code_presentation,
        COUNT(CASE WHEN assessment_type <> 'Exam' THEN 1 END) AS Assessment_count,
        COUNT(CASE WHEN assessment_type = 'TMA' THEN 1 END) AS TMA_count,
        COUNT(CASE WHEN assessment_type = 'CMA' THEN 1 END) AS CMA_count,
        COUNT(CASE WHEN assessment_type = 'Exam' THEN 1 END) AS Exam_count
    FROM assessments
    GROUP BY code_module, code_presentation
),
student_scores AS (
    -- Per-student score aggregates over the assessments the student has a record for
    SELECT
        a.code_module,
        a.code_presentation,
        sa.id_student,
        SUM(CASE WHEN a.assessment_type = 'TMA' THEN sa.score END) AS TMA_Total_score,
        SUM(CASE WHEN a.assessment_type = 'CMA' THEN sa.score END) AS CMA_Total_score,
        -- An aggregate guarantees a single value per student
        MAX(CASE WHEN a.assessment_type = 'Exam' THEN sa.score END) AS Exam_score,
        SUM(CASE WHEN a.assessment_type = 'Exam' THEN sa.score END) AS Exam_total_score,
        SUM(CASE WHEN a.assessment_type <> 'Exam' THEN sa.score END) AS total_assessment_score,
        COUNT(CASE WHEN a.assessment_type <> 'Exam' THEN 1 END) AS Submitted_Assessments,
        -- Weighted score; rows with a NULL score contribute nothing to the sum
        SUM(CASE WHEN a.assessment_type <> 'Exam' THEN a.weight * sa.score / 100 END) AS Total_Assessments_score_100
    FROM assessments AS a
    INNER JOIN studentAssessment AS sa ON a.id_assessment = sa.id_assessment
    GROUP BY a.code_module, a.code_presentation, sa.id_student
),
resource_counts AS (
    -- Number of VLE resources defined for a module presentation
    SELECT
        code_module,
        code_presentation,
        COUNT(*) AS Total_resources
    FROM vle
    GROUP BY code_module, code_presentation
),
student_clicks AS (
    -- Total clicks of a student on the resources of a module presentation
    SELECT
        code_module,
        code_presentation,
        id_student,
        SUM(sum_click) AS Total_clicks
    FROM studentVle
    GROUP BY code_module, code_presentation, id_student
)
SELECT
    -- Student attributes and registration dates
    si.code_module,
    si.code_presentation,
    si.id_student,
    si.gender,
    si.highest_education,
    si.region,
    si.imd_band,
    si.age_band,
    si.num_of_prev_attempts,
    si.studied_credits,
    si.disability,
    sr.date_registration,
    sr.date_unregistration,

    -- Assessments defined for the module presentation (Assessment_count excludes exams)
    COALESCE(ac.Assessment_count, 0) AS Assessment_count,
    COALESCE(ac.TMA_count, 0) AS TMA_count,
    COALESCE(ac.CMA_count, 0) AS CMA_count,
    COALESCE(ac.Exam_count, 0) AS Exam_count,

    -- Raw score totals of the student
    ss.TMA_Total_score AS TMA_Total_score,
    ss.CMA_Total_score AS CMA_Total_score,
    ss.Exam_score AS Exam_score,
    ss.total_assessment_score AS total_assessment_score,

    -- Submitted / not submitted assessments, both excluding exams
    COALESCE(ss.Submitted_Assessments, 0) AS Submitted_Assessments,
    COALESCE(ac.Assessment_count, 0) - COALESCE(ss.Submitted_Assessments, 0) AS Not_submitted_Assessments,

    -- Weighted non-exam score and that score divided by the number of non-exam assessments
    ss.Total_Assessments_score_100 AS Total_Assessments_score_100,
    ss.Total_Assessments_score_100 / NULLIF(ac.Assessment_count, 0) AS Average_Assessment_score,

    -- Half the weighted non-exam score plus half the exam score (exam defaults to 0)
    ss.Total_Assessments_score_100 / 2 + COALESCE(ss.Exam_total_score, 0) / 2 AS Total_Mark,

    -- VLE activity
    COALESCE(rc.Total_resources, 0) AS Total_resources,
    sc.Total_clicks AS Total_clicks,
    CASE
        WHEN sc.Total_clicks <> 0
            THEN ROUND(CAST(sc.Total_clicks AS DOUBLE) / NULLIF(rc.Total_resources, 0), 2)
    END AS Average_Total_clicks,

    -- Final result of the student
    si.final_result

FROM
    courses AS c
INNER JOIN
    studentInfo AS si
    ON c.code_module = si.code_module
    AND c.code_presentation = si.code_presentation
INNER JOIN
    studentRegistration AS sr
    ON si.code_module = sr.code_module
    AND si.code_presentation = sr.code_presentation
    AND si.id_student = sr.id_student
LEFT JOIN
    assessment_counts AS ac
    ON ac.code_module = si.code_module
    AND ac.code_presentation = si.code_presentation
LEFT JOIN
    student_scores AS ss
    ON ss.code_module = si.code_module
    AND ss.code_presentation = si.code_presentation
    AND ss.id_student = si.id_student
LEFT JOIN
    resource_counts AS rc
    ON rc.code_module = si.code_module
    AND rc.code_presentation = si.code_presentation
LEFT JOIN
    student_clicks AS sc
    ON sc.code_module = si.code_module
    AND sc.code_presentation = si.code_presentation
    AND sc.id_student = si.id_student
"""

# Run the SQL text held in `query` on the Spark session and keep the result as a DataFrame
result_df = spark.sql(query)

# Print a preview of the result
result_df.show()

# Persist the result as CSV with a header row, replacing whatever already exists at the path.
# NOTE(review): Spark's CSV writer normally creates a directory of part files at this path
# rather than a single file named train.csv - confirm downstream readers expect that.
output_path = "/content/drive/My Drive/MalmoMsc/CapstoneProject/DataSet/train.csv"
result_df.write.csv(output_path, header=True, mode="overwrite")


+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+-----------------+-------------------+----------------+---------+---------+----------+---------------+---------------+---------------+------------+------------+
|code_module|code_presentation|id_student|gender|   highest_education|              region|imd_band|age_band|num_of_prev_attempts|studied_credits|disability|date_registration|date_unregistration|Assessment_count|TMA_count|CMA_count|Exam_count|TMA_Total_score|CMA_Total_score|Total_resources|Total_clicks|final_result|
+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+-----------------+-------------------+----------------+---------+---------+----------+---------------+---------------+---------------+------------+------------+
|        AAA|            2013J|     62155|    

In [None]:
!pip install pyspark_dist_explore