In [0]:
#Inicio Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Debate2020Analysis") \
    .getOrCreate()

In [0]:
# upload file to DBFS
# dbutils.fs.cp('C:\Users\David\Desktop\Repositorios_Git\DataSets\us_election_2020_presidential_debates\archive\us_election_2020_1st_presidential_debate.txt', 'dbfs:/FileStore/tables/us_election_2020_1st_presidential_debate.txt')

#Lectura TXT como RDD de líneas
rdd = spark.sparkContext.textFile("/FileStore/shared_uploads/dmgs16@hotmail.com/us_election_2020_1st_presidential_debate.txt")


In [0]:
import re
# Paso 2: Agrupar bloques (speaker + texto)
def group_speaker_blocks(lines):
    result = []
    current_speaker = None
    current_time = None
    current_text = []

    for line in lines:
        match = re.match(r"^(.*?):\s*\((\d{2}:\d{2})\)", line)
        if match:
            # Si ya había un bloque anterior, lo guardamos
            if current_speaker:
                result.append((current_speaker, current_time, " ".join(current_text)))
            current_speaker = match.group(1)
            current_time = match.group(2)
            current_text = [line[match.end():].strip()]  # parte después del timestamp
        else:
            if current_speaker:
                current_text.append(line.strip())

    # Añadir el último bloque
    if current_speaker:
        result.append((current_speaker, current_time, "".join(current_text)))

    return result



In [0]:
# Paso 4: Aplicarlo en driver (collect primero para procesarlo como lista)
grouped_data = group_speaker_blocks(rdd.collect())

# Paso 5: Crear un RDD con los bloques
parsed_rdd = sc.parallelize(grouped_data)

