<a href="https://colab.research.google.com/github/CatarsisXXI/BD-Avan.-Big-Data/blob/main/Transformaciones_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install pyspark



In [3]:
from pyspark.context import SparkContext

In [8]:
from pyspark import SparkContext
from pyspark import SparkConf

In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *

In [None]:
conf = SparkConf().setAppName("RDD Examples").setMaster("local")
sc = SparkContext(conf=conf)

In [12]:
# Ejemplo 1: map
rdd = sc.parallelize([1, 2, 3, 4])
mapped_rdd = rdd.map(lambda x: x * 2)
print("map:", mapped_rdd.collect())

map: [2, 4, 6, 8]


In [13]:
# Ejemplo 2: filter
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print("filter:", filtered_rdd.collect())

filter: [2, 4]


In [14]:
# Ejemplo 3: flatMap
rdd = sc.parallelize(["hello world", "spark RDD"])
flatmapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print("flatMap:", flatmapped_rdd.collect())

flatMap: ['hello', 'world', 'spark', 'RDD']


In [15]:
# Ejemplo 4: mapPartitions
def process_partition(partition):
    return [x * 2 for x in partition]

rdd = sc.parallelize([1, 2, 3, 4], 2)
mapped_partitions = rdd.mapPartitions(process_partition)
print("mapPartitions:", mapped_partitions.collect())

mapPartitions: [2, 4, 6, 8]


In [16]:
# Ejemplo 5: sample
sampled_rdd = rdd.sample(withReplacement=False, fraction=0.5)
print("sample:", sampled_rdd.collect())

sample: [3, 4]


In [17]:
# Ejemplo 6: union
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
union_rdd = rdd1.union(rdd2)
print("union:", union_rdd.collect())

union: [1, 2, 3, 3, 4, 5]


In [18]:
# Ejemplo 7: intersection
intersection_rdd = rdd1.intersection(rdd2)
print("intersection:", intersection_rdd.collect())

intersection: [3]


In [19]:
# Ejemplo 8: distinct
distinct_rdd = union_rdd.distinct()
print("distinct:", distinct_rdd.collect())

distinct: [2, 4, 1, 3, 5]


In [20]:
# Ejemplo 9: groupByKey
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped_rdd = rdd.groupByKey().mapValues(list)
print("groupByKey:", grouped_rdd.collect())

groupByKey: [('a', [1, 3]), ('b', [2])]


In [21]:
# Ejemplo 10: reduceByKey
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print("reduceByKey:", reduced_rdd.collect())

reduceByKey: [('a', 4), ('b', 2)]


In [22]:
# Ejemplo 11: aggregateByKey
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])
aggregated_rdd = rdd.aggregateByKey((0, 0),
                                    lambda acc, val: (acc[0] + val, acc[1] + 1),
                                    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
print("aggregateByKey:", aggregated_rdd.collect())

aggregateByKey: [('a', (3, 2)), ('b', (7, 2))]


In [23]:
# Ejemplo 12: sortByKey
rdd = sc.parallelize([(2, "b"), (1, "a"), (3, "c")])
sorted_rdd = rdd.sortByKey()
print("sortByKey:", sorted_rdd.collect())

sortByKey: [(1, 'a'), (2, 'b'), (3, 'c')]


In [24]:
# Ejemplo 13: join
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", 3), ("b", 4), ("c", 5)])
joined_rdd = rdd1.join(rdd2)
print("join:", joined_rdd.collect())

join: [('b', (2, 4)), ('a', (1, 3))]


In [25]:
# Ejemplo 14: cogroup
cogrouped_rdd = rdd1.cogroup(rdd2)
print("cogroup:", cogrouped_rdd.mapValues(lambda x: (list(x[0]), list(x[1]))).collect())

cogroup: [('b', ([2], [4])), ('c', ([], [5])), ('a', ([1], [3]))]


In [26]:
# Ejemplo 15: cartesian
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
cartesian_rdd = rdd1.cartesian(rdd2)
print("cartesian:", cartesian_rdd.collect())

cartesian: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


In [27]:
# Ejemplo 16: pipe
rdd = sc.parallelize(["hello", "world"])
piped_rdd = rdd.pipe("echo")
print("pipe:", piped_rdd.collect())

pipe: ['']


In [28]:
# Ejemplo 17: coalesce
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
coalesced_rdd = rdd.coalesce(2)
print("coalesce:", coalesced_rdd.getNumPartitions())

coalesce: 2


In [29]:
# Ejemplo 18: repartition
repartitioned_rdd = rdd.repartition(4)
print("repartition:", repartitioned_rdd.getNumPartitions())

repartition: 4


In [30]:
# Ejemplo 19: repartitionAndSortWithinPartitions
rdd = sc.parallelize([(3, "c"), (1, "a"), (2, "b"), (4, "d")])
partitioned_sorted_rdd = rdd.repartitionAndSortWithinPartitions(2)
print("repartitionAndSortWithinPartitions:", partitioned_sorted_rdd.collect())

repartitionAndSortWithinPartitions: [(2, 'b'), (4, 'd'), (1, 'a'), (3, 'c')]
