Skip to content
Permalink
Browse files

week 2 example completed

  • Loading branch information
cch123 committed May 8, 2019
1 parent cc08ee0 commit fa90ad0bc45af1e600a356f0c710168727e95cef
Showing with 77 additions and 6 deletions.
  1. +1 −1 tidb/mapreduce/Makefile
  2. +76 −5 tidb/mapreduce/mapreduce.go
@@ -3,7 +3,7 @@
all: test_example test_homework cleanup gendata

test_example:
go test -v -run=TestExampleURLTop
go test -v -run=TestExampleURLTop -timeout 20m

test_homework:
go test -v -run=TestURLTop
@@ -2,7 +2,11 @@ package main

import (
"bufio"
"encoding/json"
"bytes"
"fmt"

jsoniter "github.com/json-iterator/go"

"hash/fnv"
"io/ioutil"
"log"
@@ -11,6 +15,7 @@ import (
"runtime"
"strconv"
"sync"
"time"
)

// KeyValue is a type used to hold the key/value pairs passed to the map and reduce functions.
@@ -82,6 +87,7 @@ func (c *MRCluster) Start() {

func (c *MRCluster) worker() {
defer c.wg.Done()
var json = jsoniter.ConfigCompatibleWithStandardLibrary
for {
select {
case t := <-c.taskCh:
@@ -108,8 +114,46 @@ func (c *MRCluster) worker() {
SafeClose(fs[i], bs[i])
}
} else {
// YOUR CODE HERE :)
panic("YOUR CODE HERE")
// A reduce task has to gather all the files that the map phase produced
// for this reduce task and process them together.
mergeFileName := mergeName(t.dataDir, t.jobName, t.taskNumber)
fs, bs := CreateFileAndBuf(mergeFileName)
var kvMap = map[string][]string{}
for i := 0; i < t.nMap; i++ {
fileName := reduceName(t.dataDir, t.jobName, i, t.taskNumber)
content, err := ioutil.ReadFile(fileName)
if err != nil {
panic(err)
}

lines := bytes.Split(content, []byte("\n"))
for _, l := range lines {
// skip empty lines
if len(l) == 0 {
continue
}

var kv KeyValue
err = json.Unmarshal(l, &kv)
if err != nil {
panic(err)
}

kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
}
}

for k, vList := range kvMap {
reduceResult := t.reduceF(k, vList)

// write res to merge file
_, err := bs.Write([]byte(reduceResult))
if err != nil {
panic(err)
}
}
SafeClose(fs, bs)

}
t.wg.Done()
case <-c.exit:
@@ -155,8 +199,35 @@ func (c *MRCluster) run(jobName, dataDir string, mapF MapF, reduceF ReduceF, map
}

// reduce phase
// YOUR CODE HERE :D
panic("YOUR CODE HERE")
startTime := time.Now()
tasks = make([]*task, 0, nReduce)
for i := 0; i < nReduce; i++ {
t := &task{
dataDir: dataDir,
jobName: jobName,
phase: reducePhase,
taskNumber: i,
nReduce: nReduce,
nMap: nMap,
reduceF: reduceF,
}
t.wg.Add(1)
tasks = append(tasks, t)
go func() { c.taskCh <- t }()
}

// Task collection stage: the names of the final merged files must be passed out.
// NOTE: the author was somewhat unsure about this part.
notifyList := []string{}
for _, t := range tasks {
t.wg.Wait()
mergefileName := mergeName(t.dataDir, t.jobName, t.taskNumber)
notifyList = append(notifyList, mergefileName)
}
du := time.Now().Sub(startTime)
fmt.Println("reduce time: ", du)

notify <- notifyList
}

func ihash(s string) int {

0 comments on commit fa90ad0

Please sign in to comment.
You can’t perform that action at this time.