In [1]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta import *
from delta.tables import *
# NOTE: keep the explicit imports below the star imports so they cannot be shadowed.
from datetime import datetime
import pandas as pd
import json
import os

# HDFS location of the Spark SQL warehouse; the session below points
# spark.sql.warehouse.dir here, so managed tables such as employee_salary live under it.
# Override with the HIVE_WAREHOUSE_DIR environment variable.
warehouse_location = os.environ.get('HIVE_WAREHOUSE_DIR', 'hdfs://hdfs:9000/hive/warehouse')

In [2]:
# !wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
# !wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar
# !wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar
# !wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
# !cp *.jar $SPARK_HOME/jars

In [3]:
# Delta Lake session options, plus the HDFS warehouse directory for managed tables.
session_options = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.warehouse.dir": warehouse_location,
}

builder = SparkSession.builder.appName("hive").master('spark://master:7077')
for option_key, option_value in session_options.items():
    builder = builder.config(option_key, option_value)

# configure_spark_with_delta_pip wires the delta-core package into the builder
# (the ivy log below shows io.delta#delta-core_2.12 being resolved).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bc52b71e-c598-4a10-8316-f0f3a427f6e8;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;1.0.0!delta-core_2.12.jar (1057ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4/4.7/antlr4-4.7.jar ...
	[SUCCESSFUL ] org.antlr#antlr4;4.7!antlr4.jar (229ms)
downloading https://repo1.maven

In [4]:
!$HADOOP_HOME/bin/hdfs dfs -rm -r hdfs://hdfs:9000/hive/warehouse/employee_salary

Deleted hdfs://hdfs:9000/hive/warehouse/employee_salary


In [5]:
sql = """
CREATE TABLE IF NOT EXISTS default.employee_salary (
  id INT,
  name STRING,
  salary FLOAT,
  last_update TIMESTAMP,
  time_created TIMESTAMP
) USING DELTA
"""
spark.sql(sql)

                                                                                me version 4.8ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8

DataFrame[]

In [6]:
# Python handle on the Delta sink; merge() below upserts through it.
target_table = 'default.employee_salary'
deltadf = DeltaTable.forName(spark, target_table)
deltadf

<delta.tables.DeltaTable at 0x7f75b1e4dca0>

In [7]:
# JDBC reader for the source Postgres database. Connection settings come from
# the environment, falling back to the values previously hardcoded here.
# TODO: drop the password fallback once the environment always provides it.
# NOTE: DataFrameReader.option() mutates the reader and returns it, so each
# later `.option("dbtable", ...)` call reconfigures this same object.
spark_jdbc = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgresql:5432/postgres"))
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
    .option("driver", "org.postgresql.Driver")
)

In [8]:
# Batch snapshots of the two source tables. Each load() runs immediately, so the
# shared reader's "dbtable" option is consumed before the next table overwrites it.
df_employee, df_salary = (
    spark_jdbc.option("dbtable", table_name).load()
    for table_name in ("public.employee", "public.salary")
)

In [9]:
# Streaming source: change events for public.salary (presumably Debezium, given
# the topic name and the payload.after parsing in merge()), read from the
# earliest retained offset. Key and value bytes are cast to strings.
# Optionally pin the consumer group with .option('kafka.group.id', ...).
df = (
    spark.readStream
    .format('kafka')
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", 'employee.public.salary')
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
)

