In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import sagemaker_pyspark

conf = SparkConf()
conf.set("spark.driver.extraClassPath", ":".join(sagemaker_pyspark.classpath_jars()))
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .appName("joins")
    .getOrCreate()
)
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/02/11 15:51:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/11 15:51:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Joins en Spark
- solo entre RDD de dos valores (*observación*: los RDD de un valor se pueden transformar a RDDs de dos valores)
- la condición sobre la join es siempre de igualdad sobre la clave
- sc-> Spark Context

In [2]:
from dataclasses import dataclass

@dataclass
class Person:
    """Class to model a Person"""
    name: str
    role: str
    age: int

@dataclass
class Address:
    """Class to model a Person"""
    person_name: str
    street: str
    number: str
    zipcode: str

Creamos unos datos sinteticos de ejemplo 

In [3]:
person1 = Person("Luca", "professor", 25)
person2 = Person("Ana", "coordinator", 25)

address1 = Address("Luca", "Mallorca", "22", "12345")
address2 = Address("Ana", "Paris", "33", "23456")
address3 = Address("Luca", "Londres", "44", "34567")

In [4]:
person_rdd = sc.parallelize([person1, person2])
address_rdd = sc.parallelize([address1, address2, address3])

`person_rdd` y `address_rdd` son dos RDD de 1 valor. Ya que join solo funciona para RDDs de dos valores, los transformaremos seleccionando un valor de la clase como clave, y dejando la clase entera como valor.

In [5]:
person_rdd_2 = person_rdd.map(lambda x: (x.name, x))
address_rdd_2 = address_rdd.map(lambda x: (x.person_name, x))

person_address_rdd = person_rdd_2.join(address_rdd_2)

In [6]:
for row in person_address_rdd.collect():
    print(row)



('Luca', (Person(name='Luca', role='professor', age=25), Address(person_name='Luca', street='Mallorca', number='22', zipcode='12345')))
('Luca', (Person(name='Luca', role='professor', age=25), Address(person_name='Luca', street='Londres', number='44', zipcode='34567')))
('Ana', (Person(name='Ana', role='coordinator', age=25), Address(person_name='Ana', street='Paris', number='33', zipcode='23456')))


                                                                                

`person_rdd` and `address_rdd`-> creates new RDDs (`person_rdd_2` and `address_rdd_2`) where the elements are tuples with the person as the first element and the object itself as the second element, performs a join operation to match the Person and Address objects based on the name and then prints the resulting tuples.

# Un ejemplo más complejo

Queremos "mejorar" los resultados de nuestro ejemplo anterior de "Tweet by Language", imprimiendo en pantalla el nombre completo del idioma (i.e. `English`) en lugar del código de dos caracteres (i.e. `en`). Para alcanzar este objetivo, usaremos un fichero de tipo CSV que contiene la asociación entre código y idioma, disponible en Moodle.

## Definir los inputs

In [7]:
input_data = 's3a://mudab-2025-big-data/twitter-data/Eurovision-00.json'
input_map = 'languages.csv'

## Preparar la "language map"

In [8]:
langmap_rdd = sc.textFile(input_map)

In [9]:
for line in langmap_rdd.take(5):
    print(line)

[Stage 2:>                                                          (0 + 1) / 1]

Afghanistan,AF,AFG,004,ISO 3166-2:AF,Asia,Southern Asia,"",142,034,""
Åland Islands,AX,ALA,248,ISO 3166-2:AX,Europe,Northern Europe,"",150,154,""
Albania,AL,ALB,008,ISO 3166-2:AL,Europe,Southern Europe,"",150,039,""
Algeria,DZ,DZA,012,ISO 3166-2:DZ,Africa,Northern Africa,"",002,015,""
American Samoa,AS,ASM,016,ISO 3166-2:AS,Oceania,Polynesia,"",009,061,""


                                                                                

### Observaciones
Después de explorar visualmente el fichero, podemos observar que: 

- En un fichero separado por comas (CSV), con varios campos: 
    - Country name
    - Country 2-letter code 
    - Country 3-letter code
    - ...

Que campos necesitamos nosotros para poder hacer el join? 

In [10]:
from dataclasses import dataclass

@dataclass(frozen=True)# frozen is required to be able to hash elements
class Language:
    name: str
    iso_2: str

