In [41]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel

# Initialize Spark session with cluster master
spark = SparkSession.builder \
    .appName("SparkExample") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

In [42]:
# Creación y transformaciones básicas
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_map = rdd.map(lambda x: (x, x**2))
rdd_filter = rdd.filter(lambda x: x % 2 == 0)

# Acción persistente + visualización
rdd_filter.persist(StorageLevel.MEMORY_ONLY)
print(f"Conteo: {rdd_filter.count()}, Elementos: {rdd_filter.collect()}")

Conteo: 2, Elementos: [2, 4]


In [45]:
import time
from pyspark import StorageLevel

file_path = "/data/test.ft.txt"  # Ajusta el nombre del archivo según corresponda
rdd = spark.sparkContext.textFile(file_path)

# Función para contar palabras
def word_count(rdd):
    return (
        rdd.flatMap(lambda line: line.split())  # Dividir líneas en palabras
        .map(lambda word: (word.lower(), 1))    # Convertir a minúsculas y asignar 1 a cada palabra
        .reduceByKey(lambda a, b: a + b)        # Sumar ocurrencias de cada palabra
    )

# Medir tiempo SIN CACHING
start_time = time.time()
word_counts_no_cache = word_count(rdd)
word_counts_no_cache.count()  # Forzar ejecución de la acción
time_no_cache = time.time() - start_time

# Aplicar persistencia en memoria y disco (para evitar problemas de memoria)
cached_rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Primera ejecución con caché (tarda más porque almacena los datos)
word_count(cached_rdd).count()

# Segunda ejecución con caché (medimos esta porque ya está en memoria)
start_time = time.time()
word_counts_cache = word_count(cached_rdd)
word_counts_cache.count()
time_with_cache = time.time() - start_time

# Mostrar resultados
print(f"Tiempo sin caching: {time_no_cache:.4f} segundos")
print(f"Tiempo con caching (segunda ejecución): {time_with_cache:.4f} segundos")

# Mostrar las 10 palabras más frecuentes
top_words = word_counts_no_cache.takeOrdered(10, key=lambda x: -x[1])
print("\nTop 10 palabras más frecuentes:")
for word, count in top_words:
    print(f"{word}: {count}")


                                                                                

Tiempo sin caching: 5.0376 segundos
Tiempo con caching (segunda ejecución): 5.0117 segundos

Top 10 palabras más frecuentes:
the: 1560295
and: 838774
i: 798349
a: 791028
to: 759410
of: 625868
this: 557680
is: 540663
it: 527640
in: 357511


25/02/24 02:21:53 ERROR TaskSchedulerImpl: Lost executor 80 on 172.20.0.4: worker lost: Not receiving heartbeat for 60 seconds
25/02/24 02:21:53 ERROR TaskSchedulerImpl: Lost executor 78 on 172.20.0.5: worker lost: Not receiving heartbeat for 60 seconds
25/02/24 02:21:53 ERROR TaskSchedulerImpl: Lost executor 79 on 172.20.0.3: worker lost: Not receiving heartbeat for 60 seconds
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_345_2 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_252_1 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_210_0 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_239_1 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_270_2 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_306_0 !
25/02/24 02:21:53 WARN BlockManagerMasterEndpoint: No m