In [53]:
# Configurar el entorno de Spark
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, StorageLevel
import random
import time
import multiprocessing
import requests
import textwrap

In [42]:
# Configuración de Spark
conf = SparkConf() \
    .setAppName("MiAplicaciónModificada") \
    .set("spark.master", "local[10]") \
    .set("spark.executor.cores", "2") \
    .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

In [3]:
# Función para transformar una lista de letras a mayúsculas
def transformar_a_mayusculas(lista):
    rdd = sc.parallelize(lista)
    return rdd.map(str.upper).collect()

# Crear un RDD y transformarlo
lista = ['b', 'a', 'c']
resultado_letras = transformar_a_mayusculas(lista)
print("Resultado en mayúsculas:", resultado_letras)

Resultado en mayúsculas: ['B', 'A', 'C']


In [4]:
# Función para multiplicar por 2 cada número de una lista
def multiplicar_por_dos(datos):
    rdd = sc.parallelize(datos)
    return rdd.map(lambda x: x * 2).collect()

# Crear un RDD de números y multiplicar por dos
numeros = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
resultado_numeros = multiplicar_por_dos(numeros)
print("Resultado multiplicado:", resultado_numeros)

# Verificar el tipo de resultado
print("Tipo de resultado:", type(resultado_numeros))

Resultado multiplicado: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
Tipo de resultado: <class 'list'>


In [5]:
# Función para generar una matriz aleatoria
def generar_matriz(filas, max_columnas, valor_maximo):
    return [
        [random.randint(1, valor_maximo)
            for _ in range(random.randint(0, max_columnas))]
        for _ in range(filas)
    ]

# Generar una matriz de 6 filas, con un máximo de 5 columnas y valores entre
# 1 y 100
matriz = generar_matriz(6, 5, 100)
print("Matriz generada:", matriz)

Matriz generada: [[44, 39, 27, 97, 48], [71, 56, 43], [99, 1], [], [89], [35, 91, 41]]


In [6]:
# Convertir matriz en RDD y aplanar sus elementos
rdd_matriz = sc.parallelize(matriz)
resultado_aplanado = rdd_matriz.flatMap(lambda x: x).collect()
print("Resultado aplanado:", resultado_aplanado)

Resultado aplanado: [44, 39, 27, 97, 48, 71, 56, 43, 99, 1, 89, 35, 91, 41]


In [7]:
# Crear un RDD de números aleatorios y filtrar los múltiplos de 5
x = sc.parallelize([random.randint(1, 100) for _ in range(10)])
y = x.filter(lambda num: num % 5 == 0)

# Mostrar los resultados
print("Números aleatorios:", x.collect())
print("Múltiplos de 5:", y.collect())

Números aleatorios: [97, 99, 94, 2, 51, 81, 62, 80, 83, 94]
Múltiplos de 5: [80]


In [8]:
# Realizar transformaciones a una lista de listas
datos = [[1, 2], [3, 4, 5], []]
rdd = sc.parallelize(datos)

# Aplanar la lista y multiplicar por 2
rdd_aplanado = rdd.flatMap(lambda x: x)
rdd2 = rdd_aplanado.map(lambda x: x * 2)

# Obtener sólo los resultados mayores a 5
rdd_filtrado = rdd2.filter(lambda x: x > 5)

# Obtener el resultado final
resultado = rdd_filtrado.collect()
print("Resultado filtrado:", resultado)

Resultado filtrado: [6, 8, 10]


In [9]:
# Agrupación de datos por clave
rdd = sc.parallelize([
    ('fruta', 'melón'),
    ('verdura', 'acelga'),
    ('fruta', 'arándano'),
    ('fruta', 'naranja'),
    ('verdura', 'apio'),
    ('verdura', 'lechuga'),
    ('fruta', 'tomate'),
    ('fruta', 'limón')
])

# Agrupar por clave
rdd_agrupado = rdd.groupByKey()

# Convertir a lista
rdd_lista = rdd_agrupado.mapValues(list)
resultado = rdd_lista.collect()
print("Resultado agrupado:", resultado)

# Primer resultado
print("Frutas:", resultado[0])

