In [None]:
# 🧪🧪 ONLY ONCE! 🧪🧪
# Get the NYC Taxi dataset from the network
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o "/home/yarnapp/hopsfs/Resources/nyc_taxiparquet"

In [None]:
# Install the PyIceberg library, then fix the dependency problem with SQLAlchemy library
!pip install pyiceberg[pyarrow,duckdb,sql-sqlite] --upgrade
!pip install sqlalchemy --upgrade
#!pip install sqlalchemy==2.0.28 --upgrade
!pip install pandas --upgrade
!pip install polars

In [None]:
# Import the needed libraries
from pyiceberg.catalog import load_catalog
from pyiceberg.catalog.sql import SqlCatalog
import numpy as np
import pyarrow.parquet as pq
import pyarrow.compute as pc
import pyarrow
import pandas as pd
import polars as pl
import os
import importlib
from urllib.parse import urlparse
from typing import Dict, List
from pyarrow.fs import HadoopFileSystem
from functools import lru_cache
import time
import math
import string
import random
import sys
import warnings
from pyiceberg.exceptions import CommitFailedException

In [None]:
# Create a folder where to save on HopsFS
!mkdir /home/yarnapp/hopsfs/Resources/test_dir/
!mkdir /tmp/test_data/

In [None]:
# Where the SQLite catalog database lives, and the root folder of the warehouse
catalog_file_path = "/home/yarnapp/hopsfs/Resources/test_dir/pyiceberg_catalog.db"
hdfs_path = "/tmp/test_data"

# Properties handed to the SQL catalog: database URI, warehouse root and HDFS host
catalog_properties = {
    "uri": f"sqlite:///{catalog_file_path}",
    "warehouse": hdfs_path,
    "hdfs.host": "namenode.service.consul",
}

# Build the catalog named "default" from those properties
test_catalog = SqlCatalog("default", **catalog_properties)

# Print the catalog object, which shows the catalog type
print(test_catalog)

In [None]:
# Read the Parquet file downloaded earlier into an Arrow table (df).
# The path must match the output path used by the download cell.
nyc_data_path = "/home/yarnapp/hopsfs/Resources/nyc_taxiparquet"
df = pq.read_table(nyc_data_path)

In [None]:
# Create the "test_ns" namespace. The catalog database is a file that outlives the
# kernel, so the idempotent variant is used: re-running this cell must not fail just
# because a previous run already created the namespace.
test_catalog.create_namespace_if_not_exists("test_ns")

In [None]:
# Register the table "test_ns.nyc_taxi" in the catalog, reusing the schema of df
# and storing its files under /tmp/test_data
table_schema = df.schema
test_table = test_catalog.create_table("test_ns.nyc_taxi", schema=table_schema, location="/tmp/test_data")

In [None]:
# Turn the Arrow table into a Polars DataFrame
polars_df = pl.from_arrow(df)

# Collect the names of the columns whose dtype is Int64 or Float64
numeric_dtypes = (pl.Int64, pl.Float64)
numerical_columns = []
for column_name, column_dtype in polars_df.schema.items():
    if column_dtype in numeric_dtypes:
        numerical_columns.append(column_name)

In [None]:
# Create a set of num_col random words of length N, used as names for the new columns.
# Note from the original author: every additional 1GB requires creating 40 new columns.
N = 7
num_col = 4000
alphabet = string.ascii_uppercase + string.digits

# Keep drawing until exactly num_col distinct names exist: a collision between two
# random words would otherwise silently produce fewer columns than requested.
names = set()
while len(names) < num_col:
    names.add(''.join(random.choices(alphabet, k=N)))

# Add num_col columns to the previous dataframe. Each new column is a randomly chosen
# numerical column multiplied by a random factor. The expressions are collected first
# and applied in a single with_columns call, so the DataFrame is extended once instead
# of num_col times.
new_columns = []
for name in names:
    random_column = random.choice(numerical_columns)
    random_multiplier = random.uniform(1.5, 4.9)
    new_columns.append((pl.col(random_column) * random_multiplier).alias(name))
polars_df = polars_df.with_columns(new_columns)

In [None]:
# Create an Arrow Table from the Polars DataFrame, recording the time right before
# and right after the conversion so the elapsed time can be computed afterwards.
start_time = time.time()
big_table  = polars_df.to_arrow()
end_time   = time.time()

In [None]:
# Elapsed time between the two timestamps taken around the conversion
req_time = end_time - start_time
print(f"Time required: {req_time}")

In [None]:
# Size of the Arrow Table as reported by sys.getsizeof, divided by 1024^3
bytes_per_gb = 1024 * 1024 * 1024
table_size_gb = int(sys.getsizeof(big_table)) / bytes_per_gb
print(f"{table_size_gb} GBs occupied by Arrow's Table!")

In [None]:
# Update the Iceberg Table's schema.
# The table is reloaded from the catalog, then the schema of big_table (which carries
# the extra generated columns) is merged into the table schema by column name.
# NOTE(review): the change is presumably committed when the with block exits — confirm.
table_append = test_catalog.load_table("test_ns.nyc_taxi")
with table_append.update_schema() as update_schema:
    update_schema.union_by_name(big_table.schema)

#### Test the data insertion

Insert the full NYC taxi dataframe into the empty table created above.

In [None]:
# Append big_table to the Iceberg table and measure how long the operation takes.
# (Uncomment the lines below to check how many rows the table holds before and after the append)
table_append = test_catalog.load_table("test_ns.nyc_taxi")

