In [37]:
from pyspark.sql import SparkSession
import os

# Create Spark session with Iceberg
spark = SparkSession.builder \
    .appName("Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "/warehouse") \
    .config("spark.sql.catalog.local.uri", "postgresql://iceberg:iceberg@iceberg-postgres:5432/iceberg_db") \
    .getOrCreate()

# Test connection
print("Spark session created successfully!")
spark.sql("SHOW TABLES").show()

Spark session created successfully!
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [38]:
df = spark.read.parquet("/home/jovyan/work/notebooks/lidar_processed_v1.0.parquet")
df.show(5)


+--------------+------------------+-------+-------------------+--------------+-------------------+---------------+-------------+--------+
|      voxel_id|            mean_x| mean_y|     mean_blue_norm|mean_intensity|    mean_brightness|mean_green_norm|mean_red_norm|  mean_z|
+--------------+------------------+-------+-------------------+--------------+-------------------+---------------+-------------+--------+
|1345_9127_1399|           134.468|912.659|             0.4118|          15.0|0.40786666666666666|         0.3961|       0.4157| 139.857|
|1156_9633_1409|115.62299999999999|963.329|0.29610000000000003|          15.5|0.30393333333333333|            0.3|       0.3157|140.9305|
|1197_9516_1407|           119.681|951.642|             0.2745|          22.0|0.27449999999999997|         0.2706|       0.2784| 140.654|
|1195_9514_1407|           119.501|951.361|             0.1569|          17.0|0.15946666666666667|         0.1529|       0.1686| 140.663|
|1171_9625_1409|           117.076

In [39]:
df.count()


17283

In [40]:
import os

# The directory that contains the Parquet parts
parquet_dir = "/home/jovyan/work/notebooks/lidar_processed_v1.0.parquet"

# Get a list of all files inside the directory
files_to_upload = [
    f for f in os.listdir(parquet_dir) 
    if os.path.isfile(os.path.join(parquet_dir, f)) and not f.startswith('.') # Filter out directories and hidden files
]

print(f"Files to upload: {files_to_upload}")

Files to upload: ['part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00001-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00002-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00003-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00004-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00005-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00006-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00007-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00008-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00009-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00010-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00011-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00012-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'part-00013-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snapp

In [22]:
from minio import Minio
import os

# --- MinIO Client Setup (Confirmed Working) ---
client = Minio(
    endpoint="iceberg-minio:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False
)
bucket_name = "iceberg"
parquet_dir = "/home/jovyan/work/notebooks/lidar_processed_v1.0.parquet"
minio_prefix = "lidar_processed_v1.0/" # The folder name you want inside the MinIO bucket

# Create bucket if not exists
try:
    if not client.bucket_exists(bucket_name=bucket_name):
        client.make_bucket(bucket_name=bucket_name)
except Exception as e:
    print(f"Error checking/creating bucket: {e}") 
    pass 

# --- Upload Logic ---
files_to_upload = [
    f for f in os.listdir(parquet_dir) 
    if os.path.isfile(os.path.join(parquet_dir, f)) and not f.startswith('.')
]

if not files_to_upload:
    print(f"No files found in directory: {parquet_dir}")
else:
    for filename in files_to_upload:
        local_file_path = os.path.join(parquet_dir, filename)
        minio_object_name = minio_prefix + filename
        
        # Upload the single file
        client.fput_object(
            bucket_name=bucket_name,
            object_name=minio_object_name, 
            file_path=local_file_path
        )
        print(f"Uploaded {filename} to {minio_object_name}")

print("Directory contents uploaded to MinIO!")

Uploaded part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
Uploaded part-00001-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00001-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
Uploaded part-00002-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00002-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
Uploaded part-00003-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00003-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
Uploaded part-00004-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00004-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
Uploaded part-00005-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet to lidar_processed_v1.0/part-00005-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.s

In [41]:
import subprocess
import sys

subprocess.check_call([sys.executable, "-m", "pip", "install", "pyiceberg[s3,postgres]", "-q"])
print("‚úì Iceberg installed")

‚úì Iceberg installed


In [42]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg-Minio") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "s3a://iceberg/warehouse") \
    .config("spark.sql.catalog.local.uri", "postgresql://iceberg:iceberg@iceberg-postgres:5432/iceberg_db") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://iceberg-minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("‚úì Spark session with Iceberg created!")

‚úì Spark session with Iceberg created!


In [44]:
import subprocess
import sys

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", 
    "pyarrow", "boto3", "s3fs", "pyiceberg[postgres]"])
