# Main class 1

### (by default for Maven dataset)

In [81]:
import os
import json
import time, datetime
import itertools
import re
import numpy as np

from pyspark.sql.functions import udf
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

from pyspark import SparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


alp = 'abcdefghijklmnopqrstuvwxyz'
lib = {alp[i]: 10+i for i in range(len(alp))}


def loop(key, val):
    pass

def unique(s):
    n = len(s)
    if n == 0: return []

    u = {}
    try:
        for x in s: u[x] = 1
    except TypeError:
        del u  # move on to the next method
    else:
        return u.keys()

    try:
        t = list(s)
        t.sort()
    except TypeError:
        del t  # move on to the next method
    else:
        assert n > 0
        last = t[0]
        lasti = i = 1
        while i < n:
            if t[i] != last:
                t[lasti] = last = t[i]
                lasti += 1
            i += 1
        return t[:lasti]
        
    u = []    # Brute force is all that's left
    for x in s:
        if x not in u:
            u.append(x)
    return u
    
def author_transform(a):
    return '-'.join(a) if len(a)> 1 else a[0]

def path_joining(p):
    pos = maxLen = 0
    for i in range(len(p)):
        if len(p[i]) > maxLen: 
            maxLen = len(p[i])
            pos = i
    p = p[pos]
    if len(p) > 1:
        p = '/'.join(p).replace('...', '')
        p = ' '.join(p.split("/")).strip().replace('  ', '')
    else:
        if p == []: 
            p = ''
        else:
            p = p[0]
            p = ' '.join(p.split("/")).strip().replace('...', '')
    return p

def date_coding(d):
    d.sort()
    s = float(float(d[-1])-float(d[0]))/3600.0 if len(d) > 1 else 0.0
    return (d[0],s)

def only_nums(a):
    s = 0
    for item in a:
        x = [int(i) if i.isdigit() else lib[i] for i in item]
        s += sum(x)*len(item) if x != [] else 0
    return s

def revNo_coding(no):
    if no == '':
        return 0
    else:
        is_alpha = False
        for i in no:
            if i.isalpha():
                is_alpha = True
                break
        if is_alpha:
            return sum([int(i) if i.isdigit() else lib[i] for i in no])*len(no)
        else:
            return float(no)

def extract_data(x):
    a = list(x[0])
    a.extend(x[1])
    return tuple(a)

def toCSVLine(data):
    return ','.join([str(d) for d in data])

def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[1], values[2:])

def remove_dir_fully(path):
    import shutil
    shutil.rmtree(path)


class PriorityLabelCoding(object):
    """  """
    def coding(self, b):
        label_binary = 0
        if b == 'Blocker':
            label_multi = label_binary = 1
        elif b == 'Critical':
            label_multi = label_binary = 1
        elif b == 'Major':
            label_multi = 2
        elif b == 'Minor':
            label_multi = 3
        elif b == 'Trivial':
            label_multi = 4
        elif b == 10:
            label_multi = 10
        return (label_multi, label_binary)
    
    
class DateFormating(object):
    """  """
    def __init__(self, date_format):
        self.date_format = date_format
    
    def date_to_datetime(self, date):
        date = str(date).split('-')[0].strip().split('+')[0].strip()
        if date != '' and date != 'nan':
            return str(time.mktime(datetime.datetime.strptime(str(date), self.date_format).timetuple()))
        else:
            return '0'
          

