In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# 0) Вывести ключевые S3A-настройки
hc = spark.sparkContext._jsc.hadoopConfiguration()
def h(key): 
    return hc.get(key)

print("endpoint         =", h("fs.s3a.endpoint"))
print("path.style       =", h("fs.s3a.path.style.access"))
print("ssl              =", h("fs.s3a.connection.ssl.enabled"))
print("credentials prov =", h("fs.s3a.aws.credentials.provider"))
print("access.key set?  =", bool(h("fs.s3a.access.key")))
print("secret.key set?  =", bool(h("fs.s3a.secret.key")))

# 1) Листинг в бакете (замени при необходимости)
BUCKET = "s3a://warehouse/"
jvm = spark._jvm
Path = jvm.org.apache.hadoop.fs.Path
FS = jvm.org.apache.hadoop.fs.FileSystem
uri = jvm.java.net.URI(BUCKET)
fs = FS.get(uri, hc)

print("\nСписок в", BUCKET)
for st in fs.listStatus(Path(BUCKET)):
    print("-", st.getPath().toString())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/25 15:21:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


endpoint         = http://minio:9000
path.style       = true
ssl              = false
credentials prov = 
    org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,
    org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,
    com.amazonaws.auth.EnvironmentVariableCredentialsProvider,
    org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider
  
access.key set?  = True
secret.key set?  = True


25/08/25 15:21:35 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties



Список в s3a://warehouse/
- s3a://warehouse/iceberg


In [2]:
spark

In [3]:
TABLE_PATH = "s3a://warehouse/iceberg/bronze/crpt_2025_raw"
meta = f"{TABLE_PATH}/metadata"

# 1) Есть ли metadata/ — признак таблицы Iceberg
print("Проверяем наличие", meta)
exists = fs.exists(Path(meta))
print("metadata/ exists:", bool(exists))

# 2) Быстрое чтение 5 строк
df = spark.read.format("iceberg").load(TABLE_PATH)
df.limit(5).show(truncate=False)

Проверяем наличие s3a://warehouse/iceberg/bronze/crpt_2025_raw/metadata
metadata/ exists: True


                                                                                

+------+----------+-----+------------------------+-------------+------+-----------------+---------------------+----------------------------------+-----------+---------------------+-------------------------+-------------------+---------------------+-----------------------+
|Неделя|Год недели|Месяц|Тип эмиссии             |gtin         |Серия |ИНН производителя|ИНН вывода из оборота|Идентификатор МД вывода из оборота|Тип выбытия|Тип вывода из оборота|Выведено кодов маркировки|Выведено, руб с НДС|Выведено, руб без НДС|Источник финансирования|
+------+----------+-----+------------------------+-------------+------+-----------------+---------------------+----------------------------------+-----------+---------------------+-------------------------+-------------------+---------------------+-----------------------+
|19    |2025      |5    |Собственное производство|4601669003799|790824|4631002737       |7456011241           |00000000449209                    |Продажа    |NULL                 |3

In [4]:
# структура: s3a://warehouse/iceberg/bronze/crpt_2025_raw
spark.sql("SHOW NAMESPACES IN ice").show(truncate=False)
spark.sql("SHOW TABLES IN ice.bronze").show(truncate=False)
spark.table("ice.bronze.crpt_2025_raw").limit(5).show(truncate=False)

+---------+
|namespace|
+---------+
|bronze   |
|tmp      |
+---------+

+---------+-------------+-----------+
|namespace|tableName    |isTemporary|
+---------+-------------+-----------+
|bronze   |crpt_2025_raw|false      |
|bronze   |crpt_load_log|false      |
+---------+-------------+-----------+

+------+----------+-----+------------------------+--------+------+-----------------+---------------------+----------------------------------+------------------------+------------------------------------+-------------------------+-------------------+---------------------+-----------------------+
|Неделя|Год недели|Месяц|Тип эмиссии             |gtin    |Серия |ИНН производителя|ИНН вывода из оборота|Идентификатор МД вывода из оборота|Тип выбытия             |Тип вывода из оборота               |Выведено кодов маркировки|Выведено, руб с НДС|Выведено, руб без НДС|Источник финансирования|
+------+----------+-----+------------------------+--------+------+-----------------+---------------------+