# Convertir a mapa y acceder a valores
resultado_mapa = rdd_lista.collectAsMap()
print("Resultado como mapa:", resultado_mapa)
print("Frutas desde mapa:", resultado_mapa['fruta'])

Resultado agrupado: [('fruta', ['melón', 'arándano', 'naranja', 'tomate', 'limón']), ('verdura', ['acelga', 'apio', 'lechuga'])]
Frutas: ('fruta', ['melón', 'arándano', 'naranja', 'tomate', 'limón'])
Resultado como mapa: {'fruta': ['melón', 'arándano', 'naranja', 'tomate', 'limón'], 'verdura': ['acelga', 'apio', 'lechuga']}
Frutas desde mapa: ['melón', 'arándano', 'naranja', 'tomate', 'limón']


In [23]:
# Ejemplo de reducción: Sumar valores por clave
rdd = sc.parallelize([
    ("frutas", 2),
    ("frutas", 3),
    ("vegetales", 4),
    ("frutas", 1),
    ("vegetales", 5),
    ("frutas", 7)
])

# Sumar y encontrar el máximo valor por clave
suma_por_clave = rdd.reduceByKey(lambda x, y: x + y).collectAsMap()
maximo_por_clave = rdd.reduceByKey(max).collectAsMap()

print("Suma por clave:", suma_por_clave)
print("Máximo por clave:", maximo_por_clave)

Suma por clave: {'frutas': 13, 'vegetales': 9}
Máximo por clave: {'frutas': 7, 'vegetales': 5}


In [24]:
# Ejemplo de almacenamiento en la memoria intermedia: Transformaciones en RDD
rdd_transformado = sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * 2)
print("Primera acción; Se calcula el RDD:", rdd_transformado.collect())

rdd_transformado.cache()  # Almacenar en memoria el RDD transformado
print("Segunda acción; Ya con los datos en la memoria intermedia:",
    rdd_transformado.collect())

Primera acción; Se calcula el RDD: [2, 4, 6, 8, 10]
Segunda acción; Ya con los datos en la memoria intermedia: [2, 4, 6, 8, 10]


In [27]:
# Medición de tiempos de ejecución
datos = [1, 2, 3, 4, 5] * 1_000_000
rdd = sc.parallelize(datos)

# Definir la transformación
def transformation(x):
    return x * random.randint(1, 5)

In [34]:
# Tiempo sin memoria
inicio = time.time()
sin_memoria = rdd.map(transformation).filter(lambda x: x > 10).collect()
print("Tiempo sin memoria:", time.time() - inicio)

24/10/06 18:59:16 WARN TaskSetManager: Stage 35 contains a task of very large size (9908 KiB). The maximum recommended task size is 1000 KiB.
[Stage 35:>                                                         (0 + 1) / 1]

Tiempo sin memoria: 2.9843223094940186


                                                                                

In [35]:
# Tiempo con memoria
inicio = time.time()
con_memoria = rdd.map(transformation).filter(lambda x: x > 10).cache()
result1 = con_memoria.collect()
print("Con memoria:", time.time() - inicio)

24/10/06 18:59:30 WARN TaskSetManager: Stage 36 contains a task of very large size (9908 KiB). The maximum recommended task size is 1000 KiB.
[Stage 36:>                                                         (0 + 1) / 1]

Con memoria: 2.890392303466797


                                                                                

In [36]:
# Repetir la misma operación
inicio = time.time()
result2 = con_memoria.collect()
print("Tiempo de ejecución con memoria:", time.time() - inicio)

Tiempo de ejecución con memoria: 0.053607940673828125


24/10/06 18:59:37 WARN TaskSetManager: Stage 37 contains a task of very large size (9908 KiB). The maximum recommended task size is 1000 KiB.


In [37]:
# Mostrar los primeros resultados
print("Primeros resultados sin memoria:", result1[:10])
print("Primeros resultados con memoria:", result2[:10])

Primeros resultados sin memoria: [16, 20, 20, 12, 20, 20, 12, 15, 12, 12]
Primeros resultados con memoria: [16, 20, 20, 12, 20, 20, 12, 15, 12, 12]


In [32]:
# Persistir en memoria y disco
rdd_transformado = sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * 3)
rdd_transformado.persist(StorageLevel.MEMORY_AND_DISK).collect()

