Downloading data files into filesystem used by cluster.

In [0]:
 %sh
 rm -r /dbfs/delta_lab
 mkdir /dbfs/delta_lab
 wget -O /dbfs/delta_lab/covid_data.csv https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/covid_data.csv

rm: cannot remove '/dbfs/delta_lab': No such file or directory
--2024-12-06 15:33:45--  https://github.com/MicrosoftLearning/mslearn-databricks/raw/main/data/covid_data.csv
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/covid_data.csv [following]
--2024-12-06 15:33:45--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/covid_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 222235 (217K) [text/plain]
Saving to: ‘/dbfs/delta_lab/covid_data.csv’

     0K .......... .......... .......... .......... .......... 23%

 This code defines a Delta Live Table that will be populated by the raw data previously downloaded

In [0]:
%sql
 CREATE OR REFRESH LIVE TABLE raw_covid_data
 COMMENT "COVID sample dataset. This data was ingested from the COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University."
 AS
 SELECT
   Last_Update,
   Country_Region,
   Confirmed,
   Deaths,
   Recovered
 FROM read_files('dbfs:/delta_lab/covid_data.csv', format => 'csv', header => true)

Name,Type
Last_Update,date
Country_Region,string
Confirmed,int
Deaths,int
Recovered,int


The following code query, filter, and format the data from the previous table before analysis.


In [0]:
%sql
 CREATE OR REFRESH LIVE TABLE processed_covid_data(
   CONSTRAINT valid_country_region EXPECT (Country_Region IS NOT NULL) ON VIOLATION FAIL UPDATE
 )
 COMMENT "Formatted and filtered data for analysis."
 AS
 SELECT
     TO_DATE(Last_Update, 'MM/dd/yyyy') as Report_Date,
     Country_Region,
     Confirmed,
     Deaths,
     Recovered
 FROM live.raw_covid_data;

Name,Type
Report_Date,date
Country_Region,string
Confirmed,int
Deaths,int
Recovered,int


Following code create an enriched data view for further analysis, once the pipeline is successfully executed.

In [0]:
%sql
 CREATE OR REFRESH LIVE TABLE aggregated_covid_data
 COMMENT "Aggregated daily data for the US with total counts."
 AS
 SELECT
     Report_Date,
     sum(Confirmed) as Total_Confirmed,
     sum(Deaths) as Total_Deaths,
     sum(Recovered) as Total_Recovered
 FROM live.processed_covid_data
 GROUP BY Report_Date;

Name,Type
Report_Date,date
Total_Confirmed,bigint
Total_Deaths,bigint
Total_Recovered,bigint


Once the pipeline is succsfully execued, all the create new tables can be verified in a storage location with:

In [None]:
 display(dbutils.fs.ls("dbfs:/pipelines/delta_lab/tables"))

Result can be viewed as an visualization

In [None]:
df = spark.read.format("delta").load('/pipelines/delta_lab/tables/aggregated_covid_data')
display(df)

Certain post actions which can be done are:

Above the table of results, select + and then select Visualization to view the visualization editor, and then apply the following options:
Visualization type: Line
X Column: Report_Date
Y Column: Add a new column and select Total_Confirmed. Apply the Sum aggregation.
Save the visualization and view the resulting chart in the notebook.