In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

In [2]:
spark = SparkSession.builder \
    .appName("IcebergTest") \
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.rest.uri", "http://iceberg-rest:8181") \
    .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.rest.warehouse", "s3://warehouse/") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

25/08/04 07:01:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
data = [
    ["Product A", 1001, datetime.strptime("2023-07-20", "%Y-%m-%d"), datetime.strptime("2023-07-20 10:15:30", "%Y-%m-%d %H:%M:%S"), 29.99],
    ["Product B", 1002, datetime.strptime("2023-07-19", "%Y-%m-%d"), datetime.strptime("2023-07-19 14:20:45", "%Y-%m-%d %H:%M:%S"), 49.99],
    ["Product C", 1003, datetime.strptime("2023-07-18", "%Y-%m-%d"), datetime.strptime("2023-07-18 09:30:15", "%Y-%m-%d %H:%M:%S"), 39.99],
    ["Product D", 1004, datetime.strptime("2023-07-17", "%Y-%m-%d"), datetime.strptime("2023-07-17 16:45:00", "%Y-%m-%d %H:%M:%S"), 19.99]
]

# Define schema
schema = StructType([
    StructField("Product", StringType(), True),
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Price", FloatType(), True)
])

df = spark.createDataFrame(data, schema)

df.printSchema()

df.show()

root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)



                                                                                

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



In [4]:
# First read example should not infer schema, ignore header row, provide explicit column name and datatype

# Define schema
schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderItemID", IntegerType(), True),
    StructField("ProductID", StringType(), True),
    StructField("SellerID", StringType(), True),
    StructField("ShippingLimitDate", TimestampType(), True),
    StructField("Price", DoubleType(), True),
    StructField("FreightValue", DoubleType(), True)
])
path = '/home/iceberg/data/order_items_dataset.csv'

df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'false').schema(schema).load(hdfs_path)

df.printSchema()

df.show(5)

root
 |-- OrderID: string (nullable = true)
 |-- OrderItemID: integer (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- SellerID: string (nullable = true)
 |-- ShippingLimitDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- FreightValue: double (nullable = true)

+--------------------+-----------+--------------------+--------------------+-------------------+-----+------------+
|             OrderID|OrderItemID|           ProductID|            SellerID|  ShippingLimitDate|Price|FreightValue|
+--------------------+-----------+--------------------+--------------------+-------------------+-----+------------+
|00010242fe8c5a6d1...|          1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|       13.29|
|00018f77f2f0320c5...|          1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|       19.93|
|000229ec398224ef6...|          1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|       17.87|
|

25/08/04 07:03:59 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value
 Schema: OrderID, OrderItemID, ProductID, SellerID, ShippingLimitDate, Price, FreightValue
Expected: OrderID but found: order_id
CSV file: file:///home/iceberg/data/order_items_dataset.csv


In [5]:

# Second read example should infer schema, ignore header row

path = '/home/iceberg/data/order_items_dataset.csv'


df = spark.read.format('csv').option('header','true').option('inferSchema','true').load(hdfs_path)

df.printSchema()

df.show()

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 

In [6]:
# check default number of partitions
print(spark.sparkContext.defaultParallelism)

8


In [7]:
# check default size of each partition
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))

134217728b


In [8]:
print(f'Number of partitions: {df.rdd.getNumPartitions()}')

df_new = df.repartition(5)

print(f'Number of partitions: {df_new.rdd.getNumPartitions()}')

Number of partitions: 4
Number of partitions: 5


In [10]:
# Select columns in different options

from pyspark.sql.functions import *

df.select('order_id').show(5)

df.select('order_id', 'shipping_limit_date').show(5)

df.select(col('order_id'), col('shipping_limit_date')).show(5)

df.select(col('order_id').alias('oid'), col('shipping_limit_date').alias('limit_date')).show(5)

+--------------------+
|            order_id|
+--------------------+
|00010242fe8c5a6d1...|
|00018f77f2f0320c5...|
|000229ec398224ef6...|
|00024acbcdf0a6daa...|
|00042b26cf59d7ce6...|
+--------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|
|00018f77f2f0320c5...|2017-05-03 11:05:13|
|000229ec398224ef6...|2018-01-18 14:48:30|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|
+--------------------+-------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|
|00018f77f2f0320c5...|2017-05-03 11:05:13|
|000229ec398224ef6...|2018-01-18 14:48:30|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|
+-----------

