In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init("spark-2.4.5-bin-hadoop2.7")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
import pyspark
type(spark)

pyspark.sql.session.SparkSession

In [0]:
sc = spark.sparkContext

In [0]:
s = '''Every person had a star, every star had a friend, and for every person
carrying a star there was someone else who reflected it, and everyone
carried this reflection like a secret confidante in the heart'''
simple_rdd = sc.parallelize(s.split('\n'))

In [37]:
simple_rdd.collect()

['Every person had a star, every star had a friend, and for every person',
 'carrying a star there was someone else who reflected it, and everyone',
 'carried this reflection like a secret confidante in the heart']

In [38]:
(simple_rdd.map(lambda line: line.split(' '))
           .collect())

[['Every',
  'person',
  'had',
  'a',
  'star,',
  'every',
  'star',
  'had',
  'a',
  'friend,',
  'and',
  'for',
  'every',
  'person'],
 ['carrying',
  'a',
  'star',
  'there',
  'was',
  'someone',
  'else',
  'who',
  'reflected',
  'it,',
  'and',
  'everyone'],
 ['carried',
  'this',
  'reflection',
  'like',
  'a',
  'secret',
  'confidante',
  'in',
  'the',
  'heart']]

In [39]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .take(10))

['Every',
 'person',
 'had',
 'a',
 'star,',
 'every',
 'star',
 'had',
 'a',
 'friend,']

In [40]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .take(5))

['every', 'person', 'had', 'a', 'star']

In [41]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .take(5))

[('every', 1), ('person', 1), ('had', 1), ('a', 1), ('star', 1)]

In [42]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a,b: a+b)
           .collect())

[('person', 2),
 ('there', 1),
 ('was', 1),
 ('carried', 1),
 ('this', 1),
 ('like', 1),
 ('secret', 1),
 ('confidante', 1),
 ('in', 1),
 ('heart', 1),
 ('every', 3),
 ('had', 2),
 ('a', 4),
 ('star', 3),
 ('friend', 1),
 ('and', 2),
 ('for', 1),
 ('carrying', 1),
 ('someone', 1),
 ('else', 1),
 ('who', 1),
 ('reflected', 1),
 ('it', 1),
 ('everyone', 1),
 ('reflection', 1),
 ('the', 1)]

In [0]:
def count_freq(rdd):
  return (rdd.flatMap(lambda line: line.split(' '))
            .map(lambda word: word.replace(',', '').lower())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a,b: a+b)
            .collect())

In [44]:
!wget http://www.scifiscripts.com/scripts/swd1_5-74.txt

--2020-03-11 11:24:00--  http://www.scifiscripts.com/scripts/swd1_5-74.txt
Resolving www.scifiscripts.com (www.scifiscripts.com)... 207.32.177.145
Connecting to www.scifiscripts.com (www.scifiscripts.com)|207.32.177.145|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 203125 (198K) [text/plain]
Saving to: ‘swd1_5-74.txt.1’


2020-03-11 11:24:01 (294 KB/s) - ‘swd1_5-74.txt.1’ saved [203125/203125]



In [0]:
sw = sc.textFile('swd1_5-74.txt')

In [46]:
sw.take(10)

['The Star Wars',
 'by',
 'George Lucas',
 '',
 '',
 '',
 'Rough Draft [First of four major screenplay drafts]',
 'Lucasfilm Ltd.',
 '5/74',
 '']

In [47]:
count_freq(sw)[: 10]

[('george', 1),
 ('', 3543),
 ('draft', 1),
 ('of', 744),
 ('four', 25),
 ('major', 2),
 ('drafts]', 1),
 ('ltd.', 1),
 ('5/74', 1),
 ('1.', 1)]

In [49]:
import random

NUM_SAMPLES = 10**7

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = (sc.parallelize(range(0, NUM_SAMPLES))
           .filter(inside).count())

print('Pi is roughly {}'.format(4.0 * count / NUM_SAMPLES))

Pi is roughly 3.1404776
