In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# 1. Создаем SparkSession
spark = SparkSession.builder \
    .appName("Spark SQL with PostgreSQL and ClickHouse") \
    .config("spark.jars", "postgresql-42.6.0.jar,clickhouse-jdbc-0.4.6.jar") \
    .getOrCreate()

# 2. Создаем DataFrame
data = [
    ("Alice", "Sales", 5000),
    ("Bob", "Sales", 6000),
    ("Charlie", "HR", 4000),
    ("David", "HR", 4500),
    ("Eve", "Sales", 5500)
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Показываем исходный DataFrame
print("Исходные данные:")
display(df.toPandas())

# 3. Записываем данные в PostgreSQL
pg_jdbc_url = "jdbc:postgresql://postgres:5432/spark_db"
properties = {
    "user": "spark_user",
    "password": "spark_password",
    "driver": "org.postgresql.Driver"
}

# Записываем DataFrame в таблицу 'employees'
df.write.jdbc(url=pg_jdbc_url, table="employees", mode="overwrite", properties=properties)

# 4. Читаем данные из PostgreSQL
df_from_postgres = spark.read.jdbc(url=pg_jdbc_url, table="employees", properties=properties)

print("Данные, прочитанные из PostgreSQL:")
display(df_from_postgres.toPandas())

# 5. Выполняем агрегацию данных
# Группируем по департаменту и считаем общую зарплату
aggregated_df = df_from_postgres.groupBy("Department").agg(sum("Salary").alias("Total_Salary"))

print("Агрегированные данные (сумма зарплат по департаментам):")
pandas_df = aggregated_df.toPandas()
display(pandas_df)

# Записываем данные в ClickHouse через JDBC
ch_jdbc_url = "jdbc:clickhouse://clickhouse:8123/default"
properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "custom_user",
    "password": "custom_password"
}

# # Записываем данные в ClickHouse
# aggregated_df.write.jdbc(url=ch_jdbc_url, table="report_ch", mode="append", properties=properties)

# Читаем данные из ClickHouse
ch_df = spark.read.jdbc(url=ch_jdbc_url, table="test_table", properties=properties)
ch_df.toPandas()
# Создаем временное view
# ch_df.createOrReplaceTempView("report_ch")

# ch_pandas_df = spark.sql("select Total_Salary from report_ch where Department = 'HR' limit 1").toPandas()

# print("Агрегированные данные (сумма зарплат по департаментам) из ClickHouse где выбран департамент HR:")
# display(ch_pandas_df)

# 6. Останавливаем SparkSession


Исходные данные:


Unnamed: 0,Name,Department,Salary
0,Alice,Sales,5000
1,Bob,Sales,6000
2,Charlie,HR,4000
3,David,HR,4500
4,Eve,Sales,5500


Данные, прочитанные из PostgreSQL:


Unnamed: 0,Name,Department,Salary
0,Alice,Sales,5000
1,Bob,Sales,6000
2,Charlie,HR,4000
3,Eve,Sales,5500
4,David,HR,4500


Агрегированные данные (сумма зарплат по департаментам):


Unnamed: 0,Department,Total_Salary
0,Sales,16500
1,HR,8500


Py4JJavaError: An error occurred while calling o73.jdbc.
: java.sql.BatchUpdateException: Code: 60. DB::Exception: Unknown table expression identifier 'test_table' in scope SELECT * FROM test_table WHERE 1 = 0. (UNKNOWN_TABLE) (version 25.5.1.2782 (official build))
, server ClickHouseNode [uri=http://clickhouse:8123/default]@-1121396271
	at com.clickhouse.jdbc.SqlExceptionUtils.batchUpdateError(SqlExceptionUtils.java:107)
	at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeAny(SqlBasedPreparedStatement.java:219)
	at com.clickhouse.jdbc.internal.SqlBasedPreparedStatement.executeQuery(SqlBasedPreparedStatement.java:246)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:249)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
