In [8]:
import os
import re

import psycopg
from kafka import KafkaConsumer

# Connection settings. Each value may be overridden through an environment
# variable (the standard libpq PG* names for PostgreSQL, KAFKA_* for Kafka);
# the literals are the previous hard-coded values, kept as fallbacks.
# NOTE(review): a real password should not be committed -- set PGPASSWORD in
# the environment and remove the fallback literal.
DB_NAME = os.environ.get("PGDATABASE", "Project")
DB_USER = os.environ.get("PGUSER", "team16")
DB_PASSWORD = os.environ.get("PGPASSWORD", "overfit_club")
DB_HOST = os.environ.get("PGHOST", "localhost")
DB_PORT = os.environ.get("PGPORT", "5432")

KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "movielog16")
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# Patterns for the three event kinds in the log stream. Each expects a line of
# the form "<timestamp>,<user_id>,<event text>" and exposes fields as named groups.
recommendation_pattern = re.compile(
    r"(?P<timestamp>[\d\-T:.]+),(?P<user_id>\d+),recommendation request.*?status (?P<status>\d+), result: (?P<movies>.+?), \d+ ms"
)
# The dot before "mpg" is escaped so only a literal "." matches there.
watch_pattern = re.compile(r"(?P<timestamp>[\d\-T:.]+),(?P<user_id>\d+),GET /data/m/(?P<movie_id>\S+)/\d+\.mpg")
# Captures a single-digit rating after "=".
rating_pattern = re.compile(r"(?P<timestamp>[\d\-T:.]+),(?P<user_id>\d+),GET /rate/(?P<movie_id>\S+)=(?P<rating>\d)")

# Connect to PostgreSQL
def get_db_connection():
    """Open and return a new psycopg connection to the project database.

    Connection parameters are taken from the module-level DB_* settings. The
    connection is created with autocommit enabled, so statements do not need
    an explicit ``commit()`` call.
    """
    connection_kwargs = {
        "dbname": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "host": DB_HOST,
        "port": DB_PORT,
        "autocommit": True,
    }
    return psycopg.connect(**connection_kwargs)

# Kafka Consumer (Start from latest messages)
def _decode_log_value(raw):
    """Decode a raw Kafka record value into text.

    Invalid UTF-8 bytes are replaced rather than raising UnicodeDecodeError
    inside the deserializer. A null value (e.g. a tombstone record) becomes an
    empty string, so downstream ``.strip()`` calls still work.
    """
    if raw is None:
        return ""
    return raw.decode("utf-8", errors="replace")


consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    # Where to start when no committed offset exists: the end of the topic.
    auto_offset_reset="latest",
    enable_auto_commit=True,
    value_deserializer=_decode_log_value,
)

print("Connected to Kafka. Waiting for messages...")

Connected to Kafka. Waiting for messages...


In [9]:
# Storage for limiting inserts
insert_count = {"recommendations": 0, "watch_events": 0, "ratings": 0}
LIMIT = 10  # Limit inserts to 10 per table


def _insert_row(connection, query, params):
    """Execute one parameterized statement on ``connection`` using a short-lived cursor."""
    with connection.cursor() as cur:
        cur.execute(query, params)


# Open DB connection
conn = get_db_connection()

# NOTE(review): the loop only exits once every table has reached LIMIT, so if one
# event type never arrives it blocks until interrupted. The finally clauses make
# sure the consumer and the DB connection are released in that case and on any
# error. Because the consumer is closed here, re-running this cell requires
# creating a new consumer first.
try:
    for message in consumer:
        log = message.value.strip()

        # Match recommendation logs
        if insert_count["recommendations"] < LIMIT and (match := recommendation_pattern.search(log)):
            timestamp = match.group("timestamp")
            user_id = int(match.group("user_id"))
            status = int(match.group("status"))
            result = match.group("movies")
            # Non-200 responses are matched (and consume this branch) but not stored.
            if status == 200:
                # Passed as a Python list; psycopg adapts lists to PostgreSQL arrays.
                recommended_movies = result.split(", ")
                _insert_row(
                    conn,
                    "INSERT INTO recommendations (user_id, recommended_movies, timestamp) VALUES (%s, %s, %s)",
                    (user_id, recommended_movies, timestamp),
                )
                insert_count["recommendations"] += 1
                print(f"Inserted Recommendation {insert_count['recommendations']}/{LIMIT}")

        # Match watch events
        elif insert_count["watch_events"] < LIMIT and (match := watch_pattern.search(log)):
            timestamp = match.group("timestamp")
            user_id = int(match.group("user_id"))
            movie_id = match.group("movie_id")
            _insert_row(
                conn,
                "INSERT INTO watch_events (user_id, movie_id, timestamp) VALUES (%s, %s, %s)",
                (user_id, movie_id, timestamp),
            )
            insert_count["watch_events"] += 1
            print(f"Inserted Watch Event {insert_count['watch_events']}/{LIMIT}")

        # Match ratings
        elif insert_count["ratings"] < LIMIT and (match := rating_pattern.search(log)):
            timestamp = match.group("timestamp")
            user_id = int(match.group("user_id"))
            movie_id = match.group("movie_id")
            rating = int(match.group("rating"))
            _insert_row(
                conn,
                "INSERT INTO ratings (user_id, movie_id, rating, timestamp) VALUES (%s, %s, %s, %s)",
                (user_id, movie_id, rating, timestamp),
            )
            insert_count["ratings"] += 1
            print(f"Inserted Rating {insert_count['ratings']}/{LIMIT}")

        # Stop when all tables reach limit
        if all(count >= LIMIT for count in insert_count.values()):
            print("Insert limit reached for all tables. Stopping consumer...")
            break