In [5]:
# Снапшоты и файлы (работает и по каталожному имени, и по полному имени)
tbl = "ice.bronze.crpt_2025_raw"
spark.sql(f"SELECT snapshot_id, committed_at, operation FROM {tbl}.snapshots ORDER BY committed_at DESC").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|8012012217108634249|2025-08-23 23:42:42.169|append   |
|3629424066847997642|2025-08-23 23:39:31.663|append   |
|484499927572824358 |2025-08-23 23:36:44.261|append   |
|2286615430862358207|2025-08-23 23:33:46.991|append   |
|2473604785124089023|2025-08-23 23:32:10.965|append   |
|211446294242441037 |2025-08-23 23:29:11.883|append   |
|3466752973977639435|2025-08-23 23:26:29.18 |append   |
|285044371247057607 |2025-08-23 23:24:05.869|append   |
|6890475041206907799|2025-08-23 23:21:43.235|append   |
|2031417170101462200|2025-08-23 23:19:00.752|append   |
|1133908811693098057|2025-08-23 23:16:32.812|append   |
|4140667713898054888|2025-08-23 23:14:08.148|append   |
|4213941939151711478|2025-08-23 23:11:27.243|append   |
|5009047799226429782|2025-08-23 23:09:00.867|append   |
|452598316202781071 |2025-08-23 23:06:29.287|app

In [6]:
spark.sql(f"SELECT count(*) AS data_files, sum(file_size_in_bytes) AS size_bytes FROM {tbl}.files").show()

+----------+----------+
|data_files|size_bytes|
+----------+----------+
|      2344|9543587350|
+----------+----------+



In [8]:
spark.stop()
spark = SparkSession.builder.getOrCreate()

hc  = spark.sparkContext._jsc.hadoopConfiguration()
jvm = spark._jvm
Path = jvm.org.apache.hadoop.fs.Path
FS   = jvm.org.apache.hadoop.fs.FileSystem

def fs_for(uri_str: str):
    return FS.get(jvm.java.net.URI(uri_str), hc)

# выбери бакет, куда тестируешь запись
TEST_DIR = "s3a://raw/_connect_test"
# TEST_DIR = "s3a://warehouse/_connect_test"  # альтернативно

# 1) запись
import time, random
TEST_DIR = f"{TEST_DIR}_{int(time.time())}_{random.randint(0,9999)}"
spark.range(3).write.mode("overwrite").parquet(TEST_DIR)
print("Записали в:", TEST_DIR)

# 2) чтение
df = spark.read.parquet(TEST_DIR)
print("Прочитали обратно, count =", df.count())

# 3) удаление тем же FS, из того же бакета
fs = fs_for(TEST_DIR)              # <— ВАЖНО: FS для этого URI
ok = fs.delete(Path(TEST_DIR), True)
print("Удалили?", bool(ok))


25/08/25 15:26:10 WARN DataSource: All paths were ignored:                      
  s3a://raw/_connect_test_1756135567_5192


Записали в: s3a://raw/_connect_test_1756135567_5192
Прочитали обратно, count = 3
Удалили? True


In [14]:
# Проверим, что виден каталог и неймспейсы
print("defaultCatalog =", spark.conf.get("spark.sql.defaultCatalog"))
spark.sql("SHOW NAMESPACES IN ice").show(truncate=False)

# Создадим неймспейс и тестовую таблицу — проверим запись коммитов Iceberg
spark.sql("CREATE NAMESPACE IF NOT EXISTS ice.tmp")
spark.sql("DROP TABLE IF EXISTS ice.tmp.__ctest")

spark.sql("""
  CREATE TABLE ice.tmp.__ctest (id BIGINT)
  USING iceberg
  PARTITIONED BY (bucket(8, id))
""")
spark.sql("INSERT INTO ice.tmp.__ctest SELECT id FROM range(0, 10)")

print("count =", spark.table("ice.tmp.__ctest").count())
spark.sql("SELECT snapshot_id, committed_at, operation FROM ice.tmp.__ctest.snapshots ORDER BY committed_at DESC").show(truncate=False)

# Чисто — можно удалить
spark.sql("DROP TABLE ice.tmp.__ctest")

defaultCatalog = ice
+---------+
|namespace|
+---------+
|bronze   |
+---------+



                                                                                

count = 10
+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|8674144843526871029|2025-08-24 07:27:21.417|append   |
+-------------------+-----------------------+---------+



DataFrame[]

In [9]:
spark.sql("SHOW TABLES IN ice.bronze").show(truncate=False)
spark.table("ice.bronze.crpt_2025_raw").limit(5).show(truncate=False)
# т.к. defaultCatalog=ice, можно писать короче:
spark.table("bronze.crpt_2025_raw").limit(5).show(truncate=False)