clean_langmap_rdd = (
    langmap_rdd
    .map(lambda x: x.split(',')) # ARRAY 
    .map(lambda x: Language(x[0], x[1].lower()))
)

clean_langmap_rdd.take(5)

                                                                                

[Language(name='Afghanistan', iso_2='af'),
 Language(name='Åland Islands', iso_2='ax'),
 Language(name='Albania', iso_2='al'),
 Language(name='Algeria', iso_2='dz'),
 Language(name='American Samoa', iso_2='as')]

In [11]:
for line in clean_langmap_rdd.take(5):
    print(line)
print(clean_langmap_rdd.count())

                                                                                

Language(name='Afghanistan', iso_2='af')
Language(name='Åland Islands', iso_2='ax')
Language(name='Albania', iso_2='al')
Language(name='Algeria', iso_2='dz')
Language(name='American Samoa', iso_2='as')


[Stage 5:>                                                          (0 + 2) / 2]

249


                                                                                

In [12]:
@dataclass
class Tweet:
  """Class to model a Tweet"""
  id: int         # The unique ID of a tweet
  content: str    # The textual content of a tweet
  author: str     # The nickname of the author of the tweet
  language: str   # The language of the tweet

import json

def toTweet(line: str):
      try:
        parsed = json.loads(line)
        return Tweet(parsed['id'], parsed['text'], parsed['user']['name'], parsed['lang'])
      except Exception as e:
        return None

rdd = sc.textFile(input_data)

processed = (rdd  # rdd[string]
    .map(toTweet) # parse string into Tweet. Return rdd[Tweet]
    .filter(lambda x: x is not None) # filter empty values. Return same type (rdd[Tweet])
)

by_lang = (
    processed
        .map(lambda x: (x.language, 1)) # from rdd[Tweet] to rdd[str, int]
        .reduceByKey(lambda x, y: x + y) # reduce produces same type rdd[str, int]
        .sortBy(lambda x: x[1], ascending=False) # sort the output. 
            # Since it's a pair-rdd, select which element of the pair should be used to order 
)

25/02/11 13:58:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

In [13]:
for line in by_lang.take(5):
    print(line)
print(by_lang.count())

                                                                                

('es', 50874)
('en', 30675)
('pt', 5831)
('fr', 3766)
('ru', 2774)




41


                                                                                

## Join de los RDDs

Tenemos ahora dos RDDs de dos valores:
- uno contiene la asociación (código, idioma)
- el otro contiene la asociación (código, recuento)

Cómo hacemos el join?

In [14]:
langmap_for_join_rdd = clean_langmap_rdd.map(lambda x: (x.iso_2, x.name))
print(langmap_for_join_rdd.take(5))

[('af', 'Afghanistan'), ('ax', 'Åland Islands'), ('al', 'Albania'), ('dz', 'Algeria'), ('as', 'American Samoa')]


                                                                                

In [15]:
joined = by_lang.join(langmap_for_join_rdd)

In [16]:
for line in joined.take(5):
    print(line)



('fr', (3766, 'France'))
('it', (2326, 'Italy'))
('de', (332, 'Germany'))
('sv', (132, 'El Salvador'))
('cy', (79, 'Cyprus'))


                                                                                

Reduced by join

In [17]:
final = joined.map(lambda x: (x[1][1], x[1][0]))

In [18]:
for line in final.take(5):
    print(line)



('France', 3766)
('Italy', 2326)
('Germany', 332)
('El Salvador', 132)
('Cyprus', 79)


                                                                                

In [19]:
for line in final.collect():
    print(line)



('France', 3766)
('Italy', 2326)
('Germany', 332)
('El Salvador', 132)
('Cyprus', 79)
('Suriname', 10)
('Argentina', 6)
('Russian Federation', 2774)
('Ethiopia', 132)
('Haiti', 52)
('Timor-Leste', 102)
('Lithuania', 11)
('Romania', 35)
('Canada', 286)
('Spain', 50874)
('Norway', 48)
('Latvia', 3)
('Poland', 547)
('Hungary', 39)
('Thailand', 4)
('Sierra Leone', 4)
('Portugal', 5831)
('Finland', 24)
('Bulgaria', 7)
('Türkiye', 672)
('India', 109)
('Virgin Islands (U.S.)', 34)
('Iceland', 30)


                                                                                

-----

# Preguntas para casa

1.1 Cuales son los 10 usarios más mencionados en toda la collecion de Tweets? 
in the text of the tweet @ markdown

