# Шаблон SparkSession

## Запуск универсального SparkSession

In [None]:
import os

import pyspark
from pyspark.sql import SparkSession

def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Hadoop's
            ``Configuration.set`` rejects ``None`` values, so without this check a
            missing variable would surface as an opaque Py4J error, and only after
            Spark has already started.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set or is empty")
    return value


# Fail fast on missing MinIO credentials, before paying for JVM start-up.
minio_access_key = _require_env("MINIO_ROOT_USER")
minio_secret_key = _require_env("MINIO_ROOT_PASSWORD")

# Maven packages resolved at start-up: S3A support, ClickHouse/PostgreSQL JDBC
# drivers and the Kafka source. The Kafka connector is built against Spark
# internals and must match the running Spark version, so its version is taken
# from the installed PySpark instead of being hard-coded.
spark = SparkSession.builder \
    .appName("S3Example") \
    .config("spark.jars.packages", ",".join([
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk-bundle:1.12.262",
        "ru.yandex.clickhouse:clickhouse-jdbc:0.3.2",
        "org.postgresql:postgresql:42.5.0",
        f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}",
    ])) \
    .getOrCreate()

# S3A settings for the MinIO endpoint: plain HTTP and path-style bucket URLs.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", minio_access_key)
hadoop_conf.set("fs.s3a.secret.key", minio_secret_key)
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.path.style.access", "true")

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
ru.yandex.clickhouse#clickhouse-jdbc added as a dependency
org.postgresql#postgresql added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4ffbf4ce-f085-4c72-a218-007a67e267d9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found ru.yandex.clickhouse#clickhouse-jdbc;0.3.2 in central
	found com.clickhouse#clickhouse-http-client;0.3.2 in central
	found com.clickhouse#clickhouse-client;0.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found com.google.code.gson#gson;2.8.8 in central
	found org.apache.httpcomponents#httpclient;4.5

### Чтение и запись в S3

In [2]:

# Source: Parquet files under the dev bucket. Target: a directory that is fully
# replaced on every run.
# NOTE(review): despite its name, `dev_path` points at the *prod* bucket and is
# written with mode="overwrite" — confirm this is intended before running.
s3_path_regions = "s3a://dev/test_regions/*.parquet"
dev_path = "s3a://prod/jdbc/regions"

# Read the Parquet files and preview the first rows.
df = spark.read.parquet(s3_path_regions)
df.show()

# Replace whatever is currently stored at the target path.
df.write.parquet(dev_path, mode="overwrite")



25/06/22 10:20:17 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+--------------------+--------------------+--------------------+
|       place_pattern|              region|          place_hash|
+--------------------+--------------------+--------------------+
|south of the Fiji...|                Fiji|89bd0962a3404155c...|
|         Fiji region|                Fiji|d60abbbcdfcba1907...|
|     West Chile Rise|               Chile|199b8a7ca0b59d483...|
|South Georgia Isl...|South Georgia Island|7b5e0ccc0ccc98fa3...|
|Pacific-Antarctic...|      Southern Ocean|b4f9599d8acce224d...|
|    Mid-Indian Ridge|        Indian Ocean|ad7a1be84850ec6f3...|
|western Indian-An...|      Southern Ocean|6f934ded11be8ea61...|
|Kermadec Islands ...|         New Zealand|f99b416f756e80bcf...|
|southern East Pac...|       Pacific Ocean|9ba088ca77cc2f6b8...|
|South Sandwich Is...|South Sandwich Is...|c3b754373563b27ef...|
+--------------------+--------------------+--------------------+



### Чтение и запись в ClickHouse

In [4]:
# ClickHouse connection settings; credentials come from the environment.
jdbc_url = 'jdbc:clickhouse://ru.tuna.am:31086/default'
jdbc_url_dev = 'jdbc:clickhouse://ru.tuna.am:31086/dev'
db_user = os.getenv('CLICKHOUSE_USER')
db_password = os.getenv('CLICKHOUSE_PASSWORD')
table_name = 'enriched_earthquakes'

# JDBC options shared by the read and the write; only the URL differs.
_clickhouse_common = {
    "user": db_user,
    "password": db_password,
    "dbtable": table_name,
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
}

# Load the whole table from the `default` database and preview it.
enriched = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .options(**_clickhouse_common)
    .load()
)
enriched.show()

# Engine/partitioning clause appended to CREATE TABLE. Per the Spark JDBC docs it
# only takes effect when Spark itself creates the table (i.e. it does not exist yet).
_clickhouse_ddl = """
            ENGINE = ReplacingMergeTree(updated_at)
            PARTITION BY toYYYYMM(load_date)
            ORDER BY (load_date, id)
        """

# Append the rows into the same-named table in the `dev` database.
(
    enriched.write.format("jdbc")
    .option("url", jdbc_url_dev)
    .options(**_clickhouse_common)
    .option("createTableOptions", _clickhouse_ddl)
    .mode("append")
    .save()
)

print("Таблица сохранена в Clickhouse")

