In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('FIRST SPARK APP').getOrCreate()
df = spark.sql('''select 'SQL' as Hello''')
df.show()

+-----+
|Hello|
+-----+
|  SQL|
+-----+



In [6]:
spark = SparkSession.builder.appName("Test RDD Examples").getOrCreate()

In [7]:
type(spark)

pyspark.sql.session.SparkSession

In [10]:
# Creating RDDs
# Using parallelize method
# parallelize() method is used to create an RDD from a list
rdd_par = spark.sparkContext.parallelize(["Big Data ", "Data Structures", 'PySpark'])
type((rdd_par))
#rdd_par.collect() #ex for action (performed on that data set)

pyspark.rdd.RDD

### 1. Create RDDs in three different ways.

In [12]:
#creating RDD using transformations
rdd_trans = rdd_par.filter(lambda word:word.startswith('B'))
rdd_trans.collect()

['Big Data ']

In [23]:
# Creating RDD using Data Sources
# RDD can also be created from a text file using textFile() function of the SparkContext.
rdd_ds = spark.sparkContext.textFile('spark.txt')

In [24]:
# count-> number, collect()->gives data
rdd_ds.count()

4

In [25]:
rdd_ds.collect()

['Big data is a term that describes the large volume of data – both structured and unstructured – that inundates a business on a day-to-day basis. But it’s not the amount of data that’s important. It’s what organizations do with the data that matters. Big data can be analyzed for insights that lead to better decisions and strategic business moves.',
 'The term “big data” refers to data that is so large, fast or complex that it’s difficult or impossible to process using traditional methods. The act of accessing and storing large amounts of information for analytics has been around a long time. But the concept of big data gained momentum in the early 2000s when industry analyst Doug Laney articulated the now-mainstream definition of big data as the three V’s:',
 '',
 '']

In [26]:
rdd_ds.flatMap(lambda word: word.split(' ')).count()
# flatMap 


132

In [27]:
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
#word_rdd.collect()
freq_words = word_rdd.map(lambda word: (word, 1))
#freq_words.collect()
freq_words.reduceByKey(lambda a,b : a+b)
# reduceByKey has a built in accumilator(initial val is zero)
# words are key : 1 is value
# each ele we created a tuple
freq_words.collect()

