In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Create a Spark session with Delta Lake configurations
spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()





:: loading settings :: url = jar:file:/Users/xwyang/anaconda3/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/xwyang/.ivy2/cache
The jars for the packages stored in: /Users/xwyang/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3c75e663-3f9b-4213-8bef-185aa03d2f5a;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 221ms :: artifacts dl 12ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |  

In [3]:
spark.sparkContext.setLocalProperty('spark.scheduler.pool','pool_1')

In [4]:
df = spark.read.csv('/Users/xwyang/Desktop/data/flights.csv',header=True,inferSchema=True)

                                                                                

In [5]:
df.show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 10 rows



In [6]:
df.columns

['date', 'delay', 'distance', 'origin', 'destination']

In [7]:
spark.sql('show tables').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [9]:
df.write.format('parquet').mode('overwrite').saveAsTable('pooltbl_1')

                                                                                

In [10]:
spark.table('pooltbl_1').show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1010630|  -10|     928|   RSW|        EWR|
|1021029|   87|     974|   RSW|        ORD|
|1021346|    0|     928|   RSW|        EWR|
|1021044|   18|     928|   RSW|        EWR|
|1021730|   29|     748|   RSW|        IAH|
|1020535|  605|     974|   RSW|        ORD|
|1021820|   71|     974|   RSW|        ORD|
|1021743|    0|     928|   RSW|        EWR|
|1022017|    0|     928|   RSW|        EWR|
|1020600|   -2|     748|   RSW|        IAH|
+-------+-----+--------+------+-----------+
only showing top 10 rows



In [11]:
spark.sql("""  create table if not exists pool_scheduler_tbl 
using csv options(path'/Users/xwyang/Desktop/data/flights.csv',header=True,inferSchema=True)    
""")

                                                                                

DataFrame[]

In [12]:
spark.read.table('pool_scheduler_tbl').show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows



In [13]:
spark.sql("""select date,delay,distance,origin,destination,
case when delay < 0 then'Early' 
     when delay between 20 and 100 then'on-time'
     when delay >100 and delay <200 then 'minor-delay'
     Else 'delay' end as status
     from pool_scheduler_tbl
""").show(10)

+-------+-----+--------+------+-----------+-------+
|   date|delay|distance|origin|destination| status|
+-------+-----+--------+------+-----------+-------+
|1011245|    6|     602|   ABE|        ATL|  delay|
|1020600|   -8|     369|   ABE|        DTW|  Early|
|1021245|   -2|     602|   ABE|        ATL|  Early|
|1020605|   -4|     602|   ABE|        ATL|  Early|
|1031245|   -4|     602|   ABE|        ATL|  Early|
|1030605|    0|     602|   ABE|        ATL|  delay|
|1041243|   10|     602|   ABE|        ATL|  delay|
|1040605|   28|     602|   ABE|        ATL|on-time|
|1051245|   88|     602|   ABE|        ATL|on-time|
|1050605|    9|     602|   ABE|        ATL|  delay|
+-------+-----+--------+------+-----------+-------+
only showing top 10 rows



In [14]:
spark.sql('show tables').show()

+---------+------------------+-----------+
|namespace|         tableName|isTemporary|
+---------+------------------+-----------+
|  default|pool_scheduler_tbl|      false|
|  default|         pooltbl_1|      false|
+---------+------------------+-----------+



In [15]:
deltaPath='/Users/xwyang/Desktop/delta_dir'

In [16]:
dff = spark.table('pooltbl_1')

In [17]:
dff.show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1010630|  -10|     928|   RSW|        EWR|
|1021029|   87|     974|   RSW|        ORD|
|1021346|    0|     928|   RSW|        EWR|
|1021044|   18|     928|   RSW|        EWR|
|1021730|   29|     748|   RSW|        IAH|
|1020535|  605|     974|   RSW|        ORD|
|1021820|   71|     974|   RSW|        ORD|
|1021743|    0|     928|   RSW|        EWR|
|1022017|    0|     928|   RSW|        EWR|
|1020600|   -2|     748|   RSW|        IAH|
+-------+-----+--------+------+-----------+
only showing top 10 rows



In [19]:
dff.write.format('delta').mode('overwrite').save(deltaPath)

24/07/22 22:04:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [20]:
spark.conf.set("spark.sql.debug.maxToStringFields", "1000")

In [21]:
delta_df = spark.read.format('delta').load(deltaPath)

In [22]:
delta_df.show(10)



+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|3300630|   -4|     473|   LGA|        CLT|
|3301200|   -2|     160|   LGA|        BOS|
|3301400|   -5|     160|   LGA|        BOS|
|3301600|   46|     160|   LGA|        BOS|
|3301800|    0|     160|   LGA|        BOS|
|3302000|   69|     160|   LGA|        BOS|
|3300900|  -10|     186|   LGA|        DCA|
|3301100|   15|     186|   LGA|        DCA|
|3301200|    2|     186|   LGA|        DCA|
|3301300|   -5|     186|   LGA|        DCA|
+-------+-----+--------+------+-----------+
only showing top 10 rows



                                                                                

In [23]:
delta_df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [24]:
delta_df.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [date#1373,delay#1374,distance#1375,origin#1376,destination#1377] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[file:/Users/xwyang/Desktop/delta_dir], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:int,delay:int,distance:int,origin:string,destination:string>




In [25]:
spark.read.format('delta').load(deltaPath).createOrReplaceTempView('delta_tbl')

In [26]:
spark.sql('show tables').show()

+---------+------------------+-----------+
|namespace|         tableName|isTemporary|
+---------+------------------+-----------+
|  default|pool_scheduler_tbl|      false|
|  default|         pooltbl_1|      false|
|         |         delta_tbl|      false|
+---------+------------------+-----------+



In [27]:
spark.sql(""" select * from delta_tbl limit 5  """).show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|3300630|   -4|     473|   LGA|        CLT|
|3301200|   -2|     160|   LGA|        BOS|
|3301400|   -5|     160|   LGA|        BOS|
|3301600|   46|     160|   LGA|        BOS|
|3301800|    0|     160|   LGA|        BOS|
+-------+-----+--------+------+-----------+



In [28]:
spark.sql("""  select count(*) as total_counts from delta_tbl""").show()

+------------+
|total_counts|
+------------+
|     1391578|
+------------+



In [30]:
spark.sql("""  select origin, count(*) as total_counts 
from delta_tbl group by origin order by total_counts DESC limit 10""").show()

+------+------------+
|origin|total_counts|
+------+------------+
|   ATL|       91484|
|   DFW|       68482|
|   ORD|       64228|
|   LAX|       54086|
|   DEN|       53148|
|   IAH|       43361|
|   PHX|       40155|
|   SFO|       39483|
|   LAS|       33107|
|   CLT|       28402|
+------+------------+



**Loading Data Streams into a Delta Lake Table**
As with static DataFrames, you can easily modify your eisting Structured Streaming jobs to write to and read from a Delta Lake table by setting the format to 'delta'. Say you have a stream of data as a DataFrame named NewStreamDF,which has the same schema as the table. You can append to the table as follows:

```

 delta_query = (NewStreamDF.writeStream
                           .format('delta')
                           .outputMode('append')
                           .option('checkpointLocation',check_dir)
                           .trigger(processingTime='10 seconds')
                           .start(deltaPath))
 ```