In [1]:
import pyspark
import findspark
from pyspark.sql import SparkSession

findspark.init()
spark = SparkSession.builder.appName("app").config('spark.ui.port', '4050').getOrCreate()

In [2]:
spark.sparkContext.uiWebUrl

'http://DESKTOP-P5LT01P:4050'

In [3]:
paintings = ['Mona Lisa', 'Girl with a Pearl Earring', 'The Last Supper', 'The Scream', 'Creation of Adam', 'The Third of May',
              'Olympia', 'School of Athens', 'The Arnolfini Marriage', 'The Birth of Venus', 'Sistine Chapel Ceiling',
              'Portrait of Madame Recamier', 'Massacre of the Innocents', 'Portrait of Dora Maar', 'Dogs Playing Poker', 'Primavera',
              'The Sleeping Gypsy', 'Napoleon crossing the Alps', 'The Liberty leading the people', 'The Grand Odalisque',
              'Les Demoiselles d’Avignon', 'American Gothic', 'Cafe Terrace at Night', 'The Son of Man', 'Bal du Moulin de la Galette',
              'Whistler’s Mother', 'Portrait de L’artiste Sans Barbe', 'The Kiss', 'The Flower Carrier', 'The Gleaners', 'The Swing',
              'The Dance', 'The Tower of Babel', 'View of Toledo', 'The Triumph of Galatea', 'Impression, Sunrise', 'A Sunday Afternoon',
              'Three Musicians', 'Las Meninas', 'Landscape with the Fall of Icarus', 'Water Lilies', 'No. 5, 1948', 'Luncheon on the Boating Party',
              'The Persistence of Memory', 'Night Watch', 'Guernica', 'Beheading of Saint John', 'Starry Night', 'Royal Red and Blue', 'Composition']

# turn list into RDD

In [4]:
paintings_rdd = spark.sparkContext.parallelize(paintings)

# finding twentieth element

In [5]:
zip_words = paintings_rdd.zipWithIndex()
elem = zip_words.filter(lambda x: x[1]==19).map(lambda x: x[0]).collect()
print(elem)

['The Grand Odalisque']


# Converting lowercase to uppercase

In [6]:
upper_rdd = paintings_rdd.map(lambda x: x.upper()).collect()
print(upper_rdd)

