In [0]:
# Remove any files already present in these folders, since they will be used by the Delta tables
#%fs rm -r /tmp/delta/events
#%fs rm -r /tmp/delta/checkpoint

# List the JSON files that will be loaded into Delta Lake
#%fs ls /databricks-datasets/structured-streaming/events/


In [0]:

# List the sample JSON event files (same listing the %fs ls shorthand produces)
display(dbutils.fs.ls("/databricks-datasets/structured-streaming/events/"))

path,name,size,modificationTime
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530,1469673865000
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961,1469673866000
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025,1469673878000
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999,1469673879000
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987,1469673880000
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006,1469673881000
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003,1469673882000
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007,1469673883000
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978,1469673885000
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008,1469673886000


In [0]:
# Read one of the JSON event files as a batch DataFrame and preview its first rows
dataf3 = (
    spark.read
    .format("json")
    .load("/databricks-datasets/structured-streaming/events/file-1.json")
)
dataf3.show()

+------+----------+
|action|      time|
+------+----------+
| Close|1469506633|
| Close|1469506636|
|  Open|1469506642|
|  Open|1469506644|
|  Open|1469506646|
|  Open|1469506647|
|  Open|1469506648|
|  Open|1469506651|
| Close|1469506653|
|  Open|1469506653|
|  Open|1469506656|
|  Open|1469506659|
|  Open|1469506659|
| Close|1469506660|
| Close|1469506660|
|  Open|1469506662|
| Close|1469506668|
| Close|1469506669|
|  Open|1469506670|
|  Open|1469506670|
+------+----------+
only showing top 20 rows



In [0]:
%sql

-- Create a separate database and a Delta table that will receive the streamed JSON data
CREATE DATABASE IF NOT EXISTS db_stream;
USE db_stream;

-- Recreate the table definition on top of the Delta folder
DROP TABLE IF EXISTS db_stream.tab_stream;

CREATE TABLE db_stream.tab_stream (
  action STRING,
  time   STRING
)
USING DELTA
LOCATION "/tmp/delta/events"


In [0]:
# Load the JSON events into the Delta Lake folder where the data will be stored.
# NOTE(review): wildcard imports can shadow Python builtins; kept as-is because
# other cells may rely on the names they bring in.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Explicit schema applied to the incoming JSON events (both columns read as strings)
read_schema = StructType([
 StructField("action", StringType(), False),
 StructField("time", StringType(), True)
 ])

# Streaming read of the events folder, limited to one file per trigger
df2 = (spark.readStream
 .option("maxFilesPerTrigger", "1")
 .schema(read_schema)
 .json("/databricks-datasets/structured-streaming/events/"))

# Streaming append into the Delta folder. The StreamingQuery returned by start()
# is bound to a name so the stream can later be inspected or stopped
# (e.g. events_query.status, events_query.stop()).
events_query = (df2.writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", "/tmp/delta/checkpoint")
 .option("path", "/tmp/delta/events")
 .start())

# Keep the query as the last expression so the cell still displays it
events_query

Out[10]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f86ec91eca0>

In [0]:
%sql
-- Show the data arriving in real time from the Delta table.
-- GROUP BY already returns one row per action, so DISTINCT is not needed.

SELECT action, count(*)
FROM db_stream.tab_stream
GROUP BY action


action,count(1)
Open,50000
Close,50000


In [0]:
%sql
-- List the commit history recorded for the Delta table stored at this path
DESCRIBE HISTORY delta.`/tmp/delta/events`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
50,2024-02-21T21:12:18.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 49)",,List(1643776971945716),0221-202852-us9uczep,49.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 12486, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
49,2024-02-21T21:12:13.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 48)",,List(1643776971945716),0221-202852-us9uczep,48.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11682, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
48,2024-02-21T21:12:09.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 47)",,List(1643776971945716),0221-202852-us9uczep,47.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11810, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
47,2024-02-21T21:12:06.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 46)",,List(1643776971945716),0221-202852-us9uczep,46.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11501, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
46,2024-02-21T21:12:02.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 45)",,List(1643776971945716),0221-202852-us9uczep,45.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11711, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
45,2024-02-21T21:11:58.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 44)",,List(1643776971945716),0221-202852-us9uczep,44.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11731, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
44,2024-02-21T21:11:54.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 43)",,List(1643776971945716),0221-202852-us9uczep,43.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11445, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
43,2024-02-21T21:11:51.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 42)",,List(1643776971945716),0221-202852-us9uczep,42.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11634, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
42,2024-02-21T21:11:46.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 41)",,List(1643776971945716),0221-202852-us9uczep,41.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11823, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
41,2024-02-21T21:11:41.000+0000,7406534468247714,anacadriano20@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 11df48df-41ba-4d00-aaee-4ca41f34dedd, epochId -> 40)",,List(1643776971945716),0221-202852-us9uczep,40.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 2000, numOutputBytes -> 11538, numAddedFiles -> 1)",,Databricks-Runtime/12.2.x-scala2.12
