
## Overview

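This notebook shows how to create and query tables and DataFrames from files uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes the files you want to read are already in DBFS.

Concretely, it loads three monthly Yellow Taxi trip files (January–March 2025, parquet) from `/FileStore/tables/` and saves them as Delta tables in the `my_nyctaxi` database. It then cleans the combined data and builds dimension tables and a fact table in the `my_nyctaxi_dwh` database.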

This notebook is written in **Python**, so the default cell type is Python. You can switch an individual cell to another language by starting it with a `%LANGUAGE` magic command (for example `%sql`, as several cells below do). Python, Scala, SQL, and R are all supported.

In [0]:
# List the DBFS upload folder to confirm the monthly parquet files are present.
upload_dir = '/FileStore/tables/'
dbutils.fs.ls(upload_dir)

In [0]:
# Load the three monthly Yellow Taxi parquet files (January, February, March 2025).
# Parquet files carry their own schema in the file footer, so no inferSchema option is
# passed: that option only applies to text sources such as CSV/JSON and was ignored here.
def _read_yellow_month(path):
    """Read one monthly parquet file from DBFS into a Spark DataFrame.

    Args:
        path: DBFS path of the parquet file/directory to load.

    Returns:
        The DataFrame produced by ``spark.read.format('parquet').load(path)``.
    """
    return spark.read.format('parquet').load(path)


df_parqueta = _read_yellow_month('/FileStore/tables/yellow_tripdata_2025_01.parquet/')
df_parquete = _read_yellow_month('/FileStore/tables/yellow_tripdata_2025_02.parquet/')
df_parqueti = _read_yellow_month('/FileStore/tables/yellow_tripdata_2025_03.parquet/')


In [0]:
# Preview each monthly DataFrame (January, February, March) as loaded from parquet.
for monthly_df in (df_parqueta, df_parquete, df_parqueti):
    monthly_df.display()



In [0]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Re-read January 2025 with an explicit schema so the column names and types are pinned.
# Every field is nullable.
yellow_trip_fields = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", LongType()),
    ("trip_distance", DoubleType()),
    ("RatecodeID", LongType()),
    ("store_and_fwd_flag", StringType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("payment_type", LongType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
    ("airport_fee", DoubleType()),
    ("cbd_congestion_fee", DoubleType()),
]
yellow_trip_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in yellow_trip_fields]
)

df_parqueta = (
    spark.read
    .format("parquet")
    .schema(yellow_trip_schema)
    .load("/FileStore/tables/yellow_tripdata_2025_01.parquet/")
)


In [0]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Re-read February 2025 with an explicit schema so the column names and types are pinned.
# Every field is nullable.
yellow_trip_fields = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", LongType()),
    ("trip_distance", DoubleType()),
    ("RatecodeID", LongType()),
    ("store_and_fwd_flag", StringType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("payment_type", LongType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
    ("airport_fee", DoubleType()),
    ("cbd_congestion_fee", DoubleType()),
]
yellow_trip_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in yellow_trip_fields]
)

df_parquete = (
    spark.read
    .format("parquet")
    .schema(yellow_trip_schema)
    .load("/FileStore/tables/yellow_tripdata_2025_02.parquet/")
)


In [0]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Re-read March 2025 with an explicit schema so the column names and types are pinned.
# Every field is nullable.
yellow_trip_fields = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("passenger_count", LongType()),
    ("trip_distance", DoubleType()),
    ("RatecodeID", LongType()),
    ("store_and_fwd_flag", StringType()),
    ("PULocationID", IntegerType()),
    ("DOLocationID", IntegerType()),
    ("payment_type", LongType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("congestion_surcharge", DoubleType()),
    ("airport_fee", DoubleType()),
    ("cbd_congestion_fee", DoubleType()),
]
yellow_trip_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in yellow_trip_fields]
)

df_parqueti = (
    spark.read
    .format("parquet")
    .schema(yellow_trip_schema)
    .load("/FileStore/tables/yellow_tripdata_2025_03.parquet/")
)


In [0]:
%sql
-- Create the raw-layer database on first run, then make it the current database.
CREATE SCHEMA IF NOT EXISTS my_nyctaxi;
USE my_nyctaxi;



In [0]:
%sql
-- Pre-create the Delta table that receives the January frame (df_parqueta).
-- The column layout mirrors the explicit read schema.
CREATE OR REPLACE TABLE my_nyctaxi.yellow_tripdata1 (
    VendorID               INT,
    tpep_pickup_datetime   TIMESTAMP,
    tpep_dropoff_datetime  TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           INT,
    DOLocationID           INT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    cbd_congestion_fee     DOUBLE
)
USING DELTA
LOCATION 'dbfs:/user/hive/warehouse/my_nyctaxi.db/yellow_tripdata1';


In [0]:
%sql
-- Pre-create the Delta table that receives the February frame (df_parquete).
-- The column layout mirrors the explicit read schema.
CREATE OR REPLACE TABLE my_nyctaxi.yellow_tripdata2 (
    VendorID               INT,
    tpep_pickup_datetime   TIMESTAMP,
    tpep_dropoff_datetime  TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           INT,
    DOLocationID           INT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    cbd_congestion_fee     DOUBLE
)
USING DELTA
LOCATION 'dbfs:/user/hive/warehouse/my_nyctaxi.db/yellow_tripdata2';



In [0]:
%sql
-- Pre-create the Delta table that receives the March frame (df_parqueti).
-- The column layout mirrors the explicit read schema.
CREATE OR REPLACE TABLE my_nyctaxi.yellow_tripdata3 (
    VendorID               INT,
    tpep_pickup_datetime   TIMESTAMP,
    tpep_dropoff_datetime  TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           INT,
    DOLocationID           INT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    cbd_congestion_fee     DOUBLE
)
USING DELTA
LOCATION 'dbfs:/user/hive/warehouse/my_nyctaxi.db/yellow_tripdata3';



In [0]:
%sql
-- Pre-create the Delta table that receives the union of all three months (new_yellow_df).
-- The column layout mirrors the explicit read schema.
CREATE OR REPLACE TABLE my_nyctaxi.yellow_tripdata4 (
    VendorID               INT,
    tpep_pickup_datetime   TIMESTAMP,
    tpep_dropoff_datetime  TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           INT,
    DOLocationID           INT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    cbd_congestion_fee     DOUBLE
)
USING DELTA
LOCATION 'dbfs:/user/hive/warehouse/my_nyctaxi.db/yellow_tripdata4';



In [0]:
# Sanity check: confirm df_parqueta is a Spark DataFrame before writing it out.
print(df_parqueta.__class__)

In [0]:
# Persist the January frame, replacing any existing contents of yellow_tripdata1.
df_parqueta.write.saveAsTable("my_nyctaxi.yellow_tripdata1", mode="overwrite")



In [0]:
# Persist the February frame, replacing any existing contents of yellow_tripdata2.
df_parquete.write.saveAsTable("my_nyctaxi.yellow_tripdata2", mode="overwrite")


In [0]:
# Persist the March frame, replacing any existing contents of yellow_tripdata3.
df_parqueti.write.saveAsTable("my_nyctaxi.yellow_tripdata3", mode="overwrite")


In [0]:
%sql
-- Inspect the persisted January table.
SELECT * FROM my_nyctaxi.yellow_tripdata1

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Stack January, February and March into one DataFrame, matching columns by name.
# Keep the star import of pyspark.sql.functions: the cleaning cells below rely on the
# col/when/lit/hour/count/unix_timestamp names it brings into scope.
new_yellow_df = df_parqueta
for next_month_df in (df_parquete, df_parqueti):
    new_yellow_df = new_yellow_df.unionByName(next_month_df)
display(new_yellow_df)

In [0]:
# Persist the combined (not yet cleaned) three-month frame to yellow_tripdata4.
new_yellow_df.write.saveAsTable("my_nyctaxi.yellow_tripdata4", mode="overwrite")


In [0]:
%sql
-- Inspect the combined three-month table.
SELECT * FROM my_nyctaxi.yellow_tripdata4

In [0]:
# VendorID is read as IntegerType (explicit schema above). fillna ignores subset columns whose
# type does not match the fill value, so filling with a string was a silent no-op.
# Use the integer sentinel 0 for a missing vendor, the same convention used for the
# passenger_count and RatecodeID unknowns below.
# NOTE(review): confirm 0 is not a real VendorID code.
new_yellow_df = new_yellow_df.fillna(0, subset=['VendorID'])

In [0]:
# Treat a missing passenger count as 0.
new_yellow_df = new_yellow_df.fillna({'passenger_count': 0})

In [0]:
# Keep RatecodeID only when it lies in 1..6 (inclusive). Everything else, including nulls
# (the comparison yields null, which is falsy) and the value 99, becomes 0.
ratecode = col('RatecodeID')
ratecode_in_range = (ratecode >= 1) & (ratecode <= 6)
new_yellow_df = new_yellow_df.withColumn('RatecodeID', when(ratecode_in_range, ratecode).otherwise(0))

In [0]:
# Label missing store-and-forward flags explicitly (the column is a string column).
new_yellow_df = new_yellow_df.fillna({'store_and_fwd_flag': 'Unknown'})

In [0]:
# Clamp negative fares to 0. Non-negative values and nulls pass through unchanged.
fare_amount_col = col('fare_amount')
new_yellow_df = new_yellow_df.withColumn(
    'fare_amount',
    when(fare_amount_col < 0, lit(0)).otherwise(fare_amount_col),
)

In [0]:
# Clamp negative "extra" charges to 0. Non-negative values and nulls pass through unchanged.
extra_col = col('extra')
new_yellow_df = new_yellow_df.withColumn(
    'extra',
    when(extra_col < 0, lit(0)).otherwise(extra_col),
)

In [0]:
# Clamp negative tolls to 0, and otherwise keep tolls_amount's own value.
# The otherwise-branch must read tolls_amount itself: reading mta_tax here would replace
# every non-negative toll with that trip's MTA tax.
tolls_amount_col = col('tolls_amount')
new_yellow_df = new_yellow_df.withColumn(
    'tolls_amount',
    when(tolls_amount_col < 0, lit(0)).otherwise(tolls_amount_col),
)

In [0]:
# Clamp a negative MTA tax to 0. Non-negative values and nulls pass through unchanged.
mta_tax_col = col('mta_tax')
new_yellow_df = new_yellow_df.withColumn(
    'mta_tax',
    when(mta_tax_col < 0, lit(0)).otherwise(mta_tax_col),
)

In [0]:
# Clamp negative tolls to 0 (nulls and non-negative values unchanged).
# This is idempotent, so it is harmless if tolls were already clamped.
tolls_col = col('tolls_amount')
new_yellow_df = new_yellow_df.withColumn(
    'tolls_amount',
    when(tolls_col < 0, lit(0)).otherwise(tolls_col),
)

In [0]:
# Zero out a tip only when it is negative AND payment_type is 2 (mapped to "Cash" further down).
# Negative tips with any other payment type are kept as-is.
# NOTE(review): every other amount column clamps negatives unconditionally. Confirm that this
# narrower rule is intended.
tip_col = col('tip_amount')
negative_cash_tip = (tip_col < 0) & (col('payment_type') == 2)
new_yellow_df = new_yellow_df.withColumn('tip_amount', when(negative_cash_tip, lit(0)).otherwise(tip_col))

In [0]:
# Clamp a negative improvement surcharge to 0. Non-negative values and nulls pass through unchanged.
surcharge_col = col('improvement_surcharge')
new_yellow_df = new_yellow_df.withColumn(
    'improvement_surcharge',
    when(surcharge_col < 0, lit(0)).otherwise(surcharge_col),
)

In [0]:
# Map non-positive totals (<= 0) to 0. Positive values and nulls pass through unchanged.
total_amount_col = col('total_amount')
new_yellow_df = new_yellow_df.withColumn(
    'total_amount',
    when(total_amount_col <= 0, lit(0)).otherwise(total_amount_col),
)

In [0]:
# Treat a missing congestion surcharge as 0.
new_yellow_df = new_yellow_df.fillna({'congestion_surcharge': 0})

In [0]:
# Clamp a negative congestion surcharge to 0. Non-negative values and nulls pass through unchanged.
congestion_col = col('congestion_surcharge')
new_yellow_df = new_yellow_df.withColumn(
    'congestion_surcharge',
    when(congestion_col < 0, lit(0)).otherwise(congestion_col),
)

In [0]:
# Treat a missing airport fee as 0.
# Use the column name exactly as declared in the read schema ('airport_fee'). The
# capitalised 'Airport_fee' only resolved because Spark is case-insensitive by default.
new_yellow_df = new_yellow_df.fillna(0, subset=['airport_fee'])

In [0]:
# Clamp a negative airport fee to 0. Non-negative values and nulls pass through unchanged.
# withColumn names its output column exactly as given, so passing 'Airport_fee' renamed the
# schema's 'airport_fee' column. Keep the declared lowercase name instead.
airport_fee_col = col('airport_fee')
new_yellow_df = new_yellow_df.withColumn(
    'airport_fee',
    when(airport_fee_col < 0, lit(0)).otherwise(airport_fee_col),
)

In [0]:
# Clamp a negative CBD congestion fee to 0. Non-negative values and nulls pass through unchanged.
cbd_fee_col = col('cbd_congestion_fee')
new_yellow_df = new_yellow_df.withColumn(
    'cbd_congestion_fee',
    when(cbd_fee_col < 0, lit(0)).otherwise(cbd_fee_col),
)


In [0]:
# Derive the pickup hour of day, used by the hourly volume aggregates below.
pickup_hour_expr = hour(col('tpep_pickup_datetime'))
new_yellow_df = new_yellow_df.withColumn('pickup_hour', pickup_hour_expr)
new_yellow_df.display()

In [0]:
# Safety net: replace any null RatecodeID with 0. This should be a no-op once the
# RatecodeID clean-up cell has run, since that cell already maps nulls to 0.
new_yellow_df = new_yellow_df.fillna(0, subset=['RatecodeID'])


In [0]:
# Trip count per (pickup zone, pickup hour) pair.
df_int_transf = (
    new_yellow_df
    .groupBy('PULocationID', 'pickup_hour')
    .count()
    .withColumnRenamed('count', 'travel_volume')
)
df_int_transf.display()

In [0]:
# Expose the zone/hour volumes to SQL as temp view TA (joined into the fact table).
df_int_transf.createOrReplaceTempView(name='TA')


In [0]:
from pyspark.sql.functions import sum

# Total trips per pickup hour, summed across all zones from the per-zone/per-hour counts.
# Despite its name, most_taxi_rides holds that per-hour total.
rides_per_hour_expr = sum("travel_volume").alias("most_taxi_rides")
df_int_transf_b = df_int_transf.groupBy("pickup_hour").agg(rides_per_hour_expr)
df_int_transf_b.display()


In [0]:
# Expose the per-hour totals to SQL as temp view TB (joined into the fact table).
df_int_transf_b.createOrReplaceTempView(name='TB')

In [0]:
from pyspark.sql.functions import count

# Number of trips that start in each pickup zone.
zones_freq = (
    new_yellow_df
    .groupBy('PULocationID')
    .agg(count('*').alias('travel_nbr_per_zn'))
)

zones_freq.display()


In [0]:
# Expose the per-zone trip counts to SQL as temp view TC (joined into the fact table).
zones_freq.createOrReplaceTempView(name='TC')

In [0]:
# Number of trips per payment type.
pt_df = (
    new_yellow_df
    .groupBy('payment_type')
    .count()
    .withColumnRenamed('count', 'most_payment_tp')
)

pt_df.display()

In [0]:
# Expose the per-payment-type counts to SQL as temp view TD (joined into the fact table).
pt_df.createOrReplaceTempView(name='TD')

In [0]:
# Trip duration in minutes: dropoff minus pickup, each as epoch seconds, divided by 60.
pickup_epoch_s = unix_timestamp("tpep_pickup_datetime")
dropoff_epoch_s = unix_timestamp("tpep_dropoff_datetime")
new_yellow_df = new_yellow_df.withColumn("travel_duration", (dropoff_epoch_s - pickup_epoch_s) / 60)
new_yellow_df.display()

In [0]:
# Human-readable label for each payment_type code. Unlisted codes and nulls get "Other".
payment_type_labels = [
    (0, "Flex Fare trip"),
    (1, "Credit card"),
    (2, "Cash"),
    (3, "No charge"),
    (4, "Dispute"),
    (5, "Unknown"),
    (6, "Voided trip"),
]

# Build the CASE chain in list order: the first matching code wins.
payment_desc_expr = None
for payment_code, payment_label in payment_type_labels:
    is_code = col("payment_type") == payment_code
    if payment_desc_expr is None:
        payment_desc_expr = when(is_code, payment_label)
    else:
        payment_desc_expr = payment_desc_expr.when(is_code, payment_label)

new_yellow_df = new_yellow_df.withColumn("payment_type_desc", payment_desc_expr.otherwise("Other"))
new_yellow_df.display()

In [0]:
# Human-readable label for each RatecodeID. Unlisted codes (including the 0 that the
# RatecodeID clean-up cell assigns to out-of-range values) get "Other".
# NOTE(review): that clean-up cell maps 99 to 0, so the 99 branch should not match once it has run.
rate_code_labels = [
    (1, "Standard rate"),
    (2, "JFK"),
    (3, "Newark"),
    (4, "Nassau or Westchester"),
    (5, "Negotiated fare"),
    (6, "Group ride"),
    (99, "Null/unknown"),
]

# Build the CASE chain in list order: the first matching code wins.
rate_desc_expr = None
for rate_code, rate_label in rate_code_labels:
    is_rate = col("RatecodeID") == rate_code
    if rate_desc_expr is None:
        rate_desc_expr = when(is_rate, rate_label)
    else:
        rate_desc_expr = rate_desc_expr.when(is_rate, rate_label)

new_yellow_df = new_yellow_df.withColumn("rate_code_desc", rate_desc_expr.otherwise("Other"))
new_yellow_df.display()


In [0]:
# Expose the cleaned, enriched trips to SQL for building the warehouse tables.
new_yellow_df.createOrReplaceTempView(name='new_yellow_view')


In [0]:
%sql
-- Warehouse database holding the dimension tables and the fact table.
CREATE SCHEMA IF NOT EXISTS my_nyctaxi_dwh;

In [0]:
%sql
-- Staging step for the location dimension: one row per distinct (pickup zone, dropoff zone) pair.
-- DISTINCT keeps this small. Without it, every trip row was materialised, and the
-- un-partitioned ROW_NUMBER window made Spark move all of those rows to a single partition.
CREATE OR REPLACE TABLE my_nyctaxi_dwh.Dim_Location AS
SELECT PULocationID,
       DOLocationID,
       row_number() OVER (ORDER BY PULocationID) AS surrogate_key
FROM (
    SELECT DISTINCT PULocationID, DOLocationID
    FROM new_yellow_view
) AS loc_pairs;


In [0]:
%sql
-- Inspect the staged location pairs.
SELECT * FROM my_nyctaxi_dwh.dim_location

In [0]:
%sql
-- Location dimension: one row per zone ID seen as either a pickup or a dropoff location.
-- UNION removes duplicates; surrogate keys are assigned in zone-ID order.
-- Read straight from new_yellow_view rather than from Dim_Location itself. After the first
-- run, Dim_Location no longer has PULocationID/DOLocationID, so a self-read made this cell
-- fail on re-run.
CREATE OR REPLACE TABLE my_nyctaxi_dwh.Dim_Location AS
SELECT LocationID_Source,
       row_number() OVER (ORDER BY LocationID_Source) AS location_surrogate_key
FROM (
    SELECT PULocationID AS LocationID_Source FROM new_yellow_view
    UNION
    SELECT DOLocationID AS LocationID_Source FROM new_yellow_view
) AS d_loc;

In [0]:
%sql
-- Inspect the final location dimension.
SELECT * FROM my_nyctaxi_dwh.dim_location

In [0]:
%sql
-- Drop any previous payment-type dimension before it is rebuilt in the next cell.
DROP TABLE IF EXISTS my_nyctaxi_dwh.payment_type

In [0]:
%sql
-- Payment-type dimension: one row per distinct (payment_type, payment_type_desc) pair,
-- with a surrogate key assigned in payment_type order.
CREATE OR REPLACE TABLE my_nyctaxi_dwh.payment_type AS
WITH d_p_type AS (
    SELECT DISTINCT payment_type, payment_type_desc
    FROM new_yellow_view
)
SELECT payment_type,
       payment_type_desc,
       ROW_NUMBER() OVER (ORDER BY payment_type) AS payment_surrogate_key
FROM d_p_type;


In [0]:
%sql
-- Inspect the payment-type dimension.
SELECT * FROM my_nyctaxi_dwh.payment_type

In [0]:
%sql
-- Rate-code dimension: one row per distinct (RatecodeID, rate_code_desc) pair,
-- with a surrogate key assigned in RatecodeID order.
CREATE OR REPLACE TABLE my_nyctaxi_dwh.dim_rate_code AS
WITH d_r_code AS (
    SELECT DISTINCT RatecodeID, rate_code_desc
    FROM new_yellow_view
)
SELECT RatecodeID,
       rate_code_desc,
       ROW_NUMBER() OVER (ORDER BY RatecodeID) AS ratecode_surrogate_key
FROM d_r_code;

In [0]:
%sql
-- Inspect the rate-code dimension.
SELECT * FROM my_nyctaxi_dwh.dim_rate_code

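## Fact table

Join the cleaned trips view (`new_yellow_view`) to the dimension tables and to the aggregate temp views `TA`–`TD` to build `my_nyctaxi_dwh.fact_table`.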

In [0]:
%sql
-- Inspect the cleaned trips view that feeds the fact table.
SELECT * FROM new_yellow_view

In [0]:
%sql
-- Fact table: each trip from new_yellow_view (the LEFT JOINs keep every trip).
-- Pickup/dropoff zones, payment type and rate code are replaced by dimension surrogate keys.
-- The zone/hour aggregates from temp views TA-TD (created earlier in this session) are attached.
-- total_amount is carried through alongside the other cleaned amount columns; it was
-- previously cleaned but left out of the fact table.
CREATE OR REPLACE TABLE my_nyctaxi_dwh.fact_table AS
SELECT
  new_yellow_view.VendorID,
  new_yellow_view.tpep_pickup_datetime,
  new_yellow_view.tpep_dropoff_datetime,
  new_yellow_view.passenger_count,
  new_yellow_view.trip_distance,
  new_yellow_view.store_and_fwd_flag,
  new_yellow_view.fare_amount,
  new_yellow_view.extra,
  new_yellow_view.mta_tax,
  new_yellow_view.tip_amount,
  new_yellow_view.tolls_amount,
  new_yellow_view.improvement_surcharge,
  new_yellow_view.congestion_surcharge,
  new_yellow_view.airport_fee,
  new_yellow_view.cbd_congestion_fee,
  new_yellow_view.total_amount,
  new_yellow_view.pickup_hour,
  new_yellow_view.travel_duration,
  d_loc.location_surrogate_key AS Pu_Location_sk,
  do_loc.location_surrogate_key AS Do_Location_sk,
  d_p_type.payment_surrogate_key,
  d_r_code.ratecode_surrogate_key,
  TA.travel_volume,
  TB.most_taxi_rides,
  TC.travel_nbr_per_zn,
  TD.most_payment_tp
FROM new_yellow_view
LEFT JOIN my_nyctaxi_dwh.Dim_Location AS d_loc
  ON new_yellow_view.PULocationID = d_loc.LocationID_Source
LEFT JOIN my_nyctaxi_dwh.Dim_Location AS do_loc
  ON new_yellow_view.DOLocationID = do_loc.LocationID_Source
LEFT JOIN my_nyctaxi_dwh.payment_type AS d_p_type
  ON new_yellow_view.payment_type = d_p_type.payment_type
LEFT JOIN my_nyctaxi_dwh.dim_rate_code AS d_r_code
  ON new_yellow_view.RatecodeID = d_r_code.RatecodeID
LEFT JOIN TA
  ON new_yellow_view.PULocationID = TA.PULocationID
 AND new_yellow_view.pickup_hour = TA.pickup_hour
LEFT JOIN TB
  ON new_yellow_view.pickup_hour = TB.pickup_hour
LEFT JOIN TC
  ON new_yellow_view.PULocationID = TC.PULocationID
LEFT JOIN TD
  ON new_yellow_view.payment_type = TD.payment_type



In [0]:
%sql
-- Inspect the finished fact table.
SELECT * FROM my_nyctaxi_dwh.fact_table