# Расположение
spark.sql("DESCRIBE TABLE EXTENDED bronze.crpt_2025_raw").where("col_name='Location'").show(truncate=False)

+---------+-------------+-----------+
|namespace|tableName    |isTemporary|
+---------+-------------+-----------+
|bronze   |crpt_2025_raw|false      |
|bronze   |crpt_load_log|false      |
+---------+-------------+-----------+



                                                                                

+------+----------+-----+------------------------+-------------+------------+-----------------+---------------------+----------------------------------+-----------+---------------------+-------------------------+-------------------+---------------------+-----------------------+
|Неделя|Год недели|Месяц|Тип эмиссии             |gtin         |Серия       |ИНН производителя|ИНН вывода из оборота|Идентификатор МД вывода из оборота|Тип выбытия|Тип вывода из оборота|Выведено кодов маркировки|Выведено, руб с НДС|Выведено, руб без НДС|Источник финансирования|
+------+----------+-----+------------------------+-------------+------------+-----------------+---------------------+----------------------------------+-----------+---------------------+-------------------------+-------------------+---------------------+-----------------------+
|5     |2025      |1    |Собственное производство|4602242010760|029A36102024|2227000087       |7732121736           |00000000511899                    |Продажа    

In [10]:
# Снапшоты
spark.sql("SELECT snapshot_id, committed_at, operation FROM bronze.crpt_2025_raw.snapshots ORDER BY committed_at DESC").show(truncate=False)

# Файлы (понять масштаб/партиции)
spark.sql("""
  SELECT content, record_count, file_size_in_bytes, file_path
  FROM bronze.crpt_2025_raw.files
  ORDER BY file_size_in_bytes DESC
  LIMIT 20
""").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|8012012217108634249|2025-08-23 23:42:42.169|append   |
|3629424066847997642|2025-08-23 23:39:31.663|append   |
|484499927572824358 |2025-08-23 23:36:44.261|append   |
|2286615430862358207|2025-08-23 23:33:46.991|append   |
|2473604785124089023|2025-08-23 23:32:10.965|append   |
|211446294242441037 |2025-08-23 23:29:11.883|append   |
|3466752973977639435|2025-08-23 23:26:29.18 |append   |
|285044371247057607 |2025-08-23 23:24:05.869|append   |
|6890475041206907799|2025-08-23 23:21:43.235|append   |
|2031417170101462200|2025-08-23 23:19:00.752|append   |
|1133908811693098057|2025-08-23 23:16:32.812|append   |
|4140667713898054888|2025-08-23 23:14:08.148|append   |
|4213941939151711478|2025-08-23 23:11:27.243|append   |
|5009047799226429782|2025-08-23 23:09:00.867|append   |
|452598316202781071 |2025-08-23 23:06:29.287|app

In [14]:
spark.table("ice.bronze.crpt_load_log").show(truncate=False)

+---------------+--------------------------+
|zip_name       |loaded_at                 |
+---------------+--------------------------+
|2025_1.csv.zip |2025-08-23 22:25:27.292049|
|2025_22.csv.zip|2025-08-23 23:01:37.702308|
|2025_26.csv.zip|2025-08-23 23:11:27.321692|
|2025_20.csv.zip|2025-08-23 22:56:27.43579 |
|2025_23.csv.zip|2025-08-23 23:04:05.300402|
|2025_7.csv.zip |2025-08-23 23:36:44.350645|
|2025_21.csv.zip|2025-08-23 22:59:01.597045|
|2025_31.csv.zip|2025-08-23 23:26:29.255269|
|2025_29.csv.zip|2025-08-23 23:19:00.82884 |
|2025_18.csv.zip|2025-08-23 22:48:48.784363|
|2025_13.csv.zip|2025-08-23 22:34:52.515156|
|2025_4.csv.zip |2025-08-23 23:29:11.971912|
|2025_24.csv.zip|2025-08-23 23:06:29.366374|
|2025_19.csv.zip|2025-08-23 22:51:16.794433|
|2025_12.csv.zip|2025-08-23 22:34:17.600667|
|2025_25.csv.zip|2025-08-23 23:09:00.946394|
|2025_27.csv.zip|2025-08-23 23:14:08.226   |
|2025_28.csv.zip|2025-08-23 23:16:32.895415|
|2025_5.csv.zip |2025-08-23 23:32:11.058604|
|2025_16.c

