In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
spark = SparkSession.builder \
    .appName("SparkHiveExample") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("hive.metastore.uris", "thrift://demo-hive-metastore:9083") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
spark.sql("CREATE DATABASE IF NOT EXISTS emp")
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|      emp|
+---------+



In [7]:
spark.sql("USE emp;")

DataFrame[]

In [5]:
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

In [8]:
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]
df = spark.createDataFrame(data, schema)

df.write.mode('append') \
          .saveAsTable("emp.employee")

spark.sql("select * from employee;").show()

+---+--------+---+------+
| id|    name|age|gender|
+---+--------+---+------+
|  4|Jennifer| 20|     F|
|  4|Jennifer| 20|     F|
|  1|   James| 30|     M|
|  1|   James| 30|     M|
|  3|    Jeff| 41|     M|
|  3|    Jeff| 41|     M|
|  2|     Ann| 40|     F|
|  2|     Ann| 40|     F|
+---+--------+---+------+



In [9]:
data = [(5, "Sky",30,"M"), (6, "Anna",40,"F")]
df = spark.createDataFrame(data)
df.write.mode('append') \
          .insertInto("employee")
spark.sql("select * from employee;").show()

+---+--------+---+------+
| id|    name|age|gender|
+---+--------+---+------+
|  4|Jennifer| 20|     F|
|  4|Jennifer| 20|     F|
|  1|   James| 30|     M|
|  1|   James| 30|     M|
|  3|    Jeff| 41|     M|
|  3|    Jeff| 41|     M|
|  6|    Anna| 40|     F|
|  5|     Sky| 30|     M|
|  2|     Ann| 40|     F|
|  2|     Ann| 40|     F|
+---+--------+---+------+



In [13]:
# Create a new session to test if the metadata persist
spark_new = SparkSession.builder \
    .appName("SparkHiveExample") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .enableHiveSupport() \
    .getOrCreate()

In [14]:
spark_new.sql("select * from emp.employee;").show()

+---+--------+---+------+
| id|    name|age|gender|
+---+--------+---+------+
|  4|Jennifer| 20|     F|
|  4|Jennifer| 20|     F|
|  1|   James| 30|     M|
|  1|   James| 30|     M|
|  3|    Jeff| 41|     M|
|  3|    Jeff| 41|     M|
|  6|    Anna| 40|     F|
|  5|     Sky| 30|     M|
|  2|     Ann| 40|     F|
|  2|     Ann| 40|     F|
+---+--------+---+------+

