<a href="https://colab.research.google.com/github/ardy-dl/big_data_manipulations/blob/main/ardy-dl%20/%20big_data_manipulationsManipulation_as_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark
from pyspark import SparkContext
sc = SparkContext(appName="performanceFactors")
factorsrdd = sc.textFile("/content/StudentPerformanceFactors.csv")

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=d7d226a6a5eb3318df51d5f27b235aa31683e036739b7d67899f174ce9fcf049
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [25]:
header = factorsrdd.first()  # header
datardd = factorsrdd.filter(lambda row: row != header).map(lambda line: line.split(","))  # rdd

column_names = header.split(",")  # split headers
column_index = {column_names[i].strip(): i for i in range(len(column_names))}  # dictionary for each column

# index of specific columns
exam_score_column = column_index['Exam_Score']
previous_exam_column = column_index['Previous_Scores']
sleep_column = column_index['Sleep_Hours']
hrs_studied_column = column_index['Hours_Studied']
income_column = column_index['Family_Income']
Access_to_Resources = column_index['Access_to_Resources']
Teacher_column = column_index['Teacher_Quality']
School_Type = column_index['School_Type']
Home_Column = column_index['Distance_from_Home']
Attendance_column = column_index["Attendance"]

# Filter students who passed both exams
passed_both_filtered = datardd.filter(lambda row: (
    int(row[previous_exam_column].strip()) >= 70 and
    int(row[exam_score_column].strip()) >= 70
))

# Filter students who passed the previous exam but not the recent one
passed_previous_filtered = datardd.filter(lambda row: (
    int(row[previous_exam_column].strip()) >= 70 and
    int(row[exam_score_column].strip()) < 70
))

# Combine selected columns
both_exam = passed_both_filtered.map(lambda row: (
    int(row[hrs_studied_column].strip()),
    (int(row[previous_exam_column].strip()),
     row[income_column].strip(),
     row[sleep_column].strip() + " hrs",
     int(row[exam_score_column].strip()))
))

previous_exam = passed_previous_filtered.map(lambda row: (
    row[income_column].strip(),
    (int(row[previous_exam_column].strip()),
     int(row[hrs_studied_column].strip()),
     row[sleep_column].strip() + " hrs",
     int(row[exam_score_column].strip()))
))

# Group by Family Income and sort by Previous Exam Score within each group
groupby_income = previous_exam.groupByKey()
sortedby_income = groupby_income.mapValues(lambda records: sorted(records, key=lambda x: x[1]))

# Collect and print the results
groupsorted_data = sortedby_income.collect()

#for group_income, records in groupsorted_data:
#    print("Family Income: ", group_income)
#    for record in records:
#        print(f"  Hours_Studied: {record[1]}, Previous_Score: {record[0]}, Sleep_Hours: {record[2]}, Recent_Exam_Score: {record[3]}")


In [None]:
sc.stop()

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, when, count

spark = SparkSession.builder.appName("Student Performance").getOrCreate()

df = datardd.map(lambda row: Row(
    Hours_Studied=int(row[hrs_studied_column].strip()),
    Previous_Scores=int(row[previous_exam_column].strip()),
    Family_Income=row[income_column].strip(),
    Sleep_Hours=int(row[sleep_column].strip()),
    Exam_Score=int(row[exam_score_column].strip()),
    Resources=row[Access_to_Resources].strip(),
    Teacher_Quality=row[Teacher_column].strip(),
    School_Type=row[School_Type].strip(),
    Home_Distance=row[Home_Column].strip(),
    Attendance=row[Attendance_column].strip()
)).toDF()

# select
selected_df = df.select("Hours_Studied", "Previous_Scores", "Family_Income", "Sleep_Hours", "Exam_Score", "Resources", "Teacher_Quality", "School_Type")
# filter
filtered_df = selected_df.filter(selected_df["Previous_Scores"] >= 70)

# adding score range
score_range_df = filtered_df.withColumn(
    "Score_Range",
    when((col("Previous_Scores") >= 90), "Line of 9 and above")
    .when((col("Previous_Scores") >= 80) & (col("Previous_Scores") < 90), "Line of 8")
    .when((col("Previous_Scores") >= 70) & (col("Previous_Scores") < 80), "Line of 7")
)

grouped_score_income_df = score_range_df.groupBy("Resources", "Score_Range").agg(count("*").alias("Total_Students"))

