Python Codes for Foundations of Data Science Course Work 2

Ziyi Guo

zg2u21@soton.ac.uk

In [None]:
# Importing necessary sections
import pymongo
from pymongo import MongoClient
import json
from datetime import datetime
from pprint import pprint
import networkx as nx
import pandas as pd

0) Preparation for the questions: 

    Building a function that returns a pymongo collection object based on the given database and a collection and using this function to create a pymongo collection object of 'articles'.

In [2]:
def get_collection(dbname, collection_name):
    client = MongoClient('mongodb://coursework:coursework@localhost:27017',authSource='coursework')
    db = client[dbname]
    collection = db[collection_name]
    return collection

articles = get_collection('coursework','articles')

1) Data Verification: 
     
     Building a function returning the number of articles in which the 'doc_type' property loses or equals to the empty string.

In [3]:
def count_missing_doc_types(articles):
    find_condition = {
        '$or': [
            {"doc_type": ""},
            {"doc_type": { '$exists': False}}
        ]
    }
    num = articles.count_documents(find_condition)
    return num

articles = get_collection('coursework','articles')
print(count_missing_doc_types(articles))

0


2) Data Details: 

    Building a function that returns a set of publishers whose articles miss the doc_type property or equals to the empty string. 

In [4]:
def get_publishers_of_articles_missing_type(articles):
    find_condition = {
        '$or': [
            {"doc_type": ""},
            {"doc_type": { '$exists': False}}
        ]
    }
    results = articles.find(find_condition)
    Set = list()
    for result in results:
        Set.append(result['publisher'])
    return Set

articles = get_collection('coursework','articles')
Set = get_publishers_of_articles_missing_type(articles)
print(Set)

[]


3) Data Updates: 

    Building a function that updates the "doc_type" property of the article collection according to The Management's decision. Type "Book", "RFC" and "N/A" are corresponding to "Springer, Cham", "RFC Editor" and None.

In [5]:
def update_doc_types(articles):
    find_condition1 = {
        '$and': [ 
            {
                '$or': [
                    {"doc_type": ""},
                    {"doc_type": { '$exists': False}}
                ]
            },
            {"publisher": "Springer, Cham"}
        ]
    }
    newvalues1 = {
        "$set": {
            "doc_type": "Book"
        }
    }
    find_condition2 = {
        '$and': [ 
            {
                '$or': [
                    {"doc_type": ""},
                    {"doc_type": { '$exists': False}}
                 ]
            },
            {"publisher": "RFC Editor"}
        ]
    }
    newvalues2 = {
        "$set": {
            "doc_type": "RFC"
        }
    }
    find_condition3 = {
        '$and': [ 
            {
                '$or': [
                    {"doc_type": ""},
                    {"doc_type": { '$exists': False}}
                ]
            },
            {"publisher": {'$nin': ["Springer, Cham","RFC Editor"]}}
        ]
    }
    newvalues3 = {
        "$set": {
            "doc_type": "N/A"
        }
    }
    UpdateResult = articles.update_many(find_condition1,newvalues1)
    UpdateResult = articles.update_many(find_condition2,newvalues2)
    UpdateResult = articles.update_many(find_condition3,newvalues3)
    return UpdateResult

articles = get_collection('coursework','articles')
articles  =  update_doc_types(articles)

4) Data Distribution:
    
    Building a function returns a dictionary with doc_type as keys and the number of articles in each type as values.

In [21]:
def get_types_distribution(articles):
    articles = get_collection('coursework','articles')
    group = {
        '$group': {
            "_id": '$doc_type', 
            "count":{'$sum': 1},
        }
    }
    Dict = {}
    cursor = articles.aggregate([group])
    for c in cursor:
        Dict[c["_id"]] = c["count"]
    return Dict
TypeDistribution = get_types_distribution(articles)
pprint(TypeDistribution)

{'Book': 17998,
 'BookChapter': 103,
 'Conference': 389375,
 'Dataset': 3,
 'Journal': 385450,
 'N/A': 74168,
 'Patent': 112,
 'RFC': 289,
 'Repository': 87683}


