In [115]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("Test RDD Examples").getOrCreate()

In [2]:
type(spark)

pyspark.sql.session.SparkSession

### 1. Create RDDs in three different ways.

In [3]:
# Creating RDDs
# Using parallelize method
rdd_par = spark.sparkContext.parallelize(["Hello World", "Hope you are not fed up with ABD class", "ello"])
type(rdd_par)
rdd_par.collect()
rdd_par.count()
rdd_par.collect()[0]

'Hello World'

In [4]:
# Creating RDD using transformations
rdd_trans = rdd_par.filter(lambda word:word.startswith('H'))
rdd_trans.collect()

['Hello World', 'Hope you are not fed up with ABD class']

In [5]:
# Creating RDD using Data Sources
rdd_ds = spark.sparkContext.textFile('input.txt')
rdd_ds.count()
rdd_ds.collect()

["This is perhaps the best known database to be found in the pattern recognition literature. Fisher's paper is a classic in the field and is referenced frequently to this day. ",
 '(See Duda & Hart, for example.) ',
 'The data set contains 3 classes of 50 instances each, where each class refers to a type of iris plant. One class is linearly separable from the other 2; the latter are NOT linearly separable from each other.']

### 2. Read a text file and count number of words in the file using RDD operations.

In [103]:
rdd_ds = spark.sparkContext.textFile('input.txt')
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
print("Number of words in the file: ")
word_rdd.count()

Number of words in the file: 


76

### 3. Write a program to find the word frequency in a given file.

In [7]:
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
freq_words = word_rdd.map(lambda word: (word, 1))
freq_words.reduceByKey(lambda a, b: a + b).collect()

[('is', 4),
 ('perhaps', 1),
 ('best', 1),
 ('known', 1),
 ('database', 1),
 ('in', 2),
 ('pattern', 1),
 ("Fisher's", 1),
 ('field', 1),
 ('referenced', 1),
 ('this', 1),
 ('', 2),
 ('Hart,', 1),
 ('example.)', 1),
 ('The', 1),
 ('set', 1),
 ('of', 2),
 ('50', 1),
 ('instances', 1),
 ('each,', 1),
 ('where', 1),
 ('class', 2),
 ('refers', 1),
 ('type', 1),
 ('plant.', 1),
 ('One', 1),
 ('linearly', 2),
 ('other', 1),
 ('2;', 1),
 ('latter', 1),
 ('are', 1),
 ('This', 1),
 ('the', 5),
 ('to', 3),
 ('be', 1),
 ('found', 1),
 ('recognition', 1),
 ('literature.', 1),
 ('paper', 1),
 ('a', 2),
 ('classic', 1),
 ('and', 1),
 ('frequently', 1),
 ('day.', 1),
 ('(See', 1),
 ('Duda', 1),
 ('&', 1),
 ('for', 1),
 ('data', 1),
 ('contains', 1),
 ('3', 1),
 ('classes', 1),
 ('each', 2),
 ('iris', 1),
 ('separable', 2),
 ('from', 2),
 ('NOT', 1),
 ('other.', 1)]

### 4. Write a program to convert all words in a file to uppercase.

In [8]:
word_rdd_upper = rdd_ds.map(lambda word: word.upper())
word_rdd_upper.collect()

["THIS IS PERHAPS THE BEST KNOWN DATABASE TO BE FOUND IN THE PATTERN RECOGNITION LITERATURE. FISHER'S PAPER IS A CLASSIC IN THE FIELD AND IS REFERENCED FREQUENTLY TO THIS DAY. ",
 '(SEE DUDA & HART, FOR EXAMPLE.) ',
 'THE DATA SET CONTAINS 3 CLASSES OF 50 INSTANCES EACH, WHERE EACH CLASS REFERS TO A TYPE OF IRIS PLANT. ONE CLASS IS LINEARLY SEPARABLE FROM THE OTHER 2; THE LATTER ARE NOT LINEARLY SEPARABLE FROM EACH OTHER.']

### 5. Write a program to convert all words in a file to lowercase.

In [9]:
word_rdd_lower = rdd_ds.map(lambda word: word.lower())
word_rdd_lower.collect()

