In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import numpy as np
import xgboost as xgb

import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error

from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, col, lit, col,isnan, when, count

from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder \
    .master("spark://de-data.us-west4-b.c.dtc-de-379809.internal:7077") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/24 18:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
!pip install xgboost



In [3]:
spark

In [4]:
df_green = spark.read.parquet('2019_2023_rankings.parquet')

                                                                                

In [None]:
# List of top 10 universities (from actual data)
# List of top 10 universities (from your model's prediction)
# List of worst 10 universities (from actual data)
# List of worst 10 universities (from your model's prediction)
# Average rank of universities from each country
# Best university in each country
# Worst university in each country

In [7]:
df_green.groupBy("name").agg(f.max(col("rank")), f.min("rank")).show(truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+----------------------------------------+---------+---------+
|name                                    |max(rank)|min(rank)|
+----------------------------------------+---------+---------+
|AECC University College                 |Reporter |Reporter |
|AGH University of Science and Technology|801–1000 |1001+    |
|AKFA University                         |Reporter |Reporter |
|Aalborg University                      |251–300  |194      |
|Aalto University                        |201–250  |181      |
|Aarhus University                       |123      |104      |
|Abdelmalek Essaâdi University           |1201–1500|1201–1500|
|Abdul Wali Khan University Mardan       |601–800  |501–600  |
|Abdullah Gül University                 |Reporter |Reporter |
|Abertay University                      |Reporter |Reporter |
|Aberystwyth University                  |501–600  |351–400  |
|Abu Dhabi University                    |301–350  |301–350  |
|Academy of Economic Studies of Moldova  |Reporter |Rep

                                                                                

In [8]:
df_green.select("year", "name", col("rank")).filter(col("name")=="Aalto University").show(truncate=False)

+----+----------------+-------+
|year|name            |rank   |
+----+----------------+-------+
|2023|Aalto University|201–250|
|2022|Aalto University|201–250|
|2021|Aalto University|201–250|
|2020|Aalto University|184    |
|2019|Aalto University|181    |
+----+----------------+-------+



In [9]:
ranking_df_clean = (df_green.withColumn("rank", f.split(col("rank"), '[–=]'))
                    .withColumn("scores_overall", f.split(col("scores_overall"), '[—–]'))
                   )

In [11]:
ranking_df_clean = (ranking_df_clean
                    .withColumn("min_rank", col("rank").getItem(0).cast("int"))
                    .withColumn("max_rank", col("rank").getItem(1).cast("int"))
                    # if min_rank or max_rank is null, then copy the other value to the null column (min_rank = max_rank)
                    .withColumn("min_rank", f.when(col("min_rank").isNull(), col("max_rank")).otherwise(col("min_rank")))
                    .withColumn("max_rank", f.when(col("max_rank").isNull(), col("min_rank")).otherwise(col("max_rank")))
                    # repeat the same pattern for scores_overall column
                    .withColumn("min_scores_overall", col("scores_overall").getItem(0).cast("float"))
                    .withColumn("max_scores_overall", col("scores_overall").getItem(1).cast("float"))
                    .withColumn("min_scores_overall", f.when(col("min_scores_overall").isNull(), col("max_scores_overall")).otherwise(col("min_scores_overall")))
                    .withColumn("max_scores_overall", f.when(col("max_scores_overall").isNull(), col("min_scores_overall")).otherwise(col("max_scores_overall")))
                    .drop("rank", "scores_overall")
                   )

In [13]:
ranking_df_clean.limit(1).show()

23/05/24 18:45:43 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----------+--------------------+-------------------+---------------+--------------------+---------------+--------------------+----------------+---------------------+----------------------+---------------------------+----------------------------+---------------------------------+--------------+---------------------+-------------------------+----------------------+-----------------------+--------------------+--------------------+------+------------+----+-----------------+--------+--------+------------------+------------------+
|rank_order|                name|scores_overall_rank|scores_teaching|scores_teaching_rank|scores_research|scores_research_rank|scores_citations|scores_citations_rank|scores_industry_income|scores_industry_income_rank|scores_international_outlook|scores_international_outlook_rank|      lo

In [14]:
ranking_df_clean.printSchema()

root
 |-- rank_order: long (nullable = true)
 |-- name: string (nullable = true)
 |-- scores_overall_rank: long (nullable = true)
 |-- scores_teaching: double (nullable = true)
 |-- scores_teaching_rank: long (nullable = true)
 |-- scores_research: double (nullable = true)
 |-- scores_research_rank: long (nullable = true)
 |-- scores_citations: double (nullable = true)
 |-- scores_citations_rank: long (nullable = true)
 |-- scores_industry_income: double (nullable = true)
 |-- scores_industry_income_rank: long (nullable = true)
 |-- scores_international_outlook: double (nullable = true)
 |-- scores_international_outlook_rank: long (nullable = true)
 |-- location: string (nullable = true)
 |-- stats_number_students: string (nullable = true)
 |-- stats_student_staff_ratio: double (nullable = true)
 |-- stats_pc_intl_students: string (nullable = true)
 |-- stats_female_male_ratio: string (nullable = true)
 |-- aliases: string (nullable = true)
 |-- subjects_offered: string (nullable = tru

In [15]:
columns_list = ["stats_number_students", "stats_pc_intl_students" ,"stats_female_male_ratio" ]

for c in columns_list:
    ranking_df_clean.select(col(c).cast("float").alias(c)).select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)]).show()

