In [1]:
import os
# Path of the python/pyspark/shell.py script inside the Spark installation
# pointed to by the SPARK_HOME environment variable (KeyError if it is unset).
sparkFile = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')

# Read the script inside a context manager so the file handle is closed
# deterministically instead of being left to the garbage collector.
with open(sparkFile, "rb") as shell_script:
    shell_source = shell_script.read()

# Execute the script in this notebook's namespace. Per the banner it prints,
# this makes a SparkSession available as `spark`.
# NOTE(review): exec runs whatever file SPARK_HOME points at; only use this
# with a trusted Spark installation.
exec(compile(shell_source, sparkFile, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.6.2 (default, Sep 21 2017 18:29:43)
SparkSession available as 'spark'.


# "Hello world"

In [2]:
# Display `sc`; this raises NameError if the bootstrap cell above did not
# define it (presumably the SparkContext created by shell.py — TODO confirm).
sc

If PySpark was correctly installed, the variable `sc` (short for SparkContext) should be initialized. The driver program uses `sc` to connect to the cluster and manage its resources. Below is an example of estimating π with Spark.

In [3]:
import random
# Number of random points drawn for the Monte Carlo estimate of pi.
num_samples = 100_000

def inside(p):
    """Return True when a freshly drawn random point falls inside the unit circle.

    The argument `p` is ignored; it exists only so the function can be used
    as a predicate over the elements of a collection. Two coordinates are
    drawn with `random.random()` and the point counts as inside when its
    squared distance from the origin is strictly less than 1.
    """
    coord_x = random.random()
    coord_y = random.random()
    squared_distance = coord_x * coord_x + coord_y * coord_y
    return squared_distance < 1

# Distribute one element per sample, keep those for which `inside` reports a
# hit, and count the survivors.
sample_ids = sc.parallelize(range(num_samples))
count = sample_ids.filter(inside).count()

# The hit ratio approximates the area of a quarter unit circle (pi / 4),
# so scaling it by 4 gives the estimate of pi.
pi = 4 * count / num_samples
print(pi)

3.15664


# Basic operations

## Read files

In [4]:
# Load the text file through the SparkContext; each element is one line.
text = sc.textFile("data/readme.txt")

# Show the first five lines.
text.take(5)

['Title:  Bag of Words Data Set',
 '',
 'Abstract: This data set contains five text collections in the form of bags-of-words.',
 '',
 '-----------------------------------------------------\t']

In [5]:
# Fetch only the first line of the file.
first_line = text.first()
first_line

'Title:  Bag of Words Data Set'

In [6]:
# Bring every line back to the driver as a Python list and print it.
all_lines = text.collect()
print(all_lines)

['Title:  Bag of Words Data Set', '', 'Abstract: This data set contains five text collections in the form of bags-of-words.', '', '-----------------------------------------------------\t', '', 'Data Set Characteristics: Text', 'Number of Instances: 8000000', 'Area: N/A', 'Attribute Characteristics: Integer', 'Number of Attributes: 100000', 'Date Donated: 2008-03-12', 'Associated Tasks: Clustering', 'Missing Values? N/A', '', '-----------------------------------------------------\t\t', '', 'Source:', '', 'David Newman', "newman '@' uci.edu", 'University of California, Irvine', '', '-----------------------------------------------------\t', '', 'Data Set Information:', '', 'For each text collection, D is the number of documents, W is the', 'number of words in the vocabulary, and N is the total number of words', 'in the collection (below, NNZ is the number of nonzero counts in the', 'bag-of-words). After tokenization and removal of stopwords, the', 'vocabulary of unique words was truncated

## Transformations

In [7]:
# map: apply the function to every line, producing one result per line.
# Here each line becomes its own list of words, so the result is a list of lists.
words_per_line = text.map(lambda row: row.split())
words_per_line.take(3)

[['Title:', 'Bag', 'of', 'Words', 'Data', 'Set'],
 [],
 ['Abstract:',
  'This',
  'data',
  'set',
  'contains',
  'five',
  'text',
  'collections',
  'in',
  'the',
  'form',
  'of',
  'bags-of-words.']]

In [8]:
# flatMap: apply the function to every line and flatten the per-line results,
# so the words of all lines end up in one flat sequence.
all_words = text.flatMap(lambda row: row.split())
all_words.take(10)

['Title:',
 'Bag',
 'of',
 'Words',
 'Data',
 'Set',
 'Abstract:',
 'This',
 'data',
 'set']

In [9]:
# filter: keep only the lines containing the substring 'source'
# (case-sensitive), then return them all.
lines_with_source = text.filter(lambda row: 'source' in row)
lines_with_source.collect()

['orig source: www.cs.cmu.edu/~enron',
 'orig source: books.nips.cc',
 'orig source: dailykos.com',
 'orig source: ldc.upenn.edu',
 'orig source: www.pubmed.gov']

In [10]:
# Count how many lines contain the substring 'source'.
matching_lines = text.filter(lambda row: 'source' in row)
matching_lines.count()

5

In [11]:
# Draw 3 random lines without replacement (no seed is passed).
text.takeSample(withReplacement=False, num=3)

['Source:', 'University of California, Irvine', '']