["this is perhaps the best known database to be found in the pattern recognition literature. fisher's paper is a classic in the field and is referenced frequently to this day. ",
 '(see duda & hart, for example.) ',
 'the data set contains 3 classes of 50 instances each, where each class refers to a type of iris plant. one class is linearly separable from the other 2; the latter are not linearly separable from each other.']

### 6. Write a program to capitalize first leter of each words in file (use string capitalize() method).

In [10]:
word_rdd_capitalized = rdd_ds.map(lambda word: [w.capitalize() for w in word.split(' ')]  )
word_rdd_capitalized = word_rdd_capitalized.map(lambda word: ' '.join(word)  )
word_rdd_capitalized.collect()

["This Is Perhaps The Best Known Database To Be Found In The Pattern Recognition Literature. Fisher's Paper Is A Classic In The Field And Is Referenced Frequently To This Day. ",
 '(see Duda & Hart, For Example.) ',
 'The Data Set Contains 3 Classes Of 50 Instances Each, Where Each Class Refers To A Type Of Iris Plant. One Class Is Linearly Separable From The Other 2; The Latter Are Not Linearly Separable From Each Other.']

### 7. Find the longest length of word from given set of words.

In [102]:
word_rdd_longest = rdd_ds.flatMap(lambda word: word.split(' '))
word_rdd_longest = word_rdd_longest.map(lambda word: len(word))
print("The longest word length is: ")
word_rdd_longest.max()

The longest word length is: 


3

### 8. Map the Registration numbers to corresponding branch. 6000 series BDA, 9000 series HDA, 1000 series ML, 2000 series VLSI, 3000 series ES, 4000 series MSc, 5000 series CC. Given registration number, generate a key-value pair of Registration Number and Corresponding Branch.

In [100]:
series = [6002, 3050, 2500, 1500, 9050]
rdd_par = spark.sparkContext.parallelize(series)
rdd_sequence = rdd_par.map(lambda series: (series, 'ML') if (series >= 1000 and series < 1999)  
                           else ((series, 'VLSI') if (series >= 2000 and series < 2999)
                           else ((series, 'ES') if (series >= 3000 and series < 3999) 
                           else ((series, 'MSc') if (series >= 4000 and series < 4999) 
                           else ((series, 'CC') if (series >= 5000 and series < 5999) 
                           else ((series, 'BDA') if (series >= 6000 and series < 6999)
                           else ((series, 'HDA') if (series >= 9000 and series < 9999) else (series, 'NA'))))))))

print("Key, value are: ")
rdd_sequence.collect()

Key, value are: 


[(6002, 'BDA'), (3050, 'ES'), (2500, 'VLSI'), (1500, 'ML'), (9050, 'HDA')]

### 9. Text file contain numbers. Numbers are separated by one white space. There is no order to store the numbers. One line may contain one or more numbers. Find the maximum, minimum, sum and mean of numbers.

In [98]:
rdd_ds = spark.sparkContext.textFile('numbers.txt')
rdd_ds_num = rdd_ds.flatMap(lambda n: n.split(' '))
rdd_ds_num = rdd_ds_num.map(lambda n: int(n))
print("The maximum: ", rdd_ds_num.max())
print("The minimum: ", rdd_ds_num.min())
print("The minimum: ", rdd_ds_num.sum())
print("The minimum: ", rdd_ds_num.mean())

The maximum:  456
The minimum:  10
The minimum:  936
The minimum:  72.0


### 10. A text file (citizen.txt) contains data about citizens of country. Fields (information in file) are Name, dob, Phone, email and state name. Another file contains mapping of state names to state code like Karnataka is codes as KA, TamilNadu as TN, Kerala KL etc. Compress the citizen.txt file by changing full state name to state code.

In [120]:
rdd_ds_citizen = spark.sparkContext.textFile('citizen.txt')
rdd_ds_statecode = spark.sparkContext.textFile('statecode.txt')
rdd_ds_citizen = rdd_ds_citizen.map(lambda citizen: citizen.split(' '))
rdd_ds_statecode = rdd_ds_statecode.map(lambda state: state.split(' '))
#rdd_ds_citizen.join(rdd_ds_statecode).collect()
#rdd_ds_citizen.collect()

[('K', ('u', 'a')), ('K', ('u', 'e'))]