In [1]:
%%pyspark
df = spark.read.load('abfss://files@datalakes5vexzn.dfs.core.windows.net/products/products.csv', format='csv'
## If header exists uncomment line below
, header=True
)
display(df.limit(10))

In [2]:
# Persist the products DataFrame as a Delta table at a file-system path.
# mode("overwrite") keeps the cell re-runnable: the default save mode
# ("errorifexists") fails when the path already holds data from an earlier run.
# Overwriting adds a new table version; older versions (including version 0)
# stay readable via time travel until the table is vacuumed.
delta_table_path = "/delta/products-delta"
df.write.format("delta").mode("overwrite").save(delta_table_path)

In [3]:
 # Import only what this cell uses. A wildcard import of pyspark.sql.functions would
 # also shadow Python built-ins such as sum, min, max, round and abs.
 from delta.tables import DeltaTable

 # Create a DeltaTable handle over the files at delta_table_path.
 deltaTable = DeltaTable.forPath(spark, delta_table_path)

 # Update the table in place (reduce price of product 771 by 10%).
 # Not idempotent: every run of this cell applies another 10% reduction.
 deltaTable.update(
     condition="ProductID == 771",
     set={"ListPrice": "ListPrice * 0.9"},
 )

 # View the current (updated) data as a DataFrame.
 deltaTable.toDF().show(10)


In [4]:
# Read the Delta table back from its path; without a version option this is the latest snapshot.
new_df = spark.read.load(delta_table_path, format="delta")
new_df.show(10)

In [5]:
 # Time travel: read the table as it was at version 0 (the initial write).
 # Bound to its own name so it is not confused with `new_df`, which holds the latest version.
 products_v0_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
 products_v0_df.show(10)


In [6]:
# Show the table's latest (up to 10) commits, one field per line and without truncation.
deltaTable.history(10).show(n=20, truncate=False, vertical=True)

In [11]:
# Register the Delta files at delta_table_path as an external table: the catalog
# entry points at the existing LOCATION instead of owning the data.
# "if not exists" keeps the cell re-runnable. The database in particular is never
# dropped by this notebook, so a plain "create database" fails on a second run.
spark.sql("create database if not exists AdventureWorks")
spark.sql(
    "create table if not exists AdventureWorks.ProductsExternal using delta location '{0}'".format(delta_table_path)
)
spark.sql("describe extended AdventureWorks.ProductsExternal").show(truncate=False)

In [12]:
%%sql
-- Switch the session to AdventureWorks, then query the external table.
USE AdventureWorks;

SELECT * FROM ProductsExternal;

In [13]:
# Save the products DataFrame as a managed Delta table (no explicit location),
# then show its catalog metadata.
df.write.saveAsTable("AdventureWorks.ProductsManaged", format="delta")
spark.sql("describe extended AdventureWorks.ProductsManaged").show(n=20, truncate=False)

In [14]:
%%sql
-- Switch the session to AdventureWorks, then query the managed table.
USE AdventureWorks;

SELECT * FROM ProductsManaged;

In [15]:
%%sql
-- List the tables registered in the AdventureWorks database.
USE AdventureWorks;

SHOW TABLES;

In [16]:
 %%sql

 -- Remove both tables from the catalog; IF EXISTS makes each drop a no-op when the
 -- table is already gone. Dropping the managed table also removes its data, while the
 -- external table's Delta files at its LOCATION are left in place.
 USE AdventureWorks;

 DROP TABLE IF EXISTS ProductsExternal;
 DROP TABLE IF EXISTS ProductsManaged;


In [17]:
 %%sql

 -- Register the Delta files at /delta/products-delta (the same path as
 -- delta_table_path in the Python cells) as the table Products.
 -- IF NOT EXISTS: no cell drops Products, so a plain CREATE TABLE fails on re-run.
 USE AdventureWorks;

 CREATE TABLE IF NOT EXISTS Products
 USING DELTA
 LOCATION '/delta/products-delta';


In [18]:
%%sql
-- Switch the session to AdventureWorks, then query the Products table.
USE AdventureWorks;

SELECT * FROM Products;

In [19]:
 from notebookutils import mssparkutils
 from pyspark.sql.types import *
 from pyspark.sql.functions import *

 # Folder the streaming source watches; create it before defining the stream.
 inputPath = '/data/'
 mssparkutils.fs.mkdirs(inputPath)

 # Each event has two non-nullable string fields, in this order: device, status.
 jsonSchema = StructType(
     [StructField(field_name, StringType(), False) for field_name in ("device", "status")]
 )

 # Streaming DataFrame over the folder, using the explicit schema above and
 # picking up at most one new file per micro-batch.
 iotstream = (
     spark.readStream
     .schema(jsonSchema)
     .option("maxFilesPerTrigger", 1)
     .json(inputPath)
 )

 # Sample events, one JSON object per line, written into the watched folder.
 device_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}'''
 # Last argument True: overwrite data.txt if it already exists.
 mssparkutils.fs.put(f"{inputPath}data.txt", device_data, True)
 print("Source stream created...")


In [20]:
 # Write the stream to a Delta table at a file-system path.
 delta_stream_table_path = '/delta/iotdevicedata'
 # Checkpoint location where the query records its progress.
 checkpointpath = '/delta/checkpoint'
 deltastream = (
     iotstream.writeStream
     .format("delta")
     .option("checkpointLocation", checkpointpath)
     .start(delta_stream_table_path)
 )
 # start() returns immediately while micro-batches run in the background. Block until
 # the data already in the source folder is committed, so the following cells (batch
 # read / CREATE TABLE on this path) do not race a not-yet-created Delta table.
 # Any failure in the stream is raised here instead of surfacing later.
 deltastream.processAllAvailable()
 print("Streaming to delta sink...")


In [21]:
 # Batch-read the streaming sink's Delta table into its own DataFrame.
 # A new name is used instead of `df` so the products DataFrame loaded in the
 # first cell (and written by the saveAsTable cell) is not silently replaced.
 iot_df = spark.read.format("delta").load(delta_stream_table_path)
 display(iot_df)


In [22]:
 # create a catalog table based on the streaming sink
 # Register a catalog table over the streaming sink's Delta files.
 # The name is unqualified, so it goes into the session's current database
 # (AdventureWorks if the earlier %%sql USE cells ran in this session).
 # IF NOT EXISTS keeps the cell re-runnable.
 spark.sql("CREATE TABLE IF NOT EXISTS IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))


In [23]:
%%sql

-- Query the catalog table registered over the streaming sink (current database).
SELECT * FROM IotDeviceData;

In [24]:
 # Stop the streaming query (deltastream) started by the writeStream cell.
 deltastream.stop()