In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType

In [2]:
spark = SparkSession.builder.appName("jupyter_test") \
    .config("spark.master",                         "yarn") \
    .config("spark.eventLog.enabled",               "true") \
    .config("spark.eventLog.dir",                   "file:///opt/spark/eventLog") \
    .config("spark.history.fs.logDirectory",        "file:///opt/spark/eventLog") \
    .config("spark.hadoop.yarn.timeline-service.enabled", "false") \
    .config("spark.jars.packages",                  "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config("spark.sql.extensions",                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog",      "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local",              "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type",         "hadoop") \
    .config("spark.sql.catalog.local.warehouse",    "hdfs://namenode:9000/warehouse/local") \
    .config("spark.sql.defaultCatalog",             "local") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark-3.4.0-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-47bb7e88-4cf0-4130-b32f-013062e6ee62;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.3.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0

In [3]:
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()

                                                                                

In [4]:
schema = spark.table("demo.nyc.taxis").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
  ]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()

                                                                                

In [5]:
df = spark.table("demo.nyc.taxis").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [6]:
spark.sql('INSERT INTO demo.nyc.taxis VALUES (2, 2000374, 1.4, 13.58, "Y")')

DataFrame[]

In [7]:
spark.sql("select * from demo.nyc.taxis").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
|        2|2000374|          1.4|      13.58|                 Y|
+---------+-------+-------------+-----------+------------------+



In [8]:
spark.sql("SELECT * FROM demo.nyc.taxis.history").show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-07-04 03:48:...|8438797231497536736|               null|               true|
|2023-07-04 03:48:...|1628765617924961237|8438797231497536736|               true|
|2023-07-04 03:50:...|4676572027960193653|1628765617924961237|               true|
+--------------------+-------------------+-------------------+-------------------+



In [9]:
spark.sql("SELECT * FROM demo.nyc.taxis FOR VERSION AS OF '1628765617924961237';").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



[Stage 6:>                                                          (0 + 1) / 1]                                                                                

In [10]:
spark.sql("SELECT * FROM demo.nyc.taxis;").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
|        2|2000374|          1.4|      13.58|                 Y|
+---------+-------+-------------+-----------+------------------+



In [11]:
spark.sql("DELETE FROM demo.nyc.taxis WHERE vendor_id=2")

DataFrame[]

In [16]:
spark.sql("SELECT * FROM demo.nyc.taxis.history").show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-07-04 03:48:...|8438797231497536736|               null|               true|
|2023-07-04 03:48:...|1628765617924961237|8438797231497536736|               true|
|2023-07-04 03:50:...|4676572027960193653|1628765617924961237|               true|
|2023-07-04 03:51:...|6376625549467677474|4676572027960193653|               true|
+--------------------+-------------------+-------------------+-------------------+



In [17]:
spark.sql("SELECT * FROM demo.nyc.taxis;").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [18]:
spark.sql("SELECT * FROM demo.nyc.taxis FOR VERSION AS OF '4676572027960193653';").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        2|2000374|          1.4|      13.58|                 Y|
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [22]:
spark.sql('UPDATE demo.nyc.taxis SET vendor_id=2 WHERE vendor_id=1')

[Stage 22:>                                                         (0 + 1) / 1]                                                                                

DataFrame[]

In [23]:
spark.sql("SELECT * FROM demo.nyc.taxis.history").show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-07-04 03:48:...|8438797231497536736|               null|               true|
|2023-07-04 03:48:...|1628765617924961237|8438797231497536736|               true|
|2023-07-04 03:50:...|4676572027960193653|1628765617924961237|               true|
|2023-07-04 03:51:...|6376625549467677474|4676572027960193653|               true|
|2023-07-04 03:56:...|6340313450874384905|6376625549467677474|               true|
|2023-07-04 03:56:...|5687530669073898290|6340313450874384905|               true|
+--------------------+-------------------+-------------------+-------------------+



In [24]:
spark.sql("SELECT * FROM demo.nyc.taxis;").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        2|1000371|          1.8|      15.32|                 N|
|        2|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [25]:
spark.sql("SELECT * FROM demo.nyc.taxis FOR VERSION AS OF '6340313450874384905';").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+

