# Extracting from Data Source

In [0]:
file_location = "dbfs:/databricks-datasets/flights/"
file_type = "csv" 

df = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load(file_location)

df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

# Setting Up Delta Table

In [0]:
# Define the location for the Delta table
delta_table_path = "/delta/flights_data"

# Write the transformed DataFrame to Delta format
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(delta_table_path)

# Register the Delta table as a temporary view
spark.read.format("delta").load(delta_table_path).createOrReplaceTempView("flights_data")

# Query the top 10 rows from the Delta table using the temporary view
display(spark.sql("SELECT * FROM flights_data LIMIT 10"))

date,delay,distance,origin,destination
3131005,-3,618,MDW,EWR
3132055,25,748,MDW,BOS
3131710,13,501,MDW,IAD
3130720,-5,218,MDW,STL
3131445,12,778,MDW,DEN
3131940,3,1014,MDW,FLL
3131525,8,405,MDW,BKG
3132055,27,623,MDW,ALB
3131320,33,266,MDW,CLE
3131115,-1,352,MDW,MCI


# Capabilities of Delta Lake

## 1. Schema Enforcement

Initially, data is written with a certain schema. Later, a new column is added, and Delta Lake supports this schema evolution by merging the new schema with the existing one without any data loss or corruption.

In [0]:
from pyspark.sql.functions import lit

df = spark.sql("SELECT * FROM flights_data")

# Add a new column and write (schema evolution)
df_with_new_column = df.withColumn("new_column", lit("default_value"))
df_with_new_column.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/my_table")
df_with_new_column.show()

+--------+-----+--------+------+-----------+-------------+
|    date|delay|distance|origin|destination|   new_column|
+--------+-----+--------+------+-----------+-------------+
|03131005|   -3|     618|   MDW|        EWR|default_value|
|03132055|   25|     748|   MDW|        BOS|default_value|
|03131710|   13|     501|   MDW|        IAD|default_value|
|03130720|   -5|     218|   MDW|        STL|default_value|
|03131445|   12|     778|   MDW|        DEN|default_value|
|03131940|    3|    1014|   MDW|        FLL|default_value|
|03131525|    8|     405|   MDW|        BKG|default_value|
|03132055|   27|     623|   MDW|        ALB|default_value|
|03131320|   33|     266|   MDW|        CLE|default_value|
|03131115|   -1|     352|   MDW|        MCI|default_value|
|03130755|    9|     866|   MDW|        TPA|default_value|
|03131405|    0|     495|   MDW|        BHM|default_value|
|03132100|   48|     218|   MDW|        STL|default_value|
|03131650|   23|     647|   MDW|        CHS|default_valu

## 2. Time Travel (Data Versioning)

Delta Lake also allows us to access historical data by specifying a version number or a timestamp. This is useful for auditing, rollbacks, and reproducing results.

In [0]:
# Accessing a specific version of the data
df_version = spark.read.format("delta").option("versionAsOf", 1).load("/delta/flights_data")
df_version.show()

# Accessing data as of a specific timestamp
df_timestamp = spark.read.format("delta").option("timestampAsOf", "2023-11-29 23:00:25").load("/delta/flights_data")
df_timestamp.show()

+------+-------------+
|origin|average_delay|
+------+-------------+
|   MSY|        13.18|
|   SNA|        11.51|
|   PSG|        -0.25|
|   MYR|         7.65|
|   PVD|        10.41|
|   OAK|        13.88|
|   MQT|        23.87|
|   MSN|         8.66|
|   SCC|          1.6|
|   MLU|         7.91|
|   WRG|         -2.5|
|   LEX|        13.87|
|   RDM|         4.27|
|   ORF|        10.36|
|   SCE|        17.92|
|   SAV|        10.41|
|   TRI|         6.67|
|   MOD|         9.93|
|   TYR|         3.57|
|   MOB|        11.26|
+------+-------------+
only showing top 20 rows

+------+-------------+
|origin|average_delay|
+------+-------------+
|   GEG|         3.99|
|   BUR|         9.59|
|   GRB|         9.47|
|   GTF|          2.1|
|   IDA|         3.57|
|   GRR|        13.42|
|   EUG|        11.74|
|   GSO|        15.86|
|   BTM|        -0.77|
|   COD|         2.37|
|   FAR|        17.57|
|   FSM|          4.0|
|   DCA|         8.49|
|   CID|        16.12|
|   GTR|        14.06|
|   HLN|

## 3. Metadata

Delta Lake stores metadata about each Delta table, which includes information about the schema, partitioning, and table history. To explore and use this metadata, we can utilize Delta Lake's built-in functions. 

In [0]:
from delta.tables import DeltaTable

In [0]:
# Load the Delta table
deltaTable = DeltaTable.forPath(spark, "/delta/flights_data")

# View history of the Delta table
print("1\n",deltaTable.history().show(truncate=False))
# Get the schema of the Delta table
print("2\n",deltaTable.toDF().schema)
# Accessing Delta table metadata (partitioning and properties)
print("3\n",deltaTable.toDF().describe().show(truncate=False))

+-------+-------------------+----------------+--------------+---------+------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------------------------------------------------------+------------+------------------------------------------+
|version|timestamp          |userId          |userName      |operation|operationParameters                                         |job |notebook          |clusterId           |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                    |userMetadata|engineInfo                                |
+-------+-------------------+----------------+--------------+---------+------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------------------------------------------------------+------------+---