In [0]:
sc

# **# Create RDD from a Python Collection**



## **Creating RDD by Parallelize an existing collection in your driver program**

In [0]:
# Creating an RDD from a list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

# Show RDD elements
print(rdd.collect())


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


## **Creating RDD by referencing a dataset in an external storage system **

In [0]:
# Creating an RDD from a text file
rdd_text = sc.textFile("/FileStore/tables/Baby_Names2.csv")  # Ensure "sample.txt" is in the working directory

# Show first 5 lines of the text file
print(rdd_text.take(5))


['Year,First Name,County,Sex,Count', '2013,GAVIN,ST LAWRENCE,M,9', '2013,LEVI,ST LAWRENCE,M,9', '2013,LOGAN,NEW YORK,M,44', '2013,HUDSON,NEW YORK,M,49']


## **Map Transformations**

**Creating RDD by Parallelize an existing collection**

In [0]:
# Creating an RDD from a list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

# Show RDD elements
print(rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [0]:
rdd_squared = rdd.map(lambda x: x * x)
print(rdd_squared.collect())  # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


In [0]:
rdd_calc = rdd.map(lambda Y: Y * 10)
print(rdd_calc.collect())  # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

[10, 20, 30, 40, 50, 60, 70, 80, 90, 100]


## filter() – Select elements that match a condition



In [0]:
rdd_even = rdd.filter(lambda x: x % 2 == 0)
print(rdd_even.collect())  # [2, 4, 6, 8, 10]

[2, 4, 6, 8, 10]


### flatMap() – Flatten nested results

In [0]:
rdd_words = sc.parallelize(["hello world", "pyspark tutorial"])
rdd_flat = rdd_words.flatMap(lambda line: line.split(" "))
print(rdd_flat.collect())  # ['hello', 'world', 'pyspark', 'tutorial']


['hello', 'world', 'pyspark', 'tutorial']


### distinct() – Remove duplicate values

In [0]:
rdd_dup = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5])
rdd_unique = rdd_dup.distinct()
print(rdd_unique.collect())  # [1, 2, 3, 4, 5]


[1, 2, 3, 4, 5]


###union() – Combine two RDDs

In [0]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd_union = rdd1.union(rdd2)
print(rdd_union.collect())  # [1, 2, 3, 4, 5, 6]


[1, 2, 3, 4, 5, 6]


## **intersection() – Find common elements**

In [0]:
rdd3 = sc.parallelize([1, 2, 3, 4])
rdd4 = sc.parallelize([3, 4, 5, 6])
rdd_inter = rdd3.intersection(rdd4)
print(rdd_inter.collect())  # [3, 4]


[3, 4]


## cartesian() – Cartesian product

In [0]:
rdd5 = sc.parallelize([1, 2])
rdd6 = sc.parallelize(['a', 'b'])
rdd_cart = rdd5.cartesian(rdd6)
print(rdd_cart.collect())  # [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


# **_# RDD Actions_**

## **collect() - Retrieve All Elements**

In [0]:
rdd.collect()


Out[14]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## **count() - Count Total Elements**

In [0]:
rdd.count()


Out[15]: 10

## **first() - Get First Element**

In [0]:
rdd.first()


Out[16]: 1

## **reduce() - Aggregate Elements**

In [0]:
rdd_sum = rdd.reduce(lambda x, y: x + y)
print("Sum of Elements:", rdd_sum)


Sum of Elements: 55


## **countByValue() - Count Occurrences**

In [0]:
rdd.countByValue()


Out[18]: defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1})

**_Learning PySpark at BVRIT_**

# **#  Key-Value Pair RDDs**

## **# Create Key-Value Pair RDD**

In [0]:
rdd_kv = sc.parallelize([("apple", 3), ("banana", 5), ("apple", 2), ("banana", 7)])


**groupByKey() - Group Values by Key**

In [0]:
rdd_grouped = rdd_kv.groupByKey().mapValues(list)
rdd_grouped.collect()


Out[20]: [('apple', [3, 2]), ('banana', [5, 7])]

**reduceByKey() - Aggregate by Key**

In [0]:
rdd_reduced = rdd_kv.reduceByKey(lambda a, b: a + b)
rdd_reduced.collect()


Out[21]: [('apple', 5), ('banana', 12)]

**Caching & Persistence**

In [0]:
rdd.persist()
rdd.count()  # Forces execution


Out[22]: 10

**Partitioning & Parallelism**

In [0]:
print("Partitions:", rdd.getNumPartitions())


Partitions: 8


**Repartition RDD**

In [0]:
rdd_repartitioned = rdd.repartition(4)
print("New Partitions:", rdd_repartitioned.getNumPartitions())


New Partitions: 4


**Saving & Loading RDDs**

In [0]:
rdd.saveAsTextFile("/FileStore/output_rdd")


**Read Saved File**

In [0]:
saved_rdd = sc.textFile("/FileStore/output_rdd")
saved_rdd.collect()


