
# PySpark Installation and Initialization



1. Download the `.ipynb` file from Google Classroom and save it to your Google Drive.

2. Open colab.research.google.com.

3. Choose File > Open notebook and select this file from your Google Drive.

## Install JDK

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Install Spark and findspark

In [None]:
!curl -fLO https://archive.apache.org/dist/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz



In [None]:
!tar zxf spark-3.5.4-bin-hadoop3.tgz
!pip install -q findspark

## Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Initialize PySpark

In [None]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

findspark.init()
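
If the cells below fail to start Spark, a common cause is that `JAVA_HOME` or `SPARK_HOME` does not match the directory that `apt-get` or `tar` actually created. The sketch below is plain Python; `missing_spark_dirs` is a hypothetical helper, not part of findspark or PySpark. It only checks that each variable is set and points to an existing directory.

```python
import os

def missing_spark_dirs(env=None):
    """Return the names of JAVA_HOME / SPARK_HOME that are unset
    or do not point to an existing directory (hypothetical helper)."""
    env = os.environ if env is None else env
    missing = []
    for name in ("JAVA_HOME", "SPARK_HOME"):
        path = env.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# Run after setting os.environ above; an empty list means both paths exist.
print(missing_spark_dirs())
```

An empty list does not prove the JDK or Spark build works; it only rules out a mistyped or missing directory.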

# Spark Core

In [None]:
from pyspark import SparkContext

In [None]:
sc = SparkContext.getOrCreate()

## Example 1: Word Count

In [None]:
rdd1 = sc.textFile('/content/drive/MyDrive/des431s24/ch03/data/wordcount.txt')
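
`sc.textFile` is lazy: it records the path but does not open the file. A wrong path is therefore reported only later, when Spark has to list the input (for example when `reduceByKey` or `take` runs), as a `Py4JJavaError` wrapping `InvalidInputException: Input path does not exist`. A quick local check before building the RDD can save that round trip. This is a sketch; `missing_inputs` and `DATA_DIR` are hypothetical names, and `DATA_DIR` assumes the Drive folder layout used in this notebook.

```python
import os

def missing_inputs(paths):
    """Return the paths that do not exist on the local filesystem.

    Only meaningful for local paths such as /content/drive/...; it does not
    understand hdfs:// URIs or glob patterns, which sc.textFile also accepts.
    """
    return [p for p in paths if not os.path.exists(p)]

# Assumed location of the course data on the mounted Drive.
DATA_DIR = "/content/drive/MyDrive/des431s24/ch03/data"
missing = missing_inputs([os.path.join(DATA_DIR, "wordcount.txt"),
                          os.path.join(DATA_DIR, "integers.txt")])
if missing:
    print("Not found (check the Drive mount and folder names):", missing)
```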

In [None]:
def line2word(line):
    return line.split()

def wordpair(w):
    return (w.lower(), 1)

def add(a, b):
    return a + b

words = rdd1.flatMap(line2word).map(wordpair).reduceByKey(add)
words.take(3)
# rdd1.flatMap(line2word).map(wordpair).take(5)



In [None]:
words = rdd1.flatMap(lambda line: line.split()) \
            .map(lambda w: (w.lower(), 1)) \
            .reduceByKey(lambda a, b: a+b)

In [None]:
words.sortBy(lambda x: -x[1]).take(10)

[('the', 43),
 ('word', 28),
 ('a', 28),
 ('of', 25),
 ('and', 23),
 ('words', 21),
 ('to', 19),
 ('is', 19),
 ('in', 15),
 ('as', 11)]
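
To see what the pipeline computes, here is a plain-Python sketch of the same steps on an in-memory list of lines (no Spark required; `word_count` and `top_n` are illustrative names, not PySpark functions). `flatMap` turns each line into many words, `map` pairs each lowercased word with 1, and `reduceByKey` sums the 1s per word.

```python
from collections import Counter

def word_count(lines):
    """Plain-Python equivalent of
    rdd.flatMap(lambda l: l.split()).map(lambda w: (w.lower(), 1)).reduceByKey(add)."""
    counts = Counter()
    for line in lines:               # flatMap: one line -> many words
        for w in line.split():
            counts[w.lower()] += 1   # map to (word, 1), then sum per key
    return counts

def top_n(counts, n):
    # Like sortBy(lambda x: -x[1]).take(n): highest counts first.
    return sorted(counts.items(), key=lambda kv: -kv[1])[:n]

sample = ["The word is a word", "the WORD"]
print(top_n(word_count(sample), 2))  # [('word', 3), ('the', 2)]
```

Spark does the same work across partitions, combining partial counts inside each partition before shuffling them by key. Ties (such as `'word'` and `'a'` at 28 above) can come back in either order from Spark, whereas Python's `sorted` is stable and keeps first-seen order.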

## Example 2: Find the average of all integers

In [None]:
rdd2 = sc.textFile('/content/drive/MyDrive/des431s24/ch03/data/integers.txt')

In [None]:
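# Map each integer to (1, value), then add the pairs component-wise:
# the result is a single tuple (count, total).
integers = rdd2.flatMap(lambda line: line.split()) \
               .map(lambda x: (1, int(x))) \
               .reduce(lambda a, b: (a[0]+b[0], a[1]+b[1]))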

In [None]:
avg = integers[1] / integers[0]
avg

150.91163

In [None]:
avg = rdd2.flatMap(lambda line: line.split()) \
          .map(lambda x: int(x)) \
          .mean()
avg

150.91163000000034
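
The two results differ only in the last digits. A plausible explanation: the `(count, total)` reduction adds exact Python integers and divides once, while `RDD.mean()` goes through `stats()`, whose `StatCounter` (in PySpark's source) updates a running floating-point mean one element at a time, so small rounding errors accumulate. The plain-Python sketch below models both strategies on the first ten values shown in Example 3; `mean_by_pairs` and `mean_incremental` are illustrative names, and the incremental version is a simplified model of the update rule, not Spark's code.

```python
from functools import reduce

def mean_by_pairs(values):
    """Mirror of map(lambda x: (1, x)).reduce(...): sum counts and values
    exactly, then divide once. Assumes a non-empty input."""
    count, total = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]),
                          ((1, v) for v in values))
    return total / count

def mean_incremental(values):
    """Running-mean update (mu += (x - mu) / n); every step rounds in floating point."""
    n, mu = 0, 0.0
    for x in values:
        n += 1
        mu += (x - mu) / n
    return mu

data = [106, 78, 201, 256, 269, 19, 169, 17, 212, 96]
print(mean_by_pairs(data), mean_incremental(data))
```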

## Example 3: Find the average of integers < 100

In [None]:
integers = rdd2.flatMap(lambda line: line.split()) \
               .map(lambda x: int(x))

In [None]:
integers.take(10)

[106, 78, 201, 256, 269, 19, 169, 17, 212, 96]

In [None]:
integers = integers.filter(lambda x: x < 100)

In [None]:
integers.take(10)

[78, 19, 17, 96, 74, 20, 67, 17, 21, 73]

In [None]:
integers.mean()

50.180088468763365
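
The filter step simply drops elements before the average is taken. The plain-Python sketch below mirrors `filter(lambda x: x < 100).mean()` on the ten sample values printed earlier (`mean_below` is an illustrative name; returning `None` for an empty result is a choice made here, not Spark's behavior).

```python
def mean_below(values, limit=100):
    """Average of the values strictly below `limit`, mirroring
    rdd.filter(lambda x: x < limit).mean(). Returns None if nothing passes."""
    kept = [v for v in values if v < limit]  # filter step
    if not kept:
        return None
    return sum(kept) / len(kept)

sample = [106, 78, 201, 256, 269, 19, 169, 17, 212, 96]
print(mean_below(sample))  # 52.5  (78 + 19 + 17 + 96 = 210, divided by 4)
```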

## Exercises

### Exercise 1: Find the word in RDD1 with the highest frequency

### Exercise 2: Find the longest line in RDD1

### Exercise 3: Find all lines in RDD2 that contain the value 100

### Exercise 4: Find the line in RDD2 that contains the most integers