['MONA LISA', 'GIRL WITH A PEARL EARRING', 'THE LAST SUPPER', 'THE SCREAM', 'CREATION OF ADAM', 'THE THIRD OF MAY', 'OLYMPIA', 'SCHOOL OF ATHENS', 'THE ARNOLFINI MARRIAGE', 'THE BIRTH OF VENUS', 'SISTINE CHAPEL CEILING', 'PORTRAIT OF MADAME RECAMIER', 'MASSACRE OF THE INNOCENTS', 'PORTRAIT OF DORA MAAR', 'DOGS PLAYING POKER', 'PRIMAVERA', 'THE SLEEPING GYPSY', 'NAPOLEON CROSSING THE ALPS', 'THE LIBERTY LEADING THE PEOPLE', 'THE GRAND ODALISQUE', 'LES DEMOISELLES D’AVIGNON', 'AMERICAN GOTHIC', 'CAFE TERRACE AT NIGHT', 'THE SON OF MAN', 'BAL DU MOULIN DE LA GALETTE', 'WHISTLER’S MOTHER', 'PORTRAIT DE L’ARTISTE SANS BARBE', 'THE KISS', 'THE FLOWER CARRIER', 'THE GLEANERS', 'THE SWING', 'THE DANCE', 'THE TOWER OF BABEL', 'VIEW OF TOLEDO', 'THE TRIUMPH OF GALATEA', 'IMPRESSION, SUNRISE', 'A SUNDAY AFTERNOON', 'THREE MUSICIANS', 'LAS MENINAS', 'LANDSCAPE WITH THE FALL OF ICARUS', 'WATER LILIES', 'NO. 5, 1948', 'LUNCHEON ON THE BOATING PARTY', 'THE PERSISTENCE OF MEMORY', 'NIGHT WATCH', 'GUER

# Group by first character (sorted)

In [7]:
result = paintings_rdd.groupBy(lambda x: x[0]).map(lambda x:tuple([x[0],sorted(x[1])])).sortBy(lambda x: x[0]).collect()
result

[('A', ['A Sunday Afternoon', 'American Gothic']),
 ('B', ['Bal du Moulin de la Galette', 'Beheading of Saint John']),
 ('C', ['Cafe Terrace at Night', 'Composition', 'Creation of Adam']),
 ('D', ['Dogs Playing Poker']),
 ('G', ['Girl with a Pearl Earring', 'Guernica']),
 ('I', ['Impression, Sunrise']),
 ('L',
  ['Landscape with the Fall of Icarus',
   'Las Meninas',
   'Les Demoiselles d’Avignon',
   'Luncheon on the Boating Party']),
 ('M', ['Massacre of the Innocents', 'Mona Lisa']),
 ('N', ['Napoleon crossing the Alps', 'Night Watch', 'No. 5, 1948']),
 ('O', ['Olympia']),
 ('P',
  ['Portrait de L’artiste Sans Barbe',
   'Portrait of Dora Maar',
   'Portrait of Madame Recamier',
   'Primavera']),
 ('R', ['Royal Red and Blue']),
 ('S', ['School of Athens', 'Sistine Chapel Ceiling', 'Starry Night']),
 ('T',
  ['The Arnolfini Marriage',
   'The Birth of Venus',
   'The Dance',
   'The Flower Carrier',
   'The Gleaners',
   'The Grand Odalisque',
   'The Kiss',
   'The Last Supper',
   

# Group by first character (without sorting)

In [8]:
result = paintings_rdd.groupBy(lambda x: x[0]).map(lambda x:(x[0],list(x[1]))).collect()
result

[('O', ['Olympia']),
 ('N', ['Napoleon crossing the Alps', 'No. 5, 1948', 'Night Watch']),
 ('L',
  ['Les Demoiselles d’Avignon',
   'Las Meninas',
   'Landscape with the Fall of Icarus',
   'Luncheon on the Boating Party']),
 ('M', ['Mona Lisa', 'Massacre of the Innocents']),
 ('T',
  ['The Last Supper',
   'The Scream',
   'The Third of May',
   'The Arnolfini Marriage',
   'The Birth of Venus',
   'The Sleeping Gypsy',
   'The Liberty leading the people',
   'The Grand Odalisque',
   'The Son of Man',
   'The Kiss',
   'The Flower Carrier',
   'The Gleaners',
   'The Swing',
   'The Dance',
   'The Tower of Babel',
   'The Triumph of Galatea',
   'Three Musicians',
   'The Persistence of Memory']),
 ('A', ['American Gothic', 'A Sunday Afternoon']),
 ('B', ['Bal du Moulin de la Galette', 'Beheading of Saint John']),
 ('C', ['Creation of Adam', 'Cafe Terrace at Night', 'Composition']),
 ('S', ['School of Athens', 'Sistine Chapel Ceiling', 'Starry Night']),
 ('W', ['Whistler’s Mother',

# map reduce

In [9]:
text_file = spark.sparkContext.textFile("van_gogh.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                            .map(lambda word: (word, 1)) \
                           .reduceByKey(lambda x, y: x + y)
output = counts.collect()
output

[('', 251),
 ('Vincent', 61),
 ('Gogh', 212),
 ('Wikipedia,', 1),
 ('free', 2),
 ('search', 1),
 ('Gogh"', 1),
 ('redirects', 1),
 ('here.', 1),
 ('other', 13),
 ('this', 27),
 ('surname', 1),
 ('is', 71),
 ('Gogh.', 5),
 ('head', 5),
 ('shoulders', 1),
 ('portrait', 19),
 ('of', 479),
 ('thirty', 1),
 ('something', 3),
 ('1887,', 6),
 ('Art', 10),
 ('Institute', 3),
 ('Chicago', 2),
 ('Willem', 8),
 ('March', 17),
 ('Netherlands', 5),
 ('Died\t29', 1),
 ('July', 11),
 ('1890', 7),
 ('(aged', 1),
 ('37)', 1),
 ('France', 4),
 ('Resting', 1),
 ("d'Auvers-sur-Oise,", 1),
 ('Nationality\tDutch', 1),
 ('Mauve', 8),
 ('for\tPainting,', 1),
 ('drawing', 12),
 ('life,', 3),
 ('portraits', 11),
 ('landscapes', 5),
 ('Notable', 1),
 ('work', 29),
 ('The', 115),
 ('Potato', 5),
 ('Eaters', 3),
 ('(1885)', 1),
 ('(1887)', 2),
 ('in', 400),
 ('Arles', 22),
 ('Starry', 12),
 ('Night', 12),
 ('(1890)', 5),
 ('Wheatfield', 8),
 ('Church', 3),
 ('at', 98),
 ('Family\tTheodorus', 1),
 ('(brother)', 1),

# map reduce with sorting

In [10]:
import string

# read the text file
file_content = open('van_gogh.txt', "r", encoding='utf-8-sig').read()

new_text = file_content.translate(str.maketrans('', '', string.punctuation))

tokens = new_text.split()

#turn into rdd
text_rdd = spark.sparkContext.parallelize(tokens)

wordCounts = text_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b).sortBy(lambda x: -x[1]).collect()
wordCounts

[('the', 643),
 ('and', 494),
 ('of', 479),
 ('a', 429),
 ('in', 403),
 ('to', 326),
 ('Van', 257),
 ('Gogh', 235),
 ('his', 235),
 ('he', 186),
 ('with', 180),
 ('was', 150),
 ('The', 118),
 ('as', 106),
 ('at', 98),
 ('that', 97),
 ('on', 95),
 ('by', 79),
 ('from', 78),
 ('In', 78),
 ('is', 72),
 ('He', 72),
 ('for', 71),
 ('Vincent', 64),
 ('Goghs', 63),
 ('are', 60),
 ('A', 59),
 ('an', 58),
 ('van', 57),
 ('had', 56),
 ('Museum', 54),
 ('paintings', 54),
 ('1888', 50),
 ('which', 47),
 ('painting', 46),
 ('left', 44),
 ('Theo', 43),
 ('1890', 41),
 ('painted', 40),
 ('him', 39),
 ('Arles', 39),
 ('two', 37),
 ('Paris', 37),
 ('work', 36),
 ('were', 36),
 ('1889', 34),
 ('after', 34),
 ('but', 32),
 ('Gauguin', 32),
 ('have', 32),
 ('art', 32),
 ('or', 32),
 ('be', 31),
 ('it', 30),
 ('His', 29),
 ('this', 29),
 ('her', 29),
 ('also', 29),
 ('who', 28),
 ('–', 27),
 ('works', 26),
 ('not', 26),
 ('where', 25),
 ('time', 25),
 ('trees', 25),
 ('large', 25),
 ('been', 25),
 ('wrote'

# example of question 1

In [11]:

mylist = [1,2,3,4,5,6,7,8,9,10]

first_rdd = spark.sparkContext.parallelize(mylist)

first_rdd

ParallelCollectionRDD[39] at readRDDFromFile at PythonRDD.scala:274

In [12]:
second_rdd = first_rdd.map(lambda x : x+1)

print(second_rdd)

print(second_rdd.toDebugString())

PythonRDD[40] at RDD at PythonRDD.scala:53
b'(4) PythonRDD[40] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[39] at readRDDFromFile at PythonRDD.scala:274 []'


In [13]:
third_rdd = second_rdd.map(lambda x : x+9)

print(third_rdd)

print(third_rdd.toDebugString())

PythonRDD[41] at RDD at PythonRDD.scala:53
b'(4) PythonRDD[41] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[39] at readRDDFromFile at PythonRDD.scala:274 []'


In [14]:
third_rdd.collect()

[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [15]:
print(third_rdd.toDebugString())

b'(4) PythonRDD[41] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[39] at readRDDFromFile at PythonRDD.scala:274 []'
