In [None]:
df = spark.read.format("csv").option("header","true").load("Files/covid.csv")

# df now is a Spark DataFrame containing CSV data from "Files/covid.csv".
display(df)

In [None]:
#creating a managed delta table from a dataframe : managed table
# Deleting the table will also delete the underlying files from the Tables storage location for the lakehouse.
df.write.format("delta").mode("overwrite").saveAsTable("covid_delta_table")

In [2]:
#creates an external table for which the data is stored in the folder in the Files storage location for the lakehouse.
#Deleting an external table from the lakehouse metastore does not delete the associated data files.
df.write.format("delta").saveAsTable("myexternalCovid_Delta_table", path="Files/covid")

StatementMeta(, 3b8d7018-50fa-4623-833d-4b84bd7390dc, 4, Finished, Available)

In [None]:
#create delta table through DeltaTableBuilder API 
from delta.tables import *

DeltaTable.create(spark) \
  .tableName("products") \
  .addColumn("Productid", "INT") \
  .addColumn("ProductName", "STRING") \
  .addColumn("Category", "STRING") \
  .addColumn("Price", "FLOAT") \
  .execute()


In [1]:
 # insert data to delta tables in Spark using Spark SQL.
 spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 3, Finished, Available)

DataFrame[]

In [6]:
# query data from delta tables
mydf = spark.sql("select * from products")
display(mydf)

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, f87aea9a-a3f0-4de7-b714-3738fc35501e)

In [5]:
mydf = spark.sql("SELECT * FROM AmberFabricWSLakehouse.covid_delta_table LIMIT 1000")
display(mydf)

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1c2bed7c-0ac8-4562-882b-0084604475ab)

In [2]:
-- create managed delta tables by using the Spark SQL CREATE TABLE statement

--%%sql

CREATE TABLE salesorders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA

/* create an external table by specifying a LOCATION parameter
--%%sql

CREATE TABLE MyExternalTable
USING DELTA
LOCATION 'Files/mydata' 
    */

StatementMeta(, 80dd3465-44ad-49ef-9a42-48fda997cc8d, 3, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [8]:
-- see the history of a table
--%%sql
DESCRIBE HISTORY products

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 10, Finished, Available)

<Spark SQL result set with 2 rows and 15 fields>

In [10]:
--To see the history of an external table
--%%sql

DESCRIBE HISTORY 'Files/covid'

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 12, Finished, Available)

<Spark SQL result set with 1 rows and 15 fields>

In [18]:
# using dataframe to read delta files history
df = spark.read.format("delta").option("versionAsOf", 0).load("Files/covid")
df.show()

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 20, Finished, Available)

+--------+----------+---------+----------+-----------+----------------+-------------+---------------+------------------+-----------+
|Date.Day|Date.Month|Date.Year|Data.Cases|Data.Deaths|Location.Country|Location.Code|Data.Population|Location.Continent|  Data.Rate|
+--------+----------+---------+----------+-----------+----------------+-------------+---------------+------------------+-----------+
|       9|         3|     2020|      1492|        133|           Italy|          ITA|       60359546|            Europe|11.99975891|
|      10|         3|     2020|      1797|         98|           Italy|          ITA|       60359546|            Europe|14.81621482|
|      11|         3|     2020|       977|        167|           Italy|          ITA|       60359546|            Europe|16.28077189|
|      12|         3|     2020|      2313|        196|           Italy|          ITA|       60359546|            Europe|19.98358304|
|      13|         3|     2020|      2651|        189|           Ital

In [22]:
delta_stream.start();

StatementMeta(, 13aa77be-5785-48f6-9acd-82819bd031ce, 24, Finished, Available)

NameError: name 'delta_stream' is not defined

In [None]:
# Spark includes native support for streaming data through Spark Structured Streaming
# https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-streaming-data

#Using a delta table as a streaming source
#When using a delta table as a streaming source, only append operations can be included in the stream. 
#Data modifications will cause an error unless you specify the ignoreChanges or ignoreDeletes option.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("Files/covid")

# Process the streaming data in the DataFrame
# For example, you can display it using the console sink
query = stream_df.writeStream.format("console").start()

# Now you can process the streaming data in the dataframe
query.awaitTermination()

In [None]:
# Using a delta table as a streaming sink
#https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-streaming-data

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = 'Files/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = 'Files/delta/devicetable'
checkpoint_path = 'Files/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)