# Experimenting with data pipeline

This notebook takes a log of events from a website and demonstrates a pipeline that extracts, transforms and loads the data into an SQLite database. The stored records are then processed in two streams: one counts the number of unique IPs accessing the site for each minute of each day, and the other tallies the browsers used to access it.

Notes:
- Make sure faker is installed (it can be installed with `pip install faker`).
- Some cells may keep running and need to be stopped manually, using either a keyboard interrupt or the stop button in Jupyter Notebook.

In [83]:
from faker import Faker
from datetime import datetime
import random
import time
import sys
import sqlite3
from datetime import datetime

In [84]:
# Path of the SQLite database file used by every cell in this notebook.
DB_NAME = "db.sqlite"

# Start each run from a clean slate. IF EXISTS keeps this cell from raising
# "no such table: logs" when the database is new (e.g. the very first
# Restart & Run All), and the finally clause releases the file handle even
# if the statement fails.
conn = sqlite3.connect(DB_NAME)
try:
    conn.execute("""DROP TABLE IF EXISTS logs;""")
finally:
    conn.close()

In [85]:
def create_table():
    """Ensure the ``logs`` table exists in the database at ``DB_NAME``.

    The table holds the raw log line (unique), the individual fields pulled
    out of it, and a ``created`` column that defaults to the current
    timestamp. Nothing happens if the table is already there.
    """
    schema = """
    CREATE TABLE IF NOT EXISTS logs (
      raw_log TEXT NOT NULL UNIQUE,
      remote_addr TEXT,
      time_local TEXT,
      request_type TEXT,
      request_path TEXT,
      status INTEGER,
      body_bytes_sent INTEGER,
      http_referer TEXT,
      http_user_agent TEXT,
      created DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    """
    db = sqlite3.connect(DB_NAME)
    db.execute(schema)
    db.close()

def parse_line(line):
    """Break one space-delimited log line into the values stored per record.

    Args:
        line: A single log line (str).

    Returns:
        ``[]`` when the line has fewer than 12 space-separated tokens.
        Otherwise a 9-item list: remote_addr, time_local (tokens 3 and 4
        re-joined with a space), request_type, request_path, status,
        body_bytes_sent, http_referer, http_user_agent (token 11 onwards
        re-joined with spaces) and a ``created`` stamp for the current local
        time. Tokens are returned verbatim as strings; no quotes or brackets
        are stripped.
    """
    tokens = line.split(" ")
    if len(tokens) < 12:
        return []

    # Tokens 1, 2 and 7 are not stored.
    addr, stamp_head, stamp_tail = tokens[0], tokens[3], tokens[4]
    method, path = tokens[5], tokens[6]
    status, sent, referer = tokens[8], tokens[9], tokens[10]
    agent = " ".join(tokens[11:])
    created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    return [
        addr,
        stamp_head + " " + stamp_tail,
        method,
        path,
        status,
        sent,
        referer,
        agent,
        created,
    ]

def insert_record(line, parsed):
    """Insert one log line and its parsed fields into the ``logs`` table.

    Args:
        line: The raw log line; stored in the first column.
        parsed: The nine values produced by ``parse_line`` for that line.

    Raises:
        sqlite3.Error: Propagated unchanged if the INSERT fails, for example
            when the same raw line is already stored (``raw_log`` is UNIQUE).
            The connection is closed either way.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cur = conn.cursor()
        args = [line] + parsed
        cur.execute('INSERT INTO logs VALUES (?,?,?,?,?,?,?,?,?,?)', args)
        conn.commit()
    finally:
        # Previously skipped when execute() raised, leaking the connection.
        conn.close()

def load_records(LOG_FILE,iterations):
    """Read lines from a log file and store each parseable one in ``logs``.

    The file is followed like ``tail -f``: when no new line is available the
    function sleeps for a second, rewinds to where it was, and tries again.
    If the file never grows enough to satisfy the loop bound, this keeps
    polling until interrupted.

    Args:
        LOG_FILE: Path of the log file to read.
        iterations: Upper bound for the line counter (see the note on the
            loop condition below).

    A KeyboardInterrupt ends the load quietly. Any other exception, including
    a failure to open the file, is printed and ends the load.
    """
    create_table()
    try:
        # `with` guarantees the file is closed, and avoids the old
        # `finally: f.close()` blowing up with an unbound `f` when open()
        # itself failed.
        with open(LOG_FILE, 'r') as f:
            i = 0
            # NOTE(review): the bound is inclusive, so iterations + 1 lines
            # are consumed. Kept as is to preserve existing results.
            while i <= iterations:
                where_a = f.tell()
                line_a = f.readline()
                if not line_a:
                    # Nothing new yet: wait, rewind, and poll again.
                    time.sleep(1)
                    f.seek(where_a)
                    continue
                line = line_a.strip()
                parsed = parse_line(line)
                if len(parsed) > 0:
                    insert_record(line, parsed)
                i += 1
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)


In [86]:
# conn = sqlite3.connect(DB_NAME)
# cur = conn.cursor()
# cur.execute("""select * from logs;""")
# log = cur.fetchall()

In [87]:
def get_lines(time_obj):
    """Fetch the address and timestamp of rows created after ``time_obj``.

    Args:
        time_obj: Lower bound (exclusive) compared against the ``created``
            column.

    Returns:
        A list of ``(remote_addr, time_local)`` tuples.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cur = conn.cursor()
        cur.execute("SELECT remote_addr,time_local FROM logs WHERE created > ?", [time_obj])
        return cur.fetchall()
    finally:
        # The connection used to be left open on every call.
        conn.close()

