In [None]:
try:
    spark.stop()
except:
    import os
    os._exit(0)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, rand, round, date_format, expr, floor,to_timestamp, monotonically_increasing_id
from datetime import datetime
from faker import Faker


spark = SparkSession.builder \
    .appName("DataGeneration") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .getOrCreate()

num_records = 10_000_000  # ~300GB данных
num_clients = 50_000     # Количество уникальных клиентов
num_partitions = 10      # Количество партиций для выходных данных


df = spark.range(num_records).repartition(num_partitions)

fake = Faker()

# 1. Генерация transaction_id (UUID)
df = df.withColumn("transaction_id", monotonically_increasing_id())  # 8 байт на запись

# 2. Генерация client_id с повторениями
df = df.withColumn("client_id",
    (floor(rand() * num_clients) + 100000))

# 3. Генерация amount (1-10000 с 2 знаками после запятой)
df = df.withColumn("amount", round(rand() * 9999 + 1, 2))

# 4. Генерация currency (случайный выбор из списка)
currencies = ["USD", "EUR", "RUB", "CNY"]
df = df.withColumn("currency",
    expr(f"case when rand() < 0.25 then '{currencies[0]}' "
        f"when rand() < 0.5 then '{currencies[1]}' "
        f"when rand() < 0.75 then '{currencies[2]}' "
        f"else '{currencies[3]}' end"))

# 5. Генерация случайных дат и времени с 2020-01-01 по 2025-05-31
start_ts = int(datetime(2020, 1, 1).timestamp())
end_ts = int(datetime(2025, 5, 31).timestamp())
seconds_range = end_ts - start_ts

df = df.withColumn(
    "transaction_datetime",
    to_timestamp(
        expr(f"from_unixtime({start_ts} + (rand() * {seconds_range}))"),
        "yyyy-MM-dd HH:mm:ss"
    )
)

# 6. Генерация категорий
categories = ["payment", "transfer", "withdrawal", "deposit"]
df = df.withColumn("category",
    expr(f"case when rand() < 0.25 then '{categories[0]}' "
        f"when rand() < 0.5 then '{categories[1]}' "
        f"when rand() < 0.75 then '{categories[2]}' "
        f"else '{categories[3]}' end"))

# df = df.drop("id")

output_path = "finance_transactions_2gb"
(df.write
    .mode("overwrite")
    .option("header", "true")
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB блоки
    .parquet(output_path)
)

print(f"Данные успешно сгенерированы и сохранены в {output_path}")
print(f"Количество записей: {df.count():,}")
print(f"Количество уникальных client_id: {df.select('client_id').distinct().count():,}")
print(f"Количество партиций: {df.rdd.getNumPartitions()}")

df.show(5, truncate=False)

spark.stop()

25/06/04 01:58:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/06/04 01:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/04 01:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/04 01:58:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/06/04 01:59:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/04 01:59:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

Данные успешно сгенерированы и сохранены в finance_transactions_2gb


                                                                                

Количество записей: 10,000,000


                                                                                

Количество уникальных client_id: 50,000




Количество партиций: 10


                                                                                

+------+--------------+---------+-------+--------+--------------------+--------+
|id    |transaction_id|client_id|amount |currency|transaction_datetime|category|
+------+--------------+---------+-------+--------+--------------------+--------+
|394896|0             |134415   |2954.32|USD     |2025-05-03 15:52:48 |transfer|
|263573|1             |137748   |8671.04|RUB     |2020-05-31 06:02:47 |payment |
|347179|2             |125915   |8171.14|RUB     |2023-05-24 22:13:39 |transfer|
|448475|3             |105033   |622.65 |EUR     |2020-06-29 05:23:29 |payment |
|439696|4             |100498   |8264.01|EUR     |2021-07-20 14:03:40 |transfer|
+------+--------------+---------+-------+--------+--------------------+--------+
only showing top 5 rows



NameError: name 'df' is not defined

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from faker import Faker
from datetime import datetime
import random
import os
import shutil

spark = SparkSession.builder \
    .appName("ClientsDataGeneration") \
    .getOrCreate()

transactions_path = "finance_transactions_2gb"
transactions_df = spark.read.parquet(transactions_path)
unique_client_ids = transactions_df.select("client_id").distinct()

Faker.seed(42)
random.seed(42)

countries = ["RU", "US", "DE", "CN", "JP", "GB", "FR", "IN", "BR", "CA"]
tiers = ["premium", "standard"]
tier_weights = [0.3, 0.7]  # 30% premium, 70% standard

# Функция генерации данных клиента
def generate_client_data(client_id):
    country = random.choice(countries)
    tier = random.choices(tiers, weights=tier_weights)[0]

    locale_map = {
        "RU": "ru_RU",
        "CN": "zh_CN",
        "JP": "ja_JP",
        "DE": "de_DE"
    }
    locale = locale_map.get(country, "en_US")
    fake = Faker(locale)

    return (
        client_id,
        fake.name(),
        fake.date_between(start_date=datetime(2015, 1, 1)),
        tier,
        country
    )

client_data = [generate_client_data(row.client_id) for row in unique_client_ids.collect()]
clients_df = spark.createDataFrame(
    client_data,
    schema=["client_id", "name", "registration_date", "tier", "country"]
)

# Преобразование даты
#clients_df = clients_df.withColumn(
#    "registration_date",
#    to_date(col("registration_date"))
#)


temp_dir = "temp_clients"
output_path = "clients.csv"

(clients_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("delimiter", ",")
    .option("quote", '"')
    .option("escape", '"')
    .option("dateFormat", "yyyy-MM-dd")
    .csv(temp_dir)
)

# Переименование файла
output_file = [f for f in os.listdir(temp_dir) if f.startswith('part-')][0]
os.rename(
    os.path.join(temp_dir, output_file),
    output_path
)
shutil.rmtree(temp_dir)

# Проверка
print(f"Файл успешно сохранен как {output_path}")
print(f"Всего клиентов: {clients_df.count()}")
clients_df.show(5, truncate=False)

spark.stop()