+---------------------+
|stats_number_students|
+---------------------+
|                 8527|
+---------------------+

+----------------------+
|stats_pc_intl_students|
+----------------------+
|                  8638|
+----------------------+

+-----------------------+
|stats_female_male_ratio|
+-----------------------+
|                   8638|
+-----------------------+



In [16]:
ranking_df_clean = (ranking_df_clean
                    # we need to replace the commas here with dots to make this column castable to float
                    .withColumn("stats_number_students", f.regexp_replace(col("stats_number_students"),',', '.').cast("float"))
                    # we need to remove the percentage sign(%) from this column
                    .withColumn("stats_pc_intl_students", f.split(col("stats_pc_intl_students"),"%").getItem(0).cast("float"))
                    # map 48:52 type of ratio to a number between 0 and 1
                    .withColumn("stats_female_male_ratio", f.split(col("stats_female_male_ratio"),":").getItem(0).cast("int")/f.split(col("stats_female_male_ratio"),":").getItem(1).cast("int"))
                   )

In [21]:
list_of_float_cols = ["min_rank", "max_rank", "min_scores_overall", "max_scores_overall", "scores_teaching", "scores_research", "scores_citations", "scores_industry_income", "scores_international_outlook"]
for col_name in list_of_float_cols:
    ranking_df_clean = ranking_df_clean.withColumn(col_name, col(col_name).cast("float"))

In [23]:
name_indexer = StringIndexer(inputCol='name', outputCol='university_id')
ranking_df_clean = name_indexer.fit(ranking_df_clean).transform(ranking_df_clean)

                                                                                

In [24]:
name_to_index_df = ranking_df_clean.select("name","university_id").distinct()
ranking_df_clean = ranking_df_clean.drop("name")

In [25]:
location_indexer = StringIndexer(inputCol='location', outputCol='location_id')
ranking_df_clean = location_indexer.fit(ranking_df_clean).transform(ranking_df_clean)

In [26]:
location_to_index_df = ranking_df_clean.select("location","location_id","university_id").distinct()
ranking_df_clean = ranking_df_clean.drop("location").drop("aliases").drop("subjects_offered")

In [27]:
ranking_df_clean = (ranking_df_clean
                    .withColumn("unaccredited", f.when(col('unaccredited') == "True", 1).otherwise(0).cast("boolean"))
                    .withColumn("closed", f.when(col('closed') == "True", 1).otherwise(0).cast("boolean"))
                   )