[3, 6, 9, 12, 15]

In [33]:
# Comprobación del estado de memoria
is_cached_before = con_memoria.is_cached
con_memoria.unpersist()
is_cached_after = con_memoria.is_cached

print("Estado de la memoria antes de anular la persistencia:",
    is_cached_before)
print("Estado de la memoria después de anular la ppersistencia:",
    is_cached_after)

Estado de la memoria antes de anular la persistencia: True
Estado de la memoria después de anular la ppersistencia: False


In [38]:
# Ejemplo de particiones
rdd_simple = sc.parallelize([1, 2, 3, 4, 5])
print("Número de particiones (RDD simple):", rdd_simple.getNumPartitions())

rdd_grande = sc.parallelize(datos)
print("Número de particiones (RDD grande):", rdd_grande.getNumPartitions())

rdd_reparticionado = rdd_simple.repartition(4)
print("Número de particiones después de reparticionar:",
    rdd_reparticionado.getNumPartitions())

rdd_reducido = rdd_reparticionado.coalesce(2)
print("Número de particiones después de reducir:",
    rdd_reducido.getNumPartitions())

Número de particiones (RDD simple): 1
Número de particiones (RDD grande): 1
Número de particiones después de reparticionar: 4
Número de particiones después de reducir: 2


In [45]:
# Contar núcleos de procesamiento disponibles
print("Núcleos de procesamiento disponibles en el sistema:",
    multiprocessing.cpu_count())

Núcleos de procesamiento disponibles en el sistema: 20


In [43]:
# Número de núcleos de Spark asociado al contexto
print("Número de núcleos de Spark:", sc.defaultParallelism)

Número de núcleos de Spark: 10


In [46]:
# Operaciones en RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
print("Conteo de elementos en RDD:", rdd.count())
print("Primeros 3 elementos:", rdd.take(3))
print("Reducción con división:", rdd.reduce(lambda x, y: x / y))
print("Reducción con mínimo:", rdd.reduce(lambda x, y: min(x, y)))

Conteo de elementos en RDD: 5
Primeros 3 elementos: [1, 2, 3]
Reducción con división: 0.008333333333333333
Reducción con mínimo: 1


In [48]:
# Guarda a archivo de texto
destino = "content/guardado.txt"
rdd.saveAsTextFile(destino)

In [49]:
# Carga desde archivo de texto
cargado = sc.textFile(destino)
print("Datos cargados desde archivo:", cargado.collect())

Datos cargados desde archivo: ['5', '1', '3', '2', '4']


In [54]:
def contenido_wikipedia(titulo):
    url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "format": "json",
        "titles": titulo,
        "prop": "extracts",
        "explaintext": True
    }
    response = requests.get(url, params=params).json()
    pages = response["query"]["pages"]
    return list(pages.values())[0].get("extract", "")

articulos = ["History of free and open-source software",
    "Federal State Statistics Service (Russia)", "Culture of North Korea"]
rdd_articulos = sc.parallelize(articulos)
rdd_contenido = rdd_articulos.map(lambda titulo: {
    "title": titulo, 
    "content": contenido_wikipedia(titulo)
})

resultado = rdd_contenido.collect()
for article in resultado:
    title = article['title']
    content = article['content'][:200] + '...'
    wrapped_content = textwrap.fill(content, width=80)
    print(f"Título: {title}\nContenido: {wrapped_content}\n")

Título: History of free and open-source software
Contenido: The history of free and open-source software begins at the advent of computer
software in the early half of the 20th century. In the 1950s and 1960s, computer
operating software and compilers were del...

Título: Federal State Statistics Service (Russia)
Contenido: The Federal State Statistics Service (‹See Tfd›Russian: Федеральная служба
государственной статистики, romanized: Federalnaya sluzhba gosudarstvennoy
statistiki, abbreviated as Rosstat) is the governm...

Título: Culture of North Korea
Contenido: The contemporary culture of North Korea is based on traditional Korean culture,
but has developed since the division of Korea in 1945. Juche, officially the
Juche idea, is the state ideology of North ...



In [41]:
# Detener el contexto de Spark
sc.stop()