5) Data Aggregation:

    Building a function that diviedes data into 7 groups according to the pages and returns a histogram dictionary containing the average references for each group. 
    The function includes a match section selecting the articles that have references and meet the requirements of the page numbers, two project sections projecting the group classification index object "Page" object with values of integers in [0,6] and the "PageIndex" object corresponding to the index that describes the groups, and a group section dividing the data based on the "PageIndex" value and calculating the average references for each group.

In [22]:
def length_vs_references(articles):
    match = {
        "$match":{
            "$and": [
                {"references": { '$exists': True}},
                {"$and":[{"page_start": {"$ne": ""}},
                         {"page_start": {"$not": {"$regex":"[^\\d]"}}},
                         {"page_start": {"$regex" :"^.{0,7}$"}}]},
                {"$and":[{"page_end": {"$ne": ""}},
                         {"page_end": {"$not": {"$regex":"[^\\d]"}}},
                         {"page_end": {"$regex" :"^.{0,7}$"}}]},
            ]
        }
    }
    project = {
        "$project":{
            "_id": 0, 
            "page_start": 1, 
            "page_end": 1, 
            "references": 1,
            "Page": {
                "$toInt":{ 
                    "$multiply":[
                        {"$subtract":[
                            {"$toInt":"$page_end"},
                            {"$toInt":"$page_start"}
                        ]
                        },0.2
                    ]
                }
                
            }
        }
    }
    project2 = {
        "$project":{
            "_id": 0,  
            "references": 1,
            "Page": 1,
             "PageIndex": 
            {   
                "$switch":
                {
                    "branches":[
                        {
                            "case": {"$eq":["$Page",0]},
                            "then": "01-05"
                        },
                        {
                            "case": {"$eq":["$Page",1]},
                            "then": "06-10"
                        },
                        {
                            "case": {"$eq":["$Page",2]},
                            "then": "11-15"
                        },
                        {
                            "case": {"$eq":["$Page",3]},
                            "then": "16-20"
                        },
                        {
                            "case": {"$eq":["$Page",4]},
                            "then": "21-25"
                        },
                        {
                            "case": {"$eq":["$Page",5]},
                            "then": "26-30"
                        },
                        {
                            "case": {"$gte":["$Page",6]},
                            "then": ">30"
                        },
                    ],
                    "default": "-"
                 }
             },
            "References":{
                "$size": "$references"  
            }
        }
    }
    group = {
        "$group":{
            "_id": "$PageIndex",
            "count": {"$avg": "$References"}   
        }
    }
    from bson import SON
    sort = {
        "$sort":
        SON([
            ("_id", pymongo.ASCENDING)
        ])
    }
    cursor = articles.aggregate([match,project,project2,group,sort])
    Dict = {}

    for c in cursor:
        Dict[c["_id"]] = c["count"]
    return Dict

articles = get_collection('coursework','articles')
LengthDistribution = length_vs_references(articles)
from pprint import pprint
pprint(LengthDistribution)

{'01-05': 7.817500913408842,
 '06-10': 12.943604728505438,
 '11-15': 19.26696415044967,
 '16-20': 21.93083583252851,
 '21-25': 23.84250175192712,
 '26-30': 26.984712799638174,
 '>30': 32.623189759880404}


6) Data Examination:

     Building a function that returns the outliers for each group. The function contains two parts. 
     In the first part, the data is aggregated into groups as question 5 and the mean values and the standard deviations of each group were calculated and stored into a dictionary.
     In the second part, the project functions calculate the Z-score for each data, the match function selects the outliers with Z-score bigger than 3, and the group function aggregates the data again based on the "PageIndex" and shows the information of the outliers in each group.

