# Lab Session: LDF Log Analysis with Spark and Dataframes

![LDF Logo](http://linkeddatafragments.org/images/logo.svg)

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

## Introduction

The Data Science Lab has a tradition in Semantic Web research. Linked Data is essentially data with a graph layout in which each edge is stored as a triple: subject -predicate-> object.
The entities in a triple are represented as URIs; objects can also be string literals. Predicates are often taken from public ontologies, which lets linked datasets be integrated with little additional effort, and this is what makes the data model so interesting.
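
As a minimal sketch of this data model, triples can be represented as plain Python tuples and filtered with a pattern in which `None` acts as a wildcard. The `example.org` URIs and the helper `match_pattern` are hypothetical; the two predicates come from the public FOAF ontology.

```python
# Hypothetical example data: every edge of the graph is one
# (subject, predicate, object) triple.
triples = [
    ("http://example.org/alice", "http://xmlns.com/foaf/0.1/knows", "http://example.org/bob"),
    ("http://example.org/alice", "http://xmlns.com/foaf/0.1/name", "Alice"),  # object is a string literal
    ("http://example.org/bob", "http://xmlns.com/foaf/0.1/name", "Bob"),
]


def match_pattern(triples, s=None, p=None, o=None):
    """Return the triples matching a pattern; None acts as a wildcard."""
    return [
        t for t in triples
        if (s is None or t[0] == s)
        and (p is None or t[1] == p)
        and (o is None or t[2] == o)
    ]


# All triples that use the foaf:name predicate, whatever the subject and object.
names = match_pattern(triples, p="http://xmlns.com/foaf/0.1/name")
print(names)
```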

Linked Data is typically offered on the web via a SPARQL endpoint, which can be queried using SPARQL, a query language with many similarities to SQL.

The main difference between SPARQL endpoints and Web APIs is that the former allow unrestricted querying. Unfortunately, this means that the server load can become arbitrarily complex. As a result, current Semantic Web SPARQL endpoints have an availability below 95%.

Data Science Lab has come up with a solution to tackle this availability issue: the Linked Data Fragments (LDF) server only answers very simple triple pattern queries and returns some statistics about them, while the LDF client does the more complicated work, which mainly consists of performing joins. (More info at: http://linkeddatafragments.org/ or play with an online LDF client here: http://client.linkeddatafragments.org/)
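
The division of labour can be sketched in plain Python. This is only an illustration under stated assumptions: `server_fragment` is a hypothetical stand-in for the server (it answers one triple pattern and reports a match count), and `names_of_friends` is a hypothetical client that joins two patterns with a simple nested loop. A real LDF client is more sophisticated, for instance in how it uses the statistics to order the patterns.

```python
FOAF = "http://xmlns.com/foaf/0.1/"
EX = "http://example.org/"

# Hypothetical dataset held by the server.
DATA = [
    (EX + "alice", FOAF + "knows", EX + "bob"),
    (EX + "alice", FOAF + "knows", EX + "carol"),
    (EX + "bob", FOAF + "name", "Bob"),
    (EX + "carol", FOAF + "name", "Carol"),
]


def server_fragment(s=None, p=None, o=None):
    """Stand-in for the server: answer one triple pattern plus a match count."""
    matches = [
        t for t in DATA
        if (s is None or t[0] == s)
        and (p is None or t[1] == p)
        and (o is None or t[2] == o)
    ]
    return matches, len(matches)


def names_of_friends(person):
    """Client-side join of (person, knows, ?x) with (?x, name, ?n) on ?x."""
    friends, _ = server_fragment(s=person, p=FOAF + "knows")
    names = []
    for _, _, friend in friends:
        # One simple request per binding of ?x; the join happens on the client.
        named, _ = server_fragment(s=friend, p=FOAF + "name")
        names.extend(name for _, _, name in named)
    return names


print(names_of_friends(EX + "alice"))
```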

The LDF concept is supposed to address the lack of high availability of SPARQL endpoints on the web. We tested this claim by putting an <b>LDF server on Amazon EC2</b> and hosting a number of linked open datasets.
The server logs were stored and are offered here as a dataset for you to explore. Log analysis is a Big Data use case: logs can be very high in volume, but it is not clear up front which information in them is valuable. Fixing a schema before storing the data (schema on write) is therefore impractical; collecting the raw logs in, for example, HDFS and imposing structure only when reading them (schema on read) is a more sensible approach.

* TIP: you will need the Spark programming guide a lot: http://spark.apache.org/docs/latest/programming-guide.html
* TIP: the full API documentation: http://spark.apache.org/docs/latest/api/python/index.html


In [None]:
import pyspark
#necessary to use matplotlib in a notebook
%matplotlib inline  
import matplotlib.pylab as plt
import pandas as pd
sc = pyspark.SparkContext('local[*]')

In [None]:
print (type(sc))
print (sc.version)

Let's start exploring. The logs dataset is quite large and we are working on a single machine (4 threads), so it makes sense to work on a subset while writing our code. Take a subset of the data (1000 records) and cache it in memory.

In [None]:
logs = <FILL IN>
preview = logs.<FILL IN>
for line in preview:
    print (line)
local_sample = logs.<FILL IN>
sample = <FILL IN>

After triggering execution, you can have a look at the Spark UI on ip_virtual_wall:3030 by clicking on a job and then on <b>DAG Visualization</b>. This visualisation might give you more information in case one of your jobs is running slowly. (Note that you can also click on subtasks and on their DAG visualization.)

The LDF logs you downloaded contain the following information (albeit not in this order):

1. the client’s IP address;
2. the URI requested by the client;
3. value of the client’s Accept header;
4. value of the client’s Referer header;
5. value of the client’s User-Agent header;
6. the server’s local time;
7. the server’s response size;
8. the server’s response cache status;
9. the server’s response HTTP status code.

More about Apache web logs at: https://httpd.apache.org/docs/1.3/logs.html (note that the LDF logs contain more information than the standard web logs).

The logs are somewhat structured but we will have to do some preprocessing to have them in a clean format. Let's try to manually extract all these components from a single log line.


In [None]:
logline = local_sample[0]
print (logline)

Try to identify the different parts of the log line by simply slicing it. This will help you in the next steps, where you will automate the extraction with regular expressions.

How would you get all the individual components of this log string? Write a function to split by space? Does it work?

In [None]:
parts = logline.<FILL IN>
for p in parts:
    print (p) 
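
To see what can go wrong with a plain split, here is a small sketch on a made-up line in the Apache "combined" format. This line is an assumption for illustration only; the LDF logs contain extra fields and may be laid out differently.

```python
# Hypothetical log line (Apache "combined" format), not taken from the LDF logs.
line = ('127.0.0.1 - - [10/Oct/2014:13:55:36 +0200] '
        '"GET /dbpedia?subject=x HTTP/1.1" 200 2326 "-" '
        '"Mozilla/5.0 (X11; Linux x86_64)"')

parts = line.split(" ")

# The line has 9 logical fields, but splitting on spaces yields more pieces:
# the timestamp, the request and the user agent contain spaces themselves.
print(len(parts))
print(parts[3], "|", parts[4])  # the timestamp ends up in two pieces
```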

A more robust way to parse such a string is to use <b>regular expressions</b>, since they allow for fine-grained data quality control.

For a quick intro into regular expressions see: https://developers.google.com/edu/python/regular-expressions

For a regex tester: http://pythex.org/

Let's start with some basic symbols:

<b>^</b> and <b>$</b> match the beginning and the end of the string

<b>\S</b> matches any non-whitespace character. 

<b>\w</b> matches a word character while <b>\W</b> matches any non word character

<b>\d</b> matches a digit

<b>.</b> is a wildcard (i.e. matches any character)

If you want to match <b>special characters</b> literally, you need to escape them, for example <b>\\[</b>

<b> Quantifiers: </b>

\+ 1 or more occurrences of the pattern to its left, e.g. 'i+' = one or more i's

\* 0 or more occurrences of the pattern to its left

?  match 0 or 1 occurrences of the pattern to its left 

<b> Brackets: </b>

Square brackets define a set of characters and in principle act as an "or" operator: <b>[ab]</b> matches a or b

Curly brackets are used for repeats: <b>\d{3}</b> corresponds to 3 digits
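
The symbols above can be tried out on a short string. The sketch below uses a made-up string rather than a real log line, and writes every pattern as a raw string (`r"..."`) so that Python does not interpret the backslashes itself.

```python
import re

text = "status 404 at [10/Oct/2014]"  # hypothetical string, not a real log line

first_token = re.search(r"^\S+", text).group()            # non-whitespace run at the start
three_digits = re.search(r"\d{3}", text).group()          # exactly three digits in a row
date = re.search(r"\[\d+/\w+/\d{4}\]", text).group()      # escaped square brackets
optional_u = re.search(r"colou?r", "color").group()       # ? makes the 'u' optional
ab_runs = re.findall(r"[ab]+", "aab-c-ba")                # one or more of 'a' or 'b'

print(first_token, three_digits, date, optional_u, ab_runs)
```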


In [None]:
import re
#demo
match = re.search(r"^\S+", logline)
print (match)
print (match.group())
match = re.search(r"^\S+ \S+", logline)
print (match.group())
#returns None unless the first token contains an underscore between other characters
match = re.search(r"^\S+_\S+", logline)
print (match)

We can match multiple patterns by using groups, which are surrounded by parentheses (), for example:

In [None]:
match = re.search(r"^(\S+) (\S+) (\S+)", logline)

print (match.group())
print (match.group(1))
print (match.group(2))
print (match.group(3))
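
Groups can also be given names with `(?P<name>...)`, which makes a long pattern easier to read than numbered groups. The sketch below is a partial illustration on a made-up line prefix; the field names mirror the ones used later in this lab, but the line layout is an assumption.

```python
import re

# Hypothetical prefix of a log line; the real LDF log lines may differ.
line = "127.0.0.1 - - [10/Oct/2014:13:55:36 +0200]"

pattern = (r"^(?P<client_ip>\S+) (?P<identd>\S+) (?P<userid>\S+) "
           r"\[(?P<server_local_time>[^\]]+)\]")

m = re.search(pattern, line)
fields = m.groupdict()  # maps each group name to the matched text
print(fields["client_ip"])
print(fields["server_local_time"])
```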

Now proceed to make a regex which extracts all the fields from the log string. Then write a function qualityCheck1 which returns (0, logline) if the line doesn't parse, or (1, Row) if it does (see https://spark.apache.org/docs/1.1.1/api/python/pyspark.sql.Row-class.html)

In [None]:
from pyspark.sql import Row

regex = <FILL IN>

match = re.search(regex,logline)
for i in range(1,12):
    print (match.group(i))

print ("")

def qualityCheck1(regex, logline):

    match = <FILL IN>
    
    if match is None:
        <FILL IN>
    
    else:
        return (1, Row(
        client_ip = match.group(1),
        identd = <FILL IN>,
        userid = <FILL IN>,
        server_local_time = <FILL IN>,
        uri_request = <FILL IN>,
        server_http_statuscode = <FILL IN>,
        server_response_size = <FILL IN>,
        client_referer_header = <FILL IN>,
        client_user_agent_header = <FILL IN>,
        client_accept_header = <FILL IN>,
        server_cache_status = <FILL IN>
        ))
    
r = qualityCheck1(regex, logline)    
print (r[1].client_ip)

Now it's time to check whether our server logs comply with the suggested regex pattern. Use the appropriate Spark functions to convert the log lines, then count the number of lines that parsed successfully and the number that did not. Next, have a look at the ones that didn't parse.
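
The tag-then-split idea can be sketched on a plain Python list before moving to Spark. Everything here is an assumption made for illustration: the pattern is deliberately simplified (IP, timestamp and status code only), the two lines are made up, and `tag_line` is a hypothetical helper that returns a dict where the lab uses a `Row`. On an RDD, the same steps roughly correspond to one `map` followed by two `filter` calls.

```python
import re

# Simplified, hypothetical pattern: only IP, timestamp and status code.
PATTERN = re.compile(r'^(\S+) \S+ \S+ \[([^\]]+)\] "[^"]*" (\d{3})')


def tag_line(line):
    """Return (1, parsed_fields) if the line parses, otherwise (0, line)."""
    m = PATTERN.search(line)
    if m is None:
        return (0, line)
    return (1, {
        "client_ip": m.group(1),
        "server_local_time": m.group(2),
        "status": int(m.group(3)),
    })


lines = [
    '127.0.0.1 - - [10/Oct/2014:13:55:36 +0200] "GET / HTTP/1.1" 200',
    "garbage line without structure",
]

tagged = [tag_line(l) for l in lines]
good = [payload for flag, payload in tagged if flag == 1]
bad = [payload for flag, payload in tagged if flag == 0]

print(len(good), len(bad))
print(bad)  # inspect the failures to decide how to refine the pattern
```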

In [None]:
parsedLogs = <FILL IN>
badLogs = <FILL IN>
goodLogs = <FILL IN>

In [None]:
print ("Number of times parse successful: " + str(goodLogs.count()))
print ("Number of times parse not successful: " + str(badLogs.count()))

There is a reasonable chance that not all log lines were parsed correctly, so the previous process will be iterative.

Once you have found a working regex, rerun the log parsing; all lines should now parse successfully (100%). Note that this process might take some time, so make sure to monitor the job progress via the <b>Spark UI</b>!

# LDF specific analysis

NOTE: it's more interesting to take a truly random sample for the remainder of the lab, since the original sample only contains data from one month. Start by taking a 'real_sample' of 1% of the logs.

#### **1a. How many requests are contained in the sample?**

In [None]:
#remove sample RDD from cache
<FILL IN>

#NOTE: when applying multiple transformations, use the coding style below
real_sample = (logs
               <FILL IN>
               .cache()
               )

print ("1a) " + str(real_sample.count()))

#### **1b. How many different months does your sample cover?**


In [None]:
#1b
def extractMonthYear(slt):
    start = <FILL IN>
    stop = <FILL IN>
    return slt[start:stop]

months = (real_sample
              <FILL IN>
              .collect()
                  )

print ("1b) " + str(len(months)))

#### **1c. How many requests per month?**

In [None]:
requests_per_month = (real_sample
              <FILL IN>
              .collectAsMap()        
                     )
print("1c) " + str(requests_per_month))
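
Counting per key can be tried locally first. The sketch below assumes timestamps of the form `day/Mon/year:time zone` (the usual Apache layout, which may not match the LDF logs exactly) and uses made-up values; `month_year` is a hypothetical helper. In Spark the counting step would typically be expressed with key-value pairs and `reduceByKey`, or with `countByValue`.

```python
from collections import Counter

# Hypothetical timestamps in the assumed "day/Mon/year:time zone" layout.
timestamps = [
    "10/Oct/2014:13:55:36 +0200",
    "11/Oct/2014:08:01:02 +0200",
    "02/Apr/2015:21:10:00 +0200",
]


def month_year(ts):
    """'10/Oct/2014:13:55:36 +0200' -> 'Oct/2014'."""
    date_part = ts.split(":")[0]          # '10/Oct/2014'
    return date_part.split("/", 1)[1]     # drop the day


counts = Counter(month_year(ts) for ts in timestamps)
print(dict(counts))
```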

#### **1d. Visualize the number of requests per month in the sample**

Have a look at the plot and the bar function in matplotlib:

* help(plt.plot)

* help(plt.bar)

<b>Note</b> that a Python function takes a list of obligatory (positional) arguments and a number of optional keyword arguments of the form key=value

http://matplotlib.org/api/pyplot_api.html

In [None]:
#center each bar on its integer position so the month labels line up with the bars
positions = list(range(len(requests_per_month)))

plt.figure(figsize=(20,10))
plt.bar(positions, list(requests_per_month.values()), align='center')
plt.xticks(positions, list(requests_per_month.keys()))
plt.title("Number of requests per month in a 1% sample")

The huge peak in April 2015 is due to the fact that the LDF server was used in a benchmarking study at a conference which was organised that same month.

#### **2. How effective has the cache been?**
#### **2a. What are the different values for cache status?**

In [None]:
cache_values = (<FILL IN>)
                  
dist_values = (<FILL IN>)
print (dist_values)

#### **2b. What are their frequencies? Visualize them in a pie chart.**

In [None]:
total = cache_values.count()

cache_value_freqs = (<FILL IN>
                        .collectAsMap()
                    )

print (cache_value_freqs)

In [None]:
plt.figure(figsize=(10,10))
plt.pie(list(cache_value_freqs.values()), labels=list(cache_value_freqs.keys()), autopct='%1.1f%%')


#### **3. Estimate the availability of the LDF server based on the number of Pingdom requests**

Pingdom was used to test the availability of the server by sending a request every minute, over a total of 393,120 minutes. Since we are taking a 1% sample, we can expect the number of Pingdom requests in it to be approximately 3931.2. How many are in the sample?
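
The arithmetic behind this estimate can be written out as follows. The observed count of 3900 is a made-up placeholder, not a result from the logs, and `availability_estimate` is a hypothetical helper. Because the sample is random, the ratio is only an estimate and can even come out slightly above 1.

```python
total_minutes = 393120      # one Pingdom request per minute (from the text)
sample_fraction = 0.01      # 1% sample

monitored_days = total_minutes / (60 * 24)
expected = total_minutes * sample_fraction


def availability_estimate(observed_in_sample, expected_in_sample):
    """Fraction of the expected Pingdom requests that show up in the sample."""
    return observed_in_sample / expected_in_sample


print(monitored_days)   # length of the monitoring period in days
print(expected)         # expected number of Pingdom requests in the sample

# Hypothetical observed count, for illustration only.
print(availability_estimate(3900, expected))
```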

In [None]:
expected = 3931.2
freq_ping = <FILL IN>
sample_freq = <FILL IN>

print (sample_freq / expected)


#### **4. What kind of triple patterns were requested?**
#### **4a. Use the DataFrame API for this question. Start by selecting the relevant columns (date, ip, and the requested URI split into s, p, and o)**

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

def parseURIRequest(req):
    #unbound positions of the triple pattern are represented by '*'
    sub = '*'
    pred = '*'
    obj = '*'
    
    #no query string: the request is not a triple pattern query
    if req.find('?') < 0:
        return sub,pred,obj
    
    #drop the trailing protocol part (everything after the last space)
    req_clean = req[0:req.rfind(" ")]
    spo = req_clean.split('?')[1]
    
    for s in spo.split('&'):
        entity_split = s.split('=', 1)
        #skip parameters without a value
        if len(entity_split) < 2:
            continue
        if entity_split[0] == "subject":
            sub = entity_split[1]
        elif entity_split[0] == "predicate":
            pred = entity_split[1]
        elif entity_split[0] == "object":
            obj = entity_split[1]
    
    return sub,pred,obj
    
test = (real_sample
              <FILL IN>
        )

df = test.toDF(["stime", "ip", "s","p","o"])

from pyspark.sql.functions import udf
pretty_url = udf(<FILL IN>)

#alias the UDF output columns directly instead of relying on auto-generated column names
df_pretty = df.select(pretty_url(df.s).alias("s"),
                      pretty_url(df.p).alias("p"),
                      pretty_url(df.o).alias("o"))

df_pretty.show()


print ("total records: " + str(df_pretty.count()))

star_star_star = <FILL IN>
star_star_notstar = <FILL IN>
star_notstar_star = <FILL IN>
star_notstar_notstar = <FILL IN>

notstar_star_star = <FILL IN>
notstar_star_notstar = <FILL IN>
notstar_notstar_star = <FILL IN>
notstar_notstar_notstar = <FILL IN>

print ( "(*,*,*): " + str(star_star_star.count()))
print ( "(*,*,o): " + str(star_star_notstar.count()))
print ( "(*,p,*): " + str(star_notstar_star.count()))
print ( "(*,p,o): " + str(star_notstar_notstar.count()))
print ( "(s,*,*): " + str(notstar_star_star.count()))
print ( "(s,*,o): " + str(notstar_star_notstar.count()))
print ( "(s,p,*): " + str(notstar_notstar_star.count()))
print ( "(s,p,o): " + str(notstar_notstar_notstar.count()))
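
As an alternative to manual string splitting, the query string can be parsed with the standard library, which also URL-decodes the values. This is a sketch under assumptions: the request is taken to look like `METHOD URL PROTOCOL`, the parameter names are taken to be `subject`, `predicate` and `object`, and `parse_triple_pattern` is a hypothetical helper rather than part of the lab code.

```python
from urllib.parse import urlsplit, parse_qs


def parse_triple_pattern(request_line):
    """Extract (s, p, o) from a request such as
    'GET /dataset?subject=...&predicate=... HTTP/1.1'.
    Unbound positions are returned as '*'."""
    parts = request_line.split(" ")
    # Assumed layout: METHOD URL PROTOCOL; fall back to the whole string.
    url = parts[1] if len(parts) >= 2 else request_line
    params = parse_qs(urlsplit(url).query)  # decodes %XX escapes, drops empty values
    return tuple(params.get(key, ["*"])[0]
                 for key in ("subject", "predicate", "object"))


print(parse_triple_pattern(
    "GET /dbpedia?subject=http%3A%2F%2Fexample.org%2Fa&page=2 HTTP/1.1"))
print(parse_triple_pattern("GET /dbpedia HTTP/1.1"))
```

Parameters other than the three triple positions (such as the hypothetical `page=2` above) are simply ignored.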




#### **5. Is there a trend in the LDF server usage? (number of requests per day)**
#### **5a. Start by mapping every day to an integer (be pragmatic, it doesn't need to be perfect!)**
(TIP: Do you remember the purpose of broadcast variables and accumulators?)
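
For comparison, an exact day index can be computed with `datetime`, at the cost of parsing every timestamp. The sketch assumes timestamps of the form `day/Mon/year:time zone` and a first day of 1 October 2014; both are assumptions, and `day_index` is a hypothetical helper. Note that `%b` depends on the locale, so English month abbreviations require an English or C locale.

```python
from datetime import datetime

FIRST_DAY = datetime(2014, 10, 1)  # assumed start of the logging period


def day_index(server_local_time, first_day=FIRST_DAY):
    """Number of days between first_day and the date in the timestamp."""
    date_part = server_local_time.split(":")[0]        # e.g. '10/Oct/2014'
    day = datetime.strptime(date_part, "%d/%b/%Y")
    return (day - first_day).days


print(day_index("10/Oct/2014:13:55:36 +0200"))
print(day_index("01/Jan/2015:00:00:01 +0100"))
```

The pragmatic approach asked for in this exercise (a month offset from a lookup table plus the day of the month) avoids the parsing overhead and is accurate enough for a trend plot.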

In [None]:
mapping = {'Jan':3, 'Feb':4, 'Mar':5, 'Apr':6, 'May':7, 'Jun':8, 'Jul':9, 'Aug':10, 'Sep':11, 
           'Oct':0, 'Nov':1, 'Dec':2}

def extractDayMonth(slt):
    #keep in mind that the mapping dictionary should be shipped to the workers
    day = <FILL IN>
    month = <FILL IN>
    return (day,month)

day_month_to_int = (real_sample
              <FILL IN>
    
            )

day_month_frequencies = (<FILL IN>
                        )





#### **5b. Create a bar chart with the number of requests per day**


In [None]:
first_day = <FILL IN>
last_day = <FILL IN>
frequencies = day_month_frequencies.collect()

values = [0] * (last_day - first_day +1)
keys = range(first_day, last_day +1)

for f in frequencies:
    values[f[0]-first_day]=f[1]
    

plt.figure(figsize=(20,10))
plt.bar(range(len(values)), values) 
plt.title("Number of requests per day in a 1% sample")