1.2 Cuantos tweets escribieron los 10 usuarios más mencionados? 

Sugerencias: 
- primero enfocarse en los usuarios más mencionados
- luego enfocarse es sacar recuento de tweets por usuario 

In [28]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import sagemaker_pyspark
import json
import re
from dataclasses import dataclass

In [29]:
# CONFIGURE SPARK
conf = SparkConf()
conf.set("spark.driver.extraClassPath", ":".join(sagemaker_pyspark.classpath_jars()))
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .appName("joins")
    .getOrCreate()
)
sc = spark.sparkContext

print("SPARK CONFIGURED")

SPARK CONFIGURED


In [30]:
# DEFINE A CLASS TO REPRESENT A TWEET
@dataclass
class Tweet:
    """CLASS TO MODEL A TWEET"""
    id: int         # THE UNIQUE ID OF A TWEET
    content: str    # THE TEXTUAL CONTENT OF A TWEET
    author: str     # THE NICKNAME OF THE AUTHOR OF THE TWEET
    language: str   # THE LANGUAGE OF THE TWEET

# FUNCTION TO PARSE A TWEET FROM A STRING
def parse_tweet(line: str):
    try:
        parsed = json.loads(line)
        return Tweet(parsed['id'], parsed['text'], parsed['user']['name'], parsed['lang'])
    except Exception as e:
        return None

In [32]:
# FUNCTION TO EXTRACT MENTIONS FROM A TWEET
def extract_mentions(tweet: Tweet):
    """EXTRACT MENTIONS (@USERNAME) FROM THE CONTENT OF THE TWEET."""
    mentions = re.findall(r'@(\w+)', tweet.content)  # EXTRACT ONLY VALID USERNAMES
    return [(mention.lower(), 1) for mention in mentions]  # CONVERT TO LOWERCASE AND ASSIGN A COUNT

# MAIN FUNCTION
def main():
    input_data = 's3a://mudab-2025-big-data/twitter-data/Eurovision-00.json'
    rdd = sc.textFile(input_data)

# PROCESS TWEETS
    processed = (
        rdd  # RDD[STRING]
        .map(parse_tweet) # PARSE STRING INTO TWEET. RETURN RDD[TWEET]
        .filter(lambda x: x is not None) # FILTER EMPTY VALUES. RETURN SAME TYPE (RDD[TWEET])
    )

# EXTRACT MENTIONS AND COUNT OCCURRENCES
    mentioned_users = (
        processed
        .flatMap(extract_mentions)  # CONVERT EACH TWEET INTO A LIST OF MENTIONS
        .reduceByKey(lambda x, y: x + y)  # SUM MENTIONS BY USER
        .sortBy(lambda x: x[1], ascending=False)  # SORT IN DESCENDING ORDER
    )


# GET THE TOP 10  USERS
    top_10_mentions = mentioned_users.take(10)
   
    print("\nTOP 10 USERS:")
    for user, count in top_10_mentions:
        print(f"@{user}: {count} menciones")

# THEIR TWEETS
    top_10_usernames = [user for user, _ in top_10_mentions]
    tweets_by_top_users_dict = {user: 0 for user in top_10_usernames}  # Initialize with 0

    tweets_by_top_users = (
        processed
        .filter(lambda tweet: tweet.author.lower() in [user.lower() for user in top_10_usernames])  # FILTER TWEETS BY TOP 10 USERS
        .map(lambda tweet: (tweet.author.lower(), 1))  # MAP TWEETS TO (USER, 1)
        .reduceByKey(lambda x, y: x + y)  # SUM TWEETS BY USER
        .collect()
    )

# DICTIONARY
    for user, count in tweets_by_top_users:
        tweets_by_top_users_dict[user] = count

    print("\nNUMBER OF TWEETS WRITTEN BY THE TOP 10 USERS:")
    for user, count in sorted(tweets_by_top_users_dict.items(), key=lambda x: x[1], reverse=True):
        print(f"{user}: {count} Tweet(s)")

if __name__ == "__main__":
    main()

                                                                                


TOP 10 USERS:
@eurovision: 2264 menciones
@manelnmusic: 2218 menciones
@nettabarzilai: 2102 menciones
@alfred_ot2017: 1025 menciones
@amaia_ot2017: 1004 menciones
@paquitasalas: 934 menciones
@jungjaeguns: 737 menciones
@lvpibai: 694 menciones
@bbceurovision: 677 menciones
@elmundotoday: 630 menciones





