<a href="https://colab.research.google.com/github/bewithankit/CS3DP19/blob/main/PySpark_vs_Dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
# Export the JDK and Spark installation directories for this process
# (and anything it launches) in a single call.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [4]:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
import time

# Initialize Spark (context start-up is deliberately outside the timed region)
conf = SparkConf().setAppName('wordCount')
sc = SparkContext(conf=conf)

try:
    # Start timing. perf_counter() is a monotonic, high-resolution clock, so the
    # measured interval cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()

    # Describe the word count: split every line on whitespace, pair each word
    # with 1, then sum the 1s per word.
    linesRDD = sc.textFile("/content/ulysses.txt")
    wordsRDD = linesRDD.flatMap(lambda line: line.split())
    wordCounts = wordsRDD.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # collect() is the action that actually runs the job and brings the
    # (word, count) pairs back to the driver.
    word_counts_collected = wordCounts.collect()

    # Stop timing
    end_time = time.perf_counter()

    # Print the word counts
    print("\nWord count:\n", word_counts_collected)

    # Print the time taken
    print("Time taken with PySpark: {} seconds".format(end_time - start_time))
finally:
    # Always stop Spark, even when the job fails (e.g. the input file is
    # missing); otherwise the context stays active and re-running this cell
    # cannot create a new one.
    sc.stop()


Word count:
Time taken with PySpark: 5.094547986984253 seconds


In [5]:
from dask.distributed import Client
import dask.bag as db
import time

# Initialize Dask Client (cluster start-up is deliberately outside the timed region)
client = Client()

try:
    # Start timing. perf_counter() is a monotonic, high-resolution clock, so the
    # measured interval cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()

    # Describe the word count: split every line on whitespace, flatten the
    # per-line lists into one bag of words, then count occurrences
    # (sorted by frequency).
    lines = db.read_text("/content/ulysses.txt")
    word_counts = lines.map(str.split).flatten().frequencies(sort=True)

    # compute() executes the task graph and returns the (word, count) pairs
    word_counts_computed = word_counts.compute()

    # Stop timing
    end_time = time.perf_counter()

    # Print the word counts
    print(word_counts_computed)

    # Print the time taken
    print("Time taken with Dask: {} seconds".format(end_time - start_time))
finally:
    # Always shut the client down, even when the computation fails, so the
    # scheduler and worker processes it started are not left running.
    client.shutdown()


INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:34215
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40877'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41021'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:35353', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35353
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:50650
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:46021', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46021
INFO:distributed.core:Sta

Time taken with Dask: 6.579550504684448 seconds


INFO:distributed.scheduler:Scheduler closing due to unknown reason...
INFO:distributed.scheduler:Scheduler closing all comms


In [9]:
from pyspark import SparkContext, SparkConf
import time
import re

def tokenize(text):
    """Lower-case ``text`` and return its words in order of appearance.

    A word is a maximal run of regex word characters (letters, digits and
    underscore); every other character acts as a separator and is dropped.
    An input without any word characters yields an empty list.
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r'\w+', lowered)]

# Initialize Spark (context start-up is deliberately outside the timed region)
conf = SparkConf().setAppName('wordCount')
sc = SparkContext(conf=conf)

try:
    # Start timing. perf_counter() is a monotonic, high-resolution clock, so the
    # measured interval cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()

    # Describe the word count: tokenize every line with the regex tokenizer,
    # pair each word with 1, then sum the 1s per word.
    linesRDD = sc.textFile("/content/ulysses.txt")
    wordsRDD = linesRDD.flatMap(tokenize)
    wordCounts = wordsRDD.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # collect() is the action that actually runs the job and brings the
    # (word, count) pairs back to the driver.
    word_counts_collected = wordCounts.collect()

    # Stop timing
    end_time = time.perf_counter()

    # Print the word counts
    print("\nWord count:\n", word_counts_collected)

    # Print the time taken
    print("Time taken with PySpark: {} seconds".format(end_time - start_time))
finally:
    # Always stop Spark, even when the job fails (e.g. the input file is
    # missing); otherwise the context stays active and re-running this cell
    # cannot create a new one.
    sc.stop()



Word count:
Time taken with PySpark: 2.3530282974243164 seconds


In [10]:
from dask.distributed import Client
import dask.bag as db
import time
import re

def tokenize(text):
    """Lower-case ``text`` and return its words in order of appearance.

    A word is a maximal run of regex word characters (letters, digits and
    underscore); every other character acts as a separator and is dropped.
    An input without any word characters yields an empty list.
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r'\w+', lowered)]

# Initialize Dask Client (cluster start-up is deliberately outside the timed region)
client = Client()

try:
    # Start timing. perf_counter() is a monotonic, high-resolution clock, so the
    # measured interval cannot be skewed by wall-clock adjustments.
    start_time = time.perf_counter()

    # Describe the word count: tokenize every line with the regex tokenizer,
    # flatten the per-line lists into one bag of words, then count occurrences
    # (sorted by frequency).
    lines = db.read_text("/content/ulysses.txt")
    word_counts = lines.map(tokenize).flatten().frequencies(sort=True)

    # compute() executes the task graph and returns the (word, count) pairs
    word_counts_computed = word_counts.compute()

    # Stop timing
    end_time = time.perf_counter()

    # Print the word counts
    print(word_counts_computed)

    # Print the time taken
    print("Time taken with Dask: {} seconds".format(end_time - start_time))
finally:
    # Always shut the client down, even when the computation fails, so the
    # scheduler and worker processes it started are not left running.
    client.shutdown()


INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:43663
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35487'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43667'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:43781', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43781
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42956
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:33647', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33647
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42948
INFO:distributed.scheduler:Receive client connection: Client-a1639de9-7d63-11ee-81b8-0242ac1c000c

Time taken with Dask: 2.7580559253692627 seconds


INFO:distributed.scheduler:Scheduler closing due to unknown reason...
INFO:distributed.scheduler:Scheduler closing all comms
