## Use delta tables for streaming data

Delta Lake supports streaming data. Delta tables can be a *sink* or a *source* for data streams created with the Spark Structured Streaming API. In this example, you'll use a delta table as a sink for streaming data in a simulated Internet of Things (IoT) scenario.

First, let's get some simulated device data in JSON format. The cell below the sample downloads a JSON file whose contents look like this, with one record per line:

```json
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
```

In [None]:
%sh
# Start from an empty folder (-f avoids an error if it doesn't exist yet)
rm -rf /dbfs/device_stream
mkdir -p /dbfs/device_stream
wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/Kiran-255666/Pyspark_18-01-24/main/devices1.json
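
Each line of the downloaded file is a separate JSON object, which is the layout Spark's JSON reader expects by default. As a rough illustration only, the following plain-Python sketch parses a copy of the sample shown earlier and counts statuses per device. The `SAMPLE` string and the `count_statuses` helper are hypothetical names introduced for this example; they are not part of the notebook and don't use Spark.

```python
import json
from collections import Counter

# Copy of the sample records shown earlier, one JSON object per line
SAMPLE = """\
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
"""

def count_statuses(text):
    """Count (device, status) pairs in newline-delimited JSON text."""
    counts = Counter()
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)  # each line is parsed on its own
        counts[(record["device"], record["status"])] += 1
    return counts

counts = count_statuses(SAMPLE)
for (device, status), n in sorted(counts.items()):
    print(device, status, n)
```

For the nine sample records this reports five `ok` and one `error` reading for `Dev1`, and one `ok` and two `error` readings for `Dev2`.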


Now you're ready to use Spark Structured Streaming to create a stream based on the folder containing the JSON device data.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON files from the folder, using an explicit schema
inputPath = '/device_stream/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
# maxFilesPerTrigger=1 limits each micro-batch to one new file
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")

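Now you'll take the stream of data you're reading from the folder and continuously write it to a delta table folder. The checkpoint location is where the query records its progress, so it can resume where it left off after a restart: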

In [None]:
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")
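
The `start` call returns a `StreamingQuery` object that keeps running in the background. As a minimal sketch, assuming you want to check on it from the notebook, the hypothetical helper below (not part of the original notebook) collects the query's `isActive`, `status`, and `lastProgress` properties from the PySpark `StreamingQuery` API.

```python
def summarize_query(query):
    """Return a small dict describing a streaming query.

    `query` is assumed to be the object returned by writeStream.start(),
    for example the `deltastream` variable created above.
    """
    return {
        "active": query.isActive,             # True while the query is running
        "status": query.status,               # current status message and flags
        "last_progress": query.lastProgress,  # latest micro-batch metrics, or None
    }

# In the notebook you could call: print(summarize_query(deltastream))
```

`last_progress` stays `None` until the first micro-batch has completed, so an empty value right after starting the stream is not necessarily a problem.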

To load the streamed data, read the delta table folder into a dataframe, just as you would for any other delta table:

In [None]:
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)
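
Because a delta table can also act as a streaming *source*, the same folder could feed a second stream. The sketch below is an illustration rather than a step in this exercise: `read_delta_as_stream` is a hypothetical helper, `spark` is assumed to be the notebook's active session, and the only Spark call it relies on is `spark.readStream.format("delta").load(...)`.

```python
def read_delta_as_stream(spark, table_path):
    """Open the delta table at `table_path` as a streaming dataframe.

    `spark` is assumed to be the notebook's active SparkSession.
    """
    return spark.readStream.format("delta").load(table_path)

# In the notebook:
#   stream_df = read_delta_as_stream(spark, delta_stream_table_path)
#   stream_df.isStreaming   # True for a streaming dataframe
```

Unlike the batch read above, a streaming dataframe can't be collected directly; it has to be attached to a sink with `writeStream` (or shown with `display` in a notebook that supports streaming output).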

You can also create a table based on the streaming delta table folder:

In [None]:
# Create a catalog table based on the streaming sink (IF NOT EXISTS makes the cell safe to re-run)
spark.sql("CREATE TABLE IF NOT EXISTS IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

You can query the table just like any other:

In [None]:
%sql
SELECT * FROM IotDeviceData;

Now let's add some fresh device data to the stream.

In [None]:
%sh
wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/Kiran-255666/Pyspark_18-01-24/main/devices2.json

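The new JSON data in the device folder is read into the stream and written to the delta folder, where it is reflected in the table. It can take a few moments for the new file to be picked up, so re-run the query if the new rows don't appear right away: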

In [None]:
%sql
SELECT * FROM IotDeviceData;

To stop the stream, use its **stop** method:

In [None]:
deltastream.stop()
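
If several streaming queries are running, you may want to stop them all before leaving the notebook. A minimal sketch, assuming `spark` is the active session: `spark.streams.active` lists the running queries, and the hypothetical `stop_all_streams` helper (not part of the original notebook) stops each one.

```python
def stop_all_streams(spark):
    """Stop every active streaming query on the session.

    Returns the number of queries that were stopped.
    """
    stopped = 0
    for query in spark.streams.active:
        query.stop()
        stopped += 1
    return stopped

# In the notebook you could call: stop_all_streams(spark)
```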