In [3]:
# Incremental bronze -> silver load using the Delta Change Data Feed (CDF).
# New INSERTs from tbl_bronze get a partition_date column, are cached once, and
# are split by currency into two silver tables. The control table then records
# the last bronze version that has been loaded.

from pyspark.sql.functions import to_date, col

SOURCE_TABLE = "tbl_bronze"
CONTROL_TABLE = "cdf_control_silver_part"
USD_TABLE = "tbl_silver_part"
WRONG_CURRENCY_TABLE = "tbl_silver_wrong_currency"

# 1. Last bronze version already loaded into silver (kept in the control table).
#    It is interpreted as "already processed", because that is what step 7 writes.
ctrl = spark.table(CONTROL_TABLE).filter(f"table_name = '{SOURCE_TABLE}'").first()
if ctrl is None:
    # Fail with an actionable message instead of "'NoneType' object is not subscriptable".
    raise ValueError(
        f"No control row for table_name = '{SOURCE_TABLE}' in {CONTROL_TABLE}; "
        "seed the control table before running this cell."
    )
last_version = ctrl["last_version"]

# 2. Latest committed version of the source table.
current_version = spark.sql(f"DESCRIBE HISTORY {SOURCE_TABLE}") \
    .selectExpr("max(version) as v") \
    .first()["v"]

if current_version > last_version:

    # table_changes(table, start, end) is inclusive on BOTH ends. last_version is
    # already loaded, so start at the next version; otherwise its inserts would be
    # appended again on every run. Bounding the end at current_version keeps the
    # rows read in step with the version stored in the control table below, even
    # if new commits land on the source table while this cell runs.
    start_version = last_version + 1

    # 3. Fetch only the inserted rows in that version range.
    changes_df = spark.sql(f"""
    SELECT
            timestamp,
            symbol,
            exchange,
            event_type,
            latency_ms,
            order_id,
            transaction_type,
            price,
            volume,
            bid_price,
            ask_price,
            bid_size,
            ask_size,
            canceled_order_id,
            currency,
            trade_id,
            event_id,
            EventProcessedUtcTime,
            PartitionId,
            EventEnqueuedUtcTime
    FROM table_changes('{SOURCE_TABLE}', {start_version}, {current_version})
    WHERE _change_type = 'insert'
    """)

    # 4. Add the date-only partition column and cache the result, so both writes
    #    below reuse one CDF read instead of recomputing it.
    with_date = changes_df \
        .withColumn("partition_date", to_date(col("timestamp"))) \
        .cache()

    try:
        # 5. Split by currency. eqNullSafe makes the two filters exact complements.
        #    A plain `!= "USD"` evaluates to NULL for a NULL currency, which would
        #    silently drop such rows from BOTH tables. With this split they go to
        #    the wrong-currency table.
        is_usd = col("currency").eqNullSafe("USD")
        usd_df = with_date.filter(is_usd)
        wrong_df = with_date.filter(~is_usd)

        # 6. Append each subset to its silver table.
        usd_df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(USD_TABLE)

        wrong_df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(WRONG_CURRENCY_TABLE)

        # 7. Record the last loaded bronze version.
        #    NOTE(review): the two appends and this UPDATE are separate commits,
        #    not a single transaction. If the cell fails midway, a re-run
        #    re-appends the same range. Consider idempotent Delta writes
        #    (txnAppId/txnVersion) if that matters.
        spark.sql(f"""
        UPDATE {CONTROL_TABLE}
        SET last_version = {current_version}
        WHERE table_name = '{SOURCE_TABLE}'
        """)
    finally:
        # Release the cached data even if one of the writes fails.
        with_date.unpersist()

else:
    print("Brak nowych zmian do przetworzenia.")


StatementMeta(, 2f7629e2-8f8a-4935-8cce-3c344bb56654, 5, Finished, Available, Finished)

In [2]:
# -- %%sql
# -- ALTER TABLE tbl_bronze
# -- SET TBLPROPERTIES (
# --   'delta.enableChangeDataFeed' = 'true'
# -- );


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 4, Finished, Available, Finished)

In [3]:
# -- %%sql
# -- DESCRIBE history tbl_bronze

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 5, Finished, Available, Finished)

In [4]:
# -- %%sql
# -- create table tbl_silver_part AS
# -- select * from tbl_silver 
# -- where 0=1

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 6, Finished, Available, Finished)

In [5]:
# -- %%sql
# -- -- -- -- -- ALTER TABLE tbl_silver_wrong_currency
# -- -- -- -- -- ADD COLUMNS (
# -- -- -- -- --   partition_date DATE
# -- -- -- -- -- );
# -- CREATE OR REPLACE TABLE tbl_silver_wrong_currency
# -- USING DELTA
# -- PARTITIONED BY (partition_date)         -- jeżeli tabela była partycjonowana
# -- AS
# -- SELECT
# --   CAST(`timestamp` AS STRING) AS `timestamp`,
# --   symbol,
# --   exchange,
# --   event_type,
# --   latency_ms,
# --   order_id,
# --   transaction_type,
# --   price,
# --   volume,
# --   bid_price,
# --   ask_price,
# --   bid_size,
# --   ask_size,
# --   canceled_order_id,
# --   currency,
# --   trade_id,
# --   event_id,
# --   EventProcessedUtcTime,
# --   PartitionId,
# --   EventEnqueuedUtcTime,
# --   partition_date
# -- FROM tbl_silver_wrong_currency_rebuild;



StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 7, Finished, Available, Finished)

In [6]:
# -- %%sql
# -- select * from tbl_silver_wrong_currency_rebuild

# -- update tbl_silver_wrong_currency_rebuild
# -- set partition_date = DATE(TIMESTAMP)

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 8, Finished, Available, Finished)

In [7]:
# -- %%sql

# -- DESCRIBE history tbl_silver_part

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 9, Finished, Available, Finished)

In [8]:
# -- %%sql
# -- select * from tbl_silver_part
# -- -- kcwhere trade_id = 'b5089883b5bf3486e8bf2ccfe9ad442a9bd2b77a48557e9c2b05d282b7c246fa'

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 10, Finished, Available, Finished)

In [9]:
# Welcome to your new notebook
# Type here in the cell editor to add code!


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 11, Finished, Available, Finished)

In [10]:
# -- select * from tbl_bronze

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 12, Finished, Available, Finished)

In [11]:
# -- %%sql

# -- insert into tbl_silver_part
# -- select *, timestamp as pratition_date from tbl_silver
# -- where trade_id = 'b5089883b5bf3486e8bf2ccfe9ad442a9bd2b77a48557e9c2b05d282b7c246fa'

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 13, Finished, Available, Finished)

In [12]:
# -- %%sql

# -- select count(*) from tbl_bronze

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 14, Finished, Available, Finished)

In [13]:
# from pyspark.sql.functions import to_date, col

# # 1. Wczytaj dane z tabeli Delta „bronze”
# bronze_df = spark.table("tbl_bronze")

# # 2. Dodaj kolumnę partition_date (data bez części czasowej)
# silver_df = bronze_df.withColumn("partition_date", to_date(col("timestamp")))

# # 3. Zapisz do tabeli Delta „silver”, nadpisując całą zawartość
# silver_df.write \
#     .format("delta") \
#     .mode("overwrite") \
#     .option("overwriteSchema", "true") \
#     .saveAsTable("tbl_silver_part")


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 15, Finished, Available, Finished)

In [14]:
# -- %%sql
# -- select count(*) from tbl_silver_part

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 16, Finished, Available, Finished)

In [15]:
# -- %%sql
# -- CREATE OR REPLACE TABLE tbl_silver_part
# -- USING DELTA
# -- PARTITIONED BY (partition_date)
# -- AS
# -- SELECT *
# -- FROM tbl_silver_part;


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 17, Finished, Available, Finished)

In [16]:
# -- %%sql
# -- -- wykonaj raz, aby stworzyć kontrolkę
# -- -- CREATE TABLE IF NOT EXISTS cdf_control_silver_part (
# -- --   table_name STRING,
# -- --   last_version BIGINT
# -- -- );

# -- -- -- i wstaw wpis dla tbl_bronze, jeśli nie ma
# -- -- MERGE INTO cdf_control_silver_part AS c
# -- -- USING (SELECT 'tbl_bronze' AS table_name, 0 AS last_version) AS src
# -- --   ON c.table_name = src.table_name
# -- -- WHEN NOT MATCHED THEN
# -- --   INSERT (table_name, last_version) VALUES (src.table_name, src.last_version);

# --   select * from cdf_control_silver_part


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 18, Finished, Available, Finished)

In [17]:
# --  %%sql

# -- -- DESCRIBE HISTORY tbl_silver_part;
# --  DESCRIBE TABLE tbl_silver_part;


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 19, Finished, Available, Finished)

In [18]:
# from pyspark.sql.functions import to_date, col
# # 1. Pobierz ostatnią przetworzoną wersję z tabeli kontrolnej
# ctrl = spark.table("cdf_control_silver_part").filter("table_name = 'tbl_bronze'").first()
# last_version = ctrl["last_version"]     #946



# # 2. OK: znajdź bieżącą wersję tabeli
# current_version = spark.sql("DESCRIBE HISTORY tbl_bronze")\
#                        .selectExpr("max(version) as v")\
#                        .first()["v"]

# if current_version > last_version:
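# #     # 3. Pobierz tylko nowe wiersze (INSERT) od last_version. NOTE: table_changes treats the start version as INCLUSIVE, so this re-reads last_version; use last_version + 1 to make it exclusive.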
#     changes_df = spark.sql(f"""
#       SELECT 
#         timestamp,
#         symbol,
#         exchange,
#         event_type,
#         latency_ms,
#         order_id,
#         transaction_type,
#         price,
#         volume,
#         bid_price,
#         ask_price,
#         bid_size,
#         ask_size,
#         canceled_order_id,
#         currency,
#         trade_id,
#         event_id,
#         EventProcessedUtcTime,
#         PartitionId,
#         EventEnqueuedUtcTime 
#       FROM table_changes('tbl_bronze', {last_version})
#       WHERE _change_type = 'insert'
#     """)

#     # 4. Dodaj partycję date-only
#     to_write = changes_df.withColumn("partition_date", to_date(col("timestamp")))

#     # 5. Dopisz do silver (append)
#     to_write.write \
#         .format("delta") \
#         .mode("append") \
#         .saveAsTable("tbl_silver_part")

#     # 6. Zaktualizuj tabelę kontrolną do nowej wersji
#     spark.sql(f"""
#       UPDATE cdf_control_silver_part 
#       SET last_version = {current_version} 
#       WHERE table_name = 'tbl_bronze'
#     """)
# else:
#     print("Brak nowych zmian do przetworzenia.")


StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 20, Finished, Available, Finished)

In [19]:
# -- %%sql
# -- SELECT
# -- timestamp,
# -- symbol,
# -- exchange,
# -- event_type,
# -- latency_ms,
# -- order_id,
# -- transaction_type,
# -- price,
# -- volume,
# -- bid_price,
# -- ask_price,
# -- bid_size,
# -- ask_size,
# -- canceled_order_id,
# -- currency,
# -- trade_id,
# -- event_id,
# -- EventProcessedUtcTime,
# -- PartitionId,
# -- EventEnqueuedUtcTime
# --       FROM table_changes('tbl_bronze', 945)
# --       WHERE _change_type = 'insert'

StatementMeta(, a48ff611-8fd3-48f3-b671-0bbec4205d7e, 21, Finished, Available, Finished)