# 1. Create database 'data_db' in HMS

In [2]:
%%sql

-- Create the 'data_db' namespace in the 'hms' catalog, stored under the
-- lakehouse warehouse bucket. The catalog is spelled out so the statement does
-- not depend on the session's current catalog; the rest of this notebook
-- addresses the namespace as hms.data_db.
--
-- Metastore checks / cleanup, kept for manual use (not executed):
--   SHOW SCHEMAS FROM hms;
--   SHOW TABLES FROM hms.data_db;
--   DROP TABLE hms.data_db.test;
--   DROP SCHEMA hms.data_db;

CREATE DATABASE IF NOT EXISTS hms.data_db
LOCATION 's3a://lakehouse/warehouse/data_db';

In [4]:
%%sql

-- List the tables currently registered in the hms.data_db namespace.
SHOW TABLES IN hms.data_db;

namespace,tableName,isTemporary
data_db,olympic_regions,False
data_db,sales_summary,False
data_db,trips,False
data_db,olympic_events,False
data_db,new_york_taxi,False


# 2. Create and populate 'New York Taxi' data model

In [5]:
# Fully qualified name (catalog.namespace.table) of the taxi table.
NEW_YORK_TAXI = "hms.data_db.new_york_taxi"

In [25]:
%%sql

-- Remove any earlier copy of the taxi table; a no-op when it does not exist.
DROP TABLE IF EXISTS hms.data_db.new_york_taxi;

In [27]:
%%sql
-- Table holding the New York taxi trip records.
--   * Declared explicitly as an Iceberg table (USING iceberg); the days()
--     partition transform and the table properties below are Iceberg features.
--   * Partitioned by the day of tpep_pickup_datetime.
--   * Data files are written as Parquet ('write.format.default') with zstd
--     compression, using table format version 2.
-- NOTE(review): fare_amount is a monetary value stored as FLOAT, which is
-- inexact; switching to DECIMAL would also require changing the cast applied
-- when the data is loaded -- confirm before changing.
CREATE TABLE hms.data_db.new_york_taxi (
    tpep_pickup_datetime  TIMESTAMP,
    tpep_dropoff_datetime TIMESTAMP,
    passenger_count       BIGINT,
    trip_distance         DECIMAL(10,2),
    PULocationID          BIGINT,
    DOLocationID          BIGINT,
    fare_amount           FLOAT
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime))
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'format-version' = '2',
    'write.parquet.compression-codec' = 'zstd'
);


In [28]:
%%sql



-- To inspect the resulting table definition, run:
--   SHOW CREATE TABLE hms.data_db.new_york_taxi;

-- Set the table's write ordering: pickup location first, then drop-off location.
ALTER TABLE hms.data_db.new_york_taxi
WRITE ORDERED BY PULocationID, DOLocationID;

In [29]:
# Load the January 2019 yellow-taxi CSV extract; the first line of the file
# is treated as the header row that names the columns.
taxi_csv_path = "/home/iceberg/datasets/new_york_taxi/yellow_tripdata_2019-01.csv"
taxi_df = spark.read.csv(taxi_csv_path, header=True)

# Preview the first two rows of the raw data.
taxi_df.show(2)

+--------------------+---------------------+---------------+-------------+------------+------------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+
|    01-01-2019 00:46|     01-01-2019 00:53|              1|          1.5|         151|         239|          7|
|    01-01-2019 00:59|     01-01-2019 01:18|              1|          2.6|         239|         246|         14|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+
only showing top 2 rows



In [30]:
from pyspark.sql.functions import col, to_timestamp, round

# Align the DataFrame's column types with the hms.data_db.new_york_taxi schema:
#   - pickup / drop-off values, parsed with "dd-MM-yyyy HH:mm" -> timestamp
#   - passenger_count and both location ids                    -> BIGINT
#   - trip_distance, rounded to 2 decimal places               -> decimal(10,2)
#   - fare_amount                                              -> FLOAT
# Note: `round` here is the pyspark.sql.functions version imported above,
# not the Python builtin.
timestamp_format = "dd-MM-yyyy HH:mm"