[('Big', 1),
 ('data', 1),
 ('is', 1),
 ('a', 1),
 ('term', 1),
 ('that', 1),
 ('describes', 1),
 ('the', 1),
 ('large', 1),
 ('volume', 1),
 ('of', 1),
 ('data', 1),
 ('–', 1),
 ('both', 1),
 ('structured', 1),
 ('and', 1),
 ('unstructured', 1),
 ('–', 1),
 ('that', 1),
 ('inundates', 1),
 ('a', 1),
 ('business', 1),
 ('on', 1),
 ('a', 1),
 ('day-to-day', 1),
 ('basis.', 1),
 ('But', 1),
 ('it’s', 1),
 ('not', 1),
 ('the', 1),
 ('amount', 1),
 ('of', 1),
 ('data', 1),
 ('that’s', 1),
 ('important.', 1),
 ('It’s', 1),
 ('what', 1),
 ('organizations', 1),
 ('do', 1),
 ('with', 1),
 ('the', 1),
 ('data', 1),
 ('that', 1),
 ('matters.', 1),
 ('Big', 1),
 ('data', 1),
 ('can', 1),
 ('be', 1),
 ('analyzed', 1),
 ('for', 1),
 ('insights', 1),
 ('that', 1),
 ('lead', 1),
 ('to', 1),
 ('better', 1),
 ('decisions', 1),
 ('and', 1),
 ('strategic', 1),
 ('business', 1),
 ('moves.', 1),
 ('The', 1),
 ('term', 1),
 ('“big', 1),
 ('data”', 1),
 ('refers', 1),
 ('to', 1),
 ('data', 1),
 ('that', 1),


In [32]:
# Creating RDDs
# Using parallelize method
rdd_par = spark.sparkContext.parallelize(["Big data", "is a term that describes the large volume of data " ,"both structured and unstructured"])
type(rdd_par)
rdd_par.collect()
rdd_par.count()
rdd_par.collect()[0]

'Big data'

In [33]:
# Creating RDD using Data Sources
rdd_ds = spark.sparkContext.textFile('spark.txt')
rdd_ds.count()
rdd_ds.collect()

['Big data is a term that describes the large volume of data – both structured and unstructured – that inundates a business on a day-to-day basis. But it’s not the amount of data that’s important. It’s what organizations do with the data that matters. Big data can be analyzed for insights that lead to better decisions and strategic business moves.',
 'The term “big data” refers to data that is so large, fast or complex that it’s difficult or impossible to process using traditional methods. The act of accessing and storing large amounts of information for analytics has been around a long time. But the concept of big data gained momentum in the early 2000s when industry analyst Doug Laney articulated the now-mainstream definition of big data as the three V’s:',
 '',
 '']

### 2. Read a text file and count number of words in the file using RDD operations.

In [34]:
rdd_ds = spark.sparkContext.textFile('spark.txt')
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
word_rdd.count()

132

### 3. Write a program to find the word frequency in a given file

In [35]:
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
freq_words = word_rdd.map(lambda word: (word, 1))
freq_words.reduceByKey(lambda a, b: a + b).collect()

[('is', 2),
 ('term', 2),
 ('large', 2),
 ('of', 6),
 ('both', 1),
 ('inundates', 1),
 ('business', 2),
 ('basis.', 1),
 ('But', 2),
 ('it’s', 2),
 ('amount', 1),
 ('It’s', 1),
 ('organizations', 1),
 ('do', 1),
 ('matters.', 1),
 ('strategic', 1),
 ('moves.', 1),
 ('The', 2),
 ('refers', 1),
 ('large,', 1),
 ('impossible', 1),
 ('process', 1),
 ('using', 1),
 ('traditional', 1),
 ('methods.', 1),
 ('act', 1),
 ('accessing', 1),
 ('storing', 1),
 ('amounts', 1),
 ('analytics', 1),
 ('around', 1),
 ('long', 1),
 ('in', 1),
 ('when', 1),
 ('analyst', 1),
 ('Laney', 1),
 ('as', 1),
 ('three', 1),
 ('V’s:', 1),
 ('', 2),
 ('Big', 2),
 ('data', 8),
 ('a', 4),
 ('that', 6),
 ('describes', 1),
 ('the', 7),
 ('volume', 1),
 ('–', 2),
 ('structured', 1),
 ('and', 3),
 ('unstructured', 1),
 ('on', 1),
 ('day-to-day', 1),
 ('not', 1),
 ('that’s', 1),
 ('important.', 1),
 ('what', 1),
 ('with', 1),
 ('can', 1),
 ('be', 1),
 ('analyzed', 1),
 ('for', 2),
 ('insights', 1),
 ('lead', 1),
 ('to', 3),


### 4. Write a program to convert all words in a file to uppercase.

In [36]:
word_rdd_upper = rdd_ds.map(lambda word: word.upper())
word_rdd_upper.collect()

['BIG DATA IS A TERM THAT DESCRIBES THE LARGE VOLUME OF DATA – BOTH STRUCTURED AND UNSTRUCTURED – THAT INUNDATES A BUSINESS ON A DAY-TO-DAY BASIS. BUT IT’S NOT THE AMOUNT OF DATA THAT’S IMPORTANT. IT’S WHAT ORGANIZATIONS DO WITH THE DATA THAT MATTERS. BIG DATA CAN BE ANALYZED FOR INSIGHTS THAT LEAD TO BETTER DECISIONS AND STRATEGIC BUSINESS MOVES.',
 'THE TERM “BIG DATA” REFERS TO DATA THAT IS SO LARGE, FAST OR COMPLEX THAT IT’S DIFFICULT OR IMPOSSIBLE TO PROCESS USING TRADITIONAL METHODS. THE ACT OF ACCESSING AND STORING LARGE AMOUNTS OF INFORMATION FOR ANALYTICS HAS BEEN AROUND A LONG TIME. BUT THE CONCEPT OF BIG DATA GAINED MOMENTUM IN THE EARLY 2000S WHEN INDUSTRY ANALYST DOUG LANEY ARTICULATED THE NOW-MAINSTREAM DEFINITION OF BIG DATA AS THE THREE V’S:',
 '',
 '']

### 5. Write a program to convert all words in a file to lowercase.

In [37]:
word_rdd_lower = rdd_ds.map(lambda word: word.lower())
word_rdd_lower.collect()

['big data is a term that describes the large volume of data – both structured and unstructured – that inundates a business on a day-to-day basis. but it’s not the amount of data that’s important. it’s what organizations do with the data that matters. big data can be analyzed for insights that lead to better decisions and strategic business moves.',
 'the term “big data” refers to data that is so large, fast or complex that it’s difficult or impossible to process using traditional methods. the act of accessing and storing large amounts of information for analytics has been around a long time. but the concept of big data gained momentum in the early 2000s when industry analyst doug laney articulated the now-mainstream definition of big data as the three v’s:',
 '',
 '']

### 6. Write a program to capitalize first leter of each words in file (use string capitalize() method).

In [38]:
word_rdds = rdd_ds.flatMap(lambda word: word.split(' '))
word_rdd_capitalized = word_rdds.map(lambda word: word.capitalize())
word_rdd_capitalized.collect()

['Big',
 'Data',
 'Is',
 'A',
 'Term',
 'That',
 'Describes',
 'The',
 'Large',
 'Volume',
 'Of',
 'Data',
 '–',
 'Both',
 'Structured',
 'And',
 'Unstructured',
 '–',
 'That',
 'Inundates',
 'A',
 'Business',
 'On',
 'A',
 'Day-to-day',
 'Basis.',
 'But',
 'It’s',
 'Not',
 'The',
 'Amount',
 'Of',
 'Data',
 'That’s',
 'Important.',
 'It’s',
 'What',
 'Organizations',
 'Do',
 'With',
 'The',
 'Data',
 'That',
 'Matters.',
 'Big',
 'Data',
 'Can',
 'Be',
 'Analyzed',
 'For',
 'Insights',
 'That',
 'Lead',
 'To',
 'Better',
 'Decisions',
 'And',
 'Strategic',
 'Business',
 'Moves.',
 'The',
 'Term',
 '“big',
 'Data”',
 'Refers',
 'To',
 'Data',
 'That',
 'Is',
 'So',
 'Large,',
 'Fast',
 'Or',
 'Complex',
 'That',
 'It’s',
 'Difficult',
 'Or',
 'Impossible',
 'To',
 'Process',
 'Using',
 'Traditional',
 'Methods.',
 'The',
 'Act',
 'Of',
 'Accessing',
 'And',
 'Storing',
 'Large',
 'Amounts',
 'Of',
 'Information',
 'For',
 'Analytics',
 'Has',
 'Been',
 'Around',
 'A',
 'Long',
 'Time.'

### 7. Find the longest length of word from given set of words.


In [39]:
word_rdd = rdd_ds.flatMap(lambda word: word.split(' '))
word_rdd_length = word_rdd.map(lambda word:len(word))

max(word_rdd_length.collect())

14

### 8. Map the Registration numbers to corresponding branch. 6000 series BDA, 9000 series HDA, 1000 series ML, 2000 series VLSI, 3000 series ES, 4000 series MSc, 5000 series CC. Given registration number, generate a key-value pair of Registration Number and Corresponding Branch.

In [40]:
series = [6002, 3050, 2500, 1500, 9050]
rdd_par = spark.sparkContext.parallelize(series)
rdd_sequence = rdd_par.map(lambda series: (series, 'ML') if (series >= 1000 and series < 1999)  
                           else ((series, 'VLSI') if (series >= 2000 and series < 2999)
                           else ((series, 'ES') if (series >= 3000 and series < 3999) 
                           else ((series, 'MSc') if (series >= 4000 and series < 4999) 
                           else ((series, 'CC') if (series >= 5000 and series < 5999) 
                           else ((series, 'BDA') if (series >= 6000 and series < 6999)
                           else ((series, 'HDA') if (series >= 9000 and series < 9999) else (series, 'NA'))))))))

print("Key, value are: ")
rdd_sequence.collect()

Key, value are: 


[(6002, 'BDA'), (3050, 'ES'), (2500, 'VLSI'), (1500, 'ML'), (9050, 'HDA')]

### 9. Text file contain numbers. Numbers are separated by one white space. There is no order to store the numbers. One line may contain one or more numbers. Find the maximum, minimum, sum and mean of numbers.

In [46]:
rdd_ds = spark.sparkContext.textFile('numbers.txt')
rdd_ds_num = rdd_ds.flatMap(lambda n: n.split(' '))
rdd_l=rdd_ds_num.collect()

In [47]:
test_list = [i for i in rdd_l if i] # list comprehension

In [48]:
test_list = [int(i) for i in test_list if i]

In [49]:
max(test_list)

93

In [50]:
min(test_list)

1

In [51]:
sum(test_list)

1082

In [52]:
sum(test_list)/len(test_list)

41.61538461538461

### 10. A text file (citizen.txt) contains data about citizens of country.
Fields (information in file) are Name, dob, Phone, email and state name. 
Another file contains mapping of state names to state code like Karnataka is codes as KA, 
TamilNadu as TN, Kerala KL etc. Compress the citizen.txt file by changing full state name to state code.

In [60]:

rdd_ds_citizen = spark.sparkContext.textFile('citizen.txt')
rdd_ds_statecode = spark.sparkContext.textFile('statecode.txt')
rdd_ds_citizen = rdd_ds_citizen.map(lambda citizen: citizen.split(', '))
rdd_ds_statecode = rdd_ds_statecode.map(lambda state: state.split(', '))
citizen = spark.createDataFrame(rdd_ds_citizen, ['Name', 'dob', 'Phone', 'email', 'state name'])
statecodes = spark.createDataFrame(rdd_ds_statecode, ['state name', 'state code'])
citizen.collect()
statecodes.collect()
citizen.join(statecodes, on='state name', how='left').drop('state name').show()
citizen.write.csv('citizen_compressed.csv')
citizen.rdd.map(lambda x: x[0] + "," + str(x)).repartition(1).saveAsTextFile('Text/citizen.txt')

+------+----------+----------+-------------+----------+
|  Name|       dob|     Phone|        email|state code|
+------+----------+----------+-------------+----------+
|Mahesh|16-07-1990|8978866441| hi@gmail.com|        KA|
|Suresh|10-10-1992|8978866442|hi2@gmail.com|        KL|
|Ramesh|15-05-1991|9030915551|hi1@gmail.com|        TN|
+------+----------+----------+-------------+----------+

