In [0]:
%sql
-- Make workspace.learning the session's current catalog and schema so the
-- unqualified table names used by later SQL cells resolve there.
USE CATALOG workspace;
USE SCHEMA learning;

In [0]:
%sql
-- Sanity check of the raw source: show ten rows read straight from the
-- parquet files in the volume, with no transformation applied.
SELECT *
FROM read_files('dbfs:/Volumes/workspace/learning/learn/nyctaxi_yellow_parquet', format => 'parquet')
LIMIT 10


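Adding columns:
+ Pickup and dropoff timestamps cast to dates (`pickup_date`, `dropoff_date`)
+ Last modification timestamp of the source file (`file_modification_time`)
+ Input file name (`source_file`)
+ Current timestamp, for tracking ingestion time (`ingestion_time`)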


In [0]:
%sql
-- Dry run of the bronze projection: every source column plus the derived
-- date and lineage columns, limited to ten rows for inspection.
SELECT
    *
    -- trip timestamps truncated to calendar dates
  , CAST(pickup_datetime AS DATE)    AS pickup_date
  , CAST(dropoff_datetime AS DATE)   AS dropoff_date
    -- per-file lineage taken from the file metadata column
  , _metadata.file_modification_time AS file_modification_time
  , _metadata.file_name              AS source_file
    -- when this query ran
  , current_timestamp()              AS ingestion_time
FROM
  read_files(
    'dbfs:/Volumes/workspace/learning/learn/nyctaxi_yellow_parquet',
    format => 'parquet'
  )
LIMIT 10


In [0]:
%sql
-- Build the bronze table from the raw parquet files, adding derived date
-- columns and lineage columns (source file, its modification time, load time).
--
-- CREATE OR REPLACE swaps the table contents in a single atomic operation.
-- A separate DROP + CREATE would leave no table at all if the CTAS failed
-- part-way and would discard the table's history on every run.
CREATE OR REPLACE TABLE nyc_taxi_yellow_bronze AS
SELECT
  *,
  CAST(pickup_datetime AS DATE) AS pickup_date,
  CAST(dropoff_datetime AS DATE) AS dropoff_date,
  _metadata.file_modification_time AS file_modification_time,
  _metadata.file_name AS source_file,
  current_timestamp() AS ingestion_time
FROM read_files(
  'dbfs:/Volumes/workspace/learning/learn/nyctaxi_yellow_parquet',
  format => 'parquet'
);

-- Preview the freshly written table.
SELECT *
FROM nyc_taxi_yellow_bronze
LIMIT 10

In [0]:
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import DateType

# PySpark version of the SQL bronze load: read the raw parquet files, add the
# derived date and lineage columns, and persist the result as a Delta table.
df = (
    spark.read
    .format("parquet")
    .load("dbfs:/Volumes/workspace/learning/learn/nyctaxi_yellow_parquet/")
)

df_with_metadata = (
    df
    # Trip timestamps truncated to calendar dates.
    .withColumn("pickup_date", col("pickup_datetime").cast(DateType()))
    .withColumn("dropoff_date", col("dropoff_datetime").cast(DateType()))
    # Lineage columns from the file metadata column; file_modification_time
    # is included so this table carries the same columns as the SQL version.
    .withColumn("file_modification_time", col("_metadata.file_modification_time"))
    .withColumn("source_file", col("_metadata.file_name"))
    .withColumn("ingestion_time", current_timestamp())
)

(
    df_with_metadata.write
    .format("delta")
    .mode("overwrite")
    # Let the overwrite replace the table schema as well as the data, so a
    # re-run succeeds even if the table already exists with different columns.
    .option("overwriteSchema", "true")
    .saveAsTable("workspace.learning.nyctaxi_bronze_python")
)

In [0]:
# Read back the Delta table saved by the PySpark bronze load and preview ten
# rows, mirroring the LIMIT 10 previews used in the SQL cells.
table = spark.table("workspace.learning.nyctaxi_bronze_python")
display(table.limit(10))