In [1]:
from pyspark.sql import SparkSession
from delta import *

builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/home/nil/anaconda3/envs/spark/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nil/.ivy2/cache
The jars for the packages stored in: /home/nil/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7d9aead8-c48a-46f3-9fb5-5a61261dceaa;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 231ms :: artifacts dl 14ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |  

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [4]:
schema = StructType(
    [
        StructField("emp_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("city", StringType(), True),
        StructField("country", StringType(), True),
        StructField("contact_no", IntegerType(), True),
    ]
)

In [5]:
data = [(1000, "Michael", "Columbus", "USA", 1234567890)]

df = spark.createDataFrame(data=data, schema=schema)
df.show()

                                                                                

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael|Columbus|    USA|1234567890|
+------+-------+--------+-------+----------+



In [6]:
spark.sql(
    """CREATE OR REPLACE TABLE dim_employee
(
    emp_id INT,
    name STRING,
    city STRING,
    country STRING,
    contact_no INT
)
USING DELTA
LOCATION '/mnt/sda3/iNeuron/Data Engineering/pySpark/Delta_Lake/delta_lake_04'"""
)

24/02/17 12:42:12 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/02/17 12:42:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

DataFrame[]

In [7]:
spark.sql("SELECT * FROM dim_employee").show()

+------+----+----+-------+----------+
|emp_id|name|city|country|contact_no|
+------+----+----+-------+----------+
+------+----+----+-------+----------+



### Method 1: Spark SQL

In [8]:
df.createOrReplaceTempView("source_view")

In [9]:
spark.sql("select * from source_view").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael|Columbus|    USA|1234567890|
+------+-------+--------+-------+----------+



In [10]:
spark.sql(
    """MERGE INTO dim_employee as target
USING source_view as source
    ON target.emp_id=source.emp_id
    WHEN MATCHED
THEN UPDATE SET
    target.name=source.name,
    target.city=source.city,
    target.country=source.country,
    target.contact_no=source.contact_no
WHEN NOT MATCHED THEN
INSERT(emp_id,name,city,country,contact_no) VALUES (emp_id,name,city,country,contact_no)"""
)

                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [11]:
spark.sql("select * from dim_employee").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael|Columbus|    USA|1234567890|
+------+-------+--------+-------+----------+



In [12]:
data = [
    (1000, "Michael", "Chicago", "USA", 1234567890),
    (2000, "Nancy", "New York", "USA", 1234567890),
]

df = spark.createDataFrame(data=data, schema=schema)
df.show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael| Chicago|    USA|1234567890|
|  2000|  Nancy|New York|    USA|1234567890|
+------+-------+--------+-------+----------+



In [13]:
df.createOrReplaceTempView("source_view")

In [14]:
# source table
spark.sql("select * from source_view").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael| Chicago|    USA|1234567890|
|  2000|  Nancy|New York|    USA|1234567890|
+------+-------+--------+-------+----------+



In [15]:
# target table
spark.sql("select * from dim_employee").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael|Columbus|    USA|1234567890|
+------+-------+--------+-------+----------+



In [16]:
spark.sql(
    """MERGE INTO dim_employee as target
USING source_view as source
    ON target.emp_id=source.emp_id
    WHEN MATCHED
THEN UPDATE SET
    target.name=source.name,
    target.city=source.city,
    target.country=source.country,
    target.contact_no=source.contact_no
WHEN NOT MATCHED THEN
INSERT (emp_id,name,city,country,contact_no) VALUES (emp_id,name,city,country,contact_no)"""
)

                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [17]:
# updated target table
spark.sql("select * from dim_employee").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael| Chicago|    USA|1234567890|
|  2000|  Nancy|New York|    USA|1234567890|
+------+-------+--------+-------+----------+



### Method 2: PySpark

In [18]:
data = [
    (2000, "Sarah", "New York", "USA", 1234567890),
    (3000, "David", "Atlanta", "USA", 1234567890),
]

df = spark.createDataFrame(data=data, schema=schema)
df.show()

+------+-----+--------+-------+----------+
|emp_id| name|    city|country|contact_no|
+------+-----+--------+-------+----------+
|  2000|Sarah|New York|    USA|1234567890|
|  3000|David| Atlanta|    USA|1234567890|
+------+-----+--------+-------+----------+



In [19]:
delta_df = DeltaTable.forPath(sparkSession=spark, path="Delta_Lake/delta_lake_04")

In [20]:
delta_df.toDF().show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael| Chicago|    USA|1234567890|
|  2000|  Nancy|New York|    USA|1234567890|
+------+-------+--------+-------+----------+



In [21]:
delta_df.alias("target").merge(
    source=df.alias("source"), condition="target.emp_id=source.emp_id"
).whenMatchedUpdate(
    set={
        "name": "source.name",
        "city": "source.city",
        "country": "source.country",
        "contact_no": "source.contact_no",
    }
).whenNotMatchedInsert(
    values={
        "emp_id": "source.emp_id",
        "name": "source.name",
        "city": "source.city",
        "country": "source.country",
        "contact_no": "source.contact_no",
    }
).execute()

                                                                                

In [22]:
# updated target table after PySpark merge
spark.sql("select * from dim_employee").show()

+------+-------+--------+-------+----------+
|emp_id|   name|    city|country|contact_no|
+------+-------+--------+-------+----------+
|  1000|Michael| Chicago|    USA|1234567890|
|  2000|  Sarah|New York|    USA|1234567890|
|  3000|  David| Atlanta|    USA|1234567890|
+------+-------+--------+-------+----------+



### Audit Log

In [25]:
spark.sql(
    """CREATE TABLE audit_log(
    operation STRING,
    updated_time timestamp,
    user_name string,
    notebook_name string,
    numTargetRowsUpdated int,
    numTargetRowsInserted int,
    numTargetRowDeleted int
)"""
)

24/02/17 12:50:27 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.


AnalysisException: [NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT] CREATE Hive TABLE (AS SELECT) is not supported, if you want to enable it, please set "spark.sql.catalogImplementation" to "hive".;
'CreateTable `spark_catalog`.`default`.`audit_logs`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists


#### Create Dataframe with last operation in Delta Table

In [30]:
delta_df = DeltaTable.forPath(sparkSession=spark, path="Delta_Lake/delta_lake_04")

lastOperationDF = delta_df.history(1)  # get the last operation
lastOperationDF.show(truncate=False)

+-------+-----------------------+------+--------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|time

#### Explode Operation Metrics Column

In [31]:
explode_df = lastOperationDF.select(
    lastOperationDF.operation, explode(lastOperationDF.operationMetrics)
)

explode_df_select = explode_df.select(
    explode_df.operation, explode_df.key, explode_df.value.cast("int")
)
explode_df_select.show(truncate=False)

+---------+--------------------------------------+-----+
|operation|key                                   |value|
+---------+--------------------------------------+-----+
|MERGE    |numTargetRowsCopied                   |1    |
|MERGE    |numTargetRowsDeleted                  |0    |
|MERGE    |numTargetFilesAdded                   |1    |
|MERGE    |numTargetBytesAdded                   |1526 |
|MERGE    |numTargetBytesRemoved                 |1503 |
|MERGE    |numTargetDeletionVectorsAdded         |0    |
|MERGE    |numTargetRowsMatchedUpdated           |1    |
|MERGE    |executionTimeMs                       |1284 |
|MERGE    |numTargetRowsInserted                 |1    |
|MERGE    |numTargetRowsMatchedDeleted           |0    |
|MERGE    |numTargetDeletionVectorsUpdated       |0    |
|MERGE    |scanTimeMs                            |597  |
|MERGE    |numTargetRowsUpdated                  |1    |
|MERGE    |numOutputRows                         |3    |
|MERGE    |numTargetDeletionVec

### Pivot Operation to Convert Rows to Columns

In [32]:
pivot_DF = explode_df_select.groupBy("operation").pivot("key").sum("value")
pivot_DF.show(truncate=False)

+---------+---------------+-------------+-------------+-------------------+---------------------+-------------------------+-----------------------------+-------------------------------+-------------------------------+-------------------+---------------------+-------------------+--------------------+---------------------+---------------------------+---------------------------+--------------------------------------+--------------------------------------+--------------------+-------------+----------+
|operation|executionTimeMs|numOutputRows|numSourceRows|numTargetBytesAdded|numTargetBytesRemoved|numTargetChangeFilesAdded|numTargetDeletionVectorsAdded|numTargetDeletionVectorsRemoved|numTargetDeletionVectorsUpdated|numTargetFilesAdded|numTargetFilesRemoved|numTargetRowsCopied|numTargetRowsDeleted|numTargetRowsInserted|numTargetRowsMatchedDeleted|numTargetRowsMatchedUpdated|numTargetRowsNotMatchedBySourceDeleted|numTargetRowsNotMatchedBySourceUpdated|numTargetRowsUpdated|rewriteTimeMs|scanTi