In [1]:
%AddJar https://repo1.maven.org/maven2/io/delta/delta-core_2.11/0.4.0/delta-core_2.11-0.4.0.jar

Starting download from https://repo1.maven.org/maven2/io/delta/delta-core_2.11/0.4.0/delta-core_2.11-0.4.0.jar
Finished download of delta-core_2.11-0.4.0.jar


In [50]:
import org.apache.spark.sql.delta.DeltaLog
import io.delta.tables._
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{SaveMode, SparkSession, DataFrame}

# Create table 

In [63]:
  def createTable(data: DataFrame, tableName: String ): Unit = {
    data
      .write
      .format("delta")
      .mode(SaveMode.Overwrite)
      .save("/opt/deltalake/" + tableName)
  }

createTable: (data: org.apache.spark.sql.DataFrame, tableName: String)Unit


# Read table

In [66]:
 def readTable(tableName: String): DataFrame = {
   val df = spark
      .read
      .format("delta")
      .load("/opt/deltalake/" + tableName)
     df
  }

readTable: (tableName: String)org.apache.spark.sql.DataFrame


# UpdateTableData 

In [89]:
def updateDeltaTable(data: DataFrame, tableName: String, savemode: String): Unit = {
    data
      .write
      .format("delta")
      .mode(savemode)
      .save("/opt/deltalake/" + tableName)
  }

updateDeltaTable: (data: org.apache.spark.sql.DataFrame, tableName: String, savemode: String)Unit


# Time Travel

In [70]:
 def timeTravel(tableName: String, version: Int): DataFrame = {
   val df = spark
      .read
      .format("delta")
      .option("versionAsOf", version)
      .load("/opt/deltalake/" + tableName)
     df
  }

timeTravel: (tableName: String, version: Int)org.apache.spark.sql.DataFrame


# Add New Column 

In [100]:
 def addColumn(data: DataFrame,tableName: String): Unit = {
  data
      .write
      .format("delta")
      .mode("overwrite")
      .option("mergeSchema", "true")
      .save("/opt/deltalake/" + tableName)
  }

addColumn: (data: org.apache.spark.sql.DataFrame, tableName: String)Unit


# Get History of the Table

In [111]:
 def getLastestHistory(tableName: String): DataFrame = {
     val deltaTable = DeltaTable.forPath(spark, "/opt/deltalake/" + tableName)
     val lastOperationDF  = deltaTable.history(1) 
     lastOperationDF 
  }

getLastestHistory: (tableName: String)org.apache.spark.sql.DataFrame


# Get last operation on Table

In [None]:
 def getHistory(tableName: String): DataFrame = {
     val deltaTable = DeltaTable.forPath(spark, "/opt/deltalake/" + tableName)
     val fullHistoryDF = deltaTable.history() 
     fullHistoryDF
  }

In [None]:
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history() 

In [52]:
val df = spark.read.option("header",true).csv("Sale_test.csv")

df = [id: string, product_id: string ... 2 more fields]


[id: string, product_id: string ... 2 more fields]

In [53]:
val modifiedDF = df.withColumn("date", date_format($"created_at", "yyyy-MM-dd"))

modifiedDF = [id: string, product_id: string ... 3 more fields]


[id: string, product_id: string ... 3 more fields]

In [58]:
createTable(modifiedDF, "sales")

In [67]:
val sales_df = readTable("sales")

sales_df = [id: string, product_id: string ... 3 more fields]


[id: string, product_id: string ... 3 more fields]

In [69]:
sales_df.count()

22

In [86]:
val data = spark.range(0, 5).toDF("no")
createTable(data, "numbers")

data = [no: bigint]


[no: bigint]

In [90]:
val moreData = spark.range(20, 25).toDF("no")
updateDeltaTable(moreData, "numbers", "overwrite")

moreData = [no: bigint]


[no: bigint]

In [92]:
val moreMoreData = spark.range(26, 30)
updateDeltaTable(moreData, "numbers", "append")

moreMoreData = [id: bigint]


[id: bigint]

In [93]:
val no_df = readTable("numbers")
no_df.show()

+---+
| no|
+---+
| 20|
| 22|
| 22|
| 24|
| 21|
| 24|
| 23|
| 20|
| 23|
| 21|
+---+



no_df = [no: bigint]


[no: bigint]

In [94]:
val version_0_df = timeTravel("numbers", 0)
version_0_df .show()

+---+
| no|
+---+
|  2|
|  4|
|  3|
|  1|
|  0|
+---+



version_0_df = [no: bigint]


[no: bigint]

In [95]:
val version_1_df = timeTravel("numbers", 1)
version_1_df.show()

+---+
| no|
+---+
| 22|
| 24|
| 20|
| 23|
| 21|
+---+



version_1_df = [no: bigint]


[no: bigint]

In [98]:
val version_2_df = timeTravel("numbers", 2)
version_2_df.show()

+---+
| no|
+---+
| 20|
| 22|
| 22|
| 24|
| 21|
| 24|
| 23|
| 20|
| 23|
| 21|
+---+



version_2_df = [no: bigint]


[no: bigint]

## Adding new column in existing data

In [101]:
val new_df = version_2_df.withColumn("new_col",lit("abc"))
new_df.show()

+---+-------+
| no|new_col|
+---+-------+
| 20|    abc|
| 22|    abc|
| 22|    abc|
| 24|    abc|
| 21|    abc|
| 24|    abc|
| 23|    abc|
| 20|    abc|
| 23|    abc|
| 21|    abc|
+---+-------+



new_df = [no: bigint, new_col: string]


[no: bigint, new_col: string]

In [102]:
addColumn(new_df, "numbers")

In [103]:
val latest_df = readTable("numbers")
latest_df.show()

+---+-------+
| no|new_col|
+---+-------+
| 23|    abc|
| 20|    abc|
| 21|    abc|
| 24|    abc|
| 23|    abc|
| 21|    abc|
| 22|    abc|
| 24|    abc|
| 20|    abc|
| 22|    abc|
+---+-------+



latest_df = [no: bigint, new_col: string]


[no: bigint, new_col: string]

## Getting history of the table

In [112]:
val numbers_table_his = getHistory("numbers")
numbers_table_his.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      3|2019-11-01 12:17:31|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|          2|          null|        false|
|      2|2019-11-01 11:43:20|  null|    null|    WRITE|[mode -> Append, ...|null|    null|     null|          1|          null|         true|
|      1|2019-11-01 11:43:05|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|          0|          null|        false|
|      0|2019-11-01 11:42:13|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-----

numbers_table_his = [version: bigint, timestamp: timestamp ... 10 more fields]


[version: bigint, timestamp: timestamp ... 10 more fields]

In [116]:
val numbers_table_lat_his = getLastestHistory("numbers")
numbers_table_lat_his.show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      3|2019-11-01 12:17:31|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|          2|          null|        false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+



numbers_table_lat_his = [version: bigint, timestamp: timestamp ... 10 more fields]


[version: bigint, timestamp: timestamp ... 10 more fields]