In [None]:
from nbdev import *
# default_exp utils

In [None]:
#export
import random
import sys
import os.path
import zipfile
from pathlib import Path
import pandas as pd

"@danaderp May'20 Refactoring for enhancing time complexity with pandas vectorization"

class Dynamic_Dataset:
	"""
	Lazily-loaded dataset of issue files.

	Only the list of filenames and the mapping from filename to ground truth
	value are kept in memory. The content of an issue file is read from disk
	(or from a zip archive) each time the corresponding item is requested.

	This class supports len(), integer indexing, slicing, and iteration.

	Indexing with an integer returns a tuple consisting of the ground truth
	value and the file content of the filename at that index. Indexing with a
	slice returns a new Dynamic_Dataset restricted to the selected filenames
	and backed by the same path / isZip settings. Item assignment is not
	supported and raises ValueError.

	A user can request the filename at an index with get_id(index)

	Example:

		dataset = Dynamic_Dataset(ground_truth, path, isZip)

		print(dataset.get_id(0))
			-> gitlab_79.txt

		print(dataset[0])
			-> ('(1,0)', 'The currently used Rails version, in the stable ...

		for x in dataset[2:4]:
			print(x)
				-> ('(1,0)', "'In my attempt to add 2 factor authentication ...
				-> ('(1,0)', 'We just had an admin accidentally push to a ...

	"""

	def __init__(self, ground_truth, path, isZip):
		'''
		@param ground_truth (dict): A dictionary mapping filenames to ground truth values
		@param path (str): When isZip is false, a prefix that is concatenated with
			'issues/<filename>' (so it must end with a path separator). When isZip
			is true, a directory that is searched recursively for *.zip archives.
		@param isZip (bool): Whether issue files are read from zip archives
		'''
		self.__keys = list(ground_truth.keys())
		self.__ground_truth = ground_truth
		self.__path = path
		self.__isZip = isZip

	def __get_issue(self, filename):
		'''
		Read and return the stripped content of one issue file.

		@param filename (str): Name of the issue file inside the 'issues/' folder
		@return The stripped file content. Plain files are opened in text mode and
			yield str; zip members are returned as read by ZipFile.read (bytes).
		@raises KeyError: In zip mode, when no archive under path contains the file
		'''
		if self.__isZip:
			member = 'issues/' + filename
			# Sorted so that the archive consulted first is deterministic.
			archive_paths = sorted(str(p) for p in Path(self.__path).glob("**/*.zip"))
			for archive_path in archive_paths:
				# The context manager guarantees the archive handle is closed.
				with zipfile.ZipFile(archive_path, 'r') as archive:
					try:
						contents = archive.read(member)
					except KeyError:
						# This archive does not hold the member; try the next one.
						continue
				# NOTE(review): zip content is bytes while the plain-file branch
				# returns str -- confirm whether callers expect decoding here.
				return contents.strip()
			raise KeyError("Issue [{}] not found in any zip archive under [{}]".format(filename, self.__path))

		with open(self.__path+'issues/' + filename, 'r') as file:
			return file.read().strip()

	def get_id(self, index):
		'''
		@param index (int): Position in the dataset
		@return The filename stored at that position
		'''
		return self.__keys[index]

	def __len__(self):
		return len(self.__keys)

	def __setitem__(self, key, item):
		# The dataset is read-only; assignment is rejected.
		raise ValueError

	def __getitem__(self, key):
		'''
		@param key (int or slice): Position(s) to fetch
		@return For an int, a tuple (ground truth value, file content). For a
			slice, a new Dynamic_Dataset over the selected filenames.
		'''
		if isinstance(key, slice):
			selected = self.__keys[key]
			subset_gt = {name: self.__ground_truth[name] for name in selected}
			# Propagate path / isZip so the sub-dataset can load its files.
			return Dynamic_Dataset(subset_gt, self.__path, self.__isZip)

		issue_id = self.__keys[key]
		return (self.__ground_truth[issue_id], self.__get_issue(issue_id))

	def __iter__(self):
		# The instance is its own iterator: the cursor lives on the instance,
		# so starting a new iteration resets any iteration already in progress.
		self.__index = 0
		return self

	def __next__(self):
		if self.__index < len(self.__keys):
			to_return = self[self.__index]
			self.__index += 1
			return to_return
		else:
			raise StopIteration

In [None]:
show_doc(Dynamic_Dataset)

<h2 id="Dynamic_Dataset" class="doc_header"><code>class</code> <code>Dynamic_Dataset</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>Dynamic_Dataset</code>(**`ground_truth`**, **`path`**, **`isZip`**)

This class efficiently 'stores' a dataset. Only a list of filenames and
mappings to their ground truth values are stored in memory. The file
contents are only brought into memory when requested.

