#PRINOM MOJUMDER
#2021-2-60-098

In [49]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD Example").getOrCreate() # making a new spark session. # obj name: spark



#1.
Create and Transform an RDD:
• Create an RDD from a list of integers.
• Perform the following:
   Multiply each number by 3 using map.
   Filter the numbers greater than 10 using filter.
   Collect and display the final result.

In [50]:
rdd_prinom = spark.sparkContext.parallelize([1, 2, 3, 4, 5,6,7,8,9])
mapped_rdd_prinom = rdd_prinom.map(lambda x: x * 3)
print("Task_01_Prinom:")
mapped_rdd_prinom.collect()

Task_01_Prinom:


[3, 6, 9, 12, 15, 18, 21, 24, 27]

In [51]:
filtered_rdd = mapped_rdd_prinom.filter(lambda x: x > 10)
print("Task_01_Prinom:")
filtered_rdd.collect()

Task_01_Prinom:


[12, 15, 18, 21, 24, 27]

#2.
Read a Text File:
  Use textFile to read a file containing sentences (e.g., "hello world", "spark is great").

   Perform the following:
     Split each line into words using flatMap.
      Count the total number of words using count

In [52]:
rdd_prinom = spark.sparkContext.textFile("/content/RDD.txt")
words = rdd_prinom.flatMap(lambda line: line.split(" "))
print("Task_02_Prinom:")
print(words.collect())
words.count()


Task_02_Prinom:
['hello', 'world', 'spark', 'is', 'great']


5

#3 GroupByKey and ReduceByKey:

• Create an RDD with pairs of student names and their scores, e.g., [("Alice", 85),
("Bob", 90), ("Alice", 95)].


• Use:
o groupByKey to group scores by student.

o reduceByKey to calculate the total score for each student.

In [53]:
rdd_prinom = spark.sparkContext.parallelize([("Alice", 85), ("Bob", 90), ("Alice", 95)])
grouped_rdd = rdd_prinom.groupByKey()
grouped_result = [(k, list(v)) for k, v in grouped_rdd.collect()]
print("Task_03_Prinom:")
print(grouped_result)


Task_03_Prinom:
[('Alice', [85, 95]), ('Bob', [90])]


In [17]:
reduced_rdd  = rdd_prinom.reduceByKey(lambda x, y: x + y)
reduced_result = reduced_rdd.collect()
print(reduced_result)


[('Alice', 180), ('Bob', 90)]


#4. RDD Persistence:
• Create an RDD from a large list (simulate it by generating numbers from 1 to 1,000,000).

• Perform multiple actions (e.g., count, sum) without caching and measure execution
time.

• Repeat with cache or persist and compare the performance.

In [21]:
print("Task_04_prinom:")
import time
# Without cache
rdd_prinom = spark.sparkContext.parallelize(range(1, 1000001)) #1 to 1000000
start_time = time.time()

count_prinom = rdd_prinom.count()
sum_prinom = rdd_prinom.sum()

end_time = time.time()
print("Without Cache:")
print(f"Count: {count_prinom}, Sum: {sum_prinom}")
print(f"Time: {end_time - start_time} seconds")


Task_04_prinom:
Without Cache:
Count: 1000000, Sum: 500000500000
Time: 1.0604298114776611 seconds


In [22]:
# With Cache
rdd_prinom_cached = rdd_prinom.cache()
start_time = time.time()
count_prinom_cached = rdd_prinom_cached.count()
sum_prinom_cached = rdd_prinom_cached.sum()

end_time = time.time()
print("With Cache:")
print(f"Count: {count_prinom_cached}, Sum: {sum_prinom_cached}")
print(f"Time: {end_time - start_time} seconds")

With Cache:
Count: 1000000, Sum: 500000500000
Time: 2.1086349487304688 seconds


#5. Custom Transformations:

• Create an RDD of numbers.

• Write a custom transformation to identify and filter out prime numbers.

In [24]:
rdd_prinom = spark.sparkContext.parallelize(range(1, 101))

def prime(n):
    if n <= 1:
        return False
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

rdd = rdd_prinom.filter(prime)
print("Task_05_prinom:")
print(rdd.collect())


Task_05_prinom:
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]


In [25]:
rdd_prinom = spark.sparkContext.parallelize(range(1, 101))

# Use lambda for prime number:
rdd = rdd_prinom.filter(lambda n: n > 1 and all(n % i != 0 for i in range(2, n)))

print("Task_05_prinom:")
print(rdd.collect())


Task_05_prinom:
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]