class TrainDataTransformer(object):
    """ All parameters are set for 'maven' dataset by default """
    def __init__(self, path_to_dataset_directory):
        self.path = path_to_dataset_directory
        self._dataset_name = 'maven'
        self._commits_path = os.path.join(self.path, 'commits')
        self._issues_path = os.path.join(self.path, 'issues')
        self._commits_columns = ['bugNo', 'author', 'date', 'modifiedFiles', 'revNo']
        self._issues_columns = ["bugNo", "Priority"]
        self._priority_column = 'Priority'
        self._date_format = "%a %b  %d %H:%M:%S %Y"
        self._commits_for_rename = {}
        self._issues_for_rename = {}
    
    def _create_commits_df(self):
        obj = DateFormating(self._date_format)
        self._commits_df = sqlContext.createDataFrame(self._read_json(self._commits_path))
        if isinstance(self._commits_for_rename, dict) and self._commits_for_rename != {}:
            self._rename_commits_df_columns(self._commits_for_rename)
        self._commits_df = self._commits_df.select(self._commits_columns)
        if self._date_format != '':
            d2dt = udf(lambda date: obj.date_to_datetime(date))
            self._commits_df = self._commits_df.withColumn("date2", d2dt(self._commits_df.date))\
                                             .drop('date').withColumnRenamed('date2', 'date')
            self._commits_df = self._commits_df.filter(self._commits_df.date != '0')
        # for grouping it is necessary that the column 'bugNo' was the first positioned
        if self._commits_df.columns[0] != 'bugNo':
            col1 = self._commits_df.columns[0]
            self._commits_df = self._commits_df.withColumn(col1+'_duplicate', self._commits_df[col1]) \
                                                .withColumn("bugNo_duplicate", self._commits_df['bugNo']) \
                                                .drop('bugNo').withColumnRenamed('bugNo_duplicate', 'bugNo') \
                                                .drop(col1).withColumnRenamed(col1+'_duplicate', col1)
   
    def _create_commits_rdd(self):
        self.commits_rdd = self._commits_df.rdd
        self.commits_rdd = self.commits_rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4]))
        self._no_bug_rdd = self.commits_rdd.filter(lambda x: x[0] == '').map(lambda x: (10, [x[1]], [x[2]], [x[3]], [x[4]]))
        
    def _create_issues_df(self):
        data = self._read_json(self._issues_path)
        
        parsed_data = []
        for row in data:
            el = {}
            els1 = []
            els2 = []
            for key,val in row.iteritems():
                if isinstance(val, list):
                    if val == []:
                        el[key] = ''
                    elif len(val) == 1:
                        is_nested_list = False
                        while isinstance(val, list):
                            val = val[0]
                            if len(val) > 1 and not isinstance(val, dict):
                                is_nested_list = True
                                break
                        if is_nested_list:
                            els1 = loop(key,val)
                        else:
                            for key1,val1 in val.iteritems():
                                el[key1] = val1
                    else:   # i.e. len(val) > 1
                        els2 = loop(key,val)
                elif isinstance(val, dict):
                    for key1,val1 in val.iteritems():
                        el[key1] = val1
                else:
                    el[key] = val
            parsed_data.append(el)
            
        self._issues_df = sqlContext.createDataFrame(parsed_data)
        if isinstance(self._issues_for_rename, dict) and self._issues_for_rename != {}:
            self._rename_issues_df_columns(self._issues_for_rename)
        self._issues_df = self._issues_df.filter(self._issues_df[self._priority_column] != '')
        self._issues_df = self._issues_df.select(self._issues_columns)
        
    def _create_issues_rdd(self):
        self.issues_rdd = self._issues_df.rdd
        self.issues_rdd = self.issues_rdd.map(lambda x: (x[0], x[1]))

    def _bug_grouping_and_joining(self):
        rdd_bug_grouped = self.commits_rdd.map(lambda x: (x[0], x[1])).groupByKey().map(lambda x: (x[0], unique(list(x[1]))))
        rdd_joined = self.issues_rdd.join(rdd_bug_grouped)
        for i in range(2,5):
            rdd_bug_grouped_i = self.commits_rdd.map(lambda x: (x[0], x[i])).groupByKey().map(lambda x: (x[0], unique(list(x[1]))))
            rdd_joined = rdd_joined.join(rdd_bug_grouped_i)
        
        #print 'rdd_joined'
        #print rdd_joined.take(3)
        self.bug_grouped_rdd = rdd_joined.map(lambda x: (x[1][0][0][0][0], x[1][0][0][0][1], x[1][0][0][1], x[1][0][1], x[1][1]))
        #self.bug_grouped_rdd = self.bug_grouped_rdd.union(self._no_bug_rdd)
        
    def _author_coding(self):
        unique_authors = self.bug_grouped_rdd.map(lambda x: x[1]).map(author_transform).distinct().zipWithIndex() 
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (author_transform(x[1]), (x[0], x[2], x[3], x[4])))
        self.bug_grouped_rdd = self.bug_grouped_rdd.leftOuterJoin(unique_authors).\
                                                    map(lambda x: (x[1][0][0], x[1][1], x[1][0][1], x[1][0][2], x[1][0][3]))
            
    def _date_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[3], \
                                                                   float(date_coding(x[4])[0]), date_coding(x[4])[1]))
            
    def _revNo_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], revNo_coding(x[3][0]), x[4], x[5]))
      
    def _label_coding(self):        
        obj = PriorityLabelCoding()
        self.bug_grouped_rdd = self.bug_grouped_rdd.\
                                    map(lambda x: (obj.coding(x[0])[0], obj.coding(x[0])[1], x[1], x[2], x[3], x[4], x[5]))    
    
    def _path_coding(self):
        self.coded_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[4], x[5], x[6]))
        path_rdd = self.bug_grouped_rdd.map(lambda x: x[3]).map(path_joining)
        
        hashingTF = HashingTF(4000)
        tf = hashingTF.transform(path_rdd)
        tf.cache()
        idf = IDF().fit(tf)
        tfidf = idf.transform(tf)
        resData =  tfidf.map(lambda x: list(x.toArray()))
               
        self.coded_rdd = self.coded_rdd.zip(resData)
        self.coded_rdd = self.coded_rdd.map(extract_data).coalesce(1)
     
    def run(self):
        ''' Reads data, transforms and cleans it and saves processed and encoded data to the text file '''
        start = time.time()
        print '-'*20+'commits_rdd creation'.upper()+'-'*20+' '+str(round(time.time()-start,3))+' sec'
        self._create_commits_df()
        self._create_commits_rdd()
        
        print '-'*20+'issues_rdd creation'.upper()+'-'*21+' '+str(round(time.time()-start,3))+' sec'
        self._create_issues_df()
        self._create_issues_rdd()
        
        print '-'*20+'grouping and joining'.upper()+'-'*20+' '+str(round(time.time()-start,3))+' sec'
        self._bug_grouping_and_joining()
        
        print '-'*23+'author coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._author_coding()
        
        print '-'*24+'date coding'.upper()+'-'*25+' '+str(round(time.time()-start,3))+' sec'
        self._date_coding()
        
        print '-'*24+'revNo coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._revNo_coding()
        
        print '-'*24+'label coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._label_coding()
            
        print '-'*24+'path coding'.upper()+'-'*25+' '+str(round(time.time()-start,3))+' sec'
        self._path_coding()        
        
        print '-'*26+'saving'.upper()+'-'*28+' '+str(round(time.time()-start,3))+' sec'
        lines = self.coded_rdd.map(toCSVLine)
        lines.saveAsTextFile(os.path.join(self.path, self._dataset_name+'_encoded_train_dataset.csv'))
        print '-'*26+'finish'.upper()+'-'*28+' '+str(round(time.time()-start,3))+' sec'
        
    def train(self):
        # Load and parse the data file into an RDD of LabeledPoint.
        start = time.time()
        print '\n' + '-'*23+'training start'.upper()+'-'*23
        try:
            parsedData = self.coded_rdd.map(lambda x: LabeledPoint(x[1], x[2:]))
        except NameError:
            try:
                # open(self._dataset_name + '_encoded_dataset.csv', 'r')
                data = sc.textFile(self._dataset_name + '_encoded_dataset.csv')
                parsedData = data.map(parsePoint)
            except IOError:
                raise Exception('File with encoded data was not found')
            else:
                raise Exception('Encoded dataset was not created')
        
        # Split the data into training and test sets (30% held out for testing)
        (trainingData, testData) = parsedData.randomSplit([0.7, 0.3])

        # Train a RandomForest model.
        #  Empty categoricalFeaturesInfo indicates all features are continuous.
        #  Note: Use larger numTrees in practice.
        #  Setting featureSubsetStrategy="auto" lets the algorithm choose.
        model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                             numTrees=300, featureSubsetStrategy="auto",
                                             impurity='gini', maxDepth=4, maxBins=32)

        #model.save(sc, os.path.join(self.path, self._dataset_name+'_model.pkl'))
        # Evaluate model on test instances and compute test error
        predictions = model.predict(testData.map(lambda x: x.features))
        labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
        testErr = labelsAndPredictions.filter(lambda (v, p): v == p).count() / float(testData.count())
        print '-'*20+'training is finished'.upper()+'-'*20+' '+str(round(time.time()-start,3))+' sec'
        print('Accuracy = ' + str(testErr))
        
    def _rename_commits_df_columns(self, col_names):
        """ col_names = {'old_name1': 'new_name1', 'old_name2': 'new_name2', ... } """
        assert (isinstance(col_names, dict)), "Argument 'col_names' should be a dict."
        for old_name, new_name in col_names.iteritems():
            self._commits_df = self._commits_df.withColumnRenamed(old_name, new_name)
    
    def _rename_issues_df_columns(self, col_names):
        """ col_names = {'old_name1': 'new_name1', 'old_name2': 'new_name2', ... } """
        assert (isinstance(col_names, dict)), "Argument 'col_names' should be a dict."
        for old_name, new_name in col_names.iteritems():
            self._issues_df = self._issues_df.withColumnRenamed(old_name, new_name)
            
    def _read_json(self, path, coding='utf8'):
        files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) \
                                                                 and f.split('.')[-1] == 'json']
        return [json.loads(line, coding) for f in files for line in open(f, 'r')]
    
    
