In [1]:
import os

file_path = "TaxiZones.csv"

if os.path.exists(file_path):
    print(f"{file_path} exists.")
else:
    print(f"{file_path} does not exist. Performing extra action...")
    
    !curl -o data.csv https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
    !tail -n +2 data.csv > data_quoted.csv
    !sed 's/"//g' data_quoted.csv > TaxiZones.csv
    !rm data.csv data_quoted.csv

TaxiZones.csv exists.


## On Spark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = (
            SparkSession
                .builder
                .appName("MapReduceApp")
                .master("local[4]")
                .getOrCreate()
        )
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/02 10:02:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
taxiZonesRdd = sc.textFile("TaxiZones.csv")
taxiZonesRdd.take(10)

                                                                                

['1,EWR,Newark Airport,EWR',
 '2,Queens,Jamaica Bay,Boro Zone',
 '3,Bronx,Allerton/Pelham Gardens,Boro Zone',
 '4,Manhattan,Alphabet City,Yellow Zone',
 '5,Staten Island,Arden Heights,Boro Zone',
 '6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone',
 '7,Queens,Astoria,Boro Zone',
 '8,Queens,Astoria Park,Boro Zone',
 '9,Queens,Auburndale,Boro Zone',
 '10,Queens,Baisley Park,Boro Zone']

In [4]:
# Reduce map
boroughCountRdd = (
                    taxiZonesRdd
                        .map( lambda zone: zone.split(",") )
                        .map( lambda zone: (zone[1], 1))
                        .reduceByKey( lambda value1, value2: value1 + value2 )
                  )

boroughCountRdd.collect()

[('EWR', 1),
 ('Manhattan', 69),
 ('Brooklyn', 61),
 ('Unknown', 1),
 ('N/A', 1),
 ('Queens', 69),
 ('Bronx', 43),
 ('Staten Island', 20)]

## On Ray

In [5]:
import ray
from typing import List, Dict, Any


def extract_lines(row: Dict[str, str]) -> Dict[str,str]:
    row['text'] = row['text'].split(',')
    return row

def select_data(row: Dict[str, str]) -> Dict[str,str]:
    row['text'] = row['text'][1]
    return row

taxi_zones_ds = (
    ray.data
    .read_text("TaxiZones.csv")
    .repartition(num_blocks=3)
    .map(extract_lines)
    .map(select_data)
    .groupby('text')
    .count()
    .take_all()
)

result = [(x['text'],x['count()']) for x in taxi_zones_ds]
result

2024-10-02 10:02:13,537	INFO worker.py:1786 -- Started a local Ray instance.
2024-10-02 10:02:17,119	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_10-02-11_210831_80229/logs/ray-data
2024-10-02 10:02:17,120	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadText] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Map(extract_lines)->Map(select_data)] -> AllToAllOperator[Aggregate]


Running 0: 0.00 row [00:00, ? row/s]

- ReadText->SplitBlocks(24) 1: 0.00 row [00:00, ? row/s]

- Repartition 2: 0.00 row [00:00, ? row/s]

Split Repartition 3:   0%|                                                                                    …

- Map(extract_lines)->Map(select_data) 4: 0.00 row [00:00, ? row/s]

- Aggregate 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

[('Bronx', 43),
 ('Brooklyn', 61),
 ('EWR', 1),
 ('Manhattan', 69),
 ('N/A', 1),
 ('Queens', 69),
 ('Staten Island', 20),
 ('Unknown', 1)]

