#Imports

In [0]:
from pyspark.sql.functions import col, cast

#Util functions

In [0]:
def saveCleanDataFrameAsCSV(dataframe, filename):
  dataframe.write.mode("overwrite").csv(f'/mnt/university-data/clean-data/{filename}', header=True)

def saveBusinessDataFrameAsCSV(dataframe, filename):
  pandasDF = dataframe.toPandas()
  pandasDF.to_csv(f'/dbfs/mnt/university-data/business-data/{filename}.csv', index=False)


# Mounting

In [0]:
mount_point = '/mnt/university-data'
mounts = dbutils.fs.mounts()
if len(mounts) == 0 or all(mount.mountPoint != mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
    source='wasbs://university-data@moviesdatas.blob.core.windows.net',
    mount_point=mount_point,
    extra_configs = {'fs.azure.account.key.moviesdatas.blob.core.windows.net': dbutils.secrets.get('MovieProjectScope', 'StorageAccountKey')}
)



#Cleaning Data 

##Cleaning University Rankings CSV

In [0]:
from pyspark.sql.functions import format_number

universityRankingsDF = spark.read.csv('/mnt/university-data/raw-data/World University Rankings 2016-2025.csv', header=True, inferSchema=True)
universityRankingsDF.display()
# format and drop columns
universityRankingsDFClean = universityRankingsDF.drop("Female to Male Ratio")\
    .withColumn("Overall Score", format_number(col("Overall Score"), 3))\
    .drop("International Students")\
    .drop("International Outlook")\
    .drop("Research Quality")\
    .drop("Research Environment")\
    .drop("Industry Impact")

universityRankingsDFClean = universityRankingsDFClean.withColumn("Student Population", col("Student Population").cast("int"))

saveCleanDataFrameAsCSV(universityRankingsDFClean, 'universityRankings')
universityRankingsDFClean.display()

##Cleaning Internal Education Costs CSV

In [0]:
internationalEducationCostsDF = spark.read.csv('/mnt/university-data/raw-data/International_Education_Costs.csv', header=True, inferSchema=True)
internationalEducationCostsDFClean = internationalEducationCostsDF.drop("Living_Cost_Index")
internationalEducationCostsDFClean = internationalEducationCostsDFClean.withColumn("Total_Cost_USD", col("Tuition_USD") + ((col("Duration_Years") * 12) * col("Rent_USD")) + col("Visa_Fee_USD") + (col("Duration_Years") * col("Insurance_USD")))
internationalEducationCostsDFClean = internationalEducationCostsDFClean.drop("City")

saveCleanDataFrameAsCSV(internationalEducationCostsDFClean, 'internationalEducationCosts')
internationalEducationCostsDFClean.display()

In [0]:
#Getting max year value used in later Queries
RANKING_MAX_YEAR = universityRankingsDFClean.select("Year").agg({"Year": "max"}).collect()[0][0]

##Joining data sets

In [0]:
universityRankingsDFLastestYear = universityRankingsDFClean.filter(col("Year") == RANKING_MAX_YEAR) 
rankingsWithCostsDF = internationalEducationCostsDFClean.join(universityRankingsDFLastestYear, internationalEducationCostsDFClean["University"] == universityRankingsDFLastestYear["Name"], "inner")
rankingsWithCostsDF = rankingsWithCostsDF.drop(internationalEducationCostsDFClean["Country"]).drop("Name")

saveCleanDataFrameAsCSV(rankingsWithCostsDF, 'rankingsWithCosts')
rankingsWithCostsDF.display()

#Business Question

##Finding the Best University to Study in

###Best University per Country

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

windowSpec = Window.partitionBy("Country").orderBy("Rank")
bestRankingUniversitiesDF = universityRankingsDFClean.filter(col("Year") == RANKING_MAX_YEAR) \
    .withColumn("row_number", row_number().over(windowSpec)) \
    .filter(col("row_number") == 1) \
    .drop("row_number")\
    .orderBy("Rank")

saveBusinessDataFrameAsCSV(bestRankingUniversitiesDF, 'BestUniversitiesPerCountry')

bestRankingUniversitiesDF.display()

###Best Country

In [0]:
from pyspark.sql.functions import avg, count, min, max

countryStatsDF = universityRankingsDFClean.filter(col("Year") == RANKING_MAX_YEAR) \
    .groupBy("Country") \
    .agg(
        avg("Overall Score").alias("Average_Score"),
        count("*").alias("Number_of_Universities"),
        min("Overall Score").alias("Min_Score"),
        max("Overall Score").alias("Max_Score")
    ) \
    .orderBy("Average_Score", ascending=False)

