In [22]:
import pyspark
from pyspark.sql import SparkSession
# Create (or reuse, via getOrCreate) the SparkSession that later cells access through `spark`.
spark = SparkSession.builder.appName("Test RDD Examples").getOrCreate()

In [23]:
# Sanity check: display the type of the session object created above.
type(spark)

pyspark.sql.session.SparkSession

In [24]:
# Q1. Creating RDDs
# 1. From an in-memory Python list, using the parallelize method
sample_lines = ["hello world!", "this is an rdd assignment", "good morning!!"]
rdd_par = spark.sparkContext.parallelize(sample_lines)
rdd_par.collect()

['hello world!', 'this is an rdd assignment', 'good morning!!']

In [25]:
# 2. From an existing RDD, using a transformation:
#    keep only the strings of rdd_par that begin with 't'
rdd_trans = rdd_par.filter(lambda text: text.startswith('t'))
rdd_trans.collect()

['this is an rdd assignment']

In [26]:
# 3. Loading the data from a data source: read test.txt as an RDD of text lines
#    (the path is relative, so it is resolved against the notebook's working directory).
rdd_txt = spark.sparkContext.textFile("test.txt")
rdd_txt.collect()

['Textbooks often simplify this to large-sample vs. small-sample methods use normal distribution with large samples and t-distribution with small samples. This is right almost all the time, because in real sampling problems we seldom have a basis for knowing σ. However, there can be some situations when we do have a basis for assuming a value for σ, such as using a σ based on past data, and in those situations even if sample size is small the correct procedure would be to use the normal distribution, so the simplified large-sample vs. small sample approach would lead to an error.']

In [27]:
# Q2. Read a text file and count the number of words in the file using RDD operations
# split() with no argument splits on any run of whitespace, so repeated spaces, tabs
# and blank lines do not yield empty-string "words" (split(' ') would count those).
word_rdd = rdd_txt.flatMap(lambda line: line.split())
word_rdd.count()

100

In [28]:
# Q3. Write a program to find the word frequency in a given file
# Pair every word with a count of 1, then add up the counts that share the same word.
freq_words = word_rdd.map(lambda token: (token, 1))
word_totals = freq_words.reduceByKey(lambda left, right: left + right)
word_totals.collect()

