In [0]:
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
# Replace every non-alphanumeric character of the current user's name with "_"
# so it can be embedded safely in a table name and a filesystem path.
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table.
# The same directory serves as both the inferred-schema location and the
# stream checkpoint; it was removed above so every run starts from scratch.
query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

# toTable() only starts the streaming query and returns immediately. Block until
# the availableNow trigger has processed all currently available files (and
# re-raise any stream failure), so the following cells that read table_name see
# the complete table instead of racing the stream under "Run All".
query.awaitTermination()

<pyspark.sql.streaming.query.StreamingQuery at 0x7fce6c85a260>

In [0]:
# Load the Delta table populated by the Auto Loader stream as a batch DataFrame.
df = spark.table(table_name)

In [0]:
# Render the ingested rows (the event fields plus the source_file and
# processing_time columns added during ingestion) as a rich table.
display(df)

action,time,_rescued_data,source_file,processing_time
Close,1469571524,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571526,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Close,1469571532,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571535,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571536,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571536,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571537,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Close,1469571538,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Open,1469571539,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z
Close,1469571550,,/databricks-datasets/structured-streaming/events/file-19.json,2023-11-13T22:27:01.635Z


In [0]:
# Load the daily bike-sharing CSV: the first row supplies column names and
# Spark scans the data to infer each column's type.
sparkDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/databricks-datasets/bikeSharing/data-001/day.csv")
)
display(sparkDF)

instant,dteday,season,yr,mnth,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
3,2011-01-03,1,0,1,0,1,1,1,0.196364,0.189405,0.437273,0.248309,120,1229,1349
4,2011-01-04,1,0,1,0,2,1,1,0.2,0.212122,0.590435,0.160296,108,1454,1562
5,2011-01-05,1,0,1,0,3,1,1,0.226957,0.22927,0.436957,0.1869,82,1518,1600
6,2011-01-06,1,0,1,0,4,1,1,0.204348,0.233209,0.518261,0.0895652,88,1518,1606
7,2011-01-07,1,0,1,0,5,1,2,0.196522,0.208839,0.498696,0.168726,148,1362,1510
8,2011-01-08,1,0,1,0,6,0,2,0.165,0.162254,0.535833,0.266804,68,891,959
9,2011-01-09,1,0,1,0,0,0,1,0.138333,0.116175,0.434167,0.36195,54,768,822
10,2011-01-10,1,0,1,0,1,1,1,0.150833,0.150888,0.482917,0.223267,41,1280,1321
