@@ -0,0 +1,93 @@
package lectures
package algorithms

import scala.reflect.ClassTag
import org.scalameter._

/** A growable buffer backed by a Conc-tree plus a small pending array chunk.
 *
 *  Elements are appended into `chunk`; when it fills up it is packed into the
 *  Conc-tree `conc`. Designed to be used as a combiner in parallel
 *  aggregations (see the companion object's benchmark).
 *
 *  Modernized from the deprecated Scala 2 procedure syntax
 *  (`def pack() { ... }`), which is removed in Scala 3.
 *
 *  @param k    chunk capacity; must be positive
 *  @param conc initial Conc-tree prefix
 */
class ConcBuffer[@specialized(Byte, Char, Int, Long, Float, Double) T: ClassTag](
  val k: Int, private var conc: Conc[T]
) extends Traversable[T] {
  require(k > 0)

  def this() = this(128, Conc.Empty)

  // Pending chunk: positions [0, lastSize) hold elements not yet packed.
  private var chunk: Array[T] = new Array(k)
  private var lastSize: Int = 0

  /** Visits the packed Conc prefix first, then the pending chunk, in append order. */
  def foreach[U](f: T => U): Unit = {
    conc.foreach(f)

    var i = 0
    while (i < lastSize) {
      f(chunk(i))
      i += 1
    }
  }

  /** Appends an element, packing the current chunk first if it is full. */
  final def +=(elem: T): this.type = {
    if (lastSize >= k) expand()
    chunk(lastSize) = elem
    lastSize += 1
    this
  }

  /** Concatenates the contents of both buffers into a fresh buffer.
   *  Both operands are cleared as a side effect.
   */
  final def combine(that: ConcBuffer[T]): ConcBuffer[T] = {
    val combinedConc = this.result <> that.result
    this.clear()
    that.clear()
    new ConcBuffer(k, combinedConc)
  }

  // Moves the pending chunk into the Conc-tree.
  private def pack(): Unit = {
    conc = Conc.appendTop(conc, new Conc.Chunk(chunk, lastSize, k))
  }

  // Packs the full chunk and starts a fresh one.
  private def expand(): Unit = {
    pack()
    chunk = new Array(k)
    lastSize = 0
  }

  /** Resets the buffer to the empty state. */
  def clear(): Unit = {
    conc = Conc.Empty
    chunk = new Array(k)
    lastSize = 0
  }

  /** Packs any pending elements and returns the underlying Conc-tree.
   *  NOTE(review): the chunk is packed but not reset here, so appending after
   *  `result` without an intervening `clear()` would pack the same chunk twice
   *  -- confirm all callers clear afterwards (as `combine` does).
   */
  def result: Conc[T] = {
    pack()
    conc
  }
}

object ConcBuffer {

  // Benchmark configuration: JIT warm-up followed by 60 measured runs.
  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  def main(args: Array[String]): Unit = {
    val size = 1000000

    // Benchmarks a parallel aggregate into ConcBuffers at parallelism level p.
    def run(p: Int): Unit = {
      val taskSupport = new collection.parallel.ForkJoinTaskSupport(
        new scala.concurrent.forkjoin.ForkJoinPool(p))
      val strings = (0 until size).map(_.toString)
      val time = standardConfig measure {
        val parallelized = strings.par
        parallelized.tasksupport = taskSupport
        parallelized.aggregate(new ConcBuffer[String])(_ += _, _ combine _).result
      }
      println(s"p = $p, time = $time ms")
    }

    Seq(1, 2, 4, 8) foreach run
  }

}
@@ -0,0 +1,144 @@
package lectures
package algorithms

import org.scalameter._
import common._

object MergeSort {

  // a bit of reflection to access the private sort1 method, which takes an offset and an argument
  // NOTE(review): assumes scala.util.Sorting declares a private
  // `sort1(Array[Int], Int, Int)` -- this is Scala-version dependent; verify
  // against the standard library version in use.
  private val sort1 = {
    val method = scala.util.Sorting.getClass.getDeclaredMethod("sort1", classOf[Array[Int]], classOf[Int], classOf[Int])
    method.setAccessible(true)
    (xs: Array[Int], offset: Int, len: Int) => {
      method.invoke(scala.util.Sorting, xs, offset.asInstanceOf[AnyRef], len.asInstanceOf[AnyRef])
    }
  }

  /** In-place sequential quicksort of xs(offset until offset + length). */
  def quickSort(xs: Array[Int], offset: Int, length: Int): Unit = {
    sort1(xs, offset, length)
  }

  // Keeps the helper array reachable so its allocation cannot be optimized away.
  @volatile var dummy: AnyRef = null

  /** Parallel merge sort of `xs`, forking the recursion `maxDepth` levels deep. */
  def parMergeSort(xs: Array[Int], maxDepth: Int): Unit = {
    // 1) Allocate a helper array.
    // This step is a bottleneck, and takes:
    // - ~76x less time than a full quickSort without GCs (best time)
    // - ~46x less time than a full quickSort with GCs (average time)
    // Therefore:
    // - there is a almost no performance gain in executing allocation concurrently to the sort
    // - doing so would immensely complicate the algorithm
    val ys = new Array[Int](xs.length)
    dummy = ys

    // 2) Sort the elements.
    // The merge step has to do some copying, and is the main performance bottleneck of the algorithm.
    // This is due to the final merge call, which is a completely sequential pass.
    // Merges the sorted runs src(from until mid) and src(mid until until) into
    // dst(from until until). The nested inner loops copy whole runs from
    // whichever side currently has the smaller head element.
    def merge(src: Array[Int], dst: Array[Int], from: Int, mid: Int, until: Int) {
      var left = from
      var right = mid
      var i = from
      while (left < mid && right < until) {
        while (left < mid && src(left) <= src(right)) {
          dst(i) = src(left)
          i += 1
          left += 1
        }
        while (right < until && src(right) <= src(left)) {
          dst(i) = src(right)
          i += 1
          right += 1
        }
      }
      // Copy whichever side still has elements left.
      while (left < mid) {
        dst(i) = src(left)
        i += 1
        left += 1
      }
      while (right < until) {
        dst(i) = src(right)
        i += 1
        right += 1
      }
    }
    // Without the merge step, the sort phase parallelizes almost linearly.
    // This is because the memory pressure is much lower than during copying in the third step.
    // Leaves (depth == maxDepth) quicksort xs in place; every merge level then
    // writes into the *other* array, with `flip` choosing src/dst so that
    // adjacent levels alternate between xs and ys.
    def sort(from: Int, until: Int, depth: Int): Unit = {
      if (depth == maxDepth) {
        quickSort(xs, from, until - from)
      } else {
        val mid = (from + until) / 2
        val right = task {
          sort(mid, until, depth + 1)
        }
        sort(from, mid, depth + 1)
        right.join()

        val flip = (maxDepth - depth) % 2 == 0
        val src = if (flip) ys else xs
        val dst = if (flip) xs else ys
        merge(src, dst, from, mid, until)
      }
    }
    sort(0, xs.length, 0)

    // 3) In parallel, copy the elements back into the source array.
    // Executed sequentially, this step takes:
    // - ~23x less time than a full quickSort without GCs (best time)
    // - ~16x less time than a full quickSort with GCs (average time)
    // There is a small potential gain in parallelizing copying.
    // However, most Intel processors have a dual-channel memory controller,
    // so parallel copying has very small performance benefits.
    def copy(src: Array[Int], target: Array[Int], from: Int, until: Int, depth: Int): Unit = {
      if (depth == maxDepth) {
        Array.copy(src, from, target, from, until - from)
      } else {
        val mid = from + ((until - from) / 2)
        val right = task {
          copy(src, target, mid, until, depth + 1)
        }
        copy(src, target, from, mid, depth + 1)
        right.join()
      }
    }
    // Due to the alternation above, the fully sorted data ends up in xs when
    // maxDepth is even and in ys when maxDepth is odd -- only then copy back.
    if (maxDepth % 2 != 0) {
      copy(ys, xs, 0, xs.length, 0)
    }
  }

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 60,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  // Fills xs with a repeating 0..99 pattern before each measured run.
  def initialize(xs: Array[Int]) {
    var i = 0
    while (i < xs.length) {
      xs(i) = i % 100
      i += 1
    }
  }

