In [3]:
import os

os.listdir("/home/jovyan/work")


['data', 'notebooks']

In [4]:
os.listdir("/home/jovyan/work/data")


['input']

In [5]:
os.listdir("/home/jovyan/work/data/input")


['world_universities_and_domains.json']

In [6]:
os.listdir("/home/jovyan/work/notebooks")


[]

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WorldUniversitiesETL").getOrCreate()
spark


In [8]:
path = "/home/jovyan/work/data/input/world_universities_and_domains.json"

df = spark.read.json(path)

df.printSchema()
df.show(5, truncate=False)



root
 |-- _corrupt_record: string (nullable = true)



AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

In [9]:
df = spark.read.option("multiline", "true").json(path)

df.printSchema()
df.show(5, truncate=False)


root
 |-- alpha_two_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- domains: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- state-province: string (nullable = true)
 |-- web_pages: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+---------+--------------------------------+---------------------------------------------------+--------------+--------------------------+
|alpha_two_code|country  |domains                         |name                                               |state-province|web_pages                 |
+--------------+---------+--------------------------------+---------------------------------------------------+--------------+--------------------------+
|AU            |Australia|[student.eit.edu.au]            |Engineering Institute of Technology                |NULL          |[https://www.eit.edu.au/] |
|ID            |Indonesia|[nusaputra.

In [10]:
print("Rows:", df.count())
print("Columns:", len(df.columns))
print(df.columns)


Rows: 10191
Columns: 6
['alpha_two_code', 'country', 'domains', 'name', 'state-province', 'web_pages']


In [11]:
from pyspark.sql.functions import col, sum as _sum, when

dq_nulls = df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"{c}_nulls")
    for c in ["name", "country", "domains", "web_pages"]
])

dq_nulls.show()


+----------+-------------+-------------+---------------+
|name_nulls|country_nulls|domains_nulls|web_pages_nulls|
+----------+-------------+-------------+---------------+
|         0|            0|            0|              0|
+----------+-------------+-------------+---------------+



In [12]:
duplicates = (
    df.groupBy("name", "country")
      .count()
      .filter(col("count") > 1)
      .orderBy(col("count").desc())
)

duplicates.show(10, truncate=False)


+------------------------------+--------------+-----+
|name                          |country       |count|
+------------------------------+--------------+-----+
|Three Rivers Community College|United States |2    |
|Manchester Community College  |United States |2    |
|Pondicherry University        |India         |2    |
|Southwestern Community College|United States |2    |
|Thomas Jefferson University   |United States |2    |
|Ateneo de Manila University   |Philippines   |2    |
|Glendale Community College    |United States |2    |
|Cambrian College              |Canada        |2    |
|Middlesex Community College   |United States |2    |
|Trinity College Bristol       |United Kingdom|2    |
+------------------------------+--------------+-----+
only showing top 10 rows



In [13]:
from pyspark.sql.functions import size

df.select(
    _sum(when(size(col("domains")) == 0, 1).otherwise(0)).alias("empty_domains"),
    _sum(when(size(col("web_pages")) == 0, 1).otherwise(0)).alias("empty_web_pages")
).show()


+-------------+---------------+
|empty_domains|empty_web_pages|
+-------------+---------------+
|            0|              0|
+-------------+---------------+



In [15]:
from pyspark.sql.functions import col, size

df_clean = (
    df
    .dropna(subset=["name", "country"])
    .withColumn("domains_count", size(col("domains")))
    .withColumn("web_pages_count", size(col("web_pages")))
)


In [16]:
df_clean.show(5, truncate=False)
print("Rows after cleaning:", df_clean.count())


