In [1]:
import logging
import os
from datetime import date, timedelta

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, FloatType, TimestampType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
from raw_s3_iceberg_etl import *

In [2]:
# ----- Load .env and conf -----
# NOTE(review): load_dotenv and configparser are not imported explicitly in this notebook;
# presumably they arrive via `from raw_s3_iceberg_etl import *` — consider importing them directly.
load_dotenv()
config = configparser.ConfigParser()
# The context manager closes the file handle once parsing is done; the bare
# open() previously passed to read_file() was never closed.
with open('conf') as conf_file:
    config.read_file(conf_file)

# Section holding every Raw S3 -> Iceberg lakehouse ETL setting read below.
etl_section = 'Raw S3 Iceberg Lakehouse ETL'
s3_object_key_path = config.get('Upload To S3', 's3_object_key_path')
s3_lakehouse_path  = config.get(etl_section, 's3_lakehouse_path')
gz_file_name       = config.get(etl_section, 'gz_file_name')
local_gz_dir_path  = config.get(etl_section, 'local_gz_dir_path')
region_name        = config.get(etl_section, 'region_name')
# ----- Load .env and conf -----

In [20]:
# Open the Iceberg catalog used by every later cell.
# NOTE(review): load_local_sqlite_catalog is not defined in this notebook; presumably it is
# provided by `from raw_s3_iceberg_etl import *` — confirm. Its rendered repr is a
# pyiceberg SqlCatalog named "lakehouse".
catalog = load_local_sqlite_catalog()
catalog

lakehouse (<class 'pyiceberg.catalog.sql.SqlCatalog'>)

In [27]:
# Namespace and table of the raw packets table, defined once so the create call
# and the load call can never disagree about which table they target.
raw_namespace = 'PacketX_Raw'
raw_table_name = 'Packets'

# NOTE(review): create_raw_schema runs on every execution of this cell; confirm it is a
# no-op (or tolerated) when the table already exists — the snapshot shown below has a parent,
# so the table had already been written to before this run.
create_raw_schema(catalog = catalog, name_space = raw_namespace, table_name = raw_table_name)
iceberg_table = catalog.load_table(f"{raw_namespace}.{raw_table_name}")
iceberg_table

Packets(
  1: user: optional string,
  2: time_stamp: required timestamp,
  3: source_ip: required string,
  4: destination_ip: required string,
  5: bandwidth_kb: required float,
  6: id: required string
),
partition by: [partition_date],
sort order: [],
snapshot: Operation.APPEND: id=2939511074434193825, parent_id=5302911279310217071, schema_id=0

In [33]:
# Derive the day to analyse from the configured gz file name.
# NOTE(review): assumes names shaped like "<prefix>-YYYY-MM-DD.<ext>"; a '-' inside the
# prefix would shift the date fields — confirm the naming convention.
file_stem_parts = gz_file_name.split('.')[0].split('-')
day_filter = '-'.join(file_stem_parts[1:4])
# Fail fast with a clear ValueError if the extracted text is not a real calendar date.
filter_day = date.fromisoformat(day_filter)
next_day = filter_day + timedelta(days=1)

# Half-open interval [day 00:00, next day 00:00). The previous inclusive upper bound of
# 23:59:59 dropped every row in the final second of the day, because time_stamp
# carries microsecond precision (e.g. 23:59:59.500000 > 23:59:59).
start_time = f"{filter_day.isoformat()}T00:00:00"
end_time = f"{next_day.isoformat()}T00:00:00"

filtered_day_table = iceberg_table.scan(
    row_filter=f"time_stamp >= '{start_time}' AND time_stamp < '{end_time}'"
)

In [35]:
# Materialise the single-day scan as an Arrow table and display it.
day_packets_arrow = filtered_day_table.to_arrow()
day_packets_arrow