  /** Benchmarks sequential quicksort against the parallel merge sort. */
  def main(args: Array[String]) {
    val length = 10000000
    val maxDepth = 7
    val xs = new Array[Int](length)
    val seqtime = standardConfig setUp {
      _ => initialize(xs)
    } measure {
      quickSort(xs, 0, xs.length)
    }
    println(s"sequential sum time: $seqtime ms")

    val partime = standardConfig setUp {
      _ => initialize(xs)
    } measure {
      parMergeSort(xs, maxDepth)
    }
    println(s"fork/join time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,10 @@
package lectures

package object algorithms {

  /** Convenience operations on Conc-trees, available package-wide. */
  implicit class ConcOps[T](val self: Conc[T]) extends AnyVal {
    // Applies f to every element via Conc.traverse; the U results are discarded.
    def foreach[U](f: T => U) = Conc.traverse(self, f)
    // Concatenates two Conc-trees after normalizing both operands.
    def <>(that: Conc[T]) = Conc.concatTop(self.normalized, that.normalized)
  }

}
@@ -0,0 +1,38 @@
package lectures
package dataparallelism

import scala.collection._
import org.scalameter._

object Agnostic {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  val array = (0 until 1000000).toArray

  /** Largest palindromic number in xs (0 if none). Written against GenSeq so
   *  the same code runs on sequential and parallel collections.
   */
  def largestPalindrome(xs: GenSeq[Int]): Int = {
    def pick(best: Int, n: Int): Int =
      if (n > best) {
        val s = n.toString
        if (s == s.reverse) n else best
      } else best
    xs.aggregate(0)(pick, math.max)
  }

  def main(args: Array[String]): Unit = {
    val seqtime = standardConfig measure {
      largestPalindrome(array)
    }
    println(s"sequential time: $seqtime ms")

    val partime = standardConfig measure {
      largestPalindrome(array.par)
    }
    println(s"parallel time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,35 @@
package lectures
package dataparallelism

import org.scalameter._

object ArrayInitialization {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  // 100M elements: large enough that the fill is memory-bandwidth bound.
  val array = new Array[Int](100000000)

  def main(args: Array[String]): Unit = {
    val value = 100

    // Sequential fill over the whole index range.
    val seqtime = standardConfig measure {
      for (i <- 0 until array.length) {
        array(i) = value
      }
    }
    println(s"sequential time: $seqtime ms")

    // Identical fill, but the index range is traversed as a parallel collection.
    val partime = standardConfig measure {
      for (i <- (0 until array.length).par) {
        array(i) = value
      }
    }
    println(s"parallel time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,31 @@
package lectures
package dataparallelism

import org.scalameter._

object CharCount {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  val txt = "A short text..." * 500000
  val ps = new ParString(txt)

  def main(args: Array[String]): Unit = {
    // Counts characters by folding; the character itself is ignored.
    val seqtime = standardConfig measure {
      txt.foldLeft(0)((count, _) => count + 1)
    }
    println(s"sequential time: $seqtime ms")

    // Same count, via aggregate over the parallel string view.
    val partime = standardConfig measure {
      ps.aggregate(0)((count, _) => count + 1, _ + _)
    }
    println(s"parallel time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,32 @@
package lectures
package dataparallelism

import scala.collection._
import org.scalameter._

object Conversion {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 10,
    Key.exec.maxWarmupRuns -> 20,
    Key.exec.benchRuns -> 20,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  // The same 10M elements held in two shapes, to compare .par conversion cost.
  val array = Array.fill(10000000)("")
  val list = array.toList

  def main(args: Array[String]): Unit = {
    // Cost of turning a List into a parallel collection.
    val listtime = standardConfig measure {
      list.par
    }
    println(s"list conversion time: $listtime ms")

    // Cost of turning an Array into a parallel collection.
    val arraytime = standardConfig measure {
      array.par
    }
    println(s"array conversion time: $arraytime ms")
    println(s"difference: ${listtime / arraytime}")
  }

}

[Note: one additional file in this diff was too large to render and is omitted here ("Large diffs are not rendered by default").]

@@ -0,0 +1,20 @@
package lectures
package dataparallelism

import scala.collection._
import org.scalameter._

object IntersectionCorrect {

  def main(args: Array[String]): Unit = {
    // Filters the smaller set by membership in the larger one. No shared
    // mutable state is involved, so this is safe for parallel sets as well.
    def intersection(a: GenSet[Int], b: GenSet[Int]): GenSet[Int] =
      if (a.size < b.size) a.filter(b(_))
      else b.filter(a(_))

    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result - ${parres.size}")
  }

}
@@ -0,0 +1,23 @@
package lectures
package dataparallelism

import scala.collection._
import scala.collection.convert.wrapAsScala._
import java.util.concurrent._
import org.scalameter._

object IntersectionSynchronized {

  def main(args: Array[String]): Unit = {
    // A concurrent result set makes the accumulation correct even when the
    // traversal of `a` runs in parallel.
    def intersection(a: GenSet[Int], b: GenSet[Int]) = {
      val result = new ConcurrentSkipListSet[Int]()
      a.foreach(x => if (b contains x) result += x)
      result
    }
    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result - ${parres.size}")
  }

}
@@ -0,0 +1,21 @@
package lectures
package dataparallelism

import scala.collection._
import org.scalameter._

object IntersectionWrong {

  /** Deliberately broken example (hence the name): accumulating into an
   *  unsynchronized mutable.Set. With a parallel input set the loop body runs
   *  concurrently and the racy `+=` can lose elements or corrupt the set.
   */
  def main(args: Array[String]) {
    def intersection(a: GenSet[Int], b: GenSet[Int]): Set[Int] = {
      val result = mutable.Set[Int]()
      for (x <- a) if (b contains x) result += x
      result
    }
    val seqres = intersection((0 until 1000).toSet, (0 until 1000 by 4).toSet)
    val parres = intersection((0 until 1000).par.toSet, (0 until 1000 by 4).par.toSet)
    // The two sizes may disagree -- that discrepancy is the point of the demo.
    log(s"Sequential result - ${seqres.size}")
    log(s"Parallel result - ${parres.size}")
  }

}
@@ -0,0 +1,31 @@
package lectures
package dataparallelism

import org.scalameter._

object LargestPalindromeProduct {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  /** Benchmarks finding the largest palindromic product of two 3-digit numbers,
   *  sequentially and with a parallel range.
   *  (Removed the unused local `val value = 100` from the original.)
   */
  def main(args: Array[String]): Unit = {
    val seqtime = standardConfig measure {
      (100 to 999).flatMap(i => (i to 999).map(i * _))
        .filter(n => n.toString == n.toString.reverse).max
    }
    println(s"sequential time: $seqtime ms")

    val partime = standardConfig measure {
      (100 to 999).par.flatMap(i => (i to 999).map(i * _))
        .filter(n => n.toString == n.toString.reverse).max
    }
    println(s"parallel time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,281 @@
package lectures
package dataparallelism

import java.awt._
import java.awt.event._
import javax.swing._
import javax.swing.event._
import scala.collection.parallel._
import scala.collection.par._
import org.scalameter._
import common._

object Mandelbrot {

  // Use the platform look and feel when available; fall back silently (with a
  // console note) when it cannot be set.
  try {
    UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName())
  } catch {
    case _: Exception => println("Cannot set look and feel.")
  }

private def compute(xc: Double, yc: Double, threshold: Int): Int = {
var i = 0
var x = 0.0
var y = 0.0
while (x * x + y * y < 2 && i < threshold) {
val xt = x * x - y * y + xc
val yt = 2 * x * y + yc

x = xt
y = yt

i += 1
}
i
}

  /** Swing canvas rendering the Mandelbrot set with a selectable parallel
   *  implementation; supports panning by dragging and zooming with the wheel.
   */
  class MandelCanvas(frame: MandelFrame) extends JComponent {
    // Pixel buffer, pre-sized for canvases up to 4000x4000 (packed ARGB ints).
    val pixels = new Array[Int](4000 * 4000)
    // Current center of the viewport in the complex plane.
    var xoff = -0.9572428
    var yoff = -0.2956327
    // Previous mouse position during a drag; -1 marks "no drag in progress".
    var xlast = -1
    var ylast = -1

    // Parallelism level as currently selected in the frame's combo box.
    def parallelism = {
      val selidx = frame.parcombo.getSelectedIndex
      frame.parcombo.getItemAt(selidx).toInt
    }

    // Iteration cutoff, re-parsed from the text field on every access.
    def threshold = frame.threshold.getText.toInt

    // Scale factor converting plane units to pixels, driven by the spinner.
    def zoom = frame.zoomlevel.getValue.asInstanceOf[Int] / 10.0 * 500.0

    // Viewport bounds in the complex plane, derived from center, size and zoom.
    def xlo = xoff - getWidth / zoom

    def ylo = yoff - getHeight / zoom

    def xhi = xoff + getWidth / zoom

    def yhi = yoff + getHeight / zoom

    // Dragging pans the viewport by the mouse delta converted to plane units.
    addMouseMotionListener(new MouseMotionAdapter {
      override def mouseDragged(e: MouseEvent) {
        val xcurr = e.getX
        val ycurr = e.getY
        if (xlast != -1) {
          val xd = xcurr - xlast
          val yd = ycurr - ylast
          xoff -= xd / zoom
          yoff -= yd / zoom
        }
        xlast = xcurr
        ylast = ycurr
        repaint()
      }
    })

    // A fresh press resets drag tracking, so the first drag event only records
    // the starting position instead of jumping the view.
    addMouseListener(new MouseAdapter {
      override def mousePressed(e: MouseEvent) {
        xlast = -1
        ylast = -1
      }
    })

    // The wheel nudges the zoom spinner (about 10% per notch), clamped to >= 1;
    // the spinner's change listener then triggers a repaint.
    addMouseWheelListener(new MouseAdapter {
      override def mouseWheelMoved(e: MouseWheelEvent) {
        val prev = frame.zoomlevel.getValue.asInstanceOf[Int]
        val next = prev + (prev * -0.1 * e.getWheelRotation - e.getWheelRotation)
        frame.zoomlevel.setValue(math.max(1, next.toInt))
      }
    })

    // Dispatches rendering to the implementation selected in the UI.
    // NOTE(review): the match relies on the combo items being exactly the
    // strings registered in MandelFrame; any other value is a MatchError.
    private def fill(pixels: Array[Int], wdt: Int, hgt: Int) {
      val selected = frame.implcombo.getSelectedItem

      println("xlo: " +xlo)
      println("ylo: " +ylo)
      println("xhi: " +xhi)
      println("yhi: " +yhi)
      println("wdt: " +wdt)
      println("hgt: " +hgt)

      selected match {
        case "Reduction tree" =>
          fillReduction(pixels, wdt, hgt)
        case "Parallel collections" =>
          fillClassic(pixels, wdt, hgt)
        case "Workstealing tree" =>
          fillWsTree(pixels, wdt, hgt)
      }
    }

    // Renders via a hand-rolled binary reduction tree over the index range.
    private def fillReduction(pixels: Array[Int], wdt: Int, hgt: Int) {
      // Colors one pixel: maps its index to a point in the plane, iterates,
      // and packs the scaled iteration count into the ARGB channels.
      def renderPixel(idx: Int) {
        val x = idx % wdt
        val y = idx / wdt
        val xc = xlo + (xhi - xlo) * x / wdt
        val yc = ylo + (yhi - ylo) * y / hgt
        val iters = compute(xc, yc, threshold)
        val a = 255 << 24
        val r = math.min(255, 1.0 * iters / threshold * 255).toInt << 16
        val g = math.min(255, 2.0 * iters / threshold * 255).toInt << 8
        val b = math.min(255, 3.0 * iters / threshold * 255).toInt << 0
        pixels(idx) = a | r | g | b
      }

      def render(from: Int, end: Int): Unit = {
        for (idx <- from until end) {
          renderPixel(idx)
        }
      }

      // Halves the index range until segments fall below `threshold` -- here
      // the task-granularity argument, not the iteration cutoff above.
      def parRender(from: Int, end: Int, threshold: Int): Unit = {
        if (end - from <= threshold) {
          render(from, end)
        } else {
          val mid = (from + end) / 2
          parallel(parRender(from, mid, threshold), parRender(mid, end, threshold))
        }
      }

      parRender(0, wdt * hgt, 40000)
    }

    // Renders using the workstealing-tree scheduler (scala.collection.par).
    private def fillWsTree(pixels: Array[Int], wdt: Int, hgt: Int) {
      val range = 0 until (wdt * hgt)
      val conf = new Scheduler.Config.Default(parallelism)
      implicit val s = new Scheduler.ForkJoin(conf)

      for (idx <- range.toPar) {
        val x = idx % wdt
        val y = idx / wdt
        val xc = xlo + (xhi - xlo) * x / wdt
        val yc = ylo + (yhi - ylo) * y / hgt

        val iters = compute(xc, yc, threshold)
        val a = 255 << 24
        val r = math.min(255, 1.0 * iters / threshold * 255).toInt << 16
        val g = math.min(255, 2.0 * iters / threshold * 255).toInt << 8
        val b = math.min(255, 3.0 * iters / threshold * 255).toInt << 0
        pixels(idx) = a | r | g | b
      }

      // A fresh pool is created per frame, so shut it down once rendering is done.
      s.pool.shutdown()
    }

    // Renders using standard parallel collections with an explicit fork/join pool.
    private def fillClassic(pixels: Array[Int], wdt: Int, hgt: Int) {
      val fj = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(parallelism))
      val range = 0 until (wdt * hgt)
      val pr = range.par
      pr.tasksupport = fj
      println("xlo: " +xlo)
      println("ylo: " +ylo)
      println("xhi: " +xhi)
      println("yhi: " +yhi)
      println("wdt: " +wdt)
      println("hgt: " +hgt)

      for (idx <- pr) {
        val x = idx % wdt
        val y = idx / wdt
        val xc = xlo + (xhi - xlo) * x / wdt
        val yc = ylo + (yhi - ylo) * y / hgt

        val iters = compute(xc, yc, threshold)
        val a = 255 << 24
        val r = math.min(255, 1.0 * iters / threshold * 255).toInt << 16
        val g = math.min(255, 2.0 * iters / threshold * 255).toInt << 8
        val b = math.min(255, 3.0 * iters / threshold * 255).toInt << 0
        pixels(idx) = a | r | g | b
      }

      fj.environment.shutdown()
    }

    // Recomputes the fractal into `pixels`, reports timing in the console and
    // window title, then blits the buffer into a BufferedImage for display.
    override def paintComponent(g: Graphics) {
      super.paintComponent(g)

      val time = measure {
        fill(pixels, getWidth, getHeight)
      }
      val stats = "size: " + getWidth + "x" + getHeight + ", parallelism: " + parallelism + ", time: " + time + " ms" + ", bounds=(" + xoff + ", " + yoff + ")"
      println("Rendering: " + stats)
      frame.setTitle("Mandelbrot: " + stats)

      val img = new image.BufferedImage(getWidth, getHeight, image.BufferedImage.TYPE_INT_ARGB)
      for (x <- 0 until getWidth; y <- 0 until getHeight) {
        val color = pixels(y * getWidth + x)
        img.setRGB(x, y, color)
      }
      g.drawImage(img, 0, 0, null)
      //javax.imageio.ImageIO.write(img, "png", new java.io.File("mandelbrot.png"))
    }
  }

  /** Main application window: the fractal canvas plus a control panel for
   *  implementation, parallelism, zoom and iteration threshold. Constructing
   *  the frame makes it visible immediately.
   */
  class MandelFrame extends JFrame("Mandelbrot") {
    setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
    setSize(1024, 600)
    setLayout(new BorderLayout)
    val canvas = new MandelCanvas(this)
    add(canvas, BorderLayout.CENTER)
    val right = new JPanel
    right.setBorder(BorderFactory.createEtchedBorder(border.EtchedBorder.LOWERED))
    right.setLayout(new BorderLayout)
    val panel = new JPanel
    panel.setLayout(new GridLayout(0, 1))
    val controls = new JPanel
    controls.setLayout(new GridLayout(0, 2))
    controls.add(new JLabel("Implementation"))
    // These item strings must match the cases in MandelCanvas.fill exactly.
    val implcombo = new JComboBox[String](Array("Reduction tree", "Parallel collections", "Workstealing tree"))
    implcombo.addActionListener(new ActionListener {
      def actionPerformed(e: ActionEvent) {
        canvas.repaint()
      }
    })
    controls.add(implcombo)
    controls.add(new JLabel("Parallelism"))
    // Parallelism choices 1..#hardware-threads, defaulting to the maximum.
    val items = 1 to Runtime.getRuntime.availableProcessors map { _.toString } toArray
    val parcombo = new JComboBox[String](items)
    parcombo.setSelectedIndex(items.length - 1)
    parcombo.addActionListener(new ActionListener {
      def actionPerformed(e: ActionEvent) {
        canvas.repaint()
      }
    })
    controls.add(parcombo)
    controls.add(new JLabel("Zoom"))
    val zoomlevel = new JSpinner
    zoomlevel.setValue(157)
    zoomlevel.addChangeListener(new ChangeListener {
      def stateChanged(e: ChangeEvent) {
        canvas.repaint()
      }
    })
    controls.add(zoomlevel)
    controls.add(new JLabel("Threshold"))
    // Iteration cutoff; the canvas re-parses this text on every render.
    val threshold = new JTextField("2000")
    threshold.addActionListener(new ActionListener {
      def actionPerformed(e: ActionEvent) {
        canvas.repaint()
      }
    })
    controls.add(threshold)
    panel.add(controls)
    panel.add(new JLabel("Drag canvas to scroll, move wheel to zoom."))
    val renderbutton = new JButton("Render")
    renderbutton.addActionListener(new ActionListener {
      def actionPerformed(e: ActionEvent) {
        canvas.repaint()
      }
    })
    panel.add(renderbutton)
    right.add(panel, BorderLayout.NORTH)
    add(right, BorderLayout.EAST)
    setVisible(true)
  }

def main(args: Array[String]) {
val frame = new MandelFrame
}

}
@@ -0,0 +1,15 @@
package lectures
package dataparallelism

import scala.collection._

object ParallelGraphContraction {

  /** Pointer-jumping demo on a ring graph. Reads and writes the same
   *  unsynchronized mutable.Map from parallel tasks -- a data race,
   *  presumably intentional here to show why it is unsafe.
   */
  def main(args: Array[String]) {
    // Ring graph: i -> i + 1, with the last node wired back to 0.
    val graph = mutable.Map[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
    graph(graph.size - 1) = 0
    // Contraction step: redirect every node to its successor's successor.
    for ((k, v) <- graph.par) graph(k) = graph(v)
    // Had the step run atomically, every node i would map to (i + 2) mod size;
    // any reported violation is evidence of the race.
    val violation = graph.find({ case (i, v) => v != (i + 2) % graph.size })
    println(s"violation: $violation")
  }
}
@@ -0,0 +1,17 @@
package lectures
package dataparallelism

import org.scalameter._
import scala.collection._

object ParallelMutation {

  /** Runs two tasks at once: one counts empty strings while the other
   *  overwrites the same array -- a deliberate race, so `result` is
   *  nondeterministic (anywhere between 0 and 10000000).
   */
  def main(args: Array[String]) {
    val array = Array.fill(10000000)("")
    val (result, _) = common.parallel(
      array.par.count(_ == ""),
      for (i <- (0 until 10000000).par) array(i) = "modified"
    )
    println(s"result: $result")
  }
}
@@ -0,0 +1,44 @@
package lectures
package dataparallelism

import org.scalameter._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.collection._
import scala.io.Source

object ParallelRegexSearch {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 30,
    Key.exec.maxWarmupRuns -> 60,
    Key.exec.benchRuns -> 120,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  /** Downloads the HTML spec asynchronously; the source is closed even if
   *  reading fails.
   */
  def getHtmlSpec() = Future {
    val specSrc: Source = Source.fromURL("http://www.w3.org/MarkUp/html-spec/html-spec.txt")
    try specSrc.getLines.toArray finally specSrc.close()
  }

  def main(args: Array[String]): Unit = {
    // The download starts here; once it completes, both searches are benchmarked.
    val measurements = getHtmlSpec().map { specDoc =>
      println(s"Download complete!")

      // Benchmarks a first-match regex scan over the given (Gen)Seq of lines.
      def search(d: GenSeq[String]) = standardConfig measure {
        d.indexWhere(line => line.matches(".*TEXTAREA.*"))
      }

      val seqtime = search(specDoc)
      val partime = search(specDoc.par)

      (seqtime, partime)
    }
    println("Fetching HTML specification, searching for TEXTAREA.")
    val (seqtime, partime) = Await.result(measurements, Duration.Inf)
    println(s"Sequential time $seqtime ms")
    println(s"Parallel time $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }
}
@@ -0,0 +1,17 @@
package lectures
package dataparallelism

import scala.collection._

object ParallelTrieMapGraphContraction {

  def main(args: Array[String]): Unit = {
    // Ring graph: i -> i + 1, with the last node wired back to 0.
    val graph = concurrent.TrieMap[Int, Int]() ++= (0 until 100000).map(i => (i, i + 1))
    graph(graph.size - 1) = 0
    // Snapshot provides a stable view to read from while the live map is
    // updated in parallel -- this is what makes the contraction correct.
    val previous = graph.snapshot()
    graph.par.foreach { case (k, v) => graph(k) = previous(v) }
    // Every node i should now map to (i + 2) mod size; expect no violation.
    val violation = graph.find { case (i, v) => v != (i + 2) % graph.size }
    println(s"violation: $violation")
  }

}
@@ -0,0 +1,55 @@
package lectures
package dataparallelism

import org.scalameter._

object WordCount {

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 50,
    Key.exec.maxWarmupRuns -> 100,
    Key.exec.benchRuns -> 40,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  val txt = "A short text... " * 250000
  val ps = new ParString(txt)

  def main(args: Array[String]) {
    // Sequential word count. State is (count, lastWasSpace): a word is counted
    // at its first non-space character following a space (or at the start).
    val seqtime = standardConfig measure {
      txt.foldLeft((0, true)) {
        case ((wc, _), ' ') => (wc, true)
        case ((wc, true), x) => (wc + 1, false)
        case ((wc, false), x) => (wc, false)
      }
    }
    println(s"sequential time: $seqtime ms")

    // Parallel word count via aggregate over ParString. The accumulator triple
    // (ls, wc, rs) appears to track a left-fragment marker, the word count,
    // and the state of the trailing run; the combiner's `lwc + rwc - 1` case
    // compensates when two segments meet inside a single word, and (0, 0, 0)
    // acts as the identity element.
    // NOTE(review): the invariant is subtle -- verify the parallel count
    // against the sequential result before reusing this elsewhere.
    val partime = standardConfig measure {
      ps.aggregate((0, 0, 0))({ (x, y) =>
        if (x._2 > 0) {
          if (y != ' ') x match {
            case (ls, wc, 0) => (ls, wc, 0)
            case (ls, wc, rs) => (ls, wc + 1, 0)
          } else x match {
            case (ls, wc, rs) => (ls, wc, rs + 1)
          }
        } else {
          if (y != ' ') x match {
            case (ls, 0, _) => (ls + 1, 0, ls + 1)
          } else x match {
            case (ls, 0, _) => (ls + 1, 1, 0)
          }
        }
      }, {
        case ((0, 0, 0), res) => res
        case (res, (0, 0, 0)) => res
        case ((lls, lwc, 0), (0, rwc, rrs)) => (lls, lwc + rwc - 1, rrs)
        case ((lls, lwc, _), (_, rwc, rrs)) => (lls, lwc + rwc, rrs)
      })
    }
    println(s"parallel time: $partime ms")
    println(s"speedup: ${seqtime / partime}")
  }

}
@@ -0,0 +1,47 @@
package lectures

import scala.collection.parallel._
import scala.collection.mutable.ArrayBuffer

package object dataparallelism {

  /** A parallel Seq[Char] view over a String, used by the lecture benchmarks. */
  class ParString(val str: String)
  extends immutable.ParSeq[Char] {

    def apply(i: Int) = str.charAt(i)

    def length = str.length

    // Sequential counterpart required by the parallel-collections contract.
    def seq = new collection.immutable.WrappedString(str)

    def splitter = new ParStringSplitter(str, 0, str.length)

    /** Iterator over s(i until ntl) that can be split for parallel traversal. */
    class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
    extends SeqSplitter[Char] {
      final def hasNext = i < ntl
      final def next = {
        val r = s.charAt(i)
        i += 1
        r
      }
      def remaining = ntl - i
      // Splits the remaining elements roughly in half; with fewer than 2
      // remaining elements the splitter cannot be divided further.
      def split = {
        val rem = remaining
        if (rem >= 2) psplit(rem / 2, rem - rem / 2)
        else Seq(this)
      }
      // Partitions the remainder into consecutive splitters of the given sizes,
      // clamping the last piece at the end of the range.
      // NOTE(review): this advances the receiver's own position `i` while
      // building the pieces -- verify callers do not reuse this splitter after.
      def psplit(sizes: Int*): Seq[SeqSplitter[Char]] = {
        val splitted = new ArrayBuffer[ParStringSplitter]
        for (sz <- sizes) {
          val next = (i + sz) min ntl
          splitted += new ParStringSplitter(s, i, next)
          i = next
        }
        splitted
      }
    }

  }

}
@@ -0,0 +1,65 @@
package lectures
package examples

import org.scalameter._
import common._

/** An example of a trivially parallelizable brute force solution.
* The nice thing about this example is that computing the collatz sequence
* does not require any memory access, so the memory bandwidth is not a bottleneck.
* Here we can really see the benefits of a quad-core with hyperthreading.
*/
object BruteForceCollatzSequence {

  // Sink that keeps results observable so the JIT cannot eliminate the work.
  @volatile var dummy = 0

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 30,
    Key.exec.maxWarmupRuns -> 60,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  /** Length of the Collatz sequence starting at `number`.
   *  The iterate is kept in a Long: intermediate values can exceed the
   *  starting number many times over, so this guards against Int overflow
   *  for larger inputs.
   */
  def collatz(number: Int): Int = {
    var length = 1
    var n: Long = number
    while (n != 1) {
      if (n % 2 == 0) n = n / 2
      else n = 3 * n + 1
      length += 1
    }
    length
  }

  /** The number in [from, until) whose Collatz sequence is longest (sequential). */
  def longestCollatz(from: Int, until: Int): Int = {
    (from until until).maxBy(collatz)
  }

  /** Fork/join version of `longestCollatz`.
   *  BUG FIX: the original combined the two halves' candidates with
   *  `math.max`, i.e. by their numeric value rather than by their sequence
   *  length, so it effectively returned the largest *number*, not the number
   *  with the longest sequence. Candidates are now compared by Collatz length,
   *  preferring the left (smaller) candidate on ties like `maxBy` does.
   */
  def fjLongestCollatz(from: Int, until: Int, threshold: Int): Int = {
    if (until - from < threshold) {
      longestCollatz(from, until)
    } else {
      val mid = (from + until) / 2
      val (leftLongest, rightLongest) =
        parallel(fjLongestCollatz(from, mid, threshold),
                 fjLongestCollatz(mid, until, threshold))
      if (collatz(leftLongest) >= collatz(rightLongest)) leftLongest else rightLongest
    }
  }

  /** Benchmarks the sequential search against the fork/join version. */
  def main(args: Array[String]): Unit = {
    val until = 100000
    val threshold = 100
    val seqtime = standardConfig measure {
      dummy = longestCollatz(1, until)
    }
    println(s"sequential sum time: $seqtime ms")

    val fjtime = standardConfig measure {
      // Also routed through `dummy` so both measurements resist dead-code elimination.
      dummy = fjLongestCollatz(1, until, threshold)
    }
    println(s"fork/join time: $fjtime ms")
    println(s"speedup: ${seqtime / fjtime}")
  }

}
@@ -0,0 +1,82 @@
package lectures
package examples

import java.util.concurrent.atomic._
import org.scalameter._
import common._

/** An example of a trivially parallelizable brute force solution.
* The nice thing about this example is that computing the collatz sequence
* does not require any memory access, so the memory bandwidth is not a bottleneck.
* Here we can really see the benefits of a quad-core with hyperthreading.
*/
object DynamicProgrammingCollatzSequence {

  // Sink that keeps results observable so the JIT cannot eliminate the work.
  @volatile var dummy = 0

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 50,
    Key.exec.maxWarmupRuns -> 100,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  // Memoization bound: table(n) caches the Collatz length of n once computed.
  // WARNING(review): 500M Ints is ~2 GB -- this needs a correspondingly large
  // JVM heap (-Xmx), otherwise allocation fails during object initialization.
  val maxSize = 500000000

  val table = new AtomicIntegerArray(maxSize)
  table.set(1, 1)

  /** Collatz sequence length of `number`, using memoized suffix lengths.
   *  (Removed the dead local `prev` from the original -- it was written but
   *  never read.)
   */
  def collatz(number: Int): Int = {
    var length = 1
    var n = number
    while (n != 1) {
      if (n % 2 == 0) n = n / 2
      else n = 3 * n + 1

      if (n >= table.length || table.get(n) == 0) {
        // Unknown suffix: keep walking.
        length += 1
      } else {
        // Cached suffix: add its length and stop.
        length += table.get(n)
        n = 1
      }
    }
    table.set(number, length)
    length
  }

  /** The number in [from, until) whose Collatz sequence is longest (sequential). */
  def longestCollatz(from: Int, until: Int): Int = {
    (from until until).maxBy(collatz)
  }

  /** Fork/join version of `longestCollatz`.
   *  BUG FIX: the original combined the two halves' candidates with
   *  `math.max`, i.e. by numeric value rather than by sequence length, so it
   *  usually just returned the largest number of the right half. Candidates
   *  are now compared by Collatz length (cheap here, since lengths are
   *  memoized), preferring the left candidate on ties like `maxBy` does.
   */
  def fjLongestCollatz(from: Int, until: Int, threshold: Int): Int = {
    if (until - from < threshold) {
      longestCollatz(from, until)
    } else {
      val mid = (from + until) / 2
      val right = task {
        fjLongestCollatz(mid, until, threshold)
      }
      val leftLongest = fjLongestCollatz(from, mid, threshold)
      val rightLongest = right.join()
      if (collatz(leftLongest) >= collatz(rightLongest)) leftLongest else rightLongest
    }
  }

  /** Benchmarks the sequential search against the fork/join version.
   *  NOTE(review): the memo table persists across benchmark runs, so later
   *  runs measure mostly-cached lookups.
   */
  def main(args: Array[String]): Unit = {
    val until = 100000
    val threshold = 100
    val seqtime = standardConfig measure {
      dummy = longestCollatz(1, until)
    }
    println(s"sequential sum time: $seqtime ms")

    val fjtime = standardConfig measure {
      dummy = fjLongestCollatz(1, until, threshold)
    }
    println(s"fork/join time: $fjtime ms")
    println(s"speedup: ${seqtime / fjtime}")
  }

}
@@ -0,0 +1,68 @@
package lectures
package introduction

import org.scalameter._
import common._

object FourBatchArrayNorm {
  @volatile var dummy: Int = 0
  @volatile var dummy2: Int = 0

  // math.log(math.E) is 1.0; kept to mirror the generic change-of-base formula.
  val logE = math.log(math.E)

  /** floor(x^p) computed via exp/log. */
  def power(x: Int, p: Double): Int = {
    math.exp(p * math.log(x) / logE).toInt // TODO <-- make everything doubles
  }

  /** Sum of xs(i)^p over i in [from, until). */
  def sumSegment(xs: Array[Int], p: Double, from: Int, until: Int): Int = {
    var idx = from
    var acc = 0
    while (idx < until) {
      acc += power(xs(idx), p)
      idx += 1
    }
    acc
  }

  /** p-norm of the whole array, computed sequentially. */
  def normSum(xs: Array[Int], p: Double): Int =
    power(sumSegment(xs, p, 0, xs.size), 1.0 / p)

  /** p-norm computed as four fixed quarter-segments summed in parallel.
   *  NOTE(review): `threshold` is unused -- the split is a fixed four-way
   *  batch; the parameter is kept for signature compatibility.
   */
  def fjNormSum(xs: Array[Int], p: Double, threshold: Int): Int = {
    val len = xs.length
    val ((s1, s2), (s3, s4)) = parallel(
      parallel(
        sumSegment(xs, p, 0, len / 4),
        sumSegment(xs, p, len / 4, 2 * len / 4)
      ),
      parallel(
        sumSegment(xs, p, 2 * len / 4, 3 * len / 4),
        sumSegment(xs, p, 3 * len / 4, len)
      )
    )
    power(s1 + s2 + s3 + s4, 1.0 / p)
  }

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 10,
    Key.exec.maxWarmupRuns -> 10,
    Key.exec.benchRuns -> 10,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  def main(args: Array[String]): Unit = {
    val p = 1.5
    val xs = (0 until 2000000).map(_ % 100).toArray
    val seqtime = standardConfig measure {
      dummy = normSum(xs, p)
    }
    println(s"sequential sum time: $seqtime ms")

    val threshold = 10000
    val fjtime = standardConfig measure {
      dummy2 = fjNormSum(xs, p, threshold)
    }
    println(s"values computed are $dummy vs $dummy2")
    println(s"fork/join time: $fjtime ms")
    println(s"speedup: ${seqtime/fjtime}")
  }

}
@@ -0,0 +1,56 @@
package lectures
package introduction

import org.scalameter._
import scala.util.Random
import common._

object ParallelMonteCarloPi {
  @volatile var seqResult: Double = 0
  @volatile var parResult: Double = 0

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 40,
    Key.exec.benchRuns -> 20,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  /** Estimates Pi by sampling `iterations` uniform points in the unit square
   *  and counting those that fall inside the quarter circle.
   */
  def monteCarloPi(iterations: Int): Double = {
    val randomX = new Random
    val randomY = new Random
    var hits = 0
    for (i <- 0 until iterations) {
      val x = randomX.nextDouble()
      val y = randomY.nextDouble()
      if (x * x + y * y < 1) hits += 1
    }
    // r * r * Pi = hitRatio * 4 * r * r
    // Pi = hitRatio * 4
    4.0 * hits / iterations
  }

  /** Four-way parallel estimate.
   *  FIX: the original ran `iterations / 4` samples in each of the four
   *  batches, silently dropping up to 3 samples when `iterations % 4 != 0`.
   *  The last batch now absorbs the remainder and the four estimates are
   *  combined weighted by their sample counts (identical to the plain average
   *  when `iterations` is divisible by 4).
   */
  def parMonteCarloPi(iterations: Int): Double = {
    val quarter = iterations / 4
    val last = iterations - 3 * quarter
    val ((pi1, pi2), (pi3, pi4)) = parallel(
      parallel(monteCarloPi(quarter), monteCarloPi(quarter)),
      parallel(monteCarloPi(quarter), monteCarloPi(last))
    )
    (pi1 * quarter + pi2 * quarter + pi3 * quarter + pi4 * last) / iterations
  }

  def main(args: Array[String]): Unit = {
    val iterations = 4000000
    val seqtime = standardConfig measure {
      seqResult = monteCarloPi(iterations)
    }
    println(s"sequential time: $seqtime ms")

    val partime = standardConfig measure {
      parResult = parMonteCarloPi(iterations)
    }
    println(s"fork/join time: $partime ms")
    println(s"speedup: ${seqtime/partime}")
    println(s"values computed are $seqResult vs $parResult")
  }

}
@@ -0,0 +1,80 @@
package lectures
package reductions

import org.scalameter._
import common._

object ArrayFold {

  /** Sequential left fold of inp(left until right), seeded with b0. */
  def foldASegSeq[A,B](inp: Array[A], b0: B,
      left: Int, right: Int,
      f: (B,A) => B): B = {
    var acc = b0
    var idx = left
    while (idx < right) {
      acc = f(acc, inp(idx))
      idx += 1
    }
    acc
  }

  /** Parallel fold: splits in half above `threshold`, sequential below.
   *  Each leaf segment is seeded with a0, so a0 should be (close to) a
   *  neutral element of f for the combined result to match a sequential fold.
   */
  def foldASegPar[A](inp: Array[A], a0: A,
      left: Int, right: Int,
      f: (A,A) => A): A = {
    // requires f to be associative
    if (right - left >= threshold) {
      val mid = left + (right - left)/2
      val (a1, a2) = parallel(foldASegPar(inp, a0, left, mid, f),
                              foldASegPar(inp, a0, mid, right, f))
      f(a1, a2)
    } else {
      foldASegSeq(inp, a0, left, right, f)
    }
  }

  // Speed of light: the velocities below are combined relativistically.
  val c = 2.99792458e8

  /** Relativistic velocity addition; associative up to roundoff. */
  def assocOp(v1: Double, v2: Double): Double = {
    val u1 = v1/c
    val u2 = v2/c
    (v1 + v2)/(1 + u1*u2)
  }

  def addVelSeq(inp: Array[Double]): Double =
    foldASegSeq(inp, 0.0, 0, inp.length, assocOp)

  def addVelPar(inp: Array[Double]): Double =
    foldASegPar(inp, 0.0, 0, inp.length, assocOp)

  // Segments shorter than this are folded sequentially.
  val threshold = 10000

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 30,
    Key.exec.maxWarmupRuns -> 30,
    Key.exec.benchRuns -> 20,
    Key.verbose -> false
  ) withWarmer(new Warmer.Default)

  def main(args: Array[String]): Unit = {
    val alen = 2000000
    val inp = (0 until alen).map((x:Int) => (x % 50)*0.0001*c).toArray
    var resSeq = 0.0
    val seqtime = standardConfig measure {
      resSeq = addVelSeq(inp)
    }
    var resPar = 0.0
    val partime = standardConfig measure {
      resPar = addVelPar(inp)
    }

    println(s"sequential time: $seqtime ms and result $resSeq")
    println(s"parallel time: $partime ms and result $resPar")
    /* Example output on Intel(R) Core(TM) i7-3770K CPU @ 3.50GHz (4 cores, 8 hw threads), 16GB RAM
    [info] sequential time: 33.0507908 ms and result 2.997924579999967E8
    [info] parallel time: 11.158121000000003 ms and result 2.99792458E8
    We get around 3 times speedup. The computed value is slightly different due to roundoff errors.
    */

  }
}
@@ -0,0 +1,164 @@
package lectures
package reductions

import org.scalameter._
import common._

object ArrayMap {

  /** Maps inp(left until right) through f into out at the same indices, sequentially. */
  def mapASegSeq[A,B](inp: Array[A], f : A => B,
                      left: Int, right: Int,
                      out: Array[B]) = {
    var i = left
    while (i < right) {
      out(i) = f(inp(i))
      i = i + 1
    }
  }

  /** Maps inp(left until right) through f into out in parallel, splitting
   *  the segment until it drops below `threshold`.
   *  Requires f to be pure; subtasks write disjoint slots of out. */
  def mapASegPar[A,B](inp: Array[A], left: Int, right: Int,
                      f : A => B,
                      out: Array[B]): Unit = {
    // require f to be pure
    if (right - left < threshold)
      mapASegSeq(inp, f, left, right, out)
    else {
      val mid = left + (right - left)/2
      val _ = parallel(mapASegPar(inp, left, mid, f, out),
                       mapASegPar(inp, mid, right, f, out))
    }
  }

  /** Parallel norm computation with `power` manually inlined in the base case. */
  def normsOfPar(inp: Array[Int], p: Double,
                 left: Int, right: Int,
                 out: Array[Double]): Unit = {
    if (right - left < threshold) {
      var i = left
      while (i < right) {
        out(i) = power(inp(i), p)
        i = i + 1
      }
    } else {
      val mid = left + (right - left)/2
      val _ = parallel(normsOfPar(inp, p, left, mid, out),
                       normsOfPar(inp, p, mid, right, out))
    }
  }

  /** Sequential norm computation with `power` manually inlined. */
  def normsOf(inp: Array[Int], p: Double,
              left: Int, right: Int,
              out: Array[Double]): Unit = {
    var i = left
    while (i < right) {
      out(i) = power(inp(i), p)
      i = i + 1
    }
  }

  // an effectful map: more flexible, but easier to use wrongly
  /** Applies `action` to every index in [left, right), in parallel. */
  def actionOnSegPar(action : Int => Unit,
                     left: Int, right: Int): Unit = {
    // require action(i1) and action(i2) do not interfere for i1 != i2
    if (right - left < threshold) {
      var i = left
      while (i < right) {
        action(i)
        i = i + 1
      }
    } else {
      val mid = left + (right - left)/2
      // BUG FIX: the second recursive call previously covered (left, mid)
      // again, so indices in [mid, right) were never visited and callers
      // (e.g. mapASegPar2) left the right half of `out` unwritten.
      val _ = parallel(actionOnSegPar(action, left, mid),
                       actionOnSegPar(action, mid, right))
    }
  }

  /** Parallel map expressed through the effectful actionOnSegPar. */
  def mapASegPar2[A,B](inp: Array[A], left: Int, right: Int,
                       f : A => B,
                       out: Array[B]): Unit = {
    def action(i: Int): Unit = { out(i) = f(inp(i)) }
    actionOnSegPar(action, left, right)
  }

  // math.log(math.E) == 1.0; kept to make the change-of-base explicit.
  val logE = math.log(math.E)

  /** |x|^p truncated to Int (via exp/log, so subject to float rounding). */
  def power(x: Int, p: Double): Int = {
    math.exp(p * math.log(math.abs(x)) / logE).toInt
  }

  /** Sequential p-th-power map; out must have the same length as inp. */
  def mapNormSeq(inp: Array[Int], p: Double,
                 out: Array[Double]): Unit = {
    require(inp.length == out.length)
    def f(x: Int): Double = power(x, p)
    mapASegSeq(inp, f, 0, inp.length, out)
  }

  /** Parallel p-th-power map via mapASegPar. */
  def mapNormPar(inp: Array[Int], p: Double,
                 out: Array[Double]): Unit = {
    require(inp.length == out.length)
    def f(x: Int): Double = power(x, p)
    mapASegPar(inp, 0, inp.length, f, out)
  }

  /** Parallel p-th-power map via the effectful mapASegPar2. */
  def mapNormPar2(inp: Array[Int], p: Double,
                  out: Array[Double]): Unit = {
    require(inp.length == out.length)
    def f(x: Int): Double = power(x, p)
    mapASegPar2(inp, 0, inp.length, f, out)
  }

  // Sequential cutoff for all parallel maps above.
  val threshold = 10000

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 30,
    Key.exec.maxWarmupRuns -> 30,
    Key.exec.benchRuns -> 20,
    Key.verbose -> false
  ) withWarmer(new Warmer.Default)

  /** Benchmarks the sequential map against the parallel variants. */
  def main(args: Array[String]) {
    val p = 1.5
    val alen = 2000000
    val inp = (0 until alen).map(_ % 100).toArray
    val out1 = (0 until alen).map(_ => 0.0).toArray
    val out2 = (0 until alen).map(_ => 0.0).toArray
    val out3 = (0 until alen).map(_ => 0.0).toArray
    val seqtime = standardConfig measure {
      mapNormSeq(inp, p, out1)
    }

    /*
    val mapNormPar2time = standardConfig measure {
      mapNormPar2(inp, p, out3)
    }
    println(s"mapNormPar2: $mapNormPar2time ms")
    println(s"speedup2: ${seqtime/mapNormPar2time}")
    */

    val mapNormParTime = standardConfig measure {
      mapNormPar(inp, p, out2)
    }

    val normsOfParTime = standardConfig measure {
      normsOfPar(inp, p, 0, inp.length, out3)
    }

    val normsOfTime = standardConfig measure {
      normsOf(inp, p, 0, inp.length, out3)
    }

    println(s"sequential sum time: $seqtime ms")
    println(s"mapNormPar time: $mapNormParTime ms")
    println(s"normsOfPar time: $normsOfParTime ms")
    println(s"normsOf time: $normsOfTime ms")
    /* Example output on Intel(R) Core(TM) i7-3770K CPU @ 3.50GHz (4 cores, 8 hw threads), 16GB RAM
    [info] sequential sum time: 174.17463240000004 ms
    [info] mapNormPar time: 28.9307023 ms
    [info] normsOfPar time: 28.165657500000002 ms
    [info] normsOf time: 166.83788205000002 ms
    Note that manual inlining does not pay off much,
    and parallelization is where the main win is!
    */
  }

}
@@ -0,0 +1,71 @@
package lectures
package reductions

import org.scalameter._
import common._

// Computes the p-norm sum of an Int array sequentially and with fork/join
// tasks, and benchmarks both with ScalaMeter.
object ArrayNorm {
// Volatile sinks: results are stored here so the JIT cannot eliminate
// the benchmarked computations as dead code.
@volatile var dummy: Int = 0
@volatile var dummy2: Int = 0

// math.log(math.E) == 1.0, so the division in `power` is a no-op kept for clarity.
val logE = math.log(math.E)

// x^p truncated to Int. NOTE(review): uses math.log(x) without abs, so
// negative x yields NaN -> 0; inputs in main are non-negative (_ % 100).
def power(x: Int, p: Double): Int = {
math.exp(p * math.log(x) / logE).toInt // TODO <-- make everything doubles
}

// Sequentially sums power(xs(i), p) for i in [from, until).
def sumSegment(xs: Array[Int], p: Double, from: Int, until: Int): Int = {
var i = from
var s = 0
while (i < until) {
s += power(xs(i), p)
i += 1
}
s
}

// Sequential p-norm: (sum of x^p)^(1/p), truncated to Int at each step.
def normSum(xs: Array[Int], p: Double): Int =
power(sumSegment(xs, p, 0, xs.size), 1.0 / p)

// Fork/join sum: forks the right half as a task, computes the left half on
// the current thread, then joins. Segments below `threshold` run sequentially.
def fjSumSegment(xs: Array[Int], p: Double, from: Int, until: Int, threshold: Int): Int = {
if (until - from < threshold) {
sumSegment(xs, p, from, until)
} else {
val mid = (from + until) / 2
val right = task {
fjSumSegment(xs, p, mid, until, threshold)
}
val leftSum = fjSumSegment(xs, p, from, mid, threshold)
val rightSum = right.join()
leftSum + rightSum
}
}

// Parallel p-norm built on fjSumSegment.
def fjNormSum(xs: Array[Int], p: Double, threshold: Int): Int =
power(fjSumSegment(xs, p, 0, xs.length, threshold), 1.0 / p)

// ScalaMeter setup: 10 warmup runs, 10 measured runs, verbose output.
val standardConfig = config(
Key.exec.minWarmupRuns -> 10,
Key.exec.maxWarmupRuns -> 10,
Key.exec.benchRuns -> 10,
Key.verbose -> true
) withWarmer(new Warmer.Default)

// Benchmarks sequential vs fork/join norm over a 2M-element array and
// prints both computed values (they should agree) plus the speedup.
def main(args: Array[String]) {
val p = 1.5
val xs = (0 until 2000000).map(_ % 100).toArray
val seqtime = standardConfig measure {
dummy = normSum(xs, p)
}
println(s"sequential sum time: $seqtime ms")

val threshold = 10000
val fjtime = standardConfig measure {
dummy2 = fjNormSum(xs, p, threshold)
}
println(s"values computed are $dummy vs $dummy2")
println(s"fork/join time: $fjtime ms")
println(s"speedup: ${seqtime/fjtime}")
}

}
@@ -0,0 +1,243 @@
package lectures
package reductions
import org.scalameter._
import common._

object ArrayScan { // Parallel scan of an array

/*
fold left array segment from left to right-1, sequentially.
Used in the base case for upsweep.
This is the same operation we would use in the base case of parallel fold.
*/
def foldASegSeq[A,B](inp: Array[A],
left: Int, right: Int,
b0: B, // initial element
f: (B,A) => B): B = {
var b= b0
var i= left
while (i < right) {
b= f(b, inp(i))
i= i+1
}
b
}

// Binary trees whose nodes store elements of type A
sealed abstract class FoldTree[A] {
val res: A // whether it is leaf or internal node, res stores the result
}
case class Leaf[A](from: Int, to: Int, resLeaf: A) extends FoldTree[A] {
val res= resLeaf
}
case class Node[A](l: FoldTree[A], r: FoldTree[A], resNode: A) extends FoldTree[A] {
val res= resNode
}

/*
fold array segment in parallel and record the intermediate computation results in a Tree[A].
In the context of scan, this phase is called upsweep.
For an intuition, picture the array to reduce on the bottom, and the root of the tree at the top.
Once the 'parallel' tasks are initiated, the results are combined in the 'up' direction, from array
to the result of the fold.
*/
// NOTE: the leaf case folds starting from inp(left) -- a0 is deliberately NOT
// folded in here; it is injected once, during downsweep. This also means a
// leaf segment must be non-empty (right > left).
def upsweep[A](inp: Array[A],
left: Int, right: Int,
a0: A,
f: (A,A) => A): FoldTree[A] = {
// requires f to be associative
if (right - left < threshold)
Leaf(left, right, foldASegSeq(inp, left + 1, right, inp(left), f))
else {
val mid = left + (right - left)/2
val (t1,t2) = parallel(upsweep(inp, left, mid, a0, f),
upsweep(inp, mid, right, a0, f))
Node(t1, t2, f(t1.res,t2.res))
}
}

/*
Scan array segment inp(left) to inp(right-1),
storing results into out(left+1) to out(right).
At the end, out(i+1) stores fold of elements:
[a0, in(left),... in(i)] for i from left to right-1.
In particular, out(left+1) stores f(a0,inp(left))
and out(right) stores fold of [a0, in[(left),... inp(right-1)].
The value a0 is not directly stored into out anywhere.
This is used below cutoff in downsweep for scanAPar,
and also to implement scanASeq as a comparison point.
*/
def scanASegSeq1[A](inp: Array[A],
left: Int, right: Int,
a0: A,
f: (A,A) => A,
out: Array[A]) = {
if (left < right) {
var i= left
var a= a0
while (i < right) {
a= f(a,inp(i))
out(i+1)=a
i= i+1
}
}
}

// Downsweep phase: walks the FoldTree, pushing the prefix accumulated so far
// (a0) into each leaf; the right child receives a0 combined with the left
// child's upsweep result. Leaves write disjoint slices of `out`, so the two
// recursive calls may run in parallel.
def downsweep[A](inp: Array[A],
a0: A,
f: (A,A) => A,
t: FoldTree[A],
out: Array[A]): Unit = {
t match {
case Leaf(from, to, res) =>
scanASegSeq1(inp, from, to, a0, f, out)
case Node(l, r, res) => {
val (_,_) = parallel(
downsweep(inp, a0, f, l, out),
downsweep(inp, f(a0,l.res), f, r, out))
}
}
}

// Parallel scan of a segment: upsweep builds the reduction tree, downsweep
// distributes the prefixes.
def scanASegPar[A](inp: Array[A],
from: Int, to: Int,
a0: A,
f: (A,A) => A,
out: Array[A]) = {
val t = upsweep(inp, from, to, a0, f)
downsweep(inp, a0, f, t, out)
}

// Full parallel scan; out must have length inp.length + 1, out(0) = a0.
def scanAPar[A](inp: Array[A],
a0: A,
f: (A,A) => A,
out: Array[A]) = {
out(0)= a0
scanASegPar(inp, 0, inp.length, a0, f, out)
}

// Sequential reference scan with the same contract as scanAPar.
def scanASeq[A](inp: Array[A],
a0: A,
f: (A,A) => A,
out: Array[A]) = {
out(0) = a0
scanASegSeq1(inp, 0, inp.length, a0, f, out)
}

/*
=======================================
Setting parameters and testing
=======================================
*/

// Mutable on purpose: each test below installs its own sequential cutoff.
var threshold = 20000

val standardConfig = config(
Key.exec.minWarmupRuns -> 6,
Key.exec.maxWarmupRuns -> 6,
Key.exec.benchRuns -> 5,
Key.verbose -> false
) withWarmer(new Warmer.Default)


// Scans an array of singleton lists with list concatenation and compares the
// sequential and parallel results element by element.
def testConcat : Unit = {
println("===========================================")
println("Testing ArrayScan on concatenation example.")
println("===========================================")

def concat(x: List[Int], y: List[Int]): List[Int] =
x ::: y

// Element-wise array equality; reports the first mismatch found.
def arrEq[A](a1: Array[A], a2: Array[A]): Boolean = {
def eqSeq(from: Int, to: Int): Boolean = {
var i= from
while (i < to) {
if (a1(i) != a2(i)) {
println(s"Array difference: a1(${i})=${a1(i)}, a2(${i})=${a2(i)}")
return false
} else {
i= i + 1
}
}
true
}
if (a1.length != a2.length) {
println("Different sizes!")
false
} else eqSeq(0, a1.length)
}

threshold = 100

val alen = 2000
val inp = (0 until alen).map((x:Int) => List(x)).toArray
val outSeq = new Array[List[Int]](alen + 1)
val outPar = new Array[List[Int]](alen + 1)
val init = List(12309, 32123)
val seqtime = standardConfig measure {
scanASeq(inp, init, concat, outSeq)
}
println(s"sequential time: $seqtime ms")

val partime = standardConfig measure {
scanAPar(inp, init, concat, outPar)
}
println(s"parallel time: $partime ms")
println(s"speedup: ${seqtime / partime}")
print("Are results equal?")
println(arrEq(outSeq, outPar))
//println(outPar.toList)
}

// Scans with relativistic velocity addition and reports the speedup.
def testVelocity = {
println("======================================")
println("Testing ArrayScan on velocity example.")
println("======================================")

threshold = 20000

val c = 2.99792458e8
def velocityAdd(v1: Double, v2: Double): Double = {
val u1 = v1/c
val u2 = v2/c
(u1 + u2)/(1 + u1*u2)*c
}

val alen = 2000000
val inp = (0 until alen).map((x:Int) => (x % 50)*0.0001*c).toArray
val outSeq = new Array[Double](alen + 1)
val outPar = new Array[Double](alen + 1)
val seqtime = standardConfig measure {
scanASeq(inp, 0.0, velocityAdd, outSeq)
}
println(s"sequential time: $seqtime ms")

val partime = standardConfig measure {
scanAPar(inp, 0.0, velocityAdd, outPar)
}
println(s"parallel time: $partime ms")
println(s"speedup: ${seqtime / partime}")
}

// Checks that a non-identity initial element a0 is counted exactly once.
def testNonZero = {
println("====================================================")
println("Testing ArrayScan on addition with non-zero initial.")
println("====================================================")
val inp: Array[Int] = (1 to 10).toArray
val outSeq: Array[Int] = new Array[Int](inp.length + 1)
val outPar: Array[Int] = new Array[Int](inp.length + 1)
val f = (x: Int, y: Int) => x + y
threshold = 3
scanASeq(inp, 10, f, outSeq)
println(outSeq.toList)
scanAPar(inp, 10, f, outPar) // a0 = 10
println(outPar.toList)
}

def main(args: Array[String]) {
testNonZero
testConcat
testVelocity
}
}
@@ -0,0 +1,161 @@
package lectures
package reductions

import org.scalameter._
import common._

// Instrumented variant of ArrayScan: prints the intermediate FoldTree and
// runs a tiny example instead of benchmarking.
object ArrayScanDebug {

// Sequential left fold of inp(left until right), starting from b0.
def foldASegSeq[A,B](inp: Array[A],
left: Int, right: Int,
b0: B,
f: (B,A) => B): B = {
var b= b0
var i= left
while (i < right) {
b= f(b, inp(i))
i= i+1
}
b
}

// Reduction tree: res caches the fold result of the covered segment.
sealed abstract class FoldTree[A] {
val res: A
}
case class Leaf[A](from: Int, to: Int, resLeaf: A) extends FoldTree[A] {
val res= resLeaf
}
case class Node[A](l: FoldTree[A], r: FoldTree[A], resNode: A) extends FoldTree[A] {
val res= resNode
}

// Upsweep: builds the reduction tree in parallel.
// NOTE(review): unlike ArrayScan.upsweep, every leaf folds starting from a0,
// so for a non-identity a0 the root result includes a0 once per leaf.
// Harmless here (main uses a0 = 0.0 with +) -- verify before reusing.
def foldASegParTree[A](inp: Array[A],
left: Int, right: Int,
a0: A,
f: (A,A) => A): FoldTree[A] = {
// requires f to be associative
if (right - left < threshold)
Leaf(left, right, foldASegSeq(inp, left, right, a0, f))
else {
val mid = left + (right - left)/2
val (t1,t2) = parallel(foldASegParTree(inp, left, mid, a0, f),
foldASegParTree(inp, mid, right, a0, f))
Node(t1, t2, f(t1.res,t2.res))
}
}

/* // Poor man's dynamic effect checks
def write[A](arr: Array[A], i: Int, v:A) = {
if (arr(i) != 0.0) {
println(s"Overwritting with $v element at $i already set to ${arr(i)} in array ${printA(arr)}")
}
arr(i)=v
}
*/

// Sequential scan of inp(left until right) into out(left+1 .. right),
// starting from prefix a0 (a0 itself is not written).
def scanASegSeq1[A](inp: Array[A],
left: Int, right: Int,
a0: A,
f: (A,A) => A,
out: Array[A]) = {
if (left < right) {
var i= left
var a= a0
while (i < right) {
a= f(a,inp(i))
//write(out,i+1,a)
out(i+1)=a
i= i+1
}
}
}

// Downsweep: pushes the accumulated prefix a0 into each leaf; the right
// child receives a0 combined with the left child's cached result.
def scanASegParT[A,B](inp: Array[A],
a0: A,
f: (A,A) => A,
t: FoldTree[A],
out: Array[A]): Unit = {
t match {
case Leaf(from, to, res) =>
scanASegSeq1(inp, from, to, a0, f, out)
case Node(l, r, res) => {
val (_,_) = parallel(
scanASegParT(inp, a0, f, l, out),
scanASegParT(inp, f(a0,l.res), f, r, out))
}
}
}

// Parallel scan of a segment; prints the reduction tree for debugging.
def scanASegPar[A,B](inp: Array[A],
from: Int, to: Int,
a0: A,
f: (A,A) => A,
out: Array[A]) = {
val t = foldASegParTree(inp, from, to, a0, f)
println("FoldTree is: " + t)
scanASegParT(inp, a0, f, t, out)
}

// Full parallel scan; out must have length inp.length + 1, out(0) = a0.
def scanAPar[A](inp: Array[A],
a0: A,
f: (A,A) => A,
out: Array[A]) = {
out(0)= a0
scanASegPar(inp, 0, inp.length, a0, f, out)
}

// Sequential reference scan with the same contract.
def scanASeq[A](inp: Array[A],
a0: A,
f: (A,A) => A,
out: Array[A]) = {
out(0) = a0
scanASegSeq1(inp, 0, inp.length, a0, f, out)
}

// Speed of light; used by the (currently unused) relativistic operator below.
val c = 2.99792458e8
def assocOp(v1: Double, v2: Double): Double = {
val u1 = v1/c
val u2 = v2/c
(u1 + u2)/(1 + u1*u2)*c
}

def sum(x: Double, y: Double) = x + y

val standardConfig = config(
Key.exec.minWarmupRuns -> 1,
Key.exec.maxWarmupRuns -> 1,
Key.exec.benchRuns -> 1,
Key.verbose -> false
) withWarmer(new Warmer.Default)

def printA[A](a: Array[A]): String = {
a.toList.toString
}

// Tiny cutoff so even the 8-element demo input exercises the parallel path.
val threshold = 3

// Runs the sequential and parallel scans on a small fixed input; the output
// slots are pre-poisoned (41/-12, 42/-13) so overwrites are visible.
def main(args: Array[String]) {
// val inp = (0 until alen).map((x:Int) => (x % 50)*0.0001*c).toArray
val inp = Array(1.0, 10.0, 200.0, 0.5, 3.0, 40.0, 50.0, 5.0)
val alen = inp.length
println("Input: " + printA(inp))
val outSeq = new Array[Double](alen + 1); outSeq(0) = 41.0; outSeq(alen) = -12.0
val outPar = new Array[Double](alen + 1); outPar(0) = 42.0; outPar(alen) = -13.0
val seqtime = 1
//standardConfig measure {
scanASeq[Double](inp, 0.0, sum, outSeq)
//}
//println(s"sequential time: $seqtime ms and result ${printA(outSeq)}")

val partime = 1
//standardConfig measure {
scanAPar[Double](inp, 0.0, sum, outPar)
//}
//println(s"parallel time: $partime ms and result ${printA(outPar)}")
//println(s"speedup: ${seqtime / partime}")
println(s"seq result ${printA(outSeq)}")
println(s"par result ${printA(outPar)}")

}

}
@@ -0,0 +1,56 @@
package lectures
package reductions

import org.scalameter._
import common._

object ArraySum {

  // Volatile sink: results are stored here so the JIT cannot eliminate the
  // benchmarked computations as dead code.
  @volatile var dummy: Int = 0

  /** Sequentially sums xs(from until until). */
  def sum(xs: Array[Int], from: Int, until: Int): Int = {
    var i = from
    var s = 0
    while (i < until) {
      s += xs(i)
      i += 1
    }
    s
  }

  /** Fork/join sum: forks the right half as a task, computes the left half
   *  on the current thread, then joins. Segments below `threshold` run
   *  sequentially. */
  def parSum(xs: Array[Int], from: Int, until: Int, threshold: Int): Int = {
    if (until - from < threshold) {
      sum(xs, from, until)
    } else {
      val mid = (from + until) / 2
      val right = task {
        parSum(xs, mid, until, threshold)
      }
      val leftSum = parSum(xs, from, mid, threshold)
      val rightSum = right.join()
      leftSum + rightSum
    }
  }

  val standardConfig = config(
    Key.exec.minWarmupRuns -> 20,
    Key.exec.maxWarmupRuns -> 60,
    Key.exec.benchRuns -> 60,
    Key.verbose -> true
  ) withWarmer(new Warmer.Default)

  /** Benchmarks sequential vs fork/join summation of a 100M-element array. */
  def main(args: Array[String]) {
    val xs = (0 until 100000000).map(_ % 100).toArray
    val seqtime = standardConfig measure {
      dummy = sum(xs, 0, xs.length)
    }
    println(s"sequential sum time: $seqtime ms")

    val fjtime = standardConfig measure {
      // BUG FIX: the parallel result was previously discarded, so the JIT
      // could dead-code-eliminate parts of the measured work; store it in
      // the volatile sink exactly like the sequential run does.
      dummy = parSum(xs, 0, xs.length, 10000)
    }
    println(s"fork/join time: $fjtime ms")
    println(s"speedup: ${seqtime / fjtime}")
  }

}
@@ -0,0 +1,103 @@
package lectures
package reductions

import org.scalameter._
import common._

// Computes the running average (prefix-sum / count) of an Int array,
// sequentially and via a parallel upsweep/downsweep scan.
object RunningAverage {

val standardConfig = config(
Key.exec.minWarmupRuns -> 20,
Key.exec.maxWarmupRuns -> 60,
Key.exec.benchRuns -> 60,
Key.verbose -> true
) withWarmer(new Warmer.Default)

// Sequential version: output must have length input.length + 1;
// output(i) = (input(0) + ... + input(i-1)) / i, output(0) = 0.
def runningAverage(input: Array[Int], output: Array[Float]): Unit = {
var i = 0
var xPrev = 0
output(0) = xPrev
while (i < input.length) {
xPrev = input(i) + xPrev
i += 1
output(i) = 1.0f * xPrev / i
}
}

// Reduction tree over segments; xPrev caches the segment's element sum.
sealed abstract class Tree {
def xPrev: Int
}

case class Node(left: Tree, right: Tree) extends Tree {
val xPrev = left.xPrev + right.xPrev
}

case class Leaf(from: Int, until: Int, xPrev: Int) extends Tree

// Parallel version with the same contract as runningAverage; segments below
// `threshold` are processed sequentially.
def parRunningAverage(input: Array[Int], output: Array[Float], threshold: Int): Unit = {
// Sums input(from until until) sequentially (upsweep base case).
def reduceSequential(from: Int, until: Int): Int = {
var i = from
var x = 0
while (i < until) {
x = input(i) + x
i += 1
}
x
}

// Upsweep: builds the segment-sum tree in parallel.
def reduce(from: Int, until: Int): Tree = {
if (until - from < threshold) {
Leaf(from, until, reduceSequential(from, until))
} else {
val mid = (from + until) / 2
val (leftTree, rightTree) = parallel(
reduce(from, mid),
reduce(mid, until)
)
Node(leftTree, rightTree)
}
}

val tree = reduce(0, input.length)

// Downsweep base case: writes output(from+1 .. until), carrying the sum of
// everything before `from` in xPrev.
def downsweepSequential(xPrev: Int, from: Int, until: Int): Unit = {
var i = from
var x = xPrev
while (i < until) {
x = input(i) + x
i += 1
output(i) = 1.0f * x / i
}
}

// Downsweep: right subtree receives the prefix plus the left subtree's sum.
// Leaves write disjoint slices of output, so the calls may run in parallel.
def downsweep(xPrev: Int, tree: Tree): Unit = tree match {
case Node(left, right) =>
parallel(
downsweep(xPrev, left),
downsweep(xPrev + left.xPrev, right)
)
case Leaf(from, until, _) =>
downsweepSequential(xPrev, from, until)
}

output(0) = 0
downsweep(0, tree)
}

// Benchmarks both versions on a 10M-element array with values in [-50, 49].
def main(args: Array[String]) {
val length = 10000000
val input = (0 until length).map(_ % 100 - 50).toArray
val output = new Array[Float](length + 1)
val seqtime = standardConfig measure {
runningAverage(input, output)
}
println(s"sequential time: $seqtime ms")

val partime = standardConfig measure {
parRunningAverage(input, output, 10000)
}
println(s"parallel time: $partime ms")
println(s"speedup: ${seqtime / partime}")
}

}
@@ -0,0 +1,92 @@
package lectures
package reductions

import org.scalameter._
import common._

// Maps a function over a chunked binary tree of arrays, sequentially and in
// parallel, and benchmarks both.
object TreeMap {

// Tree of array chunks; size is the total number of elements underneath.
sealed abstract class Tree[A] { val size: Int }
case class Leaf[A](a: Array[A]) extends Tree[A] {
override val size = a.size
}
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A] {
override val size = l.size + r.size
}

// Sequential map: rebuilds the same tree shape with f applied to each chunk
// element. Manifest bounds are needed to allocate the new arrays.
def mapTreeSeq[A:Manifest,B:Manifest](t: Tree[A], f: A => B) : Tree[B] = t match {
case Leaf(a) => {
val len = a.length
val b = new Array[B](len)
var i= 0
while (i < len) {
b(i)= f(a(i))
i= i + 1
}
Leaf(b)
}
case Node(l,r) => {
val (lb,rb) = (mapTreeSeq(l,f),mapTreeSeq(r,f))
Node(lb, rb)
}
}

// Parallel map: identical to mapTreeSeq except the two subtrees of a Node
// are mapped in parallel; leaves are the sequential base case.
def mapTreePar[A:Manifest,B:Manifest](t: Tree[A], f: A => B) : Tree[B] = t match {
case Leaf(a) => {
val len = a.length
val b = new Array[B](len)
var i= 0
while (i < len) {
b(i)= f(a(i))
i= i + 1
}
Leaf(b)
}
case Node(l,r) => {
val (lb,rb) = parallel(mapTreePar(l,f),mapTreePar(r,f))
Node(lb, rb)
}
}

// math.log(math.E) == 1.0; kept to make the change-of-base explicit.
val logE = math.log(math.E)
// |x|^p truncated to Int (via exp/log, subject to float rounding).
def power(x: Double, p: Double): Int = {
math.exp(p * math.log(math.abs(x)) / logE).toInt
}

// Maximum leaf-chunk size used by makeTree.
val threshold = 10000

val standardConfig = config(
Key.exec.minWarmupRuns -> 30,
Key.exec.maxWarmupRuns -> 30,
Key.exec.benchRuns -> 20,
Key.verbose -> false
) withWarmer(new Warmer.Default)


// Builds a balanced tree of Double chunks, each smaller than `threshold`.
def makeTree(len: Int) : Tree[Double] = {
if (len < threshold)
Leaf((0 until len).map((x:Int) => (x % 100)*0.9).toArray)
else {
Node(makeTree(len/2), makeTree(len - len/2))
}
}

// Benchmarks sequential vs parallel tree map with f = power(_, 1.5).
def main(args: Array[String]) {
val p = 1.5
def f(x:Double)= power(x,p)
val alen = 2000000
val t = makeTree(alen)
// Sinks keep the mapped trees reachable so the work cannot be elided.
var t1: Tree[Double] = t
var t2: Tree[Double] = t
val seqtime = standardConfig measure {
t1 = mapTreeSeq(t, f)
}
val partime = standardConfig measure {
t2 = mapTreePar(t, f)
}

println(s"sequential time: $seqtime ms")
println(s"parallel time: $partime ms")
}

}
@@ -0,0 +1,30 @@
package lectures.algorithms

import org.scalatest.FunSuite

import scala.util.Random

// ScalaTest suite checking MergeSort.parMergeSort against the stdlib sort.
class MergeSortTest extends FunSuite {

/** Sorts `nbElem` random Ints in place with parMergeSort and asserts the
 *  result equals what the standard library's sorted produces. */
def sortCase(nbElem: Int, maxDepth: Int) = {
print(s"Test case: # elements = $nbElem, maxDepth = $maxDepth")
val arr2sort = Array.tabulate(nbElem)(_ => Random.nextInt())
val expected = arr2sort.sorted
MergeSort.parMergeSort(arr2sort, maxDepth)
assert(arr2sort === expected)
println(" => ok")
}

/**
* Note: maxDepth should stay small (< 15); the number of parallel tasks
* grows exponentially with maxDepth, which would slow the system down.
*/
test("MergeSort x elements with maxDepth = y") {
for {
x <- 100 to 1000 by 100
y <- 2 to 5
} sortCase(x, y)
}

}
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>theory</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
</buildSpec>
<natures>
</natures>
</projectDescription>
@@ -0,0 +1,6 @@
name := "theory"

version := "1.0"

scalaVersion := "2.12.2"

@@ -0,0 +1 @@
sbt.version = 0.13.15
@@ -0,0 +1 @@
logLevel := Level.Warn
@@ -0,0 +1,61 @@
package week1




/**
* Created by rofc on 27/06/2017.
*/
object SegmentSum {
  /** Sums power(a(i), p) for i in [s, t), sequentially. */
  def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int = {
    var i = s; var sum: Int = 0
    while (i < t) {
      sum = sum + power(a(i), p)
      i = i + 1
    }
    sum
  }

  /** Functional variant of sumSegment over the whole array. */
  def sumSegmentRofc(a: Array[Int], p: Double): Int =
    a.foldRight(0)((x, acc) => power(x, p) + acc)

  /** |x|^p truncated to Int (exp/log round-trip: exact only when the float
   *  math is, e.g. power(1, p) == 1 and power(0, p) == 0). */
  def power(x: Int, p: Double): Int = math.exp(p * math.log(math.abs(x))).toInt

  /** p-norm of the whole array: (sum |x|^p)^(1/p). */
  def pNorm(a: Array[Int], p: Double): Int =
    power(sumSegment(a, p, 0, a.length), 1/p)

  def pNormRofc(a: Array[Int], p: Double): Int =
    power(sumSegmentRofc(a, p), 1/p)

  /** p-norm computed over two halves (both halves evaluated sequentially). */
  def pNormTwoPart(a: Array[Int], p: Double): Int = {
    val m = a.length / 2
    val (sum1, sum2) = (sumSegment(a, p, 0, m),
                        sumSegment(a, p, m, a.length))
    power(sum1 + sum2, 1/p)
  }

  def pNormTwoPartRofc(a: Array[Int], p: Double): Int = {
    val m = a.length / 2
    // BUG FIX: previously used a.takeRight(m), which skips the middle
    // element when a.length is odd; drop(m) makes the halves a partition.
    val (sum1, sum2) = (sumSegmentRofc(a.take(m), p),
                        sumSegmentRofc(a.drop(m), p))
    power(sum1 + sum2, 1/p)
  }

  def pNormRec(a: Array[Int], p: Double): Int =
    power(segmentRec(a, p), 1/p)

  // like sumSegment, splitting the work into two halves.
  // NOTE: despite the original "parallel" comment, both halves run sequentially.
  def segmentRec(a: Array[Int], p: Double) = {
    if (a.length < 2)
      sumSegmentRofc(a, p) // small segment: do it sequentially
    else {
      val m = a.length/2
      // BUG FIX: takeRight(m) -> drop(m), same odd-length issue as above.
      val (sum1, sum2) = (sumSegmentRofc(a.take(m), p),
                          sumSegmentRofc(a.drop(m), p))
      sum1 + sum2
    }
  }

  def main(args: Array[String]): Unit = {
    val p = 2d
    val a = (1 to 500).toArray

    println(pNormRec(a, p))     // almost surely this one will be faster
    println(pNormTwoPart(a, p)) // mine will use a lot of memory
  }
}
@@ -0,0 +1,41 @@
package week1

/**
* Created by rofc on 27/06/2017.
*/
// Concurrency demo: two threads increment shared counters WITHOUT any
// synchronization, so duplicated or skipped ids are possible -- presumably
// the point of the demo (showing the race), not a feature to rely on.
object SimpleApp {

private var uidCount = 0L
private var insideruidCount = 0L

// Unsynchronized read-modify-write of shared state: racy by design.
def getUniqueId(): Long = {
uidCount = uidCount + 1
uidCount
}

class Atomic extends Thread{

// Same racy pattern, on a counter owned by the companion object.
def getInsiderUniqueId(): Long = {
insideruidCount = insideruidCount + 1
insideruidCount
}

// Each thread generates 11 ids from both counters and prints them;
// interleavings between the two Atomic threads may corrupt the sequences.
override def run() {
val uids = for(i <- 0 to 10) yield getUniqueId()
println ("outsider"+uids)
val uids2 = for(i <- 0 to 10) yield getInsiderUniqueId
println ("insider"+uids2)
}
}




// Starts two competing threads; output order and values are nondeterministic.
def main(args: Array[String]): Unit = {
println ("Aixo funciona?")
val a = new Atomic
val b = new Atomic
a.start()
b.start()
}
}
@@ -0,0 +1,28 @@

object SimpleThread {
  // Earlier demo kept for reference:
  // class HelloThread extends Thread {
  //   override def run() {
  //     println("Hello")
  //     println("World!!")
  //   }
  // }
  // val t = new HelloThread
  // val t2 = new HelloThread
  // val t3 = new HelloThread
  // t.start()
  // t2.start()
  // t3.start()
  // t.join()
  // t2.join()
  // t3.join()

  /** FIX: `Atomicon` was referenced below but never defined in this file
   *  (it only exists in the worksheet copies), so this object did not
   *  compile. Defined here as a minimal Thread demo matching the worksheet
   *  version's shape; a stray discarded `for ... yield` expression was
   *  removed as dead code. */
  class Atomicon extends Thread {
    override def run() {
      val uids = for (i <- 0 until 10) yield i
      println(uids)
    }
  }

  val t = new Atomicon
  t.start()
  val t2 = new Atomicon
  t2.start()

}
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.scala-ide.sdt.launching.SCALA_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="bin"/>
</classpath>
@@ -0,0 +1 @@
/bin/
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>theory_ec</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.scala-ide.sdt.core.scalabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.scala-ide.sdt.core.scalanature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,4 @@

object other {;import org.scalaide.worksheet.runtime.library.WorksheetSupport._; def main(args: Array[String])=$execute{;$skip(59);
println("Welcome to the Scala worksheet")}
}
@@ -0,0 +1,25 @@
package week1

object SimpleThread {;import org.scalaide.worksheet.runtime.library.WorksheetSupport._; def main(args: Array[String])=$execute{;$skip(80);
println("Welcome to the Scala worksheet");$skip(26);


var uidCount = 0L;System.out.println("""uidCount : Long = """ + $show(uidCount ));$skip(99);
def getUniqueId(): Long = {
uidCount = uidCount + 1
println (uidCount)
uidCount

}

class Atomicon extends Thread{
override def run() {
println("Hello")
val uids = for(i <- 0 until 10) yield getUniqueId()
println(uids)
}
};System.out.println("""getUniqueId: ()Long""");$skip(198);

val t1=new Atomicon;System.out.println("""t1 : week1.SimpleThread.Atomicon = """ + $show(t1 ));$skip(13);
t1.start()}
}
@@ -0,0 +1,25 @@
package week1

// Scala IDE worksheet source: the `//>` trailers are evaluation results
// inserted by the worksheet runner and are regenerated on re-evaluation.
object SimpleThread {
println("Welcome to the Scala worksheet") //> Welcome to the Scala worksheet


var uidCount = 0L //> uidCount : Long = 0
// Unsynchronized read-modify-write of shared state -- presumably the demo's
// point is to show racy id generation when called from a Thread.
def getUniqueId(): Long = {
uidCount = uidCount + 1
println (uidCount)
uidCount

} //> getUniqueId: ()Long

// Thread that prints ten ids produced by the racy counter above.
class Atomicon extends Thread{
override def run() {
println("Hello")
val uids = for(i <- 0 until 10) yield getUniqueId()
println(uids)
}
}

val t1=new Atomicon //> t1 : week1.SimpleThread.Atomicon = Thread[Thread-0,5,main]
t1.start()
}