# **Resilient Distributed Dataset (RDD)**

In [None]:
#Installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
# Initialise findspark before the first pyspark import in the following cells.
# NOTE(review): presumably this relies on the SPARK_HOME value exported by the
# installation cell -- confirm if the install location changes.
import findspark
findspark.init()

In [None]:
# Create the SparkSession (or reuse the one already running); every later cell
# reaches the SparkContext through `spark.sparkContext`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RDDwithPyspark")
    .getOrCreate()
)

In [None]:
# RDD creation: distribute the integers 1..20 across the cluster.
data = list(range(1, 21))
rdd = spark.sparkContext.parallelize(data)
# collect() brings every element back to the driver as a plain Python list.
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [None]:
# Take the first five elements of the RDD; take(n) returns them as a list.
first_five = rdd.take(5)
first_five

[1, 2, 3, 4, 5]

In [None]:
# Take only the first element; first() returns the element itself, not a list.
first_only = rdd.first()
first_only

1

In [None]:
# Show the values held by each partition: glom() turns every partition into a
# list, so collect() returns one inner list per partition.
rdd_collection = rdd.glom().collect()
rdd_collection

[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]]

In [None]:
# Get the number of partitions the RDD is split into.
partition_count = rdd.getNumPartitions()
partition_count

2

In [None]:
# Build an RDD from a NumPy range (even numbers 0..38) and explicitly request
# five partitions, then display the contents of each partition.
import numpy as np

data2 = np.arange(start=0, stop=40, step=2)
rdd2 = spark.sparkContext.parallelize(data2, numSlices=5)
rdd2_collection = rdd2.glom().collect()
rdd2_collection

[[0, 2, 4, 6],
 [8, 10, 12, 14],
 [16, 18, 20, 22],
 [24, 26, 28, 30],
 [32, 34, 36, 38]]

In [None]:
# Union operation: concatenate two RDDs.
# data1 holds the multiples of 5 below 20, data2 the multiples of 3 below 20.
data1 = np.arange(0,20,5)
data2 = np.arange(0,20,3)
rdd3 = spark.sparkContext.parallelize(data1)
rdd4 = spark.sparkContext.parallelize(data2)
# union() keeps duplicates: values present in both inputs (0 and 15) appear twice.
rdd_union = rdd3.union(rdd4)
print(rdd_union.collect())


[0, 5, 10, 15, 0, 3, 6, 9, 12, 15, 18]


In [None]:
# Intersection operation: keep only the values present in both rdd3 and rdd4.
rdd_intersection = rdd3.intersection(rdd4)
print(rdd_intersection.collect())

[0, 15]


In [None]:
# Subtraction: keep the values of rdd3 that do not occur in rdd4.
rdd_subtract = rdd3.subtract(rdd4)
print(rdd_subtract.collect())

[5, 10]


In [None]:
# Cartesian product: every (a, b) pair with a taken from rdd3 and b from rdd4.
rdd_cartesian = rdd3.cartesian(rdd4)
print(rdd_cartesian.collect())

[(0, 0), (0, 3), (0, 6), (5, 0), (5, 3), (5, 6), (0, 9), (0, 12), (0, 15), (5, 9), (5, 12), (5, 15), (0, 18), (5, 18), (10, 0), (10, 3), (10, 6), (15, 0), (15, 3), (15, 6), (10, 9), (10, 12), (10, 15), (15, 9), (15, 12), (15, 15), (10, 18), (15, 18)]


In [None]:
# Join two key/value RDDs on their keys. Only keys present on both sides
# survive; each result row has the form (key, (left_value, right_value)).
j1_rdd = spark.sparkContext.parallelize(
    [("a", 1), ("b", 2), ("c", 3)]
)
j2_rdd = spark.sparkContext.parallelize(
    [("a", "aa"), ("b", "bb"), ("d", "dd")]
)
join_result = j1_rdd.join(j2_rdd)
join_result.collect()

[('b', (2, 'bb')), ('a', (1, 'aa'))]

