In [1]:
import pyspark

In [2]:
# a SparkSession is the entry point for every interaction with spark
# local[*] -> spark is running locally using all the thread (*)
# another option could be to configure the spark endpoint: spark://<spark-url>:7077
spark = pyspark.sql.SparkSession.builder.master('local[*]').getOrCreate()

2021-11-08 23:48:47 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# the SparkContext is the entry point for interacting with RDDs, while SparkSession is mostly used for dataframes
# but as it is possible to see, we use SparkSession even to get a sparkContext
sc = spark.sparkContext

In [None]:
# another way to create a spark context is:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('DataMiningCourse').setMaster('local[*]')
sc = SparkContext(conf=conf)

### RDDs

In [4]:
data = [1, 2, 3, 4, 5]

In [5]:
dataRDD = sc.parallelize(data)

In [6]:
dataRDD.collect()

[Stage 0:>                                                          (0 + 8) / 8]                                                                                

[1, 2, 3, 4, 5]

In [8]:
# These entries return exactly the same output

def filterOperation(number: float) -> bool:
    return number < 4

print(dataRDD.filter(filterOperation).collect())


print(dataRDD.filter(lambda x: filterOperation(x)).collect())

print(dataRDD.filter(lambda x: x < 4).collect())

# Here I use collect to perform an action
# (remember lazy-evaluation and trasformations that are only triggered through an action)
# and to print the output

[1, 2, 3]
[1, 2, 3]
[1, 2, 3]


In [23]:
# This allows to download web pages
import requests
# This allows the usage of add, which is a shortcut for 'lambda x, y: x + y'
from operator import add

# This is just a way for having a list of lines from a file on the web.
# Let's remember that spark cannot process files from the web by itself.
# This is done only as an example, it cannot be done with huge files.
# Otherwise the machine may have hard times in processing it.
file_content = requests.get('https://raw.githubusercontent.com/forons/BigDataExamples/master/files/inferno.txt')
lines = sc.parallelize(file_content.text.splitlines())
lines.collect()

