#### Assignment on Spark

One of the most common uses of Spark is analyzing and processing log files. In this assignment, we will put Spark to good use for GHTorrent, an open-source project that retrieves and downloads data from GitHub. GHTorrent works by following the GitHub event timeline and then retrieving all items linked from each event recursively and exhaustively. To make monitoring and debugging easier, the GHTorrent maintainers use extensive runtime logging for the downloader scripts.

An extract of what the GHTorrent log looks like:
DEBUG, 2017-03-23T10:02:27+00:00, ghtorrent-40 -- ghtorrent.rb: Repo EFForg/https-everywhere exists
DEBUG, 2017-03-24T12:06:23+00:00, ghtorrent-49 -- ghtorrent.rb: Repo Shikanime/print exists
INFO, 2017-03-23T13:00:55+00:00, ghtorrent-42 -- api_client.rb: Successful request. URL: https://api.github.com/repos/CanonicalLtd/maas-docs/issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms
WARN, 2017-03-23T20:04:28+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031
DEBUG, 2017-03-23T09:06:09+00:00, ghtorrent-2 -- ghtorrent.rb: Transaction committed (11 ms)

Each log line consists of a standard part (up to .rb:) and an operation-specific part. The standard part has the following fields:
- Logging level, one of DEBUG, INFO, WARN, ERROR (separated by ,)
- A timestamp (separated by ,)
- The downloader id, denoting the downloader instance (separated by --)
- The retrieval stage, denoted by the Ruby class name, one of:
  - event_processing
  - ght_data_retrieval
  - api_client
  - retriever
  - ghtorrent
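
The field layout described above can also be captured with a single regular expression. The following is a minimal sketch, not the parser used later in this notebook: the names `LOG_LINE` and `parse_standard_part` are hypothetical, and the pattern assumes every well-formed line looks like the extract shown above.

```python
import re

# Hypothetical pattern for the standard part of a GHTorrent log line:
# level, timestamp, downloader id, retrieval stage, then the message.
LOG_LINE = re.compile(
    r"^(?P<level>[A-Z]+), "
    r"(?P<timestamp>\S+), "
    r"ghtorrent-(?P<downloader>\d+) -- "
    r"(?P<stage>\w+)\.rb: "
    r"(?P<message>.*)$"
)

def parse_standard_part(line):
    """Return the five fields as a dict, or None if the line does not match."""
    match = LOG_LINE.match(line)
    return match.groupdict() if match else None

sample = ("DEBUG, 2017-03-23T09:06:09+00:00, ghtorrent-2 -- "
          "ghtorrent.rb: Transaction committed (11 ms)")
fields = parse_standard_part(sample)
print(fields["level"], fields["downloader"], fields["stage"])  # DEBUG 2 ghtorrent
```

Returning `None` for non-matching lines makes malformed input explicit, so it can be filtered out in one place.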

In [1]:
# Let's ignore warnings for now
import warnings
warnings.filterwarnings('ignore')

In [2]:
%cd C:\Users\hag8665\Desktop\MSDS 436\Assignment3\data

C:\Users\hag8665\Desktop\MSDS 436\Assignment3\data


In [3]:
import findspark
findspark.init()
findspark.find()

'C:\\Users\\hag8665\\Anaconda3\\envs\\new_environment\\lib\\site-packages\\pyspark'

In [4]:
# Import Libraries
import pyspark
from pyspark import SparkConf, SparkContext
import os

# Initialize Spark Context
conf = SparkConf().setMaster("local").setAppName("Assignment3")
sc = SparkContext(conf = conf)



In [5]:
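# Loading and parsing the file
# Parse one log line into a list of up to five columns:
# 0: logging level, 1: timestamp,
# 2: downloader id (without the "ghtorrent-" prefix),
# 3: retrieval stage, 4: operation-specific message
def myParse(line):
    # Turn every separator of the standard part into ', '
    line = line.replace(' -- ', ', ')
    line = line.replace('.rb: ', ', ')
    line = line.replace(', ghtorrent-', ', ')
    # Split at most 4 times so that commas inside the message are kept
    return line.split(', ', 4)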
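
To make the column layout concrete, the sketch below applies the same function to one line from the extract and to a malformed line. The malformed input is a hypothetical example; it shows why rows with fewer than five elements have to be filtered out before indexing columns 3 and 4.

```python
def myParse(line):
    # Same normalisation as in the notebook cell above.
    line = line.replace(' -- ', ', ')
    line = line.replace('.rb: ', ', ')
    line = line.replace(', ghtorrent-', ', ')
    return line.split(', ', 4)

good = ("WARN, 2017-03-23T20:04:28+00:00, ghtorrent-13 -- api_client.rb: "
        "Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/"
        "commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, "
        "Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031")
bad = "a truncated line"  # hypothetical malformed input

row = myParse(good)
print(row[:4])            # ['WARN', '2017-03-23T20:04:28+00:00', '13', 'api_client']
print(len(myParse(bad)))  # 1
```

Because the split is limited to four separators, the commas inside the message stay in column 4.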


In [6]:
def getRDD(filename):
    # Read the given file instead of a hard-coded path
    textFile = sc.textFile(filename)
    parsedRDD = textFile.map(myParse)
    return parsedRDD

In [7]:
rowrdd = getRDD("ghtorrent-logs.txt").cache()

In [8]:
# How many lines does the RDD contain?

rowrdd.count()


9669788

In [9]:
# Count the number of WARNing messages
countwarns = rowrdd.filter(lambda x: x[0] == "WARN")
print(countwarns.count())

132158


In [10]:
# How many repositories were processed in total? Use the api_client lines only.

# Add the repository as column 5
def parseRepos(x):
    try:
        # Extract the repository from the request URL.
        # For instance:
        # Successful request. URL: https://api.github.com/repos/CanonicalLtd/maas-docs
        # /issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms
        # should return "CanonicalLtd/maas-docs"
        split = x[4].split('/')[4:6]
        joinedSplit = '/'.join(split)
        result = joinedSplit.split('?')[0]
    except IndexError:
        # The row has no message column
        result = ''
    x.append(result)
    return x


# Filters out rows without enough elements (about 50 rows)
filteredRdd = rowrdd.filter(lambda x: len(x) == 5) 

# Only look at api_client calls
apiRdd = filteredRdd.filter(lambda x: x[3] == "api_client")

# Add another column with the repo if one can be found, otherwise ''
reposRdd = apiRdd.map(parseRepos)
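
The positional split above keeps path segments 4 and 5 of any URL, whatever the endpoint is. As a hedged alternative, the sketch below only accepts URLs under `/repos/<owner>/<name>`. The names `REPO_URL` and `extract_repo` and the `/users/...` URL are hypothetical illustrations, not part of the assignment.

```python
import re

# Hypothetical stricter variant: only match /repos/<owner>/<name> URLs.
REPO_URL = re.compile(r"api\.github\.com/repos/([^/\s]+)/([^/?,\s]+)")

def extract_repo(message):
    """Return 'owner/name' for repository URLs, or '' when none is found."""
    match = REPO_URL.search(message)
    return "/".join(match.groups()) if match else ""

ok = ("Successful request. URL: https://api.github.com/repos/CanonicalLtd/"
      "maas-docs/issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms")
# Hypothetical non-repository request
other = "Successful request. URL: https://api.github.com/users/someone/events"

print(extract_repo(ok))           # CanonicalLtd/maas-docs
print(repr(extract_repo(other)))  # ''

# For comparison, the positional split used in the cell above:
positional = "/".join(other.split("/")[4:6]).split("?")[0]
print(positional)                 # someone/events
```

Whether the stricter match changes the repository count on the real log depends on how many non-repository URLs it contains, which is not measured here.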


In [11]:
# Filter out rows without repo
removedEmpty = reposRdd.filter(lambda x: x[5] != '')

# Group by repo and count
uniqueRepos = removedEmpty.groupBy(lambda x: x[5])

# How many unique repositories do we have?
print(uniqueRepos.count())


78588


##### INSIGHTS & ANALYTICS

In [12]:
# Which client did most HTTP requests?
# Group by, count and find max

usersHttp = apiRdd.groupBy(lambda x: x[2])
usersHttpSum = usersHttp.map(lambda x: (x[0], len(x[1])))
print(usersHttpSum.max(key=lambda x: x[1]))


('13', 135978)
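
The "group by, count, find max" pattern used in this and the following cells can be checked on a small in-memory list first. This is a minimal plain-Python sketch with hypothetical rows, not Spark code. On the RDD itself, mapping to `(key, 1)` pairs and calling `reduceByKey`, as one of the later cells does, produces the same counts without building each group.

```python
from collections import Counter

# Hypothetical parsed rows: [level, timestamp, downloader id, stage, message]
rows = [
    ["INFO", "2017-03-23T13:00:55+00:00", "42", "api_client", "Successful request."],
    ["WARN", "2017-03-23T20:04:28+00:00", "13", "api_client", "Failed request."],
    ["WARN", "2017-03-23T20:04:29+00:00", "13", "api_client", "Failed request."],
]

# Count rows per downloader id (column 2), then pick the largest count.
requests_per_client = Counter(row[2] for row in rows)
busiest = max(requests_per_client.items(), key=lambda pair: pair[1])
print(busiest)  # ('13', 2)
```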


In [13]:
# Which client did most FAILED HTTP requests? Use group_by to provide an answer.
# filter failed http requests
onlyFailed = apiRdd.filter(lambda x: x[4].split(' ', 1)[0] == "Failed")

# Group by, count, find max
usersFailedHttp = onlyFailed.groupBy(lambda x: x[2])
usersFailedHttpSum = usersFailedHttp.map(lambda x: (x[0], len(x[1])))
print(usersFailedHttpSum.max(key=lambda x: x[1]))


('13', 79623)


In [14]:
# What is the most active hour of day?
# Get hour of the day from timestamp and add it
def appendAndReturn(x, toAdd):
    x.append(toAdd)
    return x

# Split date to hour only
onlyHours = filteredRdd.map(lambda x: appendAndReturn(x, x[1].split('T', 1)[1].split(':', 1)[0]))

# Group by, count, find max
groupOnlyHours = onlyHours.groupBy(lambda x: x[5])
hoursCount = groupOnlyHours.map(lambda x: (x[0], len(x[1])))
print(hoursCount.max(key=lambda x: x[1]))


('10', 2662487)
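
The hour can also be taken from a parsed timestamp instead of two string splits. The sketch below assumes ISO 8601 timestamps like the ones in the extract; the helper name `hour_of_day` is hypothetical.

```python
from datetime import datetime

def hour_of_day(timestamp):
    """Return the hour as a two-digit string, e.g. '10'."""
    return f"{datetime.fromisoformat(timestamp).hour:02d}"

ts = "2017-03-23T10:02:27+00:00"
print(hour_of_day(ts))                       # 10
print(ts.split('T', 1)[1].split(':', 1)[0])  # 10, the split used in the cell above
```

Unlike the string split, `datetime.fromisoformat` raises a `ValueError` on a malformed timestamp, so bad rows would need to be filtered or caught first.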


In [15]:
# What is the most active repository (hint: use messages from the ghtorrent.rb layer only)?

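# Note: this counts api_client rows that carry a repository (removedEmpty),
# not messages from the ghtorrent.rb layer that the hint refers to.
# Group by, count, find max
activityRepos = removedEmpty.groupBy(lambda x: x[5])
countActivityRepos = activityRepos.map(lambda x: (x[0], len(x[1])))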
print(countActivityRepos.max(key=lambda x: x[1]))


('greatfakeman/Tabchi', 79524)


In [16]:
# Which access keys are failing most often? (hint: extract the Access: ... part from failing requests)?
# Add access code
addedCodes = onlyFailed.map(lambda x: appendAndReturn(x, x[4].split('Access: ', 1)[1].split(',', 1)[0]))

# most failed access

accessCodes = addedCodes.groupBy(lambda x: x[5])
countAccessCodes = accessCodes.map(lambda x: (x[0], len(x[1])))
print(countAccessCodes.max(key=lambda x: x[1]))


('ac6168f8776', 79623)
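
The expression `x[4].split('Access: ', 1)[1]` raises an `IndexError` when a message has no `Access:` field. A more defensive extraction could look like the sketch below; `ACCESS_KEY` and `access_key` are hypothetical names, and the second test message is a made-up example.

```python
import re

# Capture everything after "Access: " up to the next comma or whitespace.
ACCESS_KEY = re.compile(r"Access: ([^,\s]+)")

def access_key(message):
    """Return the access key in a log message, or None if there is none."""
    match = ACCESS_KEY.search(message)
    return match.group(1) if match else None

failed = ("Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/"
          "commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, "
          "Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031")

print(access_key(failed))             # ac6168f8776
print(access_key("Failed request."))  # None
```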


In [22]:
# Provide 2 more insights relating to this dataset.

In [39]:
# Count the log lines per retrieval stage
getdistinct = filteredRdd.filter(lambda x: x[3] != "")
getdistpair = getdistinct.map(lambda row: (row[3], 1))
getdistcount = getdistpair.reduceByKey(lambda x, y: x + y)

for stage, count in getdistcount.collect():
    print("{} : {}".format(stage, count))

ght_data_retrieval : 335178
ghtorrent : 5703903
api_client : 1299697
event_processing : 12724
geolocator : 538
vhost=/ : 97
retriever : 2317594


In [48]:
# Which client did most successful HTTP requests? Use group_by to provide an answer.
# Filter successful HTTP requests
onlySuccess = apiRdd.filter(lambda x: x[4].split(' ', 1)[0] == "Successful")

# Group by, count, find max
usersSuccessHttp = onlySuccess.groupBy(lambda x: x[2])
usersSuccessHttpSum = usersSuccessHttp.map(lambda x: (x[0], len(x[1])))
print(usersSuccessHttpSum.max(key=lambda x: x[1]))

('4', 18694)


In [20]:
# Read in the important-repos.csv file to an RDD (let's call it interesting). How many records are there?
textfile = sc.textFile("important-repos.csv")
interesting = textfile.map(lambda line: line.split(","))

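In [21]:
# Count the records in interesting. sc.textFile does not skip the CSV header,
# so the header line is included in this count.
interesting.count()

1436

In [22]:
# Reduce "owner/name" in column 5 to the bare repository name
def changeRepo(x):
    try:
        x[5] = x[5].split("/")[1]
    except IndexError:
        x[5] = ''
    return x

In [30]:
# How many records in the log file refer to entries in the interesting file?
# Key interesting by column 3 and the log rows by repository name, then join.
interestingRepo = interesting.keyBy(lambda x: x[3])
logLineRepo = reposRdd.map(changeRepo).filter(lambda x: x[5] != '').keyBy(lambda x: x[5])

joinedRepo = interestingRepo.join(logLineRepo)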

In [56]:
# Which of the interesting repositories has the most failed API calls?

joinedRepo.filter(lambda v: v[1][1][4].startswith("Failed")) \
.map(lambda key: (key[0], 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda  v: v[1], False) \
.take(3)

[('hello-world', 740), ('test', 309), ('demo', 166)]

##### Dataframes

In [20]:
from pyspark.sql import SparkSession

In [21]:
spark = SparkSession.builder.appName("test").getOrCreate()

In [24]:
# Read in the interesting repos file using Spark's CSV parser. Convert the log RDD to a Dataframe.
interesting_df = spark.read \
                        .format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .load("important-repos.csv")


In [28]:
log_df = reposRdd.map(changeRepo).filter(lambda x: x[5] != '').toDF()

In [29]:
interesting_df.count()

1435

In [30]:
# Repeat all 3 queries in the "Joining" section above using either SQL or the Dataframe API. 
# Measure the time it takes to execute them.
joined_df = interesting_df.join(log_df, interesting_df.name == log_df._6)
joined_df.count()

87930
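
The task also asks for timings, which the cells above do not record. Spark transformations are lazy, so a measurement has to wrap an action such as `count()`. The helper below is a minimal sketch; the name `timed` is hypothetical, and a plain Python workload stands in for the Spark call so the example runs on its own.

```python
import time

def timed(action):
    """Run a zero-argument callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

# Stand-in workload; in the notebook this could be timed(joined_df.count).
result, seconds = timed(lambda: sum(range(1_000_000)))
print(f"result={result}, took {seconds:.3f} s")
```

A single run includes warm-up and caching effects, so repeating the measurement a few times gives a fairer comparison between the RDD and DataFrame versions.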