# Big Data Management and Processing

Exercises submitted via Python Notebook to keep queries and output close together and easily readable :)

## Setup and imports

In [1]:
from bson.code import Code
import pandas as pd
from pprint import pprint
from pymongo import MongoClient

client = MongoClient('localhost', 27017)
cities = client.bdmp['cities']
regions = client.bdmp['regions']
countries = client.bdmp['countries']

## Exercise 1: Queries

Select all Austrian cities (`countryID = 15`)!

In [2]:
pd.DataFrame(cities.find({'CountryID': 15}))

Unnamed: 0,_id,CityId,CountryID,RegionID,City,Latitude,Longitude,TimeZone,DmaId,Code
0,50bebc7714b6500c5cb78ffe,3352,15,1083,Abtenau,47.549999,13.35,+01:00,0,ABTE
1,50bebc7714b6500c5cb78fff,3353,15,1080,Graz,47.067001,15.45,+01:00,0,GRAZ
2,50bebc7814b6500c5cb79000,3354,15,1081,Kitzbuhel,47.450001,12.383,+01:00,0,KITZ
3,50bebc7814b6500c5cb79001,3355,15,1077,Lilienfeld,48.016998,15.633,+01:00,0,LILI
4,50bebc7814b6500c5cb79002,3356,15,1078,Linz,48.299999,14.3,+01:00,0,LINZ
5,50bebc7914b6500c5cb79003,3357,15,1079,Salzburg,47.799999,13.033,+01:00,0,SALZ
6,50bebc7914b6500c5cb79004,3358,15,1081,Solden,46.966999,11.0,+01:00,0,SOLD
7,50bebc7914b6500c5cb79005,3361,15,1075,Stegersbach,47.167,16.167,+01:00,0,STEG
8,50bebc7a14b6500c5cb79006,3362,15,1078,Steyr,48.049999,14.417,+01:00,0,STEY
9,50bebde014b6500c5cb79340,6033,15,1081,Innsbruck,47.266998,11.4,+01:00,0,INNS


Select all Austrian cities in ascending order!

In [3]:
pd.DataFrame(cities
             .find({'CountryID': 15})
             .sort([('City', 1)]))

Unnamed: 0,_id,CityId,CountryID,RegionID,City,Latitude,Longitude,TimeZone,DmaId,Code
0,50bebc7714b6500c5cb78ffe,3352,15,1083,Abtenau,47.549999,13.35,+01:00,0,ABTE
1,50bebe9a14b6500c5cb79508,14860,15,1078,Bad Hall,48.033001,14.2,+01:00,0,BHAL
2,50bebf1614b6500c5cb79627,17233,15,1077,Bad V,47.966999,16.200001,+01:00,0,BV
3,50bebe2314b6500c5cb793e9,12466,15,1077,Baden,48.016998,16.233,+01:00,0,BADE
4,50bebf0414b6500c5cb795fc,16781,15,1076,Bodensdorf,46.691002,13.971,+01:00,0,BODE
5,50bebe1f14b6500c5cb793e1,12399,15,1082,Bregenz,47.5,9.767,+01:00,0,BREG
6,50bebe2414b6500c5cb793ea,12467,15,1076,D,46.783001,13.66,+01:00,0,D
7,50bebe9014b6500c5cb794f3,14725,15,1082,Dornbirn,47.417,9.733,+01:00,0,DORN
8,50bebf7e14b6500c5cb7970f,41890,15,1077,Ebreichsdorf,47.950001,16.4,+01:00,0,EBRE
9,50bebefe14b6500c5cb795eb,16597,15,1075,Eisenstadt,47.849998,16.517,+01:00,0,EISE


Select all Austrian cities in descending order!

In [4]:
pd.DataFrame(cities
             .find({'CountryID': 15})
             .sort([('City', -1)]))

