In [2]:
import sys

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from kafka import KafkaConsumer, TopicPartition
from pyflink.table import EnvironmentSettings, TableEnvironment
import pandas as pd
import json

In [5]:
consumer = KafkaConsumer() # bootstrap_servers='localhost:9092'
consumer.topics()

{'category_trans_amt_last_10080min', 'transaction'}

In [6]:
local = True

# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# write all the data to one file
table_env.get_config().set("parallelism.default", "1")

# The `flink` backend does not create `TableEnvironment` objects; pass
# the `TableEnvironment` object created above to `ibis.flink.connect`.
connection = ibis.flink.connect(table_env)

# Flink’s streaming connectors aren't part of the binary distribution.
# Link the Kafka connector for cluster execution by adding a JAR file.
connection._exec_sql("ADD JAR '../flink-sql-connector-kafka-3.0.2-1.18.jar'")

<pyflink.table.table_result.TableResult at 0x7fe6c8c88eb0>

In [None]:
class CreditCardTransaction:
    trans_num: dt.str # primary key
    user_id: dt.int64 # user_id
    cc_num: dt.int64 # creadit card number
    amt: dt.float64 # credit card transaction amount
    merchant: dt.str
    category: dt.str     
    is_fraud: dt.int32 # Fraud Label
    first: dt.str # first name
    last: dt.str # last name
    dob: dt.str # date of birth
    zipcode: dt.str
    trans_date_trans_time: dt.timestamp(scale=3)

In [8]:
# 2. Create source table
source_topic_name = "transaction"
kafka_offset = "earliest-offset"
source_schema = sch.Schema(
    {
        "trans_num": dt.str,
        "user_id": dt.int64,
        "cc_num": dt.int64,
       
        
        "amt": dt.float64,
        
        "merchant": dt.str,
        "category": dt.str ,      
        "is_fraud": dt.int32,
        "first": dt.str,
        "last": dt.str,
        "dob": dt.str,
        "zipcode": dt.str,
         "trans_date_trans_time": dt.timestamp(scale=3),
    }
)

# Configure the source table with Kafka connector properties.
source_configs = {
    "connector": "kafka",
    "topic": source_topic_name,
    "properties.bootstrap.servers": "localhost:9092" if local else "kafka:29092",
    "properties.group.id": "test",
    "scan.startup.mode": kafka_offset,
    "format": "json",
}

# Create the source table using the defined schema, Kafka connector properties,
# and set watermarking for real-time processing with a 15-second delay.
source_table = connection.create_table(
    source_topic_name,
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="trans_date_trans_time", allowed_delay=ibis.interval(seconds=15)
    ),
)

In [9]:
source_table

In [14]:
original_cols = source_table.columns

In [11]:
# 2. Create feature table
interval_in_minutes = 10080
source_topic_name = f"category_trans_amt_last_{interval_in_minutes}min"
kafka_offset = "earliest-offset"
source_schema = sch.Schema(
    {
        "category": dt.str,
        f"category_max_trans_amt_last_{interval_in_minutes}min": dt.float64,
        f"category_min_trans_amt_last_{interval_in_minutes}min": dt.float64,
        f"category_mean_trans_amt_last_{interval_in_minutes}min": dt.float64,
        f"category_trans_count_last_{interval_in_minutes}min": dt.int64,
        "trans_date_trans_time": dt.timestamp(scale=3), # used for future temporal join
    }
)

# Configure the source table with Kafka connector properties.
source_configs = {
    "connector": "kafka",
    "topic": source_topic_name,
    "properties.bootstrap.servers": "localhost:9092" if local else "kafka:29092",
    "properties.group.id": "test",
    "scan.startup.mode": kafka_offset,
    "format": "json",
}

# Create the source table using the defined schema, Kafka connector properties,
# and set watermarking for real-time processing with a 15-second delay.
feature_table = connection.create_table(
    source_topic_name,
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="trans_date_trans_time", allowed_delay=ibis.interval(seconds=15)
    ),
)

