In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import os
import re

In [None]:
# sc = SparkContext("local", "Simple App")

In [4]:
filepath = "/storage/Study/myCode/spark/BDAS-P-Resource-Bundle/RB-Python/"

In [6]:
logpath = "/storage/Study/myCode/spark/logs/ghtorrent-logs.txt"

In [7]:
logData = sc.textFile(logpath)

In [8]:
logData.count()

9669788

In [10]:
logData.filter(lambda x: "WARN" in x).count()

132158

In [11]:
apiRDD = logData.filter(lambda x: "api_client" in x).filter(lambda x: "https://api.github.com/repos" in x)

In [12]:
def maprepos(x):
    try:
        return re.search('^.*repos/(.*?)/.*$', x.split(",")[2]).groups()[0]
    except:
        return "invalid"

In [13]:
apiRDD.map(maprepos).filter(lambda x: x != "invalid").distinct().count()

55986

In [14]:
apiRDD.groupBy(lambda x: x.split(",")[2].split(" --")[0]) \
        .map(lambda x: (x[0], len(x[1]))) \
        .max(key=lambda x: x[1])

(' ghtorrent-13', 85463)

In [15]:
apiRDD.filter(lambda x: 'Failed request' in x) \
        .groupBy(lambda x: x.split(",")[2].split(" --")[0]) \
        .map(lambda x: (x[0], len(x[1]))) \
        .max(key=lambda x: x[1])

(' ghtorrent-13', 79617)

In [16]:
apiRDD.map(lambda x: x.split(",")[1].split("T")[1].split(":")[0]) \
        .groupBy(lambda x: x) \
        .map(lambda x: (x[0], len(x[1]))) \
        .max(key=lambda x: x[1])

('10', 217947)

In [17]:
apiRDD.map(maprepos) \
        .filter(lambda x: x != "invalid") \
        .groupBy(lambda x: x) \
        .map(lambda x: (x[0], len(x[1]))) \
        .max(key=lambda x: x[1])

('greatfakeman', 79524)

In [18]:
def mapaccess(x):
    try:
        return x.split("Access: ")[1].split(",")[0]
    except:
        return "invalid"

In [19]:
apiRDD.filter(lambda x: "Access" in x) \
        .filter(lambda x: "Status code: 200" not in x) \
        .map(mapaccess) \
        .filter(lambda x: x != "invalid") \
        .groupBy(lambda x: x) \
        .map(lambda x: (x[0], len(x[1]))) \
        .max(key=lambda x: x[1])

('ac6168f8776', 79617)

In [20]:
reposfilepath = "/storage/Study/myCode/spark/logs/important-repos.csv"
repos = sc.textFile(reposfilepath)

In [21]:
repos.take(5)

['id,url,owner_id,name,language,created_at,forked_from,deleted,updated_at',
 '60256709,https://api.github.com/repos/1010026218/Basic,34433644,Basic,Java,2017-03-21 07:57:59,35370568,0,1970-01-02 00:00:00',
 '60250609,https://api.github.com/repos/10imaging/nextcloud-server,6978755,nextcloud-server,PHP,2017-03-13 06:31:16,38091986,0,1970-01-02 00:00:00',
 '40175831,https://api.github.com/repos/123wowow123/chronopin_node,5114164,chronopin_node,JavaScript,2016-06-29 02:24:43,NULL,0,1970-01-02 00:00:00',
 '25265021,https://api.github.com/repos/1N3/Sn1per,5002066,Sn1per,PHP,2015-09-06 15:47:38,NULL,0,2016-02-03 15:43:59']

In [22]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [23]:
reposSql = sqlContext.read.csv(reposfilepath, inferSchema=True, header = True)

In [24]:
reposSql.printSchema()
reposSql.show(3)

root
 |-- id: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- owner_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- language: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- forked_from: string (nullable = true)
 |-- deleted: integer (nullable = true)
 |-- updated_at: string (nullable = true)

+--------+--------------------+--------+----------------+----------+-------------------+-----------+-------+-------------------+
|      id|                 url|owner_id|            name|  language|         created_at|forked_from|deleted|         updated_at|
+--------+--------------------+--------+----------------+----------+-------------------+-----------+-------+-------------------+
|60256709|https://api.githu...|34433644|           Basic|      Java|2017-03-21 07:57:59|   35370568|      0|1970-01-02 00:00:00|
|60250609|https://api.githu...| 6978755|nextcloud-server|       PHP|2017-03-13 06:31:16|   38091986|      0|1970-01-02 00:00

In [26]:
def mapSuccFail(x):
    try:
        return (re.search('^.*repos/(.*?)/.*$', x.split(",")[2]).groups()[0],
               int("Successful request" in x),
               int("Failed request" in x))
    except:
        return ("invalid",0,0)

In [27]:
apiRDD.map(mapSuccFail).filter(lambda x: x[0] != "invalid").take(5)

[('Particular', 1, 0),
 ('javier-serrano', 1, 0),
 ('llyp618', 1, 0),
 ('mdmahamodur2013', 1, 0),
 ('cultuurnet', 1, 0)]

In [28]:
reposRDD = apiRDD.map(mapSuccFail).filter(lambda x: x[0] != "invalid")

In [29]:
reposSqlRDD = reposRDD.toDF()

In [30]:
reposSqlRDD.show()

+---------------+---+---+
|             _1| _2| _3|
+---------------+---+---+
|     Particular|  1|  0|
| javier-serrano|  1|  0|
|        llyp618|  1|  0|
|mdmahamodur2013|  1|  0|
|     cultuurnet|  1|  0|
|        Shopify|  1|  0|
|   CanonicalLtd|  1|  0|
|   greatfakeman|  0|  1|
|   abdelguezrou|  1|  0|
|      openfisca|  1|  0|
|   greatfakeman|  0|  1|
|       Midhilaj|  1|  0|
|   CanonicalLtd|  1|  0|
|      JoffZhang|  1|  0|
|     tianshapjq|  1|  0|
|     OsmanBrito|  1|  0|
|     bangonkali|  1|  0|
|       Lena1985|  1|  0|
| marcelobdsilva|  1|  0|
|        RaoUmer|  1|  0|
+---------------+---+---+
only showing top 20 rows



In [31]:
reposSqlRDD.createOrReplaceTempView("reposSqlRDD")
reposSql.createOrReplaceTempView("reposSql")

In [32]:
sqlString = '''select t1.name, sum(t2._2) as Successful, sum(t2._3) as Failed from reposSql t1 
                inner join reposSqlRDD t2 
                on (t1.name = t2._1)
                group by t1.name'''

In [33]:
sqlContext.sql(sqlString).show()

+--------------+----------+------+
|          name|Successful|Failed|
+--------------+----------+------+
|      collectd|        26|     0|
|     openresty|        20|     0|
|           jwt|         1|     0|
|     benchflow|        20|     0|
|  speedtracker|         6|     0|
|       angular|      2265|    21|
|    AngleSharp|         2|     0|
|PoE-TradeMacro|         1|     0|
|     mediadrop|         3|     0|
|        scrapy|        25|     0|
|           aui|        13|     0|
|       facette|         3|     0|
|         cdnjs|        83|     0|
|      rapidpro|         3|     0|
|   thingsboard|         7|     0|
|       3xp10it|         9|     0|
|        neovim|        91|     0|
|           vcr|         1|     0|
|         netty|        21|     2|
|       nistats|         6|     0|
+--------------+----------+------+
only showing top 20 rows