Unnamed: 0,_id,CityId,CountryID,RegionID,City,Latitude,Longitude,TimeZone,DmaId,Code
0,50bebe6014b6500c5cb79482,13801,15,1077,Zwettl,48.617001,15.167,+01:00,0,ZWET
1,50bebe3314b6500c5cb79414,12867,15,1081,Zirl,47.283001,11.233,+01:00,0,ZIRL
2,50bebf5114b6500c5cb796a2,18389,15,1079,Zell am See,47.317001,12.783,+01:00,0,ZASE
3,50bebe4d14b6500c5cb79457,13364,15,1077,Wieselburg,48.132999,15.133,+01:00,0,WIES
4,50bebe5714b6500c5cb79471,13625,15,1077,Wiener Neustadt,47.799999,16.25,+01:00,0,WNEU
5,50bebf1614b6500c5cb79628,17234,15,1083,Wien,48.200001,16.367001,+01:00,0,WIEN
6,50bebf0f14b6500c5cb79619,17005,15,1078,Wels,48.167,14.033,+01:00,0,WELS
7,50bebe3914b6500c5cb79422,12956,15,1081,W,47.483002,12.067,+01:00,0,W
8,50bebf0414b6500c5cb795fb,16779,15,1076,Villach,46.610001,13.856,+01:00,0,VILL
9,50bebde014b6500c5cb79341,6034,15,1083,Vienna,48.200001,16.367001,+01:00,0,VIEN


Select the number of Austrian cities that are included in the dataset!

In [5]:
cities.count_documents({'CountryID': 15})

54

Select solely the city names of all Austrian cities!

In [6]:
pd.DataFrame(cities.find({'CountryID': 15}, {'City': 1, '_id': 0}))

Unnamed: 0,City
0,Abtenau
1,Graz
2,Kitzbuhel
3,Lilienfeld
4,Linz
5,Salzburg
6,Solden
7,Stegersbach
8,Steyr
9,Innsbruck


Select all countries, which exhibit a population between 15 and 20 millions of people!

In [7]:
pd.DataFrame(countries.find({'Population': {'$gte': 1.5e7, '$lte': 2e7}}))

Unnamed: 0,_id,CountryId,Country,FIPS104,ISO2,ISO3,ISON,Internet,Capital,MapReference,NationalitySingular,NationalityPlural,Currency,CurrencyCode,Population,Title,Comment
0,50bebfc814b6500c5cb797c6,42,Cameroon,CM,CM,CMR,120,CM,Yaounde,Africa,Cameroonian,Cameroonians,CFA Franc BEAC,XAF,15803220,Cameroon,
1,50bebfc914b6500c5cb797cb,48,Chile,CI,CL,CHL,152,CL,Santiago,South America,Chilean,Chileans,Chilean Peso,CLP,15328467,Chile,
2,50bebfce14b6500c5cb797d6,60,Cote d'Ivoire,IV,CI,CIV,384,CI,Yamoussoukro,Africa,Ivorian,Ivorians,CFA Franc BCEAO,XOF,16393221,Cote d'Ivoire,
3,50bebfdc14b6500c5cb797f5,92,Ghana,GH,GH,GHA,288,GH,Accra,Africa,Ghanaian,Ghanaians,Cedi,GHC,19894014,Ghana,


## Exercise 2: Updates

In [8]:
pd.DataFrame(countries.find({'CountryId': 15}))

Unnamed: 0,_id,CountryId,Country,FIPS104,ISO2,ISO3,ISON,Internet,Capital,MapReference,NationalitySingular,NationalityPlural,Currency,CurrencyCode,Population,Title,Comment
0,50bebfbd14b6500c5cb797ab,15,Austria,AU,AT,AUT,40,AT,Vienna,Europe,Austrian,Austrians,Euro,EUR,8150895,Austria,


Increase the population of Austria (`countryID = 15`) by 3 persons

In [9]:
countries.update_one({'CountryId': 15}, {'$inc': {'Population': 3}})
pd.DataFrame(countries.find({'CountryId': 15}))

Unnamed: 0,_id,CountryId,Country,FIPS104,ISO2,ISO3,ISON,Internet,Capital,MapReference,NationalitySingular,NationalityPlural,Currency,CurrencyCode,Population,Title,Comment
0,50bebfbd14b6500c5cb797ab,15,Austria,AU,AT,AUT,40,AT,Vienna,Europe,Austrian,Austrians,Euro,EUR,8150898,Austria,