In [10]:
def millisecond_to_timestamp(row, unit_per_second=1_000_000.0):
    """Convert an epoch offset taken from a change event into a ``datetime``.

    Despite the name, the default divisor treats ``row`` as *microseconds*
    since the epoch. This is presumably Debezium's ``MicroTimestamp`` encoding
    for Postgres ``timestamp`` columns; confirm against the connector's
    ``time.precision.mode``. Pass ``unit_per_second=1000.0`` for millisecond
    values.

    Args:
        row: A single column value (applied element-wise via ``Series.apply``).
            ``None``/NaN mean the column was null.
        unit_per_second: How many ``row`` units make up one second.

    Returns:
        A naive ``datetime`` from ``datetime.fromtimestamp``, or ``None`` for
        null, NaN or out-of-range values.
    """
    if row is None:
        return None
    try:
        # NOTE(review): fromtimestamp converts to the driver's local timezone;
        # if the cluster is not on UTC, utcfromtimestamp may be what is intended.
        return datetime.fromtimestamp(row / unit_per_second)
    except (ValueError, OverflowError, OSError):
        # NaN raises ValueError; values outside the platform's time_t range
        # raise OverflowError or OSError.
        return None

In [11]:
def _after_images(values):
    """Return the non-null ``after`` row images from raw JSON message values."""
    images = []
    for value in values:
        if value is None:  # tombstone message: nothing to upsert
            continue
        envelope = json.loads(value)
        # Accept both the schema-wrapped ({"payload": {...}}) and the bare envelope.
        payload = envelope.get('payload', envelope) if isinstance(envelope, dict) else None
        after = payload.get('after') if isinstance(payload, dict) else None
        if after is not None:
            images.append(after)
    return images


def merge(microdf, batchId):
    """Upsert one micro-batch of change events into the ``deltadf`` Delta table.

    Used as the ``foreachBatch`` callback of the Kafka stream. The steps are:

    1. Decode each JSON ``value`` and keep its ``payload.after`` row image.
    2. Convert the columns that ``df_salary.schema`` types as timestamps using
       ``millisecond_to_timestamp``.
    3. Inner-join the rows with ``df_employee`` on ``id``.
    4. MERGE the result into the Delta table on ``id``.

    Args:
        microdf: Micro-batch DataFrame with a string ``value`` column.
        batchId: Micro-batch id supplied by Structured Streaming (only logged).

    Events without an ``after`` image (tombstones, and presumably deletes) are
    skipped, so source-side deletes are NOT propagated. TODO if that is required.
    """
    # Collect once and count locally instead of running count() and collect()
    # as two separate Spark jobs.
    values = [row['value'] for row in microdf.select('value').collect()]
    print(f'batch id: {batchId}, rows count: {len(values)}')

    after = _after_images(values)
    if not after:
        return

    schema = df_salary.schema
    df_pd = pd.DataFrame(after)
    for field in schema.fields:
        if field.dataType == TimestampType():
            # Index by column name (not the StructField) and map element-wise.
            df_pd[field.name] = df_pd[field.name].apply(millisecond_to_timestamp)
    # Order columns like the schema so the field mapping does not depend on JSON key order.
    df_pd = df_pd[[field.name for field in schema.fields]]

    after_df = spark.createDataFrame(df_pd, schema=schema)
    inner_join = after_df.join(df_employee, ['id'])
    (
        deltadf.alias('t')
        .merge(inner_join.alias('s'), 's.id = t.id')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [12]:
# Hand every micro-batch to merge(). foreachBatch replaces the sink, so no
# format() is set. No checkpointLocation is given, so Spark uses a temporary
# checkpoint (see the warning below) and each run starts again from the
# earliest offset.
streamQuery = (
    df.writeStream
    .outputMode('append')
    .foreachBatch(merge)
    .start()
)

22/01/27 08:23:22 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1dec56a6-6017-451a-9d74-7cedb1784209. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

batch id: 0, rows count: 2


                                                                                

In [13]:
# Inspect the Delta sink after the first micro-batch has been merged.
employee_salary_snapshot = spark.sql('select * from default.employee_salary')
employee_salary_snapshot.show()

                                                                                

+---+------+------+--------------------+--------------------+
| id|  name|salary|         last_update|        time_created|
+---+------+------+--------------------+--------------------+
|  1|husein|1000.0|2022-01-25 15:48:...|2022-01-25 15:48:...|
|  2| kasim|1000.0|2022-01-25 15:48:...|2022-01-25 15:48:...|
+---+------+------+--------------------+--------------------+