+--------------+---------+--------------------------------+---------------------------------------------------+--------------+--------------------------+-------------+---------------+
|alpha_two_code|country  |domains                         |name                                               |state-province|web_pages                 |domains_count|web_pages_count|
+--------------+---------+--------------------------------+---------------------------------------------------+--------------+--------------------------+-------------+---------------+
|AU            |Australia|[student.eit.edu.au]            |Engineering Institute of Technology                |NULL          |[https://www.eit.edu.au/] |1            |1              |
|ID            |Indonesia|[nusaputra.ac.id]               |Universitas Nusa Putra                             |NULL          |[https://nusaputra.ac.id] |1            |1              |
|GH            |Ghana    |[regent.edu.gh]                 |Regent University Col

In [17]:
from pyspark.sql.functions import col, count, avg

# Agrégation par pays
agg_country = (
    df_clean
    .groupBy("country")
    .agg(
        count("*").alias("universities_count"),
        avg(col("domains_count")).alias("avg_domains_per_university"),
        avg(col("web_pages_count")).alias("avg_webpages_per_university")
    )
    .orderBy(col("universities_count").desc())
)

agg_country.show(10, truncate=False)


+------------------+------------------+--------------------------+---------------------------+
|country           |universities_count|avg_domains_per_university|avg_webpages_per_university|
+------------------+------------------+--------------------------+---------------------------+
|United States     |2349              |1.0208599404001704        |1.0063856960408684         |
|Japan             |572               |1.0034965034965035        |1.0                        |
|India             |474               |1.0253164556962024        |1.0168776371308017         |
|China             |398               |1.0175879396984924        |1.0100502512562815         |
|Germany           |318               |1.0817610062893082        |1.0377358490566038         |
|Russian Federation|309               |1.022653721682848         |1.0129449838187703         |
|France            |297               |1.0875420875420876        |1.0404040404040404         |
|Korea, Republic of|244               |1.110655737

In [18]:
from pyspark.sql.functions import when

if "state-province" in df_clean.columns:
    agg_country_state = (
        df_clean
        .withColumn("state_province", col("state-province"))
        .groupBy("country", "state_province")
        .agg(count("*").alias("universities_count"))
        .orderBy(col("universities_count").desc())
    )
    agg_country_state.show(10, truncate=False)


+------------------+--------------+------------------+
|country           |state_province|universities_count|
+------------------+--------------+------------------+
|United States     |NULL          |2239              |
|Japan             |NULL          |571               |
|China             |NULL          |390               |
|Russian Federation|NULL          |309               |
|France            |NULL          |292               |
|Germany           |NULL          |291               |
|Korea, Republic of|NULL          |244               |
|India             |NULL          |228               |
|Indonesia         |NULL          |192               |
|United Kingdom    |NULL          |187               |
+------------------+--------------+------------------+
only showing top 10 rows



In [19]:
jdbc_url = "jdbc:postgresql://postgres:5432/universities_dw"
props = {
    "user": "de_user",
    "password": "de_pass",
    "driver": "org.postgresql.Driver"
}

(agg_country.write
    .mode("overwrite")
    .jdbc(jdbc_url, "universities_by_country", properties=props)
)

print("✅ Loaded table: universities_by_country")


Py4JJavaError: An error occurred while calling o153.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:246)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:250)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [20]:
spark.stop()

In [21]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("WorldUniversitiesETL")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)


In [22]:
jdbc_url = "jdbc:postgresql://postgres:5432/universities_dw"
props = {"user": "de_user", "password": "de_pass", "driver": "org.postgresql.Driver"}

(agg_country.write
    .mode("overwrite")
    .jdbc(jdbc_url, "universities_by_country", properties=props)
)

print("✅ Loaded table: universities_by_country")


Py4JJavaError: An error occurred while calling o184.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:246)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:250)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [23]:
import os

jar_dir = "/home/jovyan/work/jars"
os.makedirs(jar_dir, exist_ok=True)

jar_path = f"{jar_dir}/postgresql-42.7.3.jar"

# Téléchargement depuis Maven Central
!wget -O {jar_path} https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar

!ls -lh /home/jovyan/work/jars


--2025-12-26 17:13:12--  https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar
Resolving repo1.maven.org (repo1.maven.org)... 104.18.19.12, 104.18.18.12
Connecting to repo1.maven.org (repo1.maven.org)|104.18.19.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1089312 (1.0M) [application/java-archive]
Saving to: ‘/home/jovyan/work/jars/postgresql-42.7.3.jar’


