
# MapReduce I

In [None]:
!pip3 install pyspark
!wget -O "adult.1000.sf" "https://mydisk.cs.upc.edu/s/DPYbCoMnF9y68by/download/adult.1000.sf"
import pyspark
from pyspark.sql import SparkSession
from pprint import pprint
import random

attributes = ["id","age","workclass","fnlwgt","education","education_num","marital_status","relationship","race","sex","capital_gain","capital_loss","hours_per_week","native_country"]
def getAttribute(row,attribute):
    for i,att in enumerate(attributes):
        if (att == attribute):
            return row[i]
    return None

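def executeMapReduce(mapFunction, reduceFunction):
    sc = pyspark.SparkContext.getOrCreate()
    result = (sc.sequenceFile("adult.1000.sf")
              # Map: each (key, value) record may emit zero or more (k, v) pairs
              .flatMap(lambda t: mapFunction(t[0], t[1]))
              # Shuffle: collect all values emitted under the same key
              .groupByKey()
              # Reduce: each (key, values) group may emit zero or more results
              .flatMap(lambda t: reduceFunction(t[0], t[1]))
              # Only the first 10 results are brought back to the driver
              .take(10))
    pprint(result)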


# Session 1

In this session you will implement relational operators using MapReduce.

## Dataset

In this session we will use the [Adult dataset](https://archive.ics.uci.edu/ml/datasets/Adult), which contains census data about individuals and their income. You can check the file `adult.names` to better understand the schema of the data. As input, we provide a `SequenceFile` dataset (`adult.1000.sf`) where the key is a surrogate ID and the value is a comma-separated list of attributes conforming to the schema in `adult.names` (the attribute order used here is also listed in `attributes` in the setup cell). The following tuple is an example record from the file:

```
('GtdDh4aF', '18,Local-gov,674771,Doctorate,8,Widowed,Wife,Other,Female,44859,8519,31,Yugoslavia')
```

Furthermore, we provide the function `getAttribute(row, attribute)` (defined in the setup cell), which returns the value of the given attribute in `row`, a list of field values. Note that `row` must contain the key as its first value, as in the provided examples, which build it with `(k + "," + v).split(",")`.
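As a quick check of the key-first convention, the snippet below (plain Python, no Spark) copies `attributes` and `getAttribute` from the setup cell and applies them to the example record above. The last call shows what goes wrong when the key is not prepended: every attribute is read one position off.

```python
# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same logic as the notebook helper: value at the attribute's position, else None
    for i, att in enumerate(attributes):
        if att == attribute:
            return row[i]
    return None

# Example record from the dataset description: (key, comma-separated value)
k, v = ('GtdDh4aF', '18,Local-gov,674771,Doctorate,8,Widowed,Wife,Other,Female,44859,8519,31,Yugoslavia')

row = (k + "," + v).split(",")              # key first, so row[0] is "id"
print(getAttribute(row, "age"))             # 18
print(getAttribute(row, "native_country"))  # Yugoslavia
print(getAttribute(row, "capital_gain"))    # 44859 (a string, not an int)
print(getAttribute(v.split(","), "age"))    # Local-gov: shifted by one without the key
```

All values come back as strings, so numeric attributes such as `capital_gain` need an explicit `int(...)` before any arithmetic.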

## Example 1) Projection

We provide you with the implementation of the projection operator. In SQL it would correspond to the following query:
```sql
SELECT DISTINCT age, relationship, native_country FROM adult;
```

In [None]:
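def projection_map(k,v):
    # Rebuild the full record (key first) so that getAttribute can index it
    row = (k+","+v).split(",")
    # The projected values become the output key; the value 1 is never used
    return [(getAttribute(row,"age")+","+
             getAttribute(row,"relationship")+","+getAttribute(row,"native_country"),1)]

def projection_reduce(k, lv):
    # Grouping by key collapses duplicates, which implements DISTINCT
    return [k]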

executeMapReduce(projection_map,projection_reduce)
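To see why emitting the projected values as the key implements `DISTINCT`, here is a minimal sketch that runs the same map and reduce logic without Spark. `run_local` is a hypothetical in-memory stand-in for `executeMapReduce` (map, group by key, reduce), and `record` is a hypothetical helper that builds made-up records in the `SequenceFile` layout; neither is part of the notebook.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def projection_map(k, v):
    row = (k + "," + v).split(",")
    # The projected values become the key; the value 1 is never used
    return [(",".join(getAttribute(row, a) for a in ("age", "relationship", "native_country")), 1)]

def projection_reduce(k, lv):
    # One output per distinct key, which is exactly DISTINCT
    return [k]

records = [record("p1", "Italy"), record("p2", "Italy"), record("p3", "Ecuador", age="45")]
result = run_local(records, projection_map, projection_reduce)
print(result)  # ['30,Not-in-family,Italy', '45,Not-in-family,Ecuador']
```

The two made-up records with the same `(age, relationship, native_country)` land in one group, so the reducer emits that combination only once.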



## Example 2) Cross product

We provide you with the implementation of the cross product operator. In SQL it would correspond to the following query:
```sql
SELECT external.*, internal.*
FROM adult as internal, adult as external
WHERE external.native_country = 'Italy' AND internal.native_country = 'Ecuador'
```

In [None]:
N = 100  # number of reducer keys the Ecuador side is replicated to

def crossproduct_map(k,v):
    tupl = k+","+v
    country = getAttribute(tupl.split(","),"native_country")
    if "Italy" in country:
        # Send each Italy tuple to ONE random reducer key in [0, N-1]
        # (random.randint is inclusive on both ends, hence N-1)
        return [(random.randint(0,N-1),tupl)]
    elif "Ecuador" in country:
        # Replicate each Ecuador tuple to ALL N reducer keys
        return [(i,tupl) for i in range(N)]
    return []

def crossproduct_reduce(k, lv):
    # Split the group back into its two inputs...
    italy = []
    ecuador = []
    for v in lv:
        country = getAttribute(v.split(","),"native_country")
        if "Italy" in country:
            italy.append(v)
        elif "Ecuador" in country:
            ecuador.append(v)
    # ...and pair every Italy tuple with every Ecuador tuple in this group
    retValue = []
    for x in italy:
        for y in ecuador:
            retValue.append((None,x+"<->"+y))
    return retValue

executeMapReduce(crossproduct_map,crossproduct_reduce)
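The cross product works because each Italy tuple is sent to exactly one of the `N` reducer keys while each Ecuador tuple is replicated to all of them, so every (Italy, Ecuador) pair meets in exactly one reducer. The sketch below checks this locally with a small `N`. `run_local` (an in-memory stand-in for `executeMapReduce`) and `record` (a builder of made-up records) are hypothetical helpers, not part of the notebook, and countries are compared with `==`, assuming values carry no surrounding whitespace as in the example record.

```python
import random
from collections import Counter, defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

N = 4  # small number of reducer keys for the local check (the notebook uses 100)

def crossproduct_map(k, v):
    tupl = k + "," + v
    country = getAttribute(tupl.split(","), "native_country")
    if country == "Italy":
        return [(random.randint(0, N - 1), tupl)]   # exactly one key in 0..N-1
    if country == "Ecuador":
        return [(i, tupl) for i in range(N)]        # every key in 0..N-1
    return []

def crossproduct_reduce(k, lv):
    italy = [t for t in lv if getAttribute(t.split(","), "native_country") == "Italy"]
    ecuador = [t for t in lv if getAttribute(t.split(","), "native_country") == "Ecuador"]
    return [(None, x + "<->" + y) for x in italy for y in ecuador]

random.seed(0)  # reproducible key choice; the check below holds for any seed
records = [record("i1", "Italy"), record("i2", "Italy"), record("i3", "Italy"),
           record("e1", "Ecuador"), record("e2", "Ecuador"), record("y1", "Yugoslavia")]
pairs = [p for _, p in run_local(records, crossproduct_map, crossproduct_reduce)]
counts = Counter(pairs)
print(len(pairs), max(counts.values()))  # 6 1: all 3 x 2 pairs, each exactly once
```

The random key must stay inside `range(N)`: `random.randint(0, N)` can also return `N`, and an Italy tuple sent there meets no Ecuador tuple, so its pairs are silently lost.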

## Exercise 1) Selection

Implement the selection operator. In SQL it would correspond to the following query:

```sql
SELECT * FROM adult WHERE workclass = 'Private'
```

In [None]:
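def selection_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def selection_reduce(k, lv):
    # TODO: return a list of results for this key
    return []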

executeMapReduce(selection_map,selection_reduce)
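One possible approach, sketched in plain Python so it can be checked without Spark: filter in the map phase and use an identity reduce. The unique surrogate key is kept as the output key so that identical rows are not merged, since `SELECT *` keeps duplicates. `run_local` (an in-memory stand-in for `executeMapReduce`) and `record` (a builder of made-up records) are hypothetical helpers, not part of the notebook.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def selection_map(k, v):
    row = (k + "," + v).split(",")
    # Filter in the map phase; keying on the unique surrogate key keeps every matching row
    return [(k, v)] if getAttribute(row, "workclass") == "Private" else []

def selection_reduce(k, lv):
    # Identity reduce: emit each tuple that reached this key
    return [(k, v) for v in lv]

records = [record("s1", "Italy"),
           record("s2", "Italy", workclass="Local-gov"),
           record("s3", "Ecuador")]
result = run_local(records, selection_map, selection_reduce)
print([k for k, _ in result])  # ['s1', 's3']
```

Since both functions only rely on `getAttribute`, they could be passed to `executeMapReduce` unchanged.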

## Exercise 2) Grouping

Implement the grouping operator (note that this operation does not exist in standard SQL, so `list` below is not a standard aggregate function). In SQL it would correspond to the following query:

```sql
SELECT native_country, list(capital_gain) FROM adult GROUP BY native_country
```

In [None]:
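def grouping_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def grouping_reduce(k, lv):
    # TODO: return a list of results for this key
    return []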

executeMapReduce(grouping_map,grouping_reduce)
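A possible sketch in plain Python, where `run_local` (an in-memory stand-in for `executeMapReduce`) and `record` (a builder of made-up records) are hypothetical helpers: the map emits `native_country` as the key and `capital_gain` as the value, so the shuffle does the grouping and the reducer only has to materialize the list.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def grouping_map(k, v):
    row = (k + "," + v).split(",")
    # Group key: native_country; value: the attribute to collect
    return [(getAttribute(row, "native_country"), int(getAttribute(row, "capital_gain")))]

def grouping_reduce(k, lv):
    # The reducer already receives all values of one country: emit them as a list
    return [(k, list(lv))]

records = [record("g1", "Italy", gain="100"),
           record("g2", "Ecuador"),
           record("g3", "Italy", gain="250")]
result = run_local(records, grouping_map, grouping_reduce)
print(result)  # [('Italy', [100, 250]), ('Ecuador', [0])]
```

In Spark the order of values inside a group is not guaranteed, so the lists may come out in a different order than in this local run.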

## Exercise 3) Aggregation

Implement the aggregation operator. In SQL it would correspond to the following query:

```sql
SELECT native_country, SUM(capital_gain) FROM adult GROUP BY native_country
```

In [None]:
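def aggregation_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def aggregation_reduce(k, lv):
    # TODO: return a list of results for this key
    return []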

executeMapReduce(aggregation_map,aggregation_reduce)
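A possible sketch in plain Python, with the hypothetical helpers `run_local` (an in-memory stand-in for `executeMapReduce`) and `record` (a builder of made-up records): the map is the same as for grouping, but the reducer folds each group into a single sum. `capital_gain` is parsed with `int` because all fields arrive as strings.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def aggregation_map(k, v):
    row = (k + "," + v).split(",")
    return [(getAttribute(row, "native_country"), int(getAttribute(row, "capital_gain")))]

def aggregation_reduce(k, lv):
    # Same shuffle as grouping, but each group is folded into one SUM
    return [(k, sum(lv))]

records = [record("a1", "Italy", gain="100"),
           record("a2", "Ecuador"),
           record("a3", "Italy", gain="250")]
result = run_local(records, aggregation_map, aggregation_reduce)
print(result)  # [('Italy', 350), ('Ecuador', 0)]
```

Because addition is associative and commutative, plain Spark code would typically use `reduceByKey`, which combines values within each partition before the shuffle; `executeMapReduce` always uses `groupByKey`, so the whole group reaches the reducer.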

## Exercise 4) Union

Implement the union operator. In SQL it would correspond to the following query:

```sql
SELECT capital_gain FROM adult a1 WHERE native_country = 'Italy'
UNION
SELECT capital_loss FROM adult a2 WHERE native_country = 'Ecuador'
```

In [None]:
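def union_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def union_reduce(k, lv):
    # TODO: return a list of results for this key
    return []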

executeMapReduce(union_map,union_reduce)
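A possible sketch in plain Python (`run_local` and `record` are hypothetical helpers: an in-memory stand-in for `executeMapReduce` and a builder of made-up records). Each branch of the `UNION` projects its own column in the map phase, and the projected value itself is used as the key, so the shuffle removes duplicates as `UNION` (unlike `UNION ALL`) requires.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def union_map(k, v):
    row = (k + "," + v).split(",")
    country = getAttribute(row, "native_country")
    # Each branch projects its own column; the value becomes the key
    if country == "Italy":
        return [(int(getAttribute(row, "capital_gain")), None)]
    if country == "Ecuador":
        return [(int(getAttribute(row, "capital_loss")), None)]
    return []

def union_reduce(k, lv):
    # One output per distinct key removes duplicates across both branches
    return [k]

records = [record("u1", "Italy", gain="100", loss="5"),
           record("u2", "Italy"),
           record("u3", "Ecuador", gain="999", loss="100"),
           record("u4", "Ecuador", loss="7"),
           record("u5", "Yugoslavia", gain="1")]
result = run_local(records, union_map, union_reduce)
print(sorted(result))  # [0, 7, 100]
```

The value `100` comes from both branches (an Italy `capital_gain` and an Ecuador `capital_loss`) but is emitted once.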

## Exercise 5) Difference

Implement the difference operator. In SQL it would correspond to the following query:

```sql
SELECT age FROM adult a1 WHERE native_country = 'Italy' EXCEPT
SELECT age FROM adult a2 WHERE native_country = 'Ecuador'
```

In [None]:
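def difference_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def difference_reduce(k, lv):
    # TODO: return a list of results for this key
    return []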

executeMapReduce(difference_map,difference_reduce)
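A possible sketch in plain Python (`run_local` and `record` are hypothetical helpers: an in-memory stand-in for `executeMapReduce` and a builder of made-up records). The map keys each tuple on `age` and tags it with the side it comes from; the reducer keeps an age only if it was tagged `Italy` and never `Ecuador`, and emits it once, since `EXCEPT` removes duplicates.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def difference_map(k, v):
    row = (k + "," + v).split(",")
    country = getAttribute(row, "native_country")
    if country in ("Italy", "Ecuador"):
        # Key on the compared attribute; tag the value with its side
        return [(getAttribute(row, "age"), country)]
    return []

def difference_reduce(k, lv):
    tags = set(lv)
    # Keep the age only if it appears on the Italy side and never on the Ecuador side
    return [k] if "Italy" in tags and "Ecuador" not in tags else []

records = [record("d1", "Italy", age="30"), record("d2", "Italy", age="45"),
           record("d3", "Italy", age="45"), record("d4", "Ecuador", age="30"),
           record("d5", "Ecuador", age="60")]
result = run_local(records, difference_map, difference_reduce)
print(result)  # ['45']
```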

## Exercise 6) Intersection

Implement the intersection operator (based on one attribute). In SQL it would correspond to the following query:

```sql
SELECT age FROM adult a1 WHERE native_country = 'Italy' INTERSECT
SELECT age FROM adult a2 WHERE native_country = 'Ecuador'
```

In [None]:
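def intersection_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def intersection_reduce(k, lv):
    # TODO: return a list of results for this key
    return []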

executeMapReduce(intersection_map,intersection_reduce)
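A possible sketch in plain Python (`run_local` and `record` are hypothetical helpers: an in-memory stand-in for `executeMapReduce` and a builder of made-up records). The map is the same tag-by-side scheme one could use for difference; only the reducer's test changes: an age is kept when both sides produced it.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def intersection_map(k, v):
    row = (k + "," + v).split(",")
    country = getAttribute(row, "native_country")
    if country in ("Italy", "Ecuador"):
        # Key on the compared attribute; tag the value with its side
        return [(getAttribute(row, "age"), country)]
    return []

def intersection_reduce(k, lv):
    # Keep the age (once) only if both sides produced it
    return [k] if {"Italy", "Ecuador"} <= set(lv) else []

records = [record("n1", "Italy", age="30"), record("n2", "Italy", age="45"),
           record("n3", "Italy", age="45"), record("n4", "Ecuador", age="30"),
           record("n5", "Ecuador", age="60")]
result = run_local(records, intersection_map, intersection_reduce)
print(result)  # ['30']
```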

## Exercise 7) Join

Implement the join operator. In SQL it would correspond to the following query:

```sql
SELECT external.*, internal.*
FROM adult as internal INNER JOIN adult as external ON internal.marital_status = external.marital_status
WHERE external.native_country = 'Italy' AND internal.native_country = 'Ecuador'
```

In [None]:
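def join_map(k,v):
    # TODO: return a list of (key, value) pairs (an empty list emits nothing)
    return []

def join_reduce(k, lv):
    # TODO: return a list of results for this key
    return []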

executeMapReduce(join_map,join_reduce)
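One possible approach is a reduce-side join, sketched in plain Python (`run_local` and `record` are hypothetical helpers: an in-memory stand-in for `executeMapReduce` and a builder of made-up records). Compared with the cross product example, the join attribute replaces the random reducer key: tuples from both sides are keyed on `marital_status`, so only matching tuples meet in the same reducer and no replication is needed.

```python
from collections import defaultdict

# Schema copied from the notebook's setup cell ("id" is the SequenceFile key)
attributes = ["id", "age", "workclass", "fnlwgt", "education", "education_num",
              "marital_status", "relationship", "race", "sex", "capital_gain",
              "capital_loss", "hours_per_week", "native_country"]

def getAttribute(row, attribute):
    # Same behaviour as the notebook helper: value at the attribute's position, else None
    return row[attributes.index(attribute)] if attribute in attributes else None

def run_local(records, map_fn, reduce_fn):
    # Hypothetical in-memory stand-in for executeMapReduce (no Spark, no take(10))
    groups = defaultdict(list)
    for k, v in records:                    # map phase
        for out_k, out_v in map_fn(k, v):
            groups[out_k].append(out_v)     # shuffle: collect values per key
    return [r for out_k, vals in groups.items() for r in reduce_fn(out_k, vals)]

def record(key, country, age="30", workclass="Private", marital="Never-married",
           gain="0", loss="0"):
    # Hypothetical helper: a made-up (key, value) pair in the SequenceFile layout
    return (key, ",".join([age, workclass, "100000", "HS-grad", "9", marital,
                           "Not-in-family", "White", "Male", gain, loss, "40", country]))

def join_map(k, v):
    tupl = k + "," + v
    row = tupl.split(",")
    if getAttribute(row, "native_country") in ("Italy", "Ecuador"):
        # Join key: marital_status; the full tuple is the value (its country marks the side)
        return [(getAttribute(row, "marital_status"), tupl)]
    return []

def join_reduce(k, lv):
    italy = [t for t in lv if getAttribute(t.split(","), "native_country") == "Italy"]
    ecuador = [t for t in lv if getAttribute(t.split(","), "native_country") == "Ecuador"]
    # Every Italy (external) tuple pairs with every Ecuador (internal) tuple sharing the key
    return [(None, x + "<->" + y) for x in italy for y in ecuador]

records = [record("j1", "Italy", marital="Married-civ-spouse"),
           record("j2", "Italy", marital="Never-married"),
           record("j3", "Ecuador", marital="Married-civ-spouse"),
           record("j4", "Ecuador", marital="Married-civ-spouse"),
           record("j5", "Ecuador", marital="Divorced")]
result = run_local(records, join_map, join_reduce)
print(len(result))  # 2: j1 joins j3 and j4; j2 and j5 have no partner
```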