# In [None]:
from confluent_kafka import Consumer, KafkaError, KafkaException
import json, datetime, time, smtplib, ssl
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr


# SMTP-over-SSL endpoint and email (Gmail) account credentials
port = 465  # SMTP over SSL port
smtp_server = "smtp.gmail.com" # Requests to this SMTP server are rate limited, so messages arriving while the limit is exceeded are lost
# There is also a daily limit; once it is crossed the server answers with
# (550, b'5.4.5 Daily user sending quota exceeded. f23sm14277102wmc.3 - gsmtp')
sender = 'roid.sender1@gmail.com' # Address used both for the SMTP login and as the mail sender
# NOTE(review): the account password is hard-coded in source; consider loading it from the environment or a secret store.
password = 'BinaRoi1234-'
recipients = ["roid.receiver1@gmail.com"] # List of recipient addresses

# Connecting to the sender's email account. On success `server_ssl` holds the
# logged-in SMTP session that the consume loop uses to send mail.
# NOTE(review): any failure here is only printed, so `server_ssl` may be left
# undefined while the rest of the script keeps running.
try:
    context = ssl.create_default_context()
    server_ssl = smtplib.SMTP_SSL(smtp_server, port, context=context)
    server_ssl.ehlo()
    server_ssl.login(sender, password)
    print("Secure connection to SMTP server started")
except Exception as error:
    print(error)


# Callback on commit
def commit_completed(error, partitions):
    """Report the outcome of an offset commit on stdout.

    Registered as the consumer's ``on_commit`` callback.

    Args:
        error: Falsy when the commit succeeded, otherwise the error to report.
        partitions: The partitions (with offsets) the commit applied to.
    """
    if not error:
        print(f"Committed partition offsets: {partitions}")
        return
    print(error)

# Consumer and commit loop configurations (fed into consumer_config / consume_loop below)
bootstrap_servers = "kafka-bootstrap.lab.test:443" # Local alternative: bootstrap_servers = "localhost:9092"
security_protocol = 'SSL'
sasl_password = 'password' # NOTE(review): security_protocol is 'SSL', so this SASL setting is presumably unused - confirm
ssl_ca_location = r'/home/priel/ca.pem'
group_id = "twitter_to_mail"
auto_offset_reset = "latest" # Only the latest messages matter here: replaying everything from the earliest offset is not possible because of the Gmail request limit
default_topic_config = {'auto.offset.reset': auto_offset_reset}
auto_commit = True
# thread_millis_timeout = 0.0
topics = ['twitter_tweets']
min_offsets_commit = 10 # The consume loop issues a manual commit every time this many messages have been processed
# max_poll_records = 100
consume_callback_max_messages = 0
polling_seconds_timeout = 0.1 # Timeout in seconds passed to consumer.poll()
fetch_message_max_bytes = 1048576
max_poll_interval_ms = 90000 # 300000
fetch_max_bytes = 52428800 # 52428800
auto_commit_interval_ms = 5000 # 5000
fetch_min_bytes = 1
heartbeat_interval_ms = 3300 # 3000
fetch_wait_max_ms = 500
running = True # Loop condition of consume_loop; nothing in the visible code ever sets it to False

# The Java producer produces faster than the Python consumer can consume, so the consumer lag keeps growing and delivery is increasingly delayed. It is very hard to maximize the Python consumer's throughput.


"""
# Asynchronous + at-least-once delivery guarantee - can be tuned to increase throughput, but can cause more duplicates; the consumer does not retry the request if the commit fails - this can be handled with the commit callback.
# 'The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, we may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.'
"""

# Consumer and commit configurations: every tuning value defined above is
# mapped onto its client configuration key, plus the commit callback.
consumer_config = {
    # Connection and security
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'security.protocol': security_protocol,
    'sasl.password': sasl_password,
    'ssl.ca.location': ssl_ca_location,
    # Offset handling
    'default.topic.config': default_topic_config,
    'enable.auto.commit': auto_commit,
    # Fetching, polling and group liveness
    'consume.callback.max.messages': consume_callback_max_messages,
    'fetch.message.max.bytes': fetch_message_max_bytes,
    'max.poll.interval.ms': max_poll_interval_ms,
    'fetch.max.bytes': fetch_max_bytes,
    'auto.commit.interval.ms': auto_commit_interval_ms,
    'fetch.min.bytes': fetch_min_bytes,
    'heartbeat.interval.ms': heartbeat_interval_ms,
    'fetch.wait.max.ms': fetch_wait_max_ms,
    # Invoked with the result of each offset commit
    'on_commit': commit_completed,
}

# Creating the consumer from the configuration above
consumer = Consumer(consumer_config)

print("Consumer Started")

