<a href="https://colab.research.google.com/github/domvwt/uol-ds-tools/blob/main/pyspark-utils/pyspark_for_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Setup for Colab

## Prepare Environment

In [1]:
%%shell

# Prepare a Colab runtime for PySpark: install a Java 8 runtime, download and
# unpack a Spark distribution into ./spark, and install the matching Python
# packages. Everything is skipped when a ./spark directory already exists, so
# the cell is safe to re-run.
SPARK_VERSION="3.1.1"
HADOOP_VERSION="3.2"
# Base name shared by the downloaded archive and the directory it unpacks to.
SPARK_DIST="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"

echo "Preparing Spark requirements..."
if ! test -d "spark"; then
  # The steps are chained with && so the first failure stops the setup.
  # apt-get stdout is discarded; stderr is left attached so errors stay visible.
  # wget writes to an explicit -O target so a stale or partial archive left by
  # an earlier failed run is overwritten rather than shadowed by "*.tgz.1".
  echo "Updating system..." && \
  sudo apt-get -yqq update > /dev/null && \
  sudo apt-get -yqq install openjdk-8-jre > /dev/null && \
  echo "Downloading Spark..." && \
  wget --quiet -O "${SPARK_DIST}.tgz" "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_DIST}.tgz" && \
  tar -xf "${SPARK_DIST}.tgz" && \
  echo "Installing Python libs..." && \
  pip install -Uqq "pyspark==${SPARK_VERSION}" findspark && \
  mv "${SPARK_DIST}" spark && \
  rm -rf "${SPARK_DIST}.tgz" && \
  echo "Complete."
else
  echo "Requirements already satisfied."
fi

Preparing Spark requirements...
Updating system...
Downloading Spark...
Installing Python libs...
[K     |████████████████████████████████| 212.3MB 63kB/s 
[K     |████████████████████████████████| 204kB 43.5MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
Complete.




## Python Setup

In [2]:
import os
import pyspark
import findspark
# Tell Spark where to find the JVM and the Spark distribution.
# NOTE(review): both paths assume the environment prepared by the shell cell
# above (openjdk-8 at the default Debian/Ubuntu location, Spark unpacked to
# ./spark with /content as the working directory) - confirm if the setup changes.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark"
findspark.init()

# SparkContext.getOrCreate() is a classmethod: it returns the already-running
# context when this cell is re-executed and creates one otherwise. Calling it
# on the class (rather than on a freshly constructed SparkContext()) avoids the
# "multiple SparkContexts" error on re-runs, so no exception needs to be
# swallowed and genuine start-up failures (missing JVM, bad SPARK_HOME) surface
# here instead of as a NameError on `sc` in a later cell.
sc = pyspark.SparkContext.getOrCreate()

## Test PySpark

In [3]:
# Download test file
!wget https://www.gutenberg.org/files/84/84-0.txt -O frankenstein.txt

--2021-04-22 14:02:31--  https://www.gutenberg.org/files/84/84-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 448821 (438K) [text/plain]
Saving to: ‘frankenstein.txt’


2021-04-22 14:02:32 (1.27 MB/s) - ‘frankenstein.txt’ saved [448821/448821]



In [4]:
import re
from pathlib import Path


INPUT_TXT = "frankenstein.txt"


# Spark needs a fully qualified URI to read from the driver's local filesystem.
# The absolute path already starts with "/", so "file://" + path yields the
# well-formed "file:///..." form (not a four-slash URI).
myfile = Path(INPUT_TXT).absolute()
rdd_txt = sc.textFile(f"file://{myfile}")

# Simple word counts splitting on whitespace.
# The final map flips each (word, count) pair into (count, word); the result
# is not sorted, so rows appear in partition order.
counts = (
    rdd_txt.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda a: (a[1], a[0]))
)

# take(20) returns the same leading elements as collect()[:20] without
# shipping the whole RDD to the driver first.
res1 = counts.take(20)
for pair in res1:
    print(pair)
print()


(268, 'The')
(79, 'Project')
(2746, 'of')
(3, 'Mary')
(3, 'Wollstonecraft')
(3, '(Godwin)')
(3, 'Shelley')
(318, 'is')
(18, 'use')
(7, 'anyone')
(2, 'anywhere')
(1129, 'in')
(15, 'United')
(7, 'States')
(85, 'other')
(25, 'world')
(302, 'at')
(154, 'no')
(2, 'restrictions')
(2, 'whatsoever.')

