In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init("spark-3.1.1-bin-hadoop3.2")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
import pyspark
type(spark)

pyspark.sql.session.SparkSession

In [5]:
sc = spark.sparkContext

In [6]:
s = '''Every person had a star, every star had a friend, and for every person
carrying a star there was someone else who reflected it, and everyone
carried this reflection like a secret confidante in the heart'''
simple_rdd = sc.parallelize(s.split('\n'))

In [7]:
simple_rdd.collect()

['Every person had a star, every star had a friend, and for every person',
 'carrying a star there was someone else who reflected it, and everyone',
 'carried this reflection like a secret confidante in the heart']

In [8]:
(simple_rdd.map(lambda line: line.split(' '))
           .collect())

[['Every',
  'person',
  'had',
  'a',
  'star,',
  'every',
  'star',
  'had',
  'a',
  'friend,',
  'and',
  'for',
  'every',
  'person'],
 ['carrying',
  'a',
  'star',
  'there',
  'was',
  'someone',
  'else',
  'who',
  'reflected',
  'it,',
  'and',
  'everyone'],
 ['carried',
  'this',
  'reflection',
  'like',
  'a',
  'secret',
  'confidante',
  'in',
  'the',
  'heart']]

In [9]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .take(10))

['Every',
 'person',
 'had',
 'a',
 'star,',
 'every',
 'star',
 'had',
 'a',
 'friend,']

In [10]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .take(5))

['every', 'person', 'had', 'a', 'star']

In [11]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .take(5))

[('every', 1), ('person', 1), ('had', 1), ('a', 1), ('star', 1)]

In [12]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a,b: a+b)
           .collect())

[('person', 2),
 ('there', 1),
 ('was', 1),
 ('carried', 1),
 ('this', 1),
 ('like', 1),
 ('secret', 1),
 ('confidante', 1),
 ('in', 1),
 ('heart', 1),
 ('every', 3),
 ('had', 2),
 ('a', 4),
 ('star', 3),
 ('friend', 1),
 ('and', 2),
 ('for', 1),
 ('carrying', 1),
 ('someone', 1),
 ('else', 1),
 ('who', 1),
 ('reflected', 1),
 ('it', 1),
 ('everyone', 1),
 ('reflection', 1),
 ('the', 1)]

In [13]:
def count_freq(rdd):
  return (rdd.flatMap(lambda line: line.split(' '))
            .map(lambda word: word.replace(',', '').lower())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a,b: a+b)
            .collect())

In [14]:
!wget http://www.scifiscripts.com/scripts/swd1_5-74.txt

--2024-03-13 17:07:43--  http://www.scifiscripts.com/scripts/swd1_5-74.txt
Resolving www.scifiscripts.com (www.scifiscripts.com)... 207.32.177.145
Connecting to www.scifiscripts.com (www.scifiscripts.com)|207.32.177.145|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 203125 (198K) [text/plain]
Saving to: ‘swd1_5-74.txt’


2024-03-13 17:07:44 (844 KB/s) - ‘swd1_5-74.txt’ saved [203125/203125]



In [15]:
sw = sc.textFile('swd1_5-74.txt')

In [16]:
sw.take(10)

['The Star Wars',
 'by',
 'George Lucas',
 '',
 '',
 '',
 'Rough Draft [First of four major screenplay drafts]',
 'Lucasfilm Ltd.',
 '5/74',
 '']

In [17]:
count_freq(sw)[: 10]

[('george', 1),
 ('', 3543),
 ('draft', 1),
 ('of', 744),
 ('four', 25),
 ('major', 2),
 ('drafts]', 1),
 ('ltd.', 1),
 ('5/74', 1),
 ('1.', 1)]

In [18]:
import random

NUM_SAMPLES = 10**7

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = (sc.parallelize(range(0, NUM_SAMPLES))
           .filter(inside).count())

print('Pi is roughly {}'.format(4.0 * count / NUM_SAMPLES))

Pi is roughly 3.1418864


In [19]:
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2024-03-13 17:07:55--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2024-03-13 17:07:55--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv’


2024-03-13 17:07:55 (117 MB/s) - ‘cars.csv’ saved [22608/22608]



In [20]:
cars = spark.read.csv('cars.csv', header=True, sep=";")
cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [21]:
cars.select('Cylinders').rdd.map(lambda c: (c, 1)).reduceByKey(lambda a, b: a+b).collect()

[(Row(Cylinders='8'), 108),
 (Row(Cylinders='4'), 207),
 (Row(Cylinders='6'), 84),
 (Row(Cylinders='3'), 4),
 (Row(Cylinders='5'), 3)]