GCS bucket \
creation
can be using any config but make sure that GCS will having same region as Dataproc\
In this workshop will be using "dev-test-services-lakehouse-training"\
in region "us-central1"


Data proc \
creation will be using single machine in "us-central1"\
with spark version 3.3 \
name "lakehouse-workshop" \
Enable "component gateway", adding component "Jupyter Notebook", "Zeppelin Notebook
"\
customerize cluster to use bucket that we made in above mention,\
in this workshop we use "dev-test-services-lakehouse-training"


Biglake \
to create Biglake connection go to bigquery \
Go to +ADD and choose external data source \
connnection type BViglake and remote function (cloud resource) \
create connection ID "biglake_iceberg" \
region "us-central1" 
#https://cloud.google.com/bigquery/docs/iceberg-tables#bq_1

In [None]:
#Start with pyspark session
import pyspark
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("iceberg")
    .master("local")
    .config(
    "spark.jars.packages",
    """org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0""",    
    )
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "gs://arm-test-lakehouse/warehouse")
    .config("spark.sql.defaultCatalog", "local")
    .getOrCreate()
)
#spark session documents
#https://api-docs.databricks.com/python/pyspark/latest/pyspark.sql/spark_session.html

In [None]:
#let create some dataframe
from pyspark.sql.types import *
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
  ]
df = spark.createDataFrame(data, schema)
df.show()

In [None]:
#let save it as a file 
df.writeTo("nyc_taxis").create()

In [None]:
#let have a look at schema in Iceberg table
schema = spark.table("nyc_taxis").schema
print(schema)

In [None]:
#try adding new row to iceberg table
schema = spark.table("nyc_taxis").schema
data = [
    (9, 1000999, 9.9, 99.99, "Y")
  ]
df = spark.createDataFrame(data, schema)
df.writeTo("nyc_taxis").append()

In [None]:
#have look at new record
df = spark.table("nyc_taxis").show()

#connect Biglake to Iceberg table in GCS \
First have a look at GCS that we connect with dataproc you will see warehouse directory with each table inside \
also each table will contain data and metadata \
to connect with Iceberg table use below code

In [None]:
-- Bigquery

  CREATE EXTERNAL TABLE dev-test-services.lake__workshop.lake_workshop_iceberg
  WITH CONNECTION `dev-test-services.us-central1.biglake_iceberg`
  OPTIONS (
         format = 'ICEBERG',
         uris = ["gs://dev-test-services-lake-workshop-m/warehouse/taxis_pyspark_ice/metadata/v2.metadata.json"]
   )

#try create table using spark sql
spark.sql(
    """
CREATE OR REPLACE TABLE demo.nyc.taxis
(
  vendor_id bigint,
  trip_id bigint,
  trip_distance float,
  fare_amount double,
  store_and_fwd_flag string
)  USING iceberg
PARTITIONED BY (vendor_id);
 
"""
)

spark.sql(
    """
INSERT INTO demo.nyc.taxis
VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');
 
"""
)

In [None]:
#query data
spark.sql("""
    select * from nyc_taxis
"""
).show()


In [None]:
#create new table
spark.sql(
    """
CREATE OR REPLACE TABLE new_data
(
  vendor_id bigint,
  trip_id bigint,
  trip_distance float,
  fare_amount double,
  store_and_fwd_flag string
)  USING iceberg
PARTITIONED BY (vendor_id); 
 
"""
)
 
spark.sql(
    """
INSERT INTO new_data
(
VALUES (1, 1000371, 2, 20, 'N'), (2, 1000372, 4, 39, 'Y'));
 
"""
)
spark.sql(
    """
    SELECT * FROM new_data
""").show()

In [None]:
#try merge new and old data
spark.sql("""
    MERGE INTO nyc_taxis target
    USING (SELECT * FROM new_data) source
    ON target.trip_id = source.trip_id
    WHEN MATCHED
        THEN UPDATE SET target.trip_distance = source.trip_distance, target.fare_amount = source.fare_amount, target.store_and_fwd_flag = source.store_and_fwd_flag
    WHEN NOT MATCHED
        THEN INSERT *;
""").show()

In [None]:
#try on schema Evolution
spark.sql(
    """
CREATE OR REPLACE TABLE taxis
(
  vendor_id bigint,
  trip_id bigint,
  trip_distance float,
  fare_amount double,
  store_and_fwd_flag string
)  USING iceberg
PARTITIONED BY (vendor_id);
 
"""
)
 
spark.sql(
    """
INSERT INTO taxis
VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');
 
"""
).show()

In [None]:
spark.sql("""
  ALTER TABLE taxis RENAME COLUMN fare_amount TO fare
""")
df = spark.table("demo.nyc.taxis").show()

In [None]:
#change trip distance to distance
spark.sql(""" 
    ALTER TABLE taxis RENAME COLUMN trip_distance TO distance
""")

df = spark.table("taxis").show()

In [None]:
#add comment COLUMN
spark.sql(""" 
   ALTER TABLE taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'
""")
spark.sql( """ 
  DESCRIBE EXTENDED taxis
""")

In [None]:
#change data type
spark.sql(
  """ 
 ALTER TABLE taxis ALTER COLUMN distance TYPE double;
""")
spark.sql(
  """ 
 DESCRIBE EXTENDED taxis
""")


In [None]:
#change order
spark.sql(
  """ 
 ALTER TABLE taxis ALTER COLUMN distance AFTER fare;
""")

In [None]:
#add new COLUMN
spark.sql(
  """ 
 ALTER TABLE taxis
ADD COLUMN fare_per_distance_unit float AFTER distance
""")
df = spark.table("taxis").show()

In [None]:
#add data to new COLUMN
spark.sql(
  """ 
UPDATE taxis
SET fare_per_distance_unit = fare/distance
""")

In [None]:
#delete COLUMN
spark.sql("""
    select * from taxis
"""
).show()

In [None]:
spark.sql("""
    ALTER TABLE taxis DROP COLUMN store_and_fwd_flag
"""
).show()

In [None]:
#Partition evolution change partition without need to create new TABLE
spark.sql(
  """ 
ALTER TABLE nyc_taxis
ADD PARTITION FIELD trip_id
""")
spark.sql(
  """ 
DESCRIBE EXTENDED nyc_taxis
""")

In [None]:
#query snapshot id to rollback table
spark.sql(
  """ 
	SELECT snapshot_id, manifest_list
	FROM nyc_taxis.snapshots
""").show()

In [None]:
#check table history
spark.sql(
  """ 
	SELECT *
	FROM nyc_taxis
""").show()

In [None]:
#rollback table must fill in snapshot id
spark.sql(f"""
  CALL system.rollback_to_snapshot('nyc_taxis', {snapshotid})
""").show()

In [None]:
spark.sql(
  """ 
SELECT *
FROM nyc_taxis
""").show()