# Build the notification email for a single raw Kafka message value
def _build_tweet_email(payload):
    """Turn one raw tweet payload into the email that announces it.

    Args:
        payload: The Kafka message value; expected to be UTF-8 encoded JSON
            holding 'created_at' and 'user' -> 'name'.

    Returns:
        A MIMEText message with Subject, From (module-level `sender`) and
        To (module-level `recipients`) already filled in.

    Raises:
        ValueError: The payload is missing, not valid UTF-8 or not valid JSON.
        KeyError, TypeError: The decoded JSON lacks the expected fields.
    """
    if payload is None:
        raise ValueError("message has no value")
    tweet = json.loads(payload.decode('utf-8'))
    text = f"Received a tweet created at {tweet['created_at']} by {tweet['user']['name']}"
    message = MIMEText(text)
    # The send time is appended so that every mail gets a distinct subject
    message['Subject'] = f"Kafka Assignment 1 - Twitter Tweet - {datetime.datetime.now()}"
    message['From'] = formataddr((str(Header('From Twitter With Love', 'utf-8')), sender))
    message['To'] = ", ".join(recipients)
    return message


# Consumer loop while running == True
def consume_loop(consumer, topics):
    """Poll `topics` and email a notification for every tweet received.

    Messages are polled one by one while the module-level `running` flag is
    true. Each valid tweet is mailed through the module-level `server_ssl`
    session; a failed send is printed and the loop carries on. Every
    `min_offsets_commit` processed messages an asynchronous offset commit is
    requested.

    Args:
        consumer: The Kafka consumer to subscribe, poll, commit and close.
        topics: List of topic names to subscribe to.

    Raises:
        KafkaException: A polled message carries an error other than the
            end-of-partition event.
    """
    # Local import: the partition-EOF branch writes to sys.stderr, but `sys`
    # is not among the file's top-level imports.
    import sys

    try:
        # Subscribing the consumer to the topic
        consumer.subscribe(topics)
        msg_count = 0
        while running:
            # Polling for messages 1 by 1; None means the poll timed out
            msg = consumer.poll(timeout=polling_seconds_timeout)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event - informational only
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                    continue
                raise KafkaException(msg.error())

            # Processing the message; one bad payload must not stop the consumer
            try:
                message = _build_tweet_email(msg.value())
            except (KeyError, TypeError, ValueError) as error:
                print(f"Skipping malformed message: {error!r}")
                continue

            try:
                # Sending the email (best effort: failures are only reported)
                server_ssl.sendmail(sender, recipients, message.as_string())
                print("Successfully sent message #" + str(msg_count + 1))
            except Exception as error:
                print(error)

            msg_count += 1
            # Call the commit function every min_offsets_commit processed messages
            if msg_count % min_offsets_commit == 0:
                consumer.commit(asynchronous=True)
    finally:
        # Close down the consumer to commit final offsets and trigger a
        # rebalance, then quit the mail session. The nesting guarantees the
        # mail session is closed even when closing the consumer fails.
        try:
            consumer.close()
            print("Consumer ended")
        finally:
            try:
                server_ssl.quit()
                print("Ending secure connection to SMTP server")
            except (smtplib.SMTPException, OSError) as error:
                # An already-dropped connection must not mask the original error
                print(error)


# Run
consume_loop(consumer, topics)

# Disconnect the mail session in case it's still open. consume_loop already
# quits the session on its way out, and calling quit() on a closed session
# raises SMTPServerDisconnected, so that case is treated as "already closed".
try:
    server_ssl.quit()
except smtplib.SMTPServerDisconnected:
    pass

print("Secure connection to SMTP server ended")




"""
# Synchronous + at-least-once delivery guarantee - most reliable, fewer duplicates, less throughput because the thread is blocking
# 'enable.auto.commit': True
# Might be best - 'enable.auto.commit': False with manual synchronous message batch committing

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        msg_count = 0
        while running:
            msg = consumer.poll(timeout=polling_seconds_timeout)
            # print("The message object is: ", msg.value())
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_str = msg.value().decode('utf-8')
                msg_json = json.loads(msg_str)
                text = f"Received a tweet created at {msg_json['created_at']} by {msg_json['user']['name']}"
                message = MIMEText(text) # REPLACE IT WITH text or add text and change text to what you want
                time_now = str(datetime.datetime.now()) # might want to delete or move it
                message['Subject'] = "Kafka Assignment 1 - Twitter Tweet - %s" % (time_now)
                message['From'] = formataddr((str(Header('From Twitter With Love', 'utf-8')), sender))
                message['To'] = ", ".join(recipients)
                try:
                    server_ssl.sendmail(sender, recipients, message.as_string())
                    print("Successfully sent message #" + str(msg_count + 1))
                except Exception as error:
                    print(error)
                msg_count += 1
                if msg_count % min_offsets_commit == 0:
                    consumer.commit(asynchronous=False)
                # time.sleep(thread_millis_timeout)  # only when we commit batches (lists of max 100 msg 's in certain max time interval)

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
        server_ssl.quit()
"""