# **Working with Text Dataset**

In [None]:
# Loading a text file and creating an RDD from it (one element per line).
# The second argument to textFile() is the requested number of partitions.
path = "/content/lorem.txt"
text_rdd = spark.sparkContext.textFile(path, 5)
# The previous bare `text_rdd.collect()` was removed: in the middle of a cell
# its result is neither displayed nor stored, so it only cost an extra Spark job.
# glom() groups the lines by partition, showing how the file was split up.
print(text_rdd.glom().collect())
print(text_rdd.getNumPartitions())

[['A big-data journey must strike a balance between data supply (data management) and information demand (managers asking for different information or more-timely updates). ', 'The aim of any big data project must be to add business value — by enabling cost reductions, productivity gains, or revenue increases. Many older big data projects never reached the point where they were adding value. '], ['For example, it is not unusual to find projects based on plant historians that accumulate data for years without business or operational managers taking full advantage of the data through analytics.'], ['', 'Any data with unknown form or the structure is classified as unstructured data. ', 'In addition to the size being huge, un-structured data poses multiple challenges in terms of its processing for deriving value out of it. A typical example of unstructured data is a heterogeneous data source containing a combination of simple text files, images, videos etc. '], [], ['Now day organizations 

In [None]:
# Use map() to split every line on whitespace. map() emits exactly one output
# element per input line, so the result is a list of word lists (nested).
splitted_text_rdd = text_rdd.map(lambda line: line.split())
splitted_text_rdd.collect()

[['A',
  'big-data',
  'journey',
  'must',
  'strike',
  'a',
  'balance',
  'between',
  'data',
  'supply',
  '(data',
  'management)',
  'and',
  'information',
  'demand',
  '(managers',
  'asking',
  'for',
  'different',
  'information',
  'or',
  'more-timely',
  'updates).'],
 ['The',
  'aim',
  'of',
  'any',
  'big',
  'data',
  'project',
  'must',
  'be',
  'to',
  'add',
  'business',
  'value',
  '—',
  'by',
  'enabling',
  'cost',
  'reductions,',
  'productivity',
  'gains,',
  'or',
  'revenue',
  'increases.',
  'Many',
  'older',
  'big',
  'data',
  'projects',
  'never',
  'reached',
  'the',
  'point',
  'where',
  'they',
  'were',
  'adding',
  'value.'],
 ['For',
  'example,',
  'it',
  'is',
  'not',
  'unusual',
  'to',
  'find',
  'projects',
  'based',
  'on',
  'plant',
  'historians',
  'that',
  'accumulate',
  'data',
  'for',
  'years',
  'without',
  'business',
  'or',
  'operational',
  'managers',
  'taking',
  'full',
  'advantage',
  'of',
  't

In [None]:
# Use of the flatMap() function: split every line on whitespace and flatten
# the per-line word lists into a single RDD of words.
flat_splitted_text_rdd = text_rdd.flatMap(lambda line: line.split())
flat_splitted_text_rdd.collect()

['A',
 'big-data',
 'journey',
 'must',
 'strike',
 'a',
 'balance',
 'between',
 'data',
 'supply',
 '(data',
 'management)',
 'and',
 'information',
 'demand',
 '(managers',
 'asking',
 'for',
 'different',
 'information',
 'or',
 'more-timely',
 'updates).',
 'The',
 'aim',
 'of',
 'any',
 'big',
 'data',
 'project',
 'must',
 'be',
 'to',
 'add',
 'business',
 'value',
 '—',
 'by',
 'enabling',
 'cost',
 'reductions,',
 'productivity',
 'gains,',
 'or',
 'revenue',
 'increases.',
 'Many',
 'older',
 'big',
 'data',
 'projects',
 'never',
 'reached',
 'the',
 'point',
 'where',
 'they',
 'were',
 'adding',
 'value.',
 'For',
 'example,',
 'it',
 'is',
 'not',
 'unusual',
 'to',
 'find',
 'projects',
 'based',
 'on',
 'plant',
 'historians',
 'that',
 'accumulate',
 'data',
 'for',
 'years',
 'without',
 'business',
 'or',
 'operational',
 'managers',
 'taking',
 'full',
 'advantage',
 'of',
 'the',
 'data',
 'through',
 'analytics.',
 'Any',
 'data',
 'with',
 'unknown',
 'form',
 '

In [None]:
# Remove unwanted words (English articles) from the flattened word list.
# Words are lower-cased before the lookup, so "A" and "The" are dropped too.
stopwords = ["a", "an", "the"]
filtered_text_rdd = flat_splitted_text_rdd.filter(
    lambda word: word.lower() not in stopwords
)
filtered_text_rdd.collect()

['big-data',
 'journey',
 'must',
 'strike',
 'balance',
 'between',
 'data',
 'supply',
 '(data',
 'management)',
 'and',
 'information',
 'demand',
 '(managers',
 'asking',
 'for',
 'different',
 'information',
 'or',
 'more-timely',
 'updates).',
 'aim',
 'of',
 'any',
 'big',
 'data',
 'project',
 'must',
 'be',
 'to',
 'add',
 'business',
 'value',
 '—',
 'by',
 'enabling',
 'cost',
 'reductions,',
 'productivity',
 'gains,',
 'or',
 'revenue',
 'increases.',
 'Many',
 'older',
 'big',
 'data',
 'projects',
 'never',
 'reached',
 'point',
 'where',
 'they',
 'were',
 'adding',
 'value.',
 'For',
 'example,',
 'it',
 'is',
 'not',
 'unusual',
 'to',
 'find',
 'projects',
 'based',
 'on',
 'plant',
 'historians',
 'that',
 'accumulate',
 'data',
 'for',
 'years',
 'without',
 'business',
 'or',
 'operational',
 'managers',
 'taking',
 'full',
 'advantage',
 'of',
 'data',
 'through',
 'analytics.',
 'Any',
 'data',
 'with',
 'unknown',
 'form',
 'or',
 'structure',
 'is',
 'classifi

In [None]:
# Another filtering example: keep only the words whose last character is "a".
another_filtered_text_rdd = flat_splitted_text_rdd.filter(lambda x: x.endswith("a"))
another_filtered_text_rdd.collect()

['big-data',
 'a',
 'data',
 '(data',
 'data',
 'data',
 'data',
 'data',
 'data',
 'data',
 'data',
 'a',
 'data',
 'a',
 'data',
 'data']

## **Working With CSV Dataset**

In [None]:
# Load the CSV file as plain text lines (one RDD element per line, header
# included), requesting 6 partitions, and preview the first five lines.
csvRDD = spark.sparkContext.textFile("/content/FIFA-21 Complete.csv", 6)
csvRDD.take(5)
# Optional diagnostics -- uncomment to inspect how the file was partitioned:
# print(csvRDD.getNumPartitions())
# print(csvRDD.glom().collect())


['player_id;name;nationality;position;overall;age;hits;potential;team',
 '158023;Lionel Messi;Argentina;ST|CF|RW;94;33;299;94;"FC Barcelona "',
 '20801;Cristiano Ronaldo;Portugal;ST|LW;93;35;276;93;"Juventus "',
 '190871;Neymar Jr;Brazil;CAM|LW;92;28;186;92;"Paris Saint-Germain "',
 '203376;Virgil van Dijk;Netherlands;CB;91;29;127;92;"Liverpool "']

In [None]:
# Splitting each row on "," -- shown to illustrate the wrong delimiter: this
# file separates its fields with ";", so every row stays a single-element list.
comma_separated_csvRDD = csvRDD.map(lambda line:line.split(","))
comma_separated_csvRDD.take(5)

[['player_id;name;nationality;position;overall;age;hits;potential;team'],
 ['158023;Lionel Messi;Argentina;ST|CF|RW;94;33;299;94;"FC Barcelona "'],
 ['20801;Cristiano Ronaldo;Portugal;ST|LW;93;35;276;93;"Juventus "'],
 ['190871;Neymar Jr;Brazil;CAM|LW;92;28;186;92;"Paris Saint-Germain "'],
 ['203376;Virgil van Dijk;Netherlands;CB;91;29;127;92;"Liverpool "']]

In [None]:
# Splitting each row on ";" (the file's actual delimiter) yields one list of
# field strings per row; the header row is still the first element.
semicolon_separated_csvRDD = csvRDD.map(lambda line:line.split(";"))
semicolon_separated_csvRDD.take(3)

[['player_id',
  'name',
  'nationality',
  'position',
  'overall',
  'age',
  'hits',
  'potential',
  'team'],
 ['158023',
  'Lionel Messi',
  'Argentina',
  'ST|CF|RW',
  '94',
  '33',
  '299',
  '94',
  '"FC Barcelona "'],
 ['20801',
  'Cristiano Ronaldo',
  'Portugal',
  'ST|LW',
  '93',
  '35',
  '276',
  '93',
  '"Juventus "']]

In [None]:
# Filtering rows with a condition: keep players whose position is exactly "GK".
# Print the header row first to show that "position" is the field at index 3.
print(semicolon_separated_csvRDD.take(1))
# The length guard skips blank or truncated rows, which would otherwise raise
# IndexError on fields[3] inside the Spark task.
filtered_rdd = semicolon_separated_csvRDD.filter(
    lambda fields: len(fields) > 3 and fields[3] == "GK"
)
filtered_rdd.take(3)

[['player_id', 'name', 'nationality', 'position', 'overall', 'age', 'hits', 'potential', 'team']]


[['200389',
  'Jan Oblak',
  'Slovenia',
  'GK',
  '91',
  '27',
  '47',
  '93',
  '"Atlético Madrid "'],
 ['212831', 'Alisson', 'Brazil', 'GK', '90', '27', '53', '91', '"Liverpool "'],
 ['192448',
  'Marc-André ter Stegen',
  'Germany',
  'GK',
  '90',
  '28',
  '68',
  '93',
  '"FC Barcelona "']]

In [None]:
# Create an RDD with missing data (None)
data_with_none = [1, 2, None, 4, 5]
rdd_with_none = spark.sparkContext.parallelize(data_with_none)
# Filter out None values (identity check, so a falsy value such as 0 would be kept).
# Note: this rebinds `filtered_rdd`, which the goalkeeper-filter cell also assigns.
filtered_rdd = rdd_with_none.filter(lambda x: x is not None)
filtered_rdd.collect()

[1, 2, 4, 5]

In [None]:
# reduce
# Count total hits ("hits" is the field at index 6 of each semicolon-separated row).
only_hits = (
    csvRDD
    .map(lambda line: line.split(";"))
    # Skip blank/truncated rows instead of raising IndexError on fields[6].
    .filter(lambda fields: len(fields) > 6)
    .map(lambda fields: fields[6])
    # isdigit() drops the header cell ("hits") and any other non-numeric value.
    .filter(lambda s: s.isdigit())
)
print(only_hits.take(2))
# Convert to int BEFORE reducing so the reducer only ever sees ints. Reducing
# the raw strings with int(x) + int(y) returned a str when the RDD held a
# single element, because reduce() never calls the function in that case.
# reduce() still raises ValueError on an empty RDD.
total_hits = only_hits.map(int).reduce(lambda x, y: x + y)
print(total_hits)

['299', '276']
48359


In [None]:
# reduceByKey: add up the sales amounts recorded for each product key.
data = [
    ("product1", 100),
    ("product2", 200),
    ("product1", 50),
    ("product2", 300),
    ("product3", 150),
]
rdd = spark.sparkContext.parallelize(data)
total_sales_per_product = rdd.reduceByKey(
    lambda running_total, amount: running_total + amount
)
result = total_sales_per_product.collect()
# One line per product; the order of keys in the collected result is not fixed.
for product, total_sales in result:
    print(f"Product: {product}, Total Sales: {total_sales}")


Product: product3, Total Sales: 150
Product: product1, Total Sales: 150
Product: product2, Total Sales: 500


In [None]:
# groupBy: group whole elements by a computed key (here the fruit name, i.e.
# the first item of each tuple). Each group still contains the full tuples.
data = [
    ("apple", 3),
    ("banana", 2),
    ("apple", 5),
    ("banana", 4),
    ("orange", 1),
]
rdd = spark.sparkContext.parallelize(data)
grouped_rdd = rdd.groupBy(lambda pair: pair[0])
result = grouped_rdd.collect()
# Each group is an iterable, so materialise it with list() before printing.
for key, values in result:
    print(f"Key: {key}, Values: {list(values)}")

Key: banana, Values: [('banana', 2), ('banana', 4)]
Key: orange, Values: [('orange', 1)]
Key: apple, Values: [('apple', 3), ('apple', 5)]


In [None]:
# groupByKey: group the values of a key/value RDD by key. Unlike groupBy,
# each group holds only the values (3, 5, ...), not the whole tuples.
data = [
    ("apple", 3),
    ("banana", 2),
    ("apple", 5),
    ("banana", 4),
    ("orange", 1),
]
rdd = spark.sparkContext.parallelize(data)
grouped_rdd = rdd.groupByKey()
result = grouped_rdd.collect()
# Each group is an iterable, so materialise it with list() before printing.
for key, values in result:
    print(f"Key: {key}, Values: {list(values)}")


Key: banana, Values: [2, 4]
Key: orange, Values: [1]
Key: apple, Values: [3, 5]


In [None]:
# Saving output to disk.
import shutil

sales_output_path = "/content/grouped2_data.txt"
# saveAsTextFile() writes a *directory* of part files (despite the ".txt" name)
# and fails if that path already exists, which broke every re-run of this cell.
# Clear the output of a previous run first.
# NOTE(review): rmtree assumes the path lives on the local filesystem, as in
# this single-machine setup -- confirm before pointing it at HDFS/S3.
shutil.rmtree(sales_output_path, ignore_errors=True)
total_sales_per_product.saveAsTextFile(sales_output_path)

# **Word Count Example With MapReduce**

In [None]:
# Loading data for the word count: read the text file line by line into an
# RDD (5 partitions requested) and display all of its lines.
path = "/content/lorem.txt"
text_rdd = spark.sparkContext.textFile(path, 5)
text_rdd.collect()

In [None]:
# Split the data on whitespace; flatMap() flattens the per-line word lists,
# so this RDD holds individual words rather than one list per line.
splitted_text_rdd = text_rdd.flatMap(lambda line: line.split())
splitted_text_rdd.collect()

In [None]:
# Map step: pair every word (the key) with an initial count of 1.
mapped_words = splitted_text_rdd.map(lambda x:(x,1))
mapped_words.take(5)

[('A', 1), ('big-data', 1), ('journey', 1), ('must', 1), ('strike', 1)]

In [None]:
# Reduce step: add up the 1s of identical words to get each word's count.
# Keys are case-sensitive and keep attached punctuation (e.g. 'The', '(managers').
reduced_mapped_words = mapped_words.reduceByKey(lambda x, y: x+y)
reduced_mapped_words.take(5)

[('demand', 1), ('(managers', 1), ('The', 1), ('of', 8), ('to', 4)]

In [None]:
# Save the (word, count) pairs to disk.
import shutil

word_count_output_path = "/content/count_data.txt"
# saveAsTextFile() writes a *directory* of part files (despite the ".txt" name)
# and fails if that path already exists, which broke every re-run of this cell.
# Clear the output of a previous run first.
# NOTE(review): rmtree assumes the path lives on the local filesystem, as in
# this single-machine setup -- confirm before pointing it at HDFS/S3.
shutil.rmtree(word_count_output_path, ignore_errors=True)
reduced_mapped_words.saveAsTextFile(word_count_output_path)