In [14]:
def get_reference_outliers(articles):
    match = {
        "$match":{
            "$and": [
                {"references": { '$exists': True}},
                {"$and":[{"page_start": {"$ne": ""}},
                         {"page_start": {"$not": {"$regex":"[^\\d]"}}},
                         {"page_start": {"$regex" :"^.{0,7}$"}}]},
                {"$and":[{"page_end": {"$ne": ""}},
                         {"page_end": {"$not": {"$regex":"[^\\d]"}}},
                         {"page_end": {"$regex" :"^.{0,7}$"}}]},
            ]
        }
    }
    project = {
        "$project":{
            "_id": 0, 
            "id":1,
            "page_start": 1, 
            "page_end": 1, 
            "references": 1,
            "Page": {
                "$toInt":{ 
                    "$multiply":[
                        {"$subtract":[
                            {"$toInt":"$page_end"},
                            {"$toInt":"$page_start"}
                        ]
                        },0.2
                    ]
                }
                
            }
        }
    }
    project2 = {
        "$project":{
            "_id": 0,  
            "id":1,
            "references": 1,
            "Page": 1,
             "PageIndex": 
            {   
                "$switch":
                {
                    "branches":[
                        {
                            "case": {"$eq":["$Page",0]},
                            "then": "00"
                        },
                        {
                            "case": {"$eq":["$Page",1]},
                            "then": "01"
                        },
                        {
                            "case": {"$eq":["$Page",2]},
                            "then": "02"
                        },
                        {
                            "case": {"$eq":["$Page",3]},
                            "then": "03"
                        },
                        {
                            "case": {"$eq":["$Page",4]},
                            "then": "04"
                        },
                        {
                            "case": {"$eq":["$Page",5]},
                            "then": "05"
                        },
                        {
                            "case": {"$gte":["$Page",6]},
                            "then": "06"
                        },
                    ],
                    "default": "-"
                 }
             },
            "References":{
                "$size": "$references"  
            }
        }
    }
    group1 = {
        "$group":{
            "_id": "$PageIndex",
            "Mean": {"$avg":"$References"},
            "Std": {"$stdDevPop":"$References"}
        }
    }
    from bson import SON
    sort = {
        "$sort":
        SON([
            ("_id", pymongo.ASCENDING)
        ])
    }
    cursor = articles.aggregate([match,project,project2,group1,sort],allowDiskUse=True)
    Dict = {}
    for c in cursor:
        Dict[c["_id"]] = (c["Mean"],c["Std"])
    project3 = {
        "$project":{
            "_id": 1,  
            "id":1,
            "references": 1,
            "Page": 1,
             "PageIndex2": 
            {   
                "$switch":
                {
                    "branches":[
                        {
                            "case": {"$eq":["$Page",0]},
                            "then": "01-05：outliers"
                        },
                        {
                            "case": {"$eq":["$Page",1]},
                            "then": "06-10：outliers"
                        },
                        {
                            "case": {"$eq":["$Page",2]},
                            "then": "11-15：outliers"
                        },
                        {
                            "case": {"$eq":["$Page",3]},
                            "then": "16-20：outliers"
                        },
                        {
                            "case": {"$eq":["$Page",4]},
                            "then": "21-25：outliers"
                        },
                        {
                            "case": {"$eq":["$Page",5]},
                            "then": "26-30：outliers"
                        },
                        {
                            "case": {"$gte":["$Page",6]},
                            "then": ">30：outliers"
                        },
                    ],
                    "default": "-"
                 }
             },
            "References":{
                "$size": "$references"  
            }
        }
    }
    project4 = {
        "$project":{
            "_id": 1,  
            "id":1,
            "References": 1,
            "Page": 1,
            "PageIndex2":1,
             "Z-score": 
            {   
                "$switch":
                {
                    "branches":[
                        {
                            "case": {"$eq":["$Page",0]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['00'][0]]},Dict['00'][1]]}
                        },
                        {
                            "case": {"$eq":["$Page",1]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['01'][0]]},Dict['01'][1]]}
                        },
                        {
                            "case": {"$eq":["$Page",2]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['02'][0]]},Dict['02'][1]]}
                        },
                        {
                            "case": {"$eq":["$Page",3]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['03'][0]]},Dict['03'][1]]}
                        },
                        {
                            "case": {"$eq":["$Page",4]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['04'][0]]},Dict['04'][1]]}
                        },
                        {
                            "case": {"$eq":["$Page",5]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['05'][0]]},Dict['05'][1]]}
                        },
                        {
                            "case": {"$gte":["$Page",6]},
                            "then": {"$divide":[{"$subtract":["$References",Dict['06'][0]]},Dict['06'][1]]}
                        },
                    ],
                    "default": "-"
                 }
             }
        }
    }
    match2 = {
        "$match":{
            "Z-score":{ "$gte":3}
        }
    }
    group2 = {
        "$group":{
            "_id": "$PageIndex2",
            "outliers": { "$push":{"id":"$id", "Num_references":"$References","Z-score":"$Z-score"}}
        }
    }
    cursor = articles.aggregate([match,project,project3,project4,match2,group2,sort],allowDiskUse=True)
    Dict2 = {}
    for c in cursor:
        Dict2[c['_id']] = c['outliers']

    return Dict2