pyarrow.Table
user: large_string
time_stamp: timestamp[us] not null
source_ip: large_string not null
destination_ip: large_string not null
bandwidth_kb: float not null
id: large_string not null
----
user: [["dyab","dyab","dyab","dyab","dyab",...,"dyab","dyab","dyab","dyab","dyab"]]
time_stamp: [[2025-03-20 19:15:00.000000,2025-03-20 19:15:00.000000,2025-03-20 19:15:00.000000,2025-03-20 19:15:00.000000,2025-03-20 19:15:00.000000,...,2025-03-20 23:59:00.000000,2025-03-20 23:59:00.000000,2025-03-20 23:59:00.000000,2025-03-20 23:59:00.000000,2025-03-20 23:59:00.000000]]
source_ip: [["192.168.1.5","3.164.182.84","192.168.1.5","20.42.73.31","51.104.15.253",...,"192.168.1.5","192.168.1.5","192.168.1.5","192.168.1.5","192.168.1.5"]]
destination_ip: [["150.171.27.11","192.168.1.5","104.208.16.92","192.168.1.5","192.168.1.5",...,"108.159.102.123","163.121.128.134","163.121.128.135","192.168.1.1","108.159.108.209"]]
bandwidth_kb: [[2.02539,0.158203,9.6084,0.170898,40,...,18.0078,13.3828,10.5146,2

In [None]:
# Materialise the same single-day scan as a pandas DataFrame for display.
day_packets_df = filtered_day_table.to_pandas()
day_packets_df

Unnamed: 0,user,time_stamp,source_ip,destination_ip,bandwidth_kb,id
0,dyab,2025-03-20 19:15:00,192.168.1.5,150.171.27.11,2.025390,dyab-2025-03-20 19:15:00.000000-1
1,dyab,2025-03-20 19:15:00,3.164.182.84,192.168.1.5,0.158203,dyab-2025-03-20 19:15:00.000000-2
2,dyab,2025-03-20 19:15:00,192.168.1.5,104.208.16.92,9.608400,dyab-2025-03-20 19:15:00.000000-3
3,dyab,2025-03-20 19:15:00,20.42.73.31,192.168.1.5,0.170898,dyab-2025-03-20 19:15:00.000000-4
4,dyab,2025-03-20 19:15:00,51.104.15.253,192.168.1.5,40.000000,dyab-2025-03-20 19:15:00.000000-5
...,...,...,...,...,...,...
7038,dyab,2025-03-20 23:59:00,192.168.1.5,108.159.102.123,18.007799,dyab-2025-03-20 23:59:00.000000-7039
7039,dyab,2025-03-20 23:59:00,192.168.1.5,163.121.128.134,13.382800,dyab-2025-03-20 23:59:00.000000-7040
7040,dyab,2025-03-20 23:59:00,192.168.1.5,163.121.128.135,10.514600,dyab-2025-03-20 23:59:00.000000-7041
7041,dyab,2025-03-20 23:59:00,192.168.1.5,192.168.1.1,27.179701,dyab-2025-03-20 23:59:00.000000-7042


In [None]:
def aggregate_bandwidth_by_user(df):
    """Sum ``bandwidth_kb`` per ``user``.

    Parameters
    ----------
    df : pyarrow.Table
        Table with at least ``user`` and ``bandwidth_kb`` columns
        (e.g. the result of ``scan(...).to_arrow()``).

    Returns
    -------
    pyarrow.Table
        Exactly two columns, in this order: ``user`` and ``total_bandwidth_kb``.
    """
    grouped = df.group_by('user').aggregate([
        ('bandwidth_kb', 'sum')
    ])
    # pyarrow names the aggregate "<column>_<function>", but where the key column lands in
    # the output has changed between pyarrow releases (older ones put aggregates first).
    # Selecting by name fixes the order, so the positional rename can never swap labels.
    return grouped.select(['user', 'bandwidth_kb_sum']).rename_columns(['user', 'total_bandwidth_kb'])

In [None]:
# Total bandwidth per user for the selected day.
bandwidth_by_user = aggregate_bandwidth_by_user(filtered_day_table.to_arrow())
bandwidth_by_user

pyarrow.Table
user: large_string
total_bandwidth_kb: double
----
user: [["dyab"]]
total_bandwidth_kb: [[153132.5259416923]]