In [None]:
import os
import gzip
import json
import requests

<br><h1>#Dataset</h1>

In [2]:
# Location of the sample city list (a gzip-compressed JSON file).
url = 'http://bulk.openweathermap.org/sample/city.list.min.json.gz'
# The local file name is the last path component of the URL.
cities_gz = url.rsplit("/", 1)[-1]

In [3]:
# Download the gzipped city list unless a local copy already exists.
if not os.path.isfile(cities_gz):
    print('downloading')
    # Fetch and validate the response BEFORE opening the target file: opening
    # first would leave an empty file behind when the request fails, and the
    # next run would then report it as already downloaded.
    r = requests.get(url, timeout=60)
    # Fail loudly on 4xx/5xx instead of saving an error page as the archive.
    r.raise_for_status()
    with open(cities_gz, "wb") as f:
        f.write(r.content)
else:
    print('downloaded!')

downloading


In [42]:
# Read the gzip archive as UTF-8 text and parse the JSON document it holds;
# the parsed result is kept in cities_json for the cells below.
with gzip.open(cities_gz, "rt", encoding="utf-8") as f:
    cities_json = json.load(f)

In [None]:
# Report how many entries the parsed city list contains.
city_total = len(cities_json)
print('total city in the world: ', city_total)

<br>

<br><h1>#Node</h1>

In [1]:
class Node:
    """A simulated worker node: it collects data items and applies a function to them.

    Attributes:
        id: label supplied by the caller, stored unchanged.
        datas: list of every item handed to this node so far.
    """

    def __init__(self, id):
        """Create a node labelled ``id`` that starts with no data."""
        self.id, self.datas = id, []

    def put(self, datas):
        """Append every item of the iterable ``datas`` to this node's store."""
        self.datas += datas

    def sum(self, function):
        """Call ``function`` with the list of stored items and return its result."""
        stored = self.datas
        return function(stored)

<br><h1>#Algorithm</h1>

In [2]:
def aggregate(city_ids):
    """Sum the current temperature of every city in ``city_ids``.

    One request per city id is sent to the OpenWeatherMap current-weather
    endpoint with ``units=metric``; the ``main.temp`` field of each JSON reply
    is added to a running total.

    The API key is read from the ``OPENWEATHERMAP_API_KEY`` environment
    variable. When that variable is unset the key that was historically
    embedded in this notebook is used so existing runs keep working.

    Args:
        city_ids: iterable of city ids understood by the weather API.

    Returns:
        The sum of the reported temperatures; ``0`` when ``city_ids`` is empty.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeouts.
    """
    # NOTE(review): a credential committed to a notebook should be treated as
    # leaked - rotate it and drop this fallback once the env var is set everywhere.
    apikey = os.environ.get('OPENWEATHERMAP_API_KEY', '133559c63e055b23b0dd2dafc698fcfe')
    # HTTPS keeps the API key out of clear-text traffic.
    endpoint = "https://api.openweathermap.org/data/2.5/weather"
    # Without a timeout a single stalled connection would hang the whole run.
    timeout_seconds = 10

    total = 0
    for city in city_ids:
        response = requests.get(
            endpoint,
            params={'id': city, 'APPID': apikey, 'units': 'metric'},
            timeout=timeout_seconds,
        )
        # Report HTTP failures (bad key, unknown city, rate limit) directly
        # rather than as a KeyError on the error payload below.
        response.raise_for_status()
        # Named `payload`, not `json`, so the imported json module is not shadowed.
        payload = response.json()
        total += payload['main']['temp']

    return total


def map(datas, nodes):
    """Distribute ``datas`` across ``nodes`` as evenly as possible.

    Each node first receives one contiguous slice of
    ``len(datas) // len(nodes)`` items, in node order. The items left over
    (fewer than ``len(nodes)``) are then handed out one apiece, starting again
    from the first node. When there are fewer items than nodes, the first
    ``len(datas)`` nodes simply get one item each.

    Args:
        datas: sequence of items (must support ``len()`` and slicing).
        nodes: sequence of objects exposing ``put(items)``.

    Returns:
        None. The nodes are filled in place through ``put``.

    Raises:
        ValueError: if there is data to distribute but ``nodes`` is empty.
    """
    n_node = len(nodes)
    n_data = len(datas)

    if n_data == 0:
        # Nothing to hand out; also avoids dividing by zero when nodes is empty.
        return
    if n_node == 0:
        raise ValueError('cannot distribute data: no nodes available')

    # Integer arithmetic throughout: chunk is the even share per node.
    chunk = n_data // n_node

    if chunk:
        for index, node in enumerate(nodes):
            node.put(datas[index * chunk:(index + 1) * chunk])

    # Hand out the leftover items one per node. zip stops at the shorter input,
    # and there are always fewer leftovers than nodes.
    for node, item in zip(nodes, datas[chunk * n_node:]):
        node.put([item])
        

def reduce(nodes):
    """Add up the partial results of all ``nodes``.

    Each node is asked, in order, to apply ``aggregate`` to the data it holds;
    the values returned are accumulated into a single total, starting from 0.

    Args:
        nodes: iterable of objects exposing ``sum(function)``.

    Returns:
        The accumulated total (``0`` when ``nodes`` is empty).
    """
    grand_total = 0
    # The generator keeps evaluation lazy: each node's partial result is
    # computed immediately before it is added.
    for partial in (worker.sum(aggregate) for worker in nodes):
        grand_total += partial
    return grand_total

<br><h1>#Execute</h1>

In [None]:
print("--- Map Reduce Simulation: Average Earth's Temperature in Celcius ---\n")

# get input
print('> Input')
n_city = int(input('\tnumber of city: '))
n_node = int(input('\tnumber of node: '))
print()

# Reject sizes the simulation cannot handle: zero would end in a division by
# zero further down, and a negative city count would slice the dataset from
# the wrong end.
if n_city < 1 or n_node < 1:
    raise ValueError('number of city and number of node must both be at least 1')

# generate nodes
print(f'> Generating {n_node} node ...', end=' ')
nodes = [Node(f'node-{i}') for i in range(n_node)]
print('done!\n')


# get city's ids
print(f'\n> Preparing {n_city} data ...', end=' ')
cities = [city['id'] for city in cities_json[:n_city]]
if not cities:
    raise ValueError('the city dataset is empty; nothing to average')
print('done!\n')


# mapping data to nodes
print('\n> distributing data ...', end=' ')
map(cities, nodes)
print('done!\n')


# joining node result
print('\n> reducing data ...', end=' ')
total = reduce(nodes)
print('done!\n')


# calculating average
# Divide by the number of cities actually queried: the slice above yields
# fewer than n_city entries when the request exceeds the dataset size, and
# dividing by n_city would then understate the average.
print('\n> EARTH\'S TEMPERATURE: %.2f°C' % (total / len(cities)))