In [12]:
feature_table

In [13]:
train_data = source_table.asof_join(feature_table, predicates=[source_table['trans_date_trans_time'] >= feature_table['trans_date_trans_time'], source_table['category'] == feature_table['category']])

In [22]:
original_cols

['user_id',
 'trans_date_trans_time',
 'cc_num',
 'amt',
 'trans_num',
 'merchant',
 'category',
 'is_fraud',
 'first',
 'last',
 'dob',
 'zipcode']

In [15]:
train_data = train_data[original_cols + ['category_max_trans_amt_last_10080min', 'category_min_trans_amt_last_10080min', 'category_mean_trans_amt_last_10080min', 'category_trans_count_last_10080min'] ]

In [18]:
class CreditCardTransaction:
    trans_num: dt.str # primary key
    user_id: dt.int64 # user_id
    cc_num: dt.int64 # creadit card number
    amt: dt.float64 # credit card transaction amount
    merchant: dt.str
    category: dt.str     
    is_fraud: dt.int32 # Fraud Label
    first: dt.str # first name
    last: dt.str # last name
    dob: dt.str # date of birth
    zipcode: dt.str
    trans_date_trans_time: dt.timestamp(scale=3)

In [19]:
sink_schema = sch.Schema(
    {
        "trans_num": dt.str,
        "user_id": dt.int64,
        "cc_num": dt.int64,
        "amt": dt.float64,
        "merchant": dt.str,
        "category": dt.str ,      
        "is_fraud": dt.int32,
        "first": dt.str,
        "last": dt.str,
        "dob": dt.str,
        "zipcode": dt.str,
        "trans_date_trans_time": dt.timestamp(scale=3),
        'category_max_trans_amt_last_10080min': dt.float64,
        'category_min_trans_amt_last_10080min': dt.float64,
        'category_mean_trans_amt_last_10080min': dt.float64,
        'category_trans_count_last_10080min': dt.float64,
    }
)

sink_configs = {
    "connector": "kafka",
    "topic": "train_data",
    "properties.bootstrap.servers": "localhost:9092" if local else "kafka:29092",
    "scan.startup.mode": "earliest-offset",
    # "format": "json",
    "format": "debezium-json",
}

t0 = connection.create_table(
    "train_data",
    schema=sink_schema,
    tbl_properties=sink_configs,
    watermark=ibis.watermark(
        time_col="trans_date_trans_time", allowed_delay=ibis.interval(seconds=15)
    ),
    primary_key = "trans_num",
    overwrite=True
)

In [20]:
t0

In [21]:
connection.insert("train_data", train_data)

Py4JJavaError: An error occurred while calling o8.executeSql.
: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'default_catalog.default_database.train_data' do not match.
Cause: Incompatible types for sink column 'trans_num' at position 0.

Query schema: [user_id: BIGINT, trans_date_trans_time: TIMESTAMP(3) *ROWTIME*, cc_num: BIGINT, amt: DOUBLE, trans_num: STRING, merchant: STRING, category: STRING, is_fraud: INT, first: STRING, last: STRING, dob: STRING, zipcode: STRING, category_max_trans_amt_last_10080min: DOUBLE, category_min_trans_amt_last_10080min: DOUBLE, category_mean_trans_amt_last_10080min: DOUBLE, category_trans_count_last_10080min: BIGINT]
Sink schema:  [trans_num: STRING, user_id: BIGINT, cc_num: BIGINT, amt: DOUBLE, merchant: STRING, category: STRING, is_fraud: INT, first: STRING, last: STRING, dob: STRING, zipcode: STRING, trans_date_trans_time: TIMESTAMP(3), category_max_trans_amt_last_10080min: DOUBLE, category_min_trans_amt_last_10080min: DOUBLE, category_mean_trans_amt_last_10080min: DOUBLE, category_trans_count_last_10080min: DOUBLE]
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1010)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:354)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:312)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:272)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:197)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translateToRel$1(PlannerBase.scala:275)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:231)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:181)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:862)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1097)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [49]:
# Specify the topic and partition
topic = "train_data"
partition = 0  # Replace with the partition number you want to set the offset for