finally:
    # Close connections; the nested finally still closes the DB connection
    # even if closing the consumer raises.
    try:
        consumer.close()
    finally:
        conn.close()

Inserted Watch Event 1/10
Inserted Watch Event 2/10
Inserted Watch Event 3/10
Inserted Watch Event 4/10
Inserted Watch Event 5/10
Inserted Watch Event 6/10
Inserted Watch Event 7/10
Inserted Watch Event 8/10
Inserted Rating 1/10
Inserted Watch Event 9/10
Inserted Watch Event 10/10
Inserted Rating 2/10
Inserted Rating 3/10
Inserted Rating 4/10
Inserted Rating 5/10
Inserted Rating 6/10
Inserted Rating 7/10
Inserted Rating 8/10
Inserted Rating 9/10
Inserted Rating 10/10
Inserted Recommendation 1/10
Inserted Recommendation 2/10
Inserted Recommendation 3/10
Inserted Recommendation 4/10
Inserted Recommendation 5/10
Inserted Recommendation 6/10
Inserted Recommendation 7/10
Inserted Recommendation 8/10
Inserted Recommendation 9/10
Inserted Recommendation 10/10
Insert limit reached for all tables. Stopping consumer...


In [10]:
# Connect to PostgreSQL and print a few sample rows from each table.
def print_sample(cur, label, query):
    """Execute ``query`` on ``cur``, print its rows under a "Sample <label>" header, and return them."""
    cur.execute(query)
    rows = cur.fetchall()
    print(f"\n Sample {label}:")
    for row in rows:
        print(row)
    return rows


# The queries have no ORDER BY, so which 5 rows come back is unspecified.
# The context managers close the cursor and the connection even if a query fails.
with get_db_connection() as conn, conn.cursor() as cur:
    recommendations = print_sample(cur, "Recommendations", "SELECT * FROM recommendations LIMIT 5;")
    watch_events = print_sample(cur, "Watch Events", "SELECT * FROM watch_events LIMIT 5;")
    ratings = print_sample(cur, "Ratings", "SELECT * FROM ratings LIMIT 5;")


 Sample Recommendations:
(1, 59404, ['harold++kumar+go+to+white+castle+2004', 'the+count+of+monte+cristo+2002', 'guys+and+dolls+1955', 'coyote+ugly+2000', 'someone+like+you...+2001', 'thirteen+days+2000', 'cube+hypercube+2002', 'dead+ringers+1988', 'road+trip+2000', 'dead+man+1995', 'tinker+tailor+soldier+spy+2011', 'rush+2013', 'ridicule+1996', 'my+life+as+a+dog+1985', 'half+nelson+2006', 'atl+2006', 'not+another+teen+movie+2001', 'harry+potter+and+the+order+of+the+phoenix+2007', 'top+secret+1984', 'the+dirty+dozen+1967'], datetime.datetime(2025, 3, 13, 4, 12, 44, 68486))
(2, 38186, ['ong-bak+the+thai+warrior+2003', 'my+life+as+a+dog+1985', 'rush+2013', 'gypsy+1962', 'coyote+ugly+2000', 'one+day+in+september+1999', 'the+elephant+man+1980', 'good+bye_+lenin+2003', 'the+fortune+cookie+1966', 'ridicule+1996', 'to+sir_+with+love+1967', 'batman+gotham+knight+2008', 'american+pie+presents+the+naked+mile+2006', 'tinker+tailor+soldier+spy+2011', 'thirteen+days+2000', 'half+nelson+2006', 'the