In [None]:
!pip install findspark
import findspark

findspark.init("c:/spark")


In [34]:
import pyspark
from pyspark.sql import SparkSession
import os


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
  		#packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.6.1')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.iceberg', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.iceberg.type','hadoop')
        .set('spark.sql.catalog.iceberg.warehouse', 'iceberg-warehouse')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run Queries
spark.sql("CREATE OR REPLACE TABLE iceberg.table1 (name STRING) USING iceberg")

spark


Spark Running


In [35]:
spark.sql("INSERT INTO iceberg.table1 VALUES ('Alex Merced'), ('Dipankar Mazumdar'), ('Jason Huges')")

spark.sql("SELECT * FROM iceberg.table1").show()

+-----------------+
|             name|
+-----------------+
|      Alex Merced|
|Dipankar Mazumdar|
|      Jason Huges|
+-----------------+



In [36]:
spark.sql("SHOW TBLPROPERTIES  iceberg.table1").show(truncate=False)

+-------------------------------+-------------------+
|key                            |value              |
+-------------------------------+-------------------+
|current-snapshot-id            |7070471910204122103|
|format                         |iceberg/parquet    |
|format-version                 |2                  |
|write.parquet.compression-codec|zstd               |
+-------------------------------+-------------------+



In [37]:
spark.sql("describe formatted  iceberg.table1").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                name|              string|   null|
|                    |                    |       |
|  # Metadata Columns|                    |       |
|            _spec_id|                 int|       |
|          _partition|            struct<>|       |
|               _file|              string|       |
|                _pos|              bigint|       |
|            _deleted|             boolean|       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|      iceberg.table1|       |
|                Type|             MANAGED|       |
|            Location|iceberg-warehouse...|       |
|            Provider|             iceberg|       |
|               Owner|         TommyShelby|       |
|    Table Properties|[current-snapshot...|       |
+-----------

In [38]:
spark.sql("describe table iceberg.table1").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    name|   string|   null|
+--------+---------+-------+



In [39]:
df = spark.read.format('CSV').option('header','true').load('movies_2482024.csv')
df.createOrReplaceTempView('temp_movies')
df = spark.sql('select * from temp_movies where year > 1000 and id > 0')
df.createOrReplaceTempView('temp_movies')
df.show()
# spark.sql("CREATE OR REPLACE TABLE iceberg.movies (id INT, name STRING, year INT, ranking STRING) partitioned by (year) location 'iceberg-warehouse/movies' tblproperties ('table_type'='ICEBERG') ")

+---+--------------------+----+-------+
| id|               movie|year|ranking|
+---+--------------------+----+-------+
|  2|                   $|1971|    6.4|
| 11|   $1000 a Touchdown|1939|    6.7|
| 13|$21 a Day Once a ...|1941|     \N|
| 15|                 $30|1999|    7.5|
| 17|      $300 y tickets|2002|     \N|
| 21|           $5.15/Hr.|2004|     \N|
| 31|               $pent|2000|    4.3|
| 32|     $ucces Part One|1986|     \N|
| 33|             $windle|2002|    5.4|
| 34|             & frres|2000|     \N|
| 36|                '15'|2002|    6.8|
| 37|'24-25' ne vozvra...|1968|     \N|
| 38|                 '38|1987|    6.7|
| 39|                 '42|1951|     \N|
| 41|             '49-'17|1917|    5.8|
| 42|'60s Pop Rock Reu...|2004|     \N|
| 44|                 '68|1988|    5.5|
| 47|     '88 Dodge Aries|2002|     \N|
| 48| '93 jie tou ba wang|1993|     \N|
| 49|'94 du bi dao zhi...|1994|    5.7|
+---+--------------------+----+-------+
only showing top 20 rows



In [40]:
spark.sql('select * from temp_movies ').show()

+---+--------------------+----+-------+
| id|               movie|year|ranking|
+---+--------------------+----+-------+
|  2|                   $|1971|    6.4|
| 11|   $1000 a Touchdown|1939|    6.7|
| 13|$21 a Day Once a ...|1941|     \N|
| 15|                 $30|1999|    7.5|
| 17|      $300 y tickets|2002|     \N|
| 21|           $5.15/Hr.|2004|     \N|
| 31|               $pent|2000|    4.3|
| 32|     $ucces Part One|1986|     \N|
| 33|             $windle|2002|    5.4|
| 34|             & frres|2000|     \N|
| 36|                '15'|2002|    6.8|
| 37|'24-25' ne vozvra...|1968|     \N|
| 38|                 '38|1987|    6.7|
| 39|                 '42|1951|     \N|
| 41|             '49-'17|1917|    5.8|
| 42|'60s Pop Rock Reu...|2004|     \N|
| 44|                 '68|1988|    5.5|
| 47|     '88 Dodge Aries|2002|     \N|
| 48| '93 jie tou ba wang|1993|     \N|
| 49|'94 du bi dao zhi...|1994|    5.7|
+---+--------------------+----+-------+
only showing top 20 rows



In [41]:
spark.sql('Create TABLE IF NOT EXISTS iceberg.movies(id int, movie string, year int, ranking string) using ICEBERG PARTITIONED BY (year)')


DataFrame[]

In [42]:
# spark.sql("Create TABLE IF NOT EXISTS iceberg.movies using ICEBERG partitioned by (year) AS select CAST(id as decimal(14,0)) as id, movie, CAST(year as decimal(14,0)) as year, ranking from temp_movies")
spark.sql("insert overwrite table iceberg.movies select CAST(id as decimal(14,0)) as id, movie, CAST(year as decimal(14,0)) as year, ranking from temp_movies").show()

++
||
++
++



In [43]:
out_df = spark.sql('select * from iceberg.movies')
out_df.show()

+-----+--------------------+----+-------+
|   id|               movie|year|ranking|
+-----+--------------------+----+-------+
|12718|Alumnos de Chapul...|1896|     \N|
|12719|Alumnos de Chapul...|1896|     \N|
|13895|American Falls fr...|1896|     \N|
|15226|          Amy Muller|1896|    4.3|
|17624|Annabelle in Flag...|1896|     \N|
|19429|            Aquarium|1896|     \N|
|20816|Arrive d'un train...|1896|     \N|
|20817|Arrive d'un train...|1896|     \N|
|20820|Arrive d'un train...|1896|     \N|
|21588|Asalto a sable de...|1896|     \N|
|23212|Atlantic City Bat...|1896|     \N|
|23215|Atlantic City Boa...|1896|     \N|
|23216|Atlantic City Boa...|1896|     \N|
|24687|           Autruches|1896|    5.4|
|24981|Avenida da Liberdade|1896|     \N|
|25396|    Awakening of Rip|1896|     \N|
|25693|  Azenhas no Rio Ave|1896|     \N|
|26189|      Babies Quarrel|1896|    6.4|
|27835|     Baignade en mer|1896|     \N|
|27862|Baile de la romer...|1896|     \N|
+-----+--------------------+----+-

In [None]:
spark.sql("update iceberg.movies set ranking = '8.5' where id=13895")
spark.sql("select * from iceberg.movies where ranking = '8.5'").show()

In [44]:
spark.sql('show partitions iceberg.movies').show()
#spark UI


AnalysisException: Table iceberg.movies does not support partition management.; line 1 pos 16;
ShowPartitions [partition#666]
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@2d03eb6, movies, iceberg.movies, [id#667, movie#668, year#669, ranking#670]


In [None]:
# out_df.write.format("csv").save('movies_refined')

In [46]:
x=spark.sql('SELECT * FROM iceberg.movies.files;')
x.show(truncate=False,vertical = True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 content            | 0                                                                                                                                                                
 file_path          | iceberg-warehouse/movies/data/year=1896/00000-38-e1576c44-0bec-4160-b8c2-9bbf8758a64f-0-00003.parquet                                                            
 file_format        | PARQUET                                                                                                                                                          
 spec_id            | 0                                                                                                                                                                
 partition          | {1896}                                                    

In [45]:
spark.sql('SELECT * FROM iceberg.movies.history;').show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-09-10 13:26:...|9171394837912538288|               null|               true|
|2024-09-10 13:44:...|5073614139514850453|9171394837912538288|               true|
|2024-09-11 12:51:...|1761602126054006653|5073614139514850453|               true|
+--------------------+-------------------+-------------------+-------------------+



In [47]:
x=spark.sql('SELECT * FROM iceberg.movies.snapshots;')
x.show(truncate=False,vertical = True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 committed_at  | 2024-09-10 13:26:29.106                                                                                                                                                                                                                                                                                                                                                                                           

In [48]:
spark.sql("select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from iceberg.movies.history h join iceberg.movies.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at").show(truncate=False,vertical = True)

-RECORD 0----------------------------------------
 made_current_at       | 2024-09-10 13:26:29.106 
 operation             | overwrite               
 snapshot_id           | 9171394837912538288     
 is_current_ancestor   | true                    
 summary[spark.app.id] | local-1725954925756     
-RECORD 1----------------------------------------
 made_current_at       | 2024-09-10 13:44:13.919 
 operation             | overwrite               
 snapshot_id           | 5073614139514850453     
 is_current_ancestor   | true                    
 summary[spark.app.id] | local-1725954925756     
-RECORD 2----------------------------------------
 made_current_at       | 2024-09-11 12:51:12.552 
 operation             | overwrite               
 snapshot_id           | 1761602126054006653     
 is_current_ancestor   | true                    
 summary[spark.app.id] | local-1725954925756     



In [49]:
spark.sql('select * from iceberg.movies.manifests').show(truncate=False, vertical=True)
"""
Fields within partition_summaries column of the manifests table correspond to field_summary struct within manifest_list, with the following order: 
contains_null - contains_nan - lower_bound - upper_bound
Note: contains_nan could return null, which indicates that this information is not available from files metadata
"""

-RECORD 0-----------------------------------------------------------------------------------------------------
 content                     | 0                                                                              
 path                        | iceberg-warehouse/movies/metadata/3c7045ed-4725-4ee5-b74c-48d902603202-m1.avro 
 length                      | 15908                                                                          
 partition_spec_id           | 0                                                                              
 added_snapshot_id           | 1761602126054006653                                                            
 added_data_files_count      | 122                                                                            
 existing_data_files_count   | 0                                                                              
 deleted_data_files_count    | 0                                                                              
 

'\nFields within partition_summaries column of the manifests table correspond to field_summary struct within manifest_list, with the following order: \ncontains_null - contains_nan - lower_bound - upper_bound\nNote: contains_nan could return null, which indicates that this information is not available from files metadata\n'

In [59]:
df = spark.read.format('CSV').option('header','true').load('movies_refined/1.csv')
df.createOrReplaceTempView('temp_movies_r')
df = spark.sql('select * from temp_movies_r where year >= 1995 and year <= 2000')
df.createOrReplaceTempView('temp_movies_r')
df.show()
spark.sql("insert overwrite table iceberg.movies select CAST(id as decimal(14,0)) as id, movie, CAST(year as decimal(14,0)) as year, ranking from temp_movies_r").show()

+---+--------------------+----+-------+
| id|               movie|year|ranking|
+---+--------------------+----+-------+
| 15|                 $30|1999|    7.5|
| 31|               $pent|2000|    4.3|
| 34|             & frres|2000|     \N|
| 64|       'Bats' Abound|1999|     \N|
| 66|'Betty Bee' (sopr...|1999|     \N|
|101|'El Chicko' - der...|1995|    4.1|
|118|'Halloween' Unmas...|1999|     \N|
|136|'Java Madness' fo...|1995|    6.6|
|145|'Ks-Bels' letmdja...|1995|     \N|
|152|            'M' Word|1996|     \N|
|155|'Man Who Couldn't...|1996|     \N|
|176|'N Sync & Britney...|2000|     \N|
|177|  'N Sync In Concert|1998|     \N|
|178|'N Sync: 'Ntimate...|2000|     \N|
|179|'N Sync: Live Fro...|2000|     \N|
|180|'N Sync: Making t...|2000|     \N|
|181|'N Sync: No Strin...|1999|     \N|
|184|'Ne gnstige Geleg...|1999|    4.6|
|199|         'O' de amor|1995|     \N|
|201|'On the Inside: C...|2000|     \N|
+---+--------------------+----+-------+
only showing top 20 rows

++
||
++
++



In [60]:
spark.sql('select count(*) from iceberg.movies').show(truncate=False, vertical=True)

-RECORD 0---------
 count(1) | 48230 



In [61]:
spark.sql('select count(*) from iceberg.movies for SYSTEM_TIME as of "2024-09-11 12:23:54.899"').show(truncate=False, vertical=True)

-RECORD 0---------
 count(1) | 86714 



In [62]:
spark.sql('select count(*) from iceberg.movies for SYSTEM_VERSION as of "9171394837912538288"').show(truncate=False, vertical=True)

-RECORD 0----------
 count(1) | 302134 



In [None]:
spark.sql('select count(*) from iceberg.movies for SYSTEM_VERSION as of "3647774809020789135"').show(truncate=False, vertical=True)

In [None]:
spark.sql('select count(*) from iceberg.movies for SYSTEM_VERSION as of "3651939683099648566"').show(truncate=False, vertical=True)

In [None]:
spark.sql('select count(*) from iceberg.movies for SYSTEM_VERSION as of "8158436517200082356"').show(truncate=False, vertical=True)

In [63]:
spark.sql("ROLLBACK TABLE iceberg.movies TO SNAPSHOT '6687819643318083920'")
# spark.sql("CALL iceberg.system.rollback_to_snapshot('iceberg.movies', '6687819643318083920')")
# spark.sql("ALTER TABLE iceberg.movies EXECUTE ROLLBACK(3651939683099648566);")

ParseException: 
Operation not allowed: ROLLBACK.(line 1, pos 0)

== SQL ==
ROLLBACK TABLE iceberg.movies TO SNAPSHOT '6687819643318083920'
^^^


In [None]:
# !spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.0 --verbose

In [None]:
spark.read.option("snapshot-id", 6687819643318083920).table("iceberg.movies").show()

In [None]:
spark.sql("select * from iceberg.movies where year >= 2000 and year < 2002").show()