# Create a KafkaConsumer
consumer = KafkaConsumer()
consumer.assign([TopicPartition(topic, partition)])
# Set the offset to the earliest available offset
consumer.seek_to_beginning(TopicPartition(topic, partition))
consumer.poll(200)

{}

In [50]:

# Use the Kafka Python client to stream records from the sink topic.
# Otherwise, the mini cluster will shut down upon script completion.
consumer = KafkaConsumer("train_data")
rows = []
for _, msg in zip(range(1000), consumer):
    print(msg)
    rows.append(msg)

In [None]:
import json

import pandas as pd

df = pd.DataFrame([json.loads(row.value.decode('utf-8'))['after'] for row in rows])

df

Unnamed: 0,trans_num,user_id,cc_num,amt,merchant,category,is_fraud,first,last,dob,...,merchant_amt_min_5m,merchant_amt_min_60m,merchant_amt_max_5m,merchant_amt_max_60m,category_amt_sum_5m,category_amt_sum_60m,category_amt_min_5m,category_amt_min_60m,category_amt_max_5m,category_amt_max_60m
0,0b242abb623afc578575680df30655b9,4800525036158247729,2703190000000000,4.97,"fraud_Rippin, Kub and Mann",misc_net,0,Jennifer,Banks,3/9/88,...,4.97,4.97,4.97,4.97,4.97,4.97,4.97,4.97,4.97,4.97
1,1f76529f8574734946361c461b024d99,-4687780397107955484,630423000000,107.23,"fraud_Heller, Gutmann and Zieme",grocery_pos,0,Stephanie,Gill,6/21/78,...,107.23,107.23,107.23,107.23,107.23,107.23,107.23,107.23,107.23,107.23
2,cb598ec00d349dc95e7b3c18de5ae800,929063974751684726,4599740000000000000,82.80,"fraud_Heller, Gutmann and Zieme",grocery_pos,0,Mary,Myers,12/30/64,...,82.80,82.80,107.23,107.23,190.03,190.03,82.80,82.80,107.23,107.23
3,a1a22d70485983eac12b5b88dad1cf95,1615055666352648205,38859500000000,220.11,fraud_Lind-Buckridge,entertainment,0,Edward,Sanchez,1/19/62,...,220.11,220.11,220.11,220.11,220.11,220.11,220.11,220.11,220.11,220.11
4,6b849c168bdad6f867558c3793159a81,2274729778245383962,3534090000000000,45.00,"fraud_Kutch, Hermiston and Farrell",gas_transport,0,Jeremy,White,1/12/67,...,45.00,45.00,45.00,45.00,45.00,45.00,45.00,45.00,45.00,45.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,740ba21b3f6dac605752b84ef73f56e4,-3655859676031312324,4452370000000000,19.12,fraud_Champlin and Sons,home,0,Linda,Davis,3/4/78,...,19.12,19.12,19.12,19.12,19.12,19.12,19.12,19.12,19.12,19.12
996,fc3e6cd1be3853f5bcd8fda06df5a7e8,7004744375844464512,4951650000000000,77.89,fraud_Hoppe-Parisian,kids_pets,0,Kimberly,Miller,6/15/76,...,77.89,77.89,77.89,77.89,77.89,77.89,77.89,77.89,77.89,77.89
997,91561e65399053586ad098022374ec45,3252491939450519963,3576020000000000,9.19,fraud_Hickle Group,shopping_pos,0,Dawn,Gray,12/30/04,...,9.19,9.19,142.88,142.88,279.16,279.16,9.19,9.19,142.88,142.88
998,f73411d6097ddcf5d1e45786bf55023e,-5585615269734509262,4025610000000000,1.39,fraud_Schuppe LLC,entertainment,0,Krystal,Key,3/20/49,...,1.39,1.39,1.39,1.39,1.39,1.39,1.39,1.39,1.39,1.39
