## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/health_tracker_data_2020_1-4.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [0]:
# Create a view or table

temp_table_name = "health_tracker_data_2020_1_json"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `health_tracker_data_2020_1_json`

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "health_tracker_data_2020_1_json"

df.write.format("parquet").saveAsTable(permanent_table_name)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1161632427756437>[0m in [0;36m<module>[0;34m[0m
[1;32m      5[0m [0mpermanent_table_name[0m [0;34m=[0m [0;34m"health_tracker_data_2020_1_json"[0m[0;34m[0m[0;34m[0m[0m
[1;32m      6[0m [0;34m[0m[0m
[0;32m----> 7[0;31m [0mdf[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"parquet"[0m[0;34m)[0m[0;34m.[0m[0msaveAsTable[0m[0;34m([0m[0mpermanent_table_name[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36msaveAsTable[0;34m(self, name, format, mode, partitionBy, **options)[0m
[1;32m    804[0m         [0;32mif[0m [0mformat[0m [0;32mis[0m [0;32mnot[0m [0;32mNone[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    805[0m             [0mself[0m[0;34m.[0m[0mformat

In [0]:
%sql
-- #Task 1: Ingest the file for month of January from the remote location and get it into a raw directory in dbfs. (Bronze layer) You can either download all the files to your local machine and ingest it or you can use python urllib to retrieve and ingest file at a dbfs location.
-- # Task 2: Review the data and analyse it. What’s the format of data? List out columns present in data along with their data type
-- # Task 3: Extract the data for first month and visualise it
select * from health_tracker_data_2020_1_json

[0;36m  File [0;32m"<command-129576151147163>"[0;36m, line [0;32m1[0m
[0;31m    *#(Task, 1:, Ingest, the, file, for, month, of, January, from, the, remote, location, and, get, it, into, a, raw, directory, in, dbfs., (Bronze, layer), You, can, either, download, all, the, files, to, your, local, machine, and, ingest, it, or, you, can, use, python, urllib, to, retrieve, and, ingest, file, at, a, dbfs, location.)[0m
[0m     ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
%scala
val df = spark.read.json("/FileStore/tables/health_tracker_data_2020_1-4.json")
df.printSchema()

In [0]:
%scala
display(df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [0]:
%scala
val df1 = df
          .withColumn("time", df("time").cast("timestamp"))
display(df1)


device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,2020-01-01T00:00:00.000+0000
0,53.9078900098,Deborah Powell,2020-01-01T01:00:00.000+0000
0,52.7129593616,Deborah Powell,2020-01-01T02:00:00.000+0000
0,52.2880422685,Deborah Powell,2020-01-01T03:00:00.000+0000
0,52.5156095386,Deborah Powell,2020-01-01T04:00:00.000+0000
0,53.6280743846,Deborah Powell,2020-01-01T05:00:00.000+0000
0,52.1760037066,Deborah Powell,2020-01-01T06:00:00.000+0000
0,90.0456721836,Deborah Powell,2020-01-01T07:00:00.000+0000
0,89.4695644522,Deborah Powell,2020-01-01T08:00:00.000+0000
0,88.1490304138,Deborah Powell,2020-01-01T09:00:00.000+0000


In [0]:
%scala
df1.printSchema

In [0]:
%scala
import org.apache.spark.sql.functions.{col}
val stringColumns = df1.schema.fields.filter(_.dataType.typeName == "long")
df1.select(stringColumns.map(x=>col(x.name)):_*).show()

In [0]:
%sql
create table if not exists health
(
   device_id int,
   heartrate double,
   name varchar(100),
   time timestamp
)

In [0]:
%scala
val df2 = df1.withColumn("device_id", df1("device_id").cast("Integer"))
display(df2)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,2020-01-01T00:00:00.000+0000
0,53.9078900098,Deborah Powell,2020-01-01T01:00:00.000+0000
0,52.7129593616,Deborah Powell,2020-01-01T02:00:00.000+0000
0,52.2880422685,Deborah Powell,2020-01-01T03:00:00.000+0000
0,52.5156095386,Deborah Powell,2020-01-01T04:00:00.000+0000
0,53.6280743846,Deborah Powell,2020-01-01T05:00:00.000+0000
0,52.1760037066,Deborah Powell,2020-01-01T06:00:00.000+0000
0,90.0456721836,Deborah Powell,2020-01-01T07:00:00.000+0000
0,89.4695644522,Deborah Powell,2020-01-01T08:00:00.000+0000
0,88.1490304138,Deborah Powell,2020-01-01T09:00:00.000+0000


In [0]:
%scala
df2.write.mode(SaveMode.Overwrite).saveAsTable("health")

In [0]:
%sql
select * from health

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,2020-01-01T00:00:00.000+0000
0,53.9078900098,Deborah Powell,2020-01-01T01:00:00.000+0000
0,52.7129593616,Deborah Powell,2020-01-01T02:00:00.000+0000
0,52.2880422685,Deborah Powell,2020-01-01T03:00:00.000+0000
0,52.5156095386,Deborah Powell,2020-01-01T04:00:00.000+0000
0,53.6280743846,Deborah Powell,2020-01-01T05:00:00.000+0000
0,52.1760037066,Deborah Powell,2020-01-01T06:00:00.000+0000
0,90.0456721836,Deborah Powell,2020-01-01T07:00:00.000+0000
0,89.4695644522,Deborah Powell,2020-01-01T08:00:00.000+0000
0,88.1490304138,Deborah Powell,2020-01-01T09:00:00.000+0000


In [0]:
%scala
val rd = spark.read.table("health")

In [0]:
%scala
import org.apache.spark.sql.functions._
val df2 = rd
  .withColumn("date", $"Time".cast("date"))
  .withColumn("time2", date_format($"time", "H:m:S"))
df2.show()

In [0]:
%sql
select * from health

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,2020-01-01T00:00:00.000+0000
0,53.9078900098,Deborah Powell,2020-01-01T01:00:00.000+0000
0,52.7129593616,Deborah Powell,2020-01-01T02:00:00.000+0000
0,52.2880422685,Deborah Powell,2020-01-01T03:00:00.000+0000
0,52.5156095386,Deborah Powell,2020-01-01T04:00:00.000+0000
0,53.6280743846,Deborah Powell,2020-01-01T05:00:00.000+0000
0,52.1760037066,Deborah Powell,2020-01-01T06:00:00.000+0000
0,90.0456721836,Deborah Powell,2020-01-01T07:00:00.000+0000
0,89.4695644522,Deborah Powell,2020-01-01T08:00:00.000+0000
0,88.1490304138,Deborah Powell,2020-01-01T09:00:00.000+0000


In [0]:
%sql
alter table health alter column device_id comment "Id cua thiet bi";
alter table health alter column heartrate comment "Nhip tim";
alter table health alter column name comment "Ten benh nhan";
alter table health alter column time comment "Thoi gian do";

In [0]:
--Task 7
%sql
Describe health



col_name,data_type,comment
device_id,int,Id cua thiet bi
heartrate,double,Nhip tim
name,varchar(100),Ten benh nhan
time,timestamp,Thoi gian do
,,
# Partitioning,,
Not partitioned,,


In [0]:
%scala
display(df2)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,2020-01-01T00:00:00.000+0000
0,53.9078900098,Deborah Powell,2020-01-01T01:00:00.000+0000
0,52.7129593616,Deborah Powell,2020-01-01T02:00:00.000+0000
0,52.2880422685,Deborah Powell,2020-01-01T03:00:00.000+0000
0,52.5156095386,Deborah Powell,2020-01-01T04:00:00.000+0000
0,53.6280743846,Deborah Powell,2020-01-01T05:00:00.000+0000
0,52.1760037066,Deborah Powell,2020-01-01T06:00:00.000+0000
0,90.0456721836,Deborah Powell,2020-01-01T07:00:00.000+0000
0,89.4695644522,Deborah Powell,2020-01-01T08:00:00.000+0000
0,88.1490304138,Deborah Powell,2020-01-01T09:00:00.000+0000


In [0]:
%scala
// task 8
val mindf = df2.groupBy("device_id").min("heartrate")
val maxdf = df2.groupBy("device_id").max("heartrate")
val avgdf = df2.groupBy("device_id").mean("heartrate")