+------------+-------------------+--------------------+----------------+---------+----+-------+--------------------+-----------------+----------------+-----+----------+-------------------+
|          id|                 ts|               place|          region|magnitude|felt|tsunami|                 url|        longitude|        latitude|depth| load_date|         updated_at|
+------------+-------------------+--------------------+----------------+---------+----+-------+--------------------+-----------------+----------------+-----+----------+-------------------+
|ak0257kvrbk7|2025-06-14 00:14:25|     96 km S of Adak|          Alaska|      2.7|NULL|      0|https://earthquak...|        -176.5501|         51.0121| 11.3|2025-06-14|2025-06-22 00:55:15|
|ak0257kvv4mz|2025-06-14 00:32:02|   40 km W of Beaver|          Alaska|      1.7|NULL|      0|https://earthquak...|        -148.2943|         66.4002|  8.3|2025-06-14|2025-06-22 00:55:15|
|ak0257kwi62y|2025-06-14 01:39:50| 5 km SE of Unalaska|

25/06/22 10:05:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/06/22 10:05:19 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. Change jdbcCompliant to false to throw SQLException instead.
[Stage 4:>                                                          (0 + 1) / 1]

Таблица сохранена в Clickhouse


25/06/22 10:05:20 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. Change jdbcCompliant to false to throw SQLException instead.
25/06/22 10:05:20 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [e14ca857-ccf7-4ad5-a4c9-01048a5fa990](4 queries & 0 savepoints) is committed.
25/06/22 10:05:20 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [2eab5886-b2a5-460e-a29d-1242483c70c6](0 queries & 0 savepoints) is committed.
                                                                                

### Чтение и запись в PostgreSQL

In [5]:

# PostgreSQL connection settings; credentials come from the environment.
# NOTE(review): this rebinds jdbc_url / table_name / db_user / db_password, the
# same names the ClickHouse cell defines — re-run that cell before reusing them.
jdbc_url = "jdbc:postgresql://ru.tuna.am:35663/backend"
table_name = "public.regions"
db_user = os.getenv("POSTGRES_USER")
db_password = os.getenv("POSTGRES_PASSWORD")

# Options shared by the read and the write.
_pg_credentials = {
    "user": db_user,
    "password": db_password,
    "driver": "org.postgresql.Driver",
}

# Read the source table (fetchsize = rows fetched per round trip) and preview it.
jdbc_df = (
    spark.read.format("jdbc")
    .options(url=jdbc_url, dbtable=table_name, fetchsize=1000, **_pg_credentials)
    .load()
)
jdbc_df.show()

# Copy into the dev database. Mode "overwrite" replaces the target table; without
# the `truncate` option Spark drops and recreates it.
target_url = "jdbc:postgresql://ru.tuna.am:35663/dev"
target_table = "public.test_regions"

(
    jdbc_df.write.format("jdbc")
    .options(url=target_url, dbtable=target_table, **_pg_credentials)
    .mode("overwrite")
    .save()
)

print("Данные записаны в dev.public.test_regions")

Py4JJavaError: An error occurred while calling o61.load.
: org.postgresql.util.PSQLException: Connection to ru.tuna.am:35663 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:247)
	at org.postgresql.Driver.makeConnection(Driver.java:434)
	at org.postgresql.Driver.connect(Driver.java:291)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:161)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:157)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:554)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:602)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:633)
	at org.postgresql.core.PGStream.createSocket(PGStream.java:241)
	at org.postgresql.core.PGStream.<init>(PGStream.java:98)
	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)
	... 30 more


### Чтение из Kafka

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
from pyspark.sql.functions import from_json

kafka_topic = "backend.public.order_events"
kafka_bootstrap = "kafka:29093"

# Seconds to run the stream before stopping it automatically. None waits until the
# query terminates on its own or the cell is interrupted.
stream_timeout_s = None


# Subscribe to the topic, starting from the latest offsets (only messages produced
# after the query starts are read).
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()


# Row image of an order event, used for both the "before" and "after" fields.
# NOTE(review): the envelope (before/after/source/op/ts_ms) looks like a Debezium
# change event without the schema wrapper — confirm the converter settings.
order_event_row = StructType([
    StructField("id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("ts", LongType(), True)
])

schema = StructType([
    StructField("before", order_event_row, True),
    StructField("after", order_event_row, True),
    StructField("source", StructType([]), True),  # contents not used here, so left empty
    StructField("op", StringType(), True),
    StructField("ts_ms", LongType(), True)
])


# Decode the message value as JSON and keep only the post-change row image; events
# whose "after" is null (presumably deletes — confirm) are dropped.
json_df = df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json("json_str", schema).alias("data")) \
    .where("data.after IS NOT NULL") \
    .select("data.after.*")

# Print micro-batches to the console. Keep a handle on the query so it is stopped
# even when waiting is interrupted (e.g. "Interrupt kernel" in Jupyter); without it
# the stream would keep running in the background with nothing referencing it.
query = json_df.writeStream \
    .format("console") \
    .option("truncate", False) \
    .start()
