In [1]:
import json
import os
import re
import uuid
from datetime import datetime
from statistics import mean

import snowflake.connector
from confluent_kafka import Consumer
from dotenv import load_dotenv


load_dotenv()

KAFKA_SERVER = os.getenv('KAFKA_SERVER')
KAFKA_USERNAME=os.getenv('KAFKA_USERNAME')
KAFKA_PASSWORD=os.getenv('KAFKA_PASSWORD')
KAFKA_TOPIC_NAME = os.getenv('KAFKA_TOPIC_NAME')

USER = os.environ.get('USER')
ACCOUNT = os.environ.get('ACCOUNT')
PASSWORD = os.environ.get('PASSWORD')
WAREHOUSE= os.environ.get('WAREHOUSE')
DATABASE= os.environ.get('DATABASE')
SCHEMA= os.environ.get('SCHEMA')


In [2]:
def subscribe_to_kafka_topic():
    """Produce a consumer that subscribes to the relevant Kafka topic"""
    c = Consumer({
    'bootstrap.servers': KAFKA_SERVER,
    'group.id': f'deleton' + str(uuid.uuid1()),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': KAFKA_USERNAME,
    'sasl.password': KAFKA_PASSWORD,
    'session.timeout.ms': 6000,
    'heartbeat.interval.ms': 1000,
    'fetch.wait.max.ms': 6000,
    'auto.offset.reset': 'latest',
    'enable.auto.commit': 'false',
    'max.poll.interval.ms': '86400000',
    'topic.metadata.refresh.interval.ms': "-1",
    "client.id": 'id-002-005',
})

    c.subscribe([KAFKA_TOPIC_NAME])
    return c

In [3]:
def extract_values_from_log(string):
    """Extract numerical values from Kafka log using regular expression"""
    regexp = r'\d+.?\d+|\d'
    numerical_values = re.findall(regexp,string)
    return numerical_values

In [4]:
def wait_for_system_log(c):
    """Waits for current user when starting script to finish their ride, as their user data is not retrievable"""
    print('Waiting for first user to finish... This may take some time.')
    while True:
        kafka_message = c.poll(0.5)

        if kafka_message is not None:
            kafka_log = kafka_message.value().decode('utf-8')

            if 'SYSTEM' in kafka_log:
                print('First user has finished, now beginning data processing')
                return kafka_message

In [5]:
def convert_unix_to_date(unix_timestamp):
    """Converts unix timestamp to datetime"""
    unix_timestamp /= 1000  #convert to seconds as unix timestamp is in milliseconds
    converted_to_date = datetime.utcfromtimestamp(unix_timestamp).strftime('%Y-%m-%d')
 
    return converted_to_date

In [6]:
def split_full_name(name):
    """Split full name based on various conditions, such as if they have a title or not, checks to see if they have a last name"""
    name_list = name.split(' ')
    titles = ['Mr','Mrs','Miss','Ms','Dr']

    if name_list[0] in titles: #Exclude user titles from snowflake
        first_name = name_list[1]

        try:
            last_name = name_list[2]

        except IndexError: #Catch index error incase user did not give lastname
            last_name = None

    else:
        first_name = name_list[0]
        last_name = name_list[1]

    return first_name,last_name

In [7]:
def flatten_list(address_list):
    """Expands sublists present in lists, which is a consequence of splitting addresses"""
    flat_list = []

    for element in address_list:
        if type(element) is list:
          
            for item in element:
                flat_list.append(item)
        else:
            flat_list.append(element)
    return flat_list


def split_address(address):
    """Splits address based on various conditions, such as if they have whitespace or commas, or how many elements they have"""
    address_list = address.split(',')

    if len(address_list) < 4:
        address_list[0] = address_list[0].split(' ', 1) 
        address_list = flatten_list(address_list)

    house_number = address_list[0]
    street_name = address_list[1].title()
    region = address_list[2].title()
    postcode = address_list[3]

    return house_number,street_name,region,postcode

In [8]:
def clean_user_data(user_dictionary):
    """Clean user data by converting timestamps, obtaining first and lastname, and splitting address field"""
    user_dictionary['account_create_date'] = convert_unix_to_date(user_dictionary['account_create_date'])
    user_dictionary['date_of_birth'] = convert_unix_to_date(user_dictionary['date_of_birth'])

    first_name,last_name = split_full_name(user_dictionary['name'])

    user_dictionary['first_name'] = first_name
    user_dictionary['last_name'] = last_name

    house_number,street_name,region,postcode = split_address(user_dictionary['address'])

    user_dictionary['house_number'] = house_number
    user_dictionary['street_name'] = street_name
    user_dictionary['region'] = region
    user_dictionary['postcode'] = postcode
    user_dictionary['gender'] = user_dictionary['gender'].title()

    return user_dictionary

In [9]:
"""connect to snowflake to make queries"""
conn = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema='ZOOKEEPERS_BATCH_PRODUCTION'
)
cs = conn.cursor()

