# Create a table

In [0]:
# Name of the catalog the Python cells below use to build table and volume paths.
# The %sql "USE CATALOG" cell that follows hard-codes the same value; keep the two in sync.
catalog = "databricks_ws_35ef6e3c_9b13_4349_a245_c6e3e49c3f11"

In [0]:
%sql
-- Switch the session to this catalog. The SQL cells that follow name their
-- tables as schema.table (e.g. default.department) without a catalog prefix.

USE CATALOG databricks_ws_35ef6e3c_9b13_4349_a245_c6e3e49c3f11


In [0]:
%sql
-- Create the three-column department table in the default schema.
-- IF NOT EXISTS makes the statement a no-op when the table is already there.
CREATE TABLE IF NOT EXISTS default.department
(
   deptcode   INT,
   deptname  STRING,
   location  STRING
);


In [0]:
%sql
-- Add two rows to default.department.
-- INSERT INTO appends: running this cell again inserts the same two rows a second time.
INSERT INTO default.department VALUES
   (10, 'FINANCE', 'EDINBURGH'),
   (20, 'SOFTWARE', 'PADDINGTON');


# Query and visualize data from a notebook

In [0]:
%sql
-- Select all columns from samples.nyctaxi.trips, with no filter or limit.
SELECT * FROM samples.nyctaxi.trips

Databricks visualization. Run in Databricks to view.

# Import and visualize CSV data from a notebook
https://docs.databricks.com/en/getting-started/import-visualize-data.html

In [0]:
# Settings for the CSV import: the source URL, the volume location the file
# is copied to, and the table the data is saved as.
catalog = "databricks_ws_35ef6e3c_9b13_4349_a245_c6e3e49c3f11"
schema = "default"
volume = "volume"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "baby_names.csv"
table_name = "baby_names"

# Locations derived from the settings above.
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"

# Show the complete paths: table namespace first, then the volume directory.
print(path_table, path_volume, sep="\n")


In [0]:
# Copy the CSV from the download URL to <path_volume>/<file_name>.
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

In [0]:
# Load the copied CSV from the volume: the first row is the header, column
# types are inferred, and fields are separated by commas.
df = (
    spark.read
    .options(header=True, inferSchema=True, sep=",")
    .csv(f"{path_volume}/{file_name}")
)


In [0]:
# Render the DataFrame loaded from the CSV in the notebook output.
display(df)

Databricks visualization. Run in Databricks to view.

In [0]:
# Replace the space in the "First Name" column name with an underscore,
# then print the resulting schema to confirm the rename.
df = df.withColumnRenamed("First Name", "First_Name")
df.printSchema()

In [0]:
# Save the DataFrame as the table <path_table>.<table_name>; "overwrite" mode
# replaces the table's contents if it already exists.
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")

# Ingest and insert additional data

In [0]:
# catalog, schema and volume are taken from the earlier configuration cell.
# Uncomment and fill in the placeholders to run this section on its own.
# catalog = "<catalog_name>"
# schema = "<schema_name>"
# volume = "<volume_name>"
file_name = "new_baby_names.csv"
table_name = "baby_names"

# Locations derived from the settings above.
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"

# Show the complete paths: table namespace first, then the volume directory.
print(path_table, path_volume, sep="\n")

In [0]:
# A single new record, in the column order given by the schema string below.
data = [[2022, "CARL", "Albany", "M", 42]]

df = spark.createDataFrame(
    data,
    schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int",
)

# Collapse to one partition and write it as CSV with a header row to
# <path_volume>/<file_name>, replacing whatever an earlier run left there.
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(
    f"{path_volume}/{file_name}"
)


In [0]:
# Read the CSV at <path_volume>/<file_name> back from the volume (header row,
# inferred column types, comma separator) and show the result.
df1 = (
    spark.read
    .options(header=True, inferSchema=True, sep=",")
    .csv(f"{path_volume}/{file_name}")
)
display(df1)

In [0]:
# Append the rows of `df` to the existing table <path_table>.<table_name>.
# Running this cell again appends the same rows again.
# NOTE(review): `df1`, read back from the CSV in the previous cell, is not used
# here — confirm that the in-memory `df` is the intended source.
df.write.mode("append").insertInto(f"{path_table}.{table_name}")
# Show the table's rows for Year 2022 to verify the insert.
display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))

# Enhance and cleanse data

In [0]:
# catalog = "<catalog_name>"
# schema = "<schema_name>"
table_name = "baby_names"
silver_table_name = "baby_names_prepared"
gold_table_name = "top_baby_names_2021"
path_table = catalog + "." + schema
print(path_table) # Show the complete path


In [0]:
# Load the table <path_table>.<table_name> into a DataFrame and show it.
df_raw = spark.read.table(f"{path_table}.{table_name}")
display(df_raw)

In [0]:
from pyspark.sql.functions import col, initcap, when

# Rename "Year" column to "Year_Of_Birth"
df_rename_year = df_raw.withColumnRenamed("Year", "Year_Of_Birth")

# Change the case of "First_Name" column to initcap
df_init_caps = df_rename_year.withColumn("First_Name", initcap(col("First_Name").cast("string")))

# Expand the sex codes: "M" -> "Male" and "F" -> "Female".
# A when() chain without otherwise() yields NULL for every row that matches no
# branch, which would silently erase any value that is not exactly "M" or "F".
# The otherwise() branch keeps such values unchanged instead.
df_baby_names_sex = df_init_caps.withColumn(
    "Sex",
    when(col("Sex") == "M", "Male")
    .when(col("Sex") == "F", "Female")
    .otherwise(col("Sex")),
)

# Show the prepared data
display(df_baby_names_sex)

# Save DataFrame to table; "overwrite" mode replaces the contents on re-run
df_baby_names_sex.write.mode("overwrite").saveAsTable(f"{path_table}.{silver_table_name}")


In [0]:
from pyspark.sql.functions import desc, expr
# Imported under an alias: a bare `sum` import would replace Python's built-in
# sum() for this cell and every cell that runs after it.
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql import Window

# Count of names for entire state of New York by sex: keep the 2021 rows, add up
# "Count" per (Sex, First_Name) pair, and order by the largest totals first.
df_baby_names_2021_grouped = (
    df_baby_names_sex
    .filter(expr("Year_Of_Birth == 2021"))
    .groupBy("Sex", "First_Name")
    .agg(spark_sum("Count").alias("Total_Count"))
    .sort(desc("Total_Count"))
)

# Display data
display(df_baby_names_2021_grouped)

# Save DataFrame to a table; "overwrite" mode replaces the contents on re-run
df_baby_names_2021_grouped.write.mode("overwrite").saveAsTable(f"{path_table}.{gold_table_name}")

Databricks visualization. Run in Databricks to view.

# Run your first ETL workload on Databricks


In [0]:
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
# Current user name with every non-alphanumeric character replaced by "_",
# so it can be embedded in a table name and a path.
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table.
# Each row also records the file it came from and the time it was processed.
etl_query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

# toTable() only starts the streaming query and returns immediately. Block until
# the availableNow run has finished so that the cells below, which read
# table_name, see the complete result when the notebook is run top to bottom.
etl_query.awaitTermination()


In [0]:
# Load the table named by table_name (set in the ETL cell above) into a DataFrame.
df = spark.read.table(table_name)

In [0]:
# Render the DataFrame read from the ETL table in the notebook output.
display(df)