<img src="https://camo.githubusercontent.com/d120d4367f4fcaea0086ec2533ecad35c4ce2fadc313071ee2c26ff319833168/68747470733a2f2f696365626572672e6170616368652e6f72672f646f63732f6c61746573742f696d672f496365626572672d6c6f676f2e706e67" width="250">

<hr>

> ### Iceberg architecture

<div align="center">
	<img src="https://www.dremio.com/wp-content/uploads/2022/08/Fig1-1-768x432.png">
</div>
<h4 align="right" style="color: blue;">Author: By Pasit Y.</h4>

---

## Import Library

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark
import pandas as pd
import os
import requests
from datetime import datetime

## กำหนด Environment สำหรับ Incerg จะเหมือนกับ PySpark แค่เพิ่มมาบาง Config เท่านั้น

In [None]:
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME'] = '/usr/local/jdk8u222-b10'
os.environ['HADOOP_USER_NAME']='hive'
os.environ['PYSPARK_PYTHON'] ='/HDFS01/anaconda3/envs/main/bin/python'

conf = pyspark.SparkConf().setAll([
     ('spark.driver.maxResultSize', '0'),
     ('spark.sql.catalog.iceberg', 'org.apache.iceberg.spark.SparkCatalog'),
     ('spark.sql.catalog.iceberg.type', 'hadoop'),
     ('spark.sql.catalog.iceberg.warehouse', 'hdfs:/user/hive/warehouse/iceberg_test'),
     ('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178'),
     ('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions'),
     ('spark.driver.memory', '2g'),
     ('spark.sql.repl.eagerEval.enabled','true'),
     #('hive.strict.managed.tables','false'),
     ('hive.metastore.uris', 'thrift://nn01.bigdata:9083'),
     ('metastore.client.capability.check','false')
    ])
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Iceberg") \
        .config(conf=conf) \
        .enableHiveSupport() \
        .getOrCreate()

## อ่าน CSV Files

In [None]:
myDF = spark.read.csv("file:///HDFS01/airflow/notebooks/Pasit/PySpark Tutorial/example.csv",
                      header=True, inferSchema=True)
myDF = myDF.withColumn('age_group', translate('age_group', ' ', ''))

---

<h1 align="center">Iceberg Read / Write (PySpark)</h1>

---

## สร้าง directory ใน HDFS
* ```hdfs -dfs -mkdir /iceberg```
* ```hdfs -dfs -chmod 777 /iceberg```

## เขียนข้อมูลลงไปยัง HDFS
* แบบที่ 1 คือบันทึกไปยัง HDFS แบบค่า Default
* แบบที่ 2 คือกำหนด Partition File

In [None]:
#1
myDF.writeTo("iceberg_test.test_iceberg3") \
    .tableProperty("table_type", "ICEBERG") \
    .create()

In [None]:
#2
myDF.writeTo("iceberg_test.test_iceberg2") \
    .tableProperty("table_type", "ICEBERG") \
    .create()

## อ่านข้อมูล Iceberg จาก HDFS ในรูปแบบของ Schema.tableName

In [None]:
spark.read.format("iceberg").load("iceberg_test.tbl_example")

## อ่านข้อมูล Iceberg จาก HDFS ในรูปแบบของ DFS Path

In [None]:
spark.read \
    .format("iceberg") \
    .load("/user/hive/warehouse/iceberg_test/example_iceberg_code")

## Insert Data

In [None]:
spark.sql("INSERT INTO iceberg.example (_c0,age_group,vaccine_status,outcome) VALUES (268168, 'xxx','xxx','xxx')")

## Update Data

In [None]:
spark.sql("UPDATE iceberg.example SET vaccine_status = 'vaccinateds' WHERE age_group = 'under50'")

## Delete Data

In [None]:
spark.sql("DELETE FROM iceberg.example WHERE _c0 = '268168'")

## Snapshot ID ในการกระทำกับข้อมูล CRUD จะแสดงข้อมูลประวัติทั้งหมด โดยสามารถแยก Operation ได้ว่ามีการกระทำ Statement ใด

In [None]:
spark.sql("SELECT * FROM iceberg.example.snapshots")

## แสดงข้อมูลไฟล์ที่ได้เพิ่มข้อมูลเข้าไป จะเป็นชนิด Parquet โดยจะบอกถึง Partiton ทั้งหมด แต่ละไฟล์มี Record, Size เท่าไหร่ และได้ทำการ Split ไว้กี่ไฟล์

In [None]:
spark.sql("SELECT * FROM iceberg.example.files")

## แสดง Snapshot ของไฟล์ Parquet ทั้งหมด พร้อมบอกวันที่และเวลาที่ได้กระทำกับไฟล์นั้นๆ พร้อม Operation

In [None]:
spark.sql("""
        SELECT
            h.made_current_at,
            s.operation,
            h.snapshot_id,
            h.is_current_ancestor,
            s.summary['spark.app.id']
        FROM
        	iceberg.example.history h
        JOIN
        	iceberg.example.snapshots s
          	ON h.snapshot_id = s.snapshot_id
        ORDER BY
            made_current_at
""")

