## MIE1512
Download the raw event data of January 2017 and transform it into the desired CSV file
======

The dataset I need is the number of events each user performed in a given period. However, this cannot be accessed directly through the GitHub API. The only suitable source I found for my research is the GitHub Archive, which has recorded every public GitHub event since 2/12/2011. I therefore downloaded all the events that happened in January 2017 and transformed them into a CSV file containing the number of events each user performed.

Because the GitHub Archive stores one file per hour, January alone requires 744 (31 × 24) gzipped JSON files, about 80 GB in total. What I need from the raw data is the number of events of each type that each user performed on GitHub in January. To get it, I merged the hourly files day by day and extracted the user ID and event type of every record in each merged file. I then grouped the records by user and event type, counted them, and pivoted the event types into columns to obtain the number of events of each type each user performed that day. Finally, I unioned the 31 days' records and wrote them to a CSV file.
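The per-day group, count and pivot step can be illustrated without Spark. The sketch below is a plain-Python stand-in for `groupBy(actorID, type).count()` followed by `pivot(type)`, assuming each line of a merged file is one GH Archive event with `actor.id` and `type` fields; the function name `count_events_per_user` is hypothetical and not part of the notebook.

```python
import json
from collections import Counter, defaultdict


def count_events_per_user(json_lines):
    """Plain-Python analogue of groupBy(actorID, type).count() + pivot(type).

    Returns (table, event_types): table maps actorID -> {type: count}, with
    every event type seen anywhere present for every user (0 if never done).
    """
    counts = defaultdict(Counter)
    for line in json_lines:
        event = json.loads(line)
        counts[event["actor"]["id"]][event["type"]] += 1

    # Pivot: one column per event type, missing cells filled with 0
    event_types = sorted({t for c in counts.values() for t in c})
    table = {
        actor: {t: c.get(t, 0) for t in event_types}
        for actor, c in counts.items()
    }
    return table, event_types


if __name__ == "__main__":
    sample = [
        '{"id": "1", "type": "PushEvent", "actor": {"id": 7, "login": "a"}}',
        '{"id": "2", "type": "PushEvent", "actor": {"id": 7, "login": "a"}}',
        '{"id": "3", "type": "WatchEvent", "actor": {"id": 9, "login": "b"}}',
    ]
    table, types = count_events_per_user(sample)
    print(types)  # ['PushEvent', 'WatchEvent']
    print(table)  # {7: {'PushEvent': 2, 'WatchEvent': 0}, 9: {'PushEvent': 0, 'WatchEvent': 1}}
```

Sorting the event types gives every day the same column order, which matters later when the days are stacked together.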

The final DataFrame, Jan_df, was downloaded, split into two parts, and uploaded to GitHub as the data source for the main notebook.
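The notebook does not show how the CSV was split. One possible approach is sketched below: it divides CSV text into roughly equal parts and repeats the header in each, so every part can be loaded on its own. The function name `split_csv_text` is hypothetical, and the sketch assumes the file fits in memory. (GitHub rejects individual files larger than 100 MB, which is a common reason to split a data file, though the original does not state its reason.)

```python
import csv
import io


def split_csv_text(csv_text, n_parts=2):
    """Split CSV text into n_parts CSV strings, each starting with the header row."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    size = -(-len(body) // n_parts)  # ceiling division so no row is lost
    parts = []
    for i in range(n_parts):
        buf = io.StringIO()
        writer = csv.writer(buf, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(body[i * size:(i + 1) * size])
        parts.append(buf.getvalue())
    return parts
```

Each returned string can then be written to its own file (for example `Jan_part1.csv` and `Jan_part2.csv`, hypothetical names) and concatenated again after dropping the duplicate header.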

Below are the functions used to download, decompress, and merge these JSON files.

```python
def add_zero(s):
    """Left-pad a one-character string with "0", e.g. "1" -> "01"."""
    if len(s) < 2:
        return "0" + s
    return s
```

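```python
# Download functions: fetch hourly event archives from GitHub Archive.
from urllib.request import urlretrieve  # Python 3 (Python 2: urllib.urlretrieve)


def downloadNDays(y, m, d, duration):
    """Download the 24 hourly .json.gz archives for `duration` consecutive days
    starting at y-m-d into the driver's local /tmp directory.

    Note: d is incremented directly, so the range must not cross a month end.
    """
    for _ in range(duration):
        Y = str(y)
        M = add_zero(str(m))
        D = add_zero(str(d))
        for hour in range(24):  # archive hours are not zero-padded: -0 to -23
            name = "%s-%s-%s-%s.json.gz" % (Y, M, D, hour)
            urlretrieve("http://data.githubarchive.org/" + name, "/tmp/" + name)
        d = d + 1
```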
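`downloadNDays` builds file names by incrementing the day number by hand, which only works inside one month. A sketch of the same URL pattern generated with `datetime` is shown below; stepping with `timedelta` rolls over month ends automatically. The function name `archive_urls` is hypothetical, and it assumes the same naming scheme as above (zero-padded month and day, unpadded hour).

```python
from datetime import date, timedelta


def archive_urls(start, end):
    """List GH Archive hourly URLs for every hour from start to end (inclusive dates).

    Month and day are zero-padded, the hour is not (e.g. 2017-01-01-0.json.gz),
    matching the file names used by downloadNDays.
    """
    urls = []
    day = start
    while day <= end:
        for hour in range(24):
            urls.append("http://data.githubarchive.org/%s-%d.json.gz"
                        % (day.isoformat(), hour))
        day += timedelta(days=1)  # crosses month boundaries correctly
    return urls
```

For example, `archive_urls(date(2017, 1, 1), date(2017, 1, 31))` yields the 744 hourly URLs for January 2017.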

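```python
# dbutils is predefined in Databricks notebooks; it cannot be imported as a module.
def deleteNDays(y, m, d, duration):
    """Remove the downloaded hourly archives and the merged daily JSON file."""
    for _ in range(duration):
        Y = str(y)
        M = add_zero(str(m))
        D = add_zero(str(d))
        # "file:" targets the driver's local disk, where the files were written;
        # a bare "/tmp/..." path would be resolved against DBFS instead.
        for hour in range(24):
            dbutils.fs.rm("file:/tmp/%s-%s-%s-%s.json.gz" % (Y, M, D, hour))
        dbutils.fs.rm("file:/tmp/%s-%s-%s-0.json" % (Y, M, D))
        d = d + 1
```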

```python
PATH_PREFIX = "/tmp/"
EXT_JG = ".json.gz"
EXT_JSON = ".json"


def get_file_name(para_list):
    """Join [year, month, day, hour] with "-", zero-padding month and day.

    e.g. [2017, 1, 5, 13] -> "2017-01-05-13"
    """
    list_ = [str(p) for p in para_list]
    list_[1] = add_zero(list_[1])
    list_[2] = add_zero(list_[2])
    return "-".join(list_)


def get_file_name_json_gz(para_list):
    return get_file_name(para_list) + EXT_JG


def get_file_name_json(para_list):
    return get_file_name(para_list) + EXT_JSON
```


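```python
import gzip
import shutil


def merge_one_day(cy, cm, cd):
    """Decompress the 24 hourly archives of one day and concatenate them
    into a single newline-delimited JSON file, /tmp/YYYY-MM-DD-0.json."""
    out_path = PATH_PREFIX + get_file_name_json([cy, cm, cd, 0])
    with open(out_path, "wb") as f_out:
        for hh in range(24):
            in_path = PATH_PREFIX + get_file_name_json_gz([cy, cm, cd, hh])
            with gzip.open(in_path, "rb") as f_in:
                # Stream the data instead of holding a whole day in memory
                shutil.copyfileobj(f_in, f_out)
```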



`get_one_day` computes, for a single day, the number of events of each type performed by each user, with one row per user and one column per event type.

```python
from pyspark.sql.functions import col, count

PATH = "file:/tmp/"


def get_one_day(cy, cm, cd):
    """Return (rdd, columns): one row per user with that day's count of each event type."""
    inputPath = PATH + get_file_name_json([cy, cm, cd, 0])
    print(inputPath)
    # sqlContext is predefined in Databricks notebooks. A "file:" path is only
    # readable by the executors on a single-node cluster (e.g. Community Edition).
    InputDF = sqlContext.read.json(inputPath)

    githubInputDF = InputDF.select(
        "id", "type",
        col("actor.id").alias("actorID"),
        col("actor.login").alias("actorName"),
        col("repo.id").alias("repoID"),
        col("repo.name").alias("repoName"),
        "created_at")

    # Number of events of each type per user
    githubGroupDF = githubInputDF.groupBy("actorID", "actorName", "type") \
        .agg(count("id").alias("_count"))

    # Sort the event types so that every day yields the same column order
    typeList = sorted(str(r[0]) for r in githubGroupDF.select("type").distinct().collect())
    type_list = ["actorID"] + typeList

    # Pivot: one column per event type
    githubGroupedData = githubGroupDF.groupBy("actorID", "actorName") \
        .pivot("type", typeList).sum("_count")

    # Fill missing counts with 0 *before* filtering: a null PushEvent fails
    # "PushEvent < 100", which would drop every user without pushes that day.
    # Users with 100 or more pushes that day are excluded.
    githubRdd = githubGroupedData.select(type_list).na.fill(0).filter("PushEvent < 100").rdd

    return githubRdd, type_list
```

`get_n_days` unions the per-day results of `n` consecutive days (here, the 31 days of January).
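`RDD.union` stacks rows by position, so it is only correct if every day has exactly the same columns in the same order. The plain-Python sketch below shows the safer alternative of aligning rows by column name and filling absent event types with 0. The function name `union_by_name` is hypothetical; in PySpark, the by-name equivalent is `DataFrame.unionByName` (available since Spark 2.3; its `allowMissingColumns` option requires Spark 3.1+).

```python
def union_by_name(day_tables, key="actorID"):
    """Concatenate per-day rows (dicts) under one fixed column order.

    Columns are the key followed by every event type seen on any day, sorted;
    a type missing on a given day is filled with 0, so no value can end up
    under the wrong column.
    """
    types = sorted({c for table in day_tables for row in table for c in row if c != key})
    columns = [key] + types
    rows = [tuple(row.get(c, 0) for c in columns)
            for table in day_tables for row in table]
    return columns, rows
```

For instance, a day with only `PushEvent` and a day that also has `WatchEvent` are combined under the columns `['actorID', 'PushEvent', 'WatchEvent']`.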

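```python
def get_n_days(cy, cm, cd, n):
    """Union the per-day results for n consecutive days starting at cy-cm-cd."""
    nDays, type_list = get_one_day(cy, cm, cd)
    for dd in range(cd + 1, cd + n):
        oneDayRdd, one_day_type = get_one_day(cy, cm, dd)
        # RDD.union is positional, so every day must have identical columns
        if one_day_type != type_list:
            raise ValueError("event types on day %d differ: %s" % (dd, one_day_type))
        nDays = nDays.union(oneDayRdd)
        print("got day %d" % dd)
    return nDays, type_list
```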

Below is the main function, which runs the whole pipeline and writes the result to a CSV file.

```python
def mainDataCollection(cy, cm, sd, ed):
    """Download, merge, count and export the events of days sd..ed of month cm, year cy."""
    n = ed - sd + 1
    downloadNDays(cy, cm, sd, n)
    print("Download complete")
    # Decompress each day's 24 hourly archives into one JSON file
    for dd in range(sd, ed + 1):
        merge_one_day(cy, cm, dd)
        print("merged day %s" % dd)
    print("Merge complete")
    nDaysRdd, type_list = get_n_days(cy, cm, sd, n)
    # Spark writes a directory of part files whose name ends in .csv
    nDaysRdd.toDF(type_list).write.csv(
        "dbfs:/FileStore/tables/MIE1512/%s-%s/%s-%s-Start_%s-End_%s.csv" % (cy, cm, cy, cm, sd, ed),
        header=True)
    print("CSV writing complete")
    deleteNDays(cy, cm, sd, n)
```

```python
mainDataCollection(2017, 1, 1, 31)
```

The code above concatenates the data of each day, but many users have records on several days of January.

I therefore group the rows by user and sum the number of events of each type.

After the group-and-sum operation, each row represents one user and holds the number of events of each type that user performed in January.
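The group-and-sum step can be pictured in plain Python: rows for the same user coming from different days are collapsed into one row whose counts are the column-wise sums. The sketch below assumes the rows are tuples in a fixed column order with the user ID as one of the columns; the function name `sum_by_user` is hypothetical.

```python
def sum_by_user(rows, columns, key="actorID"):
    """Collapse per-day rows into one entry per user by summing every count column."""
    key_idx = columns.index(key)
    totals = {}
    for row in rows:
        counts = totals.setdefault(row[key_idx], {})
        for name, value in zip(columns, row):
            if name != key:
                counts[name] = counts.get(name, 0) + value
    return totals
```

With the columns `['actorID', 'PushEvent', 'WatchEvent']`, the rows `(7, 2, 0)` and `(7, 1, 1)` for the same user collapse into `{'PushEvent': 3, 'WatchEvent': 1}`, which is what `groupBy("actorID").agg(...)` computes on the Spark side.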

```python
JanDF = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('dbfs:/FileStore/tables/MIE1512/2017-1/2017-1-Start_1-End_31.csv')
```

```python
# Sum every event-type column per user; the result columns are named "sum(<type>)"
type_list = JanDF.schema.names[1:]  # all columns except actorID
exprs = {x: "sum" for x in type_list}
df = JanDF.groupBy("actorID").agg(exprs)
df.show()
```

```python
from functools import reduce  # a builtin in Python 2, must be imported in Python 3


def get_within(column):
    """Return the column name inside "sum(...)", e.g. "sum(PushEvent)" -> "PushEvent"."""
    return column[column.find("(") + 1:column.find(")")]


oldColumns = df.schema.names[1:]  # skip actorID
Jan_df = reduce(lambda data, name: data.withColumnRenamed(name, get_within(name)),
                oldColumns, df)
```

```python
Jan_df.describe().show()
```

```python
display(Jan_df)  # Databricks notebook helper for rendering a DataFrame
```