In [1]:
# Create Spark Context with SparkConf
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext.getOrCreate(conf)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 19:20:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/11 19:20:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# Excercise 1
# Add the phone prefix to the numbers using as reference the International Calling Codes
# Use a Broadcast Variable

input_data = [("Simón", "Bolivar", "VEN", "489 895 965"),
              ("Fidel", "Castro", "CU", "956 268 348"),
              ("Jose", "Doroteo", "MEX", "985 621 444"),
              ("Ernesto", "Guevara", "AR", "895 325 481"),
              ("Hugo", "Chávez", "VE", "489 895 965"),
              ("Camilo", "Cienfuegos", "CUB", "956 268 348"),
              ("Andrés Manuel", "López", "ME", "985 621 444"),
              ("Juan Domingo", "Perón", "ARG", "985 621 444"),
              ]

rdd = sc.parallelize(input_data)
calling_codes = {"VEN": "+58", "CU": "+53", "MEX": "+52", "AR": "+54", "VE": "+58", "CUB": "+53", "ME": "+52",
                 "ARG": "+54"}
broadcast_codes = sc.broadcast(calling_codes)


def add_prefix(record):
    first_name, last_name, country_code, phone_number = record
    prefix = broadcast_codes.value.get(country_code, "")
    return (first_name, last_name, country_code, f"{prefix} {phone_number}")


result_rdd = rdd.map(add_prefix)
result = result_rdd.collect()
for record in result:
    print(record)

('Simón', 'Bolivar', 'VEN', '+58 489 895 965')
('Fidel', 'Castro', 'CU', '+53 956 268 348')
('Jose', 'Doroteo', 'MEX', '+52 985 621 444')
('Ernesto', 'Guevara', 'AR', '+54 895 325 481')
('Hugo', 'Chávez', 'VE', '+58 489 895 965')
('Camilo', 'Cienfuegos', 'CUB', '+53 956 268 348')
('Andrés Manuel', 'López', 'ME', '+52 985 621 444')
('Juan Domingo', 'Perón', 'ARG', '+54 985 621 444')


In [5]:
# Excercise 2
# Count the number of times the word 'to' appears in a line and the number of lines in the bible.txt file
# Use Accumulators

def count_to_and_lines(line):
    global line_count, to_count
    words = line.split()
    to_count += words.count('to')
    line_count += 1


input_file_path = "../../data/spark_applications/bible.txt"

# Crear un acumulador para contar las líneas
line_count = sc.accumulator(0)

to_count = sc.accumulator(0)

rdd = sc.textFile(input_file_path)
rdd.foreach(count_to_and_lines)

# Imprimir los resultados
print(f"Number of lines: {line_count.value}")
print(f"Number of occurrences of 'to': {to_count.value}")

Number of lines: 30383
Number of occurrences of 'to': 12837


In [5]:
# Excercise 3
# Write the RDD containing the pagecounts dataset 
# Write the RDD but with only 2 partitions+
# Use Repartition
import shutil

input_file_path = "../../data/spark_applications/pagecounts"
rdd = sc.textFile(input_file_path)
rdd_repartitioned = rdd.repartition(2)
# Delete the existing output directory if it exists
output_file_path = "pagecounts_repartitioned"
shutil.rmtree(output_file_path, ignore_errors=True)

rdd_repartitioned.saveAsTextFile(output_file_path)

                                                                                

In [7]:
# Excercise 4
# Check the differences in computation time when using cache method on an RDD
# Read pagecount files and count lines with and without using cache method
# show the time differences
# Use Cache

import time
input_file_path = "../../data/spark_applications/pagecounts"
rdd = sc.textFile(input_file_path)

# Measure time without caching
start_time_without_cache = time.time()

# Count the number of lines without caching
count_without_cache = rdd.count()

end_time_without_cache = time.time()
time_without_cache = end_time_without_cache - start_time_without_cache

# Cache the RDD
rdd.cache()

# Measure time with caching
start_time_with_cache = time.time()

# Count the number of lines with caching
count_with_cache = rdd.count()

end_time_with_cache = time.time()
time_with_cache = end_time_with_cache - start_time_with_cache

# Display results
print(f"Number of lines without cache: {count_without_cache}")
print(f"Time without cache: {time_without_cache} seconds")

print(f"\nNumber of lines with cache: {count_with_cache}")
print(f"Time with cache: {time_with_cache} seconds")



Number of lines without cache: 4729148
Time without cache: 1.4901230335235596 seconds

Number of lines with cache: 4729148
Time with cache: 2.2003750801086426 seconds


                                                                                

In [12]:
# Excercise 5
# use spark-submit to launch the app.py file by yourself
# :)

!spark-submit --master local app.py


23/12/11 19:43:57 INFO SparkContext: Running Spark version 3.5.0
23/12/11 19:43:57 INFO SparkContext: OS info Mac OS X, 13.3.1, x86_64
23/12/11 19:43:57 INFO SparkContext: Java version 17.0.7
23/12/11 19:43:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/11 19:43:57 INFO ResourceUtils: No custom resources configured for spark.driver.
23/12/11 19:43:57 INFO SparkContext: Submitted application: app.py
23/12/11 19:43:57 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/12/11 19:43:57 INFO ResourceProfile: Limiting resource is cpu
23/12/11 19:43:57 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/12/11 19:43:57 INFO SecurityManager: Cha

In [13]:
sc.stop()