class MavenTrainDataTransformer(TrainDataTransformer):
    pass

### 1. Maven dataset

In [2]:
maven = MavenTrainDataTransformer('./maven')
maven.run()
maven.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 8.1 sec
--------------------GROUPING AND JOINING-------------------- 9.467 sec
-----------------------AUTHOR CODING------------------------ 9.95 sec
------------------------DATE CODING------------------------- 32.359 sec
------------------------REVNO CODING------------------------ 32.36 sec
------------------------LABEL CODING------------------------ 32.36 sec
------------------------PATH CODING------------------------- 32.36 sec
--------------------------SAVING---------------------------- 41.942 sec
--------------------------FINISH---------------------------- 152.701 sec

-----------------------TRAINING START------------------------
--------------------TRAINING IS FINISHED--------------------- 458.192 sec
Accuracy = 0.95145631068




### 2. Hadoop dataset 

In [4]:
class HadoopTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'hadoop'
        self._date_format = "%a %b %d %H:%M:%S %Y"
        self._commits_for_rename = {'authored': 'date'}

In [5]:
hadoop = HadoopTrainDataTransformer('./hadoop')
hadoop.run()
hadoop.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 10.834 sec
--------------------GROUPING AND JOINING-------------------- 11.717 sec
-----------------------AUTHOR CODING------------------------ 13.072 sec
------------------------DATE CODING------------------------- 33.008 sec
------------------------REVNO CODING------------------------ 33.013 sec
------------------------LABEL CODING------------------------ 33.013 sec
------------------------PATH CODING------------------------- 33.013 sec
--------------------------SAVING---------------------------- 41.892 sec
--------------------------FINISH---------------------------- 75.07 sec

-----------------------TRAINING START-----------------------
--------------------TRAINING IS FINISHED-------------------- 156.24 sec
Accuracy = 0.882352941176




### 3. Hbase dataset

In [7]:
class HbaseTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'hbase'