[('this', 1),
 ('large-sample', 2),
 ('vs.', 2),
 ('use', 2),
 ('distribution', 1),
 ('large', 1),
 ('t-distribution', 1),
 ('is', 2),
 ('right', 1),
 ('in', 2),
 ('sampling', 1),
 ('we', 2),
 ('have', 2),
 ('basis', 2),
 ('σ.', 1),
 ('However,', 1),
 ('there', 1),
 ('situations', 2),
 ('when', 1),
 ('do', 1),
 ('value', 1),
 ('σ,', 1),
 ('as', 1),
 ('using', 1),
 ('based', 1),
 ('past', 1),
 ('even', 1),
 ('size', 1),
 ('correct', 1),
 ('procedure', 1),
 ('would', 2),
 ('simplified', 1),
 ('approach', 1),
 ('an', 1),
 ('Textbooks', 1),
 ('often', 1),
 ('simplify', 1),
 ('to', 3),
 ('small-sample', 1),
 ('methods', 1),
 ('normal', 2),
 ('with', 2),
 ('samples', 1),
 ('and', 2),
 ('small', 3),
 ('samples.', 1),
 ('This', 1),
 ('almost', 1),
 ('all', 1),
 ('the', 4),
 ('time,', 1),
 ('because', 1),
 ('real', 1),
 ('problems', 1),
 ('seldom', 1),
 ('a', 4),
 ('for', 3),
 ('knowing', 1),
 ('can', 1),
 ('be', 2),
 ('some', 1),
 ('assuming', 1),
 ('such', 1),
 ('σ', 1),
 ('on', 1),
 ('data,'

In [29]:
# Q4. Write a program to convert all words in a file to uppercase
# Each RDD element is a whole line of text, so upper-casing the line upper-cases every word in it.
rdd_txt.map(str.upper).collect()

['TEXTBOOKS OFTEN SIMPLIFY THIS TO LARGE-SAMPLE VS. SMALL-SAMPLE METHODS USE NORMAL DISTRIBUTION WITH LARGE SAMPLES AND T-DISTRIBUTION WITH SMALL SAMPLES. THIS IS RIGHT ALMOST ALL THE TIME, BECAUSE IN REAL SAMPLING PROBLEMS WE SELDOM HAVE A BASIS FOR KNOWING Σ. HOWEVER, THERE CAN BE SOME SITUATIONS WHEN WE DO HAVE A BASIS FOR ASSUMING A VALUE FOR Σ, SUCH AS USING A Σ BASED ON PAST DATA, AND IN THOSE SITUATIONS EVEN IF SAMPLE SIZE IS SMALL THE CORRECT PROCEDURE WOULD BE TO USE THE NORMAL DISTRIBUTION, SO THE SIMPLIFIED LARGE-SAMPLE VS. SMALL SAMPLE APPROACH WOULD LEAD TO AN ERROR.']

In [30]:
# Q5. Write a program to convert all words in a file to lowercase
# Each RDD element is a whole line of text, so lower-casing the line lower-cases every word in it.
rdd_txt.map(str.lower).collect()

['textbooks often simplify this to large-sample vs. small-sample methods use normal distribution with large samples and t-distribution with small samples. this is right almost all the time, because in real sampling problems we seldom have a basis for knowing σ. however, there can be some situations when we do have a basis for assuming a value for σ, such as using a σ based on past data, and in those situations even if sample size is small the correct procedure would be to use the normal distribution, so the simplified large-sample vs. small sample approach would lead to an error.']

In [31]:
# Q6. Write a program to capitalize the first letter of each word in the file
# Upper-case only the first character and leave the rest of the word untouched:
# str.capitalize() would also lower-case the remaining letters (e.g. "USA" -> "Usa").
# Slicing (word[:1]) rather than indexing keeps an empty string from raising IndexError.
word_rdd.map(lambda word: word[:1].upper() + word[1:]).collect()

['Textbooks',
 'Often',
 'Simplify',
 'This',
 'To',
 'Large-sample',
 'Vs.',
 'Small-sample',
 'Methods',
 'Use',
 'Normal',
 'Distribution',
 'With',
 'Large',
 'Samples',
 'And',
 'T-distribution',
 'With',
 'Small',
 'Samples.',
 'This',
 'Is',
 'Right',
 'Almost',
 'All',
 'The',
 'Time,',
 'Because',
 'In',
 'Real',
 'Sampling',
 'Problems',
 'We',
 'Seldom',
 'Have',
 'A',
 'Basis',
 'For',
 'Knowing',
 'Σ.',
 'However,',
 'There',
 'Can',
 'Be',
 'Some',
 'Situations',
 'When',
 'We',
 'Do',
 'Have',
 'A',
 'Basis',
 'For',
 'Assuming',
 'A',
 'Value',
 'For',
 'Σ,',
 'Such',
 'As',
 'Using',
 'A',
 'Σ',
 'Based',
 'On',
 'Past',
 'Data,',
 'And',
 'In',
 'Those',
 'Situations',
 'Even',
 'If',
 'Sample',
 'Size',
 'Is',
 'Small',
 'The',
 'Correct',
 'Procedure',
 'Would',
 'Be',
 'To',
 'Use',
 'The',
 'Normal',
 'Distribution,',
 'So',
 'The',
 'Simplified',
 'Large-sample',
 'Vs.',
 'Small',
 'Sample',
 'Approach',
 'Would',
 'Lead',
 'To',
 'An',
 'Error.']

In [32]:
# Q7. Find the longest word in the given set of words
# Compare (length, word) tuples: the maximum tuple has the greatest length, and
# ties on length are broken by the ordinary string ordering of the words.
longest_length, longest_word = word_rdd.map(lambda token: (len(token), token)).max()
longest_word

't-distribution'

In [33]:
# Q8. Map the registration numbers to the corresponding branch. 6000 series BDA, 9000 series HDA,
# 1000 series ML, 2000 series VLSI, 3000 series ES, 4000 series MSc, 5000 series CC.
# Given a registration number, generate a pair of its branch and the registration number
# (pairs are emitted as (branch, registration_number)).

# Series number (registration number // 1000) -> branch name.
BRANCH_BY_SERIES = {1: 'ML', 2: 'VLSI', 3: 'ES', 4: 'MSc', 5: 'CC', 6: 'BDA', 9: 'HDA'}
# Label for registration numbers that belong to none of the series above.
UNKNOWN_BRANCH = 'UNKNOWN'


def branch_for(reg):
    """Return the branch for a registration number.

    The series is taken as ``reg // 1000``, so the first number of a series
    (1000, 2000, ...) is included in it. Numbers outside every listed series
    get ``UNKNOWN_BRANCH`` instead of being silently labelled as some branch.
    """
    return BRANCH_BY_SERIES.get(reg // 1000, UNKNOWN_BRANCH)


reg_nos = [1010, 2002, 1012, 2020, 3005, 4004, 5002, 6011, 9002, 6022, 4023, 5038,
           3033, 6045, 5050, 4021, 9054, 1055, 2060, 3045, 9080]
reg_nos_rdd = spark.sparkContext.parallelize(reg_nos, 1)
reg_nos_map = reg_nos_rdd.map(lambda reg: (branch_for(reg), reg))
reg_nos_collect = reg_nos_map.collect()
print(reg_nos_collect)

[('ML', 1010), ('VLSI', 2002), ('ML', 1012), ('VLSI', 2020), ('ES', 3005), ('MSc', 4004), ('CC', 5002), ('BDA', 6011), ('HDA', 9002), ('BDA', 6022), ('MSc', 4023), ('CC', 5038), ('ES', 3033), ('BDA', 6045), ('CC', 5050), ('MSc', 4021), ('HDA', 9054), ('ML', 1055), ('VLSI', 2060), ('ES', 3045), ('HDA', 9080)]


In [34]:
# Q9. A text file contains numbers separated by white space, in no particular order.
# One line may contain one or more numbers. Find the maximum, minimum, sum and mean of the numbers.
rdd_num = spark.sparkContext.textFile("numbers.txt")
# split() with no argument ignores repeated, leading and trailing whitespace and returns
# no tokens for a blank line; split(" ") would produce '' tokens that make int() raise ValueError.
numbers = rdd_num.flatMap(lambda line: line.split()).map(int)

In [35]:
# Finding the maximum number among the integers parsed from numbers.txt
numbers.max()

484

In [36]:
# Finding the minimum number among the integers parsed from numbers.txt
numbers.min()

2

In [37]:
# Finding the sum of the integers parsed from numbers.txt
numbers.sum()

2034

In [38]:
# Finding the mean of the integers parsed from numbers.txt (returned as a float)
numbers.mean()

88.43478260869566

In [39]:
# Q10. A text file (citizen.txt) contains data about the citizens of a country. The fields are
# name, dob, phone, email and state name. Another file maps state names to state codes, e.g.
# Karnataka is coded as KA, TamilNadu as TN, Kerala as KL. Compress citizen.txt by replacing
# each full state name with its state code.
citizens_rdd = spark.sparkContext.textFile("citizen.txt")
states_rdd = spark.sparkContext.textFile("states.txt")
# Both files are comma-separated; collect each one to the driver as a list of field lists.
details = citizens_rdd.map(lambda line: line.split(",")).collect()
state_codes = states_rdd.map(lambda line: line.split(",")).collect()

In [40]:
# Build the state-name -> state-code lookup once, so each citizen record needs a single
# dict lookup instead of a scan over every row of state_codes.
code_by_state = {}
for state_row in state_codes:
    if len(state_row) > 1:
        # setdefault keeps the first mapping if a state name is listed more than once.
        code_by_state.setdefault(state_row[0], state_row[1])

# Replace the state name (fifth field, index 4) in place; records whose state is not in
# the lookup, or that have fewer than five fields, are left unchanged.
for record in details:
    if len(record) > 4:
        record[4] = code_by_state.get(record[4], record[4])
details

[['Madhuri', '05-06-1997', '9876543210', 'madhuri@gmail.com', 'AP'],
 ['Mounika', '19-05-1998', '9780564326', 'mounika@gmail.com', 'TS'],
 ['Vaishnavi', '29-08-1998', '8750673528', 'vaishnavi@gmail.com', 'KL'],
 ['Renuka', '16-02-1997', '7690532360', 'renuka@gmail.com', 'GJ'],
 ['Meghana', '20-03-1997', '9086641207', 'meghana@gmail.com', 'NL'],
 ['Vasuki', '18-07-1997', '6306431870', 'vasuki@gmail.com', 'HR'],
 ['Priyanka', '04-09-1997', '90334681097', 'priyanka@gmail.com', 'MP'],
 ['Sneha', '03-12-1998', '9403450987', 'sneha@gmail.com', 'UP']]

In [41]:
def _name_and_code(line):
    """Split one line of states.txt into a (state name, state code) pair."""
    fields = line.split(",")
    return (fields[0], fields[1])


stateCode = states_rdd.map(_name_and_code)
# dict() over the collected pairs: a later pair overwrites an earlier one with the same name.
states_dict = dict(stateCode.collect())
states_dict

{'AndhraPradesh': 'AP',
 'Telangana': 'TS',
 'Kerala': 'KL',
 'Gujarat': 'GJ',
 'Nagaland': 'NL',
 'Haryana': 'HR',
 'MadhyaPradesh': 'MP',
 'UttarPradesh': 'UP'}

In [42]:
# Wrap the state-name -> state-code dict in a Spark broadcast variable; compress() reads it through `.value`.
data = spark.sparkContext.broadcast(states_dict)
def compress(state, codes):
    """Replace the full state name in one citizen record with its state code.

    Args:
        state: One comma-separated citizen record; the state name is read
            from the fifth field (index 4).
        codes: Object whose ``.value`` is a dict mapping state names to state
            codes (in this notebook, a Spark broadcast variable).

    Returns:
        The record's fields joined by single spaces, with the state name
        replaced by its code. A record whose state name has no mapping, or
        that has fewer than five fields, keeps its fields unchanged.
    """
    fields = state.split(",")
    if len(fields) > 4:
        # Fall back to the original name: a bare .get() returns None for an
        # unmapped state, and str.join() raises TypeError on a None item.
        fields[4] = codes.value.get(fields[4], fields[4])
    return ' '.join(fields)
# Apply compress() to every line of citizen.txt and collect the rewritten records to the driver.
citizens_rdd.map(lambda word:compress(word, data)).collect()

['Madhuri 05-06-1997 9876543210 madhuri@gmail.com AP',
 'Mounika 19-05-1998 9780564326 mounika@gmail.com TS',
 'Vaishnavi 29-08-1998 8750673528 vaishnavi@gmail.com KL',
 'Renuka 16-02-1997 7690532360 renuka@gmail.com GJ',
 'Meghana 20-03-1997 9086641207 meghana@gmail.com NL',
 'Vasuki 18-07-1997 6306431870 vasuki@gmail.com HR',
 'Priyanka 04-09-1997 90334681097 priyanka@gmail.com MP',
 'Sneha 03-12-1998 9403450987 sneha@gmail.com UP']