# **GHTorrent Data Analytics with PySpark RDD: An unstructured case study**


##### source 1: https://ghtorrent.org
##### source 2: https://ghtorrent.org/downloads.html


In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########



In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
from pyspark import SparkContext, SparkConf

# Initializing Spark
conf = SparkConf().setAppName("GHTorrent_PySpark").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)
print("Ready to go!")

<SparkContext master=local[*] appName=GHTorrent_PySpark>
Ready to go!


In [4]:
# ########## ONLY in Colab ##########
# from google.colab import drive
# drive.mount('/content/drive')
# ########## ONLY in Colab ##########

In [5]:
# Read and Load Data to Spark
rdd = sc.textFile("./Files/ghtorrent-logs.txt.gz")

In [6]:
# Repartition and Cache Data:
rdd.repartition(10)

from pyspark import StorageLevel
rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

./Files/ghtorrent-logs.txt.gz MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
sc.defaultParallelism

2

## Question 1: Count the number of records and get twenty records randomly.


In [8]:
rdd.count()

9669788

In [9]:
rdd.takeSample(False,20 , seed=1234)

['DEBUG, 2017-03-23T12:00:58+00:00, ghtorrent-19 -- ghtorrent.rb: Repo ccurro/tiefvision exists',
 'DEBUG, 2017-03-23T14:07:36+00:00, ghtorrent-20 -- retriever.rb: issues drone/drone -> 1604 exists',
 'INFO, 2017-03-23T10:07:20+00:00, ghtorrent-29 -- ghtorrent.rb: Added issue_event xcat2/xcat-core -> 2703/xcat2/xcat-core -> 2703/1008161947',
 'INFO, 2017-03-23T09:17:58+00:00, ghtorrent-19 -- api_client.rb: Successful request. URL: https://api.github.com/repos/yuwangying/PyCTP/issues/17/labels?per_page=100, Remaining: 2192, Total: 85 ms',
 'DEBUG, 2017-03-24T13:05:50+00:00, ghtorrent-47 -- ghtorrent.rb: Repo smcintosh881/343-accounting exists',
 'DEBUG, 2017-03-23T10:12:50+00:00, ghtorrent-7 -- ghtorrent.rb: Transaction committed (30 ms)',
 'DEBUG, 2017-03-23T10:42:40+00:00, ghtorrent-14 -- retriever.rb: Commit derlio/JieCaoVideoPlayer -> bd4b967d7e50256fab2e74f531013649d04e0246 exists',
 'DEBUG, 2017-03-23T10:12:52+00:00, ghtorrent-7 -- ghtorrent.rb: User mithro exists',
 'DEBUG, 2017-

# **GHTorrent data format**
Every line of this log file includes:
1.   Logging level, one of `DEBUG`, `INFO`, `WARN`, `ERROR`
2.   A timestamp
3.   The downloader id
4.   The logging stage including at least one of the following names:
    *   `event_processing`
    *   `ght_data_retrieval`
    *   `api_client`
    *   `retriever`
    *   `ghtorrent`

## Question 2: Get the number of lines with both `Transaction` or `Repo` information.


In [10]:
import re
def filter_words(line):
    return re.compile("\w+").findall(line.lower())
filter_words("retriever.rb: repo_ol Commit mzvast/FlappyFrog -> 80bf5c5fde7be6274a2721422f4d9a773583f73c exists")

['retriever',
 'rb',
 'repo_ol',
 'commit',
 'mzvast',
 'flappyfrog',
 '80bf5c5fde7be6274a2721422f4d9a773583f73c',
 'exists']

In [15]:
# Transaction
rdd_trans = rdd.filter(lambda x: "transaction" in filter_words(x))

In [16]:
rdd_repo = rdd.filter(lambda x: "repo" in filter_words(x))

In [27]:
rdd.filter(lambda x: "transaction" in filter_words(x)).filter(lambda x: "repo" in filter_words(x)).collect()

['INFO, 2017-03-23T09:13:16+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction is a fork of changmingxie/tcc-transaction',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- retriever.rb: Repo xuminwlt -> tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:27+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction exists',
 'DEBUG, 2017-03-23T09:26:01+00:00, ghtorrent-11 -- ghtorrent.rb: Repo jwpttcg66/redis-game-transaction exists',
 'DEBUG, 2017-03-23T09:13:27+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction exists',
 'DEBUG, 2017-03-23T09:26:01+00:00, ghtorrent-11 -- ghtorrent.rb: Repo jwpttcg66/redis-game-transaction exists',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- retriever.rb: Repo changmingxie -> tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:17+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction exists',
 'DEBUG, 2017-03-23T09:13:27+00:00, ghtorrent-9 -- ghtorrent.rb: Repo xuminwlt/tcc-transaction exists',
 '

In [24]:
rdd_trans_repo.count()

1846067

In [28]:
rdd_trans.intersection(rdd_repo).count()

19

In [29]:
rdd_repoepo.intersection(rdd_trans)

PythonRDD[46] at RDD at PythonRDD.scala:53

## Question 3: Get the number of lines including `web link` for `WARN` logging levels.

In [51]:
rdd.filter(lambda x: x.split(",")[0] == "WARN").filter(lambda x: "URL" in x).count()

95271

In [52]:
rdd.filter(lambda x: x.split(",")[0] == "WARN").filter(lambda x: "URL" in x).take(50)

['WARN, 2017-03-23T20:04:28+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031',
 'WARN, 2017-03-24T00:03:32+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 2663',
 'WARN, 2017-03-23T16:04:59+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 2914',
 'WARN, 2017-03-24T01:03:21+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776,

## Question 4: What is the most active `downloader id` for `Failed` connections?

In [85]:
import re
def find_id(line):
    pattern = r'ghtorrent-(\d+)'
    res = re.findall(pattern,line)
    return int(res[0])


find_id('ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100')

rdd.filter(lambda x: "failed" in x.lower())\
.map(lambda x: (find_id(x.split(",")[2]),1))\
.reduceByKey(lambda x,y : (x+y)).sortBy(lambda x: x[1], ascending=False).first()

(13, 79654)

In [84]:
rdd.filter(lambda x: "failed" in x.lower())\
.map(lambda x: (find_id(x.split(",")[2]),1))\
.groupByKey().mapValues(lambda x: sum(x)).sortBy(lambda x: x[1], ascending=False).first()

(13, 79654)

## Question 5: What is the most active `repository`?

In [143]:
rdd.filter(lambda x: " repo " in x.lower())\
.map(lambda x: ((x.lower().split(" repo ")[1].split(" ")[0]),1))\
.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], ascending= False).first()

('ovyx/hammerheadn', 22559)

## [challenge] Question 6: Get the number of `Failed HTTP`requests per `hour`.

In [154]:
rdd_failedHttp = rdd.filter(lambda x: "failed" in x.lower()).filter(lambda x: "http" in x.lower())

In [160]:
def hourRegex(line):
    hour_regex = re.compile(r'\d{4}-\d{2}-\d{2}T(\d{2}):(\d{2}):(\d{2})\+\d{2}:\d{2}')
    match = hour_regex.match(line)
    return match.group(1)


In [165]:

rdd_failedHttp.map(lambda x : (hourRegex(x.split(",")[1].strip()),1)).reduceByKey(lambda x ,y: x+y ).count()

24

In [168]:
rdd_failedHttp.map(lambda x:(hourRegex(x.split(",")[1].strip()),1))\
.reduceByKey(lambda x,y:x+y).sortBy(lambda x: x[1], ascending=False).collect()

[('11', 9227),
 ('10', 8281),
 ('12', 6849),
 ('13', 5521),
 ('20', 5490),
 ('14', 5075),
 ('00', 5040),
 ('16', 5040),
 ('01', 5040),
 ('15', 5040),
 ('17', 5040),
 ('21', 5040),
 ('22', 5040),
 ('23', 5040),
 ('18', 5040),
 ('19', 5040),
 ('09', 4078),
 ('08', 50),
 ('02', 50),
 ('06', 50),
 ('05', 50),
 ('03', 50),
 ('04', 50),
 ('07', 50)]