In [1]:
pip install delta-spark

Collecting delta-spark
  Downloading delta_spark-1.2.1-py3-none-any.whl (19 kB)
Collecting pyspark<3.3.0,>=3.2.0
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=bd151a15cbc12a6edfbbfa6602cda73396fb42a145c7933ae460d9134a994808
  Stored in directory: c:\users\syed3\appdata\local\pip\cache\wheels\52\45\50\69db7b6e1da74a1b9fcc097827db9185cb8627117de852731e
Successfully built pyspark
Installing collected packages: py4j, pyspark, delta-spark
Successfully installed delta-spark-1.2.1 py4j-0.10.9.3 pyspark-3.2.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
import findspark
findspark.init()
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [6]:

spark.sql("set spark.sql.shuffle.partitions = 1")

sourcePath = r"C:\Users\syed3\Downloads\LearningSparkV2-master\LearningSparkV2-master\CSV's\loans\loan-risks.snappy.parquet"

# Configure Delta Lake Path
deltaPath = r"C:\temp"

# # Remove folder if it exists
# dbutils.fs.rm(deltaPath, recurse=True)

# Create the Delta table with the same loans data
(spark.read.format("parquet").load(sourcePath) 
  .write.format("delta").save(deltaPath))

spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")

Defined view 'loans_delta'


In [8]:
## Loans row count
spark.sql("SELECT count(*) FROM loans_delta").show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [9]:
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



### Loading Data Streams into a Delta Lake Table

In [None]:
newLoanStreamDF = ... # Streaming DataFrame with new loans data
checkpointDir = ... # Directory for streaming checkpoints
streamingQuery = (newLoanStreamDF.writeStream
 .format("delta")
 .option("checkpointLocation", checkpointDir)
 .trigger(processingTime = "10 seconds")
 .start(deltaPath))

###  Enforcing Schema on Write to Prevent Data Corruption
The Delta Lake format records the schema as table-level metadata. Hence, all writes
to a Delta Lake table can verify whether the data being written has a schema compati‐
ble with that of the table. If it is not compatible, Spark will throw an error before any
data is written and committed to the table, thus preventing such accidental data corruption

In [12]:
from pyspark.sql.functions import *
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
(1111111, 1000, 1000.0, 'TX', True),
(2222222, 2000, 0.0, 'CA', False)
]
loanUpdates = (spark.createDataFrame(items, cols)
 .withColumn("funded_amnt", col("funded_amnt").cast("int")))
loanUpdates.write.format("delta").mode("append").save(deltaPath)

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: a526dc65-ff3d-44c1-b902-187eabd43e9e).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)


Data schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)
-- closed: boolean (nullable = true)

         

### Evolving Schemas to Accommodate Changing Data 

In [13]:
(loanUpdates.write.format("delta").mode("append")
 .option("mergeSchema", "true")
 .save(deltaPath))

### #Updating data to fix errors
#Suppose upon, reviewing the data, we realized that all of the loans assigned to addr_state = 'OR'
#should have been assigned to addr_state = 'WA'

In [30]:
# data before updating
spark.sql("""SELECT addr_state, count(1) FROM loans_delta WHERE addr_state IN ('OR', 'WA', 'CA', 'TX', 'NY') GROUP BY addr_state""").show()

+----------+--------+
|addr_state|count(1)|
+----------+--------+
|        CA|    2017|
|        OR|     518|
|        TX|    1295|
|        NY|    1282|
+----------+--------+



In [32]:
# updating the data
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})

In [33]:
# after updating
spark.sql("""SELECT addr_state, count(1) FROM loans_delta WHERE addr_state IN ('OR', 'WA', 'CA', 'TX', 'NY') GROUP BY addr_state""").show()

+----------+--------+
|addr_state|count(1)|
+----------+--------+
|        CA|    2017|
|        WA|     518|
|        TX|    1295|
|        NY|    1282|
+----------+--------+



In [22]:
c=deltaTable.toDF()

In [24]:
c.show(5)

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        WA|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
+-------+-----------+---------+----------+------+
only showing top 5 rows



In [25]:
type(c)

pyspark.sql.dataframe.DataFrame

In [34]:
# For user whose loans been paid-off
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+



In [35]:
# del the user with fully paid loans
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt >= paid_amnt")


In [41]:
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|       1|
+--------+



### Upserting change data to a table using merge
A common use cases is Change Data Capture (CDC), where you have to replicate row changes made in an OLTP table to another table for OLAP workloads. To continue with our loan data example, say we have another table of new loan information, some of which are new loans and others are updates to existing loans. In addition, let’s say this changes table has the same schema as the loan_delta table. You can upsert these changes into the table using the DeltaTable.merge() operation which is based on the MERGE SQL command. 

In [37]:
spark.sql("select * from loans_delta where addr_state = 'NY' and loan_id < 30").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
+-------+-----------+---------+----------+



In [38]:
cols = ["loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed"]

items = [
  (11, 1000, 1000.0, 'NY', True),   # loan paid off
  (12, 1000, 0.0, 'NY', False)      # new loan
]

loanUpdates = spark.createDataFrame(items, cols)