In [11]:
# Derive new column using withColumn

df2 = df.withColumn("year", year(col("shipping_limit_date"))).withColumn("month", month(col("shipping_limit_date")))

df2.select("order_id", "shipping_limit_date", "year", "month").show(5)

+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|00010242fe8c5a6d1...|2017-09-19 09:45:35|2017|    9|
|00018f77f2f0320c5...|2017-05-03 11:05:13|2017|    5|
|000229ec398224ef6...|2018-01-18 14:48:30|2018|    1|
|00024acbcdf0a6daa...|2018-08-15 10:10:18|2018|    8|
|00042b26cf59d7ce6...|2017-02-13 13:57:51|2017|    2|
+--------------------+-------------------+----+-----+
only showing top 5 rows



In [12]:
# Rename existing column using withColumnRenamed

df3 = df2.withColumnRenamed('shipping_limit_date', 'shipping_limit_datetime')

df3.select("order_id", "shipping_limit_datetime").show(5)

+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|00010242fe8c5a6d1...|    2017-09-19 09:45:35|
|00018f77f2f0320c5...|    2017-05-03 11:05:13|
|000229ec398224ef6...|    2018-01-18 14:48:30|
|00024acbcdf0a6daa...|    2018-08-15 10:10:18|
|00042b26cf59d7ce6...|    2017-02-13 13:57:51|
+--------------------+-----------------------+
only showing top 5 rows



In [13]:
# Filter condition

df3.filter( col('order_id') == '00010242fe8c5a6d1ba2dd792cb16214' ).show(5)

order_ids = ['00010242fe8c5a6d1ba2dd792cb16214','00018f77f2f0320c557190d7a144bdd3']

df3.filter( col('order_id').isin(order_ids) ).show(5)

df3.filter( (col('price') < 50) & (col("freight_value")  < 10) ).show(5)

# SQL Type Expression
df3.filter("price < 50 and freight_value  < 10").show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+----

In [14]:
# Example to drop a column

df3.drop('month').show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30|199.0|        17.87|2018|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|    2018-08-15 10:10:18|12.99|        12.79|2018|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|    2017-02-13 13:57:51|19

In [15]:
#drop duplicates row based on multiple columns

df3.dropDuplicates(['order_id', 'order_item_id']).show(5)

# order_id , order_item_id, c1, c2
#   1      ,    2        , A , B
#   1      ,    2        , A , B
#   1      ,    2        , C , D
#   1      ,    3        , E , F

# order_id , order_item_id, c1, c2
#   1      ,    2        , A , B
#   1      ,    3        , E , F

[Stage 25:>                                                         (0 + 4) / 4]

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|    5|
|00061f2a7bc09da83...|            1|d63c1011f49d98b97...|cc419e0650a3c5ba7...|    2018-03-29 22:28:09|59.99|         8.88|2018|    3|
|0014ae671de39511f...|            1|23365beed316535b4...|92eb0f42c21942b65...|    2017-05-29 03:15:24| 16.5|         14.1|2017|    5|
|0015ebb40fb17286b...|            1|50fd2b788dc166edd...|8b321bb669392f516...|    2018-01-18 09:11:24| 21.9|         15.1|2018|    1|
|001ab0a7578dd66cd...|            2|0b0172eb0fd18479d...|56565

                                                                                

In [16]:
# get distinct rows

df3.dropDuplicates().show(5)



+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0008288aa423d2a3f...|            1|368c6c730842d7801...|1f50f920176fa81da...|    2018-02-21 02:55:52|  49.9|        13.37|2018|    2|
|004fb5e6f90a178dc...|            1|601a360bd2a916ece...|7a67c85e85bb2ce85...|    2017-06-15 10:02:38|129.99|        13.93|2017|    6|
|00dc6ad47477b3b62...|            1|4fe644d766c7566db...|c31eff8334d6b3047...|    2018-03-01 20:20:25|110.99|        33.23|2018|    3|
|02119fc90b970a6b0...|            1|99aa268744e2967e0...|52562a9f449c3dc3d...|    2017-07-05 02:45:31|  84.9|        27.44|2017|    7|
|0232512713ff84ce6...|            1|41b43381a92451746..

                                                                                

