### Preparation

Step 1: Install dependencies, if not already installed

In [1]:
!pip install sqlalchemy ipython-sql kafka-python
%conda install -c cyclus java-jre

Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.8 MB)
Collecting ipython-sql
  Using cached ipython_sql-0.5.0-py3-none-any.whl (20 kB)
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Collecting greenlet!=0.4.17
  Downloading greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (618 kB)
Collecting prettytable
  Using cached prettytable-3.6.0-py3-none-any.whl (27 kB)
Collecting sqlparse
  Using cached sqlparse-0.4.3-py3-none-any.whl (42 kB)
Installing collected packages: kafka-python, sqlparse, prettytable, greenlet, sqlalchemy, ipython-sql
Successfully installed greenlet-2.0.2 ip

### Execution

Step 2: Download Kafka

In [2]:
!wget -P /tmp https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

--2023-03-01 13:53:48--  https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 71542357 (68M) [application/x-gzip]
Saving to: ‘/tmp/kafka_2.12-2.8.0.tgz’


2023-03-01 13:54:38 (1.40 MB/s) - ‘/tmp/kafka_2.12-2.8.0.tgz’ saved [71542357/71542357]



Step 3: Extract Kafka

In [3]:
!tar -xzf /tmp/kafka_2.12-2.8.0.tgz -C /tmp

Step 7: Start Zookeeper

* Open a new terminal and run the following command:
* `/tmp/kafka_2.12-2.8.0/bin/zookeeper-server-start.sh /tmp/kafka_2.12-2.8.0/config/zookeeper.properties`

Step 8: Start Kafka server

* Open a new terminal and run the following command:
* `/tmp/kafka_2.12-2.8.0/bin/kafka-server-start.sh /tmp/kafka_2.12-2.8.0/config/server.properties`
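
Before moving on, it can help to confirm that both services are accepting connections. The following is a minimal sketch, assuming the default ports from the stock config files (2181 for Zookeeper, 9092 for the Kafka broker). The helper name `port_is_open` is hypothetical and not part of Kafka.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Assumed defaults: Zookeeper on 2181, Kafka broker on 9092.
for name, port in [("Zookeeper", 2181), ("Kafka", 9092)]:
    status = "up" if port_is_open("localhost", port) else "not reachable"
    print(f"{name} (localhost:{port}): {status}")
```

An open port only shows that something is listening; it does not prove the broker has finished starting, so a short wait before creating the topic may still be needed.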

Step 9: Create a topic named `toll` in Kafka

In [4]:
!/tmp/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092

Created topic toll.


Step 10: Download the toll traffic simulator program (thanks, IBM!)

In [5]:
!wget -P /tmp https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py

--2023-03-01 13:57:29--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 828 [text/x-python]
Saving to: ‘/tmp/toll_traffic_generator.py’


2023-03-01 13:57:31 (146 MB/s) - ‘/tmp/toll_traffic_generator.py’ saved [828/828]



Step 11: Customize the generator program, changing topic to "toll"

* Open a new terminal and type `nano /tmp/toll_traffic_generator.py`
* Change the TOPIC variable to "toll"
* Press CTRL+O to save
* Press ENTER/RETURN to confirm
* Press CTRL+X to exit nano
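
The manual edit above can also be done without an editor. This is a rough sketch, assuming the generator script contains a top-level line that begins with `TOPIC =` (the exact contents of the downloaded file are not shown here). The function names `set_topic` and `retarget_generator` are hypothetical.

```python
import re
from pathlib import Path


def set_topic(source: str, topic: str) -> str:
    """Replace the first top-level `TOPIC = ...` line in the given source text."""
    pattern = re.compile(r"^TOPIC\s*=.*$", flags=re.MULTILINE)
    # A callable replacement avoids any backslash interpretation in the topic name.
    new_source, count = pattern.subn(lambda _: f'TOPIC = "{topic}"', source, count=1)
    if count == 0:
        raise ValueError("no TOPIC assignment found")
    return new_source


def retarget_generator(path: str, topic: str = "toll") -> None:
    """Rewrite the script at path in place so that it publishes to topic."""
    script = Path(path)
    script.write_text(set_topic(script.read_text(), topic))
```

Calling `retarget_generator("/tmp/toll_traffic_generator.py")` would then have the same effect as the nano steps, provided the assumption about the `TOPIC` line holds.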

Step 12: Run the Toll Traffic Simulator

* Open a new terminal and type: `python3 /tmp/toll_traffic_generator.py`
* In my case, the command was slightly different: `~/miniconda3/envs/mytest/bin/python /tmp/toll_traffic_generator.py`

Step 13: Download the streaming data reader (consumer) (thanks, IBM!)

* Downloading is optional. You can skip this step and go to step 14.
* Source: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py

Step 14: Customize the consumer program to write into a SQLite database table

In [6]:
%%bash
echo -e 'from datetime import datetime
from kafka import KafkaConsumer
import sqlite3

TOPIC="toll"

print("Connecting to the database")
try:
    connection = sqlite3.connect("file:/tmp/kafka-project.db", uri=True)
except Exception:
    print("Could not connect to database")
    raise  # stop here: nothing below can work without a connection
else:
    print("Connected to database")
cursor = connection.cursor()

#Create table
print("creating table")
try:
    cursor.execute("""create table livetolldata(
        timestamp datetime,
        vehicle_id int,
        vehicle_type char(15),
        toll_plaza_id smallint
        );""")
except sqlite3.OperationalError:
    print("table already exists")
else:
    print("table created successfully")

print("Connecting to Kafka")
consumer = KafkaConsumer(TOPIC)
print("Connected to Kafka")
print(f"Reading messages from the topic {TOPIC}")
for msg in consumer:

    # Extract information from kafka
    message = msg.value.decode("utf-8")

    # Transform the date format to suit the database schema
    (timestamp, vehicle_id, vehicle_type, plaza_id) = message.split(",")

    dateobj = datetime.strptime(timestamp, "%a %b %d %H:%M:%S %Y")
    timestamp = dateobj.strftime("%Y-%m-%d %H:%M:%S")

    # Loading data into the database table

    sql = "insert into livetolldata values(?,?,?,?)"
    result = cursor.execute(sql, (timestamp, vehicle_id, vehicle_type, plaza_id))
    print(f"{timestamp}: A {vehicle_type} was inserted into the database")
    connection.commit()
connection.close()' > /tmp/streaming_data_reader.py
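
The transform step inside the consumer loop can be pulled out into a small function and tried without a running broker. This is a sketch only: the sample message is made up, and its layout (a `time.ctime()`-style timestamp followed by three comma-separated fields) is inferred from the `strptime` format used in the script above. The name `parse_toll_message` is hypothetical.

```python
from datetime import datetime


def parse_toll_message(raw: bytes) -> tuple[str, int, str, int]:
    """Convert one raw Kafka message value into a row for livetolldata."""
    fields = raw.decode("utf-8").strip().split(",")
    timestamp, vehicle_id, vehicle_type, plaza_id = fields
    # Same input and output formats as the consumer script.
    parsed = datetime.strptime(timestamp, "%a %b %d %H:%M:%S %Y")
    return (
        parsed.strftime("%Y-%m-%d %H:%M:%S"),
        int(vehicle_id),
        vehicle_type,
        int(plaza_id),
    )


# Made-up sample message in the assumed format.
sample = b"Wed Mar  1 14:03:32 2023,2591719,van,4006"
print(parse_toll_message(sample))
```

Unlike the script, this version converts the two ID fields to `int`, which makes the types explicit instead of relying on SQLite's column affinity to convert the text values.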

Step 15: Run the consumer script

* Open a new terminal and type: `python3 /tmp/streaming_data_reader.py`
* In my case, the command was slightly different: `~/miniconda3/envs/mytest/bin/python /tmp/streaming_data_reader.py`

Step 16: Verify that streamed data is being collected in the database table

In [8]:
%load_ext sql

In [9]:
%sql sqlite:////tmp/kafka-project.db

In [10]:
%sql SELECT * FROM livetolldata order by timestamp desc LIMIT 10

 * sqlite:////tmp/kafka-project.db
Done.


| timestamp | vehicle_id | vehicle_type | toll_plaza_id |
|---|---|---|---|
| 2023-03-01 14:03:32 | 2591719 | van | 4006 |
| 2023-03-01 14:03:30 | 5157328 | car | 4010 |
| 2023-03-01 14:03:30 | 6547008 | car | 4003 |
| 2023-03-01 14:03:30 | 2166340 | van | 4007 |
| 2023-03-01 14:03:28 | 6019706 | truck | 4002 |
| 2023-03-01 14:03:28 | 1268925 | car | 4009 |
| 2023-03-01 14:03:27 | 6792527 | car | 4006 |
| 2023-03-01 14:03:27 | 1392831 | van | 4005 |
| 2023-03-01 14:03:25 | 3369299 | car | 4001 |
| 2023-03-01 14:03:24 | 1421570 | truck | 4010 |
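
The same check can be run outside the notebook with the standard `sqlite3` module. The sketch below assumes the `livetolldata` table created by the consumer script. The helper names `latest_rows` and `count_by_vehicle_type` are hypothetical.

```python
import sqlite3


def latest_rows(connection: sqlite3.Connection, limit: int = 10) -> list[tuple]:
    """Return the most recent rows from livetolldata, newest first."""
    cursor = connection.execute(
        "SELECT timestamp, vehicle_id, vehicle_type, toll_plaza_id "
        "FROM livetolldata ORDER BY timestamp DESC LIMIT ?",
        (limit,),
    )
    return cursor.fetchall()


def count_by_vehicle_type(connection: sqlite3.Connection) -> dict[str, int]:
    """Return the number of stored rows for each vehicle type."""
    cursor = connection.execute(
        "SELECT vehicle_type, COUNT(*) FROM livetolldata GROUP BY vehicle_type"
    )
    return dict(cursor.fetchall())
```

To use it against the live database, open a connection with `sqlite3.connect("/tmp/kafka-project.db")` and pass it to either function. Calling `latest_rows` twice a few seconds apart should show newer timestamps if the consumer is still running.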
