In [None]:
# Optional one-time setup: uncomment the lines below to install the packages
# into the current environment (dbldatagen is imported later as `dg`).
#!pip install dbldatagen
#!pip install jmespath

In [None]:
import import_ipynb
# Execute the shared configuration notebook so the names it defines are
# available to the cells below.
# NOTE(review): presumably this is where `spark` (used by later cells) is
# created — confirm against Configuration.ipynb.
%run  ../Configuration.ipynb

In [None]:
import dbldatagen as dg
from pyspark.sql import functions as F
from pyspark.sql.types import (
    IntegerType, FloatType, StringType, TimestampType,
    StructField, BooleanType, StructType, ArrayType, DecimalType
    )

In [None]:
# Number of synthetic order rows to generate (1,000 as configured here;
# raise this value for larger volume tests).
row_count = 1_000

# Generator spec for the synthetic "orders" dataset.
# - withIdOutput() keeps the generator's own `id` column in the output.
# - order_id / customer_id are integers bounded to [1, 100_000_000].
# - order_amount is a randomly drawn float in [1.0, 5000.0].
# - order_ts is a timestamp within calendar year 2022.
data_spec = (
    dg.DataGenerator(name="orders", rows=row_count)
    .withIdOutput()
    .withColumn(
        "order_id",
        IntegerType(),
        minValue=1,
        maxValue=100_000_000,
    )
    .withColumn(
        "customer_id",
        IntegerType(),
        minValue=1,
        maxValue=100_000_000,
    )
    .withColumn(
        "order_amount",
        FloatType(),
        minValue=1.0,
        maxValue=5000.0,
        random=True,
    )
    .withColumn(
        "order_ts",
        TimestampType(),
        begin="2022-01-01 00:00:00",
        end="2022-12-31 23:59:59",
    )
)

In [None]:
# Build the DataFrame from the generator spec defined above.
df_orders = data_spec.build()
# Verify the row count; it should match `row_count` passed to the generator.
# Note: count() runs a Spark job over the generated data.
print(f"Total Rows: {df_orders.count()}")

In [None]:
# Print the first 5 generated rows as a quick visual check.
df_orders.show(5)

In [None]:
# Display the row count as the cell output.
# NOTE(review): this repeats the count already printed after build().
df_orders.count()

In [None]:
# Print the generated DataFrame's column names and types, for comparison with
# the column types declared for the target table.
df_orders.printSchema()

In [None]:
# Create the target Iceberg table if it does not exist yet.
# The table properties request Parquet data files, Iceberg format version 2,
# and merge-on-read mode for DELETE, UPDATE and MERGE operations.
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_catalog.db2.table_MOR ( 
order_id BIGINT, 
customer_id BIGINT, 
order_amount DECIMAL(10, 2), 
order_ts TIMESTAMP 
)TBLPROPERTIES ( 
 'write.format.default'='parquet', 
 'write.delete.mode'='merge-on-read',
 'write.update.mode'='merge-on-read',
 'write.merge.mode'='merge-on-read', 
 'format-version' = '2') 
 """
)

In [None]:
# Register the generated orders as the temp view `tmp_tbl` so they can be
# queried with Spark SQL.
df_orders.createOrReplaceTempView("tmp_tbl")

In [None]:
# Preview the four order columns from the temp view: 5 rows, with cell values
# truncated to 100 characters.
spark.sql(
    """select order_id,customer_id,order_amount,order_ts
from tmp_tbl  """
).show(n=5, truncate=100)

In [None]:
# Load the generated orders into the Iceberg table.
#
# createOrReplace() replaces the table schema with the schema of the DataFrame
# being written. Writing df_orders as-is would therefore swap the declared
# columns (BIGINT ids, DECIMAL(10, 2) amount) for the generator's columns
# (extra `id`, INT ids, FLOAT amount). Project and cast to the declared columns
# first so the table keeps the intended schema.
orders_for_table = df_orders.select(
    F.col("order_id").cast("bigint").alias("order_id"),
    F.col("customer_id").cast("bigint").alias("customer_id"),
    F.col("order_amount").cast("decimal(10,2)").alias("order_amount"),
    F.col("order_ts").cast("timestamp").alias("order_ts"),
)
orders_for_table.writeTo("iceberg_catalog.db2.table_MOR").using("iceberg").createOrReplace()

In [None]:
# Read back a sample from the Iceberg table to confirm the write. The query
# itself limits the result to 10 rows; cell values are truncated to 100
# characters.
spark.sql(
    """select * 
from iceberg_catalog.db2.table_MOR
limit 10"""
).show(n=100, truncate=100)