In [17]:
# arrange data using order by

df3.orderBy( col('price').desc() ).show(5)

df3.orderBy( col('price').asc(), col('freight_value').desc() ).show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime| price|freight_value|year|month|
+--------------------+-------------+--------------------+--------------------+-----------------------+------+-------------+----+-----+
|0812eb902a67711a1...|            1|489ae2aa008f02150...|e3b4998c7a498169d...|    2017-02-16 20:37:36|6735.0|       194.31|2017|    2|
|fefacc66af859508b...|            1|69c590f7ffc7bf8db...|80ceebb4ee9b31afb...|    2018-08-02 04:05:13|6729.0|       193.21|2018|    8|
|f5136e38d1a14a4db...|            1|1bdf5e6731585cf01...|ee27a8f15b1dded4d...|    2017-06-15 02:45:17|6499.0|       227.66|2017|    6|
|a96610ab360d42a2e...|            1|a6492cc69376c469a...|59417c56835dd8e2e...|    2017-04-18 13:25:18|4799.0|       151.34|2017|    4|
|199af31afc78c699f...|            1|c3ed642d592594bb6..

In [18]:
# Group By Operation

# on single column
df3.groupBy('year').agg( count('*').alias('total_count'),
                         avg('price').alias('avg_price'),
                         sum('price').alias('sum_price'),
                         min('price').alias('min_price'),
                         max('price').alias('max_price')
                       ).show(5)

# on multiple column
df3.groupBy('year', 'month').agg( count('*').alias('total_count'),
                                 avg('price').alias('avg_price'),
                                 sum('price').alias('sum_price'),
                                 min('price').alias('min_price'),
                                 max('price').alias('max_price')
                               ).orderBy( col('year').asc(), col('month').asc() ).show(20)

+----+-----------+------------------+-----------------+---------+---------+
|year|total_count|         avg_price|        sum_price|min_price|max_price|
+----+-----------+------------------+-----------------+---------+---------+
|2018|      62511|120.08515685238126|7506643.239999205|     0.85|   6729.0|
|2020|          4|             86.49|           345.96|    69.99|    99.99|
|2016|        370|134.55654054054037|49785.91999999993|      6.0|   1399.0|
|2017|      49765|121.26732804178806|6034868.579999583|      1.2|   6735.0|
+----+-----------+------------------+-----------------+---------+---------+

+----+-----+-----------+------------------+------------------+---------+---------+
|year|month|total_count|         avg_price|         sum_price|min_price|max_price|
+----+-----+-----------+------------------+------------------+---------+---------+
|2016|    9|          4|           48.6175|            194.47|    44.99|     59.5|
|2016|   10|        365|135.83712328767106| 49580.549999999

In [19]:
# Case-When statement

df3.withColumn("price_category", when( col('price') >= 100 , "High" )
                                .when( (col('price') < 100) & (col('price') >= 50) , "Medium" )
                                .otherwise("Low")).show(5)
                               

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|price_category|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|    2017-09-19 09:45:35| 58.9|        13.29|2017|    9|        Medium|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|    2017-05-03 11:05:13|239.9|        19.93|2017|    5|          High|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|    2018-01-18 14:48:30|199.0|        17.87|2018|    1|          High|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|    2018-08-15 10:10:18|12.9

In [20]:
# Window functions

from pyspark.sql.window import Window

windowSpec1 = Window.partitionBy('year').orderBy( col('price').asc())

df3.withColumn("dense_rank" , dense_rank().over(windowSpec1) ).show(5)

windowSpec2 = Window.partitionBy('year').orderBy(col('shipping_limit_datetime').asc())

