In [1]:
import pandas as pd
from sqlalchemy.engine import create_engine
from trino.dbapi import connect

csv_file_path = '/home/jovyan/data/stage/CashTransactionHistorical.txt'
# delimiter is '|'
df = pd.read_csv(csv_file_path, delimiter='|', header=None, names=['id', 'timestamp', 'amount', 'description'])


In [2]:


# Establish a connection to Trino
conn = connect(
    host='trino',  # The hostname should match the service name in docker-compose
    port=8060,     # The Trino coordinator port
    user='trino',  # The username
    catalog='iceberg',  # The catalog
    schema='default'  # The schema, adjust if you have a different schema
)

# Create a cursor object using the connection
cur = conn.cursor()

# Check if the 'default' schema exists, and create it if not
cur.execute("SHOW SCHEMAS")
schemas = [schema[0] for schema in cur.fetchall()]
print(schemas)
if 'default' not in schemas:
    cur.execute("CREATE SCHEMA default")



['default', 'information_schema']


In [3]:
cur.execute("DROP TABLE IF EXISTS cash_transactions")
conn.commit()
# Define the SQL query to create an Iceberg table
create_table_sql = """
CREATE TABLE IF NOT EXISTS cash_transactions (
    id INTEGER,
    timestamp TIMESTAMP(6),
    amount DOUBLE,
    description VARCHAR(100)
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['HOUR(timestamp)']
)
"""
# Execute the create table query using the cursor
cur.execute(create_table_sql)

# Commit the transaction
conn.commit()

In [4]:
from time import time
# Create a SQLAlchemy engine
engine = create_engine('trino://trino@trino:8060/iceberg/default')

# Define the chunk size
chunk_size = 100  # Adjust the chunk size based on your needs and Trino's limits
t = time()
df['timestamp'] = pd.to_datetime(df['timestamp'])  # Cast timestamp column to datetime
# Insert the data into the Iceberg table in chunks
df.to_sql(
    'cash_transactions', 
    con=engine, 
    schema='default', 
    if_exists='append', 
    index=False,
    method='multi',
    chunksize=chunk_size  # Insert data in chunks
)
print(f"Took {time() - t:.4f}s to insert data")

EXECUTE IMMEDIATE not available for trino:8060; defaulting to legacy prepared statements (TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:19: mismatched input ''SELECT 1''. Expecting: 'USING', <EOF>", query_id=20240419_112030_15024_zwwbp))
EXECUTE IMMEDIATE not available for trino:8060; defaulting to legacy prepared statements (TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:19: mismatched input ''SELECT 1''. Expecting: 'USING', <EOF>", query_id=20240419_122031_24676_zwwbp))


Took -5862.6933 to insert data


In [5]:
# Define the SQL query to select all rows from the cash_transactions table
query = """
SELECT *
FROM cash_transactions
ORDER BY timestamp DESC
LIMIT 10
"""
cur.execute(query)

# Fetch and print the results
rows = cur.fetchall()
for row in rows:
    print(row)


[5265, datetime.datetime(2016, 11, 7, 13, 1, 45), 196416.46, 'UfHOD WNzXUGTFqBgFDJvJpAw']
[4101, datetime.datetime(2016, 11, 7, 12, 49, 59), -2741.02, 'VdosndUR qx']
[4927, datetime.datetime(2016, 11, 7, 3, 31, 17), -20633.34, 'GDYnJzuiJSIKCRVpKPIiAzrRMgxXMRSBRtkRHdiqLvNm bItowJKGK']
[5756, datetime.datetime(2016, 11, 7, 1, 52, 5), -728028.57, 'eihYRGRJHtEyAeFLNwgKcfsLDknZYhUJ']
[278, datetime.datetime(2016, 11, 6, 9, 38, 36), -9737.21, 'OUOpGOvpeZXNdEDXRJkvF hZLYHDIBiYQVWNwrLhUGbUiyMYTkIQJSKSWOqQrhiqKHXfgxrRFnFyqrhAOErBWr']
[2600, datetime.datetime(2016, 11, 6, 6, 26, 4), -90385.22, 'HDSzYqiMoGAQmNGEuLEZNRUmLZojkEOnJwcPLTSmRqHNGrxNhxy']
[2402, datetime.datetime(2016, 11, 6, 3, 37, 48), 9326.28, 'IwaulMNCzquzIMgzBSbpptyAoUJTsFugJP']
[1699, datetime.datetime(2016, 11, 6, 1, 55, 37), 26912.14, 'gPQxqAxoHDdnSeEKAVoOEOW mxG']
[1521, datetime.datetime(2016, 11, 5, 20, 47, 4), -9024.71, 'vitYIzwFCaVTtYLZKAsBMmuyIHAWqBnZNsTTG']
[4495, datetime.datetime(2016, 11, 5, 14, 18, 31), -5122.96, 'dDs

In [6]:
# Close the cursor and connection
cur.close()
conn.close()

In [8]:
%%markdown
# Trying out the ingested table

# Trying out the ingested table


In [None]:
# Establish a connection to Trino
conn = connect(
    host='trino',  # The hostname should match the service name in docker-compose
    port=8060,     # The Trino coordinator port
    user='trino',  # The username
    catalog='iceberg',  # The catalog
    schema='default'  # The schema, adjust if you have a different schema
)

# Create a cursor object using the connection
cur = conn.cursor()
