## Evgenia Stamati, Charalampos Kaidos

# Part A

In the code below we have created two functions that calculate the out-degree sequence: one for files on the local filesystem and one for objects stored in S3 buckets.

Calling the program with the parameter "local" computes the result on files located on the local filesystem, using the compute_local function.

Calling the program with the parameter "s3" computes the result on objects stored in an S3 bucket, using the boto3 module. The implementing function is compute_s3.
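The dispatch between the two functions is driven by argparse sub-commands. The following is a minimal sketch of that command-line layout, reduced to the arguments the program reads; the helper name `build_parser` is hypothetical and, unlike the original, `--verbose` defaults to 0 instead of `None`.

```python
from argparse import ArgumentParser


def build_parser():
    """Sketch of the command line of assignment.py (hypothetical helper).

    One sub-command per storage backend; the chosen sub-command name is
    stored in args.fs, which main() uses to call compute_s3 or compute_local.
    """
    parser = ArgumentParser(prog='assignment.py')
    parser.add_argument('-v', '--verbose', action='count', default=0)
    subparsers = parser.add_subparsers(dest='fs')

    parser_s3 = subparsers.add_parser('s3')
    parser_s3.add_argument('s3bucket', metavar='bucket')
    parser_s3.add_argument('s3key', metavar='key')  # '*' selects every object

    parser_local = subparsers.add_parser('local')
    parser_local.add_argument('localpath', metavar='path')
    return parser


# Same arguments as the test run further below.
args = build_parser().parse_args(['-v', 's3', 'hw2test', '*'])
print('%s %s %s %d' % (args.fs, args.s3bucket, args.s3key, args.verbose))
```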

In [None]:
# %load partA/assignment.py
#!/usr/local/bin/python2.7
'''
Part A -- Calculate out degree sequence of RDF graph

@author:     Charalampos Kaidos, Evgenia Stamati

@copyright:  2016 Charalampos Kaidos, Evgenia Stamati. All rights reserved.

@contact:    kaidosc@aueb.gr
'''

import sys
import os
import matplotlib.pyplot
import numpy
import boto3

from collections import Counter
from datetime import datetime

from argparse import ArgumentParser
from argparse import RawDescriptionHelpFormatter

__all__ = []
__version__ = 0.1
__date__ = '2016-04-10'
__updated__ = '2016-04-18'

DEBUG = 0
TESTRUN = 0
PROFILE = 0

def plot_figs(out_degree_sequence, verbose):
    """Plots figures of the out-degree sequence

    Creates 2 figures:
    the first plots the out-degree sequence,
    the second plots the log values of the out-degree sequence.

    Both figures are saved on the filesystem as out-degree-sequence.png and
    log-out-degree-sequence.png respectively.
    """
    filename = 'out-degree-sequence.png'
    filename_log = 'log-out-degree-sequence.png'

    out_degree = out_degree_sequence.keys()
    nodes = out_degree_sequence.values()

    if(verbose):
        print 'Plotting out-degree-sequence to file ' + filename
    matplotlib.pyplot.scatter(out_degree,nodes)
    matplotlib.pyplot.suptitle('out degree sequence scatterplot', fontsize=20)
    matplotlib.pyplot.xlabel('out degree', fontsize=18)
    matplotlib.pyplot.ylabel('out degree sequence', fontsize=16)
    matplotlib.pyplot.grid(True)
    matplotlib.pyplot.savefig(filename)
    matplotlib.pyplot.close()

    x = numpy.log(out_degree)
    y = numpy.log(nodes)

    if(verbose):
        print 'Plotting log-out-degree-sequence to file ' + filename_log
    matplotlib.pyplot.scatter(x,y)
    matplotlib.pyplot.suptitle('log of out degree sequence scatterplot', fontsize=20)
    matplotlib.pyplot.xlabel('log out degree', fontsize=18)
    matplotlib.pyplot.ylabel('log out degree sequence', fontsize=16)
    matplotlib.pyplot.grid(True)
    matplotlib.pyplot.savefig(filename_log)
    matplotlib.pyplot.close()

def save_to_file(out_degree_sequence, verbose):
    """Saves out-degree-sequence to file
    """
    filename = 'out-degree-sequence'
    if(verbose):
        print 'Saving out-degree-sequence to file ' + filename
    with open(filename, 'w') as fout:
        for key,value in out_degree_sequence.iteritems():
            fout.write(str(key) + '\t' + str(value) + '\n')

def compute_local(args):
    path = args.localpath
    verbose = args.verbose
    
    file_list = []
    if os.path.isdir(path):
        if(verbose):
            print 'Given path "' + path + '" is a directory. Will read all files in it.'
        file_list = [os.path.join(path,file) for file in os.listdir(path) if os.path.isfile(os.path.join(path,file))]
    elif os.path.isfile(path):
        if(verbose):
            print 'Given path "' + path + '" is a file.'
        file_list.append(path)
    else:
        raise OSError('Given path "%s" is neither a directory nor a file' % path)
    
    subject_list = Counter()
    
    for fileName in file_list:
        if(verbose):
            print 'Reading file: ' + fileName
        count = 0
        with open( fileName, "r" ) as fin:
            for line in fin:
                count += 1
                key = line.split(' ', 1)[0]
                subject_list[key] += 1
        print 'File: ' + fileName + ' complete: ' + str(count) + ' records processed'

    if(verbose):
        print 'All files complete: ' + str(len(subject_list)) + ' nodes processed'

    out_degree_sequence = Counter(subject_list.itervalues())

    if(verbose):
        print 'Out-degree-sequence calculated: ' + str(len(out_degree_sequence)) + ' entries'
    
    plot_figs(out_degree_sequence, verbose)
    save_to_file(out_degree_sequence, verbose)

def compute_s3(args):
    verbose = args.verbose

    s3 = boto3.resource('s3')
    
    bucket_name = args.s3bucket
    if(verbose):
        print 'Bucket selected: ' + bucket_name
    bucket = s3.Bucket(bucket_name)

    key_name = args.s3key
    if(verbose):
        print 'Key selected: ' + key_name
    
    subject_list = Counter()
    
    if key_name == '*':
        for object in bucket.objects.all():
            if(verbose):
                print 'Reading object: ' + object.key
            leftover = ''
            data = object.get()
            chunk = data['Body'].read(1024*1024)
            count = 0
            while chunk:
                lines = chunk.splitlines(True)
                for line in lines:
                    if(line[-1] == '\n'):
                        count += 1
                        key = line.split(' ', 1)[0]
                        subject_list[key] += 1
                    else:
                        leftover = line
                chunk = data['Body'].read(1024*1024*100)
                if (chunk == '' and leftover != ''):
                    chunk += '\n'
                chunk = leftover + chunk
                leftover = ''
            print 'Object ' + object.key + ' complete: ' + str(count) + ' records processed'
    else:
        leftover = ''
        object = s3.Object(bucket_name, args.s3key)
        if(verbose):
            print 'Reading object: ' + object.key
        data = object.get()
        chunk = data['Body'].read(1024*1024)
        if(verbose):
            print 'Read chunk'
        count = 0
        while chunk:
            lines = chunk.splitlines(True)
            for line in lines:
                if(line[-1] == '\n'):
                    count += 1
                    key = line.split(' ', 1)[0]
                    subject_list[key] += 1
                else:
                    leftover = line
            chunk = data['Body'].read(1024*1024*100)
            if (chunk == '' and leftover != ''):
                chunk += '\n'
            chunk = leftover + chunk
            leftover = ''
        print 'Object ' + object.key + ' complete: ' + str(count) + ' records processed'

    if(verbose):
        print 'All files complete: ' + str(len(subject_list)) + ' nodes processed'

    out_degree_sequence = Counter(subject_list.itervalues())

    if(verbose):
        print 'Out-degree-sequence calculated: ' + str(len(out_degree_sequence)) + ' entries'

    plot_figs(out_degree_sequence, verbose)
    save_to_file(out_degree_sequence, verbose)

class CLIError(Exception):
    '''Generic exception to raise and log different fatal errors.'''
    def __init__(self, msg):
        super(CLIError, self).__init__(msg)
        self.msg = "E: %s" % msg
    def __str__(self):
        return self.msg
    def __unicode__(self):
        return self.msg

def main(argv=None): # IGNORE:C0111
    '''Command line options.'''

    if argv is None:
        argv = sys.argv
    else:
        sys.argv.extend(argv)

    program_name = os.path.basename(sys.argv[0])
    program_version = "v%s" % __version__
    program_build_date = str(__updated__)
    program_version_message = '%%(prog)s %s (%s)' % (program_version, program_build_date)
    program_shortdesc = __import__('__main__').__doc__.split("\n")[1]
    program_license = '''%s

  Created by Charalampos Kaidos, Evgenia Stamati on %s.
  Copyright 2016 Charalampos Kaidos, Evgenia Stamati. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
''' % (program_shortdesc, str(__date__))

    try:
        # Setup argument parser
        parser = ArgumentParser(description=program_license, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("-v", "--verbose", dest="verbose", action="count", help="set verbosity level [default: %(default)s]")
        parser.add_argument('-V', '--version', action='version', version=program_version_message)
        
        subparsers = parser.add_subparsers(dest='fs')
        parser_s3 = subparsers.add_parser('s3', help='Calculate on data stored on S3 cloud')
        parser_local = subparsers.add_parser('local', help='Calculate on data stored on a local filesystem')
        
        parser_s3.add_argument(dest="s3bucket", help='name of the S3 bucket holding the source object(s)', metavar='bucket')
        parser_s3.add_argument(dest="s3key", help='key of the object to process, or "*" to process every object in the bucket', metavar='key')
        
        parser_local.add_argument(dest="localpath", help="path to a source file or to a directory of source files", metavar="path")

        # Process arguments
        args = parser.parse_args()
        
        verbose = args.verbose
        if verbose > 0:
            print("Verbose mode on")
        
        startTime = datetime.now()
        if args.fs == 's3':
            compute_s3(args)
        elif args.fs == 'local':
            compute_local(args)
        print (datetime.now() - startTime)

        return 0
    except KeyboardInterrupt:
        ### handle keyboard interrupt ###
        return 0
    except Exception as e:
        if DEBUG or TESTRUN:
            raise
        indent = len(program_name) * " "
        sys.stderr.write(program_name + ": " + repr(e) + "\n")
        sys.stderr.write(indent + "  for help use --help\n")
        return 2

if __name__ == "__main__":
    if DEBUG:
        sys.argv.append("-h")
        sys.argv.append("-v")
    if TESTRUN:
        import doctest
        doctest.testmod()
    if PROFILE:
        import cProfile
        import pstats
        profile_filename = 'assignment_profile.txt'
        cProfile.run('main()', profile_filename)
        statsfile = open("profile_stats.txt", "wb")
        p = pstats.Stats(profile_filename, stream=statsfile)
        stats = p.strip_dirs().sort_stats('cumulative')
        stats.print_stats()
        statsfile.close()
        sys.exit(0)
    sys.exit(main())


We executed the program on the AWS server so that the results are roughly comparable with the Hadoop executions. Note that S3 is a remote filesystem, which tremendously affects the execution time.
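Because compute_s3 reads each object in large chunks instead of line by line, it has to stitch back together the lines that are cut at a chunk boundary (the `leftover` variable). The following is a standalone sketch of that idea, not the code used in the runs: the generator name `iter_lines` is hypothetical, and `io.BytesIO` stands in for the streaming body returned by `object.get()['Body']`, since both expose `read(n)`. It keeps the 1 MB first read and 100 MB subsequent reads of the original as defaults.

```python
import io


def iter_lines(stream, first_chunk=1024 * 1024, chunk_size=100 * 1024 * 1024):
    """Yield complete lines from a file-like object read in fixed-size chunks.

    A line cut at a chunk boundary is kept aside and prepended to the next
    chunk. A final line without a trailing newline is still yielded once the
    stream is exhausted.
    """
    leftover = b''
    chunk = stream.read(first_chunk)
    while chunk:
        lines = (leftover + chunk).splitlines(True)
        leftover = b''
        if not lines[-1].endswith(b'\n'):
            leftover = lines.pop()  # incomplete line; wait for the next chunk
        for line in lines:
            yield line
        chunk = stream.read(chunk_size)
    if leftover:
        yield leftover


# Tiny chunk sizes force several lines to straddle chunk boundaries.
data = io.BytesIO(b'a p o .\nb p o .\na p2 o2 .\nc p o')
subjects = [line.split(b' ', 1)[0] for line in iter_lines(data, 5, 7)]
print(subjects)
```

Keeping the incomplete tail in `leftover` means no line is ever counted twice or split into two subjects, regardless of how the chunk size relates to the line lengths.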

In [None]:
(env)[user2@ip-172-31-8-18 partA]$ python assignment.py -v s3 hw2test "*"
Verbose mode on
Bucket selected: hw2test
Key selected: *
Reading object: hw2-test-data.txt
Object hw2-test-data.txt complete: 30 records processed
All files complete: 8 nodes processed
Out-degree-sequence calculated: 4 entries
Plotting out-degree-sequence to file out-degree-sequence.png
Plotting log-out-degree-sequence to file log-out-degree-sequence.png
Saving out-degree-sequence to file out-degree-sequence
0:00:01.075258

So for the test data the program calculates the out-degree sequence in about 1 second. The plots are below, and the sequence table is in the cell that follows them.

<img src="partA/out-degree-sequence-test.png">

<img src="partA/log-out-degree-sequence-test.png">

In [18]:
resultTest = {}
with open('partA/out-degree-sequence-test', 'r') as fin:
    for line in fin:
        key, value = line.split()
        resultTest[key] = value

print 'Seq\tNodes'
for key,value in resultTest.items():
    print str(key) + '\t' + str(value)

Seq	Nodes
1	2
3	3
5	1
7	2
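The table can be cross-checked against the counters printed during the test run: the node counts must add up to the number of distinct subjects ("8 nodes processed"), and the sum of out_degree × nodes must give back the number of records ("30 records processed"), since every triple adds one to the out-degree of its subject. A short worked check, with the table values typed in by hand:

```python
# Out-degree sequence of the test data, as printed above: {out_degree: number_of_nodes}
sequence = {1: 2, 3: 3, 5: 1, 7: 2}

# Every subject has exactly one out-degree, so the node counts add up
# to the number of distinct subjects.
total_nodes = sum(sequence.values())

# Each record (triple) contributes one to its subject's out-degree, so
# summing out_degree * nodes gives back the number of records.
total_records = sum(degree * nodes for degree, nodes in sequence.items())

print('%d nodes, %d records' % (total_nodes, total_records))
```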


We execute the same program on all files:


    (env)[user2@ip-172-31-8-18 partA]$ python assignment.py -v s3 hw2storage "*"
    Verbose mode on
    Bucket selected: hw2storage
    Key selected: *
    Reading object: hw2-rdf-2016_1
    Object hw2-rdf-2016_1 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_10
    Object hw2-rdf-2016_10 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_11
    Object hw2-rdf-2016_11 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_12
    Object hw2-rdf-2016_12 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_13
    Object hw2-rdf-2016_13 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_14
    Object hw2-rdf-2016_14 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_15
    Object hw2-rdf-2016_15 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_16
    Object hw2-rdf-2016_16 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_17
    Object hw2-rdf-2016_17 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_18
    Object hw2-rdf-2016_18 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_19
    Object hw2-rdf-2016_19 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_2
    Object hw2-rdf-2016_2 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_20
    Object hw2-rdf-2016_20 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_21
    Object hw2-rdf-2016_21 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_22
    Object hw2-rdf-2016_22 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_23
    Object hw2-rdf-2016_23 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_24
    Object hw2-rdf-2016_24 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_25
    Object hw2-rdf-2016_25 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_26
    Object hw2-rdf-2016_26 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_27
    Object hw2-rdf-2016_27 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_28
    Object hw2-rdf-2016_28 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_29
    Object hw2-rdf-2016_29 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_3
    Object hw2-rdf-2016_3 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_30
    Object hw2-rdf-2016_30 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_31
    Object hw2-rdf-2016_31 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_32
    Object hw2-rdf-2016_32 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_33
    Object hw2-rdf-2016_33 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_34
    Object hw2-rdf-2016_34 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_35
    Object hw2-rdf-2016_35 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_36
    Object hw2-rdf-2016_36 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_37
    Object hw2-rdf-2016_37 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_38
    Object hw2-rdf-2016_38 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_39
    Object hw2-rdf-2016_39 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_4
    Object hw2-rdf-2016_4 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_40
    Object hw2-rdf-2016_40 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_41
    Object hw2-rdf-2016_41 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_42
    Object hw2-rdf-2016_42 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_43
    Object hw2-rdf-2016_43 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_44
    Object hw2-rdf-2016_44 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_45
    Object hw2-rdf-2016_45 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_46
    Object hw2-rdf-2016_46 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_5
    Object hw2-rdf-2016_5 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_6
    Object hw2-rdf-2016_6 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_7
    Object hw2-rdf-2016_7 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_8
    Object hw2-rdf-2016_8 complete: 10000000 records processed
    Reading object: hw2-rdf-2016_9
    Object hw2-rdf-2016_9 complete: 10000000 records processed
    All files complete: 11498443 nodes processed
    Out-degree-sequence calculated: 1250 entries
    Plotting out-degree-sequence to file out-degree-sequence.png
    Plotting log-out-degree-sequence to file log-out-degree-sequence.png
    Saving out-degree-sequence to file out-degree-sequence
    1:03:05.548768

The execution took more than 1 hour, mainly because of the remote filesystem: when executing on the local filesystem, on PCs less powerful than the AWS server, the execution time was less than 17 minutes.

The plots are below:
<img src="partA/out-degree-sequence.png">

<img src="partA/log-out-degree-sequence.png">

Finally, the file out-degree-sequence contains the result of the processing. We load it in order to compare it with the results of the other execution tools.

In [21]:
resultA = {}
with open('partA/out-degree-sequence', 'r') as fin:
    for line in fin:
        key, value = line.split()
        resultA[key] = value

#print 'Seq\tNodes'
#for key,value in resultA.items():
#    print str(key) + '\t' + str(value)

# Part B

For the Hadoop implementation we have designed a solution with 2 chained MapReduce jobs.
The first job calculates the occurrences of each subject: the mapper emits each subject with a count of 1 and the reducer sums all counts. The outcome is a two-column table with the subject in the first column and its frequency (its out-degree) in the second.
This table is the input to the second job. The second mapper emits each frequency from the first job with a count of 1, so that the reducer can count how many subjects have a frequency of 1, how many have a frequency of 2, etc. The reducer sums these counts and produces the out-degree sequence.
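The chaining of the two jobs can be simulated in plain Python on a handful of triples. This is a sketch, not the streaming scripts themselves: the helper `run_job` is hypothetical and models the shuffle as an in-memory sort followed by `itertools.groupby`. Keys stay Python values here, whereas Hadoop streaming treats them as text (so it sorts out-degrees lexicographically), which does not change the counts.

```python
from itertools import groupby
from operator import itemgetter


def run_job(records, mapper, reducer):
    """Simulate one job: map every record, sort by key (the shuffle), reduce each key group."""
    mapped = [pair for record in records for pair in mapper(record)]
    mapped.sort(key=itemgetter(0))
    return [reducer(key, [value for _, value in group])
            for key, group in groupby(mapped, key=itemgetter(0))]


def subject_mapper(line):
    # Job 1 map: emit (subject, 1) for every triple.
    yield line.split(' ', 1)[0], 1


def frequency_mapper(pair):
    # Job 2 map: emit (out_degree, 1) for every subject.
    subject, out_degree = pair
    yield out_degree, 1


def sum_reducer(key, values):
    # Both reducers simply sum the counts of a key.
    return key, sum(values)


triples = ['a p o .', 'a q o .', 'b p o .', 'c p o .',
           'c q o .', 'd p o .', 'd q o .', 'd r o .']
out_degrees = run_job(triples, subject_mapper, sum_reducer)   # job 1
sequence = run_job(out_degrees, frequency_mapper, sum_reducer)  # job 2
print(sequence)
```

The output of the first job feeds the second unchanged, which mirrors passing the first job's HDFS output directory as the `-input` of the second job.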

The mapper of the first job is:

In [None]:
# %load partB/mapper_out_degree.py
#!/usr/bin/env python
import sys
 
for line in sys.stdin:
    line = line.strip()
    key = line.split(' ')
    key_value = key[0]
    print >>sys.stdout, "%s\t%s" % (key_value, 1)

The reducer of the first job is:

In [None]:
# %load partB/reducer_out_degree.py
#!/usr/bin/env python

import sys

total = 0
prev_key = False

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    data = line.split('\t')
    curr_key = data[0]
    count = int(data[1])

    if prev_key and curr_key != prev_key:
        print >>sys.stdout, "%s\t%i" % (prev_key, total)
        prev_key = curr_key
        total = count
    #same key; accumulate sum
    else:
        prev_key = curr_key
        total += count

# emit last key
if prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)

The mapper of the second job is:

In [None]:
# %load partB/mapper_out_sequece.py
#!/usr/bin/env python
import sys
 
for line in sys.stdin:
    line = line.strip()
    fields = line.split('\t')
    value = fields[1]  # out-degree of the subject, from the first job's output
    print >>sys.stdout, "%s\t%s" % (value, 1)


The reducer of the second job is:

In [None]:
# %load partB/reducer_out_sequece.py
#!/usr/bin/env python

import sys

total = 0
prev_key = False

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    data = line.split('\t')
    curr_key = data[0]
    count = int(data[1])

    if prev_key and curr_key != prev_key:
        print >>sys.stdout, "%s\t%i" % (prev_key, total)
        prev_key = curr_key
        total = count
    # same key; accumulate sum
    else:
        prev_key = curr_key
        total += count

# emit last key
if prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)

We execute the first job by calling:

In [None]:
hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-1.jar -file partB/mapper_out_degree.py -mapper 'mapper_out_degree.py' -file partB/reducer_out_degree.py -reducer 'reducer_out_degree.py' -input s3n://hw2storage/ -output /user/user2/mapreduce/output &> map1.out &

The standard output and standard error of the job are redirected to map1.out. Here are the job counters from it:


    16/04/15 09:38:31 INFO mapreduce.Job: Counters: 54
        File System Counters
                FILE: Number of bytes read=1721693469
                FILE: Number of bytes written=4152486533
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=126879
                HDFS: Number of bytes written=827765180
                HDFS: Number of read operations=3043
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=10
                S3N: Number of bytes read=100127899750
                S3N: Number of bytes written=0
                S3N: Number of read operations=0
                S3N: Number of large read operations=0
                S3N: Number of write operations=0
        Job Counters 
                Launched map tasks=1514
                Launched reduce tasks=5
                Data-local map tasks=1514
                Total time spent by all maps in occupied slots (ms)=1464006840
                Total time spent by all reduces in occupied slots (ms)=727183104
                Total time spent by all map tasks (ms)=61000285
                Total time spent by all reduce tasks (ms)=15149648
                Total vcore-seconds taken by all map tasks=61000285
                Total vcore-seconds taken by all reduce tasks=15149648
                Total megabyte-seconds taken by all map tasks=46848218880
                Total megabyte-seconds taken by all reduce tasks=23269859328
        Map-Reduce Framework
                Map input records=460000000
                Map output records=460000000
                Map output bytes=28950311440
                Map output materialized bytes=2239145033
                Input split bytes=126879
                Combine input records=0
                Combine output records=0
                Reduce input groups=11498443
                Reduce shuffle bytes=2239145033
                Reduce input records=460000000
                Reduce output records=11498443
                Spilled Records=920000000
                Shuffled Maps =7570
                Failed Shuffles=0
                Merged Map outputs=7570
                GC time elapsed (ms)=601687
                CPU time spent (ms)=21537560
                Physical memory (bytes) snapshot=764612165632
                Virtual memory (bytes) snapshot=2058085785600
                Total committed heap usage (bytes)=682084532224
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=100127899750
        File Output Format Counters 
                Bytes Written=827765180
    16/04/15 09:38:31 INFO streaming.StreamJob: Output directory: /user/user2/mapreduce/output


We execute the second job by calling:

In [None]:
hadoop jar /usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-1.jar -file partB/mapper_out_sequece.py -mapper 'mapper_out_sequece.py' -file partB/reducer_out_sequece.py -reducer 'reducer_out_sequece.py' -input /user/user2/mapreduce/output -output /user/user2/mapreduce/output2 &> map2.out&

The standard output and standard error of the job are redirected to map2.out. Here are the job counters from it:


    16/04/16 14:31:49 INFO mapreduce.Job: Counters: 51
        File System Counters
                FILE: Number of bytes read=3412121
                FILE: Number of bytes written=9385708
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=828201995
                HDFS: Number of bytes written=8652
                HDFS: Number of read operations=60
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=10
        Job Counters 
                Killed reduce tasks=1
                Launched map tasks=15
                Launched reduce tasks=6
                Data-local map tasks=14
                Rack-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=13452192
                Total time spent by all reduces in occupied slots (ms)=7820160
                Total time spent by all map tasks (ms)=560508
                Total time spent by all reduce tasks (ms)=162920
                Total vcore-seconds taken by all map tasks=560508
                Total vcore-seconds taken by all reduce tasks=162920
                Total megabyte-seconds taken by all map tasks=430470144
                Total megabyte-seconds taken by all reduce tasks=250245120
        Map-Reduce Framework
                Map input records=11498443
                Map output records=11498443
                Map output bytes=49180641
                Map output materialized bytes=3449584
                Input split bytes=2160
                Combine input records=0
                Combine output records=0
                Reduce input groups=1250
                Reduce shuffle bytes=3449584
                Reduce input records=11498443
                Reduce output records=1250
                Spilled Records=22996886
                Shuffled Maps =75
                Failed Shuffles=0
                Merged Map outputs=75
                GC time elapsed (ms)=5717
                CPU time spent (ms)=149040
                Physical memory (bytes) snapshot=7627571200
                Virtual memory (bytes) snapshot=29847506944
                Total committed heap usage (bytes)=7432830976
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=828199835
        File Output Format Counters 
                Bytes Written=8652
    16/04/16 14:31:49 INFO streaming.StreamJob: Output directory: /user/user2/mapreduce/output2

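By adding the CPU time of the 2 jobs (21,537,560 ms and 149,040 ms) we get a total CPU time of 21,686,600 ms, that is, about 6 hours. This is CPU time summed over all tasks, which ran in parallel, so it is not the wall-clock duration of the jobs.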
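The arithmetic, using the "CPU time spent (ms)" counters copied from the two logs:

```python
# "CPU time spent (ms)" counters reported by the two jobs above.
cpu_ms_job1 = 21537560
cpu_ms_job2 = 149040

total_cpu_ms = cpu_ms_job1 + cpu_ms_job2
total_cpu_hours = total_cpu_ms / 1000.0 / 3600.0  # ms -> s -> h

print('%d ms = %.2f hours' % (total_cpu_ms, total_cpu_hours))
```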

The output of the job is stored on HDFS; we copied it to the local filesystem.

In [36]:
import os

resultB = {}
path = 'partB/output2'
# Hadoop writes one part file per reducer plus an empty _SUCCESS marker.
file_list = [os.path.join(path, name) for name in os.listdir(path)
             if os.path.isfile(os.path.join(path, name)) and name != '_SUCCESS']

for file_name in file_list:
    with open(file_name, 'r') as fin:
        for line in fin:
            key, value = line.split()
            resultB[key] = value

equal = (resultA == resultB)
print 'Results are equal: ' + str(equal)

Results are equal: True


We can see that the results of the Hadoop execution are the same as the results of the execution in Part A, so we don't need to plot the graphs again.

# Part C

Pig is a high-level language for Hadoop. It abstracts away much of the low-level programming and provides an SQL-like language (Pig Latin) for writing queries that are compiled into jobs executed on Hadoop.

We have written a Pig script that performs exactly the same process as the Python script in Part A and the Hadoop scripts in Part B:

In [None]:
# %load partC/assignment.pig
register s3n://hw2files/myudfs.jar
raw = LOAD  's3n://hw2storage/*' USING TextLoader as (line:chararray);
ntriples = foreach raw generate FLATTEN(myudfs.RDFSplit3(line)) as (subject:chararray,predicate:chararray,object:chararray); 
subject_groups = group ntriples by subject;
subject_frequencies = foreach subject_groups generate group, COUNT(ntriples) as out_degree;
out_degree_groups = group subject_frequencies by out_degree;
out_degree_sequence = foreach out_degree_groups generate group, COUNT(subject_frequencies);
store out_degree_sequence into '/user/user2/out_degree_sequence' using PigStorage();



We also created a small wrapper script to execute the process in the background; it runs the Pig script, redirects all of its output to exec_pig.log and prints the elapsed time:

In [None]:
# %load partC/exec_pig.sh
#!/bin/bash

START_TIME=$(date +"%s")

pig assignment.pig &> exec_pig.log

END_TIME=$(date +"%s") 
DIFF=$(($END_TIME-$START_TIME))
echo "$(($DIFF / 60)) minutes and $(($DIFF % 60)) seconds."


The file exec_pig.log contains the output of the program. Here is an extract:

    16/04/10 17:08:32 INFO mapreduce.SimplePigStats: Script Statistics: 

    HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
    2.7.1-amzn-1    0.14.0-amzn-0   user2   2016-04-10 13:46:36     2016-04-10 17:08:32     GROUP_BY

    Success!

    Job Stats (time in seconds):
    JobId   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MedianMapTime   MaxReduceTime   MinReduceTime   AvgReduceTime   MedianReducetime        Alias   Feature Outputs
    job_1459245605938_0141  1492    101     101     20      53      58      9179    18      186     29      ntriples,raw,subject_frequencies,subject_groups GROUP_BY,COMBINER
    job_1459245605938_0142  7       1       55      39      47      49      15      15      15      15      out_degree_groups,out_degree_sequence   GROUP_BY,COMBINER       /user/user2/out_degree_sequence,

    Input(s):
    Successfully read 460000000 records (536497 bytes) from: "s3n://hw2storage/*"

    Output(s):
    Successfully stored 1250 records (8652 bytes) in: "/user/user2/out_degree_sequence"

    Counters:
    Total records written : 1250
    Total bytes written : 8652
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 6
    Total records proactively spilled: 456922

    Job DAG:
    job_1459245605938_0141  ->      job_1459245605938_0142,
    job_1459245605938_0142


    16/04/10 17:08:32 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:32 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    16/04/10 17:08:34 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:34 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    16/04/10 17:08:34 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:34 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    16/04/10 17:08:34 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:34 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    16/04/10 17:08:34 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:34 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    16/04/10 17:08:34 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-8-18.eu-west-1.compute.internal/172.31.8.18:8032
    16/04/10 17:08:34 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    12126415 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher  - Success!
    16/04/10 17:08:34 INFO mapReduceLayer.MapReduceLauncher: Success!
    12126449 [main] INFO  org.apache.pig.Main  - Pig script completed in 3 hours, 22 minutes, 6 seconds and 594 milliseconds (12126594 ms)

We observe that Pig created two MapReduce jobs, just like in our execution.

The first job creates 1,492 map tasks and 101 reduce tasks; the second job creates 7 map tasks and 1 reduce task. The total execution time is 3 hours and 22 minutes.
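To make the two jobs concrete, the following is a small in-memory simulation of a two-stage MapReduce pipeline for the out-degree sequence. It is a sketch under the assumption that the first job groups the triples by subject and the second groups the resulting out-degrees. The function names, the toy triples and the use of `zlib.crc32` as the partitioner are illustrative choices, not Pig's or Hadoop's implementation (Hadoop's default partitioner uses the key's Java `hashCode`).

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    # Hypothetical partitioner: CRC32 of the key modulo the number of reducers
    return zlib.crc32(str(key).encode('utf-8')) % num_reducers


def run_job(records, mapper, reducer, num_reducers):
    # Map phase: every input record yields zero or more (key, value) pairs.
    # Shuffle phase: each pair is routed to one reducer partition and grouped by key.
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for record in records:
        for key, value in mapper(record):
            partitions[partition(key, num_reducers)][key].append(value)
    # Reduce phase: each reducer only sees the keys routed to it
    output = []
    for groups in partitions:
        for key in sorted(groups):
            output.append(reducer(key, groups[key]))
    return output


def out_degree_sequence_mapreduce(triples, num_reducers=3):
    # Job 1: emit (subject, 1) per triple and sum per subject -> (subject, out_degree)
    degrees = run_job(triples,
                      lambda t: [(t[0], 1)],
                      lambda subject, ones: (subject, sum(ones)),
                      num_reducers)
    # Job 2: emit (out_degree, 1) per subject and sum per degree -> (out_degree, count);
    # a single reducer here, like the single reduce task of the second Pig job
    sequence = run_job(degrees,
                       lambda pair: [(pair[1], 1)],
                       lambda degree, ones: (degree, sum(ones)),
                       1)
    return dict(sequence)


triples = [('a', 'p', 'b'), ('a', 'q', 'c'), ('b', 'p', 'c'), ('c', 'p', 'a'), ('a', 'r', 'd')]
print(out_degree_sequence_mapreduce(triples))
```

Subject `a` has three outgoing edges and `b`, `c` one each, so the result maps out-degree 1 to two subjects and out-degree 3 to one subject, regardless of how many reducers the first job uses.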

We now compare the results to the ones from Part A.

In [39]:
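resultC = {}
path = 'partC/out_degree_sequence'
# Collect the Pig output part files, skipping the _SUCCESS marker file
file_list = [os.path.join(path, name) for name in os.listdir(path)
             if os.path.isfile(os.path.join(path, name)) and name != '_SUCCESS']

# Each line holds two whitespace-separated fields: out-degree and number of nodes
for file_name in file_list:
    with open(file_name, 'r') as fin:
        for line in fin:
            key, value = line.split()
            resultC[key] = value

# Compare with the out-degree sequence computed in Part A
equal = (resultA == resultC)
print 'Results are equal: ' + str(equal)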

Results are equal: True


The results are again equal, as expected, so we don't need to plot the graphs.

# Part D

The query to calculate the out-degree-sequence is:

    SELECT out_degree, COUNT(*) AS out_degree_sequence
    FROM (
        SELECT subject, COUNT(*) AS out_degree
        FROM rdf
        GROUP BY subject) AS out_a
    GROUP BY out_degree;
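The nested query performs two aggregations: the inner GROUP BY computes each subject's out-degree, and the outer one counts how many subjects share each out-degree. The Python sketch below mirrors that structure with `collections.Counter`. It assumes each triple is a whitespace-separated `subject predicate object` line; the sample lines are made up.

```python
from collections import Counter


def out_degree_sequence(lines):
    # Inner query: SELECT subject, COUNT(*) AS out_degree FROM rdf GROUP BY subject
    out_degree = Counter(line.split()[0] for line in lines if line.strip())
    # Outer query: SELECT out_degree, COUNT(*) FROM out_a GROUP BY out_degree
    return Counter(out_degree.values())


sample = [
    '<s1> <p1> <o1>',
    '<s1> <p2> <o2>',
    '<s2> <p1> <o1>',
    '<s3> <p1> <o3>',
]
print(sorted(out_degree_sequence(sample).items()))
```

Here `<s1>` has out-degree 2 and `<s2>`, `<s3>` have out-degree 1, so the sequence is `[(1, 2), (2, 1)]`.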

Using the EXPLAIN ANALYZE statement we get statistics about the executed query:

    EXPLAIN ANALYZE
    SELECT out_degree, COUNT(*) AS out_degree_sequence
    FROM (
        SELECT subject, COUNT(*) AS out_degree
        FROM rdf 
        GROUP BY subject) AS out_a
    GROUP BY out_degree;

In [None]:
                                                                                     QUERY PLAN                                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 4:1  (slice2; segments: 4)  (cost=213061.59..213074.09 rows=1000 width=16)
   Rows out:  1250 rows at destination with 1021366 ms to first row, 1021369 ms to end, start offset by 29 ms.
   ->  HashAggregate  (cost=213061.59..213074.09 rows=250 width=16)
         Group By: out_a.out_degree
         Rows out:  Avg 312.5 rows x 4 workers.  Max 319 rows (seg3) with 1021364 ms to first row, 1021365 ms to end, start offset by 30 ms.
         Executor memory:  4201K bytes avg, 4201K bytes max (seg0).
         ->  Redistribute Motion 4:4  (slice1; segments: 4)  (cost=213026.59..213046.59 rows=250 width=16)
               Hash Key: out_a.out_degree
               Rows out:  Avg 808.0 rows x 4 workers at destination.  Max 830 rows (seg3) with 867122 ms to first row, 1021357 ms to end, start offset by 36 ms.
               ->  HashAggregate  (cost=213026.59..213026.59 rows=250 width=16)
                     Group By: out_a.out_degree
                     Rows out:  Avg 808.0 rows x 4 workers.  Max 822 rows (seg3) with 867049 ms to first row, 867050 ms to end, start offset by 37 ms.
                     Executor memory:  4201K bytes avg, 4201K bytes max (seg0).
                     ->  Subquery Scan out_a  (cost=207401.41..212003.83 rows=51138 width=8)
                           Rows out:  Avg 2874612.0 rows x 4 workers.  Max 3128256 rows (seg2) with 988902 ms to first row, 1020252 ms to end, start offset by 42 ms.
                           ->  HashAggregate  (cost=207401.41..209958.31 rows=51138 width=40)
                                 Group By: rdf.subject
                                 Rows out:  Avg 2874612.0 rows x 4 workers.  Max 3128256 rows (seg2) with 988902 ms to first row, 1018753 ms to end, start offset by 42 ms.
                                 Executor memory:  50528K bytes avg, 51040K bytes max (seg1).
                                 Work_mem used:  42601K bytes avg, 42601K bytes max (seg0). Workfile: (4 spilling, 0 reused)
                                 Work_mem wanted: 339792K bytes avg, 365609K bytes max (seg2) to lessen workfile I/O affecting 4 workers.
                                 (seg2)   3128256 groups total in 32 batches; 1 overflows; 15674600 spill groups.
                                 (seg2)   Hash chain length 1.5 avg, 13 max, using 2326981 of 4325376 buckets.
                                 ->  Seq Scan on rdf  (cost=0.00..157623.27 rows=2488907 width=61)
                                       Rows out:  Avg 115000000.0 rows x 4 workers.  Max 134428969 rows (seg0) with 0.499 ms to first row, 907688 ms to end, start offset by 44 ms.
 Slice statistics:
   (slice0)    Executor memory: 267K bytes.
   (slice1)  * Executor memory: 54895K bytes avg x 4 workers, 55407K bytes max (seg1).  Work_mem: 42601K bytes max, 365609K bytes wanted.
   (slice2)    Executor memory: 4451K bytes avg x 4 workers, 4451K bytes max (seg0).
 Statement statistics:
   Memory used: 128000K bytes
   Memory wanted: 1097224K bytes
 Optimizer status: legacy query optimizer
 Total runtime: 1021398.511 ms
(34 rows)

As we see from the results above, the query was executed in 1,021 seconds, a little more than 17 minutes, and the results were again the same as in the previous executions.

In [4]:
rdfdb=> SELECT gp_segment_id, count(*)
FROM rdf         
GROUP BY gp_segment_id;

 gp_segment_id |   count   
---------------+-----------
             3 |  96079527
             1 |  95162209
             2 | 134329295
             0 | 134428969
(4 rows)


With the query above we observe that the segments do not hold the same amount of data: segments 0 and 2 have significantly more rows (about 134 million each) than segments 1 and 3 (about 95 to 96 million each). The data is distributed by hashing the subject field, which is why the distribution is uneven; a random (round-robin) distribution would have spread the rows almost equally.
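The imbalance can be quantified from the counts above. The sketch below uses only the four per-segment row counts returned by the query; expressing skew as the largest segment relative to the mean is one common measure, chosen here for illustration.

```python
# Row counts per segment, copied from the gp_segment_id query above
segment_rows = {0: 134428969, 1: 95162209, 2: 134329295, 3: 96079527}

total = sum(segment_rows.values())
mean = total / float(len(segment_rows))
largest = max(segment_rows.values())

# A perfectly even distribution would give a ratio of 1.0
skew_ratio = largest / mean

print('total rows: %d' % total)
print('mean rows per segment: %.0f' % mean)
print('largest segment vs mean: %.3f' % skew_ratio)
```

The mean of 115,000,000 rows matches the `Avg 115000000.0 rows x 4 workers` line of the Seq Scan in the query plan, and the largest segment (seg0) holds about 17% more rows than the average. Since every segment has to finish its part of the scan, the most loaded segment roughly bounds the query time.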

Hash distribution uses a field of the data as input to a hash function, and the hash output determines the segment where the row is stored. This guarantees that rows with the same value in the hashed field reside in the same segment.
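A minimal sketch of hash distribution follows. Greenplum uses its own internal hash function, so `zlib.crc32` is a stand-in assumption, and `assign_segment` / `distribute_by_hash` are hypothetical helpers; the point is only that the segment is a deterministic function of the distribution key.

```python
import zlib
from collections import defaultdict

NUM_SEGMENTS = 4  # the cluster used in this assignment has four segments


def assign_segment(subject, num_segments=NUM_SEGMENTS):
    # Stand-in hash (not Greenplum's): the same subject always maps to the same segment
    return zlib.crc32(subject.encode('utf-8')) % num_segments


def distribute_by_hash(triples, num_segments=NUM_SEGMENTS):
    segments = defaultdict(list)
    for subject, predicate, obj in triples:
        segments[assign_segment(subject, num_segments)].append((subject, predicate, obj))
    return segments


triples = [('s1', 'p1', 'o1'), ('s2', 'p1', 'o2'), ('s1', 'p2', 'o3'), ('s1', 'p3', 'o4')]
segments = distribute_by_hash(triples)
for seg_id in sorted(segments):
    print('%d: %s' % (seg_id, segments[seg_id]))
```

All three `s1` triples end up on the same segment, so a per-subject count can be computed locally. The same property explains the skew: a subject with many triples (a high out-degree) sends all of them to a single segment.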

Random distribution, on the other hand, assigns rows to the segments in a round-robin fashion as they are inserted into the system. We believe that this distribution would not benefit our queries: we use GROUP BY to count the frequencies of subjects, and it is beneficial if all the entries concerning one subject are in one segment, which avoids inter-segment communication.
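In contrast to hash distribution, a round-robin sketch (again with hypothetical helpers, not Greenplum code) deals rows to segments in arrival order, regardless of their content. Recording, per subject, which segments hold its rows shows why GROUP BY subject would then need an extra redistribution step before the counts can be finalized.

```python
from collections import defaultdict


def distribute_round_robin(triples, num_segments=4):
    # Row i goes to segment i mod num_segments, independently of its values
    segments = defaultdict(list)
    for i, triple in enumerate(triples):
        segments[i % num_segments].append(triple)
    return segments


def segments_per_subject(segments):
    # For each subject, the set of segments holding at least one of its rows
    placement = defaultdict(set)
    for seg_id, rows in segments.items():
        for subject, _, _ in rows:
            placement[subject].add(seg_id)
    return placement


triples = [('s1', 'p1', 'o1'), ('s2', 'p1', 'o2'), ('s1', 'p2', 'o3'), ('s1', 'p3', 'o4')]
segments = distribute_round_robin(triples)
placement = segments_per_subject(segments)
for subject in sorted(placement):
    print('%s -> segments %s' % (subject, sorted(placement[subject])))
```

The segment sizes are perfectly balanced (one row each), but the rows of `s1` are spread over three segments, so each segment could only compute a partial count for it.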

Partitioning a table with the PARTITION BY (and SUBPARTITION BY) clauses at table creation breaks the table into smaller ones based on the values of the partitioning fields. For example, a date field can be used to partition a table so that each subtable holds data from one date range. Another partitioning field could be a country name, where each country gets a different subtable. This way a hierarchy of tables is created, with the goal of increasing performance for queries that need to scan only a subset of the values. For example, if a table is partitioned on a "Year" field, queries about the transactions of one year will not need to scan a large table covering multiple years, but only the (smaller) subtable with the data of that year.
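A toy illustration of this pruning effect, assuming a hypothetical transactions table partitioned by a year field: each partition is a separate list, and a query restricted to one year reads only that list. The data and helper names are invented for illustration.

```python
from collections import defaultdict


def partition_by_year(rows):
    # Each year becomes its own "subtable"
    partitions = defaultdict(list)
    for row in rows:
        partitions[row['year']].append(row)
    return partitions


def total_amount_for_year(partitions, year):
    # Pruning: only the matching partition is scanned; other years are never read
    scanned = partitions.get(year, [])
    return sum(row['amount'] for row in scanned), len(scanned)


rows = [
    {'year': 2014, 'amount': 10},
    {'year': 2015, 'amount': 20},
    {'year': 2015, 'amount': 5},
    {'year': 2016, 'amount': 7},
]
partitions = partition_by_year(rows)
total, rows_scanned = total_amount_for_year(partitions, 2015)
print('total=%d, rows scanned=%d of %d' % (total, rows_scanned, len(rows)))
```

Only the two 2015 rows are touched. A query that has to read every row, as our out-degree query does, gains nothing from this.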

Distribution, on the other hand, defines how the rows of a given table (or subtable) are assigned to segments. These are different, non-overlapping concepts: a partitioned table is still distributed across the segments, and each segment stores its share of the rows of every subtable.
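The two mechanisms compose: partitioning decides which subtable a row belongs to, and distribution decides which segment stores it. The sketch below applies both to invented rows, partitioning by year and hash-distributing on a hypothetical `id` column, with `zlib.crc32` again standing in for the database's own hash function.

```python
import zlib
from collections import defaultdict


def place(rows, num_segments=4):
    # (partition, segment) -> ids: first choose the subtable by year,
    # then choose the segment by hashing the distribution key
    layout = defaultdict(list)
    for row in rows:
        partition = row['year']
        segment = zlib.crc32(str(row['id']).encode('utf-8')) % num_segments
        layout[(partition, segment)].append(row['id'])
    return layout


rows = [{'id': i, 'year': 2014 + i % 3} for i in range(12)]
layout = place(rows)
for partition, segment in sorted(layout):
    print('year %d, segment %d: ids %s' % (partition, segment, layout[(partition, segment)]))
```

Each year's subtable is spread over the segments independently of the other years, so the partitioning scheme and the distribution key can be chosen separately.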

In our case the table is not partitioned. And since our query has to scan the whole table to produce its result, partitioning would not speed up its execution anyway.

In [None]:
MariaDB [babis]> LOAD DATA LOCAL INFILE '/home/babis/git/M36103P/assignment2/data/hw2-rdf-2016_2' into table test fields terminated by ' ';
Query OK, 10000000 rows affected, 65535 warnings (7 min 0.89 sec)
Records: 10000000  Deleted: 0  Skipped: 0  Warnings: 1000106

With the query above we inserted 1 of the 46 files into a traditional SQL database (MariaDB, a drop-in replacement for MySQL). In traditional databases much of the insertion time is spent validating the data and updating the indices. One file containing 10,000,000 rows needed 7 minutes. Since the indices are most probably trees, inserting a new value costs O(log n), so the time needed for each insertion increases as the table grows. With this in mind, we estimate that several hours (perhaps around 10) would be needed to insert all the data into the database.
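The extrapolation can be made explicit with a back-of-the-envelope calculation. It assumes 46 files of 10,000,000 rows each (460,000,000 rows, the same total as the segment counts in Part D) and the measured 420.89 seconds for the first file, and compares a linear model (constant cost per row) with one where the cost of the n-th insertion grows like log n, as for a tree index. `math.lgamma(n + 1)` returns ln(n!), i.e. the sum of ln k for k up to n. Both models are simplifications: they ignore effects such as the index outgrowing memory, which could make the real load considerably slower.

```python
import math

FILES = 46
ROWS_PER_FILE = 10000000
FIRST_FILE_SECONDS = 7 * 60 + 0.89  # measured above for the first file

total_rows = FILES * ROWS_PER_FILE

# Model 1: every row costs the same, so time scales linearly with the row count
linear_seconds = FIRST_FILE_SECONDS * FILES

# Model 2: the n-th insertion costs proportionally to log(n), so inserting n rows
# costs proportionally to sum(log k for k <= n) = lgamma(n + 1)
growth = math.lgamma(total_rows + 1) / math.lgamma(ROWS_PER_FILE + 1)
log_seconds = FIRST_FILE_SECONDS * growth

print('linear model: %.1f hours' % (linear_seconds / 3600))
print('log(n) model: %.1f hours' % (log_seconds / 3600))
```

Under these assumptions the two models give roughly 5.4 and 6.7 hours of pure insertion time, in line with the several-hour estimate above.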