In [None]:
spark.sql("SELECT * FROM iceberg.example.table")

## นอกจากนั้นยังสามารถ Query ระบุ Snapshot ID ได้ดังนี้
* ### ระบุแค่ Snapshot ID ที่ต้องการหาข้อมูลที่เจาะจงเท่านั้น

In [None]:
spark.read \
    .option("snapshot-id", 10963874102873L) \
    .load("/iceberg/default/example")

---

<h1 align="center">ปรับใช้ Iceberg กับ TrinoDB</h1>

---

<img src="https://github.com/trinodb/trino/raw/master/.github/homepage.png" width="200">

<h3 style="color:red;">** Hive ไม่สามารถอ่าน Iceberg ได้</h3>

## เพิ่ม Iceberg Catalog
``` vim /home/trino/etc/catalog/iceberg.properties```
### จากนั้นเพิ่ม Parameter ดังนี้
```bash
connector.name=iceberg
hive.metastore.uri=thrift://hive-metastore:9083
hive.metastore.username=hive
iceberg.file-format=PARQUET
iceberg.hive-catalog-name=iceberg
iceberg.register-table-procedure.enabled=true
#SNAPPY, LZ4, ZSTD, GZIP
iceberg.compression-codec=ZSTD
iceberg.unique-table-location=false
```


## สร้าง Schema และ Table สำหรับการเขียน Iceberg
> ### การใช้งาน Iceberg โดยใช้ Trino มีความจำเป็นต้อง Mount Fuse เพื่อบันทึกลงไปยัง HDFS (โดย Default จะบันทึกลงไปที่ Linux File system) หากไม่ต้องการ Mount Fuse ก็สามารถเลือก Allow User ผ่าน Ranger ได้เช่นเดียวกัน

<h3 style="color:green;">** รันคำสั่งต่อไปนี้บน Trino CLI หรือ DBeaver</h3>
<h3 style="color: red;">** หากไม่สามารถสร้างได้ อาจจะติดสิทธิของ HDFS ให้เลือกแก้ไขตามวิธีดังนี้</h3>
<li style="color:red;">- Allow User ผ่าน Ranger</li>
<li style="color:red;">- เข้าไป chmod 777 ตาม path ที่สร้างไว้</li>
 <li style="color:red;">- เปลี่ยน User Trino ให้เป็น hdfs หรือ root</li>

<hr>
<h3 style="color: red;">หลัง Create Schema และ Table แล้ว ให้ไปใช้คำสั่ง chmod 777 ที่ hdfs เพื่อให้ Trino มีสิทธิเขียนไฟล์</h3>

### สร้าง Schema
---

```sh
CREATE SCHEMA 
	catalog.schema
WITH (
	location='hdfs://nn01.bigdata:8020/user/hive/warehouse/schema.db'
);
```
### สร้าง Table
---
```bash
#Normal Table
CREATE TABLE catalog.schema.tablename (
    _c0 integer,
    age_group varchar,
    vaccine_status varchar,
    outcome varchar
)
WITH (
    format = 'PARQUET',
    location = 'hdfs://nn01.bigdata:8020/user/hive/warehouse/schema.db/tablename'
);
```
---

```bash
#Partition Table
CREATE TABLE catalog.schema.tablename (
    _c0 integer,
    age_group varchar,
    vaccine_status varchar,
    outcome varchar
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['outcome', 'vaccine_status'],
    sorted_by = ARRAY['_c0'],
    location = 'hdfs://nn01.bigdata:8020/user/hive/warehouse/schema.db/tablename'
);
```

### Query เพื่อดูประวัติการ Snapshot

```bash
SELECT
    *
FROM
    iceberg.iceberg_test."tbl_example$snapshots"
ORDER BY
    committed_at DESC
```

### ย้อนกลับไปยัง Snapshot ก่อนหน้าหรือระบุ Snapshot ID

```bash
CALL iceberg.system.rollback_to_snapshot('iceberg_test', 'tbl_example', 8954597067493422955)
```

### Register Table <- เหมือน Hive External Table

```bash
CALL iceberg.system.register_table(
    schema_name => 'iceberg_test',
    table_name => 'tbl_example',
    table_location => 'hdfs://nn01.bigdata:8020/user/hive/warehouse/tbl_example'
    )
```

### Unregister Table <- เหมือน Hive External Table

```bash
CALL iceberg.system.unregister_table(
    schema_name => 'iceberg_test',
    table_name => 'tbl_example'
    )
```

### แสดงรายการ Record หรือ Rows ทั้งหมดในตารางพร้อมระบุเวลาที่เกิด Condition พร้อมบอกว่า ข้อมูล Row นี้อยู่ในไฟล์ parquet ตัวไหน

```bash
SELECT
    *, 
    "$path", "$file_modified_time"
FROM
    iceberg.iceberg_test.tbl_example
```

### Query แบบระบุไฟล์ Parquet

```bash
SELECT
    *
FROM
    iceberg.iceberg_test.tbl_example
WHERE
    "$path" = 'hdfs://nn01.bigdata:8020/user/hive/warehouse/iceberg_test/tbl_example/data/file1.parquet'
```