#6. Transformation and Action Workflow:
• Load a dataset (e.g., CSV or text file) containing the following:

[Product,Category,Price

Laptop,Electronics,800

Shoes,Clothing,50

Phone,Electronics,500]
_________________________________________________________________________________

• Perform the following:

o Filter products with a price greater than 100.

o Map to get only the product names.

o Count the number of products in each category using map and reduceByKey.

In [33]:
data = [
    "Laptop,Electronics,800",
    "Shoes,Clothing,50",
    "Phone,Electronics,500"
]
rdd = spark.sparkContext.parallelize(data)

In [34]:
filtered_rdd = rdd.filter(lambda line: int(line.split(",")[2]) > 100)

product_names_rdd = filtered_rdd.map(lambda line: line.split(",")[0])

category_counts_rdd = (
    rdd.map(lambda line: (line.split(",")[1], 1))
    .reduceByKey(lambda x, y: x + y)
)

print("Task_06_prinom:")
print("Filtered Products:")
print(filtered_rdd.collect())

print("\nProduct Names:")
print(product_names_rdd.collect())

print("\nCategory Counts:")
print(category_counts_rdd.collect())

Task_06_prinom:
Filtered Products:
['Laptop,Electronics,800', 'Phone,Electronics,500']

Product Names:
['Laptop', 'Phone']

Category Counts:
[('Electronics', 2), ('Clothing', 1)]


#7. Integration with Spark SQL:

• Load a JSON dataset containing information about students:

[{"name": "Alice", "age": 20, "grade": "A"},
{"name": "Bob", "age": 22, "grade": "B"}]
____________________________________________________________________________
• Perform the following:

o Register the data as a temporary SQL
 table.

o Query students who have a grade "A" using Spark SQL.

o Save the result to a new JSON file.

In [36]:
print("Task_07_prinom:")
rdd_prinom = spark.sparkContext.parallelize([{"name": "Alice", "age": 20, "grade": "A"},
    {"name": "Bob", "age": 22, "grade": "B"}
])
df_prinom = spark.read.json(rdd_prinom)

print("SQL Table:")
df_prinom.createOrReplaceTempView("students")
df_prinom.show()

print("\nStudents who have a grade A:")
result_prinom = spark.sql("SELECT * FROM students WHERE grade = 'A'")
result_prinom.show()

result_prinom.write.json("outputJ_GradeA.json")


Task_07_prinom:
SQL Table:
+---+-----+-----+
|age|grade| name|
+---+-----+-----+
| 20|    A|Alice|
| 22|    B|  Bob|
+---+-----+-----+


Students who have a grade A:
+---+-----+-----+
|age|grade| name|
+---+-----+-----+
| 20|    A|Alice|
+---+-----+-----+



#8. Advanced Word Count with Sorting:

• Extend the word count program to:

o Sort words by their frequency in descending order.

o Display the top 5 most frequent words.

In [37]:
print("Task_08_Prinom:")
rdd_Prinom = spark.sparkContext.parallelize([
    "Hello EWU",
    "Prinom loves EWU",
    "Prinom loves CSE of EWU"
])
words = rdd_Prinom.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word.lower(), 1))

word_counts_total = word_counts.reduceByKey(lambda x, y: x + y)
sorted_word_counts = word_counts_total.sortBy(lambda x: x[1], ascending=False)
top_5 = sorted_word_counts.take(5)

print("Top 5 Most Frequent Words:")
for word, count in top_5:
    print(f"{word}: {count}")



Task_08_Prinom:
Top 5 Most Frequent Words:
ewu: 3
loves: 2
prinom: 2
cse: 1
of: 1


#9. Custom Aggregations with aggregateByKey:

• Create an RDD with pairs of cities and temperatures, e.g., [("NY", 32), ("LA",
75), ("NY", 28)].

• Use aggregateByKey to calculate the average temperature for each city.

In [54]:
data = [("NY", 32), ("LA", 75), ("NY", 28)]
rdd_PM_9 = spark.sparkContext.parallelize(data)

aggregated_rdd_PM_9 = rdd_PM_9.aggregateByKey(
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)

average_rdd_PM_9 = aggregated_rdd_PM_9.mapValues(lambda x: x[0] / x[1])

results = average_rdd_PM_9.collect()
print("Task_09_Prinom:")
print("Average Temperature for Each City:")
for city, avg_temp in results:
    print(f"City: {city}, Average Temperature: {avg_temp:.2f}")

spark.stop()

Task_09_Prinom:
Average Temperature for Each City:
City: NY, Average Temperature: 30.00
City: LA, Average Temperature: 75.00
