# 0. Installing PySpark in Google Colab

Install Dependencies (needs to be done once each time you re-open this notebook):

1.   Java 8
2.   Apache Spark (pre-built with Hadoop)
3.   Findspark (used to locate the Spark installation on the system)

In [None]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

# unzip the spark file to the current folder
!tar xf spark-3.5.3-bin-hadoop3.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

# install findspark using pip
!pip install -q findspark
import findspark
findspark.init()

# 1. Wordcount

Set up the path to the file before running if you are using Google Colab.

In [None]:
import sys, os


def locate_folder_of(file_name, search_root='.'):
    """Return the folder (with a trailing separator) that contains `file_name`.

    Walks `search_root` top-down and stops at the first folder holding a file
    with exactly that name, so several matches can no longer be glued together
    into one invalid path and file names containing spaces work.

    Raises:
        FileNotFoundError: if no such file exists below `search_root`.
    """
    for folder, _subfolders, files in os.walk(search_root):
        if file_name in files:
            # join with '' appends the trailing separator, e.g. './gdrive/MyDrive/Lab/'
            return os.path.join(folder, '')
    raise FileNotFoundError(
        f"Could not find {file_name!r} under {os.path.abspath(search_root)!r}; "
        "set path_to_file manually instead."
    )


# Default used outside Colab so the later cells do not hit a NameError.
# NOTE(review): assumes the input file sits in the folder the notebook is run from - adjust if not.
path_to_file = os.path.join('.', '')

if 'google.colab' in sys.modules:
    from google.colab import drive
    drive.mount('/content/gdrive')
    # find automatically the path of the folder containing "file_name" :
    file_name = 'Lab-part1.ipynb'
    path_to_file = locate_folder_of(file_name)
    # if previous search failed or too long, comment the previous line and simply write down manually the path below :
    #path_to_file = '/content/gdrive/My Drive/CS5344_AY2425Sem2_Lab'

print(path_to_file)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
./gdrive/MyDrive/CS5344_Lab/CS5344_AY2425Sem2_Lab/


In [None]:
import re
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running locally on every available core,
# then expose its SparkContext for the RDD operations in the cells below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# read data from given input path
# you can try out using different input files
# `path_to_file` comes from the path-setup cell above, so that cell must have been run first
lines = sc.textFile(f'{path_to_file}/Lab-part1_in.txt')

In [None]:
def split_into_words(line):
    """Lower-case `line`, split it on runs of non-word characters and keep only purely alphabetic tokens."""
    tokens = re.split(r'\W+', line.lower())
    return [token for token in tokens if token.isalpha()]


# flatMap applies the function to every line and concatenates the resulting lists into one
# e.g. ["First line text", "Second line text"] -> ["first", "line", "text", "second", "line", "text"]
words = lines.flatMap(split_into_words)

In [None]:
# turn every word into a (word, 1) tuple so the occurrences can be summed per word
# e.g. [("first", 1), ("line", 1), ("text", 1), ("second", 1), ("line", 1), ("text", 1)]
pairs = words.map(lambda token: (token, 1))

In [None]:
# reduceByKey groups the tuples by their first item (the word) and folds the
# second items (the 1s) together with the given function, here a sum
# e.g. [("first", 1), ("line", 2), ("text", 2), ("second", 1)]
counts = pairs.reduceByKey(lambda total, increment: total + increment)

In [None]:
# show 10 (word, count) pairs as the cell output
# note: the sample output below is not sorted by word or by count
counts.take(10)

[('are', 2),
 ('as', 8),
 ('look', 1),
 ('walk', 1),
 ('only', 1),
 ('love', 1),
 ('share', 1),
 ('people', 1),
 ('not', 1),
 ('beautiful', 2)]

In [None]:
# save data into file
# saveAsTextFile writes a *directory*; repartition(1) makes it hold a single part file.
# Spark refuses to write into a directory that already exists, so the output of a
# previous run is removed first to keep this cell re-runnable.
import os
import shutil

output_dir = f'{path_to_file}/Lab-part1_out'
if os.path.isdir(output_dir):
    shutil.rmtree(output_dir)
counts.repartition(1).saveAsTextFile(output_dir)

In [None]:
# stop the SparkContext
# `sc` was taken from `spark.sparkContext` in the session cell; re-run that cell before using Spark again
sc.stop()