In [1]:
import pyspark

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Creating spark application
builder = pyspark.sql.SparkSession.builder.appName("pyspark-iceberg")

In [3]:
# Creating spark context
spark = builder.getOrCreate()

24/04/24 19:41:49 WARN Utils: Your hostname, galaxia-vostro-3520 resolves to a loopback address: 127.0.1.1; using 10.32.9.180 instead (on interface wlp0s20f3)
24/04/24 19:41:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/24 19:41:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/04/24 19:42:09 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [20]:
# Creating the table in iceberg parquet format
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("url", StringType(), True),
    StructField("title", StringType(), True),
    StructField("upc", StringType(), True),
    StructField("product_type", StringType(), True),
    StructField("price_excl_tax", IntegerType(), True),
    StructField("price_incl_tax", IntegerType(), True),
    StructField("tax", IntegerType(), True),
    StructField("price", IntegerType(), True),
    StructField("availability", IntegerType(), True),
    StructField("num_reviews", IntegerType(), True),
    StructField("stars", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("description", StringType(), True)
])

books_df = spark.read.json('books.json', schema)
books_df.write.mode(saveMode='overwrite').parquet('data/iceberg/books')
books_df.write.mode(saveMode='overwrite').partitionBy('category').parquet('data/iceberg/books_partitioned_by_category')

                                                                                

In [21]:
# Read parquet and create a temporary view for querying results
books = spark.read.parquet('data/iceberg/books')

books.createOrReplaceTempView("books")

In [16]:
# Read five stars books from parquet
five_stars_books_df = spark.sql("SELECT title, stars, category FROM books WHERE stars > 4 ORDER BY stars DESC")

five_stars_books_df.show()

+--------------------+-----+---------------+
|               title|stars|       category|
+--------------------+-----+---------------+
|Scott Pilgrim's l...|    5| sequential art|
|Aaron Ledbetter’s...|    5|    young adult|
|From a renowned h...|    5|        history|
|Punk's raw power ...|    5|          music|
|No matter how bus...|    5|        romance|
|A Michelin two-st...|    5|        romance|
|Anti-apartheid ac...|    5|     nonfiction|
|A page-turning no...|    5|     philosophy|
|Mark Fallon is an...|    5|       thriller|
|In The Four Agree...|    5|   spirituality|
|Paris is burning-...|    5|        fiction|
|There is a cosmic...|    5|     nonfiction|
|On a searing summ...|    5|        fiction|
|The Freeman famil...|    5|        fiction|
|Slay Procrastinat...|    5|        default|
|Change and anger ...|    5|   spirituality|
|Just as Annie and...|    5|        fantasy|
|THE LONG-AWAITED ...|    5| sequential art|
|What if you could...|    5|science fiction|
|Tired of 

In [42]:
# Read books partitioned by category and create a temporary view for querying results
books = spark.read.parquet('data/iceberg/books_partitioned_by_category')

books.createOrReplaceTempView("books_partitioned_by_category")

In [50]:
# Querying 5 stars horror books from partitioned books table
five_stars_horror_books_df = spark.sql("""SELECT title, stars, category
                                FROM books_partitioned_by_category
                                WHERE stars > 4
                                  AND category = 'horror'
                                ORDER BY stars DESC""")

five_stars_horror_books_df.show()

+--------------------+-----+--------+
|               title|stars|category|
+--------------------+-----+--------+
|The original Psyc...|    5|  horror|
|At last—the seque...|    5|  horror|
+--------------------+-----+--------+

