In [1]:
#Install Java Runtime 8
import os       #importing os to set environment variable
!apt-get remove -y java*
def install_java():
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      #install openjdk
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
  !java -version       #check java version
install_java()

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Note, selecting 'java-propose-classpath' for glob 'java*'
Note, selecting 'java2-sdk' for glob 'java*'
Note, selecting 'java12-sdk-headless' for glob 'java*'
Note, selecting 'java2html' for glob 'java*'
Note, selecting 'java9-sdk-headless' for glob 'java*'
Note, selecting 'java-gcj-compat' for glob 'java*'
Note, selecting 'javascript-common' for glob 'java*'
Note, selecting 'java11-runtime' for glob 'java*'
Note, selecting 'java17-sdk' for glob 'java*'
Note, selecting 'javacc4' for glob 'java*'
Note, selecting 'java3ds-fileloader' for glob 'java*'
Note, selecting 'java10-runtime' for glob 'java*'
Note, selecting 'javahelp2-doc' for glob 'java*'
Note, selecting 'java-sdk-headless' for glob 'java*'
Note, selecting 'java15-sdk' for glob 'java*'
Note, selecting 'java-sdk' for glob 'java*'
Note, selecting 'java6-sdk-headless' for glob 'java*'
Note, selecting 'java13-sdk' for glob 'java*'
Note, se

In [3]:
#Initialize Spark Runtime Environment
!export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre
!pip install pyspark
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import SQLContext

spark = SparkSession.builder \
   .master("local") \
   .appName("Wordcount & Friends Tutorial") \
   .config("spark.executor.memory", "2gb") \
   .getOrCreate()
   
sc = spark.sparkContext
sqlContext = SQLContext(sc)

sc.version

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


'3.3.1'

PART-1:  "WORDCOUNT" EXAMPLE USING MAPREDUCE

In [4]:
# Fetch the text file for wordcount example
!wget https://raw.githubusercontent.com/ibarabasi/wordcount/master/wordcount
!cat wordcount

--2022-12-22 01:18:38--  https://raw.githubusercontent.com/ibarabasi/wordcount/master/wordcount
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 710 [text/plain]
Saving to: ‘wordcount’


2022-12-22 01:18:39 (24.7 MB/s) - ‘wordcount’ saved [710/710]

Big data refers to the massive amount of data which cannot be stored, processed and analyzed using traditional ways.
The main elements of big data are:
Volume - There is a massive amount of data generated every second.
Velocity - The speed at which data is generated, collected and analyzed
Variety - The different types of data: structured, semi-structured, unstructured
Value - The ability to turn data into useful insights for your business
Veracity - Trustworthiness in terms of quality and accuracy
The main challe

In [5]:
# Load the downloaded "wordcount" file into an RDD of text lines
rdd0 = sc.textFile(name="wordcount")
# Preview at most the first 20 elements
rdd0.take(num=20)

['Big data refers to the massive amount of data which cannot be stored, processed and analyzed using traditional ways.',
 'The main elements of big data are:',
 'Volume - There is a massive amount of data generated every second.',
 'Velocity - The speed at which data is generated, collected and analyzed',
 'Variety - The different types of data: structured, semi-structured, unstructured',
 'Value - The ability to turn data into useful insights for your business',
 'Veracity - Trustworthiness in terms of quality and accuracy',
 'The main challenges that big data faced and the solutions for each are listed below:',
 'Single central storage',
 'Serial processing',
 'One input',
 'One Output',
 'One Processor',
 'Lack of ability to process unstructured data']

In [6]:
# Word count pipeline:
#   split lines on whitespace -> (word, 1) -> sum per word -> (count, word)
word_counts = (
    rdd0
    .flatMap(lambda text: text.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: (left + right))
    .map(lambda pair: (pair[1], pair[0]))
)

# Peek at up to 30 (count, word) tuples
word_counts.take(30)


[(1, 'Big'),
 (8, 'data'),
 (1, 'refers'),
 (3, 'to'),
 (2, 'the'),
 (2, 'massive'),
 (2, 'amount'),
 (6, 'of'),
 (2, 'which'),
 (1, 'cannot'),
 (1, 'be'),
 (1, 'stored,'),
 (1, 'processed'),
 (4, 'and'),
 (2, 'analyzed'),
 (1, 'using'),
 (1, 'traditional'),
 (1, 'ways.'),
 (5, 'The'),
 (2, 'main'),
 (1, 'elements'),
 (2, 'big'),
 (1, 'are:'),
 (1, 'Volume'),
 (5, '-'),
 (1, 'There'),
 (2, 'is'),
 (1, 'a'),
 (1, 'generated'),
 (1, 'every')]

PART-2:  "COMMON FRIENDS" EXAMPLE USING MAPREDUCE

In [7]:
# Load data from github
!wget "https://raw.githubusercontent.com/ibarabasi/wordcount/master/friends"
rdd = sc.textFile("friends")
!cat friends

--2022-12-22 01:18:45--  https://raw.githubusercontent.com/ibarabasi/wordcount/master/friends
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 193 [text/plain]
Saving to: ‘friends’


2022-12-22 01:18:45 (6.92 MB/s) - ‘friends’ saved [193/193]

me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John
Judy Alice
me Mary
Mary Joyce
Joyce Henry
Judy me
Judy Jane
John Carol
Carol me
Mary Henry
Louise Ronald
Ronald Thomas
William Thomas


In [8]:
# Preview the friends RDD (at most the first 20 lines)
rdd.take(num=20)

['me Alice',
 'Henry me',
 'Henry Alice',
 'me Jane',
 'Alice John',
 'Jane John',
 'Judy Alice',
 'me Mary',
 'Mary Joyce',
 'Joyce Henry',
 'Judy me',
 'Judy Jane',
 'John Carol',
 'Carol me',
 'Mary Henry',
 'Louise Ronald',
 'Ronald Thomas',
 'William Thomas']

In [9]:
# Parse each line into a (person, person) tuple. Lines that do not contain
# exactly two names (e.g. blank lines) are skipped instead of raising
# IndexError further down the pipeline.
pairs = (
    rdd.map(lambda line: tuple(line.split()))
       .filter(lambda pair: len(pair) == 2)
)
# Friendship is symmetric, so keep every edge in both directions. distinct()
# ensures a friendship listed twice (or once in each order) is counted once;
# tuples are used because distinct() needs hashable elements.
rdd1 = pairs.union(pairs.map(lambda pair: pair[::-1])).distinct()
# Bring my friend list to local
lst = rdd1.filter(lambda edge: edge[0] == 'me').map(lambda edge: edge[1]).collect()
# Set copy for constant-time membership tests inside the closures below
my_friends = frozenset(lst)
# Build the second pair RDD: for every friend-of-a-friend who is neither me nor
# already my friend, count how many of my friends they are connected to, then
# sort by that count, highest first.
rdd2 = (
    rdd1.filter(lambda edge: edge[0] in my_friends)
        .map(lambda edge: edge[1])
        .filter(lambda person: person != 'me' and person not in my_friends)
        .map(lambda person: (person, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda kv: (kv[1], kv[0]))
        .sortByKey(ascending=False)
)
# Bring the result to local since the sample is small
for common, stranger in rdd2.collect():
    print("The stranger {} has {} common friends with me".format(stranger, common))


The stranger John has 3 common friends with me
The stranger Joyce has 2 common friends with me
