In [None]:
import clickhouse_connect

%load_ext sql
%config SqlMagic.autocommit=False

In [None]:
# Connect to ClickHouse
client = clickhouse_connect.get_client(
    host='localhost', 
    port=8123,
    username='default',
    password='ClickHousePassword'
)

In [None]:
sql_create_table="""
CREATE TABLE IF NOT EXISTS kafka_adsb_raw
(
    `message_type` String,
    `transmission_type` UInt8,
    `session_id` UInt32,
    `aircraft_id` UInt32,
    `hex_ident` String,
    `hex_ident_val` UInt32,
    `flight_id` String,
    `date_message_generated` String,
    `time_message_generated` String,
    `date_message_logged` String,
    `time_message_logged` String,
    `callsign` Nullable(String),
    `altitude` Nullable(Int32),
    `ground_speed` Nullable(Int32),
    `track` Nullable(Float64),
    `latitude` Nullable(Float64),
    `longitude` Nullable(Float64),
    `vertical_rate` Nullable(Int32),
    `squawk` Nullable(String),
    `alert` Nullable(UInt8),
    `emergency` Nullable(UInt8),
    `spi` Nullable(UInt8),
    `is_on_ground` Nullable(UInt8),
    `raw_message` String,
    `timestamp` DateTime
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, hex_ident);
"""

sql_create_consumer="""
CREATE TABLE adsb_kafka_queue
(
    `raw_message` String,
    `timestamp` String
)
ENGINE = Kafka
SETTINGS 
    kafka_broker_list = 'kafka.hughevans.dev:9092',
    kafka_topic_list = 'adsb-raw',
    kafka_group_name = 'clickhouse-adsb-csv',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1,
    kafka_security_protocol = 'sasl_plaintext',
    kafka_sasl_mechanism = 'PLAIN',
    kafka_sasl_username = 'consumer',
    kafka_sasl_password = 'consumer-secret';
"""

sql_create_mv="""
CREATE MATERIALIZED VIEW kafka_to_adsb_mv TO kafka_adsb_raw AS
SELECT 
    -- Parse CSV fields from raw_message
    splitByChar(',', raw_message)[1] as message_type,
    toUInt8OrZero(splitByChar(',', raw_message)[2]) as transmission_type,
    toUInt32OrZero(splitByChar(',', raw_message)[3]) as session_id,
    toUInt32OrZero(splitByChar(',', raw_message)[4]) as aircraft_id,
    splitByChar(',', raw_message)[5] as hex_ident,
    toUInt32OrZero(concat('0x', splitByChar(',', raw_message)[5])) as hex_ident_val,
    splitByChar(',', raw_message)[6] as flight_id,
    splitByChar(',', raw_message)[7] as date_message_generated,
    splitByChar(',', raw_message)[8] as time_message_generated,
    splitByChar(',', raw_message)[9] as date_message_logged,
    splitByChar(',', raw_message)[10] as time_message_logged,
    
    -- Nullable fields
    nullIf(trim(splitByChar(',', raw_message)[11]), '') as callsign,
    
    nullIf(toInt32OrZero(splitByChar(',', raw_message)[12]), 0) as altitude,
    nullIf(toInt32OrZero(splitByChar(',', raw_message)[13]), 0) as ground_speed,
    nullIf(toFloat64OrZero(splitByChar(',', raw_message)[14]), 0) as track,
    nullIf(toFloat64OrZero(splitByChar(',', raw_message)[15]), 0) as latitude,
    nullIf(toFloat64OrZero(splitByChar(',', raw_message)[16]), 0) as longitude,
    nullIf(toInt32OrZero(splitByChar(',', raw_message)[17]), 0) as vertical_rate,
    
    nullIf(trim(splitByChar(',', raw_message)[18]), '') as squawk,
    toUInt8OrZero(splitByChar(',', raw_message)[19]) as alert,
    toUInt8OrZero(splitByChar(',', raw_message)[20]) as emergency,
    toUInt8OrZero(splitByChar(',', raw_message)[21]) as spi,
    toUInt8OrZero(splitByChar(',', raw_message)[22]) as is_on_ground,
    
    raw_message,
    parseDateTimeBestEffort(timestamp) as timestamp
FROM adsb_kafka_queue
WHERE length(raw_message) > 5;  -- Filter out empty or invalid messages
"""

client.command(sql_create_table)
client.command(sql_create_consumer)
client.command(sql_create_mv)

In [None]:
%sql clickhouse://default:ClickHousePassword@localhost:8123/default

%sql SELECT * FROM demo_adsb_raw ORDER BY date_message_generated LIMIT 10

# %sql SELECT * FROM kafka_adsb_raw ORDER BY date_message_generated LIMIT 10


In [None]:
client.command("DROP TABLE  demo_adsb_raw")
client.command("DROP TABLE  adsb_kafka_queue")
client.command("DROP TABLE  kafka_to_adsb_mv")