# final results
grouped_score_income_df.orderBy("Resources", "Score_Range").show()

+---------+-------------------+--------------+
|Resources|        Score_Range|Total_Students|
+---------+-------------------+--------------+
|     High|          Line of 7|           395|
|     High|          Line of 8|           412|
|     High|Line of 9 and above|           427|
|      Low|          Line of 7|           271|
|      Low|          Line of 8|           258|
|      Low|Line of 9 and above|           243|
|   Medium|          Line of 7|           660|
|   Medium|          Line of 8|           654|
|   Medium|Line of 9 and above|           721|
+---------+-------------------+--------------+



In [47]:
df.createOrReplaceTempView("student_performance")
# Is there a difference in exam scores between students from private and public?
query = '''SELECT School_Type, COUNT(*) AS Total_Passed, AVG(Exam_Score) AS Avg_Exam_Score
           FROM student_performance
           WHERE Exam_Score >= 70
           GROUP BY School_Type
           ORDER BY Avg_Exam_Score DESC
           '''
schoolType = spark.sql(query)
schoolType.show()

# Does distance from home affects attendance and exam performance?
df.createOrReplaceTempView("student_performance")
query = """SELECT Home_Distance, AVG(Attendance) AS Avg_Attendance, AVG(Exam_Score) AS Avg_Exam_Score
           FROM student_performance
           GROUP BY Home_Distance
           ORDER BY Avg_Exam_Score DESC
           """
result = spark.sql(query)
result.show()

# Is there a threshold for Hours Studying in a week and Hours of Sleep per day after which further studying has diminishing returns on Exam Scores?
df.createOrReplaceTempView("student_performance")
query = '''SELECT
        CASE
            WHEN Hours_Studied BETWEEN 0 and 5 THEN '0-5 Hours'
            WHEN Hours_Studied BETWEEN 6 and 10 THEN '6-10 Hours'
            WHEN Hours_Studied BETWEEN 11 and 15 THEN '11-15 Hours'
            WHEN Hours_Studied BETWEEN 16 and 20 THEN '16-20 Hours'
            ELSE '20+ Hours'
        END AS Hours_Studied,

        CASE
            WHEN Sleep_Hours BETWEEN 0 AND 4 THEN '0-4 hours'
            WHEN Sleep_Hours BETWEEN 5 AND 7 THEN '5-7 hours'
            ELSE '8+ hours'
        END AS Sleep_Hours,

        AVG(Previous_Scores) AS Average_Previous_Scores

        FROM student_performance
        GROUP BY
    CASE
        WHEN Hours_Studied BETWEEN 0 AND 5 THEN '0-5 Hours'
        WHEN Hours_Studied BETWEEN 6 AND 10 THEN '6-10 Hours'
        WHEN Hours_Studied BETWEEN 11 AND 15 THEN '11-15 Hours'
        WHEN Hours_Studied BETWEEN 16 AND 20 THEN '16-20 Hours'
        ELSE '20+ Hours'
    END,
    CASE
        WHEN Sleep_Hours BETWEEN 0 AND 4 THEN '0-4 hours'
        WHEN Sleep_Hours BETWEEN 5 AND 7 THEN '5-7 hours'
        ELSE '8+ hours'
    END
        ORDER BY Average_Previous_Scores DESC
        '''
result = spark.sql(query)
result.show()

+-----------+------------+-----------------+
|School_Type|Total_Passed|   Avg_Exam_Score|
+-----------+------------+-----------------+
|     Public|        1122|71.99643493761141|
|    Private|         503|71.98210735586481|
+-----------+------------+-----------------+

+-------------+-----------------+-----------------+
|Home_Distance|   Avg_Attendance|   Avg_Exam_Score|
+-------------+-----------------+-----------------+
|         Near|80.16786817713697|67.51210092687951|
|     Moderate|79.85435435435436|66.98148148148148|
|          Far|79.43617021276596|66.45744680851064|
|             |77.92537313432835|66.43283582089552|
+-------------+-----------------+-----------------+

+-------------+-----------+-----------------------+
|Hours_Studied|Sleep_Hours|Average_Previous_Scores|
+-------------+-----------+-----------------------+
|   6-10 Hours|  5-7 hours|      77.54913294797687|
|    20+ Hours|  0-4 hours|      76.95744680851064|
|   6-10 Hours|  0-4 hours|      75.88888888888889|