In [10]:
countries.update_one({'CountryId': 15}, {'$inc': {'Population': -3}})
pd.DataFrame(countries.find({'CountryId': 15}))

Unnamed: 0,_id,CountryId,Country,FIPS104,ISO2,ISO3,ISON,Internet,Capital,MapReference,NationalitySingular,NationalityPlural,Currency,CurrencyCode,Population,Title,Comment
0,50bebfbd14b6500c5cb797ab,15,Austria,AU,AT,AUT,40,AT,Vienna,Europe,Austrian,Austrians,Euro,EUR,8150895,Austria,


## Exercise 3: MapReduce: Selection

Write a MapReduce program, which returns all Austrian cities (full documents!).

In [11]:
map = Code('function () { emit(this._id, this); }')
reduce = Code('function (key, values) { return values; }')
pd.DataFrame(cities.map_reduce(map, reduce, out='all_cities', query={'CountryID': 15}).find({}))

Unnamed: 0,_id,value
0,50bebc7714b6500c5cb78ffe,"{'_id': 50bebc7714b6500c5cb78ffe, 'CityId': 33..."
1,50bebc7714b6500c5cb78fff,"{'_id': 50bebc7714b6500c5cb78fff, 'CityId': 33..."
2,50bebc7814b6500c5cb79000,"{'_id': 50bebc7814b6500c5cb79000, 'CityId': 33..."
3,50bebc7814b6500c5cb79001,"{'_id': 50bebc7814b6500c5cb79001, 'CityId': 33..."
4,50bebc7814b6500c5cb79002,"{'_id': 50bebc7814b6500c5cb79002, 'CityId': 33..."
5,50bebc7914b6500c5cb79003,"{'_id': 50bebc7914b6500c5cb79003, 'CityId': 33..."
6,50bebc7914b6500c5cb79004,"{'_id': 50bebc7914b6500c5cb79004, 'CityId': 33..."
7,50bebc7914b6500c5cb79005,"{'_id': 50bebc7914b6500c5cb79005, 'CityId': 33..."
8,50bebc7a14b6500c5cb79006,"{'_id': 50bebc7a14b6500c5cb79006, 'CityId': 33..."
9,50bebde014b6500c5cb79340,"{'_id': 50bebde014b6500c5cb79340, 'CityId': 60..."


## Exercise 4: MapReduce: Projection with Duplicates 

Write a MapReduce program, which returns **solely the city names** of all Austrian cities. If duplicates are included, they should be **kept** in the result. How many Austrian city names are included in the dataset?

In [12]:
map = Code('function () { emit(this._id, this.City); }')
reduce = Code('function (key, values) { return values; }')
pd.DataFrame(cities.map_reduce(map, reduce, out='city_names_duplicates', query={'CountryID': 15}).find({}, {'_id': 0}))

Unnamed: 0,value
0,Abtenau
1,Graz
2,Kitzbuhel
3,Lilienfeld
4,Linz
5,Salzburg
6,Solden
7,Stegersbach
8,Steyr
9,Innsbruck


## Exercise 5: MapReduce: Projection without Duplicates

Write a MapReduce program, which returns **solely the city names** of all Austrian cities. If duplicates are included, they should be **removed** from the result. Did duplicates arise? How many?

In [13]:
map = Code('function () { emit(this.City, this.City); }')
# just use first entry of values-list
reduce = Code('function (key, values) { return values[0]; }')
pd.DataFrame(cities.map_reduce(map, reduce, out='city_names_no_duplicates', query={'CountryID': 15}).find({}, {'_id': 0}))

Unnamed: 0,value
0,Abtenau
1,Bad Hall
2,Bad V
3,Baden
4,Bodensdorf
5,Bregenz
6,D
7,Dornbirn
8,Ebreichsdorf
9,Eisenstadt


## Exercise 6: MapReduce: Aggregation

Write a MapReduce program, which returns the total population of countries having EURO as currency. How many people are returned?