In [10]:
def dict_from_system_log(log):
    """Obtains user dictionary from system log"""
    system_log = json.loads(log).get('log')
    split_log = system_log.split(' mendoza v9: [SYSTEM] data = ')
    begin_timestamp = split_log[0][:-7] #remove milliseconds from timestamp for snowflake compatability
    
    dictionary_string = split_log[1][:-1]
    user_dictionary = json.loads(dictionary_string)
    
    user_dictionary = clean_user_data(user_dictionary)
    return begin_timestamp, user_dictionary

In [11]:
def insert_into_users(cs,user_dictionary):
    """Makes insert query into users table once all relevant information has been obtained"""
    cs.execute(
                """INSERT INTO users(user_id, first_name, last_name, gender, date_of_birth, 
                height_cm, weight_kg, house_name, street, region, postcode, email, account_created) """
                "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                user_dictionary['user_id'],user_dictionary['first_name'],user_dictionary['last_name'],user_dictionary['gender'],
                user_dictionary['date_of_birth'],user_dictionary['height_cm'],user_dictionary['weight_kg'],user_dictionary['house_number'],
                user_dictionary['street_name'],user_dictionary['region'],user_dictionary['postcode'],
                user_dictionary['email_address'],user_dictionary['account_create_date']
                )
                )
    print('made insert into users')

In [12]:
def insert_into_rides(cs, user_dictionary, begin_timestamp, duration, total_power,
                mean_power, mean_resistance, mean_rpm, mean_heart_rate):
    """Makes insert query into rides table once all relevant information has been obtained"""
    cs.execute(
                "INSERT INTO rides(user_id, begin_timestamp, total_duration_sec, total_power, mean_power, mean_resistance, mean_rpm, mean_heart_rate) "
                "VALUES(%s,%s,%s,%s,%s,%s,%s,%s)",
                (user_dictionary['user_id'], begin_timestamp, duration, 
                total_power, mean_power, mean_resistance, 
                mean_rpm, mean_heart_rate)
                
                )
    print('made insert into rides')
    

In [13]:
c = subscribe_to_kafka_topic()

def polling_kafka(c):
    """Polls kafka in an infinite loop, making insert queries into snowflake tables once data processing has finished"""
    wait_for_first_user = True #Wait for current user to finish, so can obtain all relevant information
    first_user_collected = False #Add check for first user so data is not sent to snowflake prematurely

    resistance_list = []
    power_list = []
    heart_rate_list = []
    rpm_list = []

    while True:
        
        kafka_message = c.poll(0.5)

        if wait_for_first_user:
            kafka_message = wait_for_system_log(c)
            wait_for_first_user = False


        if kafka_message is not None: #exclude none values
            log = kafka_message.value().decode('utf-8')

            if 'SYSTEM' in log:
                first_user_collected = True
                begin_timestamp,user_dictionary = dict_from_system_log(log)
                

            elif 'INFO' in log: #only check for strings with INFO

                values = json.loads(log)
                log = values.get('log')
            

                if 'Ride' in log: #process strings with Ride info
                    split_by_timestamp_and_logs = ' mendoza v9: [INFO]: Ride - '
                    timestamp_and_values = log.split(split_by_timestamp_and_logs)
                
                    log_values = extract_values_from_log(timestamp_and_values[1])

                    duration = int(float(log_values[0]))
                    resistance_list.append(int(log_values[1]))
            

                elif 'Telemetry' in log:
                    split_by_timestamp_and_logs = ' mendoza v9: [INFO]: Telemetry - '
                    timestamp_and_values = log.split(split_by_timestamp_and_logs)
                
                    log_values = extract_values_from_log(timestamp_and_values[1])

                    heart_rate_list.append(int(log_values[0]))
                    rpm_list.append(int(log_values[1]))
                    power_list.append(round(float(log_values[2]),3))


            elif 'new ride' in log and first_user_collected: #New user is starting, so load collected data into snowflake and reset
                total_power = sum(power_list)
                mean_power = mean(power_list)
                mean_rpm = mean(rpm_list)
                mean_heart_rate = mean(heart_rate_list)
                mean_resistance = mean(resistance_list)
                
                insert_into_users(cs,user_dictionary)
                insert_into_rides(cs,user_dictionary,begin_timestamp,duration,total_power,
                mean_power,mean_resistance,mean_rpm,mean_heart_rate)
                
                power_list = []
                rpm_list = []
                heart_rate_list = []
                resistance_list = []
                
polling_kafka(c)


Waiting for first user to finish... This may take some time.


KeyboardInterrupt: 

In [62]:
cs.execute('delete from rides')

<snowflake.connector.cursor.SnowflakeCursor at 0x1172f8460>

%5|1665045933.913|REQTMOUT|id-002-005#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/12: Timed out HeartbeatRequest in flight (after 6006ms, timeout #0)
%4|1665045933.913|REQTMOUT|id-002-005#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/12: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1665045933.914|FAIL|id-002-005#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: b12-pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092: 1 request(s) timed out: disconnect (after 977502ms in state UP)
