Written by: Emma Bailin

Created on: October 15, 2019

Last Modified: October 22, 2019

Description: Identifies duplicate Digital Certificates based on their leaf certificate fingerprint, and generates an output file with sets of duplicate input rows, one row per set of duplicates.

### Disclaimer
I have gone back through the code I originally wrote, called searchDuplicatesCrts.py, to create this for you. This is significantly cleaner (or at least better organized) than the original. That said, I'm assuming that you're just as interested in *how* I came to my solutions as you are in seeing *what* I did. Luckily for you, a huge portion of my thought process comes from simple train-of-thought writing, which I tend to do in block comments. While I usually erase the ramblings and print statements before I share my scripts, I decided not to for this exercise. To make it easier on you, however, I've gone through those train-of-thoughts and organized them chronologically and added, hopefully, clarifying commentary. You're welcome to skip them. If you see any typos, which you probably will, it's because there's no spellcheck in my script editor (SublimeText). I'll try to catch what I can here, but I'm definitely going to miss some. I apologize in advance.

Additionally, if you feel brave, you can find all of my work, including my print-statement debugging, timing records, and less-successful avenues of pursuit, in the raw searchDuplicatesCrts.py.

# Initial Thoughts 
**ad-hoc**

Pre-coding Notes:
	1) Output is a dictionary, so presumably, converting the JSON object into a dictionary
	2) Brute force ==> read in each json, compare to contents in dictionary, if not there, put in dict, otherwise, put json dict into hash of repeats for eventual return. Rinse. Repeat
	3) Known problem: We can't just compare dictionaries if there are lists inside the dictionary, the lists may be out of order and it'll miss repeats. Solution: Make sure the orders are the same. 
		Could sort each column in dict before putting in bigger?
		--> Let's start there and see if we can improve later. 
		**-->WAIT!!! NO! We only need to look at the fingerprints! The fingerprints aren't lists! So we don't actually care about the sorting!**
	4) PANDAS dataframe is better suited to huge datasets than dicts. It may be better to store the info in there. 

So The PLAN:
	json=loads(file)
	data=json[] 
	#sort
	for col in data:
		look for any lists-->yes--> order --> repeat
		--> no --> continue
	#dump into data.frame [OR is it faster to append to list and then put list of dicts into dataframe? Yeah, that's faster, according to stackoverflow]
 	datas.append(data)
	files=pd.DataFrame(datas)
	#now we can do comparisons
	( What do you know! PANDAS has a function for that! it's just pd.DataFrame.duplicated. ) [Perhaps look at other methods and timeit?]
	dups=files.duplicated()
	dups.write_csv()
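
A small sketch of what `duplicated` returns, using made-up data (the `fingerprint` and `cert_id` columns and their values are illustrative assumptions, not the real input). By default pandas flags only the second and later occurrences of a value; `keep=False` flags every member of a duplicate set, which can then be grouped into one entry per set.

```python
import pandas as pd

# Toy data: fingerprints and ids are invented for illustration.
files = pd.DataFrame(
    {
        "fingerprint": ["AA", "BB", "AA", "CC", "BB", "AA"],
        "cert_id": [1, 2, 3, 4, 5, 6],
    }
)

# Default (keep="first"): the first occurrence of each value is NOT flagged.
later_only = files[files.duplicated(subset="fingerprint")]

# keep=False flags every row whose fingerprint appears more than once.
all_members = files[files.duplicated(subset="fingerprint", keep=False)]

# One entry per duplicate set: fingerprint -> list of cert ids sharing it.
dup_sets = all_members.groupby("fingerprint")["cert_id"].apply(list).to_dict()

print(later_only["cert_id"].tolist())
print(all_members["cert_id"].tolist())
print(dup_sets)
```

Whether the first occurrence belongs in the output depends on how "one row per set of duplicates" is read, so the choice of `keep` is worth making explicitly.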

In [2]:
import json
import sys
import timeit  # testing purposes
import timing  # for complete script timings. Taken from an answer on stackoverflow years ago. NOT MINE
import dask.dataframe as dd
import pandas as pd
import ijson.backends.yajl2 as ijson  # WAY faster for importing and working with jsons (https://pypi.org/project/ijson/)
from glob import glob
from os import path, getcwd, mkdir
from pandas.io.json import json_normalize  # on pandas >= 1.0 this is available as pandas.json_normalize
from io import StringIO

ModuleNotFoundError: No module named 'ijson'

## Take 1: usingJson
**post-hoc**

This is what I consider the 'brute' attack. It was my first thought. You'll see that I wrote a method called 'sortDictLists'. Only after running this a few times did it hit me that I had no reason to sort the lists.

In [3]:
#This function is never used. It is a relic of me thinking we needed to sort the lists for comparisons
def sortDictLists(data):
    if isinstance(data, dict):
        return sorted((k, sortDictLists(v)) for k, v in data.items())
    if isinstance(data, list):
        return sorted(sortDictLists(x) for x in data)
    else:
        return data

# Note: copied from https://stackoverflow.com/questions/24448543/how-would-i-flatten-a-nested-dictionary-in-python-3
def flattenDict(current, key, result):
    if isinstance(current, dict):
        for k in current:
            new_key = "{0}.{1}".format(key, k) if len(key) > 0 else k
            flattenDict(current[k], new_key, result)
    else:
        result[key] = current
    return result


def usingJson(file):
    certs = []  # for holding all of the ordered data
    with open(file, "r") as f:
        for line in f:
            cert = flattenDict(json.loads(line), "", {})  # , '', {})

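            # we need to put the strings from sortDict back into dicts.
            # MAYBE THIS ISN'T NECESSARY?!
            # (It isn't: cert is already a flat dict, so this loop only walks its
            # key strings, and cs is never used afterwards.)
            cs = {}
            for c in cert:
                cs.update({c[0]: c[1]})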
            # print("*****type: ", type(cert))
            # print("cert: ",cert)
            # certs.append(cs)
            certs.append(cert)

    certsDF = pd.DataFrame(certs)

    dups = certsDF.duplicated(subset="data.leaf_cert.fingerprint")
    dups = certsDF[dups]
    # print(dups)
    # dups.to_csv("subsample_json_DUPS.csv")
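
To make the column name `data.leaf_cert.fingerprint` less mysterious, here is a minimal sketch of what `flattenDict` does to one record. The record is invented: `data`, `leaf_cert`, `fingerprint`, and `message_type` come from the code above, while `all_domains`, `seen`, and every value are placeholder assumptions.

```python
def flattenDict(current, key, result):
    # Same helper as above, repeated so this snippet runs on its own.
    if isinstance(current, dict):
        for k in current:
            new_key = "{0}.{1}".format(key, k) if len(key) > 0 else k
            flattenDict(current[k], new_key, result)
    else:
        result[key] = current
    return result


# Invented record shaped like one input line; values are placeholders.
record = {
    "data": {
        "leaf_cert": {
            "fingerprint": "AA:BB",
            "all_domains": ["b.example", "a.example"],  # hypothetical field
        },
        "seen": 1.5,  # hypothetical field
    },
    "message_type": "certificate_update",
}

flat = flattenDict(record, "", {})
for name in sorted(flat):
    print(name, "->", flat[name])
```

Nested dictionaries become dotted keys, while lists are kept whole as values. Since the fingerprint is a plain string, the order of any list elsewhere in the record never enters the comparison.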

## Take 2: usingIjson
**post-hoc**

After getting a baseline of usingJson with the 82MB (ctl_records_subsample), 177MB (subsample2), and 996.5MB (subsample3) files, I went back to the internet, specifically looking for alternatives to my brute force. I found ijson.

Unfortunately, ijson wasn't the magic I'd hoped for. I quickly found that it was much more useful for a list of json *files*, not *strings* inside one file. Like any good coder, I googled this issue until I found a solution: making new parsers for each line. That leads us to usingIjson.

In [4]:
def usingIjson(file):
    certs = []  # for holding all of the ordered data

    # from https://stackoverflow.com/questions/37200302/using-python-ijson-to-read-a-large-json-file-with-multiple-json-objects
    with open(file, encoding="UTF-8") as json_file:
        for line_number, line in enumerate(json_file):
            line_as_file = StringIO(line)  # StringIO is imported directly from io above
            
            # Use a new parser for each line
            json_parser = ijson.parse(line_as_file)
            cert = {}
            for prefix, kind, value in json_parser:
                if "string" == kind:
                    cert.update({prefix: value})
            certs.append(cert)

    certsDF = pd.DataFrame(certs)
    # print("dim(certsDF): ", certsDF.shape)
    # print("data.columns: ", certsDF.columns)
    # certsDF.to_csv("subsample_ijson_pd.csv")

    # print()
    dups = certsDF.duplicated(subset="data.leaf_cert.fingerprint")
    dups = certsDF[dups]
    # print(dups)
    # dups.to_csv("subsample_ijson_DUPS.csv")

**post-hoc**

With two methods written, I was ready to see which direction to continue down, straight json or ijson.

I ran python's timeit function and found that usingJson outstripped usingIjson _considerably_ (~37 seconds for usingIjson versus ~8 seconds for usingJson, on the smallest sample!).

My choice was therefore easy. I was going to pursue json, and leave ijson alone. 
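
For anyone wanting to repeat this kind of comparison, a minimal sketch of timing two candidates with `timeit` follows. The two functions and the generated input are hypothetical stand-ins, not the methods or sample files above, so the numbers it prints say nothing about the real data.

```python
import json
import timeit

# Hypothetical input: 1000 small JSON lines generated in memory.
LINES = ['{"data": {"leaf_cert": {"fingerprint": "%04d"}}}' % i for i in range(1000)]


def parse_with_list():
    # Builds the full list of parsed records.
    return [json.loads(line) for line in LINES]


def parse_with_generator():
    # Parses lazily and only counts the records.
    return sum(1 for _ in (json.loads(line) for line in LINES))


# repeat() returns one total per run; the minimum is the least noisy figure.
list_time = min(timeit.repeat(parse_with_list, number=5, repeat=3))
gen_time = min(timeit.repeat(parse_with_generator, number=5, repeat=3))
print("list: %.4fs  generator: %.4fs" % (list_time, gen_time))
```

Passing a callable avoids the string-plus-setup form of `timeit`, and taking the minimum over several repeats reduces the effect of whatever else the machine is doing.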

## Take 3: usingJson_v2
**ad-hoc**

The problem is probably that I'm having to store everything in memory. But what if I don't need to? I just want to keep things in 'working' memory...

**post-hoc**

You'll notice that usingJson_v2 is nearly identical to usingJson. The only difference is that I'm creating a generator, not making a list. 

In [5]:
def usingJson_v2(file):
    cs = []
    with open(file, "rb") as f:
        certs = (
            json.loads(line) for line in f
        )  # make generator, don't store in memory directly.

        for cert in certs:
            cert = flattenDict(cert, "", {})
            cs.append(cert)

    # certsDF=pd.DataFrame(certs)
    certsDF = pd.DataFrame(cs)

    # print("dim(certsDF): ", certsDF.shape)
    # print("data.columns: ", certsDF.columns)
    # certsDF.to_csv("subsample_ijson_pd.csv")

    # print()
    dups = certsDF.duplicated(subset="data.leaf_cert.fingerprint")
    dups = certsDF[dups]
    # dups.to_csv("duplicates.csv")

**post-hoc**

In hindsight, I understand that the increase in speed was negligible overall. At the time, though, finding that usingJson_v2 was only _barely_ faster than usingJson threw me for a loop. I decided to approach it from a completely different angle, which takes us to...

In [6]:
def usingPANDAS(file):
    # set the size of the information to process at any time, so we're not sending it to memory
    chunks = []  # for storing all the chunks so that we can send them all to a merged dataframe later.

    # PANDAS has a jsonreader that works with chunks! AND it works well with line delimiters
    timing.log("Starting reading-in")
    reader = pd.read_json(
        (file), lines=True, chunksize=100000, dtype=False
    )  # so it doesn't infer the type?
    timing.log("Starting chunk processing")
    for chunk in reader:
        new = chunk["data"].apply(json.dumps)
        new = json_normalize(new.apply(json.loads))
        
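        for column in chunk.columns:
            if "data" != column:
                # .values copies by position; chunk keeps the reader's running index while new restarts at 0
                new[column] = chunk[column].values
        # drop a leftover "data" column if present, because a dictionary column screws things up later.
        new = new.drop(columns="data", errors="ignore")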
        
        chunks.append(new)

**ad-hoc**


[FORMER IN-LINE BLOCK-COMMENT]

NEW plan. Clearly the chunking is a good idea, but we still have to deal with the json strings. So let's try to normalize those AFTER. If the column contains strings that are valid jsons, convert them.
	Nope. Not working. We can't treat them like jsons, because they're going in as dictionaries. They're not acting like dictionaries, though. The data column is a series. I want to be able to extract the dictionaries and then attach them to the rest of the dataframe, so we preserve any columns that aren't the 'data'. 

In [7]:
    # now make a major dataframe
    timing.log("Starting Concat")
    certsDF = pd.concat(chunks, ignore_index=True, sort=True)
    
    timing.log("finding dups")
    dups = certsDF.duplicated(subset="leaf_cert.fingerprint")
    dups = certsDF[dups]
    timing.log("writing dups to file")
    dups.to_csv("duplicates_PANDAS.csv")


2019-10-21 20:57:18 - Starting Concat



NameError: name 'chunks' is not defined

## Re-brainstorming! 

**ad-hoc**

I was on to something with the usingPANDAS. With that one, I found that the reading in was NOT the problem, it was the calculations. I also know that for small chunks, the calculations can be done much faster. So why not combine that?
	1) Read in with chunks like in usingPANDAS
	2) For each chunk, run the process, THEN WRITE OUT TO A PANDAS csv
	3) At the end, read in all of the csvs. (Or maybe just append to one csv )
	4) Find the duplicates
	5) DONE


I could find the duplicates in each chunk, and in fact, that was my first thought, but that assumes that the json strings are sorted in a way that all of the repeats are near each other. More likely, the jsons are sorted by time or something, so there's no promise that I'll find all of the repeats. Better to just send out the whole chunk to file. 
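
One way to catch repeats that land in different chunks, sketched under the assumption that the set of distinct fingerprints fits in memory even when the full rows do not: keep a single fingerprint index that outlives each chunk. This is an illustration, not the approach taken below; `iter_chunks`, `duplicate_sets`, and the sample input are hypothetical.

```python
import json
from collections import defaultdict
from itertools import islice


def iter_chunks(lines, size):
    """Yield lists of at most `size` lines from any iterable of lines."""
    it = iter(lines)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def duplicate_sets(lines, size=2):
    # fingerprint -> line numbers seen so far, shared across all chunks
    seen = defaultdict(list)
    line_no = 0
    for chunk in iter_chunks(lines, size):
        for line in chunk:
            fp = json.loads(line)["data"]["leaf_cert"]["fingerprint"]
            seen[fp].append(line_no)
            line_no += 1
    # keep only fingerprints that occurred more than once
    return {fp: nums for fp, nums in seen.items() if len(nums) > 1}


# Invented input: the repeats of "AA" land in different chunks.
sample = [
    json.dumps({"data": {"leaf_cert": {"fingerprint": fp}}})
    for fp in ["AA", "BB", "CC", "AA", "DD", "AA"]
]
print(duplicate_sets(sample, size=2))
```

Only the fingerprints and line numbers are retained, so the memory cost grows with the number of rows rather than with the size of each record.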


**post-hoc**

As you can see, usingPANDAS wasn't as intuitive as the previous methods. I'd found json_normalize, but I spent an inordinate amount of time trying to get it to work with me (hence the in-block comment). Finally, it hit me: convert the contents of 'data' BACK into a json string! To my shock, it worked. 

I spent some time tuning the chunk size and, using timeit again, found that usingPANDAS worked _much_ better than either of the usingJson methods.

It was at this time that I emailed you asking for your definition of 'reasonable' time. Your response of "1-2 hours" left me gaping. Despite usingPANDAS's big success, when I attempted to run the 31GB file, I had to cancel the program after over four hours. So I went back to the drawing board.

## Take 4: readInOutIn


In [8]:
def readInOutIn(file):
    chunks = []
    folder = path.join(getcwd(), "temp")
    if not path.isdir(folder):
        mkdir(folder)

    timing.log("Starting reading-in")
    reader = pd.read_json((file), lines=True, chunksize=100000, dtype=False)

    timing.log("Starting chunk processing")
    # for keeping track of the files. Could just do enumerate, but why get the length of reader? It could be huge.
    label = 0
    for chunk in reader:
        new = chunk["data"].apply(json.dumps)
        new = json_normalize(new.apply(json.loads))

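        for column in chunk.columns:
            if "data" != column:
                # .values copies by position; chunk keeps the reader's running index while new restarts at 0
                new[column] = chunk[column].values
        # drop a leftover "data" column if present, because a dictionary column screws things up later.
        new = new.drop(columns="data", errors="ignore")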

        # NOW WRITE OUT!
        timing.log("Writing csv chunk %s" % label)
        new.to_csv(path.join(folder, "chunk_%s.csv" % label), chunksize=100000)
        label += 1

    timing.log("Reading back in!")
    files = glob(path.join(folder, "*.csv"))
    for f in files:
        new = pd.read_csv(f)  # read the chunk csv, not the original input file
        chunks.append(new)

    # now convert the list of dataframes into a single dataframe
    certsDF = pd.concat(chunks, ignore_index=True, sort=True)

    timing.log("finding dups")
    dups = certsDF.duplicated(subset="leaf_cert.fingerprint")
    dups = certsDF[dups]
    timing.log("writing dups to file")
    dups.to_csv("duplicates_readInOutIn.csv")

**post-hoc**

I seriously considered not including this method, because I never actually tested it. As I was preparing to do so, it hit me that what I was trying to do was break the file into even smaller pieces than the chunks, but the size of the file wasn't really the issue. The problem stemmed from the amount of time it takes to _process_ each row.

**ad-hoc**

As I was thinking about getting readInOutIn to work, it hit me that I've been thinking about this in a consecutive manner. There's only so far I can go with it. I need to think about parallelizing, ergo, THREADING!

I know that reading in a file shouldn't be threaded, but that's not the part that takes forever (based on my previous attempts). The slow part is processing the chunks (i.e. the normalizing). So I can thread that.
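
A minimal sketch of that idea, for reference only: a single reader produces chunks, and a thread pool does the per-chunk processing. `normalize_chunk` and the input are hypothetical stand-ins for the real normalizing step. One caveat: in CPython, threads share the global interpreter lock, so CPU-bound parsing like this may see little or no speed-up from threads, and a process-based approach may be needed.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def normalize_chunk(lines):
    """Parse each JSON line and pull out the leaf certificate fingerprint."""
    return [json.loads(line)["data"]["leaf_cert"]["fingerprint"] for line in lines]


# Invented input, already split into chunks by a single-threaded reader.
chunks = [
    [
        '{"data": {"leaf_cert": {"fingerprint": "AA"}}}',
        '{"data": {"leaf_cert": {"fingerprint": "BB"}}}',
    ],
    ['{"data": {"leaf_cert": {"fingerprint": "AA"}}}'],
]

# map() hands chunks to the workers and returns results in input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(normalize_chunk, chunks))

fingerprints = [fp for chunk in results for fp in chunk]
print(fingerprints)
```

Because `map` preserves input order, the flattened list lines up with the original row order, which matters if the output has to point back at input rows.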

Just as I was about to add threading to my usingPANDAS framework, I decided to google adding threading to PANDAS. This led to DASK! Rather than reinventing the wheel, I'm using it. https://docs.dask.org/en/latest/dataframe-api.html

## Take 5: usingDASK
**post-hoc**

The code that follows is so small compared to the amount of time I spent trying to get it to work. There were a number of road-blocks, including having to define the column types (eventually I gave up being flexible and created the "metas" dictionary, which I didn't end up using), the nested jsons (again!) (I suddenly remembered my flattenDict method from days ago), and tuning the blocksize. I did have all of these adventures outlined, but eventually I found myself going in circles and spinning in the mud. I wiped the slate clean and started from scratch. As I'm sure you've experienced, I solved the problems within the hour.

In [9]:

def usingDASK(file):
    chunks = []
    timing.log("Starting reading-in")

    reader = dd.read_json(
        file,
        lines=True,
        blocksize=2 ** 24, #blocksize tuned using 1GB sample file
        meta={"data": object, "message_type": object},
    )
  
    datas = (
        reader["data"]
        .map_partitions(lambda df: df.apply((lambda row: flattenDict(row, "", {}))))
        .to_bag()
    )
    new = datas.to_dataframe()
    new["message_type"] = reader["message_type"]
    new = new.compute()
    dups = new.duplicated(subset="leaf_cert.fingerprint")
    dups = new[dups]
    dups.to_csv("duplicates_DASK.csv")

## Conclusion

My poor laptop is still running the huge file with usingDASK, but my preliminary testing suggests that it is as much of an improvement in speed as I'd hoped. There are still 30 minutes before it hits the 2 hour mark, but frankly, even if it goes over, it's 8:30pm the day before I have to give this to you. I'm not holding my breath that I'll be able to improve on the final time.

If you were wondering, I did begin the bonus challenge while I was waiting for things to process. I didn't get anywhere near as far as I wanted, but if you're at all curious about my initial plan of attack, the script is 'bonusFindPhishing.py'. It gives a more realistic view of how my scripts are typically organized and commented than searchDuplicatesCrts.py.

I want to thank you for giving me this chance and for reading this to the end. I hope you find what you're looking for in me. If you have any questions, you know how to reach me.

Sincerely,

Emma Bailin

Update (morning before submission): usingDASK wasn't as successful as I'd hoped. I'm suspicious that the blocksize I chose for usingDASK was too big. I tuned it using the 1GB file, but I don't think I took the expansion into account. Barring the blocksize tuning, I can see another avenue: using sqlite or something similar to do the loading, leveraging Dask's prep expertise and SQL's storage options. I'd like to explore these options, but I recognize I need to submit what I've got to you now.
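
To give a rough idea of what the sqlite avenue could look like, here is an untested-at-scale sketch using Python's built-in `sqlite3` module. The table name `certs`, its columns, and the sample lines are assumptions made for illustration; only the `data.leaf_cert.fingerprint` path comes from the work above.

```python
import json
import sqlite3

# Invented input lines; only the fingerprint path comes from the write-up.
lines = [
    json.dumps({"data": {"leaf_cert": {"fingerprint": fp}}})
    for fp in ["AA", "BB", "AA", "CC", "BB", "AA"]
]

conn = sqlite3.connect(":memory:")  # a file path would keep the table on disk
conn.execute("CREATE TABLE certs (line_no INTEGER, fingerprint TEXT)")
conn.executemany(
    "INSERT INTO certs VALUES (?, ?)",
    (
        (i, json.loads(line)["data"]["leaf_cert"]["fingerprint"])
        for i, line in enumerate(lines)
    ),
)

# One row per duplicate set: the fingerprint and how many lines share it.
dup_rows = conn.execute(
    "SELECT fingerprint, COUNT(*) FROM certs "
    "GROUP BY fingerprint HAVING COUNT(*) > 1 ORDER BY fingerprint"
).fetchall()
conn.close()
print(dup_rows)
```

With an on-disk database, the grouping is done by SQLite rather than in a single in-memory dataframe, which is the storage advantage hinted at above. How it would perform on the 31GB file is unknown.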