In [8]:
hbase = HbaseTrainDataTransformer('./hbase')
hbase.run()
hbase.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 2.195 sec
--------------------GROUPING AND JOINING-------------------- 5.285 sec
-----------------------AUTHOR CODING------------------------ 5.577 sec
------------------------DATE CODING------------------------- 17.547 sec
------------------------REVNO CODING------------------------ 17.551 sec
------------------------LABEL CODING------------------------ 17.551 sec
------------------------PATH CODING------------------------- 17.551 sec
--------------------------SAVING---------------------------- 25.353 sec
--------------------------FINISH---------------------------- 284.258 sec

-----------------------TRAINING START-----------------------
--------------------TRAINING IS FINISHED-------------------- 739.635 sec
Accuracy = 0.837554045707


### 4. Ignite dataset

In [88]:
class IgniteTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'ignite'
        self._date_format = "%a %b %d %H:%M:%S %Y"
        self._commits_for_rename = {'authored': 'date'}

In [89]:
ignite = IgniteTrainDataTransformer('./ignite')
ignite.run()
ignite.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 8.131 sec
--------------------GROUPING AND JOINING-------------------- 8.439 sec
-----------------------AUTHOR CODING------------------------ 8.653 sec
------------------------DATE CODING------------------------- 48.154 sec
------------------------REVNO CODING------------------------ 48.169 sec
------------------------LABEL CODING------------------------ 48.169 sec
------------------------PATH CODING------------------------- 48.17 sec
--------------------------SAVING---------------------------- 51.583 sec
--------------------------FINISH---------------------------- 81.898 sec

-----------------------TRAINING START-----------------------
--------------------TRAINING IS FINISHED-------------------- 122.888 sec
Accuracy = 0.647058823529


### 5. Jmeter dataset

In [77]:
def getBugNoByURL(item):
    x = re.findall('\d+', str(item))
    return x[0] if x != [] else ''


class ImportanceLabelCoding(PriorityLabelCoding):
    """  """
    def coding(self, b):
        try:
            label_multi = int(b[1])
        except:
            label_multi = 10
        label_binary = 1 if label_multi == 1 else 0
        return (label_multi, label_binary)


class JmeterTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'jmeter'
        self._issues_columns = ["bugNo", "Importance:"]
        self._priority_column = 'Importance:'
        
    def _create_issues_df(self):
        data = self._read_json(self._issues_path)
        
        parsed_data = []
        for row in data:
            el = {}
            for key,val in row.iteritems():
                if isinstance(val, list):
                    if val == []:
                        el[key] = ''
                    elif len(val) == 1:
                        while isinstance(val, list):
                            val = val[0]
                        for key1,val1 in val.iteritems():
                            el[key1] = val1
                    else:
                        for i in val:
                            for key1,val1 in i.iteritems():
                                el[key1] = val1
                elif isinstance(val, dict):
                    for key1,val1 in val.iteritems():
                        el[key1] = val1
                else:
                    el[key] = val
            url = ''
            if 'titleURL' in el.keys():
                url = el['titleURL']
            elif 'detailsURL' in el.keys():
                url = el['detailsURL']
            el['bugNo'] = getBugNoByURL(url)
            parsed_data.append(el)
        
        self._issues_df = sqlContext.createDataFrame(parsed_data)
        if isinstance(self._issues_for_rename, dict) and self._issues_for_rename != {}:
            self._rename_issues_df_columns(self._issues_for_rename)
        self._issues_df = self._issues_df.filter(self._issues_df[self._priority_column] != '')
        self._issues_df = self._issues_df.select(self._issues_columns)
        
    def _label_coding(self):        
        obj = ImportanceLabelCoding()
        self.bug_grouped_rdd = self.bug_grouped_rdd.\
                                    map(lambda x: (obj.coding(x[0])[0], obj.coding(x[0])[1], x[1], x[2], x[3], x[4], x[5]))  

In [78]:
jmeter = JmeterTrainDataTransformer('./jmeter')
jmeter.run()
jmeter.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 0.604 sec
--------------------GROUPING AND JOINING-------------------- 1.341 sec
-----------------------AUTHOR CODING------------------------ 1.61 sec
------------------------DATE CODING------------------------- 6.508 sec
------------------------REVNO CODING------------------------ 6.512 sec
------------------------LABEL CODING------------------------ 6.512 sec
------------------------PATH CODING------------------------- 6.512 sec
--------------------------SAVING---------------------------- 9.358 sec
--------------------------FINISH---------------------------- 46.059 sec

-----------------------TRAINING START-----------------------
--------------------TRAINING IS FINISHED-------------------- 98.208 sec
Accuracy = 0.977099236641


### 6. Spark dataset

In [82]:
class SparkTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'spark'
        self._date_format = "%a %b %d %H:%M:%S %Y"
        self._commits_for_rename = {'authored': 'date'}

In [83]:
spark = SparkTrainDataTransformer('./spark')
spark.run()
spark.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 1.615 sec
--------------------GROUPING AND JOINING-------------------- 2.909 sec
-----------------------AUTHOR CODING------------------------ 3.098 sec
------------------------DATE CODING------------------------- 10.455 sec
------------------------REVNO CODING------------------------ 10.458 sec
------------------------LABEL CODING------------------------ 10.459 sec
------------------------PATH CODING------------------------- 10.459 sec
--------------------------SAVING---------------------------- 17.813 sec
--------------------------FINISH---------------------------- 366.342 sec