saveBusinessDataFrameAsCSV(countryStatsDF, 'CountryScoresLastestYear')

countryStatsDF.display()

Databricks visualization. Run in Databricks to view.

###Rankings over time

###Rankings of top 10 universities Over Time

In [0]:
from pyspark.sql.functions import avg

top10Unis = universityRankingsDFClean.filter(col("Year") == RANKING_MAX_YEAR).orderBy("Overall Score", ascending=False).limit(10).select("Name").collect()

universityOverallScoreDF = universityRankingsDFClean.filter(col("Name").isin([row['Name'] for row in top10Unis])).groupBy("Name", "Year") \
    .agg(avg("Overall Score").alias("Overall_Score")) \
    .orderBy("Name", "Year")

saveBusinessDataFrameAsCSV(universityOverallScoreDF, 'Top10UniversitiesOverallScoreOverTime')

display(universityOverallScoreDF)

Databricks visualization. Run in Databricks to view.

###Rankings of top 10 Countries Over Time

In [0]:
top10Counties = countryStatsDF.orderBy("Average_Score", ascending=False).limit(10).select("Country").collect()

countryStatsOverTimeDF = universityRankingsDFClean.filter(col("Country").isin([row['Country'] for row in top10Counties])) \
    .groupBy("Country", "Year") \
    .agg(
        avg("Overall Score").alias("Average_Score"),
        min("Overall Score").alias("Min_Score"),
        max("Overall Score").alias("Max_Score")
    ) \
    .orderBy("Average_Score", ascending=False)

saveBusinessDataFrameAsCSV(countryStatsOverTimeDF, 'CountryOverallScoresOverTime')

countryStatsOverTimeDF.display()

Databricks visualization. Run in Databricks to view.

##Best Cost to Ranking

###Value for money Per country

In [0]:
def rankingWithCostsForLevel(rankingDataFrame, level):
  countryOverallScoreAverageCost = rankingDataFrame.filter(col("Level") == level).groupBy("Country", "Overall Score")\
  .agg(
    avg("Total_Cost_USD").alias("Average_Cost"),
  )

  return countryOverallScoreAverageCost.withColumn("Score_to_Cost_Ratio", col("Overall Score") / (col("Average_Cost") / 1000)) \
    .orderBy(col("Score_to_Cost_Ratio"), ascending=False)

bachelorRankingWithCosts = rankingWithCostsForLevel(rankingsWithCostsDF, "Bachelor")
saveBusinessDataFrameAsCSV(bachelorRankingWithCosts, 'BachelorRankingWithCosts')
bachelorRankingWithCosts.display()

masterRankingWithCosts = rankingWithCostsForLevel(rankingsWithCostsDF, "Master")
saveBusinessDataFrameAsCSV(masterRankingWithCosts, 'MasterRankingWithCosts')
masterRankingWithCosts.display()

phDRankingWithCosts = rankingWithCostsForLevel(rankingsWithCostsDF, "PhD")
saveBusinessDataFrameAsCSV(phDRankingWithCosts, 'phDRankingWithCosts')
phDRankingWithCosts.display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

##Impact of Student staff ratio on teaching ranking

In [0]:
from pyspark.sql.functions import col, avg
import matplotlib.pyplot as plt
import numpy as np

# Group by student-to-staff ratio and calculate the average teaching score
studentStaffImpactDF = rankingsWithCostsDF.groupBy("Students to Staff Ratio") \
    .agg(
        avg("Teaching").alias("Average_Teaching_Score")
    ) \
    .orderBy("Students to Staff Ratio")

saveBusinessDataFrameAsCSV(studentStaffImpactDF, 'StudentStaffRatioImpact')

display(studentStaffImpactDF)

pandasDF = studentStaffImpactDF.toPandas()

# Scatter plot
plt.figure(figsize=(10, 6))
plt.scatter(pandasDF["Students to Staff Ratio"], pandasDF["Average_Teaching_Score"], color='blue', label='Average Teaching Score')

# Fit a linear trend line
z = np.polyfit(pandasDF["Students to Staff Ratio"], pandasDF["Average_Teaching_Score"], 1)
p = np.poly1d(z)
plt.plot(pandasDF["Students to Staff Ratio"], p(pandasDF["Students to Staff Ratio"]), color='red', label='Trend')

# Add labels and title
plt.xlabel("Students to Staff Ratio")
plt.ylabel("Average Teaching Score")
plt.title("Students to Staff Ratio vs. Teaching Score")
plt.legend()
plt.grid(True)
plt.show()


Databricks visualization. Run in Databricks to view.