In [1]:
import os
import pyspark
from pyspark.sql import SparkSession

In [2]:
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
  		#packages 
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.iceberg', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.iceberg.type', 'hadoop')
        .set('spark.sql.catalog.iceberg.warehouse', 'iceberg-warehouse')
)

In [3]:
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/anshumanr/.ivy2/cache
The jars for the packages stored in: /Users/anshumanr/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.4_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-240c95f1-7e58-4680-91f7-5d2bf796364f;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.4_2.12;1.3.1 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found software.amazon.awssdk#annotations;2.17.178 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found software.amazon.awssdk#http-client-spi;2.17.178 in central


Spark Running


In [4]:
df=spark.read.format("csv").options(inferSchema='True',delimiter=',',header='True').load("/Users/anshumanr/Documents/Anime_Dataset/users-score-2023.csv")

                                                                                

In [5]:
spark.sql("create table iceberg.db.anime_users_score(user_id int,Username string,anime_id int,anime_title string,rating int) using iceberg partitioned by (anime_id)")

DataFrame[]

In [6]:
df = df.withColumn("rating", df["rating"].cast("int"))
df.write.mode("append").partitionBy("anime_id").saveAsTable("iceberg.db.anime_users_score", options={"write.parquet.page-row-limit": "100000"})

                                                                                

In [7]:
spark.sql("select * from iceberg.db.anime_users_score").show(1000)

+-------+----------------+--------+-------------+------+
|user_id|        Username|anime_id|  anime_title|rating|
+-------+----------------+--------+-------------+------+
|     23|           Amuro|      26|   Texhnolyze|     4|
|     48|            seif|      26|   Texhnolyze|     5|
|     70|          Cruzle|      26|   Texhnolyze|     7|
|    185|           Fador|      26|   Texhnolyze|     7|
|    254|             Ayu|      26|   Texhnolyze|     2|
|    271|          Divine|      26|   Texhnolyze|     6|
|    282|           Hopis|      26|   Texhnolyze|     9|
|    333|              db|      26|   Texhnolyze|     1|
|    341|         melange|      26|   Texhnolyze|     9|
|    344|          shance|      26|   Texhnolyze|     7|
|    460|         Fehrant|      26|   Texhnolyze|     6|
|    467|            June|      26|   Texhnolyze|     3|
|    488|      stoneblade|      26|   Texhnolyze|     7|
|    492|          Tasura|      26|   Texhnolyze|     8|
|    504|   JusticeGundam|     

In [8]:
spark.sql("select rating, count(*)  from iceberg.db.anime_users_score where rating > 7 group by rating").show(100)



+------+--------+
|rating|count(1)|
+------+--------+
|     9|  174750|
|     8|  259153|
|    10|  111958|
+------+--------+



                                                                                

In [9]:
spark.sql("select count(*)  from iceberg.db.anime_users_score where anime_title = 'One Piece'").show()

+--------+
|count(1)|
+--------+
|    2182|
+--------+



# Updating records

In [12]:
spark.sql("update iceberg.db.anime_users_score set anime_title = 'One_Piece' where anime_title='One Piece'")

DataFrame[]

In [13]:
spark.sql("select count(*)  from iceberg.db.anime_users_score where anime_title = 'One_Piece'").show()

+--------+
|count(1)|
+--------+
|    2182|
+--------+



In [14]:
spark.sql("select count(*)  from iceberg.db.anime_users_score where anime_id = 21").show()

+--------+
|count(1)|
+--------+
|    2182|
+--------+



# Inspecting our table

In [20]:
df=spark.sql("SELECT * FROM iceberg.db.my_table.history")
df.show()
df.write.save("/Users/anshumanr/Documents/Iceberg/Apache_iceberg/iceberg-warehouse/db/anime_users_score/history",header=True)

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-08-11 18:07:...|2936313136471033108|               null|               true|
|2023-08-11 18:07:...|4480161109182211569|2936313136471033108|               true|
|2023-08-11 18:07:...| 925325890966312565|4480161109182211569|               true|
|2023-08-11 18:07:...|8852684860612151750| 925325890966312565|               true|
+--------------------+-------------------+-------------------+-------------------+



In [21]:
df=spark.sql("SELECT * FROM iceberg.db.my_table.snapshots")
df.show()
df.write.save("/Users/anshumanr/Documents/Iceberg/Apache_iceberg/iceberg-warehouse/db/anime_users_score/snapshots",header=True)

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2023-08-11 18:07:...|2936313136471033108|               null|   append|iceberg-warehouse...|{spark.app.id -> ...|
|2023-08-11 18:07:...|4480161109182211569|2936313136471033108|overwrite|iceberg-warehouse...|{spark.app.id -> ...|
|2023-08-11 18:07:...| 925325890966312565|4480161109182211569|   append|iceberg-warehouse...|{spark.app.id -> ...|
|2023-08-11 18:07:...|8852684860612151750| 925325890966312565|overwrite|iceberg-warehouse...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [22]:
df=spark.sql("SELECT * FROM iceberg.db.my_table.files")
df.show()
df.write.save("/Users/anshumanr/Documents/Iceberg/Apache_iceberg/iceberg-warehouse/db/anime_users_score/files",header=True)

+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|content|           file_path|file_format|spec_id|record_count|file_size_in_bytes|        column_sizes|        value_counts|   null_value_counts|nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|    readable_metrics|
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|      0|iceberg-warehouse...|    PARQUET|      0|           1|               976|{1 -> 55, 2 -> 51...|{1 -> 1, 2 -> 1, ...|{1 -> 0, 2 -> 0, ...|              {

In [23]:
df=spark.sql("SELECT * FROM iceberg.db.my_table.manifests")
df.show()
df.write.save("/Users/anshumanr/Documents/Iceberg/Apache_iceberg/iceberg-warehouse/db/anime_users_score/manifests",header=True)

+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
|content|                path|length|partition_spec_id|  added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|
+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
|      0|iceberg-warehouse...|  5852|                0|8852684860612151750|                     1|                        0|                       0|                       0|                          0|                         0|                 []|


In [24]:
df=spark.sql("SELECT * FROM iceberg.db.my_table.partitions")
df.show()
df.write.save("/Users/anshumanr/Documents/Iceberg/Apache_iceberg/iceberg-warehouse/db/anime_users_score/partitions",header=True)

+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+
|record_count|file_count|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|
+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+
|           5|         5|                           0|                         0|                           0|                         0|
+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+

