<a href="https://colab.research.google.com/github/NishatVasker/CSE488_Big-Data-Analytics-/blob/main/CSE488_PySpark_Exercise_using_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Create and Transform an RDD**

In [1]:
from pyspark import SparkContext

# Reuse the active SparkContext if there is one; otherwise start a new one.
sc = SparkContext.getOrCreate()

# Distribute the integers 1 through 10 as an RDD.
integer_rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Triple every element, then keep only the results that exceed 10.
transformed_rdd = (
    integer_rdd
    .map(lambda value: value * 3)
    .filter(lambda tripled: tripled > 10)
)

# Bring the surviving values back to the driver and display them.
final_output = transformed_rdd.collect()
print("Transformed numbers greater than 10:", final_output)


Transformed numbers greater than 10: [12, 15, 18, 21, 24, 27, 30]


**Read a Text File**

In [5]:
# Load the text file as an RDD with one element per line.
file_rdd = sc.textFile("/content/text.txt")

# Break every line on whitespace and flatten the pieces into one RDD of words.
word_list_rdd = file_rdd.flatMap(lambda text_line: text_line.split())

# Count the words across the whole file and report the total.
total_words = word_list_rdd.count()
print(f"Total word count in the file: {total_words}")

Total word count in the file: 700


**GroupByKey and ReduceByKey**

In [6]:
# (student, score) pairs; "Alice" appears twice so both operations have work to do.
scores_rdd = sc.parallelize([("Alice", 85), ("Bob", 90), ("Alice", 95)])

# groupByKey gathers every score per student; mapValues(list) turns each
# grouped iterable into a plain list so it prints readably.
grouped_scores_rdd = (
    scores_rdd
    .groupByKey()
    .mapValues(list)
)
print("Grouped Scores by Student:", grouped_scores_rdd.collect())

# reduceByKey folds the scores of each student into a single sum.
total_scores_rdd = scores_rdd.reduceByKey(
    lambda running_total, next_score: running_total + next_score
)
print("Total Scores by Student:", total_scores_rdd.collect())

Grouped Scores by Student: [('Alice', [85, 95]), ('Bob', [90])]
Total Scores by Student: [('Alice', 180), ('Bob', 90)]


**RDD Persistence**

In [7]:
import time

# Step 1: Generate a large dataset
large_dataset_rdd = sc.parallelize(range(1, 1000001))

# Step 2: Measure performance without caching
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# intervals; time.time() is wall-clock time and can jump if the clock is adjusted.
start_time = time.perf_counter()
count_without_cache = large_dataset_rdd.count()
sum_without_cache = large_dataset_rdd.sum()
print(f"Execution time without caching: {time.perf_counter() - start_time:.2f} seconds")

# Step 3: Enable caching and measure performance
# cache() only marks the RDD for persistence; the data is actually stored the
# first time an action computes it. Run one untimed action here so the timed
# region below measures reads from the cache rather than the cost of filling it.
cached_rdd = large_dataset_rdd.cache()
cached_rdd.count()  # warm-up: materializes the cache, deliberately not timed

start_time = time.perf_counter()
count_with_cache = cached_rdd.count()
sum_with_cache = cached_rdd.sum()
print(f"Execution time with caching: {time.perf_counter() - start_time:.2f} seconds")

Execution time without caching: 1.04 seconds
Execution time with caching: 1.53 seconds


**Custom Transformations**

In [8]:
def check_prime(n):
    """Return True if ``n`` is a prime number, False otherwise.

    Uses trial division by 2 and then by odd candidates only. The loop bound
    is checked with exact integer arithmetic (``divisor * divisor <= n``)
    instead of ``int(n ** 0.5)``: the float square root loses precision for
    very large integers and raises ``OverflowError`` once ``n`` no longer
    fits in a float.

    Args:
        n: The number to test. Values below 2 are never prime.

    Returns:
        bool: True when ``n`` has no divisor other than 1 and itself.
    """
    if n < 2:
        return False
    if n < 4:
        # 2 and 3 are prime; handling them here keeps the loop below simple.
        return True
    if n % 2 == 0:
        return False

    # Only odd divisors remain to be checked, up to and including sqrt(n).
    divisor = 3
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 2
    return True

