# Tutorial: Building a Web Log Analytics Data Pipeline

## Before you begin
If you’ve ever wanted to learn Python online with streaming data, or data that changes quickly, you may be familiar with the concept of a data pipeline. Data pipelines allow you to transform data from one representation to another through a series of steps. 
In this tutorial, we’re going to walk through building a simple data pipeline with the help of Kale.

A common use case for a data pipeline is figuring out information about the visitors to your web site. If you’re familiar with Google Analytics, you know the value of seeing real-time and historical information on visitors. In this tutorial, we’ll use data from web server logs to answer questions about our visitors.

If you’re unfamiliar, every time you visit a web page, your browser is sent data from a web server. To host this tutorial, we use a high-performance web server called Nginx. Here’s how the process of you typing in a URL and seeing a result works:

![image info](./img/process.png)

The process of sending a request from a web browser to a server.

First, the client sends a request to the web server asking for a certain page. The web server then loads the page from the filesystem and returns it to the client. As it serves the request, the web server writes a line to a log file on the filesystem that contains some metadata about the client and the request. This log enables someone to later see who visited which pages on the website at what time, and perform other analysis.

A typical line from the Nginx log could look like this: 

`X.X.X.X - - [09/Mar/2017:01:15:59 +0000] "GET /blog/assets/css/jupyter.css HTTP/1.1" 200 30294 "http://www.dataquest.io/blog/" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/53.0.2785.143 Chrome/53.0.2785.143 Safari/537.36 PingdomPageSpeed/1.0 (pingbot/2.0; +http://www.pingdom.com/)"`

Each request is a single line, and lines are appended in chronological order as requests are made to the server. Each line follows the Nginx combined log format. The variables in this format are described below:

- remote_addr: the IP address of the client making the request to the server.
- remote_user: if the client authenticated with basic authentication, this is the user name.
- time_local: the local time when the request was made, for instance 09/Mar/2017:01:15:59 +0000.
- request: the type of request and the URL that it was made to, for instance GET /blog/assets/css/jupyter.css HTTP/1.1.
- status: the response status code from the server.
- body_bytes_sent: the number of bytes sent by the server to the client in the response body.
- http_referer: the page that the client was on before sending the current request.
- http_user_agent: information about the browser and system of the client.
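
To make the field list above concrete, here is a minimal sketch that splits one combined-format line into those named fields with a regular expression. The pattern, the `parse_combined` helper, and the placeholder IP address are assumptions made for illustration; they are not the parser used later in this tutorial.

```python
import re

# Illustrative pattern for the Nginx combined log format.
# Group names mirror the variables listed above; the pattern is an assumption.
COMBINED_RE = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) '
    r'\[(?P<time_local>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" '
    r'"(?P<http_user_agent>[^"]*)"'
)


def parse_combined(line):
    """Return a dict of fields, or None if the line does not match."""
    match = COMBINED_RE.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Convert the numeric fields so they can be counted or summed later
    fields["status"] = int(fields["status"])
    fields["body_bytes_sent"] = int(fields["body_bytes_sent"])
    return fields


# Placeholder client address; the rest follows the sample line shown earlier
sample = (
    '203.0.113.7 - - [09/Mar/2017:01:15:59 +0000] '
    '"GET /blog/assets/css/jupyter.css HTTP/1.1" 200 30294 '
    '"http://www.dataquest.io/blog/" "Mozilla/5.0 (X11; Linux x86_64)"'
)
parsed = parse_combined(sample)
print(parsed["request"], parsed["status"])
```

A regular expression keeps quoted fields such as the user agent intact even though they contain spaces, at the cost of being stricter about malformed lines.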

As you can imagine, companies derive a lot of value from knowing which visitors are on their site, and what they’re doing. For example, realizing that users who use the Google Chrome browser rarely visit a certain page may indicate that the page has a rendering issue in that browser.

Another example is in knowing how many users from each district visit your site each day. It can help you figure out which district to focus your marketing efforts on. At the simplest level, just knowing how many visitors you have per day can help you understand if your marketing efforts are working properly.
In order to calculate these metrics, we need to parse the log files and analyze them. And to do this, we need to construct a data pipeline.

## Pipeline structure

Getting from raw logs to browser and status counts per day.

![image info](./img/pipeline.png)

As you can see above, we go from raw log data to statistical queries where we can see different browser/status counts per day. If necessary, this pipeline can run continuously — when new entries are added to the server log, it grabs them and processes them. There are a few things you’ve hopefully noticed about how we structured the pipeline:

- Each pipeline component is separated from the others, takes in a defined input, and returns a defined output. Each output is also stored in a Google Cloud Storage bucket to pass the data between pipeline steps, and these cached outputs can be used for further analysis.
- We also store the raw log data to a SQLite database. This ensures that if we ever want to run a different analysis, we have access to all of the raw data.
- We remove duplicate records. It’s very easy to introduce duplicate data into your analysis process, so deduplicating before passing data through the pipeline is critical.
- Each pipeline component feeds data into another component. We want to keep each component as small as possible, so that we can individually scale pipeline components up, or use the outputs for a different type of analysis.
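
The structure described in the list above can be sketched as a chain of small functions, each with a defined input and output. This is only an illustration: the two-column line format and the function names `parse`, `deduplicate`, and `count_statuses` are hypothetical, and in the real pipeline each step would exchange data through the storage bucket rather than in memory.

```python
from collections import Counter


def parse(lines):
    """Component 1: raw lines in, (ip, status) tuples out."""
    records = []
    for line in lines:
        parts = line.split()
        if len(parts) >= 2:
            records.append((parts[0], int(parts[1])))
    return records


def deduplicate(records):
    """Component 2: drop repeated records while keeping first-seen order."""
    return list(dict.fromkeys(records))


def count_statuses(records):
    """Component 3: records in, status -> count mapping out."""
    return Counter(status for _, status in records)


# Simplified stand-in for log lines: "<ip> <status>"
raw = ["10.0.0.1 200", "10.0.0.2 404", "10.0.0.1 200"]
counts = count_statuses(deduplicate(parse(raw)))
print(dict(counts))
```

Because every function only depends on the output of the previous one, any step can be swapped out or reused for a different analysis.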

Now that we’ve seen how this pipeline looks at a high level, let’s begin.

## Generating webserver logs
In order to create our data pipeline, we’ll need access to webserver log data. 
In this step, we create a script that generates fake (but somewhat realistic) log data. 
After running the following cells, you should see new entries being written to logs.txt in the same folder. 

In [21]:
import os
import sys
import subprocess
# Note: extra packages currently have to be installed this way,
# because Kale does not yet support the "!" shell syntax.
subprocess.check_call([sys.executable, "-m", "pip", "install", "Faker==0.7.9"])
subprocess.check_call([sys.executable, "-m", "pip", "install", "google-cloud-storage==1.24.1"])

import random
from faker import Faker
from datetime import datetime
from google.cloud import storage

In [23]:
LINE = """\
{remote_addr} - - [{time_local} +0000] "{request_type} {request_path} HTTP/1.1" {status} {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""

LOG_FILE = "logs.txt"
LOG_MAX = 100 # Define the size of webserver logs to be generated
BUCKET = "web-log-test" 

USER = "hong" # The name of your sub-directory in the storage bucket

In [24]:
def generate_log_line():
    fake = Faker()
    now = datetime.now()
    remote_addr = fake.ipv4()
    time_local = now.strftime('%d/%b/%Y:%H:%M:%S')
    request_type = random.choice(["GET", "POST", "PUT"])
    request_path = "/" + fake.uri_path()

    status = random.choice([200, 401, 403, 404, 500])
    body_bytes_sent = random.choice(range(5, 1000, 1))
    http_referer = fake.uri()
    http_user_agent = fake.user_agent()

    log_line = LINE.format(
        remote_addr=remote_addr,
        time_local=time_local,
        request_type=request_type,
        request_path=request_path,
        status=status,
        body_bytes_sent=body_bytes_sent,
        http_referer=http_referer,
        http_user_agent=http_user_agent
    )
    return log_line


def write_log_line(log_file, line):
    with open(log_file, "a") as f:
        f.write(line)
        f.write("\n")
        
        
def generate_log_file():
    """
    Generate the weblog file with defined size.
    This file will be stored in the given bucket.
    """
    current_log_file = LOG_FILE
    lines_written = 0
    
    while lines_written < LOG_MAX:
        line = generate_log_line()
        write_log_line(current_log_file, line)
        lines_written += 1
    print("{}{}{}".format("Log file with length ", LOG_MAX, " successfully generated"))
    
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.blob(USER + "/logs.txt")
    print("{}{}{}".format("User directory ", USER, " created"))
    
    with open(current_log_file, "rb") as log_file:
        blob.upload_from_file(log_file)
    print("Uploaded web logs data into google storage bucket")
    
    return True

In [4]:
generate_log_file()

Log file with length 50 successfully generated
User directory hong created
Uploaded web logs data into google storage bucket


True

## Processing and storing webserver logs
Once we’ve finished creating the data, we just need to write some code to ingest (or read in) the logs. The script will need to:

- Open the log files and read from them line by line.
- Parse each line into fields.
- Write each line and the parsed fields to a database.
- Ensure that duplicate lines aren’t written to the database.
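
One possible way to meet the last requirement is to let the database reject duplicates. The sketch below assumes a `UNIQUE` constraint on the raw line together with `INSERT OR IGNORE`; the one-column table and the in-memory database are simplifications for illustration only.

```python
import sqlite3

# In-memory database with a simplified, single-column stand-in for the logs table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (raw_log TEXT NOT NULL UNIQUE)")

lines = ["line a", "line b", "line a"]
for line in lines:
    # OR IGNORE silently skips rows that would violate the UNIQUE constraint
    conn.execute("INSERT OR IGNORE INTO logs (raw_log) VALUES (?)", (line,))
conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
print(row_count)
conn.close()
```

Pushing the check into the schema means the ingest code does not need to look up each line before inserting it.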

In [5]:
import time
import sqlite3

In [27]:
# We picked SQLite in this tutorial because it’s simple, and stores all of the data in a single file. 
# This enables us to upload the database into a bucket. 
# If you’re more concerned with performance, you might be better off with a database like Postgres.
DB_NAME = "db.sqlite"
DOWNLOAD_FILE = "downloaded_logs.txt"

In [26]:
def create_table():
    """
    Create table logs in the SQLite database.
    The table schema is defined according to the log format.
    """
    conn = sqlite3.connect(DB_NAME)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS logs (
      raw_log TEXT NOT NULL,
      remote_addr TEXT,
      time_local TEXT,
      request_type TEXT,
      request_path TEXT,
      status INTEGER,
      body_bytes_sent INTEGER,
      http_referer TEXT,
      http_user_agent TEXT,
      created DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    """)
    conn.close()
    
    
def parse_line(line):
    """
    Parse each log line by splitting it into structured fields.
    Extract all of the fields from the split representation. 
    """
    split_line = line.split(" ")
    if len(split_line) < 12:
        return []
    remote_addr = split_line[0]
    time_local = split_line[3] + " " + split_line[4]
    request_type = split_line[5]
    request_path = split_line[6]
    status = split_line[8]
    body_bytes_sent = split_line[9]
    http_referer = split_line[10]
    http_user_agent = " ".join(split_line[11:])
    created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    return [
        remote_addr,
        time_local,
        request_type,
        request_path,
        status,
        body_bytes_sent,
        http_referer,
        http_user_agent,
        created
    ]


def insert_record(line, parsed):
    """Insert the parsed records into the logs table of the SQLite database."""
    conn = sqlite3.connect(DB_NAME,timeout=10)
    cur = conn.cursor()
    args = [line] + parsed # Parsed is a list of the values parsed earlier
    cur.execute('INSERT INTO logs VALUES (?,?,?,?,?,?,?,?,?,?)', args)
    conn.commit()
    conn.close()   

In [20]:
client = storage.Client()
bucket = client.get_bucket(BUCKET)
blob = bucket.blob(USER + "/logs.txt")

with open(DOWNLOAD_FILE, "wb") as log_file:
    blob.download_to_file(log_file)

create_table()

try:
    with open(DOWNLOAD_FILE, "r") as f:
        for line in f:
            line = line.strip()
            parsed = parse_line(line)
            if not parsed:
                # Skip blank or malformed lines: insert_record expects all nine parsed fields
                continue
            time.sleep(1)
            insert_record(line, parsed)
except KeyboardInterrupt:
    pass

blob2 = bucket.blob(USER + "/" + DB_NAME)
blob2.upload_from_filename(DB_NAME)

## Query data from the database

Now we want to consume the generated data by pulling it out of the SQLite database and doing some counting by day.

In the code below, we:
- Connect to the database.
- Query any rows that have been added after a certain timestamp.
- Fetch all the rows, sorting out unique IPs by day.
- Count different visitor browsers and HTTP response statuses based on the fetched rows.
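
As a rough sketch of the counting-by-day step listed above, the timestamp of each row can be reduced to its calendar day and used as part of the counting key. The rows below are hypothetical stand-ins for query results, and the browser names are assumed to be already extracted from the user agent.

```python
from collections import Counter
from datetime import datetime

# Hypothetical rows shaped like (time_local, browser)
rows = [
    ("[09/Mar/2017:01:15:59 +0000]", "Firefox"),
    ("[09/Mar/2017:18:02:10 +0000]", "Chrome"),
    ("[10/Mar/2017:07:45:00 +0000]", "Firefox"),
]

counts_by_day = Counter()
for time_local, browser in rows:
    # The brackets are part of the stored value, so they appear in the format string
    day = datetime.strptime(time_local, "[%d/%b/%Y:%H:%M:%S %z]").date()
    counts_by_day[(day.isoformat(), browser)] += 1

for (day, browser), count in sorted(counts_by_day.items()):
    print(day, browser, count)
```

The same keying idea works for status codes: replace the browser name in the tuple with the status value.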

### [TODO] Finish the pipeline design
Now that you've learned how to define a pipeline with the help of the Kale extension, it's time to practice. 
Take your time to read the following cells, and try to finish the last one or two pipeline components. 
After that, you can compile the pipeline and submit it to the Kubeflow dashboard. 

Hint: The counting tasks can run in parallel, because each pipeline component is separated from the others.

In [9]:
import csv

In [10]:
YEAR = 2018
MONTH = 9
DAY = 15

In [11]:
def download_db(destination):
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.blob(USER + "/" + DB_NAME)
    
    with open(destination, "wb") as db:
        blob.download_to_file(db)  

        
def parse_time(time_str):
    try:
        time_obj = datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S %z]')
    except Exception:
        time_obj = ""
    return time_obj   

In [28]:
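def get_lines_browser(time_obj):
    """Return (time_local, http_user_agent) rows created after time_obj."""
    # Format the cutoff like the stored `created` values (see parse_line)
    cutoff = time_obj.strftime("%Y-%m-%dT%H:%M:%S")
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    cur.execute("SELECT time_local,http_user_agent FROM logs WHERE created > ?", [cutoff])
    resp = cur.fetchall()
    conn.close()
    return resp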


def parse_user_agent(user_agent):
    """Parsing the user agent to retrieve the name of the browser."""
    browsers = ["Firefox", "Chrome", "Opera", "Safari", "MSIE"]
    for browser in browsers:
        if browser in user_agent:
            return browser
    return "Other"


def get_time_and_ip_browser(lines):
    """Extract the browser name and time from each row we queried."""
    browsers = []
    times = []
    for line in lines:
        times.append(parse_time(line[0]))
        browsers.append(parse_user_agent(line[1]))
    return browsers, times
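
The order of the browser names matters for this kind of substring matching, because a Chrome user agent also contains the word "Safari". The short sketch below shows the effect; `first_matching_browser` is a hypothetical helper that takes the list of names as a parameter so that two orderings can be compared.

```python
def first_matching_browser(user_agent, browsers):
    """Return the first name from `browsers` found in the user agent string."""
    for browser in browsers:
        if browser in user_agent:
            return browser
    return "Other"


chrome_ua = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36"
)

# "Chrome" is checked before "Safari", so the Chrome user agent is labeled correctly
in_order = first_matching_browser(
    chrome_ua, ["Firefox", "Chrome", "Opera", "Safari", "MSIE"]
)
# With "Safari" first, the same user agent would be mislabeled
swapped = first_matching_browser(chrome_ua, ["Safari", "Chrome"])
print(in_order, swapped)
```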

In [13]:
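def get_lines_status(time_obj):
    """Return (time_local, status) rows created after time_obj."""
    # Format the cutoff like the stored `created` values (see parse_line)
    cutoff = time_obj.strftime("%Y-%m-%dT%H:%M:%S")
    conn = sqlite3.connect(DB_NAME)
    cur = conn.cursor()
    cur.execute("SELECT time_local,status FROM logs WHERE created > ?", [cutoff])
    resp = cur.fetchall()
    conn.close()
    return resp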


def parse_response_status(user_status):
    """
    Retrieve the HTTP request response status
    200: OK
    401: Unauthorized
    403: Forbidden
    404: Not Found
    500: Internal Server Error
    """
    statuses = [200, 401, 403, 404, 500]
    if user_status in statuses:
        return user_status
    return "Other"


def get_time_and_ip_status(lines):
    statuses = []
    times = []
    for line in lines:
        times.append(parse_time(line[0]))
        statuses.append(parse_response_status(line[1]))
    return statuses, times
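
Both query helpers filter on the `created` column, which holds timestamps as text. The sketch below illustrates why that filter works when the cutoff is formatted the same way as the stored values: ISO-style strings sort in chronological order. The two-column table and its sample rows are assumptions for illustration.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (status INTEGER, created TEXT)")
conn.executemany(
    "INSERT INTO logs VALUES (?, ?)",
    [
        (200, "2018-09-14T23:59:59"),
        (404, "2018-09-15T00:00:01"),
        (500, "2018-09-16T12:30:00"),
    ],
)

# Format the cutoff like the stored values so that SQLite's
# text comparison orders them chronologically.
cutoff = datetime(2018, 9, 15).strftime("%Y-%m-%dT%H:%M:%S")
rows = conn.execute(
    "SELECT status FROM logs WHERE created > ? ORDER BY created", (cutoff,)
).fetchall()
conn.close()
print(rows)
```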

In [18]:
download_db(destination=DB_NAME)
browser_counts = {}
start_time = datetime(year=YEAR, month=MONTH, day=DAY)

lines = get_lines_browser(start_time)
browsers, times = get_time_and_ip_browser(lines)

if len(times) > 0:
    start_time = times[-1] 
for browser, time_obj in zip(browsers, times):
    if browser not in browser_counts:
        browser_counts[browser] = 0
    browser_counts[browser] += 1

count_list = browser_counts.items()
count_list = sorted(count_list, key=lambda x: x[0])

with open('browser_counts.csv', 'w', newline='') as file:
    writer = csv.writer(file, delimiter=",", lineterminator="\r\n")
    writer.writerow(["browser", "count"])
    writer.writerows(count_list)
        
client = storage.Client()
bucket = client.get_bucket(BUCKET)     
blob = bucket.blob(USER + "/queries/browser_counts.csv")
blob.upload_from_filename("browser_counts.csv")

In [19]:
download_db(destination=DB_NAME)
statuses_counts = {}
start_time = datetime(year=YEAR, month=MONTH, day=DAY)

lines = get_lines_status(start_time)
statuses, times = get_time_and_ip_status(lines)

if len(times) > 0:
    start_time = times[-1] 
for status, time_obj in zip(statuses, times):
    if status not in statuses_counts:
        statuses_counts[status] = 0
    statuses_counts[status] += 1

count_list = statuses_counts.items()
count_list = sorted(count_list, key=lambda x: x[0])

with open('status_counts.csv', 'w', newline='') as file:
    writer = csv.writer(file, delimiter=",", lineterminator="\r\n")
    writer.writerow(["status", "count"])
    writer.writerows(count_list)

client = storage.Client()
bucket = client.get_bucket(BUCKET) 
blob = bucket.blob(USER + "/queries/status_counts.csv")
blob.upload_from_filename("status_counts.csv")