25/06/04 02:01:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/04 02:02:29 WARN TaskSetManager: Stage 4 contains a task of very large size (1706 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Файл успешно сохранен как clients.csv
Всего клиентов: 50000
+---------+-----------------+-----------------+--------+-------+
|client_id|name             |registration_date|tier    |country|
+---------+-----------------+-----------------+--------+-------+
|103809   |Allison Hill     |2017-04-29       |premium |US     |
|101221   |森 涼平          |2020-09-07       |premium |JP     |
|114324   |Frau Almut Davids|2017-04-12       |standard|DE     |
|105784   |Alyssa Gonzalez  |2021-10-10       |premium |BR     |
|136275   |Connie Lawrence  |2023-06-09       |premium |FR     |
+---------+-----------------+-----------------+--------+-------+
only showing top 5 rows



In [17]:
!pip install yfinance pandas
import yfinance as yf
import pandas as pd
from datetime import datetime

start_date = '2020-01-01'
end_date = '2025-05-31'
output_file = 'currency_rates.csv'

tickers = {
    'USD_RUB': 'RUB=X',
    'USD_CNY': 'CNY=X',
    'EUR_RUB': 'EURRUB=X' 
}

print("Загрузка данных с Yahoo Finance...")
data = yf.download(
    list(tickers.values()),
    start=start_date,
    end=end_date,
    group_by='ticker',
    progress=True
)

rates = pd.DataFrame()

rates['Date'] = data.index.strftime('%Y-%m-%d')

# USD/RUB
rates['USD'] = data['RUB=X']['Close'].values

# EUR/RUB
rates['EUR'] = data['EURRUB=X']['Close'].values

# CNY/RUB (рассчитываем через кросс-курс)
usd_cny = data['CNY=X']['Close'].values
rates['CNY'] = rates['USD'] / usd_cny


# Заполнение пропусков
#rates.fillna(method='ffill', inplace=True)
#rates.fillna(method='bfill', inplace=True)

rates.to_csv(output_file, index=False, encoding='utf-8')

print(f"\nДанные сохранены в {output_file}")
print(f"Период: {rates['Date'].min()} - {rates['Date'].max()}")
print(f"Всего записей: {len(rates)}")
print("\nПример данных:")
print(rates.head(1405))

Загрузка данных с Yahoo Finance...
YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  3 of 3 completed
ERROR:yfinance:
3 Failed downloads:
ERROR:yfinance:['RUB=X', 'EURRUB=X', 'CNY=X']: DNSError('Failed to perform, curl: (6) Could not resolve host: query1.finance.yahoo.com. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.')


IsADirectoryError: [Errno 21] Is a directory: 'currency_rates.csv'

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
import os


spark = SparkSession.builder \
    .appName("BronzeLayerCreation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()



base_path = "lakehouse/bronze"
transactions_path = "finance_transactions_2gb"
clients_path = "clients.csv"
currency_rates_path = "currency_rates.csv"

os.makedirs(base_path, exist_ok=True)

try:
    # 1. Транзакции (Parquet -> Delta)
    transactions = spark.read.parquet(transactions_path)
    (transactions.write
        .format("delta")
        .mode("overwrite")
        .save(f"{base_path}/transactions")
    )
    print(f"Транзакции сохранены в {base_path}/transactions")

    # 2. Клиенты (CSV -> Delta)
    clients = spark.read.csv(clients_path, header=True)
    (clients.withColumn("registration_date", to_date(col("registration_date")))
        .write
        .format("delta")
        .mode("overwrite")
        .save(f"{base_path}/clients")
    )
    print(f"Клиенты сохранены в {base_path}/clients")

    # 3. Курсы валют (CSV -> Delta)
    currency_rates = spark.read.csv(currency_rates_path, header=True)
    (currency_rates.withColumn("date", to_date(col("date")))
        .write
        .format("delta")
        .mode("overwrite")
        .save(f"{base_path}/currency_rates")
    )
    print(f"Курсы валют сохранены в {base_path}/currency_rates")

    print("\nПроверка загруженных данных:")
    spark.read.format("delta").load(f"{base_path}/transactions").show(5)
    spark.read.format("delta").load(f"{base_path}/clients").show(5)
    spark.read.format("delta").load(f"{base_path}/currency_rates").show(5)

    transactions.printSchema()
    clients.printSchema()
    currency_rates.printSchema()

except Exception as e:
    print(f"Произошла ошибка: {str(e)}")
finally:
    spark.stop()

Транзакции сохранены в lakehouse/bronze/transactions
Клиенты сохранены в lakehouse/bronze/clients
Курсы валют сохранены в lakehouse/bronze/currency_rates

Проверка загруженных данных:
root
 |-- id: long (nullable = true)
 |-- transaction_id: long (nullable = true)
 |-- client_id: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_datetime: timestamp (nullable = true)
 |-- category: string (nullable = true)

root
 |-- client_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- tier: string (nullable = true)
 |-- country: string (nullable = true)

root
 |-- Date: string (nullable = true)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)



# Silver

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
from delta import configure_spark_with_delta_pip
import os


builder = SparkSession.builder.appName("SilverLayerProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Пути к данным
bronze_path = "lakehouse/bronze"
silver_path = "lakehouse/silver"

def process_transactions():
    """Обработка данных транзакций"""
    print("\nОбработка транзакций...")
    
    # Чтение данных из Bronze
    transactions = spark.read.format("delta").load(f"{bronze_path}/transactions")
    
    # 1. Преобразование типов данных
    transactions = transactions.withColumn("amount", col("amount").cast(DecimalType(18, 2)))
    
    # 2. Извлечение даты из datetime для партиционирования
    transactions = transactions.withColumn("transaction_date", to_date(col("transaction_datetime")))
    
    # 3. Добавление флага для подозрительных транзакций
    transactions = transactions.withColumn("is_suspicious", 
        (col("amount") > 5000) & (col("category").isin(["withdrawal", "transfer"])))
    
    # 4. Сохранение в Silver слой с партиционированием по дате
    (transactions.write
        .format("delta")
        .partitionBy("transaction_date")
        .mode("overwrite")
        .save(f"{silver_path}/transactions"))
    
    transactions.printSchema()

    transactions.show()
    
    print(f"Транзакции сохранены в Silver слой. Всего записей: {transactions.count()}")

def process_clients():
    """Обработка данных клиентов"""
    print("\nОбработка клиентов...")
    
    # Чтение данных из Bronze
    clients = spark.read.format("delta").load(f"{bronze_path}/clients")
    
    # 2. Расчет возраста клиента
    clients = clients.withColumn("age", 
        floor(months_between(current_date(), col("registration_date")) / 12))
    
    # 3. Категоризация клиентов
    clients = clients.withColumn("client_category",
        when(col("age").isNull(), "new")
        .when(col("age") < 1, "new")
        .when(col("age") < 3, "regular")
        .otherwise("vip"))
    
    # 4. Сохранение в Silver слой
    (clients.write
        .format("delta")
        .mode("overwrite")
        .save(f"{silver_path}/clients"))

    clients.printSchema()
    clients.show()
    
    print(f"Данные клиентов сохранены в Silver слой. Всего клиентов: {clients.count()}")

def process_currency_rates():
    """Обработка курсов валют"""
    print("\nОбработка курсов валют...")
    
    # Чтение данных из Bronze
    currency_rates = spark.read.format("delta").load(f"{bronze_path}/currency_rates")
    
    # 1. Заполнение пропущенных дат (выходные/праздники)
    date_range = currency_rates.select(
        min(col("date")).alias("min_date"),
        max(col("date")).alias("max_date")
    ).first()
    
    all_dates = spark.sql(f"""
        SELECT explode(sequence(to_date('{date_range.min_date}'), 
                              to_date('{date_range.max_date}'), 
                              interval 1 day)) as date
    """)
    
    # 2. Forward fill для пропущенных значений
    window_spec = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    currency_rates = all_dates.join(currency_rates, "date", "left")
    
    for currency in ["USD", "EUR", "CNY"]:
        currency_rates = currency_rates.withColumn(
            currency,
            last(col(currency), ignorenulls=True).over(window_spec))
    
    # 3. Добавление изменений курсов
    window_spec_prev = Window.orderBy("date")
    for currency in ["USD", "EUR", "CNY"]:
        currency_rates = currency_rates.withColumn(
            f"{currency}_change",
            col(currency) - lag(col(currency), 1).over(window_spec_prev))
    
    # 4. Сохранение в Silver слой
    (currency_rates.write
        .format("delta")
        .mode("overwrite")
        .save(f"{silver_path}/currency_rates"))

    currency_rates.printSchema()
    
    print(f"Курсы валют сохранены в Silver слой. Период: {date_range.min_date} - {date_range.max_date}")

def create_silver_tables():
    """Создание таблиц в Silver слое"""
    spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
    
    tables = {
        "transactions": "Данные о финансовых транзакциях",
        "clients": "Информация о клиентах",
        "currency_rates": "Исторические курсы валют"
    }
    
    for table, comment in tables.items():
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS silver.{table}
            USING DELTA
            LOCATION '{silver_path}/{table}'
            COMMENT '{comment}'
        """)
    
    print("\nТаблицы в Silver слое созданы:")
    spark.sql("SHOW TABLES IN silver").show()


process_transactions()
process_clients()
process_currency_rates()


create_silver_tables()

spark.stop()


Обработка транзакций...


25/06/09 21:13:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/09 21:13:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/09 21:13:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/09 21:13:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/09 21:13:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/09 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/09 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% 

root
 |-- id: long (nullable = true)
 |-- transaction_id: long (nullable = true)
 |-- client_id: long (nullable = true)
 |-- amount: decimal(18,2) (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_datetime: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- is_suspicious: boolean (nullable = true)

+------+--------------+---------+-------+--------+--------------------+----------+----------------+-------------+
|    id|transaction_id|client_id| amount|currency|transaction_datetime|  category|transaction_date|is_suspicious|
+------+--------------+---------+-------+--------+--------------------+----------+----------------+-------------+
|394896|             0|   134415|2954.32|     USD| 2025-05-03 15:52:48|  transfer|      2025-05-03|        false|
|263573|             1|   137748|8671.04|     RUB| 2020-05-31 06:02:47|   payment|      2020-05-31|        false|
|347179|             2|   125915|8171

                                                                                

root
 |-- client_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- tier: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: long (nullable = true)
 |-- client_category: string (nullable = false)

+---------+--------------------+-----------------+--------+-------+---+---------------+
|client_id|                name|registration_date|    tier|country|age|client_category|
+---------+--------------------+-----------------+--------+-------+---+---------------+
|   103809|        Allison Hill|       2017-04-29| premium|     US|  8|            vip|
|   101221|             森 涼平|       2020-09-07| premium|     JP|  4|            vip|
|   114324|   Frau Almut Davids|       2017-04-12|standard|     DE|  8|            vip|
|   105784|     Alyssa Gonzalez|       2021-10-10| premium|     BR|  3|            vip|
|   136275|     Connie Lawrence|       2023-06-09| premium|     FR|  2|        regular|
|   112568|  

25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 21:14:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 2

root
 |-- date: date (nullable = false)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)
 |-- USD_change: double (nullable = true)
 |-- EUR_change: double (nullable = true)
 |-- CNY_change: double (nullable = true)

Курсы валют сохранены в Silver слой. Период: 2020-01-01 - 2025-06-04


25/06/09 21:14:21 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/06/09 21:14:21 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/06/09 21:14:23 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/06/09 21:14:23 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.17.0.2



Таблицы в Silver слое созданы:
+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|   silver|       clients|      false|
|   silver|currency_rates|      false|
|   silver|  transactions|      false|
+---------+--------------+-----------+



25/06/09 21:14:24 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [38]:
currency_rates.printSchema()

root
 |-- Date: string (nullable = true)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)



# Gold

In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *

builder = SparkSession.builder.appName("GoldLayerProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

silver_path = "lakehouse/silver"
spark

transactions = spark.read.format("delta").load(f"{silver_path}/transactions")
clients = spark.read.format("delta").load(f"{silver_path}/clients")
currency_rates = spark.read.format("delta").load(f"{silver_path}/currency_rates")

transactions.printSchema()
clients.printSchema()
currency_rates.printSchema()

root
 |-- id: long (nullable = true)
 |-- transaction_id: long (nullable = true)
 |-- client_id: long (nullable = true)
 |-- amount: decimal(18,2) (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_datetime: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- is_suspicious: boolean (nullable = true)

root
 |-- client_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- tier: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: long (nullable = true)
 |-- client_category: string (nullable = true)

root
 |-- date: date (nullable = true)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)
 |-- USD_change: double (nullable = true)
 |-- EUR_change: double (nullable = true)
 |-- CNY_change: double (nullable = true)



In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("GoldLayerProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

silver_path = "lakehouse/silver"
gold_path = "lakehouse/gold"

transactions = spark.read.format("delta").load(f"{silver_path}/transactions")
clients = spark.read.format("delta").load(f"{silver_path}/clients")
currency_rates = spark.read.format("delta").load(f"{silver_path}/currency_rates")

# 1. Витрина: Клиентская статистика
client_stats = (
    transactions.join(clients, 'client_id', 'left')
    .groupby("client_id", "name", "country", "client_category", "tier")
    .agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount")
    )
    .orderBy(desc("total_amount"))
)

# 2. Витрина: Ежедневные метрики
daily_metrics = (
    transactions.join(currency_rates, transactions["transaction_date"] == currency_rates["date"], "left")
    .withColumn("amount_rub",
        when(col("currency") == "USD", col("amount") * col("USD"))
        .when(col("currency") == "EUR", col("amount") * col("EUR"))
        .when(col("currency") == "CNY", col("amount") * col("CNY"))
        .otherwise(col("amount")))
    .groupBy("date")
    .agg(
        sum("amount_rub").alias("daily_volume_rub"),
        avg("amount_rub").alias("avg_transaction_rub"),
        count("*").alias("transactions_count"),
        sum(when(col("is_suspicious"), 1).otherwise(0)).alias("suspicious_count"),
        sum(when(col("is_suspicious"), col("amount_rub")).otherwise(0)).alias("suspicious_volume_rub")
    )
    .orderBy("date")
)

# 3. Витрина: Анализ подозрительных операций
fraud_analysis = (
    transactions.join(clients, 'client_id', 'left')
    .filter(col("is_suspicious"))
    .groupBy("category", "country")
    .agg(
        count("*").alias("fraud_count"),
        avg("amount").alias("avg_fraud_amount"),
        sum("amount").alias("total_fraud_amount")
    )
)


(client_stats.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")  # перезаписываем схему
    .partitionBy("country")
    .save(f"{gold_path}/client_stats"))

(daily_metrics.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("date")
    .save(f"{gold_path}/daily_metrics"))

(fraud_analysis.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{gold_path}/fraud_analysis"))

# 5. Оптимизация таблиц
for table in ["client_stats", "daily_metrics", "fraud_analysis"]:
    delta_table = DeltaTable.forPath(spark, f"{gold_path}/{table}")
    delta_table.optimize().executeCompaction()

# 6. Создание таблиц в метастароне с предварительной очисткой
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

tables = {
    "client_stats": "Агрегированная статистика по клиентам",
    "daily_metrics": "Ежедневные финансовые показатели",
    "fraud_analysis": "Анализ подозрительных операций"
}

for table, comment in tables.items():
    spark.sql(f"DROP TABLE IF EXISTS gold.{table}")
    

    spark.sql(f"""
    CREATE TABLE gold.{table}
    USING DELTA
    LOCATION '{gold_path}/{table}'
    COMMENT '{comment}'
    """)

print("Gold-слой успешно создан:")
spark.sql("SHOW TABLES IN gold").show()

print("\nТоп клиентов по объему операций:")
spark.read.format("delta").load(f"{gold_path}/client_stats") \
    .orderBy(desc("total_amount")) \
    .limit(5) \
    .show(truncate=False)

print("\nЕжедневные метрики за последние 5 дней:")
spark.read.format("delta").load(f"{gold_path}/daily_metrics") \
    .orderBy(desc("date")) \
    .limit(5) \
    .show(truncate=False)

print("\nАнализ подозрительных операций:")
spark.read.format("delta").load(f"{gold_path}/fraud_analysis") \
    .orderBy(desc("total_fraud_amount")) \
    .limit(5) \
    .show(truncate=False)

spark.stop()

                                                                                

Gold-слой успешно создан:
+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|     gold|  client_stats|      false|
|     gold| daily_metrics|      false|
|     gold|fraud_analysis|      false|
+---------+--------------+-----------+


Топ клиентов по объему операций:
+---------+------------------------+-------+---------------+--------+------------+-----------+
|client_id|name                    |country|client_category|tier    |total_amount|avg_amount |
+---------+------------------------+-------+---------------+--------+------------+-----------+
|146223   |张志强                  |CN     |vip            |standard|1353365.07  |5370.496310|
|127139   |Татьяна Петровна Орехова|RU     |vip            |standard|1315572.01  |5118.957237|
|141773   |Stephen Massey          |BR     |regular        |standard|1315431.81  |5240.764183|
|106814   |小川 修平               |JP     |vip            |premium |1314203.37  |5386.079385|
|108997 

                                                                                

+----------+--------------------+-------------------+------------------+----------------+---------------------+
|date      |daily_volume_rub    |avg_transaction_rub|transactions_count|suspicious_count|suspicious_volume_rub|
+----------+--------------------+-------------------+------------------+----------------+---------------------+
|2025-05-30|1.34722614380593E9  |269714.94370489084 |4995              |1723            |6.849086673450425E8  |
|2025-05-29|1.3877737884348643E9|272059.1626024043  |5101              |1687            |6.959045807251476E8  |
|2025-05-28|1.3925355713049808E9|272511.85348434065 |5110              |1648            |6.806957411977961E8  |
|2025-05-27|1.3457215176122358E9|268875.4280943528  |5005              |1572            |6.419387073779885E8  |
|2025-05-26|1.3825624753702915E9|272587.23883483664 |5072              |1697            |6.94906063240068E8   |
+----------+--------------------+-------------------+------------------+----------------+---------------

In [19]:
spark.sql("DESCRIBE TABLE gold.client_stats").show(truncate=False)
spark.read.format("delta").load(f"{gold_path}/client_stats").printSchema()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
+--------+---------+-------+

root
 |-- client_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- client_category: string (nullable = true)
 |-- tier: string (nullable = true)
 |-- total_amount: decimal(28,2) (nullable = true)
 |-- avg_amount: decimal(22,6) (nullable = true)



# ETL

# Дозагрузка курсов валют

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import yfinance as yf
import pandas as pd
from datetime import datetime, date, timedelta

# Инициализация Spark
spark = SparkSession.builder \
    .appName("CurrencyRatesLoader") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Пути к данным
bronze_path = "lakehouse/bronze/currency_rates"
silver_path = "lakehouse/silver/currency_rates"

def get_last_date_from_silver():
    """Получаем последнюю дату из Silver слоя"""
    try:
        last_date = spark.read.format("delta").load(silver_path) \
            .agg(max(col("date"))).collect()[0][0]
        return last_date + timedelta(days=1) if last_date else date(2020, 1, 1)
    except Exception:
        return date(2020, 1, 1)

def fetch_new_rates(start_date, end_date):
    """Загрузка новых курсов валют с Yahoo Finance"""
    tickers = {
        'USD_RUB': 'RUB=X',
        'USD_CNY': 'CNY=X',
        'EUR_RUB': 'EURRUB=X'
    }
    
    data = yf.download(
        list(tickers.values()),
        start=start_date,
        end=end_date,
        group_by='ticker',
        progress=False
    )
    
    rates = pd.DataFrame()
    rates['date'] = data.index.strftime('%Y-%m-%d')
    rates['USD'] = data['RUB=X']['Close'].values
    rates['EUR'] = data['EURRUB=X']['Close'].values
    rates['CNY'] = rates['USD'] / data['CNY=X']['Close'].values
    return rates.round(4).dropna()

def process_new_rates(new_rates_pd):
    """Обработка новых курсов валют"""
    if new_rates_pd.empty:
        return None
    
    new_rates = spark.createDataFrame(new_rates_pd) \
        .withColumn("date", to_date(col("date"), "yyyy-MM-dd")) \
        .withColumn("USD", col("USD").cast("string")) \
        .withColumn("EUR", col("EUR").cast("string")) \
        .withColumn("CNY", col("CNY").cast("string")) \
        .select("date", "USD", "EUR", "CNY")
    
    new_rates.write.format("delta").mode("append").save(bronze_path)
    return new_rates

def update_silver_layer():
    """Обновление Silver слоя с нужной структурой"""
    bronze_rates = spark.read.format("delta").load(bronze_path) \
        .select(
            col("date"),
            col("USD").cast("double"),
            col("EUR").cast("double"),
            col("CNY").cast("double")
        )
    
    # Полный диапазон дат
    date_range = bronze_rates.agg(
        min(col("date")).alias("min_date"),
        max(col("date")).alias("max_date")
    ).first()
    
    all_dates = spark.sql(f"""
        SELECT explode(sequence(to_date('{date_range.min_date}'), 
                              to_date('{date_range.max_date}'), 
                              interval 1 day)) as date
    """)
    
    window_spec = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    silver_rates = all_dates.join(bronze_rates, "date", "left")
    
    for currency in ["USD", "EUR", "CNY"]:
        silver_rates = silver_rates.withColumn(
            currency,
            last(col(currency), ignorenulls=True).over(window_spec))
    
    window_spec_prev = Window.orderBy("date")
    for currency in ["USD", "EUR", "CNY"]:
        silver_rates = silver_rates.withColumn(
            f"{currency}_change",
            round(col(currency) - lag(col(currency), 1).over(window_spec_prev), 4)
        )
    

    silver_rates = silver_rates.withColumn("USD", col("USD").cast("string")) \
                              .withColumn("EUR", col("EUR").cast("string")) \
                              .withColumn("CNY", col("CNY").cast("string"))
    
    # Сохранение в Silver слой
    silver_rates.write.format("delta") \
        .mode("overwrite") \
        .save(silver_path)

try:
    last_date = get_last_date_from_silver()
    today = date.today()
    
    if last_date < today:
        new_rates_pd = fetch_new_rates(last_date, today)
        
        if new_rates_pd is not None and not new_rates_pd.empty:
            process_new_rates(new_rates_pd)
            update_silver_layer()
            
            # Проверка результата
            result_df = spark.read.format("delta").load(silver_path)
            print("Схема Silver слоя:")
            result_df.printSchema()
            print("\nПример данных:")
            display(result_df.orderBy("date", ascending=False).limit(5).toPandas())
        else:
            print("Нет новых данных для обработки")
    else:
        print("Данные уже актуальны")
        
finally:
    spark.stop()

25/06/06 22:05:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


YF.download() has changed argument auto_adjust default to True



3 Failed downloads:
['CNY=X', 'EURRUB=X', 'RUB=X']: DNSError('Failed to perform, curl: (6) Could not resolve host: query1.finance.yahoo.com. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.')


Нет новых данных для обработки


# Дозагрузка транзакций

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, rand, round, expr, floor, to_timestamp, col
from pyspark.sql.types import DecimalType
from datetime import datetime, timedelta
from faker import Faker
import random

def generate_additional_transactions(start_date, end_date, num_records, existing_clients_path="clients.csv"):
    """
    Генерация дополнительных транзакций для указанного периода
    
    Параметры:
    - start_date: начальная дата в формате 'YYYY-MM-DD'
    - end_date: конечная дата в формате 'YYYY-MM-DD'
    - num_records: количество транзакций для генерации
    - existing_clients_path: путь к файлу с существующими клиентами
    """
    
    spark = SparkSession.builder \
        .appName("AdditionalTransactionsGenerator") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    existing_clients = spark.read.option("header", "true").csv(existing_clients_path)
    client_ids = [row.client_id for row in existing_clients.select("client_id").collect()]
    num_clients = len(client_ids)
    
    fake = Faker()
    num_partitions = num_records // 100_000
    
    df = spark.range(num_records).repartition(num_partitions)
    
    # 1. Генерация transaction_id
    df = df.withColumn("transaction_id", monotonically_increasing_id())
    
    # 2. Выбор client_id из существующих клиентов
    df = df.withColumn("client_id", 
        expr(f"{client_ids[0]} + floor(rand() * {num_clients})").cast("long"))
    
    # 3. Генерация amount (1-10000 с 2 знаками после запятой)
    df = df.withColumn("amount", round(rand() * 9999 + 1, 2))
    
    # 4. Генерация currency
    currencies = ["USD", "EUR", "RUB", "CNY"]
    df = df.withColumn("currency",
        expr(f"case when rand() < 0.25 then '{currencies[0]}' "
            f"when rand() < 0.5 then '{currencies[1]}' "
            f"when rand() < 0.75 then '{currencies[2]}' "
            f"else '{currencies[3]}' end"))
    
    # 5. Генерация дат и времени в указанном диапазоне
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
    start_ts = int(start_dt.timestamp())
    end_ts = int(end_dt.timestamp())
    seconds_range = end_ts - start_ts
    
    df = df.withColumn(
        "transaction_datetime",
        to_timestamp(
            expr(f"from_unixtime({start_ts} + (rand() * {seconds_range}))"),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
    
    # 6. Генерация категорий
    categories = ["payment", "transfer", "withdrawal", "deposit"]
    df = df.withColumn("category",
        expr(f"case when rand() < 0.25 then '{categories[0]}' "
            f"when rand() < 0.5 then '{categories[1]}' "
            f"when rand() < 0.75 then '{categories[2]}' "
            f"else '{categories[3]}' end"))
    
    # Удаление временного столбца id
    #df = df.drop("id")
    

    bronze_path = "lakehouse/bronze/transactions"
    df.printSchema()
    
    if DeltaTable.isDeltaTable(spark, bronze_path):
        delta_table = DeltaTable.forPath(spark, bronze_path)
        delta_table.alias("target").merge(
            df.alias("source"),
            "target.transaction_id = source.transaction_id"
        ).whenNotMatchedInsertAll().execute()
    else:
        df.write.format("delta").mode("overwrite").save(bronze_path)
    
    # Статистика
    print(f"\nДозагружено {num_records:,} транзакций за период с {start_date} по {end_date}")
    print(f"Использовано {num_clients:,} существующих клиентов")
    print(f"Первая транзакция: {df.agg(min('transaction_datetime')).collect()[0][0]}")
    print(f"Последняя транзакция: {df.agg(max('transaction_datetime')).collect()[0][0]}")
    
    # Пример данных
    print("\nПример добавленных транзакций:")
    df.show(5, truncate=False)
    
    spark.stop()


generate_additional_transactions("2024-05-26", "2024-06-06", 2_000_000)

25/06/06 22:07:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


root
 |-- id: long (nullable = false)
 |-- transaction_id: long (nullable = false)
 |-- client_id: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = false)
 |-- transaction_datetime: timestamp (nullable = true)
 |-- category: string (nullable = false)



25/06/06 22:07:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/06 22:07:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/06 22:07:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/06/06 22:07:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/06/06 22:07:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/06/06 22:07:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/06/06 22:07:58 WARN MemoryManager: Total allocation exceeds 95.


Дозагружено 2,000,000 транзакций за период с 2024-05-26 по 2024-06-06
Использовано 50,000 существующих клиентов
Первая транзакция: 2024-05-26 00:00:01
Последняя транзакция: 2024-06-05 23:59:59

Пример добавленных транзакций:
+------+--------------+---------+-------+--------+--------------------+--------+
|id    |transaction_id|client_id|amount |currency|transaction_datetime|category|
+------+--------------+---------+-------+--------+--------------------+--------+
|116978|0             |152915   |2634.42|RUB     |2024-06-03 14:03:05 |payment |
|65904 |1             |137042   |5276.81|USD     |2024-05-28 04:29:10 |payment |
|33736 |2             |153520   |8201.5 |EUR     |2024-05-29 03:00:33 |payment |
|70560 |3             |138460   |8675.07|RUB     |2024-05-30 00:41:57 |transfer|
|31776 |4             |134006   |6433.97|USD     |2024-05-26 01:12:32 |deposit |
+------+--------------+---------+-------+--------+--------------------+--------+
only showing top 5 rows



# Добавление транзакций в серебряный слой

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DecimalType
from delta.tables import DeltaTable
import time

def process_transactions():
    """Оптимизированная обработка транзакций с защитой от ошибок памяти"""
    spark = None
    try:
        spark = SparkSession.builder \
            .appName("TransactionsProcessing") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "4g") \
            .config("spark.memory.offHeap.enabled", "true") \
            .config("spark.memory.offHeap.size", "2g") \
            .config("spark.sql.shuffle.partitions", "200") \
            .config("spark.default.parallelism", "200") \
            .config("spark.delta.merge.repartitionBeforeWrite", "true") \
            .getOrCreate()

        print("\nНачало обработки транзакций...")
        
        bronze_path = "lakehouse/bronze/transactions"
        silver_path = "lakehouse/silver/transactions"
        
        print("Чтение данных из бронзового слоя...")
        new_transactions = spark.read.format("delta").load(bronze_path)
        
        print("Преобразование данных...")
        processed_transactions = new_transactions \
            .repartition(200, "transaction_id") \
            .withColumn("amount", col("amount").cast(DecimalType(18, 2))) \
            .withColumn("transaction_date", to_date(col("transaction_datetime"))) \
            .withColumn("is_suspicious", 
                (col("amount") > 5000) & 
                (col("category").isin(["withdrawal", "transfer"])))

        if DeltaTable.isDeltaTable(spark, silver_path):
            print("Обнаружен существующий Silver слой, выполняется оптимизированный merge...")
            delta_table = DeltaTable.forPath(spark, silver_path)
            
            batch_size = 100000 
            batches = [processed_transactions.filter(f"transaction_id % 5 == {i}") for i in range(5)]
            
            for i, batch in enumerate(batches):
                print(f"Обработка батча {i+1}/{len(batches)}...")
                delta_table.alias("target").merge(
                    batch.alias("source"),
                    "target.transaction_id = source.transaction_id"
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
                time.sleep(1)
        else:
            print("Создание новой Delta таблицы с оптимизированной записью...")
            (processed_transactions.write
                .format("delta")
                .partitionBy("transaction_date")
                .option("maxRecordsPerFile", 1000000)
                .mode("overwrite")
                .save(silver_path))
        
        print("Проверка результатов...")
        result = spark.read.format("delta").load(silver_path)
        print(f"Обработка завершена. Всего записей: {result.count():,}")
        
        sample = result.orderBy(col("transaction_datetime").desc()).limit(5).cache()
        print("Последние 5 транзакций:")
        sample.show(5, truncate=False)
        sample.unpersist()
        
    except Exception as e:
        print(f"Ошибка при обработке: {str(e)}")
        raise
    finally:
        if spark:
            try:
                spark.stop()
                print("SparkSession успешно завершен")
            except Exception as e:
                print(f"Ошибка при завершении SparkSession: {str(e)}")


process_transactions()

25/06/06 22:12:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.



Начало обработки транзакций...
Чтение данных из бронзового слоя...
Преобразование данных...
Обнаружен существующий Silver слой, выполняется оптимизированный merge...
Обработка батча 1/5...


25/06/06 22:12:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/06 22:12:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but re



25/06/06 22:13:09 ERROR Executor: Exception in task 9.0 in stage 34.0 (TID 953)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)
	at java.base/java.nio.ByteBuffer.allocate(Unknown Source)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:555)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:172)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apa

Ошибка при обработке: An error occurred while calling o88.execute.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 34.0 failed 1 times, most recent failure: Lost task 9.0 in stage 34.0 (TID 953) (c0e981295911 executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)
	at java.base/java.nio.ByteBuffer.allocate(Unknown Source)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:555)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:172)
	at org.apache.spark.sql.catalyst.expressi

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/tmp/ipykernel_435901/137221944.py", line 55, in process_transactions
    delta_table.alias("target").merge(
  File "/tmp/spark-9ff0d27e-ba90-4777-9109-d20bdc735c78/userFiles-cb5fbcd3-8952-4187-ad58-0b9a53b29e23/io.delta_delta-spark_2.12-3.0.0.jar/delta/tables.py", line 1022, in execute
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in 

Ошибка при завершении SparkSession: SparkSession does not exist in the JVM


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


ConnectionRefusedError: [Errno 111] Connection refused

In [19]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import time
import warnings
from pyspark.sql import SparkSession
import logging

def setup_logging():
    """Настройка логирования для подавления лишних сообщений"""
    warnings.filterwarnings("ignore")

    logger = logging.getLogger('py4j')
    logger.setLevel(logging.ERROR)
    logger = logging.getLogger('pyspark')
    logger.setLevel(logging.ERROR)
    
    sc = SparkSession.builder.getOrCreate().sparkContext
    sc.setLogLevel("ERROR")

def process_gold_layer():
    """Оптимизированная обработка золотого слоя с замерами времени"""
    setup_logging()
    
    spark = SparkSession.builder \
        .appName("GoldLayerProcessing") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.sql.shuffle.partitions", "4") \
        .config("spark.ui.showConsoleProgress", "false") \
        .getOrCreate()

    try:
        # 1. Загрузка данных с принудительным выполнением
        start_load = time.time()
        transactions = spark.read.format("delta").load("lakehouse/silver/transactions").cache()
        clients = spark.read.format("delta").load("lakehouse/silver/clients").cache()
        currency_rates = spark.read.format("delta").load("lakehouse/silver/currency_rates").cache()
        
        transactions.count()
        clients.count()
        currency_rates.count()
        
        load_time = time.time() - start_load
        print(f"Время загрузки данных: {load_time:.2f} сек")

        # Клиентская статистика
        start_client_stats = time.time()
        client_stats = (
            transactions.join(broadcast(clients), 'client_id', 'left')
            .groupby("client_id", "name", "country", "client_category", "tier")
            .agg(
                sum("amount").alias("total_amount"),
                avg("amount").alias("avg_amount"),
                count("*").alias("transactions_count")
            )
            .cache()
        )
        client_stats.count()
        client_stats_time = time.time() - start_client_stats
        print(f"Время расчета клиентской статистики: {client_stats_time:.2f} сек")

        # 3. Витрина: Ежедневные метрики
        start_daily_metrics = time.time()
        daily_metrics = (
            transactions.join(currency_rates, transactions["transaction_date"] == currency_rates["date"], "left")
            .withColumn("amount_rub",
                when(col("currency") == "USD", col("amount") * col("USD"))
                .when(col("currency") == "EUR", col("amount") * col("EUR"))
                .when(col("currency") == "CNY", col("amount") * col("CNY"))
                .otherwise(col("amount")))
            .groupBy("date")
            .agg(
                sum("amount_rub").alias("daily_volume_rub"),
                avg("amount_rub").alias("avg_transaction_rub"),
                count("*").alias("transactions_count"),
                sum(when(col("is_suspicious"), 1).otherwise(0)).alias("suspicious_count"),
                sum(when(col("is_suspicious"), col("amount_rub")).otherwise(0)).alias("suspicious_volume_rub")
            )
            .orderBy("date")
            .cache()
        )
        daily_metrics.count()
        daily_metrics_time = time.time() - start_daily_metrics
        print(f"Время расчета ежедневных метрик: {daily_metrics_time:.2f} сек")
        
        # 4. Витрина: Анализ подозрительных операций
        start_fraud_analysis = time.time()
        fraud_analysis = (
            transactions.join(clients, 'client_id', 'left')
            .filter(col("is_suspicious"))
            .groupBy("category", "country")
            .agg(
                count("*").alias("fraud_count"),
                avg("amount").alias("avg_fraud_amount"),
                sum("amount").alias("total_fraud_amount")
            )
            .cache()
        )
        fraud_analysis.count()
        fraud_analysis_time = time.time() - start_fraud_analysis
        print(f"Время анализа подозрительных операций: {fraud_analysis_time:.2f} сек")

        # 5. Сохранение витрин
        start_saving = time.time()
        
        # Клиентская статистика (merge)
        if DeltaTable.isDeltaTable(spark, "lakehouse/gold/client_stats"):
            DeltaTable.forPath(spark, "lakehouse/gold/client_stats") \
                .alias("target") \
                .merge(client_stats.alias("source"), "target.client_id = source.client_id") \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
        else:
            client_stats.write \
                .format("delta") \
                .partitionBy("country") \
                .mode("overwrite") \
                .save("lakehouse/gold/client_stats")

        # Ежедневные метрики (append новых дат)
        if DeltaTable.isDeltaTable(spark, "lakehouse/gold/daily_metrics"):
            existing_dates = spark.read.format("delta") \
                .load("lakehouse/gold/daily_metrics") \
                .select("date").distinct()
            
            daily_metrics.join(existing_dates, "date", "left_anti") \
                .write \
                .format("delta") \
                .mode("append") \
                .partitionBy("date") \
                .save("lakehouse/gold/daily_metrics")
        else:
            daily_metrics.write \
                .format("delta") \
                .partitionBy("date") \
                .mode("overwrite") \
                .save("lakehouse/gold/daily_metrics")

        saving_time = time.time() - start_saving
        print(f"Время сохранения данных: {saving_time:.2f} сек")
        
        total_time = time.time() - start_load
        print(f"\nОбщее время обработки золотого слоя: {total_time:.2f} сек")
        
        print("\nРезультаты обработки:")
        client_stats.show(5, truncate=False)
        daily_metrics.show(5, truncate=False)
        fraud_analysis.show(5, truncate=False)

    finally:
        spark.stop()

print("Начало обработки золотого слоя...")
process_gold_layer()
print("\nGold-слой успешно создан")

Начало обработки золотого слоя...


                                                                                

Время загрузки данных: 43.30 сек


                                                                                

Время расчета клиентской статистики: 13.56 сек


                                                                                

Время расчета ежедневных метрик: 5.67 сек


                                                                                

Время анализа подозрительных операций: 3.23 сек


25/06/13 22:53:15 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
                                                                                

Время сохранения данных: 10.69 сек

Общее время обработки золотого слоя: 76.45 сек

Результаты обработки:
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|client_id|name            |country|client_category|tier    |total_amount|avg_amount |transactions_count|
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|147412   |Lisa Jones      |FR     |vip            |standard|1058918.23  |4925.201070|215               |
|143094   |Robert Thompson |CA     |vip            |standard|1133475.35  |4928.153696|230               |
|149951   |Maria Kelly     |BR     |new            |standard|1087992.35  |4923.042308|221               |
|149984   |Jeffrey Delacruz|BR     |vip            |premium |1134061.33  |4867.216009|233               |
|142223   |前田 千代       |JP     |regular        |standard|1248844.67  |5382.951164|232               |
+---------+----------------+-------+--------------

In [18]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import time

def process_gold_layer():
    """Оптимизированная обработка золотого слоя с замерами времени"""
    spark = SparkSession.builder \
        .appName("GoldLayerProcessing") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.sql.shuffle.partitions", "4") \
        .getOrCreate()

    try:
        # 1. Загрузка данных с принудительным выполнением
        start_load = time.time()
        transactions = spark.read.format("delta").load("lakehouse/silver/transactions").cache()
        clients = spark.read.format("delta").load("lakehouse/silver/clients").cache()
        currency_rates = spark.read.format("delta").load("lakehouse/silver/currency_rates").cache()
        
        transactions.count()
        clients.count()
        currency_rates.count()
        
        load_time = time.time() - start_load
        print(f"Время загрузки данных: {load_time:.2f} сек")

        # 2. Подготовка витрин с принудительным выполнением
        # Клиентская статистика
        start_client_stats = time.time()
        client_stats = (
            transactions.join(broadcast(clients), 'client_id', 'left')
            .groupby("client_id", "name", "country", "client_category", "tier")
            .agg(
                sum("amount").alias("total_amount"),
                avg("amount").alias("avg_amount"),
                count("*").alias("transactions_count")
            )
            .cache()
        )
        client_stats.count()  # Принудительное выполнение
        client_stats_time = time.time() - start_client_stats
        print(f"Время расчета клиентской статистики: {client_stats_time:.2f} сек")

        # 3. Витрина: Ежедневные метрики
        start_daily_metrics = time.time()
        daily_metrics = (
            transactions.join(currency_rates, transactions["transaction_date"] == currency_rates["date"], "left")
            .withColumn("amount_rub",
                when(col("currency") == "USD", col("amount") * col("USD"))
                .when(col("currency") == "EUR", col("amount") * col("EUR"))
                .when(col("currency") == "CNY", col("amount") * col("CNY"))
                .otherwise(col("amount")))
            .groupBy("date")
            .agg(
                sum("amount_rub").alias("daily_volume_rub"),
                avg("amount_rub").alias("avg_transaction_rub"),
                count("*").alias("transactions_count"),
                sum(when(col("is_suspicious"), 1).otherwise(0)).alias("suspicious_count"),
                sum(when(col("is_suspicious"), col("amount_rub")).otherwise(0)).alias("suspicious_volume_rub")
            )
            .orderBy("date")
            .cache()
        )
        daily_metrics.count()
        daily_metrics_time = time.time() - start_daily_metrics
        print(f"Время расчета ежедневных метрик: {daily_metrics_time:.2f} сек")
        
        # 4. Витрина: Анализ подозрительных операций
        start_fraud_analysis = time.time()
        fraud_analysis = (
            transactions.join(clients, 'client_id', 'left')
            .filter(col("is_suspicious"))
            .groupBy("category", "country")
            .agg(
                count("*").alias("fraud_count"),
                avg("amount").alias("avg_fraud_amount"),
                sum("amount").alias("total_fraud_amount")
            )
            .cache()
        )
        fraud_analysis.count()
        fraud_analysis_time = time.time() - start_fraud_analysis
        print(f"Время анализа подозрительных операций: {fraud_analysis_time:.2f} сек")

        # 5. Сохранение витрин
        start_saving = time.time()
        
        # Клиентская статистика (merge)
        if DeltaTable.isDeltaTable(spark, "lakehouse/gold/client_stats"):
            DeltaTable.forPath(spark, "lakehouse/gold/client_stats") \
                .alias("target") \
                .merge(client_stats.alias("source"), 
                      "target.client_id = source.client_id") \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
        else:
            client_stats.write \
                .format("delta") \
                .partitionBy("country") \
                .mode("overwrite") \
                .save("lakehouse/gold/client_stats")

        # Ежедневные метрики (append новых дат)
        if DeltaTable.isDeltaTable(spark, "lakehouse/gold/daily_metrics"):
            existing_dates = spark.read.format("delta") \
                .load("lakehouse/gold/daily_metrics") \
                .select("date").distinct()
            
            daily_metrics.join(existing_dates, "date", "left_anti") \
                .write \
                .format("delta") \
                .mode("append") \
                .partitionBy("date") \
                .save("lakehouse/gold/daily_metrics")
        else:
            daily_metrics.write \
                .format("delta") \
                .partitionBy("date") \
                .mode("overwrite") \
                .save("lakehouse/gold/daily_metrics")


        for table in ["client_stats", "daily_metrics"]:
            DeltaTable.forPath(spark, f"lakehouse/gold/{table}") \
                .optimize() \
                .executeCompaction()
        
        saving_time = time.time() - start_saving
        print(f"Время сохранения данных: {saving_time:.2f} сек")
        
        total_time = time.time() - start_load
        print(f"\nОбщее время обработки золотого слоя: {total_time:.2f} сек")
        
        print("\nРезультаты обработки:")
        client_stats.show(5)
        daily_metrics.show(5)
        fraud_analysis.show(5)

    finally:
        spark.stop()

print("Начало обработки золотого слоя...")
process_gold_layer()
print("\nGold-слой успешно создан")

Начало обработки золотого слоя...


25/06/13 22:45:09 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_563 in memory.
25/06/13 22:45:09 WARN MemoryStore: Not enough space to cache rdd_73_563 in memory! (computed 384.0 B so far)
25/06/13 22:45:09 WARN BlockManager: Persisting block rdd_73_563 to disk instead.
25/06/13 22:45:09 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_565 in memory.
25/06/13 22:45:09 WARN MemoryStore: Not enough space to cache rdd_73_565 in memory! (computed 384.0 B so far)
25/06/13 22:45:09 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_566 in memory.
25/06/13 22:45:09 WARN BlockManager: Persisting block rdd_73_565 to disk instead.
25/06/13 22:45:09 WARN MemoryStore: Not enough space to cache rdd_73_566 in memory! (computed 384.0 B so far)
25/06/13 22:45:09 WARN BlockManager: Persisting block rdd_73_566 to disk instead.
25/06/13 22:45:09 W

Время загрузки данных: 38.90 сек


25/06/13 22:45:16 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_21 in memory.
25/06/13 22:45:16 WARN MemoryStore: Not enough space to cache rdd_73_21 in memory! (computed 384.0 B so far)
25/06/13 22:45:17 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_47 in memory.
25/06/13 22:45:17 WARN MemoryStore: Not enough space to cache rdd_73_47 in memory! (computed 384.0 B so far)
25/06/13 22:45:17 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_50 in memory.
25/06/13 22:45:17 WARN MemoryStore: Not enough space to cache rdd_73_50 in memory! (computed 384.0 B so far)
25/06/13 22:45:17 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_56 in memory.
25/06/13 22:45:17 WARN MemoryStore: Not enough space to cache rdd_73_56 in memory! (computed 384.0 B so far)
25/06/13 22:45:17 WARN Memor

Время расчета клиентской статистики: 15.76 сек


25/06/13 22:45:32 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_85 in memory.
25/06/13 22:45:32 WARN MemoryStore: Not enough space to cache rdd_73_85 in memory! (computed 384.0 B so far)
25/06/13 22:45:32 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_86 in memory.
25/06/13 22:45:32 WARN MemoryStore: Not enough space to cache rdd_73_86 in memory! (computed 384.0 B so far)
25/06/13 22:45:32 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_96 in memory.
25/06/13 22:45:32 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_94 in memory.
25/06/13 22:45:32 WARN MemoryStore: Not enough space to cache rdd_73_96 in memory! (computed 384.0 B so far)
25/06/13 22:45:32 WARN MemoryStore: Not enough space to cache rdd_73_94 in memory! (computed 384.0 B so far)
25/06/13 22:45:32 WARN Memor

Время расчета ежедневных метрик: 5.55 сек


25/06/13 22:45:38 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_171 in memory.
25/06/13 22:45:38 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_170 in memory.
25/06/13 22:45:38 WARN MemoryStore: Not enough space to cache rdd_73_171 in memory! (computed 384.0 B so far)
25/06/13 22:45:38 WARN MemoryStore: Not enough space to cache rdd_73_170 in memory! (computed 384.0 B so far)
25/06/13 22:45:38 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_197 in memory.
25/06/13 22:45:38 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_73_200 in memory.
25/06/13 22:45:38 WARN MemoryStore: Not enough space to cache rdd_73_197 in memory! (computed 384.0 B so far)
25/06/13 22:45:38 WARN MemoryStore: Not enough space to cache rdd_73_200 in memory! (computed 384.0 B so far)
25/06/13 22:45:38 WA

Время анализа подозрительных операций: 3.75 сек


25/06/13 22:45:40 ERROR NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl
                                                                                

Время сохранения данных: 13.31 сек

Общее время обработки золотого слоя: 77.27 сек

Результаты обработки:
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|client_id|            name|country|client_category|    tier|total_amount| avg_amount|transactions_count|
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|   147412|      Lisa Jones|     FR|            vip|standard|  1058918.23|4925.201070|               215|
|   143094| Robert Thompson|     CA|            vip|standard|  1133475.35|4928.153696|               230|
|   149951|     Maria Kelly|     BR|            new|standard|  1087992.35|4923.042308|               221|
|   149984|Jeffrey Delacruz|     BR|            vip| premium|  1134061.33|4867.216009|               233|
|   142223|       前田 千代|     JP|        regular|standard|  1248844.67|5382.951164|               232|
+---------+----------------+-------+--------------

In [16]:
from pyspark.sql import SparkSession
from delta import *

# 1. Инициализация Spark сессии с поддержкой Delta Lake
spark = SparkSession.builder \
    .appName("DeltaToCSVExport") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

currency_rates_delta_path = "lakehouse/silver/currency_rates"
output_csv_path = "currency_rates_export"
single_csv_path = "currency_rates.csv"

try:
    print("Чтение данных из Delta таблицы...")
    currency_rates = spark.read.format("delta").load(currency_rates_delta_path)
    
    print(f"Всего строк: {currency_rates.count():,}")
    print("Схема данных:")
    currency_rates.printSchema()
    
    print("\nЭкспорт в CSV с разбивкой на партиции...")
    (currency_rates.write
        .format("csv")
        .option("header", "true")
        .mode("overwrite")
        .save(output_csv_path))
    
    print("\nСоздание единого CSV файла...")
    csv_data = spark.read.csv(output_csv_path, header=True)
    (csv_data.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(single_csv_path))
    
    from pyspark.sql.functions import input_file_name
    single_file = spark.read.csv(single_csv_path, header=True) \
                   .withColumn("file_name", input_file_name())
    
    actual_file_name = single_file.first()["file_name"].split("/")[-1]
    
    print(f"\nЭкспорт завершен. Основные файлы в: {output_csv_path}")
    print(f"Единый CSV файл: {single_csv_path}/{actual_file_name}")

finally:
    spark.stop()
    print("\nSpark сессия завершена")

Чтение данных из Delta таблицы...


                                                                                

Всего строк: 1,410
Схема данных:
root
 |-- date: date (nullable = true)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)
 |-- USD_change: double (nullable = true)
 |-- EUR_change: double (nullable = true)
 |-- CNY_change: double (nullable = true)


Экспорт в CSV с разбивкой на партиции...

Создание единого CSV файла...

Экспорт завершен. Основные файлы в: currency_rates_export
Единый CSV файл: currency_rates.csv/part-00000-c1856890-8255-4e7d-a964-d12789ca888d-c000.csv

Spark сессия завершена


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaToCSVExport") \
    .getOrCreate()

clients_path = "lakehouse/silver/clients"
output_clients_path = "client"

df2 = spark.read.format("delta").load(clients_path)

(df2.write
    .mode("overwrite")
    .csv(output_clients_path))
    
print(f"\nДанные успешно сохранены в {output_clients_path}")


25/06/07 02:31:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                


Данные успешно сохранены в client


                                                                                

# Optimization

In [7]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import time
from datetime import datetime

def process_zordering_only_gold_layer():
    """Обработка золотого слоя с использованием только Z-Ordering"""
    spark = SparkSession.builder \
        .appName("ZOrderingOnlyGoldLayer") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.optimize.repartition.enabled", "true") \
        .getOrCreate()

    metrics = {
        'load_time': 0,
        'processing_times': {},
        'saving_time': 0,
        'optimization_time': 0,
        'total_time': 0
    }

    try:
        total_start = time.time()
        
        start_load = time.time()
        transactions = spark.read.format("delta").load("lakehouse/silver/transactions")
        clients = spark.read.format("delta").load("lakehouse/silver/clients")
        currency_rates = spark.read.format("delta").load("lakehouse/silver/currency_rates")
        metrics['load_time'] = time.time() - start_load

        # 2. Подготовка витрин
        # Клиентская статистика
        start_client_stats = time.time()
        client_stats = (
            transactions.join(broadcast(clients), 'client_id', 'left')
            .groupby("client_id", "name", "country", "client_category", "tier")
            .agg(
                sum("amount").alias("total_amount"),
                avg("amount").alias("avg_amount"),
                count("*").alias("transactions_count")
            ))
        metrics['processing_times']['client_stats'] = time.time() - start_client_stats

        # Ежедневные метрики
        start_daily_metrics = time.time()
        daily_metrics = (
            transactions.join(currency_rates, 
                           transactions["transaction_date"] == currency_rates["date"], "left")
            .withColumn("amount_rub",
                when(col("currency") == "USD", col("amount") * col("USD"))
                .when(col("currency") == "EUR", col("amount") * col("EUR"))
                .when(col("currency") == "CNY", col("amount") * col("CNY"))
                .otherwise(col("amount")))
            .groupBy("transaction_date")
            .agg(
                sum("amount_rub").alias("daily_volume_rub"),
                avg("amount_rub").alias("avg_transaction_rub"),
                count("*").alias("transactions_count"),
                sum(when(col("is_suspicious"), 1).otherwise(0)).alias("suspicious_count"),
                sum(when(col("is_suspicious"), col("amount_rub")).otherwise(0)).alias("suspicious_volume_rub")
            )
            .withColumnRenamed("transaction_date", "date")
            .orderBy("date")
        )
        metrics['processing_times']['daily_metrics'] = time.time() - start_daily_metrics

        # 3. Сохранение без партиционирования
        start_saving = time.time()
        
        (client_stats.write
            .format("delta")
            .option("mergeSchema", "true")
            .mode("overwrite")
            .save("lakehouse/gold/client_stats_zorder"))
        
        (daily_metrics.write
            .format("delta")
            .option("mergeSchema", "true")
            .mode("overwrite")
            .save("lakehouse/gold/daily_metrics_zorder"))
        
        metrics['saving_time'] = time.time() - start_saving

        # 4. Оптимизация с Z-Ordering
        start_optimization = time.time()
        
        DeltaTable.forPath(spark, "lakehouse/gold/client_stats_zorder") \
            .optimize() \
            .executeZOrderBy("client_category", "tier", "country")
        
        DeltaTable.forPath(spark, "lakehouse/gold/daily_metrics_zorder") \
            .optimize() \
            .executeZOrderBy("date", "daily_volume_rub")
        
        metrics['optimization_time'] = time.time() - start_optimization
        metrics['total_time'] = time.time() - total_start

        print("\n" + "="*50)
        print("РЕЗУЛЬТАТЫ ОБРАБОТКИ С Z-ORDERING (БЕЗ ПАРТИЦИОНИРОВАНИЯ)")
        print(f"Дата выполнения: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*50)
        
        print(f"\n{'Метрика':<30} | {'Время (сек)':>10}")
        print("-"*42)
        print(f"{'Загрузка данных':<30} | {metrics['load_time']:>10.2f}")
        for step, t in metrics['processing_times'].items():
            print(f"{'Обработка ' + step:<30} | {t:>10.2f}")
        print(f"{'Сохранение данных':<30} | {metrics['saving_time']:>10.2f}")
        print(f"{'Оптимизация (Z-Order)':<30} | {metrics['optimization_time']:>10.2f}")
        print("-"*42)
        print(f"{'ОБЩЕЕ ВРЕМЯ':<30} | {metrics['total_time']:>10.2f}")
        print("\n" + "="*50)

        print("\nПримеры данных (первые 5 строк):")
        print("\nКлиентская статистика:")
        spark.read.format("delta").load("lakehouse/gold/client_stats_zorder").show(5)
        
        print("\nЕжедневные метрики:")
        spark.read.format("delta").load("lakehouse/gold/daily_metrics_zorder").show(5)

    finally:
        spark.stop()

# Запуск обработки
print("Запуск обработки Gold-слоя с Z-Ordering (без партиционирования)...")
process_zordering_only_gold_layer()

Запуск обработки Gold-слоя с Z-Ordering (без партиционирования)...


25/06/10 00:59:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/10 00:59:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/10 00:59:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/06/10 00:59:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/10 00:59:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                


РЕЗУЛЬТАТЫ ОБРАБОТКИ С Z-ORDERING (БЕЗ ПАРТИЦИОНИРОВАНИЯ)
Дата выполнения: 2025-06-10 01:00:02

Метрика                        | Время (сек)
------------------------------------------
Загрузка данных                |       0.44
Обработка client_stats         |       0.04
Обработка daily_metrics        |       0.12
Сохранение данных              |      57.12
Оптимизация (Z-Order)          |       4.27
------------------------------------------
ОБЩЕЕ ВРЕМЯ                    |      61.98


Примеры данных (первые 5 строк):

Клиентская статистика:
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|client_id|            name|country|client_category|    tier|total_amount| avg_amount|transactions_count|
+---------+----------------+-------+---------------+--------+------------+-----------+------------------+
|   149984|Jeffrey Delacruz|     BR|            vip| premium|  1134061.33|4867.216009|               233|
|   131296|    Craig Stee

In [13]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import time
from datetime import datetime
from pyspark.sql import SparkSession

def demonstrate_zordering():
    """Демонстрация разницы в производительности с Z-Ordering на реальных данных транзакций"""
    spark = SparkSession.builder \
        .appName("ZOrderingDemoOnTransactions") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    try:
        print("Загрузка данных транзакций...")
        transactions = spark.read.format("delta").load("lakehouse/silver/transactions")
        
        print("Создание тестовых таблиц...")
        transactions.write.format("delta").mode("overwrite").save("lakehouse/demo/transactions_no_zorder")
        transactions.write.format("delta").mode("overwrite").save("lakehouse/demo/transactions_zorder")
        
        print("Применение Z-Ordering...")
        DeltaTable.forPath(spark, "lakehouse/demo/transactions_zorder") \
            .optimize() \
            .executeZOrderBy("transaction_date", "client_id", "is_suspicious")
        
        def run_test_queries(path, description):
            print(f"\nЗапуск тестовых запросов ({description})...")
            results = {}
            

            start = time.time()
            cnt1 = spark.read.format("delta").load(path) \
                .filter((col("transaction_date") == "2023-01-15") & 
                       (col("client_id") == "12345")) \
                .count()
            results['date_client_filter'] = time.time() - start
            
            # Запрос 2: Фильтрация по подозрительным транзакциям
            start = time.time()
            cnt2 = spark.read.format("delta").load(path) \
                .filter(col("is_suspicious") == True) \
                .count()
            results['suspicious_filter'] = time.time() - start
            
            # Запрос 3: Диапазон дат и сумма
            start = time.time()
            cnt3 = spark.read.format("delta").load(path) \
                .filter((col("transaction_date").between("2023-01-01", "2023-01-31")) &
                       (col("amount") > 1000)) \
                .count()
            results['date_range_amount_filter'] = time.time() - start
            
            print(f"Найдено: {cnt1} транзакций за 2023-01-15 клиента 12345")
            print(f"Найдено: {cnt2} подозрительных транзакций")
            print(f"Найдено: {cnt3} транзакций >1000 за январь 2023")
            
            return results
        
        print("\n" + "="*60)
        print("ТЕСТИРОВАНИЕ ПРОИЗВОДИТЕЛЬНОСТИ НА РЕАЛЬНЫХ ДАННЫХ")
        print("="*60)
        
        no_zorder_results = run_test_queries(
            "lakehouse/demo/transactions_no_zorder", 
            "БЕЗ Z-Ordering"
        )
        
        print("\n" + "-"*60)
        
        zorder_results = run_test_queries(
            "lakehouse/demo/transactions_zorder", 
            "С Z-Ordering"
        )
        
        print("\n" + "="*60)
        print("РЕЗУЛЬТАТЫ СРАВНЕНИЯ ПРОИЗВОДИТЕЛЬНОСТИ")
        print("="*60)
        print(f"\n{'Тип запроса':<35} | {'Без Z-Order':>12} | {'С Z-Order':>12} | {'Выигрыш':>12}")
        print("-"*80)
        
        for query in no_zorder_results:
            time_no = no_zorder_results[query]
            time_with = zorder_results[query]
            gain = time_no - time_with
            gain_pct = (gain / time_no) * 100 if time_no > 0 else 0
            
            query_name = {
                'date_client_filter': 'Фильтр: дата + клиент',
                'suspicious_filter': 'Фильтр: подозрительные',
                'date_range_amount_filter': 'Фильтр: диапазон дат + сумма'
            }.get(query, query)
            
            print(f"{query_name:<35} | {time_no:>12.4f}s | {time_with:>12.4f}s | {gain:>12.4f}s ({gain_pct:.1f}%)")
        
    finally:
        spark.stop()

if __name__ == "__main__":
    demonstrate_zordering()

Загрузка данных транзакций...
Создание тестовых таблиц...


25/06/10 02:19:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/10 02:19:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/06/10 02:19:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/06/10 02:19:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/06/10 02:19:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/06/10 02:19:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/06/10 02:19:59 WARN MemoryManager: Total allocation exceeds 95.

Применение Z-Ordering...


                                                                                


ТЕСТИРОВАНИЕ ПРОИЗВОДИТЕЛЬНОСТИ НА РЕАЛЬНЫХ ДАННЫХ

Запуск тестовых запросов (БЕЗ Z-Ordering)...
Найдено: 0 транзакций за 2023-01-15 клиента 12345
Найдено: 3610988 подозрительных транзакций
Найдено: 140618 транзакций >1000 за январь 2023

------------------------------------------------------------

Запуск тестовых запросов (С Z-Ordering)...
Найдено: 0 транзакций за 2023-01-15 клиента 12345
Найдено: 3610988 подозрительных транзакций
Найдено: 140618 транзакций >1000 за январь 2023

РЕЗУЛЬТАТЫ СРАВНЕНИЯ ПРОИЗВОДИТЕЛЬНОСТИ

Тип запроса                         |  Без Z-Order |    С Z-Order |      Выигрыш
--------------------------------------------------------------------------------
Фильтр: дата + клиент               |       0.3638s |       0.2975s |       0.0663s (18.2%)
Фильтр: подозрительные              |       0.5323s |       0.2997s |       0.2326s (43.7%)
Фильтр: диапазон дат + сумма        |       0.5962s |       0.2981s |       0.2981s (50.0%)

КОММЕНТАРИИ К РЕЗУЛЬТАТАМ

1. Z-O

In [14]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *

builder = SparkSession.builder.appName("ExportCurrencyRates") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

silver_path = "lakehouse/silver"

currency_rates = spark.read.format("delta").load(f"{silver_path}/currency_rates")

currency_rates.printSchema()

output_path = "currency_rates_export"
dbutils.fs.mkdirs(output_path)

(currency_rates
    .write
    .format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .mode("overwrite")
    .save(output_path))

csv_files = spark.read.format("csv") \
    .option("header", "true") \
    .load(output_path)

csv_files.coalesce(1) \
    .write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(f"{output_path}_single_file")

print(f"Данные успешно экспортированы в {output_path}")

spark.stop()

root
 |-- date: date (nullable = true)
 |-- USD: string (nullable = true)
 |-- EUR: string (nullable = true)
 |-- CNY: string (nullable = true)
 |-- USD_change: double (nullable = true)
 |-- EUR_change: double (nullable = true)
 |-- CNY_change: double (nullable = true)



NameError: name 'dbutils' is not defined

In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *

builder = SparkSession.builder.appName("ExportCurrencyRates") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

silver_path = "lakehouse/silver"

print("Transaction table details:")
spark.sql(f"DESCRIBE DETAIL public.`{silver_path}/transactions`").select("*").show(truncate=False)

print("\nClients table details:")
spark.sql(f"DESCRIBE DETAIL delta.`{silver_path}/clients`").select("*").show(truncate=False)

print("\nCurrency rates table details:")
spark.sql(f"DESCRIBE DETAIL delta.`{silver_path}/currency_rates`").select("*").show(truncate=False)

Transaction table details:


25/06/13 06:05:17 WARN ObjectStore: Failed to get database public, returning NoSuchObjectException


AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `public`.`lakehouse/silver/transactions` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 0;
'DescribeDeltaDetailCommand
+- 'UnresolvedTable [public, lakehouse/silver/transactions], DESCRIBE DETAIL


In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *
import os

# Инициализация Spark сессии с Delta Lake
builder = SparkSession.builder.appName("TableMetadataViewer") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Базовый путь - используем абсолютный путь для Docker
base_path = "lakehouse/silver"  # Замените на реальный путь в контейнере

def show_table_metadata(table_name):
    try:
        print(f"\n{'='*50}")
        print(f"Метаданные таблицы: {table_name}")
        print(f"{'='*50}")
        
        full_path = os.path.join(base_path, table_name)
        
        # Вариант 1: Использование DeltaTable API
        print("\nМетод 1: Использование DeltaTable.detail()")
        delta_table = DeltaTable.forPath(spark, full_path)
        detail_df = delta_table.detail()
        detail_df.show(truncate=False, vertical=True)
        
    #     # Дополнительная информация
    #     print("\nДополнительные метаданные:")
    #     print(f"Путь к таблице: {full_path}")
    #     print("Схема таблицы:")
    #     spark.read.format("delta").load(full_path).printSchema()
        
    #     # История изменений (последние 5 версий)
    #     print("\nИстория изменений (последние 5 версий):")
    #     delta_table.history(5).show(truncate=False)
        
    except Exception as e:
        print(f"Ошибка: {str(e)}")
        print("Возможные причины:")
        print(f"1. Таблица не существует по пути: {full_path}")
        print("2. Нет прав доступа к директории")
        print("3. Формат таблицы не Delta")

# Проверяем таблицы
tables = ["transactions", "clients", "currency_rates"]
for table in tables:
    show_table_metadata(table)

spark.stop()


Метаданные таблицы: transactions

Метод 1: Использование DeltaTable.detail()


                                                                                

-RECORD 0------------------------------------------------------------------
 format           | delta                                                  
 id               | ec25adc6-ff00-465e-9aa1-1a26faabc48b                   
 name             | NULL                                                   
 description      | NULL                                                   
 location         | file:/opt/spark/work-dir/lakehouse/silver/transactions 
 createdAt        | 2025-06-04 02:25:49.671                                
 lastModified     | 2025-06-09 21:14:08.282                                
 partitionColumns | [transaction_date]                                     
 numFiles         | 19803                                                  
 sizeInBytes      | 363399779                                              
 properties       | {}                                                     
 minReaderVersion | 1                                                      
 minWriterVe

                                                                                

-RECORD 0-------------------------------------------------------------
 format           | delta                                             
 id               | af89fd5e-22b0-4a3e-849d-e716dbbd1769              
 name             | NULL                                              
 description      | NULL                                              
 location         | file:/opt/spark/work-dir/lakehouse/silver/clients 
 createdAt        | 2025-06-04 02:26:59.875                           
 lastModified     | 2025-06-09 21:14:14.626                           
 partitionColumns | []                                                
 numFiles         | 1                                                 
 sizeInBytes      | 1054362                                           
 properties       | {}                                                
 minReaderVersion | 1                                                 
 minWriterVersion | 2                                                 
 table