In [18]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("lab02_Spark_RDD_Core")
    .master("local[*]")
    .getOrCreate()
)

spark

In [19]:
~/projets/realtime-spark/data/kaggle_text/reviews_sample.txt

/Users/abidhiafahmed/projets/realtime-spark/data/iris.csv


In [20]:
iris_path = "/Users/abidhiafahmed/projets/realtime-spark/data/iris.csv"

df_iris = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(iris_path)
)

df_iris.printSchema()
df_iris.show(5)


root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [21]:
  df_iris.count()
  
  df_iris.select("species").distinct().show()

+---------------+
|        species|
+---------------+
| Iris-virginica|
|    Iris-setosa|
|Iris-versicolor|
+---------------+



In [22]:
from pyspark.sql.functions import avg

df_iris.groupBy("species").agg(avg("SepalLengthCm").alias("avg_sepal_length")).show()

+---------------+-----------------+
|        species| avg_sepal_length|
+---------------+-----------------+
| Iris-virginica|6.587999999999998|
|    Iris-setosa|5.005999999999999|
|Iris-versicolor|            5.936|
+---------------+-----------------+



In [23]:
rdd_lines = spark.sparkContext.textFile("/Users/abidhiafahmed/projets/realtime-spark/data/iris.csv")

print("Number of lines in the RDD:", rdd_lines.count())
print("Sample lines:", rdd_lines.take(5))

Number of lines in the RDD: 151
Sample lines: ['Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species', '1,5.1,3.5,1.4,0.2,Iris-setosa', '2,4.9,3.0,1.4,0.2,Iris-setosa', '3,4.7,3.2,1.3,0.2,Iris-setosa', '4,4.6,3.1,1.5,0.2,Iris-setosa']


In [24]:
import re

def clean_line(line: str) -> list[str]:
    # Lowercase
    line = line.lower()
    # Replace non-alphanumeric characters with spaces
    line = re.sub(r"[^a-zA-ZÀ-ÖØ-öø-ÿ0-9]+", " ", line)
    # Split into words
    words = line.split()
    return words

# 1. Split into words
rdd_words_reviews = rdd_lines.flatMap(clean_line)

# 2. Optional: filter very short words
rdd_words_filtered = rdd_words_reviews.filter(lambda w: len(w) > 2)

# 3. Map to (word, 1)
rdd_pairs = rdd_words_filtered.map(lambda w: (w, 1))

# 4. Aggregate by word
rdd_word_count = rdd_pairs.reduceByKey(lambda a, b: a + b)

# 5. Display a few results
print("Number of distinct words:", rdd_word_count.count())
print("Sample pairs (word, count):", rdd_word_count.take(10))

Number of distinct words: 60
Sample pairs (word, count): [('petallengthcm', 1), ('petalwidthcm', 1), ('setosa', 50), ('versicolor', 50), ('100', 1), ('102', 1), ('106', 1), ('107', 1), ('108', 1), ('110', 1)]


In [25]:
rdd_count_word = rdd_word_count.map(lambda pair: (pair[1], pair[0]))

rdd_sorted = rdd_count_word.sortByKey(ascending=False)

top_20 = rdd_sorted.take(20)

print("Top 20 most frequent words:")
for count, word in top_20:
    print(f"{word} : {count}")

Top 20 most frequent words:
iris : 150
setosa : 50
versicolor : 50
virginica : 50
petallengthcm : 1
petalwidthcm : 1
100 : 1
102 : 1
106 : 1
107 : 1
108 : 1
110 : 1
111 : 1
112 : 1
113 : 1
115 : 1
116 : 1
119 : 1
121 : 1
122 : 1


In [26]:
print("Number of partitions in rdd_lines:", rdd_lines.getNumPartitions())

Number of partitions in rdd_lines: 2


In [27]:
rdd_repart_8 = rdd_lines.repartition(8)
print("Partitions after repartition(8):", rdd_repart_8.getNumPartitions())

Partitions after repartition(8): 8


In [28]:
rdd_coalesce_2 = rdd_lines.coalesce(2)
print("Partitions after coalesce(2):", rdd_coalesce_2.getNumPartitions())

Partitions after coalesce(2): 2


In [29]:
import time

def measure_time(action_fn, description: str):
    start = time.time()
    result = action_fn()
    end = time.time()
    print(f"{description} -> result = {result}, time = {end - start:.4f} s")

# Compare count across different versions
measure_time(lambda: rdd_lines.count(), "count with default partitioning")
measure_time(lambda: rdd_repart_8.count(), "count after repartition(8)")
measure_time(lambda: rdd_coalesce_2.count(), "count after coalesce(2)")

count with default partitioning -> result = 151, time = 0.0141 s
count after repartition(8) -> result = 151, time = 0.0523 s
count after coalesce(2) -> result = 151, time = 0.0141 s


In [30]:
# Intermediate RDD: filtered words
rdd_words_filtered = rdd_words_reviews.filter(lambda w: len(w) > 2)
from pyspark import StorageLevel

rdd_words_filtered_persist = rdd_words_filtered.persist(StorageLevel.MEMORY_AND_DISK)
# Cache
rdd_words_filtered_cache = rdd_words_filtered.cache()

UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

In [None]:
# First action
print("Number of filtered words:", rdd_words_filtered_cache.count())

# Second action
print("Sample filtered words:", rdd_words_filtered_cache.take(10))

In [None]:
from pyspark import StorageLevel

rdd_words_filtered_persist = rdd_words_filtered.persist(StorageLevel.MEMORY_AND_DISK)