try:
    query.awaitTermination(stream_timeout_s)
finally:
    query.stop()


25/06/22 03:34:44 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-065f41e5-c7eb-4ab5-b6a8-80802829d659. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/22 03:34:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/22 03:34:44 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/06/22 03:34:44 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/06/22 03:34:44 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/06/22 03:34:44 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

StreamingQueryException: [STREAM_FAILED] Query [id = 1803af85-6546-4d2c-b778-c5ef3e4d93c0, runId = 28b8907e-db5f-49a1-b7b5-370bf3da0106] terminated with exception: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, broadcast, format_string
import csv

# Top 50 branches by total return amount for 2024, exported to a ';'-separated CSV.

# 1) Create the SparkSession with the settings this job needs.
# NOTE(review): getOrCreate() reuses a session already started by an earlier cell
# in the same kernel. Static settings (e.g. spark.driver.memory) are then not
# applied, and the S3A settings below overwrite that session's configuration.
# Run this cell in a fresh kernel.
spark = SparkSession.builder \
    .appName("Top50BranchesByReturnSum_Optimized") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.driver.memory","8g") \
    .config("spark.driver.maxResultSize","2g") \
    .config("spark.sql.parquet.enableVectorizedReader", "false") \
    .getOrCreate()

# Everything after session creation runs under try/finally, so the session (and its
# driver memory) is released even when a read, a join or the CSV write fails.
try:
    # 2) S3 access: anonymous credentials, path-style URLs, and more retries and a
    #    longer connection timeout for the remote endpoint.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", "https://s3.firstvds.ru")
    hadoop_conf.set("fs.s3a.path.style.access", "true")
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.retry.limit", "10")
    hadoop_conf.set("fs.s3a.retry.interval", "2000")
    hadoop_conf.set("fs.s3a.connection.timeout", "600000")

    # 3) Dataset locations.
    receipts_base        = "s3a://store-receipts-prod/receipts"
    receipt_content_base = "s3a://store-receipts-prod/receipt_content"
    terminals_path       = "s3a://store-receipts-prod/terminals"
    branches_path        = "s3a://store-receipts-prod/branch"

    # 4) Return receipts for 2024, only the columns needed downstream. basePath keeps
    #    year/month as partition columns when reading the month=* glob.
    #    NOTE(review): assumes op_type == 2 marks a return — confirm against the schema.
    receipts_df = (
        spark.read
             .option("basePath", receipts_base)
             .parquet(f"{receipts_base}/year=2024/month=*")
             .filter(col("op_type") == 2)
             .select(
                 col("id").alias("receipt_id"),
                 col("kassa_id").alias("terminal_id"),
                 col("year"),
                 col("month")
             )
    )

    # 5) Receipt line items for 2024, only the needed columns.
    receipt_content_df = (
        spark.read
             .option("basePath", receipt_content_base)
             .parquet(f"{receipt_content_base}/year=2024/month=*")
             .select(
                 "receipt_id",
                 "year",
                 "month",
                 "price"
             )
    )

    # 6) Terminal -> branch lookup, broadcast to avoid shuffling the large side.
    terminals_df = (
        spark.read
             .parquet(terminals_path)
             .select(
                 col("id").alias("terminal_id"),
                 "branch_id"
             )
    )
    terminals_b = broadcast(terminals_df)

    # 7) Branch directory (city and address ids).
    branches_df = (
        spark.read
             .parquet(branches_path)
             .select(
                 col("id").alias("branch_id"),
                 "city_id",
                 "addr_id"
             )
    )

    # 8) Total return amount per branch (sum of line-item prices).
    branch_returns = (
        receipts_df
          .join(terminals_b, on="terminal_id", how="inner")
          .join(receipt_content_df, on=["receipt_id","year","month"], how="inner")
          .groupBy("branch_id")
          .agg(spark_sum("price").alias("return_sum"))
    )

    # 9) Attach city_id/addr_id, sort by the numeric amount (ties broken by
    #    branch_id), keep the top 50, then format return_sum with two decimals.
    #    The inner join drops branches that are missing from the directory.
    top50_df = (
        branch_returns
          .join(branches_df, on="branch_id", how="inner")
          .orderBy(col("return_sum").desc(), col("branch_id").asc())
          .limit(50)
          .select(
              "branch_id",
              "city_id",
              "addr_id",
              format_string("%.2f", col("return_sum")).alias("return_sum")
          )
    )

    # 10) Collect the result on the driver (at most 50 rows).
    rows = top50_df.collect()

    # 11) Write the CSV by hand: ';' delimiter, header row first.
    output_path = "top50_branches_return_sum_final.csv"
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["branch_id", "city_id", "addr_id", "return_sum"])
        for r in rows:
            writer.writerow([r["branch_id"], r["city_id"], r["addr_id"], r["return_sum"]])

    print(f"Готово! Файл сохранён как {output_path}")
finally:
    spark.stop()