In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
# Stop whatever SparkContext is currently active (getOrCreate returns the
# existing one, or starts one) so a fresh context can be created below with
# our own configuration.
active_sc = SparkContext.getOrCreate()
active_sc.stop()

# NOTE(review): the getAll() output shows spark.master = local[*]; in local
# mode these executor-level settings probably have no effect — confirm.
conf = SparkConf().setAll([
    ("spark.executor.cores", "2"),
    ("spark.executor.memory", "4g"),
])
sc = SparkContext(conf=conf)

In [3]:
# Display every (key, value) pair held by `conf`. The output also lists
# settings that were not set explicitly above (e.g. spark.master).
conf_settings = conf.getAll()
conf_settings

[('spark.rdd.compress', 'True'),
 ('spark.executor.memory', '4g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.executor.cores', '2'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true')]

# PySpark Constants

In [4]:
N_PARTITION: int = 10  # number of RDD partitions, i.e. parallel sampling chunks

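# Estimating π with a simple Monte-Carlo method

Draw points uniformly from the square $[-1, 1]^2$. The fraction that lands inside the unit circle approximates $\pi/4$, so $\pi \approx 4 \cdot \frac{n_\text{inside}}{n_\text{total}}$.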

In [5]:
n_samples = 100000000
# Split the total number of samples into N_PARTITION chunk sizes, one per
# partition. divmod spreads any remainder over the first chunks, so the sizes
# always add up to exactly n_samples (plain floor division would silently
# drop the remainder whenever n_samples is not a multiple of N_PARTITION).
base_chunk, remainder = divmod(n_samples, N_PARTITION)
index_arr = [base_chunk + (1 if i < remainder else 0) for i in range(N_PARTITION)]
assert sum(index_arr) == n_samples

In [6]:
import numpy as np

def sample_func(n_samples, seed=None):
    """Draw points uniformly from the square [-1, 1) x [-1, 1).

    A new ``numpy.random.Generator`` is created on every call instead of using
    numpy's global RNG. Spark runs this inside worker processes; with the
    global RNG those processes could start from identical state and produce
    correlated samples. With ``seed=None`` each call is seeded from fresh OS
    entropy.

    Args:
        n_samples: number of points to draw.
        seed: optional seed for reproducible draws (default: fresh entropy).

    Returns:
        ndarray of shape (n_samples, 2); column 0 is x, column 1 is y.
    """
    rng = np.random.default_rng(seed)
    return rng.uniform(low=-1.0, high=1.0, size=(n_samples, 2))

def count_inside_circles(samples):
    """Count how many 2-D points fall strictly inside the unit circle.

    Args:
        samples: array of shape (n, 2) whose columns are the x and y
            coordinates (as produced by ``sample_func``).

    Returns:
        ``[('inside', k), ('outside', n - k)]``, with plain Python ``int``
        counts so they serialize and add cleanly in Spark.
    """
    x = samples[:, 0]
    y = samples[:, 1]

    inside_circles = (x**2 + y**2) < 1.0
    n_inside = int(inside_circles.sum())
    return [('inside', n_inside), ('outside', int(samples.shape[0]) - n_inside)]

def sum_all_counts(n_inside_circles):
    """Return the total of an array-like of counts via its ``.sum()`` method.

    NOTE(review): not called by any of the cells shown in this notebook.
    """
    total = n_inside_circles.sum()
    return total

def compute_percentage(*args):
    """Estimate pi from ('inside', count) / ('outside', count) pairs.

    Points drawn uniformly from the square [-1, 1]^2 land inside the unit
    circle with probability pi/4, so pi is estimated as 4 * inside / total.

    Args:
        *args: (key, count) tuples. Counts of every tuple whose key is
            'inside' are summed as the in-circle count; every tuple's count
            contributes to the total.

    Returns:
        float: 4 * inside / total.

    Raises:
        ValueError: if no ('inside', count) pair is supplied.
        ZeroDivisionError: if the total count is zero.
    """
    n_inside = 0
    total = 0
    seen_inside = False

    for result in args:
        if result[0] == 'inside':
            n_inside += result[1]
            seen_inside = True
        total += result[1]

    if not seen_inside:
        raise ValueError("no ('inside', count) pair was supplied")

    return 4.0 * n_inside / total

In [7]:
# An RDD of chunk sizes. Because index_arr has N_PARTITION elements, passing
# numSlices puts exactly one chunk in each partition. A follow-up
# repartition() would add a full shuffle and could spread the chunks unevenly.
samples = sc.parallelize(index_arr, numSlices=N_PARTITION)

In [8]:
# Sample points and count inside/outside per chunk on the executors, then add
# the counts per key. RDD.reduce requires a commutative, associative operator
# that returns the same kind of element it consumes; compute_percentage
# returns a float rather than a pair, so it is applied once on the driver to
# the collected (key, count) pairs instead.
circle_counts = (
    samples.map(sample_func)
    .flatMap(count_inside_circles)
    .reduceByKey(lambda x, y: x + y)
    .collect()
)
compute_percentage(*circle_counts)

3.14152088

# Stop SparkContext

In [9]:
# Shut down the SparkContext created at the top of the notebook.
sc.stop()