In [1]:
! pip freeze > requirements.txt

# Imports

In [None]:
import json
from pyspark.sql import SparkSession
from delta import *

from delta.tables import DeltaTable

# Start Session

In [6]:
# Local SparkSession with the Delta Lake SQL extension and Delta catalog enabled.
builder = (
    SparkSession.builder
    .appName(" Refactored  local test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # NOTE(review): driver memory is normally only honoured when this call starts the JVM — confirm.
    .config("spark.driver.memory", "2g")
)

# configure_spark_with_delta_pip comes from the `from delta import *` import above;
# the session is created (or an existing one reused) from the builder it returns.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# DataFrame Operations

## Creation 

In [7]:
# Column names for the sample rows below; each row is an (id, name, age) tuple.
columns = ["id", "name", "age"]

data = [
    (1, "Alice", 30),
    (2, "Bob", 31),
    (3, "Cathy", 26),
    (4, "David", 34),
]

# Only column names are supplied as the schema, so column types are left to Spark's inference.
df = spark.createDataFrame(data, schema=columns)

df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 31|
|  3|Cathy| 26|
|  4|David| 34|
+---+-----+---+



## Save in Delta format using `.save(<path>)`

In [3]:
# Write the DataFrame as a Delta table at a user-chosen path.
# Only a path is given, so the table is NOT registered in the Spark catalog.
delta_path = "output/sample_table"

(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)


24/07/04 12:39:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/07/04 12:39:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [None]:
# Placeholder: viewing the table history is covered in the "View History" section below.


## Save in Delta format using `.saveAsTable(<table_name>, path=<path>)`

In [16]:
# Write the DataFrame as a Delta table AND register it in the Spark catalog
# under the name "sample_table_spark".
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("sample_table_spark")
)

# No `path` option is passed here, so the data lands in Spark's default
# "spark-warehouse" directory, which is created if it does not exist yet.

                                                                                

In [17]:
# to verify the table is created or not
# Actually verify that saveAsTable registered the table, rather than relying on
# eyeballing the listing below.
assert spark.catalog.tableExists("sample_table_spark"), "sample_table_spark was not registered in the catalog"

# List the tables of the current database for inspection.
# NOTE(review): the listing can include tables persisted in spark-warehouse by earlier
# runs (e.g. `sample11`), which this notebook does not create.
spark.catalog.listTables()

[Table(name='sample11', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='sample_table', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='sample_table_spark', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

## View History

In [18]:
# Tables registered in the catalog (i.e. written with saveAsTable) can be
# referenced by name, so their history is available through plain SQL.
history_df = spark.sql(
    "DESCRIBE HISTORY sample_table_spark"
)
history_df.show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2024-07-04 12:48:...|  NULL|    NULL|CREATE OR REPLACE...|{partitionBy -> [...|NULL|    NULL|     NULL|       NULL|  Serializable|        false|{numFiles -> 5, n...|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+-----------

In [15]:
# Tables written with save() are not in the catalog, so their history is read
# through a DeltaTable handle bound to the storage path instead of by name.
delta_table = DeltaTable.forPath(spark, delta_path)

# One row per table version, newest version first.
history_df = delta_table.history()
history_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      6|2024-07-04 12:39:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|          5|  Serializable|        false|{numFiles -> 5, n...|        NULL|Apache-Spark/3.5....|
|      5|2024-07-04 12:37:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|          4|  Serializable|        false|{numFiles -> 5, n...|        NULL|Apache-Spark/3.5....|
|      4|2

In [19]:
# Version (returned as a string) of the most recent Delta commit made by this session.
# NOTE(review): this key appears to be set only after the session has committed to a
# Delta table; the None default avoids an error when it is still unset.
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession", None)


'0'