for ts_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    taxi_df = taxi_df.withColumn(ts_column, to_timestamp(ts_column, timestamp_format))

for bigint_column in ("passenger_count", "PULocationID", "DOLocationID"):
    taxi_df = taxi_df.withColumn(bigint_column, col(bigint_column).cast("BIGINT"))

taxi_df = taxi_df.withColumn(
    "trip_distance", round(col("trip_distance"), 2).cast("decimal(10,2)")
)
taxi_df = taxi_df.withColumn("fare_amount", col("fare_amount").cast("FLOAT"))

In [31]:
# Register the converted DataFrame as the temporary view "temp_taxi" so it can
# be queried from Spark SQL. An existing view with the same name is replaced.
# (Optional sanity check before loading: taxi_df.count())
taxi_df.createOrReplaceTempView("temp_taxi")

In [32]:
# Append the rows of the "temp_taxi" view to the Iceberg table.
# Columns are named explicitly on both sides so the load maps by name rather
# than by position: a reordered or extended source can no longer write values
# into the wrong column silently.
spark.sql(
    """
    INSERT INTO hms.data_db.new_york_taxi (
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        PULocationID,
        DOLocationID,
        fare_amount
    )
    SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        PULocationID,
        DOLocationID,
        fare_amount
    FROM temp_taxi
    """
)

                                                                                

DataFrame[]

In [15]:
# Preview five rows of the table named by NEW_YORK_TAXI.
# Alternative load path, kept for reference (not executed):
#   taxi_df.writeTo(NEW_YORK_TAXI).create()
spark.table(NEW_YORK_TAXI).show(n=5)

+--------------------+---------------------+---------------+-------------+------------+------------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+
| 2003-01-01 00:05:00|  2003-01-01 00:05:00|              1|          0.0|         264|         193|        0.0|
| 2003-01-01 00:15:00|  2003-01-01 00:15:00|              6|          0.0|         264|         193|        0.0|
| 2008-12-31 23:22:00|  2009-01-01 00:06:00|              1|        20.67|         132|         239|       52.0|
| 2008-12-31 23:22:00|  2008-12-31 23:34:00|              6|         1.43|         140|         162|        7.5|
| 2008-12-31 23:41:00|  2009-01-01 00:12:00|              6|         4.64|         233|         141|       22.5|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+

In [33]:
%%sql

-- Quick look at five rows of the taxi table.
-- For the total row count, run instead:
--   SELECT COUNT(*) FROM hms.data_db.new_york_taxi;

SELECT *
FROM hms.data_db.new_york_taxi
LIMIT 5;

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,fare_amount
2003-01-01 00:05:00,2003-01-01 00:05:00,1,0.0,264,193,0.0
2003-01-01 00:15:00,2003-01-01 00:15:00,6,0.0,264,193,0.0
2008-12-31 23:22:00,2009-01-01 00:06:00,1,20.67,132,239,52.0
2008-12-31 23:22:00,2008-12-31 23:34:00,6,1.43,140,162,7.5
2008-12-31 23:41:00,2009-01-01 00:12:00,6,4.64,233,141,22.5


In [37]:
%%sql

-- List the snapshots recorded for the taxi table via its "snapshots" metadata table.
SELECT *
FROM hms.data_db.new_york_taxi.snapshots;

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-11-12 06:03:35.578000,1244204739098313036,,append,s3a://lakehouse/warehouse/data_db/new_york_taxi/metadata/snap-1244204739098313036-1-82dee8d2-9fc4-458e-a95c-16ff48dc44f1.avro,"{'spark.app.id': 'local-1731390781641', 'changed-partition-count': '15', 'added-data-files': '16', 'total-equality-deletes': '0', 'added-records': '1048575', 'total-position-deletes': '0', 'added-files-size': '5894973', 'total-delete-files': '0', 'total-files-size': '5894973', 'total-records': '1048575', 'total-data-files': '16'}"