In [14]:
map = Code('function () { emit(this.CurrencyCode, this.Population); }')
# just use first entry of values-list
reduce = Code('function (key, values) { return values[0]; }')
pd.DataFrame(countries.map_reduce(map, reduce, out='currency_code_euro', query={'CurrencyCode': 'EUR'}).find({}, {'_id': 0}))

Unnamed: 0,value
0,10258762.0


In [15]:
map = Code('function () { emit(this.Currency, this.Population); }')
# just use first entry of values-list
reduce = Code('function (key, values) { return values[0]; }')
pd.DataFrame(countries.map_reduce(map, reduce, out='currency_euro', query={'Currency': 'Euro'}).find({}, {'_id': 0}))

Unnamed: 0,value
0,10258762.0


## Exercise 7: MapReduce: Join

Write two MapReduce programs, which together produce a collection that represents a join between attributes of the `countries` and the `regions` collections. In terms of SQL, approximately the following query should be realized:

```SQL
SELECT countries.Country, regions.Region
FROM countries, regions
WHERE regions.CountryID = countries.CountryId
AND regions.CountryID = 15;
```

The term “approximately” has been used, since the above query would return **one tuple for each region**. In contrast, the MapReduce program should return **a single document** with an array containing all regions.

**Desired result:**
```
{
 "_id" : 15.0,
 "value" : {
 "Country" : "Austria",
 "Region" : ["Austria", "Burgenland", "Karnten",
 "Niederosterreich", "Oberosterreich", "Salzburg", "Steiermark",
 "Tirol", "Vorarlberg", "Wien"]
 }
}
```

**Hints for solution:**

For achieving combined results, consider “reduce” semantics of MapReduce! Keep in mind that reduce may be applied to **structurally equal** documents, only! Pay attention to the three properties that must be guaranteed by the reduce function:

- `reduce(key, [ C, reduce(key, [ A, B ]) ] ) == reduce (key, [ C, A, B ] )`
- `reduce( key, [ reduce(key, [ A, B ]) ] ) == reduce ( key, [ A, B ] )`
- `reduce ( key, [ A, B ] ) == reduce ( key, [ B, A ] )`

Consequently, test the program with several invocations and in different execution order (e.g., first apply MapReduce to `countries`, then to `regions` and the other way round)!

In [16]:
map_regions = Code('function () { emit(this.CountryID, {_id: this.CountryID, Country: "", Region: [this.Region]}); }')
map_country = Code('function () { emit(this.CountryId, {_id: this.CountryId, Country: this.Country, Region: []}); }')

reduce = Code("""function (key, values) {
    const ret = {Country: "", Region: []};
    values.forEach(value => {
        if (ret.Country === "") {
            ret.Country = value.Country;
        }
        value.Region.forEach(region => {
            if (!ret.Region.includes(region)) {
                ret.Region.push(region);
            }
        });
    });
    return ret;
}""")

Testing invariance by order of execution

In [17]:
result = countries.map_reduce(map_country, reduce, out={'reduce': 'joined_country_first'})
result = regions.map_reduce(map_regions, reduce, out={'reduce': 'joined_country_first'})

for doc in result.find({'_id': 15}):
    pprint(doc)

{'_id': 15.0,
 'value': {'Country': 'Austria',
           'Region': ['Austria',
                      'Burgenland',
                      'Karnten',
                      'Niederosterreich',
                      'Oberosterreich',
                      'Salzburg',
                      'Steiermark',
                      'Tirol',
                      'Vorarlberg',
                      'Wien']}}


In [18]:
result = regions.map_reduce(map_regions, reduce, out={'reduce': 'joined_regions_first'})
result = countries.map_reduce(map_country, reduce, out={'reduce': 'joined_regions_first'})

for doc in result.find({'_id': 15}):
    pprint(doc)

{'_id': 15.0,
 'value': {'Country': 'Austria',
           'Region': ['Austria',
                      'Burgenland',
                      'Karnten',
                      'Niederosterreich',
                      'Oberosterreich',
                      'Salzburg',
                      'Steiermark',
                      'Tirol',
                      'Vorarlberg',
                      'Wien']}}