# Step 1: Create an RDD holding the integers 1 through 100
numbers_rdd = sc.parallelize(range(1, 101))

# Step 2: Drop the primes, keeping only the values check_prime rejects
non_prime_numbers = (
    numbers_rdd
    .filter(lambda candidate: not check_prime(candidate))
    .collect()
)
print("Numbers that are not prime:", non_prime_numbers)

Numbers that are not prime: [1, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100]


**Transformation and Action Workflow**

In [9]:
# Step 1: Build the dataset of (name, category, price) records
product_data = sc.parallelize([
    ("Laptop", "Electronics", 800),
    ("Shoes", "Clothing", 50),
    ("Phone", "Electronics", 500),
])

# Step 2: Keep only the records whose price (index 2) is above 100
high_price_products = product_data.filter(lambda record: record[2] > 100)

# Step 3: Project the surviving records down to their names (index 0)
product_names = (
    high_price_products
    .map(lambda record: record[0])
    .collect()
)
print("Products priced above $100:", product_names)

# Step 4: Emit a (category, 1) pair per product and sum the ones per category
category_counts = (
    product_data
    .map(lambda record: (record[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
print("Product count by category:", category_counts)

Products priced above $100: ['Laptop', 'Phone']
Product count by category: [('Electronics', 2), ('Clothing', 1)]


**Integration with Spark SQL**

In [10]:
from pyspark.sql import SparkSession

# Step 1: Initialize SparkSession (getOrCreate reuses an existing session if there is one)
spark = SparkSession.builder.appName("StudentData").getOrCreate()

# Step 2: Build a DataFrame from in-memory records (a list of dicts, not a JSON file)
student_data = [{"name": "Alice", "age": 20, "grade": "A"}, {"name": "Bob", "age": 22, "grade": "B"}]
student_df = spark.createDataFrame(student_data)

# Step 3: Register as SQL table
student_df.createOrReplaceTempView("students_table")

# Step 4: Query students with grade "A"
top_students = spark.sql("SELECT * FROM students_table WHERE grade = 'A'")
top_students.show()

# Step 5: Save results as JSON
# The default save mode raises an error when the target path already exists,
# which makes this cell fail on every re-run. mode("overwrite") replaces the
# previous output instead, so the notebook survives Restart & Run All.
# Note: Spark writes a directory of part files at this path, not a single file.
top_students.write.mode("overwrite").json("path/to/output.json")


+---+-----+-----+
|age|grade| name|
+---+-----+-----+
| 20|    A|Alice|
+---+-----+-----+



**Advanced Word Count with Sorting**

In [13]:
# Read the file as an RDD of lines.
text_data = sc.textFile("/content/text.txt")

# Tokenize on whitespace, pair each word with 1, sum the ones per word,
# then order the (word, count) pairs from most to least frequent.
word_counts = (
    text_data
    .flatMap(lambda text_line: text_line.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda word_and_count: word_and_count[1], ascending=False)
)

# The RDD is already sorted, so the first five elements are the top five words.
top_5_frequent_words = word_counts.take(5)
print("Top 5 words by frequency:", top_5_frequent_words)


Top 5 words by frequency: [('and', 21), ('M', 19), ('N', 16), ('2023', 15), ('Vasker,', 14)]


**Custom Aggregations with aggregateByKey**

In [14]:
# (city, temperature) readings; "NY" has two readings to average.
city_temp_rdd = sc.parallelize([("NY", 32), ("LA", 75), ("NY", 28)])

# Accumulate a (sum, count) pair per city, starting from (0, 0).
sum_count_by_city = city_temp_rdd.aggregateByKey(
    (0, 0),
    # Fold one reading into a partition-local accumulator.
    lambda running, reading: (running[0] + reading, running[1] + 1),
    # Merge the accumulators produced by different partitions.
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

# Turn each (sum, count) pair into the mean temperature.
average_temp_rdd = sum_count_by_city.mapValues(lambda totals: totals[0] / totals[1])

print("Average temperature by city:", average_temp_rdd.collect())


Average temperature by city: [('NY', 30.0), ('LA', 75.0)]