def get_time_and_ip(lines):
    """Split ``(remote_addr, time_local)`` rows into two parallel lists.

    Args:
        lines: Iterable of rows whose first item is the address and whose
            second item is the timestamp string.

    Returns:
        ``(ips, times)`` where ``times`` holds the result of ``parse_time``
        for each row, in the same order as ``ips``.
    """
    pairs = [(row[0], parse_time(row[1])) for row in lines]
    ips = [addr for addr, _ in pairs]
    times = [when for _, when in pairs]
    return ips, times

In [88]:
def parse_time(time_str):
    """Convert a ``[dd/Mon/YYYY:HH:MM:SS +zzzz]`` stamp into a datetime.

    Returns the parsed, timezone-aware ``datetime``; if parsing fails for any
    reason, returns the empty string instead of raising.
    """
    try:
        return datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S %z]')
    except Exception:
        return ""

In [89]:
def count_visitors():
    """Count the distinct client IPs seen in each minute.

    Reads every row created after 2017-10-30 via ``get_lines`` and groups
    the addresses by the minute of their log timestamp.

    Returns:
        A list of ``("YYYY-MM-DD-HH-MM", unique_ip_count)`` tuples sorted by
        the minute key.
    """
    start_time = datetime(year=2017, month=10, day=30)
    lines = get_lines(start_time)
    ips, times = get_time_and_ip(lines)

    unique_ips = {}
    for ip, time_obj in zip(ips, times):
        # parse_time() returns "" for a stamp it cannot parse; such rows have
        # no minute to be bucketed under, and calling strftime on them used
        # to raise AttributeError and abort the whole count.
        if not isinstance(time_obj, datetime):
            continue
        minute = time_obj.strftime("%Y-%m-%d-%H-%M")
        unique_ips.setdefault(minute, set()).add(ip)

    counts = {minute: len(seen) for minute, seen in unique_ips.items()}
    return sorted(counts.items(), key=lambda x: x[0])

In [90]:
# Ingest log_a.txt into the logs table. load_records loops while its counter
# is <= 100, and keeps polling the file if it runs out of lines first, so
# interrupt the cell if it does not finish.
load_records('log_a.txt',100)

In [91]:
# Unique IPs per minute, as (minute key, count) pairs sorted by minute.
cv = count_visitors()
print(cv)

[('2017-10-30-18-30', 9), ('2017-10-30-18-31', 22), ('2017-10-30-18-32', 26), ('2017-10-30-18-33', 20), ('2017-10-30-18-34', 9)]


## Counting requests per browser

In [92]:
def get_lines_browser_agent(time_obj):
    """Fetch the timestamp and user agent of rows created after ``time_obj``.

    Args:
        time_obj: Lower bound (exclusive) compared against the ``created``
            column.

    Returns:
        A list of ``(time_local, http_user_agent)`` tuples.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cur = conn.cursor()
        cur.execute("SELECT time_local,http_user_agent FROM logs WHERE created > ?", [time_obj])
        return cur.fetchall()
    finally:
        # The connection used to be left open on every call.
        conn.close()

In [93]:
def parse_user_agent(user_agent):
    """Map a user-agent string to a browser family name.

    The candidates are tried in a fixed order (Firefox, Chrome, Opera,
    Safari, MSIE) and the first one that occurs as a substring wins; if none
    occurs the result is ``"Other"``.
    """
    known = ("Firefox", "Chrome", "Opera", "Safari", "MSIE")
    return next((name for name in known if name in user_agent), "Other")

def get_time_and_browser(lines):
    """Split ``(time_local, http_user_agent)`` rows into two parallel lists.

    Args:
        lines: Iterable of rows whose first item is the timestamp string and
            whose second item is the user-agent string.

    Returns:
        ``(browsers, times)``: the browser name from ``parse_user_agent`` and
        the result of ``parse_time`` for each row, in row order.
    """
    pairs = [(parse_time(row[0]), parse_user_agent(row[1])) for row in lines]
    times = [when for when, _ in pairs]
    browsers = [name for _, name in pairs]
    return browsers, times

In [94]:
def browser_counts():
    """Tally how many stored log rows belong to each browser.

    Reads every row created after 2017-03-09 via ``get_lines_browser_agent``
    and counts rows per browser name. The parsed timestamps are not used in
    the tally.

    Returns:
        A list of ``(browser, row_count)`` tuples sorted by browser name.
    """
    since = datetime(year=2017, month=3, day=9)
    rows = get_lines_browser_agent(since)
    browsers, _times = get_time_and_browser(rows)

    tally = {}
    for name in browsers:
        tally[name] = tally.get(name, 0) + 1

    return sorted(tally.items(), key=lambda pair: pair[0])

In [95]:
# Rows per browser, as (browser, count) pairs sorted by browser name.
browser_counts()

[('Chrome', 19), ('Firefox', 18), ('MSIE', 12), ('Opera', 24), ('Safari', 13)]

## Querying the database to get back the log as an array

In [96]:
# Read every stored row back into memory as a list of tuples, then release
# the connection (it used to be left open at the end of the notebook).
conn = sqlite3.connect(DB_NAME)
try:
    cur = conn.cursor()
    cur.execute("""select * from logs;""")
    log = cur.fetchall()
finally:
    conn.close()