This class supports indexing, slicing, and iteration.

A user can treat an instance of this class exactly as they would a list.
Indexing an instance of this class will return a tuple consisting of
the ground truth value and the file content of the filename at that index.

A user can request the filename at an index with get_id(index)

Example:

        dataset = Dynamic_Dataset(ground_truth, path, isZip)

        print(dataset.get_id(0))
                -> gitlab_79.txt

        print(dataset[0])
                -> ('(1,0)', 'The currently used Rails version, in the stable ...

        for x in dataset[2:4]:
                print(x)
                        -> ('(1,0)', "'In my attempt to add 2 factor authentication ...
                        -> ('(1,0)', 'We just had an admin accidentally push to a ...

In [None]:
#export
class Processing_Dataset:
    """
    A class to wrap up processing functions for a dataset stored under `path`.

    `path` is used as a string prefix: the ground truth is read from
    `path + 'full_ground_truth.txt'` and issues from `path + 'issues/<filename>'`,
    so it must end with a path separator.
    """

    def __init__(self, path):
        """
        @param path (str): Prefix of the dataset location (with trailing separator)
        """
        self.__path = path

    def get_issue(self, filename):
        """
        Read one issue file from the dataset's 'issues/' folder.

        @param filename (str): Name of the issue file
        @return (str) The file content with surrounding whitespace stripped
        """
        # Resolve against the configured path, like get_ground_truth and the
        # Dynamic_Dataset instances built below, instead of a hard-coded folder.
        with open(self.__path + 'issues/' + filename, 'r') as file:
            return file.read().strip()

    def get_ground_truth(self):
        """
        Parse 'full_ground_truth.txt' into a dictionary.

        Each non-blank line is split on whitespace; the first token is the
        filename and the second is its ground truth value.

        @return (dict) Mapping filename -> ground truth value
        @raises KeyError: If the same filename appears more than once
        """
        gt = dict()
        with open(self.__path + 'full_ground_truth.txt') as gt_file:
            for line in gt_file:
                tokens = line.split()
                if not tokens:
                    # Blank (or whitespace-only) lines carry no entry.
                    continue
                filename = tokens[0]
                security_status = tokens[1]
                if filename in gt:
                    raise KeyError("Invalid Ground Truth: Duplicate issue [{}]".format(filename))
                gt[filename] = security_status
        return gt

    def get_test_and_training(self, ground_truth, test_ratio=0.1, isZip = False):
        """
        Split the ground truth into a balanced test set and a training set.

        The test set receives int(len(sr) * test_ratio) randomly sampled issues
        labelled '(1,0)' and the same number labelled '(0,1)'; every remaining
        issue goes to the training set. `ground_truth` itself is not modified.

        @param ground_truth (dict): Mapping filename -> '(1,0)' or '(0,1)'
        @param test_ratio (float): Fraction of the '(1,0)' issues to sample
        @param isZip (bool): Forwarded to the Dynamic_Dataset instances
        @return (tuple) (test, train) as Dynamic_Dataset instances
        @raises ValueError: If a label is neither '(1,0)' nor '(0,1)'; also raised
            by random.sample when there are fewer '(0,1)' issues than requested
        """
        sr = []
        nsr = []

        for issue_id, label in ground_truth.items():
            if label == '(1,0)':
                sr.append(issue_id)
            elif label == '(0,1)':
                nsr.append(issue_id)
            else:
                raise ValueError("There was an issue with ground truth: {} - {}".format(issue_id, label))

        n_test = int(len(sr) * test_ratio)
        # Sampling order (sr first, then nsr) is kept so seeded runs are unchanged.
        sr_test = random.sample(sr, n_test)
        nsr_test = random.sample(nsr, n_test)

        test_gt = dict()
        train_gt = dict(ground_truth)

        # Interleave the two classes in the test set and drop them from training.
        for sr_id, nsr_id in zip(sr_test, nsr_test):
            test_gt[sr_id] = '(1,0)'
            del train_gt[sr_id]

            test_gt[nsr_id] = '(0,1)'
            del train_gt[nsr_id]

        test = Dynamic_Dataset(test_gt, self.__path, isZip)
        train = Dynamic_Dataset(train_gt, self.__path, isZip)

        return (test, train)

In [None]:
show_doc(Processing_Dataset)

<h2 id="Processing_Dataset" class="doc_header"><code>class</code> <code>Processing_Dataset</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>Processing_Dataset</code>(**`path`**)

A class to wrap up processing functions

In [None]:
# Code for testing. RC leaving for testing crew to use as a reference
#if __name__ == '__main__':
#ground_truth = get_ground_truth()
#dataset = Dynamic_Dataset(ground_truth)

#test, train = get_test_and_training(ground_truth)

#print(test[0])
#print(train[0])

In [None]:
#export
import numpy
import pandas
import re
from string import punctuation
import nltk
from nltk.stem.snowball import SnowballStemmer
# Module-level Snowball stemmer for English; Embeddings uses it to stem tokens
# while normalizing a document.
englishStemmer=SnowballStemmer("english")

class Embeddings:
    """
    Text preprocessing and embedding lookup helpers.

    `preprocess` turns a sentence into a list of cleaned, stemmed tokens and
    `vectorize` maps those tokens to rows of an embeddings dictionary loaded
    with `get_embeddings_dict`.
    """

    # Characters that are replaced by a space before tokenizing.
    _SEPARATORS = punctuation + '0123456789'
    _SEPARATOR_TABLE = str.maketrans(_SEPARATORS, ' ' * len(_SEPARATORS))

    def __init__(self):
        self.__wpt = nltk.WordPunctTokenizer()
        # A set gives constant-time membership tests when filtering tokens.
        self.__stop_words = set(nltk.corpus.stopwords.words('english'))
        self.__remove_terms = punctuation + '0123456789'

    def __split_camel_case_token(self, token):
        """Split a token at every lower-case/upper-case boundary; returns a list."""
        return re.sub('([a-z])([A-Z])', r'\1 \2', token).split()

    def __clean_punctuation(self, token):
        """Replace punctuation and digits by spaces and return the remaining pieces."""
        return token.translate(self._SEPARATOR_TABLE).split()

    def __clean(self, token):
        """Strip punctuation/digits from a token, then split camelCase pieces."""
        pieces = []
        for fragment in self.__clean_punctuation(token):
            pieces.extend(self.__split_camel_case_token(fragment))
        return pieces

    def __normalize_document(self, doc):
        """
        Normalize a document string into a list of stemmed tokens.

        @param doc (str): Text to normalize
        @return (list) Stemmed tokens longer than two characters, without stop words
        """
        # Remove every character that is not an ASCII letter or whitespace.
        # The flags must be passed by keyword: the fourth positional argument
        # of re.sub is `count`, which would cap the number of substitutions.
        doc = re.sub(r'[^a-zA-Z\s]', '', doc, flags=re.I | re.A)
        doc = doc.lower().strip()
        # Tokenize the document.
        tokens = self.__wpt.tokenize(doc)
        # Filter stop words out of the document.
        kept = [token for token in tokens if token not in self.__stop_words]
        # Stem the remaining tokens.
        stemmed = [englishStemmer.stem(token) for token in kept]
        # Drop short tokens; `not in self.__remove_terms` is a substring test
        # against the punctuation/digits string, kept from the original filter.
        return [token for token in stemmed if token not in self.__remove_terms and len(token) > 2]

    def preprocess(self, sentence, vocab_set=None):
        """
        Clean and normalize a sentence.

        @param sentence (str): Raw text
        @param vocab_set: Unused; kept for interface compatibility
        @return (list) Normalized tokens
        """
        cleaned = []
        for token in sentence.split():
            cleaned.extend(self.__clean(token))
        return self.__normalize_document(' '.join(cleaned))

    def get_embeddings_dict(self, embeddings_filename):
        """
        Load embeddings from a CSV file.

        @param embeddings_filename: Path (or buffer) accepted by pandas.read_csv
        @return (dict) Mapping column name -> list of that column's values; the
            first column of the CSV is skipped
        """
        embeddings_df = pandas.read_csv(embeddings_filename)
        return {col: list(embeddings_df[col]) for col in list(embeddings_df)[1:]}

    def vectorize(self, sentence, embeddings_dict):
        """
        Build a matrix of embeddings for the tokens of a sentence.

        @param sentence (str): Raw text, preprocessed with `preprocess`
        @param embeddings_dict (dict): Mapping token -> embedding values
        @return (numpy.matrix) One row per token found in `embeddings_dict`,
            in reverse token order
        """
        rows = [embeddings_dict[token]
                for token in self.preprocess(sentence)
                if token in embeddings_dict]
        # The original prepended each row, so the last token ends up first.
        rows.reverse()
        return numpy.matrix(rows)

In [None]:
show_doc(Embeddings)

<h2 id="Embeddings" class="doc_header"><code>class</code> <code>Embeddings</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>Embeddings</code>()



In [None]:
# Code for testing, left here for reference purposes
#sentence = "AAA AAA xxx BBB yyy CCC"
#embeddings = Embeddings()
#embeddings_dict = embeddings.get_embeddings_dict('test.csv')
#print(embeddings_dict)
#print(embeddings.vectorize(sentence, embeddings_dict))