In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName('spark-nb') \
    .master('spark://spark-master:7077') \
    .config('spark.ui.port', '4051') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog') \
    .config('spark.sql.catalog.spark_catalog.types', 'hive') \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
!mkdir -p /home/data/nyc_taxi_data/green_tripdata && \
    rm -f /home/data/nyc_taxi_data/green_tripdata/* && \
    wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet -P /home/data/nyc_taxi_data/green_tripdata/

--2024-10-26 05:29:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.164.82.160, 3.164.82.197, 3.164.82.112, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.164.82.160|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1427002 (1.4M) [binary/octet-stream]
Saving to: ‘/home/data/nyc_taxi_data/green_tripdata/green_tripdata_2023-01.parquet’


2024-10-26 05:30:00 (1.34 MB/s) - ‘/home/data/nyc_taxi_data/green_tripdata/green_tripdata_2023-01.parquet’ saved [1427002/1427002]



In [4]:
df = spark.read.format('parquet').load('/home/data/nyc_taxi_data/green_tripdata/')

In [5]:
df.count()

68211

In [6]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [7]:
df.limit(10).toPandas()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2023-01-01 00:26:10,2023-01-01 00:37:11,N,1.0,166,143,1.0,2.58,14.9,1.0,0.5,4.03,0.0,,1.0,24.18,1.0,1.0,2.75
1,2,2023-01-01 00:51:03,2023-01-01 00:57:49,N,1.0,24,43,1.0,1.81,10.7,1.0,0.5,2.64,0.0,,1.0,15.84,1.0,1.0,0.0
2,2,2023-01-01 00:35:12,2023-01-01 00:41:32,N,1.0,223,179,1.0,0.0,7.2,1.0,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0
3,1,2023-01-01 00:13:14,2023-01-01 00:19:03,N,1.0,41,238,1.0,1.3,6.5,0.5,1.5,1.7,0.0,,1.0,10.2,1.0,1.0,0.0
4,1,2023-01-01 00:33:04,2023-01-01 00:39:02,N,1.0,41,74,1.0,1.1,6.0,0.5,1.5,0.0,0.0,,1.0,8.0,1.0,1.0,0.0
5,2,2023-01-01 00:53:31,2023-01-01 01:11:04,N,1.0,41,262,1.0,2.78,17.7,1.0,0.5,0.0,0.0,,1.0,22.95,2.0,1.0,2.75
6,1,2023-01-01 00:09:14,2023-01-01 00:26:39,N,1.0,181,45,2.0,3.8,19.1,3.75,1.5,4.85,0.0,,1.0,29.2,1.0,1.0,2.75
7,2,2023-01-01 00:11:58,2023-01-01 00:24:55,N,1.0,24,75,1.0,1.88,14.2,1.0,0.5,0.0,0.0,,1.0,16.7,2.0,1.0,0.0
8,2,2023-01-01 00:41:29,2023-01-01 00:46:26,N,1.0,41,166,2.0,1.11,7.2,1.0,0.5,1.0,0.0,,1.0,10.7,1.0,1.0,0.0
9,2,2023-01-01 00:50:32,2023-01-01 01:13:42,N,1.0,24,140,1.0,4.22,24.7,1.0,0.5,3.0,0.0,,1.0,32.95,1.0,1.0,2.75


In [8]:
df.limit(1000).write.format('parquet').mode('overwrite').save('s3a://spark-warehouse/raw_data/save_from_notebook')

In [9]:
from pyspark.sql.functions import col

# hive table does not support timestamp_ntz
df = df.withColumn("lpep_pickup_datetime", col("lpep_pickup_datetime").cast("timestamp"))
df = df.withColumn("lpep_dropoff_datetime", col("lpep_dropoff_datetime").cast("timestamp"))

In [10]:
df.writeTo("local_db.table_from_notebook").create()

In [11]:
print(spark.sql("SHOW CREATE TABLE local_db.table_from_notebook").collect()[0].createtab_stmt)

CREATE TABLE spark_catalog.local_db.table_from_notebook (
  VendorID BIGINT,
  lpep_pickup_datetime TIMESTAMP,
  lpep_dropoff_datetime TIMESTAMP,
  store_and_fwd_flag STRING,
  RatecodeID DOUBLE,
  PULocationID BIGINT,
  DOLocationID BIGINT,
  passenger_count DOUBLE,
  trip_distance DOUBLE,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  ehail_fee INT,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE,
  payment_type DOUBLE,
  trip_type DOUBLE,
  congestion_surcharge DOUBLE)
USING iceberg
LOCATION 's3a://spark-warehouse/hive/local_db/table_from_notebook'
TBLPROPERTIES (
  'current-snapshot-id' = '911154990462472214',
  'format' = 'iceberg/parquet',
  'format-version' = '2',
  'write.parquet.compression-codec' = 'zstd')



In [12]:
spark.sql("SELECT * FROM local_db.table_from_notebook LIMIT 10").toPandas()

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2023-01-01 00:26:10,2023-01-01 00:37:11,N,1.0,166,143,1.0,2.58,14.9,1.0,0.5,4.03,0.0,,1.0,24.18,1.0,1.0,2.75
1,2,2023-01-01 00:51:03,2023-01-01 00:57:49,N,1.0,24,43,1.0,1.81,10.7,1.0,0.5,2.64,0.0,,1.0,15.84,1.0,1.0,0.0
2,2,2023-01-01 00:35:12,2023-01-01 00:41:32,N,1.0,223,179,1.0,0.0,7.2,1.0,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0
3,1,2023-01-01 00:13:14,2023-01-01 00:19:03,N,1.0,41,238,1.0,1.3,6.5,0.5,1.5,1.7,0.0,,1.0,10.2,1.0,1.0,0.0
4,1,2023-01-01 00:33:04,2023-01-01 00:39:02,N,1.0,41,74,1.0,1.1,6.0,0.5,1.5,0.0,0.0,,1.0,8.0,1.0,1.0,0.0
5,2,2023-01-01 00:53:31,2023-01-01 01:11:04,N,1.0,41,262,1.0,2.78,17.7,1.0,0.5,0.0,0.0,,1.0,22.95,2.0,1.0,2.75
6,1,2023-01-01 00:09:14,2023-01-01 00:26:39,N,1.0,181,45,2.0,3.8,19.1,3.75,1.5,4.85,0.0,,1.0,29.2,1.0,1.0,2.75
7,2,2023-01-01 00:11:58,2023-01-01 00:24:55,N,1.0,24,75,1.0,1.88,14.2,1.0,0.5,0.0,0.0,,1.0,16.7,2.0,1.0,0.0
8,2,2023-01-01 00:41:29,2023-01-01 00:46:26,N,1.0,41,166,2.0,1.11,7.2,1.0,0.5,1.0,0.0,,1.0,10.7,1.0,1.0,0.0
9,2,2023-01-01 00:50:32,2023-01-01 01:13:42,N,1.0,24,140,1.0,4.22,24.7,1.0,0.5,3.0,0.0,,1.0,32.95,1.0,1.0,2.75
