### Запускаем сессию Spark

In [1]:
from pyspark.sql import SparkSession
ip = '11.11.1.10'
# --- ШАГ 1: Принудительно останавливаем любую существующую сессию ---
try:
    SparkSession.builder.getOrCreate().stop()
    print("Существующая Spark-сессия остановлена.")
except Exception as e:
    print(f"Не было активной сессии для остановки, что хорошо: {e}")

# --- ШАГ 2: Создаем новую сессию с исправленной конфигурацией чтения ---

print("Создание новой, правильно сконфигурированной Spark-сессии...")

spark = (
    SparkSession.builder
    .appName("Iceberg-Read-Write-Final")

    # === КЛЮЧЕВОЕ ИЗМЕНЕНИЕ: Настройка чтения из S3 ===
    # Говорим Spark, что для адресов s3:// нужно использовать драйвер s3a
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Отключаем кеширование файловой системы, чтобы гарантировать применение настроек
    .config("spark.hadoop.fs.s3.impl.disable.cache", "true")

    # --- Остальные настройки из предыдущей рабочей версии ---

    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

    .config("spark.sql.catalog.datalake", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.datalake.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
    .config("spark.sql.catalog.datalake.uri", f"http://{ip}:8181")

    .config("spark.sql.catalog.datalake.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.datalake.s3.endpoint", f"http://{ip}:9000")
    .config("spark.sql.catalog.datalake.s3.path-style-access", "true")

    .config("spark.hadoop.fs.s3a.endpoint", f"http://{ip}:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    .getOrCreate()
)

print("✅ Spark-сессия успешно создана и готова к чтению и записи!")
spark

Существующая Spark-сессия остановлена.
Создание новой, правильно сконфигурированной Spark-сессии...
✅ Spark-сессия успешно создана и готова к чтению и записи!


26/01/04 20:20:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
26/01/04 20:20:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


### Проверим, какие есть таблицы в схеме STG

In [2]:
%%sql
show tables in stg;

26/01/04 20:20:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


namespace,tableName,isTemporary
stg,foo,False
stg,test,False


In [3]:
%%sql
create database if not exists nyc;

In [4]:
%%sql
drop table if exists nyc.taxis;

In [7]:
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-04-01 00:00:18|  2021-04-01 00:21:54|            1.0|          8.4|       1.0|                 N|          79|         116|           1|       25.5|  3.0|    0.5|      5.8

In [9]:
df.writeTo("datalake.nyc.taxis").createOrReplace()

                                                                                

### Просмотр схемы таблицы nyc.taxis через методы Датафрейма

In [11]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



### или Просмотр схемы таблицы nyc.taxis через SQL

In [12]:
%%sql
DESCRIBE EXTENDED nyc.taxis;

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp_ntz,
tpep_dropoff_datetime,timestamp_ntz,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [18]:
%%sql
select count(*)
from datalake.nyc.taxis;

count(1)
2171187


### Останавливаем Spark сессию

In [20]:
SparkSession.builder.getOrCreate().stop()