
# Quickstart to Delta Lake

**Delta Lake** is an open source storage layer that brings reliability to data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing. Delta Lake runs on top of your existing data lake and is fully compatible with Apache Spark APIs. Delta Lake on Databricks allows you to configure Delta Lake based on your workload patterns.

In [4]:
!pip install pyspark==3.2.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Go to https://github.com/delta-io/delta and download the repository as a ZIP file. Extract it and open the `python` folder.

It contains a folder named `delta`. Compress that folder as `delta.zip` and upload it to Colab with the next steps.
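If you prefer to script the zipping step instead of doing it by hand, here is a minimal sketch that uses only Python's standard library. It assumes the repository ZIP has already been extracted. The folder name `delta-master/python` in the commented usage line is a hypothetical example, because the real name depends on the branch or tag you downloaded. `zip_delta_package` is a helper introduced here and is not part of Delta Lake.

```python
import shutil
from pathlib import Path


def zip_delta_package(python_dir: str, output_base: str = "delta") -> str:
    """Zip the 'delta' package folder inside python_dir into <output_base>.zip.

    python_dir is the repository's python/ folder, e.g. "delta-master/python"
    (hypothetical name; it depends on the branch or tag you downloaded).
    """
    package_dir = Path(python_dir) / "delta"
    if not (package_dir / "__init__.py").is_file():
        raise FileNotFoundError(f"no 'delta' package found in {python_dir}")
    # root_dir: the directory the archive is rooted at; base_dir: the folder stored inside it.
    # Every entry keeps the 'delta/' prefix (delta/__init__.py, delta/tables.py, ...),
    # which is the layout the later unzip into site-packages expects.
    return shutil.make_archive(output_base, "zip", root_dir=python_dir, base_dir="delta")


# zip_delta_package("delta-master/python")  # writes delta.zip in the current directory
```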





In [9]:

# Opens a file picker in Colab; select the delta.zip you created
from google.colab import files

uploaded = files.upload()

Saving delta.zip to delta.zip


In [3]:
# Remove any existing delta package, then extract delta.zip into Colab's site-packages
!rm -rf /usr/local/lib/python3.7/dist-packages/delta
!unzip delta.zip -d '/usr/local/lib/python3.7/dist-packages/'

Archive:  delta.zip
  inflating: /usr/local/lib/python3.7/dist-packages/delta/__init__.py  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/_typing.py  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/exceptions.py  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/pip_utils.py  
 extracting: /usr/local/lib/python3.7/dist-packages/delta/py.typed  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/tables.py  
   creating: /usr/local/lib/python3.7/dist-packages/delta/testing/
  inflating: /usr/local/lib/python3.7/dist-packages/delta/testing/__init__.py  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/testing/log4j.properties  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/testing/utils.py  
   creating: /usr/local/lib/python3.7/dist-packages/delta/tests/
  inflating: /usr/local/lib/python3.7/dist-packages/delta/tests/__init__.py  
  inflating: /usr/local/lib/python3.7/dist-packages/delta/tests/test_deltatable.py  
  inflating: /
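The two shell commands above hard-code Colab's Python 3.7 path. Below is a sketch of the same replace-and-extract step in Python. It assumes that the first entry of `site.getsitepackages()` is the directory your interpreter imports from. On Colab this is usually `/usr/local/lib/python3.X/dist-packages`, but print it and check before relying on it. `install_delta_zip` is a hypothetical helper name.

```python
import shutil
import site
import zipfile
from pathlib import Path
from typing import Optional


def install_delta_zip(zip_path: str, site_packages: Optional[str] = None) -> Path:
    """Replace the 'delta' package in site_packages with the one inside zip_path.

    site_packages defaults to site.getsitepackages()[0] (an assumption; verify it first).
    """
    target = Path(site_packages or site.getsitepackages()[0])
    # Equivalent of: rm -rf <site-packages>/delta
    shutil.rmtree(target / "delta", ignore_errors=True)
    # Equivalent of: unzip delta.zip -d <site-packages>
    # The archive is expected to contain a top-level 'delta/' folder.
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target)
    return target / "delta"


# print(site.getsitepackages())   # check the target directory first
# install_delta_zip("delta.zip")  # then replace the package
```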

In [5]:
!pip install deltalake

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
!pip install delta-spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable
import shutil

In [10]:
shutil.rmtree("/tmp/delta-table", ignore_errors=True)

# Create a SparkSession with `configure_spark_with_delta_pip`



In [8]:
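from delta import configure_spark_with_delta_pip

# Enable Delta's SQL extension and catalog on the session builder
builder = SparkSession.builder.appName("DeltaLakeApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip sets spark.jars.packages to the Delta Lake JAR that matches the
# installed delta-spark version, so a manual spark.jars.packages setting here would be overwritten
spark = configure_spark_with_delta_pip(builder).getOrCreate()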
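`configure_spark_with_delta_pip` adds the Delta Lake JAR to `spark.jars.packages` for you and picks the version that matches the pip-installed `delta-spark` package. The sketch below approximates the coordinate it builds. It assumes the delta-spark 2.x artifact naming (`io.delta:delta-core_<scala>:<version>`) and Scala 2.12, and it is an illustration rather than the library's code. Delta Lake 2.0.x is built for Spark 3.2.x, which is why this notebook pairs `pyspark==3.2.2` with `delta-core_2.12:2.0.0`. An unpinned `pip install delta-spark` may pull a newer release that expects a different Spark version.

```python
from importlib import metadata
from typing import Optional


def delta_maven_coordinate(delta_version: Optional[str] = None,
                           scala_version: str = "2.12") -> Optional[str]:
    """Approximate the package string configure_spark_with_delta_pip adds (delta-spark 2.x naming)."""
    if delta_version is None:
        try:
            delta_version = metadata.version("delta-spark")
        except metadata.PackageNotFoundError:
            return None  # delta-spark was not installed with pip
    return f"io.delta:delta-core_{scala_version}:{delta_version}"


print(delta_maven_coordinate("2.0.0"))  # → io.delta:delta-core_2.12:2.0.0
```

Once the session is running, `spark.sparkContext.getConf().get("spark.jars.packages")` shows the value that was actually applied, so you can compare the two.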

# Write data

In [9]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

# Read data

In [10]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



# Upsert (merge) new data

In [11]:
newData = spark.range(0, 20)

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

deltaTable.alias("oldData")\
    .merge(
    newData.alias("newData"),
    "oldData.id = newData.id")\
    .whenMatchedUpdate(set={"id": col("newData.id")})\
    .whenNotMatchedInsert(values={"id": col("newData.id")})\
    .execute()

deltaTable.toDF().show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
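To make the merge semantics concrete, here is a plain-Python model of the upsert above. It is not Delta's implementation. A source id that matches an existing target id counts as an update, and an unmatched id is inserted. The model assumes source ids are unique, which `spark.range` guarantees. If several source rows match the same target row, a real Delta merge fails instead. `merge_ids` is a hypothetical name.

```python
from typing import Iterable, List, Tuple


def merge_ids(target_ids: Iterable[int], source_ids: Iterable[int]) -> Tuple[List[int], int, int]:
    """Model of merge on id: returns (resulting ids, rows updated, rows inserted)."""
    table = set(target_ids)
    updated = inserted = 0
    for new_id in source_ids:
        if new_id in table:
            # whenMatchedUpdate: the row exists; set id = newData.id leaves it unchanged here
            updated += 1
        else:
            # whenNotMatchedInsert: add the new row
            table.add(new_id)
            inserted += 1
    return sorted(table), updated, inserted


result, updated, inserted = merge_ids(range(0, 5), range(0, 20))
print(updated, inserted, len(result))  # → 5 15 20
```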



# Overwrite the table

In [12]:

data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
deltaTable.toDF().show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+
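`mode("overwrite")` does not immediately delete the earlier data files. Delta records every write as a new commit in the table's `_delta_log` directory. Each commit file is named with a zero-padded 20-digit version number and holds one JSON action per line. That log is what makes time travel possible later. The sketch below reads the commit files directly and assumes the table lives on a local path such as `/tmp/delta-table`. From Spark, `DeltaTable.history()` returns the same information as a DataFrame. `list_commits` is a hypothetical helper.

```python
import json
from pathlib import Path
from typing import List, Tuple


def list_commits(table_path: str) -> List[Tuple[int, str]]:
    """Return (version, operation) for each JSON commit file in <table_path>/_delta_log."""
    log_dir = Path(table_path, "_delta_log")
    if not log_dir.is_dir():
        return []
    commits = []
    for commit_file in sorted(log_dir.glob("*.json")):
        if not commit_file.stem.isdigit():
            continue  # skip non-commit JSON files (e.g. some checkpoint formats)
        version = int(commit_file.stem)  # "00000000000000000002" -> 2
        operation = "UNKNOWN"
        with commit_file.open() as f:
            for line in f:  # one JSON action per line
                if not line.strip():
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    operation = action["commitInfo"].get("operation", "UNKNOWN")
        commits.append((version, operation))
    return commits


# print(list_commits("/tmp/delta-table"))
```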



# Update every even value by adding 100 to it

In [13]:
deltaTable.update(
    condition=expr("id % 2 == 0"),
    set={"id": expr("id + 100")})

deltaTable.toDF().show()

+---+
| id|
+---+
|  7|
|108|
|  9|
|  5|
|106|
+---+



# Delete every **even** value

In [14]:
deltaTable.delete(condition=expr("id % 2 == 0"))
deltaTable.toDF().show()


+---+
| id|
+---+
|  7|
|  9|
|  5|
+---+



# Read old version of data using time travel

In [15]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+
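Time travel works because each operation above produced a new table version: the write, merge, overwrite, update, and delete are versions 0 through 4. The toy model below replays those five steps to show which snapshot each version holds. For simplicity it keeps full copies of the rows, whereas Delta records which data files each commit added and removed. `VersionedTable` is a hypothetical name.

```python
from typing import Iterable, List


class VersionedTable:
    """Toy model of table versions: each commit stores a full, immutable snapshot.

    Real Delta commits record added/removed data files in _delta_log instead of copies.
    """

    def __init__(self) -> None:
        self._snapshots: List[List[int]] = []

    def commit(self, rows: Iterable[int]) -> int:
        """Store a new snapshot and return its version number (0, 1, 2, ...)."""
        self._snapshots.append(list(rows))
        return len(self._snapshots) - 1

    def latest(self) -> List[int]:
        return list(self._snapshots[-1])

    def version_as_of(self, version: int) -> List[int]:
        """Analogue of .option("versionAsOf", version)."""
        return list(self._snapshots[version])


table = VersionedTable()
table.commit(range(0, 5))                                             # v0: initial write
table.commit(sorted(set(table.latest()) | set(range(0, 20))))         # v1: merge (upsert)
table.commit(range(5, 10))                                            # v2: overwrite
table.commit([i + 100 if i % 2 == 0 else i for i in table.latest()])  # v3: update evens
table.commit([i for i in table.latest() if i % 2 != 0])               # v4: delete evens

print(table.latest())          # → [5, 7, 9]
print(table.version_as_of(0))  # → [0, 1, 2, 3, 4]
```

Spark does not guarantee row order in `show()`, which is why the notebook prints rows such as `7, 9, 5`. Compare the model's lists with the notebook output as sets.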



# Clean up

In [None]:
shutil.rmtree("/tmp/delta-table")

# References




*   https://delta.io/learn/getting-started
*   https://github.com/delta-io/delta






# Issues faced

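I first tried to set things up with `pip install delta-spark`, with `pip install deltalake` (the standalone delta-rs Python binding, which does not plug into Spark), and by adding the Delta Lake JAR through `spark.jars.packages`. All of these attempts threw the following error: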



```
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
```


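This happened whenever I used the `delta` format to write or read, for example:

`data.write.format("delta").save("/tmp/delta-table")`

The exception comes from the JVM. Spark resolves the `delta` format to a data source class, and `ClassNotFoundException: delta.DefaultSource` means that class could not be found, i.e. the Delta Lake JAR was not on Spark's classpath. The Python `delta` package (including **tables.py**, found in the `python` folder of the GitHub repository) only wraps that JAR. Stack Overflow had several suggestions, but the only one that worked for me was manually copying the `delta` folder into the Python site-packages directory.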


This Stack Overflow thread helped me make the manual changes:

https://stackoverflow.com/questions/65553722/no-module-named-delta-tables
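When debugging an import problem like this one, check two things. First, which `delta` package Python actually loads (the manually copied folder or a pip-installed one). Second, which `delta-spark` and `pyspark` versions pip reports. Below is a small standard-library sketch. `package_report` is a hypothetical helper, and a `None` value means the module or distribution was not found.

```python
import importlib.util
from importlib import metadata
from typing import Dict, Iterable, Optional


def package_report(module: str = "delta",
                   distributions: Iterable[str] = ("delta-spark", "pyspark")) -> Dict[str, Optional[str]]:
    """Report where `module` would be imported from and the installed version of each distribution."""
    report: Dict[str, Optional[str]] = {}
    # find_spec locates a top-level module without importing it; origin is the file it would load
    spec = importlib.util.find_spec(module)
    report[module] = spec.origin if spec is not None else None
    for dist in distributions:
        try:
            report[dist] = metadata.version(dist)
        except metadata.PackageNotFoundError:
            report[dist] = None
    return report


print(package_report())
```

In the Colab setup above, the `delta` entry should point into the site-packages directory where `delta.zip` was extracted.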






