<a href="https://colab.research.google.com/github/hsabaghpour/PySpark_Repo/blob/main/PySpark_Primier_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
!pip install pyspark



In [10]:
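from pyspark import SparkContext, SparkConf
# "local" runs Spark on this machine with a single worker thread; "local[*]" would use every core.
conf = SparkConf().setAppName("MyApp").setMaster("local")
# getOrCreate() reuses an already-running SparkContext instead of failing when the cell is re-run.
sc = SparkContext.getOrCreate(conf=conf)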

In [11]:
sc

In [6]:
nums = list(range(0,10000001))

In [7]:
len(nums)

10000001

	•	What parallelize does: it distributes this list across a cluster (or simulates one on this machine when Spark runs in local mode), creating an RDD. Spark can then operate on the pieces in parallel, which is what makes it useful for processing large datasets.

In [17]:
# parallelize(nums) creates an RDD (Resilient Distributed Dataset) from an existing Python collection, here the list nums.

nums_rdd = sc.parallelize(nums)
nums_rdd

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:289
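The sketch below makes "distributes this data" concrete in plain Python. It cuts a sequence into contiguous, near-equal slices, one per partition. This is an illustration, not Spark's code. The helper split_into_partitions is hypothetical, and the exact boundaries PySpark uses (which also depend on its internal batching) may differ. On a real RDD, nums_rdd.getNumPartitions() reports the partition count, and sc.parallelize(nums, numSlices=4) requests a specific one.

```python
def split_into_partitions(data, num_slices):
    """Hypothetical helper: cut a sequence into num_slices contiguous, near-equal slices."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    # Slice i covers indices [i*n // k, (i+1)*n // k), so slice sizes differ by at most one.
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


print(split_into_partitions(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
# Slicing a range gives a range, so this models 10,000,001 elements without building a list.
print([len(p) for p in split_into_partitions(range(10_000_001), 4)])  # [2500000, 2500000, 2500000, 2500001]
```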

Why avoid collect() in distributed contexts?

	•	Memory Overload: If the dataset is very large (which is often the case when using Spark), calling collect() can overwhelm the driver’s memory because it tries to pull all the distributed data back to one machine. This can lead to memory errors or crashes.
	•	Inefficiency: Collecting data back to the driver defeats the purpose of distributed processing, because you’re pulling the data out of the parallelized environment and back into a single machine.
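To give the memory point a sense of scale, the sketch below estimates how much memory the Python driver needs just to hold nums_rdd.collect() as a list of Python ints. It assumes CPython, where each list slot is one pointer and each int is a separate object. approx_list_bytes is a hypothetical helper, and the estimate ignores any JVM-side buffering and allocator overhead. On a typical 64-bit build it comes to roughly 360 MB for 10,000,001 integers.

```python
import struct
import sys


def approx_list_bytes(n, sample=2**20):
    """Rough CPython estimate for a list of n distinct ints, each sized like `sample`.

    Ignores the small-int cache, list over-allocation and allocator overhead.
    """
    pointer = struct.calcsize("P")  # bytes per list slot (8 on 64-bit builds)
    return sys.getsizeof([]) + n * (pointer + sys.getsizeof(sample))


print(f"~{approx_list_bytes(10_000_001) / 1e6:.0f} MB to hold 10,000,001 ints on the driver")
```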

In [18]:
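# Demonstration only: this ships all 10,000,001 elements to the driver; prefer take(n) on real data.
nums_rdd.collect()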

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ..., 180, 181, 182, 183, 184, ... (output truncated; the full list has 10,000,001 elements)


Alternatives to collect() in Spark, with brief explanations:

1. take(n)

	•	What it does: Returns the first n elements of the RDD.
	•	When to use: Useful for inspecting a small subset of the data without collecting everything.

2. foreach()

	•	What it does: Applies a function to each element in the RDD in parallel without retrieving the data back to the driver.
	•	When to use: Best for side effects on each element (like logging or writing to a database) when the driver does not need the data itself.

3. reduce()

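	•	What it does: Combines the elements of the RDD into a single result using a commutative and associative function (e.g., summing or finding a maximum). Spark reduces each partition locally and then merges the partial results, so a function without those properties can give answers that depend on how the data is partitioned.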
	•	When to use: Ideal for aggregating data across the distributed RDD into a single result.

4. count()

	•	What it does: Returns the total number of elements in the RDD.
	•	When to use: Useful when you only need the number of elements without needing to see the data itself.

5. saveAsTextFile() or saveAsHadoopFile()

	•	What it does: Writes the RDD directly to storage, such as HDFS or a local file system, as a directory of part files (one per partition), instead of retrieving it to the driver.
	•	When to use: Ideal for persisting large datasets without bringing them back to the driver.

6. map() and filter()

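	•	What they do: map() transforms each element of the RDD, while filter() removes elements that don’t meet a condition. Unlike the methods above, these are lazy transformations rather than actions: each returns a new RDD, and nothing is computed until an action runs.
	•	When to use: Best for transforming or shrinking data while it stays distributed, so that whatever you finally bring to the driver is small.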
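Most of the difference between these calls comes down to what reaches the driver. The plain-Python model below needs no Spark; the partitions list and the model_* helpers are hypothetical stand-ins. It treats an RDD as a list of partitions. count() and reduce() work inside each partition first and only combine small partial results, while take(n) stops reading once it has n elements. The last line shows why the function given to reduce() should be associative and commutative: with subtraction, the answer depends on how the data happens to be partitioned.

```python
from functools import reduce
from operator import add, sub

# Hypothetical stand-in for an RDD: a list of partitions, each a plain list.
partitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]


def model_count(parts):
    # Each partition reports only its length; the driver adds a handful of numbers.
    return sum(len(p) for p in parts)


def model_take(parts, n):
    # Read partitions in order and stop as soon as n elements are gathered.
    out = []
    for p in parts:
        if len(out) >= n:
            break
        out.extend(p[: n - len(out)])
    return out


def model_reduce(parts, fn):
    # Fold inside each non-empty partition, then fold the partial results on the driver.
    partials = [reduce(fn, p) for p in parts if p]
    return reduce(fn, partials)


print(model_count(partitions))        # 10
print(model_take(partitions, 5))      # [0, 1, 2, 3, 4]
print(model_reduce(partitions, add))  # 45, same as sum(range(10))
# Subtraction is neither associative nor commutative, so partitioning changes the answer.
print(model_reduce(partitions, sub), reduce(sub, range(10)))  # 21 -45
```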

In [19]:
nums_rdd.take(5)

[0, 1, 2, 3, 4]

In [21]:
# map() applies a function to every element of nums_rdd, here squaring it (x**2).
# The result is a new RDD, squared_nums_rdd; nums_rdd itself is unchanged.
# map() is a lazy transformation: Spark records it and only runs it, in parallel across the partitions
# and without moving data to the driver, when an action such as take(8) asks for results.

squared_nums_rdd = nums_rdd.map(lambda x: x**2)
squared_nums_rdd.take(8)

[0, 1, 4, 9, 16, 25, 36, 49]
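map() is also lazy. Defining squared_nums_rdd computes nothing; the squaring only happens when take(8) asks for results. The toy class below is a hypothetical single-machine illustration of that idea, not how PySpark is implemented. It counts calls to the mapping function to show that, in this toy, only the elements take() needs are computed.

```python
from itertools import islice


class ToyRDD:
    """Hypothetical toy, not PySpark: records map/filter steps and runs them only in take()."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, fn):
        # Transformation: return a new ToyRDD with one more recorded step; nothing runs yet.
        return ToyRDD(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return ToyRDD(self._source, self._steps + (("filter", pred),))

    def take(self, n):
        # Action: chain lazy iterators and pull only the first n results through them.
        it = iter(self._source)
        for kind, fn in self._steps:
            it = map(fn, it) if kind == "map" else filter(fn, it)
        return list(islice(it, n))


call_count = [0]  # mutable counter so square() can update it


def square(x):
    call_count[0] += 1
    return x ** 2


squared = ToyRDD(range(10_000_001)).map(square)
print(call_count[0])    # 0: defining the map did no work
print(squared.take(8))  # [0, 1, 4, 9, 16, 25, 36, 49]
print(call_count[0])    # 8: only the elements take() needed were squared
```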

In [22]:
# Pair each square with its number of decimal digits, e.g. 16 -> (16, 2).
pairs = squared_nums_rdd.map(lambda x: (x, len(str(x))))
pairs.take(25)

[(0, 1),
 (1, 1),
 (4, 1),
 (9, 1),
 (16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (100, 3),
 (121, 3),
 (144, 3),
 (169, 3),
 (196, 3),
 (225, 3),
 (256, 3),
 (289, 3),
 (324, 3),
 (361, 3),
 (400, 3),
 (441, 3),
 (484, 3),
 (529, 3),
 (576, 3)]

In [26]:
# Keep only the squares whose digit count is even.
even_digit_pairs = pairs.filter(lambda x: (x[1] % 2) == 0)
even_digit_pairs.take(100)

[(16, 2),
 (25, 2),
 (36, 2),
 (49, 2),
 (64, 2),
 (81, 2),
 (1024, 4),
 (1089, 4),
 (1156, 4),
 (1225, 4),
 (1296, 4),
 (1369, 4),
 (1444, 4),
 (1521, 4),
 (1600, 4),
 (1681, 4),
 (1764, 4),
 (1849, 4),
 (1936, 4),
 (2025, 4),
 (2116, 4),
 (2209, 4),
 (2304, 4),
 (2401, 4),
 (2500, 4),
 (2601, 4),
 (2704, 4),
 (2809, 4),
 (2916, 4),
 (3025, 4),
 (3136, 4),
 (3249, 4),
 (3364, 4),
 (3481, 4),
 (3600, 4),
 (3721, 4),
 (3844, 4),
 (3969, 4),
 (4096, 4),
 (4225, 4),
 (4356, 4),
 (4489, 4),
 (4624, 4),
 (4761, 4),
 (4900, 4),
 (5041, 4),
 (5184, 4),
 (5329, 4),
 (5476, 4),
 (5625, 4),
 (5776, 4),
 (5929, 4),
 (6084, 4),
 (6241, 4),
 (6400, 4),
 (6561, 4),
 (6724, 4),
 (6889, 4),
 (7056, 4),
 (7225, 4),
 (7396, 4),
 (7569, 4),
 (7744, 4),
 (7921, 4),
 (8100, 4),
 (8281, 4),
 (8464, 4),
 (8649, 4),
 (8836, 4),
 (9025, 4),
 (9216, 4),
 (9409, 4),
 (9604, 4),
 (9801, 4),
 (100489, 6),
 (101124, 6),
 (101761, 6),
 (102400, 6),
 (103041, 6),
 (103684, 6),
 (104329, 6),
 (104976, 6),
 (105625, 6)

In [31]:
# Swap each (square, digits) pair to (digits, square) so the digit count becomes the key.
flipped_pairs = even_digit_pairs.map(lambda x: (x[1], x[0]))
flipped_pairs.take(25)

[(2, 16),
 (2, 25),
 (2, 36),
 (2, 49),
 (2, 64),
 (2, 81),
 (4, 1024),
 (4, 1089),
 (4, 1156),
 (4, 1225),
 (4, 1296),
 (4, 1369),
 (4, 1444),
 (4, 1521),
 (4, 1600),
 (4, 1681),
 (4, 1764),
 (4, 1849),
 (4, 1936),
 (4, 2025),
 (4, 2116),
 (4, 2209),
 (4, 2304),
 (4, 2401),
 (4, 2500)]

In [32]:
# groupByKey() gathers every value that shares a key; each group comes back as a ResultIterable.
grouped = flipped_pairs.groupByKey()
grouped.take(25)

[(2, <pyspark.resultiterable.ResultIterable at 0x7bac472b9c00>),
 (4, <pyspark.resultiterable.ResultIterable at 0x7bac472b9840>),
 (6, <pyspark.resultiterable.ResultIterable at 0x7bac472b9bd0>),
 (8, <pyspark.resultiterable.ResultIterable at 0x7bac472ba3b0>),
 (10, <pyspark.resultiterable.ResultIterable at 0x7bac472bafb0>),
 (12, <pyspark.resultiterable.ResultIterable at 0x7bac472b8f10>),
 (14, <pyspark.resultiterable.ResultIterable at 0x7bac9037ad10>)]
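The ResultIterable objects above are Spark's iterable wrappers around each key's values, which is why the next cell converts them with list(). To form those groups, groupByKey() must shuffle the data so that every pair with the same key lands in the same partition. The sketch below models that step in plain Python. model_group_by_key is a hypothetical helper that assigns pairs to buckets with hash(key) % num_partitions. This mirrors the idea of hash partitioning, not Spark's exact partitioner.

```python
from collections import defaultdict


def model_group_by_key(pairs, num_partitions):
    """Hypothetical sketch of groupByKey: shuffle pairs into buckets by key, then group each bucket."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # Same key -> same bucket, so all of a key's values end up together.
        buckets[hash(key) % num_partitions].append((key, value))
    grouped = []
    for bucket in buckets:
        by_key = defaultdict(list)
        for key, value in bucket:
            by_key[key].append(value)
        grouped.extend(by_key.items())
    return grouped


# A few pairs in the style of flipped_pairs; key order in the result follows the buckets.
flipped = [(2, 16), (2, 25), (4, 1024), (2, 36), (4, 1089)]
print(model_group_by_key(flipped, 3))  # [(4, [1024, 1089]), (2, [16, 25, 36])]
```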

In [34]:
# Turn each ResultIterable into a plain list so the grouped values can be inspected.
grouped = grouped.mapValues(list)
grouped.take(2)

[(2, [16, 25, 36, 49, 64, 81]),
 (4,
  [1024,
   1089,
   1156,
   1225,
   1296,
   1369,
   1444,
   1521,
   1600,
   1681,
   1764,
   1849,
   1936,
   2025,
   2116,
   2209,
   2304,
   2401,
   2500,
   2601,
   2704,
   2809,
   2916,
   3025,
   3136,
   3249,
   3364,
   3481,
   3600,
   3721,
   3844,
   3969,
   4096,
   4225,
   4356,
   4489,
   4624,
   4761,
   4900,
   5041,
   5184,
   5329,
   5476,
   5625,
   5776,
   5929,
   6084,
   6241,
   6400,
   6561,
   6724,
   6889,
   7056,
   7225,
   7396,
   7569,
   7744,
   7921,
   8100,
   8281,
   8464,
   8649,
   8836,
   9025,
   9216,
   9409,
   9604,
   9801])]

In [36]:
# Average of the squares within each digit-count group.
average = grouped.mapValues(lambda values: sum(values) / len(values))
average.take(25)

[(2, 45.166666666666664),
 (4, 4675.5),
 (6, 471838.0),
 (8, 47204941.666666664),
 (10, 4720705565.0),
 (12, 472075391214.1667),
 (14, 47207587468622.5)]
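Grouping every value only to add them up sends all of the squares through the shuffle. A common alternative is to map each value to a (sum, count) pair and merge those with reduceByKey, so that each partition ships just one small tuple per key. In PySpark that would look roughly like flipped_pairs.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda t: t[0] / t[1]). The plain-Python sketch below models the two stages on a tiny hand-made input; combine_partition and merge_and_average are hypothetical names.

```python
from collections import defaultdict


def combine_partition(pairs):
    """Map-side combine: one running (sum, count) per key inside a single partition."""
    acc = defaultdict(lambda: (0, 0))
    for key, value in pairs:
        s, c = acc[key]
        acc[key] = (s + value, c + 1)
    return acc


def merge_and_average(partials):
    """Merge per-partition (sum, count) tuples, then divide once per key."""
    total = defaultdict(lambda: (0, 0))
    for acc in partials:
        for key, (s, c) in acc.items():
            ts, tc = total[key]
            total[key] = (ts + s, tc + c)
    return {key: s / c for key, (s, c) in sorted(total.items())}


# Hand-made input split across two "partitions"; key 4 uses only two of its values here.
partitions = [
    [(2, 16), (2, 25), (2, 36)],
    [(2, 49), (2, 64), (2, 81), (4, 1024), (4, 1089)],
]
print(merge_and_average(combine_partition(p) for p in partitions))
# {2: 45.166666666666664, 4: 1056.5}
```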

In [37]:
average.collect()  # collect() is safe here: grouping reduced the data to seven (digit count, average) tuples

[(2, 45.166666666666664),
 (4, 4675.5),
 (6, 471838.0),
 (8, 47204941.666666664),
 (10, 4720705565.0),
 (12, 472075391214.1667),
 (14, 47207587468622.5)]
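The averages can also be checked independently, in closed form and without Spark. The d-digit squares are k^2 for lo ≤ k ≤ hi. Here lo is the smallest k with k^2 ≥ 10^(d-1) (0 for d = 1), and hi is the largest k ≤ 10,000,000 with k^2 < 10^d. The sum over that range follows from 0^2 + 1^2 + ... + n^2 = n(n+1)(2n+1)/6. The sketch below uses exact Fraction arithmetic, and its function names are hypothetical. For even d it should reproduce the seven averages above, e.g. 271/6 ≈ 45.1667 for d = 2 and 4675.5 for d = 4.

```python
from fractions import Fraction
from math import isqrt

LAST = 10_000_000  # largest value in nums = list(range(0, 10000001))


def sum_squares(n):
    """0**2 + 1**2 + ... + n**2 = n(n+1)(2n+1)/6 (0 when n = -1)."""
    return n * (n + 1) * (2 * n + 1) // 6


def average_square_by_digits(last=LAST):
    """Exact mean of k**2 for 0 <= k <= last, grouped by the number of digits of k**2."""
    result = {}
    d = 1
    while True:
        # Smallest k whose square has d digits (k = 0 counts as one digit).
        lo = 0 if d == 1 else isqrt(10 ** (d - 1) - 1) + 1
        if lo > last:
            break
        # Largest k (capped at last) whose square still has d digits.
        hi = min(isqrt(10 ** d - 1), last)
        result[d] = Fraction(sum_squares(hi) - sum_squares(lo - 1), hi - lo + 1)
        d += 1
    return result


even = {d: float(avg) for d, avg in average_square_by_digits().items() if d % 2 == 0}
print(even)
```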