In [39]:
#update the table with the changed data using the merge operation
deltaTable = DeltaTable.forPath(spark, deltaPath)

(deltaTable
  .alias("t")
  .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id") 
  .whenMatchedUpdateAll() 
  .whenNotMatchedInsertAll() 
  .execute())

In [40]:
#UPDAted
spark.sql("select * from loans_delta where addr_state = 'NY' and loan_id < 30").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|     11|       1000|   1000.0|        NY|
|     12|       1000|      0.0|        NY|
+-------+-----------+---------+----------+



## Deduplicating data while inserting using insert-only merge
The merge operation in Delta Lake supports an extended syntax beyond that specified by the ANSI standard. It supports advanced features like the following.

- Delete actions: For example, MERGE … WHEN MATCHED THEN DELETE
- Clause conditions: For example, `MERGE … WHEN MATCHED AND THEN ...``
- Optional actions: All the MATCHED and NOT MATCHED clauses are optional.
- Star syntax: For example, UPDATE * and INSERT * to update/insert all the columns in the target table with matching columns from the source dataset. The equivalent API in DeltaTable is updateAll() and insertAll(), which we have already seen.<br>

This allows you to express many more complex use cases with little code. For example, say you want to backfill the loan_delta table with historical data of past loans. But some of the historical data may already have been inserted in the table and you don't want to update them (since their emails may already have been updated). You can deduplicate by the loan_id while inserting by running the following merge operation with only the INSERT action (since the UPDATE action is optional).

In [42]:
spark.sql("select * from loans_delta where addr_state = 'NY' and loan_id < 30").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|     11|       1000|   1000.0|        NY|
|     12|       1000|      0.0|        NY|
+-------+-----------+---------+----------+



Let's say we have some historical data that we want to merge with this table. One of the historical loan exists in the current table but the historical table has old values, therefore it should not update the current value present in the table. And another historical does not exist in the current table, therefore it should be inserted into the table.

In [43]:
cols = ["loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed"]

items = [
  (11, 1000, 0.0, "NY", False),
  (-100, 1000, 10.0, "NY", False)
]

historicalUpdates = spark.createDataFrame(items, cols)

In [44]:
deltaTable = DeltaTable.forPath(spark, deltaPath)

(deltaTable
  .alias("t")
  .merge(historicalUpdates.alias("s"), "t.loan_id = s.loan_id") 
  .whenNotMatchedInsertAll() 
  .execute())

In [45]:
spark.sql("select * from loans_delta where addr_state = 'NY' and loan_id < 30").show()
#Notice that the only change in the table is that insert of new loan, and existing loans were not updated to old values.

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|     11|       1000|   1000.0|        NY|
|     12|       1000|      0.0|        NY|
|   -100|       1000|     10.0|        NY|
+-------+-----------+---------+----------+



In [48]:
c.show()

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|     11|       1000|   1000.0|        NY|  true|
|     12|       1000|      0.0|        NY| false|
|   -100|       1000|     10.0|        NY| false|
+-------+-----------+---------+----------+------+



In [49]:
deltaTable.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      7|2022-06-12 11:30:...|  null|    null|    MERGE|{predicate -> (t....|null|    null|     null|          6|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.2....|
|      6|2022-06-12 11:14:...|  null|    null|    MERGE|{predicate -> (t....|null|    null|     null|          5|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.2....|
|      5|2

In [50]:
deltaTable.history(4).select("version", "timestamp", "operation", "operationParameters").show(truncate=False)

+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationParameters                                                                                                                      |
+-------+-----------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------+
|7      |2022-06-12 11:30:12.855|MERGE    |{predicate -> (t.loan_id = s.loan_id), matchedPredicates -> [], notMatchedPredicates -> [{"actionType":"insert"}]}                       |
|6      |2022-06-12 11:14:51.765|MERGE    |{predicate -> (t.loan_id = s.loan_id), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|5      |2022-06-12 11:09:07.989|DELETE   |{predicate -> ["(CAST(funded_amnt AS DOUBLE) >=

# Querying previous snapshots of the table with time travel
Delta Lake’s time travel feature allows you to access previous versions of the table. Here are some possible uses of this feature:

Auditing Data Changes
Reproducing experiments & reports
Rollbacks
You can query by using either a timestamp or a version number using Python, Scala, and/or SQL syntax. For this examples we will query a specific version using the Python syntax.

For more information, refer to Introducing Delta Time Travel for Large Scale Data Lakes and the docs.

Let's query the table's state before we deleted the data, which still contains the fully paid loans.

In [51]:
previousVersion = deltaTable.history(1).select("version").first()[0] - 3

(spark.read.format("delta")
  .option("versionAsOf", previousVersion)
  .load(deltaPath)
  .createOrReplaceTempView("loans_delta_pre_delete"))

spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+



# DATALAKES KEY OPERATIONS
Transactional guarantees and schema management, like databases
• Scalability and openness, like data lakes

• Support for concurrent batch and streaming workloads with ACID guarantees

• Support for transformation of existing data using update, delete, and merge operations that ensure ACID guarantees

• Support for versioning, auditing of operation history, and querying of previous versions