-----------------------TRAINING START-----------------------
--------------------TRAINING IS FINISHED-------------------- 743.062 sec
Accuracy = 0.765087605451


# Main class 2

### (by default for Depot dataset)

In [28]:
def get_all_keys(dataset):
    keys = []
    for i in dataset:
        if isinstance(i, list): i = i[0]
        for k in i.keys():
            if k not in keys:
                keys.append(k)
    return keys

def size_coding(l):
    try:
        try: 
            l[0]
            l = list(itertools.chain.from_iterable(l))
        except: pass
        l = filter(lambda x: isinstance(x, str) or isinstance(x, unicode), l)
        l = filter(lambda x: x != '' and float(x) != 0.0, l)
        l = map(lambda x: float(x), l)
        x = np.mean(l)
        return 0.0 if np.isnan(x) else x
    except:
        return 0.0


class SeverityLabelCoding(PriorityLabelCoding):
    """  """
    def coding(self, b):
        label_binary = 0
        if b == 'A':
            labe_multi = label_binary = 1
        elif b == 'B':
            labe_multi = 2
        elif b == 'C':
            labe_multi = 3
        elif b == 10:
            labe_multi = 10
        return (labe_multi, label_binary)


class JobTrainDataTransformer(TrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        TrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'depot'
        self._commits_for_rename = {'authored': 'date'}
        self._commits_columns = ['bugNo', 'change', 'depotFile', 'time', 'user', 'rev', 'action', 'fileSize']
        self._issues_columns = ["bugNo", "Severity"]
        self._priority_column = 'Severity'
        self._date_format = ''
        self._commits_for_rename = {'job': 'bugNo'}
        self._issues_for_rename = {'Job': 'bugNo'}
    
    def _create_commits_rdd(self):
        self.commits_rdd = self._commits_df.rdd
        self.commits_rdd = self.commits_rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7]))
        self._no_bug_rdd = self.commits_rdd.filter(lambda x: x[0] == ''). \
                                            map(lambda x: (10, [x[1]], [x[2]], [x[3]], [x[4]], x[5], x[6], x[7]))
        
    def _bug_grouping_and_joining(self):
        rdd_bug_grouped = self.commits_rdd.map(lambda x: (x[0], x[1])).groupByKey().map(lambda x: (x[0], unique(list(x[1]))))
        rdd_joined = self.issues_rdd.join(rdd_bug_grouped)
        for i in range(2,8):
            rdd_bug_grouped_i = self.commits_rdd.map(lambda x: (x[0], x[i])).groupByKey().map(lambda x: (x[0], unique(list(x[1]))))
            rdd_joined = rdd_joined.join(rdd_bug_grouped_i)
        
        #print 'rdd_joined'
        #print rdd_joined.take(3)
        self.bug_grouped_rdd = rdd_joined.map(lambda x: (x[1][0][0][0][0][0][0][0], x[1][0][0][0][0][0][0][1], 
                                                         x[1][0][0][0][0][0][1],  x[1][0][0][0][0][1], 
                                                         x[1][0][0][0][1], x[1][0][0][1], x[1][0][1], x[1][1]))
        
    def _author_coding(self):
        unique_authors = self.bug_grouped_rdd.map(lambda x: x[4]).map(author_transform).distinct().zipWithIndex() 
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (author_transform(x[4]), (x[0], x[1], x[2], 
                                                                                            x[3], x[5], x[6], x[7])))
        self.bug_grouped_rdd = self.bug_grouped_rdd.leftOuterJoin(unique_authors).\
                                                    map(lambda x: (x[1][0][0], x[1][1], x[1][0][1], x[1][0][2], 
                                                                   x[1][0][3], x[1][0][4], x[1][0][5], x[1][0][6]))
        
    def _date_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[3], float(date_coding(x[4])[0]), 
                                                                   date_coding(x[4])[1], x[5], x[6], x[7]))
            
    def _revNo_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], 
                                                                   revNo_coding(x[6][0][0]), x[7], x[8]))
        
    def _change_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], revNo_coding(x[2][0]), x[3], 
                                                                   x[4], x[5], x[6], x[7], x[8]))
        
    def _action_coding(self):
        unique_actions = self.bug_grouped_rdd.map(lambda x: unique(x[7][0])).map(author_transform).distinct().zipWithIndex() 
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (author_transform(unique(x[7][0])), (x[0], x[1], x[2], x[3], 
                                                                                                       x[4], x[5], x[6], x[8])))
        self.bug_grouped_rdd = self.bug_grouped_rdd.leftOuterJoin(unique_actions).\
                                                    map(lambda x: (x[1][0][0], x[1][1], x[1][0][1], x[1][0][2], x[1][0][3], 
                                                                   x[1][0][4], x[1][0][5], x[1][0][6], x[1][0][7]))
      
    def _size_coding(self):
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], 
                                                                   x[6], x[7], size_coding(x[8][0])))  
        
    def _label_coding(self):        
        obj = SeverityLabelCoding()
        self.bug_grouped_rdd = self.bug_grouped_rdd.map(lambda x: (obj.coding(x[0])[0], obj.coding(x[0])[1], 
                                                                   x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8]))    
    
    def _path_coding(self):
        self.coded_rdd = self.bug_grouped_rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], x[6], x[7], x[8], x[9]))
        path_rdd = self.bug_grouped_rdd.map(lambda x: x[5]).map(path_joining)
        
        hashingTF = HashingTF(4000)
        tf = hashingTF.transform(path_rdd)
        tf.cache()
        idf = IDF().fit(tf)
        tfidf = idf.transform(tf)
        resData =  tfidf.map(lambda x: list(x.toArray()))
               
        self.coded_rdd = self.coded_rdd.zip(resData)
        self.coded_rdd = self.coded_rdd.map(extract_data).coalesce(1)
     
    def run(self):
        ''' Reads data, transforms and cleans it and saves processed and encoded data to the text file '''
        start = time.time()
        print '-'*20+'commits_rdd creation'.upper()+'-'*20+' '+str(round(time.time()-start,3))+' sec'
        self._create_commits_df()
        self._create_commits_rdd()
        
        print '-'*20+'issues_rdd creation'.upper()+'-'*21+' '+str(round(time.time()-start,3))+' sec'
        self._create_issues_df()
        self._create_issues_rdd()
        
        print '-'*20+'grouping and joining'.upper()+'-'*20+' '+str(round(time.time()-start,3))+' sec'
        self._bug_grouping_and_joining()
        
        print '-'*23+'author coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._author_coding()
        
        print '-'*24+'date coding'.upper()+'-'*25+' '+str(round(time.time()-start,3))+' sec'
        self._date_coding()
        
        print '-'*24+'revNo coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._revNo_coding()
        
        print '-'*23+'change coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._change_coding()
        
        print '-'*23+'action coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._action_coding()
        
        print '-'*24+'size coding'.upper()+'-'*25+' '+str(round(time.time()-start,3))+' sec'
        self._size_coding()
        
        print '-'*24+'label coding'.upper()+'-'*24+' '+str(round(time.time()-start,3))+' sec'
        self._label_coding()
            
        print '-'*24+'path coding'.upper()+'-'*25+' '+str(round(time.time()-start,3))+' sec'
        self._path_coding()        
        
        print '-'*26+'saving'.upper()+'-'*28+' '+str(round(time.time()-start,3))+' sec'
        lines = self.coded_rdd.map(toCSVLine)
        lines.saveAsTextFile(os.path.join(self.path, self._dataset_name+'_encoded_train_dataset.csv'))
        print '-'*26+'finish'.upper()+'-'*28+' '+str(round(time.time()-start,3))+' sec'
        
        
    def _read_json(self, path, coding="ISO-8859-1"):
        files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) \
                                                                 and f.split('.')[-1] == 'json']
        x = [json.loads(line, coding) for f in files for line in open(f, 'r')]
        x = x = [i[0] if isinstance(i, list) else i for i in x[0]]
        if 'job' in get_all_keys(x):
            new_x = []
            counter = 0
            for i in x:
                if 'job' not in i.keys():
                    new_x.append(i)
                    new_x[counter].update({'job': ''})
                    counter += 1
                    continue
                if isinstance(i['job'], list) and len(i['job']) > 1:
                    for j in i['job']:
                        new_x.append(i)
                        new_x[counter].update({'job': j})
                        counter += 1
                else:
                    new_x.append(i)
                    if isinstance(i['job'], list):
                        new_x[counter].update({'job': i['job'][0]})
                    counter += 1
            return new_x
        else:
            return x
        
        
