In [0]:
catalog = "tut_catalog"
schema = "default"
volume = "tut_volume"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "baby_names.csv"
table_name = "baby_names"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path

In [0]:
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")

In [0]:
%sql

SHOW VOLUMES

In [0]:
df = spark.read.csv(f"{path_volume}/{file_name}",
  header=True,
  inferSchema=True,
  sep=",")

In [0]:
display(df)

Databricks visualization. Run in Databricks to view.

In [0]:
df = df.withColumnRenamed("First Name", "First_Name")
df.printSchema()

In [0]:
df.write.mode("overwrite").saveAsTable(f"{path_table}" + "." + f"{table_name}")


In [0]:
f'{path_table}.{table_name}'

In [0]:
data = [[2022, "CARL", "Albany", "M", 42]]

df = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")

In [0]:
display(df)

In [0]:
(df.coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(f"{path_volume}/{file_name}"))

In [0]:
f"{path_volume}/{file_name}"

In [0]:
df1 = spark.read.csv(f"{path_volume}/{file_name}",
    header=True,
    inferSchema=True,
    sep=",")
display(df1)

In [0]:
df.write.mode("append").insertInto(f"{path_table}.{table_name}")
display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))

In [0]:
f"{path_table}.{table_name}"

In [0]:
display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))

In [0]:
silver_table_name = "baby_names_prepared"
gold_table_name = "top_baby_names_2021"
path_table = catalog + "." + schema
print(path_table) # Show the complete path

In [0]:
df_raw = spark.read.table(f"{path_table}.{table_name}")
display(df_raw)

In [0]:
from pyspark.sql.functions import col, initcap, when

# Rename "Year" column to "Year_Of_Birth"
df_rename_year = df_raw.withColumnRenamed("Year", "Year_Of_Birth")

# Change the case of "First_Name" column to initcap
df_init_caps = df_rename_year.withColumn("First_Name", initcap(col("First_Name").cast("string")))

# Update column values from "M" to "male" and "F" to "female"
df_baby_names_sex = df_init_caps.withColumn(
"Sex",
    when(col("Sex") == "M", "Male")
    .when(col("Sex") == "F", "Female")
)

# display
display(df_baby_names_sex)

# Save DataFrame to table
df_baby_names_sex.write.mode("overwrite").saveAsTable(f"{path_table}.{silver_table_name}")

In [0]:
df_rename_year.head(10)

In [0]:
df_init_caps.head(10)

In [0]:
df_baby_names_sex.head(10)

In [0]:
from pyspark.sql.functions import expr, sum, desc
from pyspark.sql import Window

# Count of names for entire state of New York by sex
df_baby_names_2021_grouped=(df_baby_names_sex.filter(expr("Year_Of_Birth == 2021"))
.groupBy("Sex", "First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count")))

# Display data
display(df_baby_names_2021_grouped)

# Save DataFrame to a table
df_baby_names_2021_grouped.write.mode("overwrite").saveAsTable(f"{path_table}.{gold_table_name}")

Databricks visualization. Run in Databricks to view.