In [28]:
ranking_df_clean.select("scores_research", "scores_research_rank").filter(col("scores_research_rank")>90).orderBy("scores_research_rank").limit(100).show()

+---------------+--------------------+
|scores_research|scores_research_rank|
+---------------+--------------------+
|           58.2|                  91|
|           57.8|                  91|
|           57.0|                  91|
|           56.1|                  91|
|           54.1|                  91|
|           58.2|                  92|
|           57.5|                  92|
|           56.6|                  92|
|           55.6|                  92|
|           54.1|                  92|
|           58.2|                  93|
|           56.9|                  93|
|           56.4|                  93|
|           55.6|                  93|
|           53.7|                  93|
|           58.0|                  94|
|           56.8|                  94|
|           56.4|                  94|
|           55.5|                  94|
|           53.6|                  94|
+---------------+--------------------+
only showing top 20 rows



In [29]:
list_of_drop_cols = ["min_scores_overall", "max_scores_overall", "rank_order", "scores_overall_rank", 
                     "scores_teaching_rank", "scores_research_rank", "scores_citations_rank", 
                     "scores_industry_income_rank", "scores_international_outlook_rank",
                     "stats_number_students", "stats_student_staff_ratio", "stats_pc_intl_students",
                     "stats_female_male_ratio"]
ranking_df_clean = ranking_df_clean.drop(*list_of_drop_cols)

In [30]:
ranking_df_clean.printSchema()

root
 |-- scores_teaching: float (nullable = true)
 |-- scores_research: float (nullable = true)
 |-- scores_citations: float (nullable = true)
 |-- scores_industry_income: float (nullable = true)
 |-- scores_international_outlook: float (nullable = true)
 |-- closed: boolean (nullable = false)
 |-- unaccredited: boolean (nullable = false)
 |-- year: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- min_rank: float (nullable = true)
 |-- max_rank: float (nullable = true)
 |-- university_id: double (nullable = false)
 |-- location_id: double (nullable = false)



In [31]:
ranking_df_clean = ranking_df_clean.dropna(subset = ["min_rank"])

In [32]:
ranking_df_clean = ranking_df_clean.select(*[
 'scores_teaching',
 'scores_research',
 'scores_citations',
 'scores_industry_income',
 'scores_international_outlook',
 'closed',
 'unaccredited',
 'year',
 'university_id',
 'location_id',
 'min_rank',
])

In [33]:
ranking_df_clean.printSchema()

root
 |-- scores_teaching: float (nullable = true)
 |-- scores_research: float (nullable = true)
 |-- scores_citations: float (nullable = true)
 |-- scores_industry_income: float (nullable = true)
 |-- scores_international_outlook: float (nullable = true)
 |-- closed: boolean (nullable = false)
 |-- unaccredited: boolean (nullable = false)
 |-- year: long (nullable = true)
 |-- university_id: double (nullable = false)
 |-- location_id: double (nullable = false)
 |-- min_rank: float (nullable = true)



In [34]:
train_df = ranking_df_clean.na.fill(0).filter(col("year") < 2023)
test_df = ranking_df_clean.na.fill(0).filter(col("year") == 2023)

# Show the number of rows in each set
print(f"Number of rows in train_df: {train_df.count()}")
print(f"Number of rows in test_df: {test_df.count()}")

Number of rows in train_df: 4203
Number of rows in test_df: 1509


In [35]:
d_train = train_df.rdd.map(lambda line: LabeledPoint(line[10],line[:-1]))
d_train.take(1) 

                                                                                

[LabeledPoint(1.0, [91.0,99.5999984741211,98.0,74.4000015258789,96.30000305175781,0.0,0.0,2022.0,964.0,2.0])]

In [36]:
d_test = test_df.rdd.map(lambda line: LabeledPoint(line[10],line[:-1]))
d_test.take(1) 

                                                                                

[LabeledPoint(1.0, [92.30000305175781,99.69999694824219,99.0,74.9000015258789,96.19999694824219,0.0,0.0,2023.0,964.0,2.0])]