In [17]:
spark.sql("""
SELECT CAST(summary['total-records'] AS BIGINT) AS total_rows
FROM bronze.crpt_2025_raw.snapshots
ORDER BY committed_at DESC
LIMIT 1
""").show()

+----------+
|total_rows|
+----------+
|1574418370|
+----------+



In [16]:
spark.sql("""
SELECT CAST(summary['total-records'] AS BIGINT) AS total_rows
FROM bronze.crpt_load_log.snapshots
ORDER BY committed_at DESC
LIMIT 1
""").show()

+----------+
|total_rows|
+----------+
|        31|
+----------+



In [None]:
# spark.sql("USE ice.bronze")

In [None]:
# пример запроса к таблице Iceberg bronze.crpt_2025_raw, который агрегирует данные по ИНН вывода из оборота (ИНН вывода из оборота) и считает:
# общее количество операций (строк),
# сумму Выведено кодов маркировки,
# сумму Выведено, руб без НДС,
# сумму Выведено, руб с НДС.
# Также добавим фильтр по Год недели = 2025 и Месяц = 1

In [18]:
query = """
SELECT
  `ИНН вывода из оборота` AS inn_output,
  COUNT(*) AS operations_count,
  SUM(`Выведено кодов маркировки`) AS total_codes,
  SUM(`Выведено, руб без НДС`) AS total_rub_no_vat,
  SUM(`Выведено, руб с НДС`) AS total_rub_with_vat
FROM
  bronze.crpt_2025_raw
WHERE
  `Год недели` = 2025
  AND `Месяц` = 1
GROUP BY
  `ИНН вывода из оборота`
ORDER BY
  total_rub_with_vat DESC
LIMIT 20
"""

# Выполнение запроса
df_result = spark.sql(query)

In [19]:
df_result.show(truncate=False)



+----------+----------------+-----------+---------------------+---------------------+
|inn_output|operations_count|total_codes|total_rub_no_vat     |total_rub_with_vat   |
+----------+----------------+-----------+---------------------+---------------------+
|7732121736|4598819         |8190001    |4.661779222294687E12 |5.127952664873764E12 |
|7720023269|519550          |3993510    |6.262771892517047E10 |6.77651535483803E10  |
|7724211288|7718878         |12220596   |5.346768358735825E10 |5.880782875759953E10 |
|7103015403|311             |2903       |4.152687222381E10    |4.481564917240001E10 |
|2466189073|1916059         |4690770    |3.4441170652769646E10|3.788491244041018E10 |
|7705947629|4297676         |6176600    |3.4107006973051632E10|3.751439567514653E10 |
|3666031335|601             |4227       |3.3851664569469997E10|3.653262844141E10    |
|7706795062|1581267         |4243153    |3.277651178023924E10 |3.605000343311003E10 |
|7709068298|3852388         |5747268    |3.21779956644

                                                                                

In [None]:
# топ-10 GTIN'ов (товаров) по суммарной стоимости (в рублях с НДС) за январь 2025 года, а также количество выводов из оборота по каждому:

In [22]:
query = """
SELECT
  gtin,
  COUNT(*) AS operations_count,
  SUM(`Выведено кодов маркировки`) AS total_codes,
  SUM(`Выведено, руб с НДС`) AS total_rub_with_vat
FROM
  bronze.crpt_2025_raw
WHERE
  `Год недели` = 2025
  AND `Месяц` = 1
GROUP BY
  gtin
ORDER BY
  total_rub_with_vat DESC
LIMIT 10
"""

df_gtin_top10 = spark.sql(query)
df_gtin_top10.show(truncate=False)



+-------------+----------------+-----------+---------------------+
|gtin         |operations_count|total_codes|total_rub_with_vat   |
+-------------+----------------+-----------+---------------------+
|4606207000025|360633          |725600     |5.062969837546472E12 |
|5390387001048|28104           |34234      |1.6192236371568218E12|
|7793397038901|30              |128        |2.1222187489023987E11|
|4640008270034|41671           |48336      |1.5039029737442734E11|
|7290008079341|2645            |3560       |8.997423019815959E10 |
|7793397038918|10              |27         |4.476555173466E10    |
|4029799125554|102744          |226410     |2.6751384925692566E10|
|5200000020427|1627            |2114       |2.2233831074069862E10|
|4630257150010|591             |720        |2.0439234079199986E10|
|4640008270027|7006            |7517       |1.6142781173E10      |
+-------------+----------------+-----------+---------------------+



                                                                                