NUMBER OF TWEETS WRITTEN BY THE TOP 10 USERS:
eurovision: 3 Tweet(s)
manelnmusic: 0 Tweet(s)
nettabarzilai: 0 Tweet(s)
alfred_ot2017: 0 Tweet(s)
amaia_ot2017: 0 Tweet(s)
paquitasalas: 0 Tweet(s)
jungjaeguns: 0 Tweet(s)
lvpibai: 0 Tweet(s)
bbceurovision: 0 Tweet(s)
elmundotoday: 0 Tweet(s)


                                                                                

### If we wanted the whole collection of tweets:

In [33]:
# FUNCTION TO EXTRACT MENTIONS FROM A TWEET
def extract_mentions(tweet: Tweet):
    """EXTRACT MENTIONS (@USERNAME) FROM THE CONTENT OF THE TWEET."""
    mentions = re.findall(r'@(\w+)', tweet.content)  # EXTRACT ONLY VALID USERNAMES
    return [(mention.lower(), 1) for mention in mentions]  # CONVERT TO LOWERCASE AND ASSIGN A COUNT

# MAIN FUNCTION
def main():
    input_data = 's3a://mudab-2025-big-data/twitter-data/Eurovision-*.json'
    rdd = sc.textFile(input_data)

# PROCESS TWEETS
    processed = (
        rdd  # RDD[STRING]
        .map(parse_tweet) # PARSE STRING INTO TWEET. RETURN RDD[TWEET]
        .filter(lambda x: x is not None) # FILTER EMPTY VALUES. RETURN SAME TYPE (RDD[TWEET])
    )

# EXTRACT MENTIONS AND COUNT OCCURRENCES
    mentioned_users = (
        processed
        .flatMap(extract_mentions)  # CONVERT EACH TWEET INTO A LIST OF MENTIONS
        .reduceByKey(lambda x, y: x + y)  # SUM MENTIONS BY USER
        .sortBy(lambda x: x[1], ascending=False)  # SORT IN DESCENDING ORDER
    )


# GET THE TOP 10  USERS
    top_10_mentions = mentioned_users.take(10)
   
    print("\nTOP 10 USERS:")
    for user, count in top_10_mentions:
        print(f"@{user}: {count} menciones")

# THEIR TWEETS
    top_10_usernames = [user for user, _ in top_10_mentions]
    tweets_by_top_users_dict = {user: 0 for user in top_10_usernames}  # Initialize with 0

    tweets_by_top_users = (
        processed
        .filter(lambda tweet: tweet.author.lower() in [user.lower() for user in top_10_usernames])  # FILTER TWEETS BY TOP 10 USERS
        .map(lambda tweet: (tweet.author.lower(), 1))  # MAP TWEETS TO (USER, 1)
        .reduceByKey(lambda x, y: x + y)  # SUM TWEETS BY USER
        .collect()
    )

# DICTIONARY
    for user, count in tweets_by_top_users:
        tweets_by_top_users_dict[user] = count

    print("\nNUMBER OF TWEETS WRITTEN BY THE TOP 10 USERS:")
    for user, count in sorted(tweets_by_top_users_dict.items(), key=lambda x: x[1], reverse=True):
        print(f"{user}: {count} Tweet(s)")

if __name__ == "__main__":
    main()

                                                                                


TOP 10 USERS:
@eurovision: 25754 menciones
@paquitasalas: 16446 menciones
@bbceurovision: 14442 menciones
@alfred_ot2017: 14379 menciones
@amaia_ot2017: 13762 menciones
@nettabarzilai: 11087 menciones
@netflixes: 10859 menciones
@manelnmusic: 9794 menciones
@pewdiepie: 7194 menciones
@elmundotoday: 6929 menciones





NUMBER OF TWEETS WRITTEN BY THE TOP 10 USERS:
eurovision: 84 Tweet(s)
pewdiepie: 5 Tweet(s)
paquitasalas: 0 Tweet(s)
bbceurovision: 0 Tweet(s)
alfred_ot2017: 0 Tweet(s)
amaia_ot2017: 0 Tweet(s)
nettabarzilai: 0 Tweet(s)
netflixes: 0 Tweet(s)
manelnmusic: 0 Tweet(s)
elmundotoday: 0 Tweet(s)


                                                                                