In [1]:
from pyspark.sql import SparkSession
from delta import *

# Versions used to build the Maven coordinates below.
scala_version = '2.12'
spark_version = '3.3.0'

# Maven coordinates passed to Spark through `spark.jars.packages`.
# Each coordinate is its own list element; the separating comma is added by
# the ",".join(...) call below, never embedded inside a string literal
# (a comma inside the literal makes Python silently concatenate it with the
# next adjacent literal into a single list element).
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.3.0',
    f'io.delta:delta-core_{scala_version}:2.1.1',
    # NOTE(review): this Sedona artifact name carries a "3.4" Spark suffix while
    # spark_version above is 3.3.0 -- confirm the artifact matches the Spark
    # runtime actually in use.
    f'org.apache.sedona:sedona-spark-shaded-3.4_{scala_version}:1.5.0',
    'org.datasyslab:geotools-wrapper:1.5.0-28.2',
]

# Local-mode session with the Delta SQL extension and Delta catalog registered,
# plus Hive support so tables saved with saveAsTable are recorded in the metastore.
builder = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
)

spark = builder.getOrCreate()
# Alternative kept for reference (not used here):
# spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
# Load the trip records. Parquet files embed their own schema, so no
# schema-inference option is needed ("inferSchema" only applies to text
# sources such as CSV/JSON and is a no-op for Parquet).
# NOTE(review): the file is named 2022.parquet, but the sample rows shown
# further down this notebook are dated 2023-02 -- confirm the file contents.
trips_df = spark.read.parquet('2022.parquet')

In [3]:
# Print the column names, data types and nullability of the loaded trips DataFrame.
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [4]:
# Preview the first two rows without truncating wide column values.
trips_df.show(n=2, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|1       |2023-02-01 00:32:53 |2023-02-01 00:34:34  |2              |0.3          |1         |N                 |142         |163         |2           |4.4        |3.5  |0.5    |0.0      

In [5]:
# Persist the trips DataFrame as the Delta table 'trips_data',
# overwriting the table if it already exists.
(
    trips_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('trips_data')
)

In [6]:
# Load the sparksql_magic IPython extension; the %%sparksql cell magic is used below.
%load_ext sparksql_magic

In [7]:
%%sparksql
SHOW TABLES IN default

0,1,2
namespace,tableName,isTemporary
default,device_data_events,False
default,people10m,False
default,taxi_zone_lookup,False
default,taxi_zones,False
default,trips_data,False
