
<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Includes/images/bookstore_schema.png" alt="Databricks Learning" style="width: 600">
</div>

In [0]:
%run ../Includes/Copy-Datasets

In [0]:
%sql
select * from books

book_id,title,author,category,price
B14,Data Communications and Networking,Behrouz A. Forouzan,Computer Science,34.0
B15,Inside the Java Virtual Machine,Bill Venners,Computer Science,41.0
B13,Linux pocket guide,Daniel J. Barrett,Computer Science,26.0
B10,Beginning Database Design Solutions,Rod Stephens,Computer Science,44.0
B11,Business Intelligence for Dummies,Swain Scheps,Computer Science,38.0
B12,Big Data in Practice,Bernard Marr,Computer Science,30.0
B01,The Soul of a New Machine,Tracy Kidder,Computer Science,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,Computer Science,28.0
B03,Make Your Own Neural Network,Tariq Rashid,Computer Science,35.0
B07,The Hundred-Page Machine Learning,Andriy Burkov,Computer Science,33.0



## Reading Stream

In [0]:
(spark.readStream
      .table("books")
      .createOrReplaceTempView("books_streaming_tmp_vw")
)


## Displaying Streaming Data

In [0]:
%sql
SELECT * FROM books_streaming_tmp_vw

book_id,title,author,category,price
B14,Data Communications and Networking,Behrouz A. Forouzan,Computer Science,34.0
B15,Inside the Java Virtual Machine,Bill Venners,Computer Science,41.0
B13,Linux pocket guide,Daniel J. Barrett,Computer Science,26.0
B10,Beginning Database Design Solutions,Rod Stephens,Computer Science,44.0
B11,Business Intelligence for Dummies,Swain Scheps,Computer Science,38.0
B12,Big Data in Practice,Bernard Marr,Computer Science,30.0
B01,The Soul of a New Machine,Tracy Kidder,Computer Science,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,Computer Science,28.0
B03,Make Your Own Neural Network,Tariq Rashid,Computer Science,35.0
B07,The Hundred-Page Machine Learning,Andriy Burkov,Computer Science,33.0


In [0]:
%sql
select author , count(*) from books_streaming_tmp_vw group by author  having count(*)=1

author,count(1)
Mark W. Spong,1
Chris Bernhardt,1
Tariq Rashid,1
Peter Brass,1
Luciano Ramalho,1
Addy Osmani,1
Andriy Burkov,1
Tracy Kidder,1
Swain Scheps,1
François Chollet,1


## Applying Transformations

In [0]:
%sql
SELECT author, count(book_id) AS total_books
FROM books_streaming_tmp_vw
GROUP BY author

author,total_books
Mark W. Spong,1
Chris Bernhardt,1
Tariq Rashid,1
Peter Brass,1
Luciano Ramalho,1
Addy Osmani,1
Andriy Burkov,1
Tracy Kidder,1
Swain Scheps,1
François Chollet,1



## Unsupported Operations

In [0]:
%sql
 SELECT * 
 FROM books_streaming_tmp_vw
 ORDER BY author

In [0]:
%sql
create or replace temp view author_counts_tmp_vwcheck AS ( select author,count(book_id) as total_books from books_streaming_tmp_vw group by author )

In [0]:
%sql
select * from author_counts_tmp_vwcheck

author,total_books
Mark W. Spong,1
Chris Bernhardt,1
Tariq Rashid,1
Peter Brass,1
Luciano Ramalho,1
Addy Osmani,1
Andriy Burkov,1
Tracy Kidder,1
Swain Scheps,1
François Chollet,1



## Persisting Streaming Data

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW author_counts_tmp_vw AS (
  SELECT author, count(book_id) AS total_books
  FROM books_streaming_tmp_vw
  GROUP BY author
)

In [0]:
(spark.table("author_counts_tmp_vw")                               
      .writeStream  
      .trigger(processingTime='4 seconds')
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_checkpoint")
      .table("author_counts")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f68b0a84fd0>

In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Chris Bernhardt,1
Luciano Ramalho,1
Mark W. Spong,1
Andriy Burkov,1
Tariq Rashid,1
Tracy Kidder,1
Swain Scheps,1


## Adding New Data

In [0]:
%sql
INSERT INTO books
values ("B19", "Introduction to Modeling and Simulation", "Mark W. Spong", "Computer Science", 25),
        ("B20", "Robot Modeling and Control", "Mark W. Spong", "Computer Science", 30),
        ("B21", "Turing's Vision: The Birth of Computer Science", "Chris Bernhardt", "Computer Science", 35)

num_affected_rows,num_inserted_rows
3,3


## Streaming in Batch Mode 

In [0]:
%sql
INSERT INTO books
values ("B16", "Hands-On Deep Learning Algorithms with Python", "Sudharsan Ravichandiran", "Computer Science", 25),
        ("B17", "Neural Network Methods in Natural Language Processing", "Yoav Goldberg", "Computer Science", 30),
        ("B18", "Understanding digital signal processing", "Richard Lyons", "Computer Science", 35)

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Sudharsan Ravichandiran,1
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Luciano Ramalho,1
Chris Bernhardt,2
Mark W. Spong,3
Richard Lyons,1
Andriy Burkov,1
Yoav Goldberg,1


In [0]:
%sql
INSERT INTO books
values ("B50", "Python for Data Science with Biju", "Biju Thottathil", "Computer Science", 25),
        ("B51", "Neural Network Methods in Natural Language Processing and Machine Learning", "Gourav", "Computer Science", 10)
        

num_affected_rows,num_inserted_rows
2,2


In [0]:
(spark.table("author_counts_tmp_vw")  
 .writeStream
 .trigger(availableNow=True)
 .outputMode("complete")
 .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_checkpoint")
 .table("author_counts")
 .awaitTermination()
)

In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Sudharsan Ravichandiran,1
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Luciano Ramalho,1
Chris Bernhardt,2
Biju Thottathil,1
Mark W. Spong,3
Richard Lyons,1
Andriy Burkov,1
