In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.errors import PySparkException
from pyspark.sql.types import StructType, StructField, StringType, TimestampType , LongType

# spark_config = config['spark']
conf = SparkConf() \
    .setAppName("spark demo") \
    .setMaster("local[*]") \
    .set("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.1.jar, /opt/spark/jars/iceberg-aws-bundle-1.5.1.jar ")\
    .set("spark.driver.memory","1g") \
    .set("spark.executor.memory", "2g") \
    .set("spark.driver.host", "localhost") \
    .set("spark.sql.shuffle.partitions", "40") \
    .set("spark.sql.catalog.apm", "org.apache.iceberg.spark.SparkCatalog") \
    .set("iceberg.engine.hive.enabled", "true") \
    .set("spark.sql.defaultCatalog", "apm")\
    .set("spark.sql.catalog.apm.type", "hive") \
    .set("spark.sql.catalog.apm.uri", "thrift://hive-metastore:9083")\
    .set("spark.sql.catalog.apm.cache-enabled", "false")\
    .set("spark.hadoop.hive.metastore.schema.verification", "false")\
    .set("spark.hadoop.hive.metastore.schema.verification.record.version", "false")\
    .set("spark.sql.catalog.apm.warehouse", "s3a://datamesh/observability/")\
    .set("spark.sql.catalog.apm.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")\
    .set("spark.sql.catalog.apm.s3.endpoint", "http://minio:9000")\
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
    .set("spark.eventLog.enabled", "false")\
    .set("spark.sql.catalog.apm.s3.path-style-access", "true")\
    .set("spark.sql.storeAssignmentPolicy", "ANSI")

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


In [3]:
emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [4]:


try:
    # Create Iceberg table using the specified schema and catalog
    print("Iceberg databae created  1------ ")

    spark.sql(""" CREATE DATABASE IF NOT EXISTS observability  """)
    
    # spark.sql("create database observability location '/user/data_transform/iceberg_test'")
   
    print("Iceberg databae created  ------ ")
   
    
    # spark.sql("""
    #     CREATE TABLE IF NOT EXISTS observability.trace  (
    #         traceId string,
    #         logData string                
    #     )  USING iceberg  """)
    
    spark.sql("""
        CREATE TABLE IF NOT EXISTS observability.employee (
            employee_id string,
            department_id string,
            name string,
            age string,
            gender string,
            salary string,
            hire_date string            
        )  USING iceberg  """)
    
    print("Iceberg table trace  created ------ ")
   
    print("Iceberg table 'apm.trace' created successfully.")
except Exception as e:
    print("Error creating Iceberg table:", str(e))

 
    
    # trace_data.append({
    #     "traceId": "10012",
    #     # "createdTime": created_time,
    #     "logData": "mydata"
    # })

    # trace_df = spark.createDataFrame(trace, schema=create_schema())

    # try:
       
    #     # trace_df.writeTo("iceberg.observability.trace").create()
    #     trace_df.show()
    #     trace_df.printSchema()
    #     spark.catalog.currentDatabase()
    #     spark.catalog.listDatabases()
    #     # trace_df.writeTo("observability.trace").createOrReplace()
    #     trace_df.writeTo("observability.trace").append()
        
    #     print("Data written to Iceberg table trace successfully." )
    # except Exception as e:
    #     print("Error writing data to Iceberg trace table:", str(e))

Iceberg databae created  1------ 
Iceberg databae created  ------ 
Iceberg table trace  created ------ 
Iceberg table 'apm.trace' created successfully.


In [5]:
emp = spark.createDataFrame(data=emp_data, schema=emp_schema)

In [6]:
# Show data (ACTION)

emp.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
emp.writeTo("observability.employee").append()

24/05/02 02:42:23 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/05/02 02:42:23 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