2025-12-26 17:13:12 (10.3 MB/s) - ‘/home/jovyan/work/jars/postgresql-42.7.3.jar’ saved [1089312/1089312]

total 1.1M
-rw-r--r-- 1 jovyan users 1.1M Mar 14  2024 postgresql-42.7.3.jar


In [24]:
spark.stop()


In [25]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("WorldUniversitiesETL")
    .config("spark.jars", "/home/jovyan/work/jars/postgresql-42.7.3.jar")
    .getOrCreate()
)


In [26]:
jdbc_url = "jdbc:postgresql://postgres:5432/universities_dw"
props = {"user": "de_user", "password": "de_pass", "driver": "org.postgresql.Driver"}

(agg_country.write
    .mode("overwrite")
    .jdbc(jdbc_url, "universities_by_country", properties=props)
)

print("✅ Loaded table: universities_by_country")


Py4JJavaError: An error occurred while calling o215.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:246)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:250)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [27]:
!ls -lh /home/jovyan/work/jars


total 1.1M
-rw-r--r-- 1 jovyan users 1.1M Mar 14  2024 postgresql-42.7.3.jar


In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("WorldUniversitiesETL")
    .config("spark.jars", "/home/jovyan/work/jars/postgresql-42.7.3.jar")
    .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-42.7.3.jar")
    .getOrCreate()
)


In [2]:
spark._jvm.java.lang.Class.forName("org.postgresql.Driver")
print("✅ PostgreSQL JDBC driver loaded")


✅ PostgreSQL JDBC driver loaded


In [3]:
jdbc_url = "jdbc:postgresql://postgres:5432/universities_dw"
props = {"user": "de_user", "password": "de_pass", "driver": "org.postgresql.Driver"}

(agg_country.write
    .mode("overwrite")
    .jdbc(jdbc_url, "universities_by_country", properties=props)
)

print("✅ Loaded table: universities_by_country")


NameError: name 'agg_country' is not defined

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WorldUniversitiesETL").getOrCreate()


In [5]:
path = "/home/jovyan/work/data/input/world_universities_and_domains.json"
df = spark.read.option("multiline", "true").json(path)


In [6]:
from pyspark.sql.functions import col, size

df_clean = (
    df.dropna(subset=["name", "country"])
      .withColumn("domains_count", size(col("domains")))
      .withColumn("web_pages_count", size(col("web_pages")))
)


In [7]:
from pyspark.sql.functions import count, avg

agg_country = (
    df_clean.groupBy("country")
      .agg(
          count("*").alias("universities_count"),
          avg(col("domains_count")).alias("avg_domains_per_university"),
          avg(col("web_pages_count")).alias("avg_webpages_per_university"),
      )
      .orderBy(col("universities_count").desc())
)

agg_country.show(10, truncate=False)


+------------------+------------------+--------------------------+---------------------------+
|country           |universities_count|avg_domains_per_university|avg_webpages_per_university|
+------------------+------------------+--------------------------+---------------------------+
|United States     |2349              |1.0208599404001704        |1.0063856960408684         |
|Japan             |572               |1.0034965034965035        |1.0                        |
|India             |474               |1.0253164556962024        |1.0168776371308017         |
|China             |398               |1.0175879396984924        |1.0100502512562815         |
|Germany           |318               |1.0817610062893082        |1.0377358490566038         |
|Russian Federation|309               |1.022653721682848         |1.0129449838187703         |
|France            |297               |1.0875420875420876        |1.0404040404040404         |
|Korea, Republic of|244               |1.110655737

In [8]:
jdbc_url = "jdbc:postgresql://postgres:5432/universities_dw"
props = {"user": "de_user", "password": "de_pass", "driver": "org.postgresql.Driver"}

(agg_country.write
    .mode("overwrite")
    .jdbc(jdbc_url, "universities_by_country", properties=props)
)

print("✅ Loaded table: universities_by_country")


✅ Loaded table: universities_by_country