# Mostramos los primeros 5 bloques de formato (SPEAKER, TIME, TEXT)
parsed_rdd.take(10)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-3451784740362237>, line 9[0m
[1;32m      7[0m [38;5;66;03m# Mostramos los primeros 5 bloques de formato (SPEAKER, TIME, TEXT)[39;00m
[1;32m      8[0m parsed_rdd[38;5;241m.[39mtake([38;5;241m10[39m)
[0;32m----> 9[0m grouped_data[38;5;241m.[39mtake([38;5;241m10[39m)

[0;31mAttributeError[0m: 'list' object has no attribute 'take'

In [0]:
stopwords = set([
    "this", "that", "what", "which", "with", "have", "from", "would", "could", 
    "should", "their", "there", "about", "because", "before", "after", "over",
    "under", "again", "some", "them", "been", "being", "then", "than",
    "themselves", "does", "did", "doing", "into", "just", "a", "an", "the",
    "more", "most", "other", "out", "such", "too", "very", "where", "when", 
    "how", "who", "whom", "whose", "why", "also", "only", "every", "much",
    "many", "each", "same", "own", "any", "both", "few", "while", "though",
    "through", "against", "among", "those", "etc", "like", "even", "still",
    "in", "on", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "out", "off", "over", "under",
    "are", "be","is","of", "but", "it", "if","was","were", "and",
    "I", "i","you", "he", "she", "it", "we", "they", "me", "him", "her", "us", "them", "my", "your", "his", "her", "its", "our", "their", "mine", "yours", "hers", "ours", "theirs", "myself", "yourself", "himself", "herself", "itself", "ourselves", "yourselves", "themselves", "who", "whom", "whose", "which", "that", "anyone", "anybody", "someone", "somebody", "everyone", "everybody", "no one", "nobody", "each", "either", "neither", "none", "one", "another", "others", "everything", "something", "nothing", "anything", "not", "no", "as", "so","all"

])

In [0]:
def filter_short_words(record):
    speaker, time, text = record
    # Filtramos palabras de lista stopwords
    words = [word for word in text.split() if word.isalpha() and word.lower() not in stopwords]
    # Reconstruimos el texto limpio
    clean_text = " ".join(words)
    return (speaker, time, clean_text)

# Aplicamos el filtrado al RDD
filtered_rdd = parsed_rdd.map(filter_short_words)

# Ejemplo de impresión
filtered_rdd.take(5)


[('Chris Wallace',
  '01:20',
  'Good evening Health Education Campus Case Western Reserve University Cleveland Chris Wallace Fox News welcome first Presidential Debates President Donald Trump former Vice President Joe debate sponsored Commission Presidential Commission has designed six roughly minute segments two minute answers candidate first open discussion rest campaigns agreed these decided topics questions can assure questions has shared Commission or two'),
 ('Chris Wallace',
  '02:10',
  'debate conducted health safety protocols designed Cleveland serving Health Security advisor Commission four campaigns agreed candidates will shake hands beginning audience here hall has promised remain or interruptions importantly can focus candidates noise except right welcome Republican President Democratic nominee Vice President'),
 ('Vice President Joe Biden', '02:49', ''),
 ('President Donald J. Trump', '02:51', ''),
 ('Vice President Joe Biden', '02:51', '')]

In [0]:
# Paso 1: crear pares ((speaker, word), 1)
word_pairs_rdd = filtered_rdd.flatMap(
    lambda record: [((record[0], word.lower()), 1) for word in record[2].split()]
)
# Paso 2: Reducir por clave (speaker, word)
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

word_counts_rdd.take(10)

[(('Chris Wallace', 'wallace'), 1),
 (('Chris Wallace', 'fox'), 1),
 (('Chris Wallace', 'welcome'), 2),
 (('Chris Wallace', 'debates'), 3),
 (('Chris Wallace', 'joe'), 11),
 (('Chris Wallace', 'sponsored'), 2),
 (('Chris Wallace', 'has'), 15),
 (('Chris Wallace', 'safety'), 1),
 (('Chris Wallace', 'protocols'), 1),
 (('Chris Wallace', 'hands'), 1)]

In [0]:
# Paso 3: reorganizar como (speaker, (word, count))
grouped_by_speaker_rdd = word_counts_rdd.map(
    lambda pair: (pair[0][0], (pair[0][1], pair[1]))
)

# Agrupar todas las palabras por speaker
speaker_word_list_rdd = grouped_by_speaker_rdd.groupByKey().mapValues(list)


In [0]:
top_20_words_per_speaker = speaker_word_list_rdd.mapValues(
    lambda word_counts: sorted(word_counts, key=lambda x: x[1], reverse=True)[:20]
)

In [0]:
for speaker, top_words in top_20_words_per_speaker.collect():
    print(f"\n{speaker}:\n" + "\n".join([f"  {word}: {count}" for word, count in top_words]))


Chris Wallace:
  president: 73
  going: 63
  vice: 37
  go: 34
  do: 31
  donald: 30
  two: 28
  want: 27
  will: 27
  let: 24
  people: 22
  question: 22
  ballots: 18
  say: 17
  or: 16
  has: 15
  can: 14
  think: 14
  end: 14
  election: 13

President Donald J. Trump:
  people: 50
  want: 37
  going: 34
  do: 33
  said: 30
  know: 29
  had: 27
  got: 27
  look: 25
  think: 22
  done: 19
  say: 18
  tell: 17
  million: 16
  never: 16
  three: 16
  law: 15
  will: 15
  let: 15
  go: 14

Vice President Joe Biden:
  people: 61
  going: 45
  has: 24
  get: 23
  do: 23
  fact: 19
  number: 18
  take: 17
  can: 16
  way: 15
  make: 15
  tax: 15
  look: 14
  will: 13
  got: 13
  know: 13
  want: 13
  sure: 12
  american: 12
  now: 12


In [0]:
#Graficas

from pyspark.sql import Row

# Aplanamos los datos
flattened = top_10_words_per_speaker.flatMap(lambda x: [Row(speaker=x[0], word=w[0], count=w[1]) for w in x[1]])

df = spark.createDataFrame(flattened)
df_chris = df.filter(df['speaker']=='Chris Wallace')
df_trump= df.filter(df['speaker']=='President Donald J. Trump')
df_biden= df.filter(df['speaker']=='Vice President Joe Biden')


In [0]:
display(df)

speaker,word,count
Chris Wallace,president,73
Chris Wallace,going,63
Chris Wallace,vice,37
Chris Wallace,go,34
Chris Wallace,do,31
Chris Wallace,donald,30
Chris Wallace,two,28
Chris Wallace,want,27
Chris Wallace,will,27
Chris Wallace,let,24