[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m b
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m n
[36m(apply_map pid=80266)[0m a
[36m(apply_map pid=80266)[0m n
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m o
[36m(apply_map pid=80266)[0m b
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m *
[36m(apply_map pid=80266)[0m n
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m h
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m e
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m a
[36m(apply_map pid=80266)[0m b
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m i
[36m(apply_map pid=80266)[0m e
[36m(apply_map pid=80266)[0m t
[36m(apply_map pid=80266)[0m e
[36m(appl

## Working Example

**Via Python Data Types**

From [here](https://docs.ray.io/en/latest/ray-core/examples/map_reduce.html#a-simple-mapreduce-example-with-ray-core)

![](../media/map_reduce.png)

In [6]:
import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()

num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]

The corpus is equivalent to the full list of words here *The Zen of Python*

In [7]:
corpus[:10]

[b'The',
 b'Zen',
 b'of',
 b'Python,',
 b'by',
 b'Tim',
 b'Peters',
 b'Beautiful',
 b'is',
 b'better']

The repartitionning makes multiple batches

In [8]:
[x[:2] for x in partitions]

[[b'The', b'Zen'], [b'Although', b'practicality'], [b'is', b'better']]

This is a mapper:

In [9]:
def map_function(document):
    for word in document.lower().split():
        yield word, 1

And this is the way the reduce is performed:

Check
1. We apply via `apply_map`
2. We reduce via `apply_reduce`

In [10]:
import ray

@ray.remote
def apply_map(corpus, num_partitions=3):
    map_results = [list() for _ in range(num_partitions)]
    for document in corpus:
        for result in map_function(document):
            first_letter = result[0].decode("utf-8")[0]
            print(first_letter)
            word_index = ord(first_letter) % num_partitions
            map_results[word_index].append(result)
    return map_results

Let first apply the map over several partitions:

- Note the strategy for broadcasting to each partition accordingly (shuffling)
  - Take the first letter
  - Convert it to ordinal so `A`→ 65 via `ord(first_letter)`
  - Take the modulo to say 0,1,2
  - Write into `map_results`

In [11]:
map_results = [
    apply_map
    .options(num_returns=num_partitions)
    .remote(data, num_partitions)
    for data in partitions
]

for i in range(num_partitions):
    mapper_results = ray.get(map_results[i])
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")

Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]


Then let reduce by shuffling the data accordingly

In [12]:
@ray.remote
def apply_reduce(*results):
    reduce_results = dict()
    for res in results:
        for key, value in res:
            if key not in reduce_results:
                reduce_results[key] = 0
            reduce_results[key] += value

    return reduce_results

In [13]:
outputs = []
for i in range(num_partitions):
    outputs.append(
        apply_reduce.remote(*[partition[i] for partition in map_results])
    )

counts = {k: v for output in ray.get(outputs) for k, v in output.items()}

sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count, _ in zip(sorted_counts,range(10)):
    print(f"{count[0].decode('utf-8')}: {count[1]}")
print('...')

is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
...


## Now same usecase

In [14]:
import csv

corpus = []
with open('TaxiZones.csv') as csvfile:
    reader = csv.reader(csvfile, delimiter=',', quotechar='\"') 
    for row in reader:
        corpus.append(row[1])
        
num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]
[x[:2] for x in partitions]

[['EWR', 'Queens'], ['Brooklyn', 'Manhattan'], ['Brooklyn', 'Brooklyn']]

In [15]:
@ray.remote
def apply_map(corpus, num_partitions=3):
    map_results = [list() for _ in range(num_partitions)]
    for document in corpus:
        for result in map_function(document):
            first_letter = result[0][0]
            word_index = ord(first_letter) % num_partitions # Strategy to broadcast to each partition
            map_results[word_index].append(result)
    return map_results

map_results = [
    apply_map
    .options(num_returns=num_partitions)
    .remote(data, num_partitions)
    for data in partitions
]

for i in range(num_partitions):
    mapper_results = ray.get(map_results[i])
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")

Mapper 0, return value 0: [('island', 1), ('island', 1)]
Mapper 0, return value 1: [('manhattan', 1), ('staten', 1)]
Mapper 0, return value 2: [('ewr', 1), ('queens', 1)]
Mapper 1, return value 0: [('island', 1), ('island', 1)]
Mapper 1, return value 1: [('manhattan', 1), ('staten', 1)]
Mapper 1, return value 2: [('brooklyn', 1), ('brooklyn', 1)]
Mapper 2, return value 0: [('island', 1), ('island', 1)]
Mapper 2, return value 1: [('manhattan', 1), ('staten', 1)]
Mapper 2, return value 2: [('brooklyn', 1), ('brooklyn', 1)]


In [16]:
outputs = []
for i in range(num_partitions):
    outputs.append(
        apply_reduce.remote(*[partition[i] for partition in map_results])
    )

counts = {k: v for output in ray.get(outputs) for k, v in output.items()}

sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count, _ in zip(sorted_counts,range(10)):
    print(f"{count[0]}: {count[1]}")
print('...')

manhattan: 69
queens: 69
brooklyn: 61
bronx: 43
island: 20
staten: 20
unknown: 1
ewr: 1
...