df3.withColumn('running_sum', sum('price').over(windowSpec2)).select('year','price','shipping_limit_datetime','running_sum').show(5)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|dense_rank|
+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------+
|3ee6513ae7ea23bdf...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-04 03:55:26| 0.85|        18.23|2018|    5|         1|
|6e864b3f0ec710311...|            1|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-02 20:30:34| 0.85|        18.23|2018|    5|         1|
|c5bdd8ef3c0ec4202...|            2|8a3254bee785a526d...|96804ea39d96eb908...|    2018-05-07 02:55:22| 0.85|         22.3|2018|    5|         1|
|f1d5c2e6867fa93ce...|            1|46fce52cef5caa7cc...|2d2322d8421188677...|    2018-08-28 21:30:15|  2.2|         7.39|2018|   

In [22]:
path = '/home/iceberg/data/sellers_dataset.csv'

sdf = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(hdfs_path)

sdf.printSchema()
sdf.show(5)

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [23]:
# Join transformation

result1 = df3.join(broadcast(sdf), df3.seller_id == sdf.seller_id  , 'inner').drop(sdf.seller_id)


result1.printSchema()
result1.show(5)


root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|  seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+----------------------

In [24]:
# Perform join with alias names of dataframes

result2 = df3.alias('oid').join(sdf.alias('sid'), col('oid.seller_id') == col('sid.seller_id') , 'inner').drop(col('sid.seller_id'))


result2.printSchema()
result2.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|seller_zip_code_prefix|  seller_city|seller_state|
+--------------------+-------------+--------------------+--------------------+----------------------

In [26]:
# work with spark SQL

df3.createOrReplaceTempView("ORDER_ITEM")
sdf.createOrReplaceTempView("SELLERS")

joinedDF = spark.sql("select * from ORDER_ITEM oid INNER JOIN SELLERS sid ON oid.seller_id == sid.seller_id")

joinedDF.printSchema()

joinedDF.show(5)



root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+-------------+--------------------+--------------------+-----------------------+-----+-------------+----+-----+--------------------+----------------------+-------------+------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_datetime|price|freight_value|year|month|           seller_id|seller_zip_code_prefix|  seller_city|seller_state|
+----------------

In [27]:
result1.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_datetime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [32]:
result1.select("order_id","seller_state").limit(5).show(truncate=False)

+--------------------------------+------------+
|order_id                        |seller_state|
+--------------------------------+------------+
|00010242fe8c5a6d1ba2dd792cb16214|SP          |
|00018f77f2f0320c557190d7a144bdd3|SP          |
|000229ec398224ef6ca0657da4fc703e|MG          |
|00024acbcdf0a6daa1e931b038114c75|SP          |
|00042b26cf59d7ce69dfabb4e55b4fd9|PR          |
+--------------------------------+------------+



In [None]:
# write to local dir
result1.write.parquet("/home/iceberg/result/")

In [1]:
# Write as iceberg table
result1.writeTo("warehouse.db.test") \
  .partitionedBy("seller_state") \
  .createOrReplace() 

NameError: name 'result1' is not defined

In [34]:
# update a record 
spark.sql("""
    UPDATE warehouse.db.test
    SET price = 1000
    WHERE order_id = '00010242fe8c5a6d1ba2dd792cb16214'
      AND seller_state = 'SP'
""")


                                                                                

DataFrame[]

In [2]:
# insert a record

result1.limit(1).writeTo("warehouse.db.test").append()


In [None]:
# delete records
spark.sql("""
DELETE FROM warehouse.db.test 
WHERE seller_state = 'SP'
""")

In [None]:
# merge (upsert)

update_df = result1.limit(1).withColumn("order_id",lit("vsdga34234sdfaes2312"))

update_df.createOrReplaceTempView("updates")

spark.sql("""
MERGE INTO warehouse.db.test AS t
USING updates AS u
ON t.order_id = u.order_id 
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

In [None]:
# schema evolution

schema_change_df = result1.limit(1).withColumn("order_status",lit("delivered"))

schema_change_df.writeTo("warehouse.db.test").append()


In [None]:
# get snapshot history
snapshots = spark.sql("SELECT * FROM warehouse.db.test.snapshots")
snapshots.show()

In [None]:
# Time Travel

df = spark.read.option("snapshot-id", "<snapshot_id>").table("warehouse.db.test")