["\ufeffProject Gutenberg's The Vision of Hell, Complete, by Dante Alighieri",
 'Translated By The Rev. H. F. Cary, Illustrated by Gustave Dore',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.net',
 '',
 '',
 'Title: The Vision of Hell, Complete, Translated By The Rev. H. F. Cary,',
 '       Illustrated by Gustave Dore',
 '       The Inferno',
 '',
 'Author: Dante Alighieri',
 '',
 'Release Date: August 7, 2004 [EBook #8789]',
 '',
 'Language: English',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK THE VISION OF HELL, COMPLETE ***',
 '',
 '',
 '',
 '',
 'Produced by David Widger',
 '',
 '',
 '',
 '',
 '',
 'THE VISION',
 '',
 'OF',
 '',
 'HELL, PURGATORY, AND PARADISE',
 '',
 'BY',
 '',
 'DANTE ALIGHIERI',
 '',
 '',
 '',
 'TRANSLATED BY',
 '',
 'THE REV. H. F. CAR

In [24]:
lines.map(lambda x: x.split(' ')).collect()

[['\ufeffProject',
  "Gutenberg's",
  'The',
  'Vision',
  'of',
  'Hell,',
  'Complete,',
  'by',
  'Dante',
  'Alighieri'],
 ['Translated',
  'By',
  'The',
  'Rev.',
  'H.',
  'F.',
  'Cary,',
  'Illustrated',
  'by',
  'Gustave',
  'Dore'],
 [''],
 ['This',
  'eBook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'at',
  'no',
  'cost',
  'and',
  'with'],
 ['almost',
  'no',
  'restrictions',
  'whatsoever.',
  '',
  'You',
  'may',
  'copy',
  'it,',
  'give',
  'it',
  'away',
  'or'],
 ['re-use',
  'it',
  'under',
  'the',
  'terms',
  'of',
  'the',
  'Project',
  'Gutenberg',
  'License',
  'included'],
 ['with', 'this', 'eBook', 'or', 'online', 'at', 'www.gutenberg.net'],
 [''],
 [''],
 ['Title:',
  'The',
  'Vision',
  'of',
  'Hell,',
  'Complete,',
  'Translated',
  'By',
  'The',
  'Rev.',
  'H.',
  'F.',
  'Cary,'],
 ['', '', '', '', '', '', '', 'Illustrated', 'by', 'Gustave', 'Dore'],
 ['', '', '', '', '', '', '', 'The', 'Inferno'],
 [''],
 ['

In [25]:
lines.flatMap(lambda x: x.split(' ')).collect()

['\ufeffProject',
 "Gutenberg's",
 'The',
 'Vision',
 'of',
 'Hell,',
 'Complete,',
 'by',
 'Dante',
 'Alighieri',
 'Translated',
 'By',
 'The',
 'Rev.',
 'H.',
 'F.',
 'Cary,',
 'Illustrated',
 'by',
 'Gustave',
 'Dore',
 '',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever.',
 '',
 'You',
 'may',
 'copy',
 'it,',
 'give',
 'it',
 'away',
 'or',
 're-use',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'Project',
 'Gutenberg',
 'License',
 'included',
 'with',
 'this',
 'eBook',
 'or',
 'online',
 'at',
 'www.gutenberg.net',
 '',
 '',
 'Title:',
 'The',
 'Vision',
 'of',
 'Hell,',
 'Complete,',
 'Translated',
 'By',
 'The',
 'Rev.',
 'H.',
 'F.',
 'Cary,',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 'Illustrated',
 'by',
 'Gustave',
 'Dore',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 'The',
 'Inferno',
 '',
 'Author:',
 'Dante',
 'Alighieri',
 '',
 'Release',
 'Date:',
 'August

In [26]:
# Here we split on the whitespace each element of the 'file' rdd, which is a line of the file we read, into words
words = lines.flatMap(lambda x: x.split(' '))

# We map each word to a tuple with the word itself and a counter initialized to one
word_pairs = words.map(lambda x: (x, 1))

# We group the elements with the same key (the word in our case) and sum the counters
word_count = word_pairs.reduceByKey(add)

# It creates a folder, were each element of the folder is created by a partition
# that saves only its data in order to not collect everything into the driver
# and save time and resources.
word_count.saveAsTextFile('./output/')

In [27]:
# x[0] access the key, and x[1] access to the value of the pair/tuple
word_count.sortBy(lambda x: x[1], ascending=False).collect()



[('the', 1664),
 ('', 1593),
 ('and', 763),
 ('I', 675),
 ('to', 669),
 ('of', 659),
 ('that', 486),
 ('in', 481),
 ('with', 364),
 ('his', 344),
 ('a', 339),
 ('The', 285),
 ('my', 282),
 ('he', 266),
 ('And', 263),
 ('thou', 262),
 ('on', 245),
 ('from', 237),
 ('as', 227),
 ('who', 214),
 ('not', 205),
 ('so', 194),
 ('me', 190),
 ('this', 175),
 ('is', 173),
 ('for', 165),
 ('by', 161),
 ('all', 155),
 ('That', 150),
 ('was', 147),
 ('Of', 146),
 ('their', 144),
 ('at', 144),
 ('thy', 142),
 ('To', 139),
 ('him', 135),
 ('they', 129),
 ('or', 127),
 ('we', 120),
 ('one', 119),
 ('it', 118),
 ('thus', 116),
 ('He', 110),
 ('when', 104),
 ('but', 104),
 ('be', 102),
 ('which', 98),
 ('other', 95),
 ('But', 95),
 ('if', 95),
 ('As', 93),
 ('had', 91),
 ('now', 87),
 ('them', 87),
 ('were', 87),
 ('her', 83),
 ('are', 82),
 ('thee', 82),
 ('its', 82),
 ('Who', 81),
 ('have', 81),
 ('A', 80),
 ('then', 79),
 ('more', 79),
 ('our', 78),
 ('Project', 77),
 ('such', 76),
 ('In', 75),
 ('th