class DepotTrainDataTransformer(JobTrainDataTransformer):
    pass

### 7. Depot dataset

In [105]:
depot = DepotTrainDataTransformer('./depot')
depot.run()
depot.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 3.864 sec
--------------------GROUPING AND JOINING-------------------- 4.683 sec
-----------------------AUTHOR CODING------------------------ 4.938 sec
------------------------DATE CODING------------------------- 57.558 sec
------------------------REVNO CODING------------------------ 57.563 sec
-----------------------CHANGE CODING------------------------ 57.564 sec
-----------------------ACTION CODING------------------------ 57.564 sec
------------------------SIZE CODING------------------------- 62.72 sec
------------------------LABEL CODING------------------------ 62.724 sec
------------------------PATH CODING------------------------- 62.724 sec
--------------------------SAVING---------------------------- 76.703 sec
--------------------------FINISH---------------------------- 260.742 sec

-----------------------TRAINING START-----------------------
--------

### 8. P4_Java dataset

In [45]:
class P4JavaTrainDataTransformer(JobTrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        JobTrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'p4_java'        

In [46]:
p4_java = P4JavaTrainDataTransformer('./p4_java')
p4_java.run()
p4_java.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 0.521 sec
--------------------GROUPING AND JOINING-------------------- 0.759 sec
-----------------------AUTHOR CODING------------------------ 1.178 sec
------------------------DATE CODING------------------------- 10.393 sec
------------------------REVNO CODING------------------------ 10.396 sec
-----------------------CHANGE CODING------------------------ 10.396 sec
-----------------------ACTION CODING------------------------ 10.397 sec
------------------------SIZE CODING------------------------- 14.539 sec
------------------------LABEL CODING------------------------ 14.543 sec
------------------------PATH CODING------------------------- 14.543 sec
--------------------------SAVING---------------------------- 23.536 sec
--------------------------FINISH---------------------------- 60.86 sec

-----------------------TRAINING START-----------------------
---------

### 9. P4_all dataset

In [29]:
class P4AllTrainDataTransformer(JobTrainDataTransformer):
    
    def __init__(self, path_to_dataset_directory):
        JobTrainDataTransformer.__init__(self, path_to_dataset_directory)
        self._dataset_name = 'p4_all'     

In [30]:
p4_all = P4AllTrainDataTransformer('./p4_all')
p4_all.run()
p4_all.train()

--------------------COMMITS_RDD CREATION-------------------- 0.0 sec
--------------------ISSUES_RDD CREATION--------------------- 5.791 sec
--------------------GROUPING AND JOINING-------------------- 7.479 sec
-----------------------AUTHOR CODING------------------------ 7.767 sec
------------------------DATE CODING------------------------- 88.719 sec
------------------------REVNO CODING------------------------ 88.722 sec
-----------------------CHANGE CODING------------------------ 88.722 sec
-----------------------ACTION CODING------------------------ 88.722 sec
------------------------SIZE CODING------------------------- 94.01 sec
------------------------LABEL CODING------------------------ 94.014 sec
------------------------PATH CODING------------------------- 94.014 sec
--------------------------SAVING---------------------------- 113.558 sec
--------------------------FINISH---------------------------- 305.348 sec

-----------------------TRAINING START-----------------------
-------

## Block for checking

In [12]:
dt = P4AllTrainDataTransformer('./p4_all')
dt._create_commits_df()
dt._create_issues_df()
c_df = dt._commits_df
print c_df
print c_df.show(3)
i_df = dt._issues_df
print i_df
print i_df.show(3)
dt._create_commits_rdd()
dt._create_issues_rdd()
c_rdd = dt.commits_rdd
i_rdd = dt.issues_rdd
print c_rdd
print c_rdd.take(3)

dt._bug_grouping_and_joining()
print 'after grouping and joining'
print dt.bug_grouped_rdd.take(3)

print 'author coding'
dt._author_coding()
print dt.bug_grouped_rdd.take(3)

print 'date coding'
dt._date_coding()
print dt.bug_grouped_rdd.take(3)

print 'revNo coding'
dt._revNo_coding()
print dt.bug_grouped_rdd.take(3)

print 'change_coding'
dt._change_coding()
print dt.bug_grouped_rdd.take(3)

print 'action_coding'
dt._action_coding()
print dt.bug_grouped_rdd.take(3)

print 'size_coding'
dt._size_coding()
print dt.bug_grouped_rdd.take(3)

print 'labels coding'
dt._label_coding()
print dt.bug_grouped_rdd.take(3)
 
print 'path coding'
dt._path_coding()
print dt.coded_rdd.take(3)

DataFrame[bugNo: string, change: string, depotFile: array<string>, time: string, user: string, rev: array<string>, action: array<string>, fileSize: array<string>]
+---------+------+--------------------+----------+---------+--------------------+--------------------+--------------------+
|    bugNo|change|           depotFile|      time|     user|                 rev|              action|            fileSize|
+---------+------+--------------------+----------+---------+--------------------+--------------------+--------------------+
|         |587196|[//depot/main/p4-...|1360099434|    smoon|                 [9]|              [edit]|              [2809]|
|         |525938|[//depot/main/p4-...|1348016310|   cgrant|[14, 10, 14, 10, ...|[integrate, integ...|[20240, 4877, 202...|
|job057782|512149|[//depot/main/p4-...|1345245031|nlemonier|                 [1]|               [add]|              [2997]|
+---------+------+--------------------+----------+---------+--------------------+------------

"\nprint 'size_coding'\ndt._size_coding()\nprint dt.bug_grouped_rdd.take(3)\n\nprint 'labels coding'\ndt._label_coding()\nprint dt.bug_grouped_rdd.take(3)\n\ny = dt.bug_grouped_rdd  \n#print 'path coding'\n#dt._path_coding()\n#print dt.coded_rdd.take(3)"

In [84]:
dt = SparkTrainDataTransformer('./spark')
dt._create_commits_df()
dt._create_issues_df()
c_df = dt._commits_df
print c_df
print c_df.show(3)
i_df = dt._issues_df
print i_df
print i_df.show(3)
dt._create_commits_rdd()
dt._create_issues_rdd()
c_rdd = dt.commits_rdd
i_rdd = dt.issues_rdd
dt._bug_grouping_and_joining()
print 'after grouping and joining'
print dt.bug_grouped_rdd.take(3)

print 'author coding'
dt._author_coding()
print dt.bug_grouped_rdd.take(3)

print 'date coding'
dt._date_coding()
print dt.bug_grouped_rdd.take(3)

print 'revNo coding'
dt._revNo_coding()
print dt.bug_grouped_rdd.take(3)

print 'labels coding'
dt._label_coding()
print dt.bug_grouped_rdd.take(3)
 
print 'path coding'
dt._path_coding()
print dt.coded_rdd.take(3)

DataFrame[bugNo: string, author: string, modifiedFiles: array<string>, revNo: string, date: string]
+----------+--------------------+--------------------+--------------------+------------+
|     bugNo|              author|       modifiedFiles|               revNo|        date|
+----------+--------------------+--------------------+--------------------+------------+
|SPARK-5035|Josh Rosen <joshr...|[http://git-wip-u...|61eb9be4b1a659766...|1420041767.0|
|SPARK-5035|Josh Rosen <joshr...|[http://git-wip-u...|5cf94775e3a722475...|1420041767.0|
|SPARK-2757|Sean Owen <sowen@...|[http://git-wip-u...|4bb12488d56ea651c...|1420045157.0|
+----------+--------------------+--------------------+--------------------+------------+
only showing top 3 rows

None
DataFrame[bugNo: string, Priority: string]
+----------+--------+
|     bugNo|Priority|
+----------+--------+
|SPARK-5042|   Major|
|SPARK-5043|   Major|
|SPARK-5044|   Major|
+----------+--------+
only showing top 3 rows

None
after grouping and j

In [31]:
def get_all_keys(dataset):
    keys = []
    for i in dataset:
        if isinstance(i, list): i = i[0]
        for k in i.keys():
            if k not in keys:
                keys.append(k)
    return keys

def read_json_0(path, coding='utf8'):
    files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) \
                                                                 and f.split('.')[-1] == 'json']
    return [json.loads(line, coding) for f in files for line in open(f, 'r')]

def read_json(path, coding="ISO-8859-1"):
    files = [os.path.join(path, f) for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)) \
                                                                 and f.split('.')[-1] == 'json']
    x = [json.loads(line, coding) for f in files for line in open(f, 'r')]
    x = [i[0] if isinstance(i, list) else i for i in x[0]] 
    if 'job' in get_all_keys(x):
        new_x = []
        counter = 0
        for i in x:
            if 'job' not in i.keys():
                new_x.append(i)
                new_x[counter].update({'job': ''})
                counter += 1
                continue
            if isinstance(i['job'], list) and len(i['job']) > 1:
                for j in i['job']:
                    new_x.append(i)
                    new_x[counter].update({'job': j})
                    counter += 1
            else:
                new_x.append(i)
                if isinstance(i['job'], list):
                    new_x[counter].update({'job': i['job'][0]})
                counter += 1
        return new_x
    else:
        return x

In [85]:
read_json_0('./ignite/issues')[:3]

[{u'attachments': [{u'Author': u'Andrey Gura',
    u'Date': u'16/Apr/15 20:26',
    u'Delete': u'',
    u'Size': u'28 kB',
    u'Title': u'ignite-485.patch',
    u'titleURL': u'https://issues.apache.org/jira/secure/attachment/12725968/ignite-485.patch'},
   {u'Author': u'Andrey Gura',
    u'Date': u'25/Mar/15 10:22',
    u'Delete': u'',
    u'Size': u'26 kB',
    u'Title': u'ignite-485.patch',
    u'titleURL': u'https://issues.apache.org/jira/secure/attachment/12707184/ignite-485.patch'}],
  u'bugNo': u'IGNITE-485',
  u'dates': [{u'Created': u'16/Mar/15 08:54',
    u'Resolved': u'20/Apr/15 16:56',
    u'Updated': u'23/Jul/15 10:02'}],
  u'metadata': [{u'Component': u'streaming',
    u'Fix Versions': u'sprint-4',
    u'Priority': u'Critical',
    u'Resolution': u'Fixed',
    u'Status': u'Closed',
    u'Type': u'Sub-task'}],
  u'people': [{u'Asignee': u'Andrey Gura',
    u'Reporter': u'Dmitriy Setrakyan'}]},
 {u'attachments': [{u'Author': u'Vasiliy Sisko',
    u'Date': u'10/Mar/15 10:35'

In [87]:
print read_json_0('./ignite/commits')[:1][0].keys()
read_json_0('./ignite/commits')[5:8]

[u'committer', u'revNo', u'author', u'bugNo', u'committed', u'authored', u'modifiedFiles']


[{u'author': u'Valentin Kulichenko <vkulichenko@gridgain.com>',
  u'authored': u'Sun Mar 1 00:18:52 2015 -0800',
  u'bugNo': u'IGNITE-141',
  u'committed': u'Sun Mar 1 00:18:52 2015 -0800',
  u'committer': u'Valentin Kulichenko <vkulichenko@gridgain.com>',
  u'modifiedFiles': [u'http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a5aa387/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java',
   u'http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a5aa387/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java',
   u'http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a5aa387/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfoSelfTest.java'],
  u'revNo': u'3a5aa3871aff5d10c60b4ce16541e9e3160bfb43'},
 {u'author': u'Valentin Kulichenko <vkulichenko@gridgain.com>',
  u'authored': u'Sun Mar 1 07:23:58 2015 -0800',
  u'bugNo': u'IGNITE-141',
  u'committed': u'Sun Mar 1 07:

In [13]:
y = map(lambda x: x['job'] if 'job' in x.keys() else '', read_json('./depot/commits'))
print y[:10]
filter(lambda x: len(x) > 1 and isinstance(x, list), y)

[u'job079872', '', u'job079977', '', u'job075238', u'job077615', '', u'job079729', u'job079924', '']


[]