In [None]:
!pip install pandas python-dotenv
! pip install cx_Oracle sqlalchemy

In [19]:
import os

from dotenv import load_dotenv

# Load the connection settings from the local .env file.
# override=True makes the .env values win over variables that already exist in
# the process environment. Generic names such as `username` / `hostname` can
# collide with OS-provided variables (e.g. USERNAME on Windows, where lookups
# are case-insensitive), which would otherwise silently shadow the .env values.
load_dotenv('.env', override=True)

hostname = os.getenv('hostname')
port = os.getenv('port')
sid = os.getenv('sid')
service_name = os.getenv('service_name')
username = os.getenv('username')
password = os.getenv('password')

# Fail fast with a readable message instead of a driver error further down.
# `sid` is not checked because the connection cell builds its DSN from
# `service_name`.
_required_settings = {
    'hostname': hostname,
    'port': port,
    'service_name': service_name,
    'username': username,
    'password': password,
}
_missing_settings = [key for key, value in _required_settings.items() if value is None]
if _missing_settings:
    raise RuntimeError(
        f"Missing settings in .env / environment: {', '.join(_missing_settings)}"
    )

In [None]:
import pandas as pd 
import time

In [None]:
# Read the raw transactions, parse the timestamp column into datetimes and
# order the rows chronologically (oldest first).
df = (
    pd.read_csv('./transaction_data.csv')
    .assign(TransactionTime=lambda frame: pd.to_datetime(frame['TransactionTime']))
    .sort_values(by='TransactionTime')
)

In [None]:
# Preview the first five rows of the sorted transactions
df.head(n=5)

In [20]:
from urllib.parse import quote

from sqlalchemy import create_engine
import cx_Oracle

# Set up the Oracle connection
dsn = cx_Oracle.makedsn(host=hostname, port=port, service_name=service_name) # result: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=ORCLPDB1)))
# Raw DBAPI connection: opening it here fails fast on a bad DSN or bad credentials.
# NOTE(review): the engine below does not use this object, and later cells
# rebind `connection` to connections obtained from the engine.
connection = cx_Oracle.connect(user=username, password=password, dsn=dsn)

# Create an SQLAlchemy engine.
# The credentials are percent-encoded so that characters such as '@', ':', '/'
# or '%' in the username/password cannot be mistaken for URL delimiters or
# escape sequences when SQLAlchemy parses the URL.
engine = create_engine(
    f'oracle+cx_oracle://{quote(username, safe="")}:{quote(password, safe="")}@{dsn}'
)

In [21]:

print("Test connection query")
# Smoke test for the engine: only one row is displayed, so the result is
# limited on the database side (ROWNUM) instead of fetching the whole table
# and discarding all but the first row on the client.
pd.read_sql_query("SELECT * FROM DEBEZIUM.PRODUCTS WHERE ROWNUM <= 1", engine).head(1)

Test connection query


Unnamed: 0,id,name,description,weight
0,101,scooter,Small 2-wheel scooter,3.14


In [22]:
from sqlalchemy.dialects.oracle import DATE, FLOAT, NUMBER, VARCHAR2

# Oracle column type for each DataFrame column; passed as `dtype=` to
# DataFrame.to_sql by the insert cells.
dtype = dict(
    UserId=NUMBER,
    TransactionId=NUMBER,
    TransactionTime=DATE,
    ItemCode=NUMBER,
    ItemDescription=VARCHAR2(255),
    NumberOfItemsPurchased=NUMBER,
    CostPerItem=FLOAT,
    Country=VARCHAR2(124),
)

In [None]:
# # Define the table name and schema
# table_name = 'test_table'
# schema_name = 'C##KEVIN'

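Update:
- The import of the transaction data was down.
- Added a loop that inserts the rows into Oracle one at a time, with a delay between inserts, to test CDC. (This note originally said 10 seconds; the loop currently sleeps for 1 second.)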

In [23]:
# Schema that owns the target table, and the table the transaction rows go into
schema_name = "DEBEZIUM"
table_name = "TRANSACTIONS"

In [24]:
# Declare the target table with SQLAlchemy Core and create it in Oracle
from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String, Float, Date

# Registry that collects the table definitions
metadata = MetaData()

# (column name, SQLAlchemy type) pairs, in the order the columns are declared
column_specs = [
    ('UserId', Integer),
    ('TransactionId', Integer),
    ('TransactionTime', Date),
    ('ItemCode', Integer),
    ('ItemDescription', String(255)),
    ('NumberOfItemsPurchased', Integer),
    ('CostPerItem', Float),
    ('Country', String(124)),
]

# Table `table_name` inside schema `schema_name`
table = Table(
    table_name,
    metadata,
    *(Column(column_name, column_type) for column_name, column_type in column_specs),
    schema=schema_name,
)

# Emit the DDL for every table registered on `metadata`
metadata.create_all(engine)

Grant access on the newly created table and enable supplemental logging on it, so that its changes can be captured (CDC) and streamed to Kafka.

In [25]:
from sqlalchemy import text

# Account that receives read access to the table
# (presumably the user the CDC connector logs in with — confirm).
cdc_user = "c##dbzuser"

# Define your SQL queries
sql_query1 = text(f"GRANT SELECT ON {schema_name}.{table_name} TO {cdc_user}")
sql_query2 = text(f"ALTER TABLE {schema_name}.{table_name} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS")

# Execute the queries on a connection that is closed when the block exits,
# so it is not left open once a later cell rebinds `connection`.
with engine.connect() as connection:
    connection.execute(sql_query1)
    print(f"Permissions granted successfully on {schema_name}.{table_name} to {cdc_user}")

    connection.execute(sql_query2)
    print(f"Table {schema_name}.{table_name} altered successfully")


Permissions granted successfully on DEBEZIUM.TRANSACTIONS to c##dbzuser
Table DEBEZIUM.TRANSACTIONS altered successfully


In [None]:
# Export the DataFrame to the Oracle database in a single bulk append.
# The connection is scoped with `with` so it is closed as soon as the import
# finishes (or fails), instead of staying open until the kernel exits.
with engine.connect() as connection:
    print("Import bulk data to Oracle database")
    df.to_sql(table_name, connection, schema=schema_name, if_exists='append', index=False, dtype=dtype)


In [27]:
# Append the transactions one row at a time, pausing after each insert.
# The lower-cased table name is the same for every row, so it is computed once.
target_table = table_name.lower()

with engine.connect() as connection:
    for label, record in df.iterrows():
        # A Series becomes a one-column frame; transposing it yields a one-row frame
        single_row = pd.DataFrame(record).transpose()
        single_row.to_sql(
            target_table,
            connection,
            schema=schema_name,
            if_exists='append',
            index=False,
            dtype=dtype,
        )
        # `label` is the DataFrame index label of the row, not a running counter
        print(f"Inserted row {label + 1} into {target_table}")
        time.sleep(1)  # pause for one second before the next insert

Inserted row 538324 into transactions
Inserted row 933959 into transactions
Inserted row 577479 into transactions
Inserted row 846357 into transactions
Inserted row 55170 into transactions
Inserted row 635174 into transactions
Inserted row 1024277 into transactions
Inserted row 466406 into transactions
Inserted row 323264 into transactions
Inserted row 918917 into transactions
Inserted row 412256 into transactions
Inserted row 920592 into transactions


KeyboardInterrupt: 

In [None]:
# Close the connection
connection.close()
# Closing an engine-issued connection only returns it to the engine's pool;
# dispose of the pool as well so the underlying Oracle sessions are released.
engine.dispose()