## Coursework 2 - MongoDB and Hadoop

Each question asks you to implement a function following a certain specification. We have an evaluation script that will check the correctness of your answer. It is essential that:

You respect the requested answer format, otherwise, the script will flag your answer as incorrect. If the result is correct, but your formatting is incorrect, you get a penalty of -20% of the mark.

Do not insert or delete cells in the notebook you will submit, the evaluation script relies on your answer being in the right cell.

If you feel the need of inserting cells to prepare and test your answer, do it in a copy of the notebook. Once you finish an answer that fits in a single cell, transfer it to the notebook that you will submit.

You can define auxiliary functions if you want, as long as they are all in the same cell as the main function.

If you add print statements for testing, please delete or comment them before submitting.

Answers can make use of the tools you learned so far: MongoDB pipelines and operators, Python code, pandas and networkx. An important aspect of the coursework is to identify in what situations is best to use MongoDB only, and in what situations is best to use multiple tools. A bad choice may make your answer less efficient or even not run at all.

### Context 

You have been hired by ChessChowDown Inc. to provide insights on chess games played in the Lichess portal. Your insights will be used by the company to develop new learning materials. 

### Schema

You are given a dataset of chess games played on the Lichess website (lichess.org), each game document has the following fields:


* **event**: Type of event the game was played 
* **site**: Where the game was played 
* **date**: date the game was played
* **UTCdate** : date in UTC format
* **UTCtime**: time the game was played in UTC format
* **White**: username of the White player
* **Black**: username of the Black player
* **ECO**: Alphanumeric code of the opening played in the game
* **opening**: Name of the opening played in the game
* **TimeControl**: Time control the game was played, e.g, 60 + 0 seconds per move increment or 5min + 3 seconds per increment
* **result**: result of the game
* **WhiteElo**: Elo rating of the White player at the time of the game (https://www.chess.com/terms/elo-rating-chess)
* **BlackElo**: Elo rating of the Black player at the time of the game
* **moves**: list of moves of the game, each move is a sub-document with the following fields:

    * **number**: Number of the move
    * **turn**: false if it was White move, true if it was Black move. Note that it is important to consider move number together with the turn: one can have move = 1 and turn = False (White's first move) and move = 1 and turn = True (Black's first move)
    * **clock**: player’s remaining time to the next time control after this move, in seconds
    * **move**: the algebraic notation of the move https://en.wikipedia.org/wiki/Algebraic_notation_(chess) 

    * **eval**: A subdocument with the evaluation of the position after the move calculated by a computer chess engine: 
  {'unit' : centipawns or mate , 'value' : an integer}. 
  
  If unit is 'centipawns', then 'value' is an integer that quantifies the advantage/disadvantage in the position after the move, from the point of view of the White player. A positive value indicates White player has a better position, a negative value indicates Black has a better position. 
  If unit is 'mate', then 'value' is the number of moves until checkmate. A positive value indicates that White player can checkmate in 'value' moves, a negative value, that Black player can checkmate in 'value' moves. 
  As checkmate ends the game, a mate evaluation indicates a very large advantage.


### Marking scheme

- The result provided is correct: An incorrect answer will have between 0% and 40% of the mark depending on the nature of the mistake. Questions where there was only one answer possible will have 0%, questions where the result is correct in some cases and not others will be marked at 20% or 40%. Feel free to create as many notebooks as you want for experimenting and transcribe your final answer to the one you submit.

- The result is provided in the expected format and output: 20% will be deducted to correct results that are not in the expected format. The reason is that bad format breaks the automated marking scripts. If you have doubts, ask. 

- Efficiency of the answer: Measured in terms of execution time. There are many ways to reach the correct result, some of them are more efficient than others, some are more straight forward than others. 

- Tables below detail the percentage of mark you get according to the efficiency of the answer, each cell shows the maximum time allowed to get the mark in the corresponding row. Answers that take more time than the time in the 60% column will be declared timeout and get zero points. 


| Task1 | Marks |  100% | 80% | 60% | 
| --- | --- | --- | --- | --- | 
| q1 | 5 | 5sec |  10 sec | 1minute | 
|  q2 | 5 | 10sec   | 40sec   | 3minutes  |
|  q3 | 6 |   30sec | 60 sec  |  4minutes  |
|  q4 | 6 | 30sec   | 60sec   | 4minutes   |
|  q5 | 8 | 5sec   | 10sec  | 1minute   |
|  q6 | 10  |  30sec   | 90sec   | 5minutes   |
|  q7 | 8 | 15sec    | 60sec   | 4minutes   |
|  q8 | 12 | 30 sec    | 120sec | 5minutes  | 
|  q9 | 10   | 30 sec   |  120sec  | 5minutes  |
|  q10| 10  |  30sec  | 120sec  | 5minutes   |


| Task2 | Marks | 100% | 80% | 60% | 
| --- |--- | --- | --- | --- | 
| q11 | 8 | 90sec |  3minutes | 6minutes | 
|  q12| 12  | 90sec   | 3minutes   | 6minutes   |

In [102]:
#import section
import pymongo
from pymongo import MongoClient
from datetime import datetime
from pprint import pprint
import networkx as nx
import pandas as pd

# Creation of pyMongo connection object
client = MongoClient('mongodb://localhost:27017')
db = client['tutorial']
games_collection = db['chess_5']

### Question 1


Write a function that receives as input a username, a colour ("White" or "Black") and a result, and returns all games where the input username plays as the input colour that ended with the input result.

For efficiency evaluation, we will use the username with most games played.



In [11]:
def get_games(games_collection,username,colour,result):
    #Your code here
    #Return list of game documents
    pipeline = [{"$match" : 
             {"$and": [
                 {colour: {"$eq" : username}},
                 {"result": {"$eq" : result}}
                  ]  
             }
    }]
    
    res = list(games_collection.aggregate(pipeline))
    if len(res) == 0:
        return 0
    return res
game_list = get_games(games_collection,"Nkko","Black","0-1")

### Question 2

Write a function that finds and removes duplicate games, leaving a single instance of a game. Games are duplicate if they have the same White, Black, UTCdate and UTCtime values.   


(5 marks)

In [87]:
add = [1,2,3,4]

In [88]:
add[1:]

[2, 3, 4]

In [90]:
games_collection.remove?

In [56]:
def remove_duplicates(games_collection):
    #your code here
    pipeline = [
    {
        u"$group": {
            u"_id": {
                u"White": u"$White"
            },
            u"COUNT(*)": {
                u"$sum": 1
            }
        }
    }, 
    {
        u"$project": {
            u"White": u"$_id.White",
            u"COUNT(*)": u"$COUNT(*)",
            u"_id": 0
        }
    }, 
    {
        u"$match": {
            u"COUNT(*)": {
                u"$gt": 1
            }
        }
    }, 
    {
        u"$project": {
            u"_id": 0,
            u"White": u"$White"
        }
    }, 
    {
        u"$group": {
            u"_id": None,
            u"distinct": {
                u"$addToSet": u"$$ROOT"
            }
        }
    }, 
    {
        u"$unwind": {
            u"path": u"$distinct",
            u"preserveNullAndEmptyArrays": False
        }
    }, 
    {
        u"$replaceRoot": {
            u"newRoot": u"$distinct"
        }
    }
    ]
    cursor = games_collection.aggregate(
        pipeline, 
        allowDiskUse = True
    )
    try:
        for doc in cursor:
            print(doc)
    finally:
        client.close()
    return doc

In [57]:
doc = remove_duplicates(games_collection)

{'White': 'Telorom'}
{'White': 'madrilenians'}
{'White': 'mostar'}
{'White': 'tdb59'}
{'White': 'aklonius'}
{'White': 'Karellust'}
{'White': 'EXchef'}
{'White': 'heavyload26'}
{'White': 'whiterubbit'}
{'White': 'ArfansSianakLangitt'}
{'White': 'cuauhtlahuac'}
{'White': 'shahinarabi5'}
{'White': 'svppasa'}
{'White': 'zex535'}
{'White': 'Pink_Floyd'}
{'White': 'ikxerz'}
{'White': 'PolynER'}
{'White': 'Creador'}
{'White': 'vitambaya'}
{'White': 'Sanet1982'}
{'White': 'rezanori32'}
{'White': 'anahita92'}
{'White': 'jmdons'}
{'White': 'rmcgre'}
{'White': 'Darth-Kirk'}
{'White': 'Moritz94'}
{'White': 'MaxWind'}
{'White': 'SteveTChess'}
{'White': 'Roza_202020'}
{'White': 'nonir'}
{'White': 'Mongo82'}
{'White': 'ofitser3'}
{'White': 'Huttetu'}
{'White': 'marinchuk'}
{'White': 'aarya_nath'}
{'White': 'flodou'}
{'White': 'pcircle85'}
{'White': 'meloturkmen'}
{'White': 'shimc'}


### Question 3

Write a function that returns the number of knight moves minus the number of bishop moves in all games in the dataset. Recall that in algebraic notation, a bishop move starts with the letter "B" and a Knight move starts with the letter "N". 

(6 marks)

In [193]:
def knight_vs_bishop(games_collection):
    # your code here
    # Requires pymongo 3.6.0
    pipeline = [
        {
            u"$project": {
                u"_id" : 0,
                u"moves": u"$moves.move"
            }
        }
    ]

    cursor = games_collection.aggregate(
        pipeline, 
        allowDiskUse = False
    )
    bishop_count = 0 
    knight_count = 0
    for doc in cursor:
        res_N = list(filter(lambda x: "N" == x[0], doc["moves"]))
        res_B = list(filter(lambda x: "B" == x[0], doc["moves"]))
        bishop_count = bishop_count + len(res_B)
        knight_count = knight_count + len(res_N)
    moves = knight_count - bishop_count
    return moves

In [194]:
knight_vs_bishop(games_collection)

1118

### Question 4

A colleague's exploration of the dataset reports: 

* the "date" field contains wrong values, the "UTCDate" contains the real dates
* WhiteElo and BlackElo fields are Strings, when they should be integers.

Write a function that drops the "date" field from all documents, and converts WhiteElo and BlackElo to integers.

(6 marks)

In [104]:
def date_and_elo_fix(games_collection):
    #your code here
    pipeline = [
    {
        '$set': {
            'WhiteElo': {
                '$toInt': '$WhiteElo'
            }, 
            'BlackElo': {
                '$toInt': '$BlackElo'
            }, 
            'date': '$$REMOVE'
        }
    }
    ]
    requests = [pymongo.UpdateMany({}, pipeline)]
    games_collection.bulk_write(requests)

# Apply the function to execute the update
import time
start_time = time.time()
date_and_elo_fix(games_collection)
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.007425069808959961 seconds ---


In [85]:
games_collection.bulk_write?


### Question 5

An "upset" is a game between two players with a (large) rating difference won by the lower rated player.  

Write a function that receives as input an integer 'ratingDifference' and returns the number of upsets calculated based on the input 'ratingDifference'.

For efficiency evaluation, we will use ratingDifference = 50 

In [86]:
def count_upsets(games_collection,ratingDifference):
    res = games_collection.aggregate([
        {
            '$project': {
                'result': '$result',  
                'compare': {
                    '$cmp': [
                        '$WhiteElo', '$BlackElo'
                    ]
                }, 
                'ratingDifference': {
                    '$abs': {
                        '$subtract': [
                            '$WhiteElo', '$BlackElo'
                        ]
                    }
                }
            }
        }, {
            '$match': {
                '$or': [
                    {
                        'ratingDifference': {
                            '$gte': ratingDifference
                        }, 
                        'result': '0-1', 
                        'compare': 1
                    }, {
                        'ratingDifference': {
                            '$gte': ratingDifference
                        }, 
                        'result': '1-0', 
                        'compare': -1
                    }
                ]
            }
        }, {
            '$count': 'upsets'
        }
    ])
    
    upsets = list(res)[0]["upsets"]
    return upsets

count_upsets(games_collection,50)

144

In [83]:
for doc in cursor:
    print

<pymongo.cursor.Cursor at 0x7fba8deb6690>

### Question 6

A "blunder" is defined as a move that makes the evaluation of a position to lose >=200 centipawns, with respect to the evaluation of the previous move by the opponent _from the point of view of the player that made the move_ 

Examples:

* After Black's move 21 the evaluation is 50 centipawns and after White's move 22 the evaluation is -150 centipawns. White's move 22 is a blunder. 
* After White's move 36 the evaluation is 600 centipawns and after Black's move 36 the evaluation is checkmate for White in any number of moves. Black's move 36 is a blunder.
* After White's move 26 the evaluation is 50 centipawns and after Black's move 26 the evaluation is 300 centipawns. Black's move 26 is a blunder
* After Black's move 44, the evaluation is Mate in 4, that is, White checkmates in 4 moves, and after White's move 45 the evaluation is 600 centipawns. White's move 45 is a blunder

(Note the following case is not considered a blunder: After Black's move 44, the evaluation is Mate in 4, that is, White checkmates in 4 moves, and after White's move 45 the evaluation is Mate in 12.)


Remember evaluation values are expressed in terms of White's point of view. In the third example above, the evaluation increased more than 200 centipawns, but from Black's point of view, they are more than 200 centipawns worst, hence, the move was a blunder.

Write a function to add to each "move" subdocument of each game document a boolean field 'blunder' with true if the move is a blunder and false if it's not.




In [17]:
pipeline = [
    {
        '$set': {
            'blunder_bool': {
                '$reduce': {
                    'input': {
                        '$range': [
                            1, {
                                '$size': '$moves.eval.value'
                            }
                        ]
                    }, 
                    'initialValue': [], 
                    'in': {
                        '$concatArrays': [
                            '$$value', [
                                {
                                    '$cond': {
                                        'if': {
                                            '$gte': [
                                                {
                                                    '$switch': {
                                                        'branches': [
                                                            {
                                                                'case': {
                                                                    '$eq': [
                                                                        {
                                                                            '$arrayElemAt': [
                                                                                '$moves.eval.unit', '$$this'
                                                                            ]
                                                                        }, {
                                                                            '$arrayElemAt': [
                                                                                '$moves.eval.unit', {
                                                                                    '$subtract': [
                                                                                        '$$this', 1
                                                                                    ]
                                                                                }
                                                                            ]
                                                                        }
                                                                    ]
                                                                }, 
                                                                'then': {
                                                                    '$abs': {
                                                                        '$subtract': [
                                                                            {
                                                                                '$arrayElemAt': [
                                                                                    '$moves.eval.value', '$$this'
                                                                                ]
                                                                            }, {
                                                                                '$arrayElemAt': [
                                                                                    '$moves.eval.value', {
                                                                                        '$subtract': [
                                                                                            '$$this', 1
                                                                                        ]
                                                                                    }
                                                                                ]
                                                                            }
                                                                        ]
                                                                    }
                                                                }
                                                            }, {
                                                                'case': {
                                                                    '$ne': [
                                                                        {
                                                                            '$arrayElemAt': [
                                                                                '$moves.eval.unit', '$$this'
                                                                            ]
                                                                        }, {
                                                                            '$arrayElemAt': [
                                                                                '$moves.eval.unit', {
                                                                                    '$subtract': [
                                                                                        '$$this', 1
                                                                                    ]
                                                                                }
                                                                            ]
                                                                        }
                                                                    ]
                                                                }, 
                                                                'then': 10000
                                                            }
                                                        ]
                                                    }
                                                }, 200
                                            ]
                                        }, 
                                        'then': True, 
                                        'else': False
                                    }
                                }
                            ]
                        ]
                    }
                }
            }
        }
    }, {
        '$set': {
            'moves.eval': {
                '$map': {
                    'input': {
                        '$range': [
                            0, {
                                '$size': '$moves'
                            }
                        ]
                    }, 
                    'in': {
                        '$mergeObjects': [
                            {
                                '$arrayElemAt': [
                                    '$moves', '$$this'
                                ]
                            }, {
                                'blunder': {
                                    '$arrayElemAt': [
                                        '$blunder_bool', '$$this'
                                    ]
                                }
                            }
                        ]
                    }
                }
            }
        }
    }, {
        '$unset': 'blunder_bool'
    }
]

In [99]:
pipeline = [{
'$set': {
'moves': {
'$map': {
'input': {
'$range': [
1, {
'$size': '$moves'
}
]
}, 
'in': {
'$mergeObjects': [
{
'$arrayElemAt': [
'$moves', '$$this'
]
}, {
'blunder': {
'$cond': {
'if': {
'$gte': [
{
'$switch': {
'branches': [
{
'case': {
'$eq': [
{
'$arrayElemAt': [
'$moves.eval.unit', '$$this'
]
}, {
'$arrayElemAt': [
'$moves.eval.unit', {
'$subtract': [
'$$this', 1
]}]}]
}, 
'then': {
'$abs': {
'$subtract': [
{
'$arrayElemAt': [
'$moves.eval.value', '$$this'
]
}, {
'$arrayElemAt': [
'$moves.eval.value', {
'$subtract': [
'$$this', 1
]}]}]}}
}, {
'case': {
'$ne': [
{
'$arrayElemAt': [
'$moves.eval.unit', '$$this'
]
}, {
'$arrayElemAt': [
'$moves.eval.unit', {
'$subtract': [
'$$this', 1
]}]}]}, 
'then': 10000
}]}}, 100
]}, 
'then': True, 
'else': False
}}}]
}}}
}}
]

In [100]:
import time
start_time = time.time()

requests = [pymongo.UpdateMany({}, pipeline)]
games_collection.bulk_write(requests)

# a = games_collection.update_many({},pipeline)
print("--- %s seconds ---" % (time.time() - start_time))

--- 1.8755288124084473 seconds ---


###  Question 7

Now that we have blunders data, the company wants to get insights on the factors that may be related to them. They believe that openings where more blunders happen are "more difficult", hence, creating learning resources for those would make business sense. They set as cut-point for the opening phase of a game the first 15 moves (inclusive) and ask you to prepare data for a statistical analysis of the correlation between ECO codes and opening phase blunders.

Write code for a function that returns a pandas DataFrame with three columns:

1. ECO: the opening code.
2. Games: the number of games with that ECO
3. MOB: Median Opening Blunders, median of number of blunders in the opening phase 


Example output:

| ECO | Games | MOB |
| --- | --- | ---- |
| C38 | 3756  |   8   |
| C39 | 2100 | 4 |
| C40 | 1152 | 2 |
| ... | ... | ... |

NOTE: The correctness of question 6 will not affect your mark on question 7.
If you find question 6 challenging, you may want to create a test question 7 before coming back to question 6



In [54]:
def blunders_vs_openings(games_collection):
    pipeline = [
    {
    '$project': {
    'ECO': '$ECO', 
    'blunder': '$moves.blunder'
    }}, 
    {
    '$project': {
    'ECO': '$ECO', 
    'blunder_bool': {
    '$reduce': {
    'input': {
    '$range': [
    0, 16]
    }, 
    'initialValue': [], 
    'in': {
    '$concatArrays': [
    '$$value', [
    {
    '$cond': {
    'if': {
    '$eq': [
    {
    '$arrayElemAt': [
    '$blunder', '$$this'
    ]}, True]
    }, 
    'then': '$$this', 
    'else': None
    }}]]
    }}}
    }}, 
    {
    '$group': {
    '_id': {
    'ECO': '$ECO'
    }, 
    'doc': {
    '$push': '$blunder_bool'
    }, 
    'Games': {
    '$sum': 1
    }}}, 
    {
    '$project': {
    'ECO': '$_id.ECO', 
    'doc': '$doc', 
    'Games': '$Games', 
    '_id': 0
    }}
    ]
    cursor = games_collection.aggregate(pipeline)
    df = pd.DataFrame(list(cursor))
    df["MOB"] = df.doc.map(lambda x : concat_var(x))
    df = df.drop(["doc"],axis=1)
    df = df.sort_values(by='Games',ascending=False)
    return df

def concat_var(arr):
    import numpy as np
    arr = np.concatenate(arr, axis=0)
    arr = list(filter(lambda item: item is not None, arr))
    if len(arr)==0:
        return 0
    median_var = int(np.mean(arr))
    return median_var

import time
start_time = time.time()
df = blunders_vs_openings(games_collection)
print("--- %s seconds ---" % (time.time() - start_time))
df

--- 0.06620287895202637 seconds ---


Unnamed: 0,ECO,Games,MOB
58,A00,45,9
45,C00,35,8
128,B01,34,9
0,A40,28,8
57,D00,27,8
...,...,...,...
24,B03,1,7
82,C78,1,14
81,B33,1,10
79,C33,1,6


### Question 8

An essential aspect of online chess is cheating detection. Cheaters transfer moves in their games to chess computer engines and play back the moves computed by the engine. Two factors that suggest a player might be cheating are long winning streaks and low move time standard deviation, that is, taking the same time to play each of their moves. Cheat games are useless for the purposes of ChessChowDown, therefore, bosses are interested in filtering them out.

Write a function that receives the following input parameters: 

 * minStreak: minimum number of consecutive wins to warrant cheating analysis
 * TimeControl: Time Control to analyse
 * maxTimeStd: maximum move time standard deviation to flag as cheater
 
 and returns a list of potential cheating usernames that match all the following conditions:
 
 1. Have won 'minStreak' or more consecutive games played with 'TimeControl' at least once.
 2. For all of the username's winning streaks with length greater or equal than 'minStreak'played with 'TimeControl', each game has a move time standard deviation lower or equal than maxTimeStd 
 
 For efficiency evaluation, we will use the TimeControl with the most games, minStreak = 3 and maxTimeStd = 1

In [37]:
def cheater_detector(games_collection,minStreak,timeControl,maxTimeStd):
    import numpy as np
    #your code here
    pipeline = [
    {
        '$match': {
            'TimeControl': '0+1', 
            '$or': [
                {
                    'result': '1-0'
                }, {
                    'result': '0-1'
                }
            ]
        }
    }, {
        '$project': {
            'TimeControl': '$TimeControl', 
            'UTCdate': '$UTCdate', 
            'result': '$result', 
            'UTCtime': '$UTCtime', 
            'White': '$White', 
            'clock': '$moves.clock', 
            'user': {
                '$concatArrays': [
                    [
                        '$White'
                    ], [
                        '$Black'
                    ]
                ]
            }
        }
    }, {
        '$unwind': {
            'path': '$user'
        }
    }, {
        '$project': {
            'user': '$user', 
            'UTCdate': '$UTCdate', 
            'UTCtime': '$UTCtime', 
            'result': '$result', 
            'clock': '$clock', 
            'win': {
                '$cond': {
                    'if': {
                        '$eq': [
                            '$White', '$user'
                        ]
                    }, 
                    'then': {
                        '$toInt': {
                            '$substr': [
                                '$result', 0, 1
                            ]
                        }
                    }, 
                    'else': {
                        '$toInt': {
                            '$substr': [
                                '$result', 2, 1
                            ]
                        }
                    }
                }
            }
        }
    }, {
        '$group': {
            '_id': '$user', 
            'UTCdate': {
                '$first': 'UTCdate'
            }, 
            'UTCtime': {
                '$first': '$UTCtime'
            }, 
            'result': {
                '$first': '$result'
            }, 
            'clock': {
                '$push': '$clock'
            }, 
            'win': {
                '$sum': '$win'
            }
        }
    }, {
        '$match': {
            'win': {
                '$gt': 1
            }
        }
    }, {
        '$project': {
            'sususer': '$_id', 
            'clock': '$clock'
        }
    }
    ]
    cursor = games_collection.aggregate(pipeline)
    print(list(cursor))
    suspicious_user = set()
    for doc in cursor:
        user = doc["_id"]
        cheater = True
        for time_arr in doc["clock"]:
            std_time = np.std(time_arr)
            if std_time >  maxTimeStd:
                cheater = False
        if cheater == True: 
            suspicious_user.add(user)
    return list(suspicious_user)
a = cheater_detector(games_collection,1,'0+1',1)
len(a)

[{'_id': 'Nkko', 'sususer': 'Nkko', 'clock': [[3, 3, 4, 3, 5, 4, 5, 4, 5, 4, 5, 5, 5, 5, 6, 5, 7, 6, 8, 6, 5, 7, 4, 8, 4, 8, 4, 8, 4, 8, 4, 9, 4, 10, 3, 9, 2, 10, 2, 10, 2, 10], [3, 3, 4, 4, 3, 5, 4, 5, 4, 6, 4, 5, 4, 6, 4, 6, 5, 6, 6, 7, 7, 7, 8, 6, 8, 7, 8, 6, 8, 5, 7, 4, 8, 4, 7, 4, 6, 3, 6, 4, 6, 3, 5, 3, 6, 3, 6, 4, 6, 4, 7, 5, 7, 5, 7, 5, 8, 6, 8, 7, 8, 8, 8, 8, 8, 7, 9, 7, 9, 8, 10, 8, 10, 7]]}]


[]

### Question 9

The company now wants to explore the social aspect of the games. The team decides it is a good idea to store explicit information about who plays against who, in preparation for further social network analysis. 

Write a function that creates a new collection named "social" with the following schema:

* "username" : username
* "numgames" : number of games played by username with any colour
* "played" : list of subdocuments with the following schema:
     "username" : opponent's username (different from parent username)
     "numgames" : Number of games played between parent username and opponent username
     "gameids" : list of id of the games played between parent username and opponent username

An example subset of the new collection is shown below.



```JSON
{
"username" : "DataKnight",
"numgames" : 28 ,
"played" : [ { "username" : "MongoQueen", "numgames" : 12 , 
"gameids" : ["ids of the games between DataKnight and MongoQueen"] } , 
            {"username" : "ThePandas", "numgames" : 14 , 
            "gameids" : ["ids of the games between DataKnight and ThePandas"] } , 
			{"username" : "JupyterGod", "numgames" : 2 ,
            "gameids" : ["ids of the games between DataKnight and JupyterGod"] }]
}
{
"username" : "MongoQueen",
"numgames" : 20,
"played" : [ { "username" : "DataKnight", "numgames" : 12 ,
"gameids" : ["ids of the games between MongoQueen and DataKnight]" } , 
            {"username" : "Hadoooooooop", "numgames" : 6 , 
            "gameids" : ["ids of the games between MongoQueen and Hadoooooooop"]  }, 
			{"username" : "JupyterGod", "numgames" : 2 , 
            "gameids" : ["ids of the games between MongoQueen and JupyterGod"]} ]
}
```

In [72]:
def create_social(games_collection):
    #your code here
    pipeline = [
    {
        '$project': {
            '_id': 1, 
            'opening': 1, 
            'player1': {
                '$map': {
                    'input': {
                        '$literal': [
                            'p1', 'p2'
                        ]
                    }, 
                    'as': 'p', 
                    'in': {
                        '$cond': [
                            {
                                '$eq': [
                                    '$$p', 'p1'
                                ]
                            }, '$White', '$Black'
                        ]
                    }
                }
            }, 
            'player2': {
                '$map': {
                    'input': {
                        '$literal': [
                            'p1', 'p2'
                        ]
                    }, 
                    'as': 'p', 
                    'in': {
                        '$cond': [
                            {
                                '$eq': [
                                    '$$p', 'p1'
                                ]
                            }, '$Black', '$White'
                        ]
                    }
                }
            }
        }
    }, {
        '$project': {
            'player1': 1, 
            'opening': 1, 
            'played': [
                {
                    'player11': {
                        '$arrayElemAt': [
                            '$player1', 0
                        ]
                    }, 
                    'player22': {
                        '$arrayElemAt': [
                            '$player1', 1
                        ]
                    }
                }, {
                    'player11': {
                        '$arrayElemAt': [
                            '$player1', 1
                        ]
                    }, 
                    'player22': {
                        '$arrayElemAt': [
                            '$player1', 0
                        ]
                    }
                }
            ]
        }
    }, {
        '$project': {
            '_id': 0, 
            'gameid': '$_id', 
            'numgames1': 1, 
            'opening': 1, 
            'played': 1
        }
    }, {
        '$unwind': {
            'path': '$played'
        }
    }, {
        '$project': {
            'username': '$played.player11', 
            'played': {
                'username': '$played.player22', 
                'gameid': '$gameid', 
                'count': {
                    '$sum': 1
                }
            }
        }
    }, {
        '$group': {
            '_id': '$username', 
            'numgames': {
                '$sum': 1
            }, 
            'played': {
                '$push': '$played'
            }
        }
    }, {
        '$unwind': {
            'path': '$played'
        }
    }, {
        '$group': {
            '_id': {
                'username_p': '$_id', 
                'username_o': '$played.username'
            }, 
            'numgames_o': {
                '$sum': 1
            }, 
            'numgames_p': {
                '$first': '$numgames'
            }, 
            'gamesid': {
                '$push': '$played.gameid'
            }, 
            'played': {
                '$push': '$played'
            }
        }
    }, {
        '$project': {
            'username': '$_id.username_p', 
            'numgames': '$numgames_p', 
            'username_o': '$_id.username_o', 
            'gameid_o': '$gamesid', 
            'numgames_o': '$numgames_o'
        }
    }, {
        '$project': {
            'username': '$username', 
            'numgames': '$numgames', 
            'played': {
                'username': '$username_o', 
                'gameid': '$gameid_o', 
                'numgames': '$numgames_o'
            }
        }
    }, {
        '$group': {
            '_id': '$username', 
            'numgames': {
                '$first': '$numgames'
            }, 
            'played': {
                '$push': '$played'
            }
        }
    }, {
        '$project': {
            'username': '$_id', 
            'numgames': '$numgames', 
            'played': '$played', 
            '_id': 0
        }
    }, {
        '$out': 'social'
    }
    ] 
    cursor = games_collection.aggregate(pipeline)

create_social(games_collection)

--- 0.1443929672241211 seconds ---


### Question 10

The company wants to investigate communities of players that play the same openings  

Write a function that receives as input a list of ECO codes 'ecoCodes' and construct a networkx graph as follows 

 * Nodes are labeled with usernames
 * A directed edge from user1 to user 2 if a game g exists such that g's ECO is in the ecoCodes list, user1 played White and user2 played Black
 * Each directed edge is labeled with the result of the game it represents.
 
 Using that graph, return the following dictionary:
 
 { 'graph' : the networkx graph,
   'mostWhite' : username that played the most games as White ,   
   'mostBlack' : username that played the most games as black  ,
   'keyPlayers' : list of usernames with highest betweenness centrality 
 }
 
 Assume the length of ecoCodes is restricted to at most 5. For efficiency evaluation, we will use the 5 ECO codes with the most  games.
 

In [58]:
def list_max(dic):
    res = []
    max_value = max(dic.values())
    for k,v in dic.items():
        if v == max_value:
           res.append(k) 
    return res
def opening_community(games_collection,ecoCodes):
    #your code here
    # Return dictionary below with corresponding values
    """
    return { 'graph' : the networkx graph
    'mostWhite' : username that played the most games as White 
    'mostBlack' : username that played the most games as black 
    'keyPlayers' : list of usernames with highest betweenness centrality 
    }
    """
    pipeline = [
    {
        '$match': {
            'ECO': {
                '$in': ecoCodes
            }
        }
    }, {
        '$project': {
            'White': '$White', 
            'Black': '$Black', 
            'result': '$result', 
            '_id': 0
        }
    }
    ]
    
    cursor = games_collection.aggregate(pipeline)
    df = pd.DataFrame(cursor)
    g = nx.from_pandas_edgelist(df, "White", "Black", edge_attr="result", create_using=nx.MultiDiGraph())

    list_white_user = max(g.out_degree())
    list_white_user = list_white_user[0]
    list_black_user = max(g.in_degree())
    list_black_user = list_black_user[0]
    
    g2 = nx.DiGraph(g)
    centrality = nx.betweenness_centrality(g2)
    centrality = list_max(centrality)
    
    result_dic = {}
    result_dic["graph"] = g
    result_dic["mostWhite"] = list_white_user
    result_dic["mostBlack"] = list_black_user
    result_dic["keyPlayers"] = centrality
    return result_dic

ecoCodes = ["B20", "B39", "A40", "B28", "B12"]
import time
start_time = time.time()
result_dic = opening_community(games_collection,ecoCodes)
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.008358001708984375 seconds ---


In [59]:
result_dic

{'graph': <networkx.classes.multidigraph.MultiDiGraph at 0x7fcd02a5a650>,
 'mostWhite': 'zuppid',
 'mostBlack': 'zuppid',
 'keyPlayers': ['matyko',
  'Nkko',
  'Temarpa',
  'fanoftal2',
  'Guest00936',
  'pawnchomper',
  'deliverus',
  'doctorpwn',
  'Bendo974',
  'Ivan_Mishenin_89',
  'J35U5',
  'DepressedBishop',
  'blitzkrieg200',
  'baladhars',
  'mnsa',
  'emadighreza',
  'AshotManukyan57',
  'Franchesco008',
  'AndrejMikula1',
  'matnl',
  'lumiga',
  'Blacknot',
  'vagor',
  'BobodaCorte',
  'flauty',
  'hectorlopez',
  'kula69',
  'delta_gate',
  'Florante',
  'Vredina',
  'Schwert',
  'msahinn',
  'Kebica',
  'matthi2609',
  'VRaptor',
  'Anacharsis',
  'svppasa',
  'ligurematto',
  'OlegTrof',
  'UNK',
  'mzinalabdin',
  'Octaecto',
  'WhatAPatzer',
  'Mickablitz',
  'vecam',
  'jerryspring',
  'Besnik95',
  'jarred771',
  'Mongo82',
  'Ardenadri08',
  'bojesto',
  'AlistairM',
  'hamid-aylien1394',
  'TAMARAN67',
  'zuppid',
  'prutis',
  'silvestr18by',
  'kocinus',
  'Zxcl

In [75]:
pipeline = [
    {
        '$match': {
            'ECO': {
                '$in': ecoCodes
            }
        }
    }, {
        '$project': {
            'White': '$White', 
            'Black': '$Black', 
            'result': '$result', 
            '_id': 0
        }
    }
]
    
cursor = games_collection.aggregate(pipeline)
df = pd.DataFrame(cursor)
g = nx.from_pandas_edgelist(df, "White", "Black", edge_attr="result", create_using=nx.DiGraph())

dict_white = dict(g.out_degree())
list_white_user = max(zip(dict_white.values(),dict_white.keys()))[1]
dict_black = dict(g.in_degree())
list_black_user = max(zip(dict_black.values(),dict_black.keys()))[1]


centrality = nx.betweenness_centrality(g)
centrality = list_max(centrality)

result_dic = {}
result_dic["graph"] = g
result_dic["mostWhite"] = list_white_user
result_dic["mostBlack"] = list_black_user
result_dic["keyPlayers"] = centrality


# ecoCodes = ["B20", "B39", "A40", "B28", "B12"]
# import time
# start_time = time.time()
# result_dic = opening_community(games_collection,ecoCodes)
# print("--- %s seconds ---" % (time.time() - start_time))

In [76]:
result_dic

{'graph': <networkx.classes.digraph.DiGraph at 0x7fcd018bded0>,
 'mostWhite': 'zuppid',
 'mostBlack': 'spiderman2012',
 'keyPlayers': ['matyko',
  'Nkko',
  'Temarpa',
  'fanoftal2',
  'Guest00936',
  'pawnchomper',
  'deliverus',
  'doctorpwn',
  'Bendo974',
  'Ivan_Mishenin_89',
  'J35U5',
  'DepressedBishop',
  'blitzkrieg200',
  'baladhars',
  'mnsa',
  'emadighreza',
  'AshotManukyan57',
  'Franchesco008',
  'AndrejMikula1',
  'matnl',
  'lumiga',
  'Blacknot',
  'vagor',
  'BobodaCorte',
  'flauty',
  'hectorlopez',
  'kula69',
  'delta_gate',
  'Florante',
  'Vredina',
  'Schwert',
  'msahinn',
  'Kebica',
  'matthi2609',
  'VRaptor',
  'Anacharsis',
  'svppasa',
  'ligurematto',
  'OlegTrof',
  'UNK',
  'mzinalabdin',
  'Octaecto',
  'WhatAPatzer',
  'Mickablitz',
  'vecam',
  'jerryspring',
  'Besnik95',
  'jarred771',
  'Mongo82',
  'Ardenadri08',
  'bojesto',
  'AlistairM',
  'hamid-aylien1394',
  'TAMARAN67',
  'zuppid',
  'prutis',
  'silvestr18by',
  'kocinus',
  'Zxclnic

In [None]:
def machine_learning_central(articles):
    """
    Input: Articles collections.
    Output : {
       'overall' : (id_article_highest_indegree_centrality),
       '2017' : id_article_highest_indegree_centrality published year 2017,
       '2018' : id_article_highest_indegree_centrality published year 2018,
       '2019' : id_article_highest_indegree_centrality published year 2019,
    }
    In the case that for a given year, no article has been referenced (and thus, all centrality values are zero),
    put None as the value of that year, example:
    {
       'overall' : id_article_highest_indegree_centrality,
       '2017' : id_article_highest_indegree_centrality of year 2017,
       '2018' : id_article_highest_indegree_centrality of year 2018,
       '2019' : None,
    }
    """
    #Your code here
    machineLearningDict = {}
    checkFos = {'$match': {"$and": [{'fos': { '$exists' : True} }, {'references': { '$ne': None}}]}}
    unwindFos = {'$unwind': {'path' : '$fos'}}
    filterMachinieLearningArticles = {'$match' : {'fos.name' : {'$regex' : 'machine learning' , '$options' : 'i'}}}
    groupByYear = {'$group' : { '_id' : '$year','articleDetails' : {'$addToSet' : { 'articleId' : '$id', 'references' : '$references'}}}}
    unwindArticleDetails = {'$unwind' :{'path' : '$articleDetails'}}
    unwindReferencesDetails = {'$unwind' :{'path' : '$articleDetails.references'}}
    projectDetails = {'$project' : {'_id':0, 'year':'$_id', 'articleId' : '$articleDetails.articleId', 'references' : '$articleDetails.references'}}
    documents = list(articles.aggregate([checkFos,unwindFos,filterMachinieLearningArticles,groupByYear,unwindArticleDetails,unwindReferencesDetails,projectDetails]))
    articlesDataFrame = pd.DataFrame(documents)
    machineLearningDict['overall'] = calculateInDegreeCentrality(articlesDataFrame,'references','articleId')
    machineLearningDict['2017'] = calculateInDegreeCentrality(articlesDataFrame[articlesDataFrame['year'] == 2017],'references','articleId')
    machineLearningDict['2018'] = calculateInDegreeCentrality(articlesDataFrame[articlesDataFrame['year'] == 2018],'references','articleId')
    machineLearningDict['2019'] = calculateInDegreeCentrality(articlesDataFrame[articlesDataFrame['year'] == 2019],'references','articleId')
    return machineLearningDict

         
def calculateInDegreeCentrality(articlesDataFrame,references,articleId):
   diGraph = nx.from_pandas_edgelist(articlesDataFrame,source=articleId,target=references, edge_attr=True, create_using=nx.DiGraph())
   centralityDegree = nx.in_degree_centrality(diGraph)
   max_value = max(centralityDegree.values())
   max_key = [i for i in centralityDegree.keys() if centralityDegree[i]==max_value]
   return max_key[0]

In [159]:
result_dic

{'graph': <networkx.classes.digraph.DiGraph at 0x7fcd99bcb8d0>,
 'mostWhite': ['matyko',
  'Temarpa',
  'Guest00936',
  'deliverus',
  'Bendo974',
  'J35U5',
  'blitzkrieg200',
  'mnsa',
  'AshotManukyan57',
  'AndrejMikula1',
  'lumiga',
  'vagor',
  'flauty',
  'kula69',
  'Florante',
  'Schwert',
  'Kebica',
  'VRaptor',
  'svppasa',
  'OlegTrof',
  'mzinalabdin',
  'WhatAPatzer',
  'vecam',
  'Besnik95',
  'Mongo82',
  'bojesto',
  'hamid-aylien1394',
  'zuppid',
  'silvestr18by',
  'Zxclnic',
  'lord3001',
  'chanchofuncky',
  'Serdarbaba41',
  'aarya_nath',
  'SerkaS',
  'inaciodias',
  'laslonadjlacika',
  'eluxio',
  'Siagas',
  'Isaac2016',
  'influx',
  'faentadere',
  'IQ_Jese',
  'exrolexx',
  'beyazid1453',
  'Darth-Kirk',
  'musicman124',
  'jeminallwin',
  'whiox',
  'tieukientuong',
  'ChequersFan',
  'MaxSwank',
  'SHOMITA',
  'andykerubim99',
  'nare5',
  'deserteagle6983',
  'McFlasher',
  'jesusroldan',
  'King-of-Kings',
  'VitoRomanelli',
  'RONDO_LIKE'],
 'mostBl

## Task 2

Questions 11 and 12 assess your ability to use the Hadoop Streaming API and MapReduce to process data. For each of the questions below, you are expected to write two python scripts, one for the Map phase and one for the Reduce phase. You are also expected to provide the correct parameters to the `hadoop` command to run the MapReduce process. Write down your answers in the specified cells below.

You will use the same dataset you used for task 1.

To help you, `%%writefile` has been added to the top of the cells, automatically writing them to "mapper.py" and "reducer.py" respectively when the cells are run.

No need to return a Python dictionary, the expected output here is the output file of Hadoop, named "output" for question1 and "output2" for question (do not change that part of the question)

### Question 11

Re-implement question 3 as a MapReduce Hadoop job: return the number of knight moves minus the number of bishop moves. Recall that in algebraic notation, a bishop move starts with the letter "B" and a Knight move starts with the letter "N".



In [56]:
%%writefile mapper.py
#!/usr/bin/env python
# MAPPER
import sys
import json
for line in sys.stdin:
    jsonData = json.loads(line)
    for m in jsonData["moves"]:
        if m['move'][0] == 'B':
            sys.stdout.write("B\t1 \n")
        if m['move'][0] == 'N':
            sys.stdout.write("N\t1 \n")

Overwriting mapper.py


In [78]:
%%writefile reducer.py
#!/usr/bin/env python
# REDUCER
import sys
mapper_data = sys.stdin.readlines()
result = {}

for val in mapper_data:
    map_data = val.split("\t",1)
    if len(map_data) != 2:
        continue
        
    move = map_data[0]
    count = int(map_data[1].strip())
    if (len(result) == 0) or (move not in result.keys()):
        result[move] = count
    else:
        result[move]=result[move]+count
    
bishop_moves = result['B']
knight_moves = result['N']
result = knight_moves-bishop_moves
sys.stdout.write(str(result))

Overwriting reducer.py


In [79]:
%%bash
chmod a+x mapper.py reducer.py
cat /Users/AXT6YP7/Desktop/chess.json | ./mapper.py | ./reducer.py | sort

1118


In [2]:
%%bash
#Hadoop command to run the map reduce.
hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input /Users/AXT6YP7/Desktop/games1.json \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output output

### Question 12

Expert chess consultant Elizabeth Harmon analysed a small random sample of the games leading to concerns about some evaluation values. She fears the computer engine was not allowed enough time to process complex positions, leading to evaluation errors that would affect previous analysis. She would like to look at a sample of games with significant evaluation changes to confirm if they are blunders or mis-evaluations. After some discussion, the team decides that a metric to select potentially wrongly evaluated games is a high variance in centipawn evaluations (ignoring checkmate evaluations).

$wrongEval(game) = Var(\{move.eval.value : move \in game.moves \wedge move.eval.unit = centipawn \} ) > 800 $

Implement a MapReduce Hadoop job that returns a list of potentially wrongly evaluated games. The returned list must contain tuples (id,var), with 'id' the id of the game and 'var' the variance as per the equation above, and ordered by decreasing variance value.

Example:

$[(g456tr4,2600),(f783bw2,2594),(wq886tr1, 2593)....]$



In [81]:
%%writefile mapper2.py
#!/usr/bin/env python
#Answer for mapper2.py
import sys
import json

for line in sys.stdin:
    jsonData = json.loads(line)
    for m in jsonData["moves"]:
        if m["eval"]["unit"] == "centipawns":
            out = jsonData["_id"]["$oid"] + "\t" + str(m["eval"]["value"]) + "\n"
            sys.stdout.write(out)

Writing mapper2.py


In [80]:
%%writefile reducer2.py
#!/usr/bin/env python
#Answer for reducer2.py
import sys
import numpy as np

result = {}
mapper_data = sys.stdin.readlines()
for val in mapper_data:
    map_data = val.split("\t",1)
    if len(map_data) != 2:
        continue
        
    game_id = map_data[0]
    count = [int(map_data[1].strip())]

    if (len(result) == 0) or (game_id not in result.keys()):
        result[game_id] = [count]
    else:
        result[game_id].append(count)

res = []
for (key, value) in result.items():
    res.append((key, np.var(value)))
result.sort(key = lambda x : x[1],reverse = True)
for i in result:
    sys.stdout.write(str(i))

Writing reducer2.py


In [22]:
from collections import defaultdict
accumulator = defaultdict(lambda: 0)

In [23]:
accumulator["anant"] = 1
# result["adeeba"] = [2]

In [24]:
accumulator["anant"] = accumulator["anant"] + 4

In [25]:
accumulator

defaultdict(<function __main__.<lambda>()>, {'anant': 5})

In [19]:
import numpy as np
res = []
for (key, value) in result.items():
    res.append((key,np.var(value)))

In [20]:
res

[('anant', 0.25), ('adeeba', 0.25)]

In [82]:
%%bash
chmod a+x mapper2.py reducer2.py
cat /Users/AXT6YP7/Desktop/chess.json | ./mapper2.py | ./reducer2.py | sort 

('635f595f6f579310355d3524', 162209)('6350e1f972dec174ec94a2be', 153310)('6353497f66d6f91385e345a9', 152174)('63513a5c72dec174ec94f5ce', 151229)('63538a3566d6f91385e3831b', 144236)('6351ca8a72dec174ec957c65', 129107)('635fbcdd6f579310355d86a4', 127676)('6355a5f466d6f91385e582ff', 126915)('635021dc72dec174ec93ec89', 118193)('635f1caf6f579310355d032c', 108004)('6353de9566d6f91385e3d2f9', 98171)('6354ef7166d6f91385e4d3d9', 93440)('635f98826f579310355d6914', 89987)('63523ba472dec174ec95e63d', 88641)('63546e6066d6f91385e45993', 79357)('635efb6a6f579310355ce788', 79259)('635464d966d6f91385e45079', 76905)('634da9f2f8de8fa77f3c48bf', 76140)('6355157b66d6f91385e4f859', 75283)('6350ca9772dec174ec948c95', 72276)('6354cdb366d6f91385e4b409', 72209)('63534b9666d6f91385e3477f', 71055)('63554ba866d6f91385e52c85', 70424)('63534bce66d6f91385e347b1', 70069)('635182be72dec174ec953988', 67270)('6354f7fc66d6f91385e4dc00', 67199)('635445ee66d6f91385e43390', 64035)('6352227172dec174ec95ce65', 62980)('63543291

In [None]:
%%bash
#Hadoop command to run the map reduce.
rm -rf output2

hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper2.py,reducer2.py \
-input ~/datasets/chess.json \
-mapper ./mapper2.py \
-reducer ./reducer2.py \
-output output2