articles = get_collection('coursework','articles')
Outliers = get_reference_outliers(articles)
from pprint import pprint
pprint(Outliers)

{'01-05：outliers': [{'Num_references': 30,
                     'Z-score': 3.444926861536592,
                     'id': 138573038},
                    {'Num_references': 54,
                     'Z-score': 7.172110365709728,
                     'id': 594343222},
                    {'Num_references': 53,
                     'Z-score': 7.016811053035847,
                     'id': 1148908938},
                    {'Num_references': 31,
                     'Z-score': 3.600226174210473,
                     'id': 1554633132},
                    {'Num_references': 36,
                     'Z-score': 4.376722737579876,
                     'id': 1892875445},
                    {'Num_references': 38,
                     'Z-score': 4.687321362927637,
                     'id': 2113929362},
                    {'Num_references': 50,
                     'Z-score': 6.550913115014206,
                     'id': 2177230543},
                    {'Num_references': 95,
                     

7) Data Association:

      Building a function that returns a list of authors that have artilces co-authored of the input author. 
      The function firstly finds the articles associated with the input author and stores the article ids into an array. Then, the function uses match function to search the articles in the array, unwinds function to divide the data with the "author" field, project function to illustrate "Affiliation" and "Name" values, group function to aggregate the data and remove the duplicates.

In [15]:
def get_collaborators(articles,author_id):
    import numpy as np
    target = author_id
    cursor = articles.find({'authors.id': target})
    a = []
    for c in cursor:
        a.append(c["id"])
    match = {
        "$match":{
            "id":{"$in": a},
        }
    }
    unwind = {        
        "$unwind": "$authors"
     }
    project = {
        "$project":{
            "ID":"$authors.id",
            "author":1,
            "Affiliation":"$authors.org",
            "Name":"$authors.name",
        }
    }
    match2 = {
        "$match":{
            "ID":{"$ne":target}
        } 
    }
    project1 ={
        "$project":{
            "Names":{"Name" : "$Name", "Affiliation" : "$Affiliation"}
        }
    }

    group = {
        "$group":{
            "_id":target,
            "Names":{"$addToSet":"$Names"}
        }
        
    }
    pipeline = [match,unwind,project,match2,project1,group]
    results = articles.aggregate(pipeline)
#     for c in results:
#         pprint(c)
    return results 

articles = get_collection('coursework','articles')
author_id = 2810682786
Collaborators = get_collaborators(articles,author_id)
for c in Collaborators:
    pprint(c)

{'Names': [{'Name': 'Aleksander González'},
           {'Name': 'Manuel I. Capel'},
           {'Name': 'María A. Pérez'},
           {'Affiliation': 'ETSI Informatics and Telecommunication, Software '
                           'Engineering Department, University of Granada, '
                           'UGR, 18071 Granada, Spain',
            'Name': 'Manuel I. Capel'},
           {'Affiliation': 'ESPOL Polytechnic University, Escuela Superior '
                           'Politécnica del Litoral, ESPOL, Facultad de '
                           'Ingeniería en Electricidad y Computación, Campus '
                           'Gustavo Galindo Km. 30.5 Vía Perimetral, P.O. Box '
                           '09-01-5863, Guayaquil and Ecuador, --- Select a '
                           'Country ---',
            'Name': 'Rosa Quelal'},
           {'Affiliation': 'ESPOL Polytechnic University, Escuela Superior '
                           'Politécnica del Litoral, ESPOL, Facultad de '
        

8) Collection Creation:

      Building a function creating a collection named authors containing the information of the authors in the fields of ID, name, affiliation, related articles, and collaborators. 
      The function firstly divides the data with the "authors" property, aggregates the data based on the author's id, illustrating new values "name_affiliations" and "collaborators", and finally deals with the duplicate data and makes the outputs meet the requirements.



In [16]:
def create_authors_collection(articles):
    import numpy as np
    project = {
        "$project":{
            "_id": 0,
            "id":1,
            "authors": 1,
            "collaborators":"$authors.id"
        }
    }
    unwind = {        
        "$unwind": "$authors"
     }
    group = {
        "$group":{
            "_id": "$authors.id",
            "collaborators": {"$push": "$collaborators"},
            "articles":{"$push":"$id"},
            "name_affiliations":{
                "$addToSet":{
                    "name" : "$authors.name", 
                    "affiliation" : "$authors.org"
                }
            }
        }
    }
    project2 = {
        "$project":{
            "collaborators":{
                "$setDifference":[{
                    "$reduce":{
                        "input":"$collaborators",
                        "initialValue":[],
                        "in":{
                            "$setUnion":["$$value","$$this"]
                        }
                    }
                },["$_id"]]
            },
            "articles":1,
            "name_affiliations":1
        }
    }
    pipeline = [project,unwind,group,project2]
    cursor = articles.aggregate(pipeline,allowDiskUse = True)
    for c in cursor:
        pprint(c)
    
    return cursor

articles = get_collection('coursework','articles')
create_authors_collection(articles)
authors = get_collection('coursework','authors')

{'_id': 16980,
 'articles': [2614270863,
              2777606786,
              2794816057,
              2893301518,
              2962905250,
              2963218298],
 'collaborators': [],
 'name_affiliations': [{'name': 'Christoph Schwarzweller'}]}
{'_id': 41399,
 'articles': [2647123907],
 'collaborators': [1963670275, 2461504439, 2531057293, 2790638693, 2922615506],
 'name_affiliations': [{'affiliation': 'Department of Physics (IFM), Linköping '
                                       'University, SE-58183 Linköping, Sweden',
                        'name': 'Igor Abrikosov'}]}
{'_id': 44880,
 'articles': [2592563856,
              2619517301,
              2737803841,
              2742038226,
              2798441735,
              2905658527,
              2906048284,
              2949064367,
              2951723130,
              2952047388,
              2962983930,
              2963068576,
              2963588142,
              2978071670,
              2978629210,
    

KeyboardInterrupt: 

9) Data Network:

      Building a function that given an author ID and the name of a country, returns the IDs of the authors that have close relationship with the input author.
      The function firstly matches the target author and takes it as a start point, then sets the values in the graph section based on the requirements of the detection depth and affiliation information

In [None]:
def get_network(authors,author_id,country):
    match = {
        "$match":{
            "_id": "$author_id"
        }
    }
    graph = {
        "$graphLookup":{
            "from": "authors",
            "startWith": "$collaborators",
            "connectFromField":"collaborators",
            "connectToField":"_id",
            "as":"relate_to",
            "maxDepth":2,
            "restrictSearchWithMatch":{
                "and":[{
                    "$or":[{"name_affiliations.affiliation":{"$ne":""},
                            "name_affiliations":{"$ne":None}}]},
                    {"name_affiliations.affiliation":{"$regex":country}}]
            }
        }
    }
    project = {
        "$project":{
            "_id":0,
            "relate_to.id":1
        }
    }
    Dict = []
    pipeline = [match,graph,project]
    cursor = authors.aggregate(pipeline,allowDiskUse = True)
#     for i in cursor:
#         pprint(i)
    for i in list(cursor)[0]["relate_to"]:
        Dict.append(i["_id"])
    return Dict

authors = get_collection('coursework','authors')
author_id = 44880
country = "Spain"
Network = get_network(authors,author_id,country)
pprint(Network)

10) Data Processing and Aggregation:
    
       Building a function that filters the subset of "Machine Learning" articles and compute the articles with the highest in-degree cnetrality for overall, in years of 2017, 2018 and 2019.

In [66]:
def machine_learning_central(articles):
    limit ={
        "$limit": 1000000000000000000
    }
    
    unwind = {
        "$unwind": "$fos"
    }
    
    match = {
        "$match":{
            "fos.name": "Machine learning",
        }
    }
    
    project = {
        "$project":{
            "_id":0,
            "id":1,
            "year":1,
            "fos":1,
            "weight":"$fos.w",
            "id_weight":{
                "id":"$id",
                "weight":"$fos.w"
            }
        }
    }    
    group = {
        "$group":{
            "_id": None,
            "Highest_weight":{"$max":"$weight"},
        }
    }
    
    # Highest Centrality for All
    pipeline = [unwind,match,project,group]
    cursor = articles.aggregate(pipeline,allowDiskUse = True)
    w = {}
    for c in cursor:
        w['overall'] = c['Highest_weight']
    find_conditionAll = {
        "fos.w": w['overall']
    }    
    results = articles.find(find_conditionAll)
    for result in results:
        w['overall'] = result['id']
        
    # Highest Centrality for 2017
    match2017 = {
        "$match":{
            "year":2017
        }
    }
    pipeline = [unwind,match,project,match2017,group]
    cursor = articles.aggregate(pipeline,allowDiskUse = True)
    for c in cursor:
        w['2017'] = c['Highest_weight']
    find_condition2017 = {
        '$and': [
            {"year": 2017},
            {"fos.w": w['2017']}
        ]
    }
    results = articles.find(find_condition2017)
    for result in results:
        w['2017'] = result['id']


    # Highest Centrality for 2018
    match2018 = {
        "$match":{
            "year":2018
        }
    }
    pipeline = [unwind,match,project,match2018,group]
    cursor = articles.aggregate(pipeline,allowDiskUse = True)
    for c in cursor:
        w['2018'] = c['Highest_weight']
    find_condition2018 = {
        '$and': [
            {"year": 2018},
            {"fos.w": w['2018']}
        ]
    }
    results = articles.find(find_condition2018)
    for result in results:
        w['2018'] = result['id']

    # Highest Centrality for 2019
    match2019 = {
        "$match":{
            "year":2019
        }
    }
    pipeline = [unwind,match,project,match2019,group]
    cursor = articles.aggregate(pipeline,allowDiskUse = True)
    for c in cursor:
        w['2019'] = c['Highest_weight']
    find_condition2019 = {
        '$and': [
            {"year": 2019},
            {"fos.w": w['2019']}
        ]
    }
    results = articles.find(find_condition2019)
    for result in results:
        w['2019'] = result['id']
    match2019 = {
        "$match":{
            "year":2019
        }
    }
    
    pprint(w)
articles = get_collection('coursework','articles')    
machine_learning_central(articles)

{'2017': 2771712594,
 '2018': 2890230849,
 '2019': 3007771404,
 'overall': 2890230849}


Task 2

1) Return the set of artilces that are missing the doc_type property or have it equal to the empty string
- Using Mapper to select the data.
- Using Reducer to accumulates the data in each category and outputs the set of categories removing the duplicates.

In [15]:
%%writefile mapper.py
#!/usr/bin/env python
#Answer for mapper.py
import sys
import json

for line in sys.stdin:
    jsonData = json.loads(line.rstrip())
    if 'doc_type' not in list(jsonData.keys()) or jsonData['doc_type'] == "":
        if 'publisher' in list(jsonData.keys()):
            print(jsonData['publisher'] + '\t1')
        else:
            print('None\t1')


Overwriting mapper.py


In [17]:
%%writefile reducer.py
#!/usr/bin/env python
# REDUCER

import sys
from collections import defaultdict
input_pairs = sys.stdin.readlines()
accumulator = defaultdict(lambda: 0)

for row in input_pairs:
    key_value_pair = row.split("\t", 1)
    if len(key_value_pair) != 2:
        continue
    word = key_value_pair[0]
    count = int(key_value_pair[1].strip())
    accumulator[word] = accumulator[word] + count
    
for (key, value) in accumulator.items():
    print(key)

Overwriting reducer.py


In [None]:
%%bash
rm -rf output
hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input ~/articles.json \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output output

2) Return an histogram dictionary with the following specification:

   {"1-5" : Average references of articles between 1 and 5 pages inclusive}
   
   {"6-10" : Average references of articles between 6 and 10 pages inclusive}
   
   {"11-15" : Average references of articles between 11 and 15 pages inclusive}
   
   {"16-20" : Average references of articles between 16 and 20 pages inclusive}
   
   {"21-25" : Average references of articles between 21 and 25 pages inclusive}
   
   {"26-30" : Average references of articles between 26 and 30 pages inclusive}
   
   {">30" : Average references of articles with more than 30 pages}
   
- Using Mapper to select the necessary data from the JSON.
- Using Reducer to accumulates the value of data in each category and calculates the average values.

In [49]:
%%writefile mapper2.py
#!/usr/bin/env python
#Answer for mapper.py
import sys
import json
import numpy as np

for line in sys.stdin:
    jsonData = json.loads(line.rstrip())
    m = ''
    if 'references' in list(jsonData.keys()):
        m += str(len(jsonData['references']))
    if 'page_end' in list(jsonData.keys()) and jsonData['page_end'].isdigit():
        m += '\t' + jsonData['page_end']
    if 'page_start' in list(jsonData.keys()) and jsonData['page_start'].isdigit():
        m += '\t' + jsonData['page_start']
    print(m)

Overwriting mapper2.py


In [50]:
%%writefile reducer2.py
#!/usr/bin/env python
# REDUCER

import sys
from collections import defaultdict
input_pairs = sys.stdin.readlines()
accumulator = defaultdict(lambda: 0)
accumulator2 = defaultdict(lambda: 0)

for row in input_pairs:
    row = row.replace('\n','')
    key_value_pair = row.split("\t")
    if len(key_value_pair) < 3 or key_value_pair[0].isdigit()== False:
        continue
    references = int(key_value_pair[0])
    pages = int(key_value_pair[1]) - int(key_value_pair[2]) + 1
    if pages >=1 and pages <= 5:
        accumulator['1-5'] = accumulator['1-5'] + 1
        accumulator2['1-5'] = accumulator2['1-5'] + references
        
    if pages >=6 and pages <= 10:
        accumulator['6-10'] = accumulator['6-10'] + 1
        accumulator2['6-10'] = accumulator2['6-10'] + references
        
    if pages >=11 and pages <= 15:
        accumulator['11-15'] = accumulator['11-15'] + 1
        accumulator2['11-15'] = accumulator2['11-15'] + references
        
    if pages >=16 and pages <= 20:
        accumulator['16-20'] = accumulator['16-20'] + 1
        accumulator2['16-20'] = accumulator2['16-20'] + references
        
    if pages >=21 and pages <= 25:
        accumulator['21-25'] = accumulator['21-25'] + 1
        accumulator2['21-25'] = accumulator2['21-25'] + references
        
    if pages >=26 and pages <= 30:
        accumulator['26-30'] = accumulator['26-30'] + 1
        accumulator2['26-30'] = accumulator2['26-30'] + references
        
    if pages >30:
        accumulator['>30'] = accumulator['>30'] + 1
        accumulator2['>30'] = accumulator2['>30'] + references
        
for item in ['1-5']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['6-10']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['11-15']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['16-20']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['21-25']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['26-30']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))
for item in ['>30']:
    print(item + '\t' + str(accumulator2[item]/accumulator[item]))

Overwriting reducer2.py


In [None]:
%%bash
rm -rf output
hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper2.py,reducer2.py \
-input ~/articles.json \
-mapper ./mapper2.py \
-reducer ./reducer2.py \
-output output2