In [0]:
# Creating an RDD from a text file
rdd_text = sc.textFile("/FileStore/tables/sample5.txt")  # Ensure "sample.txt" is in the working directory

# Show first 5 lines of the text file
print(rdd_text.collect())


['Apache Spark is a fast and powerful engine for big data processing.', 'Spark runs on Hadoop, Kubernetes, and standalone clusters.', 'Big data analytics is a key focus in the modern world.', 'Databricks provides an optimized environment for Apache Spark.']


In [0]:
rdd_text = sc.textFile("/FileStore/tables/sample5.txt")

# Word count logic
rdd_words = rdd_text.flatMap(lambda line: line.split(" "))
rdd_words.collect()



Out[6]: ['Apache',
 'Spark',
 'is',
 'a',
 'fast',
 'and',
 'powerful',
 'engine',
 'for',
 'big',
 'data',
 'processing.',
 'Spark',
 'runs',
 'on',
 'Hadoop,',
 'Kubernetes,',
 'and',
 'standalone',
 'clusters.',
 'Big',
 'data',
 'analytics',
 'is',
 'a',
 'key',
 'focus',
 'in',
 'the',
 'modern',
 'world.',
 'Databricks',
 'provides',
 'an',
 'optimized',
 'environment',
 'for',
 'Apache',
 'Spark.']

In [0]:
rdd_word_pairs = rdd_words.map(lambda word: (word.lower(), 1))

rdd_word_pairs.collect()


Out[7]: [('apache', 1),
 ('spark', 1),
 ('is', 1),
 ('a', 1),
 ('fast', 1),
 ('and', 1),
 ('powerful', 1),
 ('engine', 1),
 ('for', 1),
 ('big', 1),
 ('data', 1),
 ('processing.', 1),
 ('spark', 1),
 ('runs', 1),
 ('on', 1),
 ('hadoop,', 1),
 ('kubernetes,', 1),
 ('and', 1),
 ('standalone', 1),
 ('clusters.', 1),
 ('big', 1),
 ('data', 1),
 ('analytics', 1),
 ('is', 1),
 ('a', 1),
 ('key', 1),
 ('focus', 1),
 ('in', 1),
 ('the', 1),
 ('modern', 1),
 ('world.', 1),
 ('databricks', 1),
 ('provides', 1),
 ('an', 1),
 ('optimized', 1),
 ('environment', 1),
 ('for', 1),
 ('apache', 1),
 ('spark.', 1)]

In [0]:
rdd_word_counts = rdd_word_pairs.reduceByKey(lambda a, b: a + b)

# Display results
print("Word Count Results:", rdd_word_counts.take(10))

Word Count Results: [('is', 2), ('engine', 1), ('hadoop,', 1), ('kubernetes,', 1), ('analytics', 1), ('in', 1), ('modern', 1), ('provides', 1), ('an', 1), ('optimized', 1)]


In [0]:
# Show DAG as text output
print(rdd_word_counts.toDebugString())

b'(2) PythonRDD[39] at RDD at PythonRDD.scala:58 []\n |  MapPartitionsRDD[36] at mapPartitions at PythonRDD.scala:205 []\n |  ShuffledRDD[35] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(2) PairwiseRDD[34] at wrapper at <command-2836883212980610>:6 []\n    |  PythonRDD[33] at wrapper at <command-2836883212980610>:6 []\n    |  /FileStore/tables/sample5.txt MapPartitionsRDD[32] at textFile at NativeMethodAccessorImpl.java:0 []\n    |  /FileStore/tables/sample5.txt HadoopRDD[31] at textFile at NativeMethodAccessorImpl.java:0 []'


In [0]:
rdd_word_counts.collect()

Out[11]: [('is', 2),
 ('engine', 1),
 ('hadoop,', 1),
 ('kubernetes,', 1),
 ('analytics', 1),
 ('in', 1),
 ('modern', 1),
 ('provides', 1),
 ('an', 1),
 ('optimized', 1),
 ('spark.', 1),
 ('apache', 2),
 ('spark', 2),
 ('a', 2),
 ('fast', 1),
 ('and', 2),
 ('powerful', 1),
 ('for', 2),
 ('big', 2),
 ('data', 2),
 ('processing.', 1),
 ('runs', 1),
 ('on', 1),
 ('standalone', 1),
 ('clusters.', 1),
 ('key', 1),
 ('focus', 1),
 ('the', 1),
 ('world.', 1),
 ('databricks', 1),
 ('environment', 1)]

In [0]:
rdd_word_counts.toDF().explain(True)

== Parsed Logical Plan ==
LogicalRDD [_1#13, _2#14L], false

== Analyzed Logical Plan ==
_1: string, _2: bigint
LogicalRDD [_1#13, _2#14L], false

== Optimized Logical Plan ==
LogicalRDD [_1#13, _2#14L], false

== Physical Plan ==
*(1) Scan ExistingRDD[_1#13,_2#14L]