print("Start APPEND")
#before_len = len(table_append.scan().to_arrow())

# Only the append call itself sits between the two timestamps
start_time = time.time()
table_append.append(big_table)
end_time = time.time()
req_time = end_time - start_time

print("End APPEND")
#after_len  = len(table_append.scan().to_arrow())
#print("Before the append operation, there were " + str(before_len) + " rows in the table")
#print("After  the append operation, there were " + str(after_len)  + " rows in the table")
print(f"Time required: {req_time}")

#### Test multiple APPEND operations

In order to test several consecutive APPEND operations, the Arrow table containing the NYC Taxi data is converted into a Pandas DataFrame, then divided into small parts of 1000 rows each.
⚠️ Depending on how many APPEND operations you want to perform, change the `how_many` parameter in the cells below.

Several errors might arise, but those should not be related to the functioning of the PyIceberg library: the problem should instead reside in the underlying infrastructure (Jupyter, Hopsworks UI, VM, File access permissions ...)

In [None]:
# Re-open the "default" catalog from its SQLite database file
catalog_uri = "sqlite:////home/yarnapp/hopsfs/Resources/test_dir/pyiceberg_catalog.db"
catalog = load_catalog("default", uri=catalog_uri)

In [None]:
# Read the Parquet file downloaded earlier into an Arrow table
nyc_data_path = "/home/yarnapp/hopsfs/Resources/nyc_taxiparquet"
arrow_df = pq.read_table(nyc_data_path)

# Indices of the 1000-row chunks available for insertion: every odd index below the
# number of complete chunks in the dataset. Chunks are later taken with set.pop(),
# which removes an arbitrary (not a truly random) element.
insert_set = set(range(1, arrow_df.shape[0] // 1000, 2))

# Transform the Arrow table into a pandas DataFrame, so it can be sliced by rows
df_append = arrow_df.to_pandas()

# Set how many times you want to repeat the APPEND operation
how_many = 10

In [None]:
# Load the table where to append the new data
table_append = catalog.load_table("test_ns.nyc_taxi")

# Every iteration consumes one chunk index; fail early with a clear message instead of
# hitting a KeyError from set.pop() after some appends have already been committed.
if how_many > len(insert_set):
    raise ValueError(
        f"how_many ({how_many}) exceeds the number of chunks still available ({len(insert_set)})"
    )

for i in range(how_many):
    # Take one unused chunk index and cut the matching 1000-row slice
    elem = insert_set.pop()
    partial_df = df_append[elem*1000:1000*(elem + 1)]
    partial_table = pyarrow.Table.from_pandas(partial_df)

    # Append the slice to the table, counting the rows before and after the operation
    print("Start APPEND")
    before_len = len(table_append.scan().to_arrow())

    table_append.append(partial_table)

    print('End APPEND')
    after_len  = len(table_append.scan().to_arrow())
    # A space before "rows" keeps the count from being glued to the word
    print(f"Before the append operation, there were {before_len} rows in the table")
    print(f"After  the append operation, there were {after_len} rows in the table")

    if i == how_many - 1:
        print('\n\n ** All the APPEND operations have been completed **')

#### Test the schema evolution

In [None]:
# Compute the tip per mile (tip_amount divided by trip_distance), then build a new
# table equal to df plus that value as the extra column "tip_per_mile"
tip_per_mile = pc.divide(df["tip_amount"], df["trip_distance"])
updated_df = df.append_column("tip_per_mile", tip_per_mile)

In [None]:
# Re-open the "default" catalog from its SQLite database file and fetch the taxi table
catalog_uri = "sqlite:////home/yarnapp/hopsfs/Resources/test_dir/pyiceberg_catalog.db"
catalog = load_catalog("default", uri=catalog_uri)
table = catalog.load_table("test_ns.nyc_taxi")

In [None]:
# Merge the schema of updated_df into the table schema, matching columns by name
with table.update_schema() as schema_update:
    schema_update.union_by_name(updated_df.schema)

# Overwrite the table content with updated_df, then print the result of a full scan
table.overwrite(updated_df)
overwritten_data = table.scan().to_arrow()
print(overwritten_data)

In [None]:
# Get the table into a pandas DataFrame, in order to verify its length and integrity.
# ("prova" is Italian for "test".)
prova = table.scan().to_pandas()

#### Test the table read operation

In [None]:
# Test the "reading" (scan) operation on PyIceberg, and track the time needed
read_table = test_catalog.load_table("test_ns.nyc_taxi")
before_read = time.time()
read_df = read_table.scan().to_arrow()
after_read = time.time()
total_time = after_read - before_read

# Raw elapsed seconds, followed by a readable summary. The duration is formatted
# inline because no time_printer helper is defined in this notebook.
print(total_time)
print(f"\n**Time needed to read: {total_time:.2f} seconds**")
# Size of the scanned Arrow table as reported by sys.getsizeof
print(sys.getsizeof(read_df))

#### Test the table scan and file retrieval

In [None]:
# Scan only the rows with a positive tip per mile. The result gets its own name so
# that `df`, the original dataset loaded at the top of the notebook, is not overwritten
# and the earlier cells that build on it can still be re-run.
tipped_trips = table.scan(row_filter="tip_per_mile > 0").to_arrow()
len(tipped_trips)

---
#### @FINAL Delete all the data and files created

In [None]:
# Just call it if you are at the end of your own test
! rm -r /home/yarnapp/hopsfs/Resources/test_dir
! rm -r /tmp/test_data/