print("‚úì Dependencies installed")

‚úì Dependencies installed


In [45]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg-Simple") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
    .config("spark.sql.catalog.local.uri", "postgresql://iceberg:iceberg@iceberg-postgres:5432/iceberg_db") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("‚úì Spark session created!")

‚úì Spark session created!


In [47]:
import s3fs
import pyarrow.parquet as pq
import pandas as pd

# Connect to Minio using S3
s3 = s3fs.S3FileSystem(
    anon=False,
    key='minioadmin',
    secret='minioadmin',
    use_ssl=False,
    client_kwargs={
        'endpoint_url': 'http://iceberg-minio:9000'
    }
)

print("‚úì Connected to Minio")

# List files in bucket
files = s3.ls('iceberg/')
print(f"Files in iceberg bucket: {len(files)}")
for f in files[:5]:
    print(f"  - {f}")

‚úì Connected to Minio
Files in iceberg bucket: 1
  - iceberg/lidar_processed_v1.0


In [49]:
# Read parquet using s3fs
with s3.open('iceberg/lidar_processed_v1.0/part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'rb') as f:
    parquet_file = pq.read_table(f)

print(f"‚úì Parquet loaded via PyArrow")
print(f"Rows: {parquet_file.num_rows}")
print(f"Columns: {parquet_file.num_columns}")
print(f"\nSchema:")
print(parquet_file.schema)

‚úì Parquet loaded via PyArrow
Rows: 86
Columns: 9

Schema:
voxel_id: string not null
mean_x: double
mean_y: double
mean_blue_norm: double
mean_intensity: double
mean_brightness: double
mean_green_norm: double
mean_red_norm: double
mean_z: double
-- schema metadata --
org.apache.spark.version: '3.1.2'
org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 619


In [50]:
# Convert PyArrow table to Pandas then to Spark DataFrame
pandas_df = parquet_file.to_pandas()
print(f"Converted to Pandas: {pandas_df.shape}")

spark_df = spark.createDataFrame(pandas_df)
print(f"‚úì Converted to Spark DataFrame")
print(f"\nFirst 5 rows:")
spark_df.show(5)

Converted to Pandas: (86, 9)
‚úì Converted to Spark DataFrame

First 5 rows:
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|      voxel_id| mean_x| mean_y|mean_blue_norm|mean_intensity|    mean_brightness|mean_green_norm|mean_red_norm| mean_z|
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|1216_9477_1406|121.551|947.738|        0.3569|          10.0|0.35686666666666667|         0.3529|       0.3608|140.563|
|1240_9403_1404|123.956| 940.25|        0.2196|          15.0|0.23396666666666666|         0.2431|       0.2392|140.423|
|1226_9434_1405|122.585|943.403|           0.0|          15.0|0.02223333333333333|         0.0549|       0.0118|140.494|
|1185_9620_1409|118.488|961.979|        0.3373|          10.0|0.34379999999999994|         0.3451|        0.349|140.904|
|1312_9195_1400| 131.17|919.494|        0.4471|          14.0| 0.43663333333

In [52]:
# Convert PyArrow table to Pandas then to Spark DataFrame
pandas_df = parquet_file.to_pandas()
print(f"Converted to Pandas: {pandas_df.shape}")

spark_df = spark.createDataFrame(pandas_df)
print(f"‚úì Converted to Spark DataFrame")
print(f"\nFirst 5 rows:")
spark_df.show(5)

Converted to Pandas: (86, 9)
‚úì Converted to Spark DataFrame

First 5 rows:
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|      voxel_id| mean_x| mean_y|mean_blue_norm|mean_intensity|    mean_brightness|mean_green_norm|mean_red_norm| mean_z|
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|1216_9477_1406|121.551|947.738|        0.3569|          10.0|0.35686666666666667|         0.3529|       0.3608|140.563|
|1240_9403_1404|123.956| 940.25|        0.2196|          15.0|0.23396666666666666|         0.2431|       0.2392|140.423|
|1226_9434_1405|122.585|943.403|           0.0|          15.0|0.02223333333333333|         0.0549|       0.0118|140.494|
|1185_9620_1409|118.488|961.979|        0.3373|          10.0|0.34379999999999994|         0.3451|        0.349|140.904|
|1312_9195_1400| 131.17|919.494|        0.4471|          14.0| 0.43663333333

In [53]:
# Save as parquet locally
output_path = '/home/jovyan/work/lidar_iceberg'
spark_df.coalesce(1).write \
    .mode("overwrite") \
    .parquet(output_path)

print(f"‚úì Data saved to {output_path}")
print(f"Total rows: {spark_df.count()}")

‚úì Data saved to /home/jovyan/work/lidar_iceberg
Total rows: 86


In [54]:
# Read it back
df_read = spark.read.parquet(output_path)

# Query
result = spark.sql("SELECT COUNT(*) as total_rows FROM (SELECT * FROM parquet.`" + output_path + "`)")
result.show()

# Schema
print("\nSchema:")
df_read.printSchema()

# Sample data
print("\nSample Data:")
df_read.show(5)

+----------+
|total_rows|
+----------+
|        86|
+----------+


Schema:
root
 |-- voxel_id: string (nullable = true)
 |-- mean_x: double (nullable = true)
 |-- mean_y: double (nullable = true)
 |-- mean_blue_norm: double (nullable = true)
 |-- mean_intensity: double (nullable = true)
 |-- mean_brightness: double (nullable = true)
 |-- mean_green_norm: double (nullable = true)
 |-- mean_red_norm: double (nullable = true)
 |-- mean_z: double (nullable = true)


Sample Data:
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|      voxel_id| mean_x| mean_y|mean_blue_norm|mean_intensity|    mean_brightness|mean_green_norm|mean_red_norm| mean_z|
+--------------+-------+-------+--------------+--------------+-------------------+---------------+-------------+-------+
|1216_9477_1406|121.551|947.738|        0.3569|          10.0|0.35686666666666667|         0.3529|       0.3608|140.563|
|1240_9403_1404|123.956| 940.25|    

In [55]:
# Load the parquet file for analysis
df = spark.read.parquet('/home/jovyan/work/lidar_iceberg')

# Count unique voxels
print("Unique Voxels:")
df.select("voxel_id").distinct().count()

# Intensity statistics
print("\nIntensity Statistics:")
df.agg({
    "mean_intensity": "min",
    "mean_intensity": "max",
    "mean_intensity": "avg"
}).show()

# Height statistics
print("\nHeight Statistics:")
df.agg({
    "mean_z": "min",
    "mean_z": "max",
    "mean_z": "avg"
}).show()

Unique Voxels:

Intensity Statistics:
+-------------------+
|avg(mean_intensity)|
+-------------------+
|  14.68269405013591|
+-------------------+


Height Statistics:
+-----------------+
|      avg(mean_z)|
+-----------------+
|140.5416108023759|
+-----------------+



In [56]:
# High intensity voxels
df = spark.read.parquet('/home/jovyan/work/lidar_iceberg')

print("Voxels with intensity > 20:")
high_intensity = df.filter(df.mean_intensity > 20)
high_intensity.select("voxel_id", "mean_x", "mean_y", "mean_z", "mean_intensity").show(10)

# By height range
print("\nVoxels between 135-145 height:")
height_range = df.filter((df.mean_z >= 135) & (df.mean_z <= 145))
print(f"Count: {height_range.count()}")
height_range.select("mean_z", "mean_intensity").show(10)

Voxels with intensity > 20:
+---------------+-------+--------+-------+--------------+
|       voxel_id| mean_x|  mean_y| mean_z|mean_intensity|
+---------------+-------+--------+-------+--------------+
| 1294_9264_1402| 129.43| 926.416|140.166|          61.0|
| 1207_9439_1406|120.695| 943.884|140.614|          22.0|
| 1386_9561_1409|138.649| 956.066|140.875|          25.0|
| 1040_9037_1393|103.968| 903.655|139.286|          22.0|
| 1637_8466_1387|163.727| 846.562|138.731|          21.0|
| 1623_8087_1380|162.289| 808.717|138.014|          21.0|
|1028_10083_1420|102.791|1008.313|142.035|          22.0|
| 1331_9016_1397|133.056| 901.612|139.734|          34.0|
+---------------+-------+--------+-------+--------------+


Voxels between 135-145 height:
Count: 86
+------------------+--------------+
|            mean_z|mean_intensity|
+------------------+--------------+
|           140.563|          10.0|
|           140.423|          15.0|
|           140.494|          15.0|
|           140.9

In [67]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg-Simple") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
    .config("spark.sql.catalog.local.uri", "postgresql://iceberg:iceberg@iceberg-postgres:5432/iceberg_db") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("‚úì Spark session created!")

‚úì Spark session created!


In [74]:
try:
    spark.stop()
    print("‚úì Spark session stopped")
except:
    print("No active session to stop")

‚úì Spark session stopped


In [5]:
from pyspark.sql import SparkSession

# Stop any existing session
try:
    SparkSession.getActiveSession().stop()
except:
    pass

spark = SparkSession.builder \
    .appName("Iceberg-PostgreSQL") \
    .config("spark.jars.packages", 
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
            "org.postgresql:postgresql:42.7.3") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "jdbc") \
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
    .config("spark.sql.catalog.local.uri", "jdbc:postgresql://iceberg-postgres:5432/iceberg_db") \
    .config("spark.sql.catalog.local.jdbc.user", "iceberg") \
    .config("spark.sql.catalog.local.jdbc.password", "iceberg") \
    .config("spark.sql.catalog.local.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("‚úì Spark session created with PostgreSQL catalog!")
print(f"Spark version: {spark.version}")

# Verify catalogs
spark.sql("SHOW CATALOGS").show()

‚úì Spark session created with PostgreSQL catalog!
Spark version: 3.5.0
+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



In [11]:
import s3fs
import pyarrow.parquet as pq
# iceberg/lidar_processed_v1.0/part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet
s3 = s3fs.S3FileSystem(
    anon=False,
    key='minioadmin',
    secret='minioadmin',
    use_ssl=False,
    client_kwargs={'endpoint_url': 'http://iceberg-minio:9000'}
)

print("Reading parquet from MinIO...")
with s3.open('iceberg/lidar_processed_v1.0/part-00000-c4e2f12c-6b88-442d-a6b0-5976313ba1fe-c000.snappy.parquet', 'rb') as f:
    parquet_table = pq.read_table(f)

print(f"‚úì Loaded {parquet_table.num_rows} rows")

pandas_df = parquet_table.to_pandas()
spark_df = spark.createDataFrame(pandas_df)

print(f"‚úì Created Spark DataFrame")

# Create Iceberg table
spark_df.writeTo("local.default.lidar") \
    .using("iceberg") \
    .createOrReplace()

print("‚úì Iceberg table created!")

# Verify
spark.sql("SELECT COUNT(*) FROM local.default.lidar").show()

Reading parquet from MinIO...
‚úì Loaded 86 rows
‚úì Created Spark DataFrame
‚úì Iceberg table created!
+--------+
|count(1)|
+--------+
|      86|
+--------+



In [7]:
# Show first 10 rows
spark.sql("SELECT * FROM local.default.lidar LIMIT 10").show()

# Or using DataFrame API
spark.table("local.default.lidar").show(10)

# Show specific columns
spark.sql("""
    SELECT voxel_id, mean_x, mean_y, mean_z, mean_intensity 
    FROM local.default.lidar 
    LIMIT 10
""").show()

+--------------+--------+--------+-------------------+--------------+-------------------+---------------+-------------------+------------------+
|      voxel_id|  mean_x|  mean_y|     mean_blue_norm|mean_intensity|    mean_brightness|mean_green_norm|      mean_red_norm|            mean_z|
+--------------+--------+--------+-------------------+--------------+-------------------+---------------+-------------------+------------------+
|1216_9477_1406| 121.551| 947.738|             0.3569|          10.0|0.35686666666666667|         0.3529|             0.3608|           140.563|
|1240_9403_1404| 123.956|  940.25|             0.2196|          15.0|0.23396666666666666|         0.2431|             0.2392|           140.423|
|1226_9434_1405| 122.585| 943.403|                0.0|          15.0|0.02223333333333333|         0.0549|             0.0118|           140.494|
|1185_9620_1409| 118.488| 961.979|             0.3373|          10.0|0.34379999999999994|         0.3451|              0.349|     

In [15]:
import s3fs
import pyarrow.parquet as pq

s3 = s3fs.S3FileSystem(
    anon=False,
    key='minioadmin',
    secret='minioadmin',
    use_ssl=False,
    client_kwargs={'endpoint_url': 'http://iceberg-minio:9000'}
)

# Correct path without URL encoding
path = 'iceberg/lidar_processed_v1.0/'

print(f"Listing files in {path}...")
files = s3.ls(path)
print(f"Found {len(files)} files")

# Filter only parquet files
parquet_files = [f for f in files if f.endswith('.parquet')]
print(f"Parquet files: {len(parquet_files)}")

# Read all as dataset
dataset = pq.ParquetDataset(parquet_files, filesystem=s3)
table = dataset.read()

print(f"‚úì Loaded {table.num_rows} total rows")

# Create Spark DataFrame and Iceberg table
spark_df = spark.createDataFrame(table.to_pandas())

spark_df.writeTo("local.default.lidar") \
    .using("iceberg") \
    .createOrReplace()

print("‚úì Iceberg table created with all data!")
spark.sql("SELECT COUNT(*) FROM local.default.lidar").show()

Listing files in iceberg/lidar_processed_v1.0/...
Found 201 files
Parquet files: 200
‚úì Loaded 17283 total rows
‚úì Iceberg table created with all data!
+--------+
|count(1)|
+--------+
|   17283|
+--------+



In [16]:
# View current table history
print("üì∏ Current Table History:")
spark.sql("SELECT * FROM local.default.lidar.history").show(truncate=False)

# View snapshots
print("\nüì∏ Current Snapshots:")
spark.sql("SELECT snapshot_id, parent_id, operation, summary FROM local.default.lidar.snapshots").show(truncate=False)

# Current row count
print("\nüìä Current Row Count:")
spark.sql("SELECT COUNT(*) as count FROM local.default.lidar").show()

üì∏ Current Table History:
+-----------------------+-------------------+---------+-------------------+
|made_current_at        |snapshot_id        |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2025-11-26 15:51:51.918|2828298841096146947|NULL     |false              |
|2025-11-26 15:55:45.36 |5019788378544448199|NULL     |false              |
|2025-11-26 15:57:51.486|7617711455816498064|NULL     |true               |
+-----------------------+-------------------+---------+-------------------+


üì∏ Current Snapshots:
+-------------------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |parent_id|operation|summary                       

In [17]:
# Create modified data (example: boost intensity by 10%)
print("Creating modified data...")
modified_df = spark.sql("""
    SELECT 
        voxel_id,
        mean_x,
        mean_y,
        mean_z,
        mean_intensity * 1.10 as mean_intensity,
        mean_brightness,
        mean_blue_norm,
        mean_green_norm,
        mean_red_norm
    FROM local.default.lidar
    WHERE mean_intensity > 15
    LIMIT 50
""")

print(f"Modified records: {modified_df.count()}")

# Append to table (creates new snapshot)
modified_df.writeTo("local.default.lidar") \
    .using("iceberg") \
    .append()

print("‚úì New snapshot created by appending data!")

# Check new row count
spark.sql("SELECT COUNT(*) as count FROM local.default.lidar").show()

Creating modified data...
Modified records: 50
‚úì New snapshot created by appending data!
+-----+
|count|
+-----+
|17333|
+-----+



In [18]:
print("=" * 80)
print("üì∏ SNAPSHOT HISTORY")
print("=" * 80)

# Detailed snapshot information
snapshots = spark.sql("""
    SELECT 
        snapshot_id,
        parent_id,
        operation,
        committed_at,
        summary['added-data-files'] as added_files,
        summary['added-records'] as added_records,
        summary['total-data-files'] as total_files,
        summary['total-records'] as total_records
    FROM local.default.lidar.snapshots
    ORDER BY committed_at
""")

snapshots.show(truncate=False)

# Also view history table
print("\nüìã TABLE HISTORY:")
spark.sql("SELECT * FROM local.default.lidar.history ORDER BY made_current_at").show(truncate=False)

üì∏ SNAPSHOT HISTORY
+-------------------+-------------------+---------+-----------------------+-----------+-------------+-----------+-------------+
|snapshot_id        |parent_id          |operation|committed_at           |added_files|added_records|total_files|total_records|
+-------------------+-------------------+---------+-----------------------+-----------+-------------+-----------+-------------+
|2828298841096146947|NULL               |append   |2025-11-26 15:51:51.918|4          |86           |4          |86           |
|5019788378544448199|NULL               |append   |2025-11-26 15:55:45.36 |4          |86           |4          |86           |
|7617711455816498064|NULL               |append   |2025-11-26 15:57:51.486|4          |17283        |4          |17283        |
|6160886182689171394|7617711455816498064|append   |2025-11-26 16:01:43.582|1          |50           |5          |17333        |
+-------------------+-------------------+---------+-----------------------+-------

In [19]:
# Get the first (original) snapshot ID
first_snapshot = spark.sql("""
    SELECT snapshot_id 
    FROM local.default.lidar.snapshots 
    ORDER BY committed_at ASC 
    LIMIT 1
""").collect()[0][0]

print(f"üïê First snapshot ID: {first_snapshot}")

# Query data from first snapshot
print("\nüìä Data from FIRST snapshot:")
spark.sql(f"""
    SELECT COUNT(*) as count 
    FROM local.default.lidar 
    VERSION AS OF {first_snapshot}
""").show()

# Get latest snapshot ID
latest_snapshot = spark.sql("""
    SELECT snapshot_id 
    FROM local.default.lidar.snapshots 
    ORDER BY committed_at DESC 
    LIMIT 1
""").collect()[0][0]

print(f"\nüïê Latest snapshot ID: {latest_snapshot}")

# Query data from latest snapshot
print("\nüìä Data from LATEST snapshot:")
spark.sql(f"""
    SELECT COUNT(*) as count 
    FROM local.default.lidar 
    VERSION AS OF {latest_snapshot}
""").show()

# Compare
print("\nüìä Current table (should match latest):")
spark.sql("SELECT COUNT(*) as count FROM local.default.lidar").show()

üïê First snapshot ID: 2828298841096146947

üìä Data from FIRST snapshot:
+-----+
|count|
+-----+
|   86|
+-----+


üïê Latest snapshot ID: 6160886182689171394

üìä Data from LATEST snapshot:
+-----+
|count|
+-----+
|17333|
+-----+


üìä Current table (should match latest):
+-----+
|count|
+-----+
|17333|
+-----+



In [20]:
# Snapshot 3: Delete some records
print("\nüóëÔ∏è Creating snapshot by deleting records...")
spark.sql("""
    DELETE FROM local.default.lidar 
    WHERE mean_intensity < 5
""")

# Snapshot 4: Update records
print("\n‚úèÔ∏è Creating snapshot by updating records...")
spark.sql("""
    UPDATE local.default.lidar 
    SET mean_brightness = mean_brightness * 1.05
    WHERE mean_z > 140
""")

# Snapshot 5: Insert new records
print("\n‚ûï Creating snapshot by inserting records...")
new_data = spark.sql("""
    SELECT 
        'new_voxel_1' as voxel_id,
        100.0 as mean_x,
        200.0 as mean_y,
        150.0 as mean_z,
        25.0 as mean_intensity,
        50.0 as mean_brightness,
        0.3 as mean_blue_norm,
        0.4 as mean_green_norm,
        0.3 as mean_red_norm
""")

new_data.writeTo("local.default.lidar").using("iceberg").append()

print("\n‚úì Created 3 more snapshots!")


üóëÔ∏è Creating snapshot by deleting records...

‚úèÔ∏è Creating snapshot by updating records...

‚ûï Creating snapshot by inserting records...

‚úì Created 3 more snapshots!


In [21]:
print("=" * 100)
print("üì∏ COMPLETE SNAPSHOT TIMELINE")
print("=" * 100)

# View all snapshots with details
spark.sql("""
    SELECT 
        snapshot_id,
        operation,
        committed_at,
        summary['total-records'] as total_records,
        summary['added-records'] as added_records,
        summary['deleted-records'] as deleted_records
    FROM local.default.lidar.snapshots
    ORDER BY committed_at
""").show(truncate=False)

# Count snapshots
snapshot_count = spark.sql("SELECT COUNT(*) as count FROM local.default.lidar.snapshots").collect()[0][0]
print(f"\nüìä Total Snapshots: {snapshot_count}")

üì∏ COMPLETE SNAPSHOT TIMELINE
+-------------------+---------+-----------------------+-------------+-------------+---------------+
|snapshot_id        |operation|committed_at           |total_records|added_records|deleted_records|
+-------------------+---------+-----------------------+-------------+-------------+---------------+
|2828298841096146947|append   |2025-11-26 15:51:51.918|86           |86           |NULL           |
|5019788378544448199|append   |2025-11-26 15:55:45.36 |86           |86           |NULL           |
|7617711455816498064|append   |2025-11-26 15:57:51.486|17283        |17283        |NULL           |
|6160886182689171394|append   |2025-11-26 16:01:43.582|17333        |50           |NULL           |
|2787782567001300120|delete   |2025-11-26 16:02:22.805|17333        |NULL         |NULL           |
|3293567882475020787|overwrite|2025-11-26 16:02:24.723|17333        |17333        |17333          |
|8649775498874253189|append   |2025-11-26 16:02:25.087|17334        

In [22]:
print("=" * 100)
print("üì∏ COMPLETE SNAPSHOT TIMELINE")
print("=" * 100)

# View all snapshots with details
spark.sql("""
    SELECT 
        snapshot_id,
        operation,
        committed_at,
        summary['total-records'] as total_records,
        summary['added-records'] as added_records,
        summary['deleted-records'] as deleted_records
    FROM local.default.lidar.snapshots
    ORDER BY committed_at
""").show(truncate=False)

# Count snapshots
snapshot_count = spark.sql("SELECT COUNT(*) as count FROM local.default.lidar.snapshots").collect()[0][0]
print(f"\nüìä Total Snapshots: {snapshot_count}")

üì∏ COMPLETE SNAPSHOT TIMELINE
+-------------------+---------+-----------------------+-------------+-------------+---------------+
|snapshot_id        |operation|committed_at           |total_records|added_records|deleted_records|
+-------------------+---------+-----------------------+-------------+-------------+---------------+
|2828298841096146947|append   |2025-11-26 15:51:51.918|86           |86           |NULL           |
|5019788378544448199|append   |2025-11-26 15:55:45.36 |86           |86           |NULL           |
|7617711455816498064|append   |2025-11-26 15:57:51.486|17283        |17283        |NULL           |
|6160886182689171394|append   |2025-11-26 16:01:43.582|17333        |50           |NULL           |
|2787782567001300120|delete   |2025-11-26 16:02:22.805|17333        |NULL         |NULL           |
|3293567882475020787|overwrite|2025-11-26 16:02:24.723|17333        |17333        |17333          |
|8649775498874253189|append   |2025-11-26 16:02:25.087|17334        

In [23]:
# Query data as it was 5 minutes ago (example)
from datetime import datetime, timedelta

# Get timestamp from 5 minutes ago
five_min_ago = datetime.now() - timedelta(minutes=5)
timestamp_str = five_min_ago.strftime('%Y-%m-%d %H:%M:%S')

print(f"üïê Querying data as of: {timestamp_str}")

# Time-travel by timestamp
spark.sql(f"""
    SELECT COUNT(*) as count 
    FROM local.default.lidar 
    TIMESTAMP AS OF '{timestamp_str}'
""").show()

üïê Querying data as of: 2025-11-26 15:58:12
+-----+
|count|
+-----+
|17283|
+-----+

