In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import findspark    
import re
import datetime
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import StringType, IntegerType

import os

# Set the Hadoop-related environment variables to the local C:\hadoop install
# before the Spark session is created.
os.environ.update({
    'HADOOP_HOME': 'C:\\hadoop',
    'hadoop.home.dir': 'C:\\hadoop',
    "HADOOP_OPTS": "-Djava.library.path=C:/hadoop/bin",
})

# NOTE(review): findspark.init() runs after pyspark has already been imported
# at the top of this cell -- confirm it is still needed in this position.
findspark.init()

# Build (or reuse) the session that the rest of the notebook reads as `spark`.
spark = (
    SparkSession.builder
    .appName("GlueTest")
    .config("spark.hadoop.java.library.path", "C:/hadoop/bin")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [2]:
# Load the raw tweets. The `text` column is free text, so quoted fields may
# contain commas, doubled quotes and line breaks. With Spark's defaults
# (single-line records, backslash escape) such records are split into bogus
# rows and schema inference degrades.
# NOTE(review): assumes twcs.csv uses RFC 4180 quoting ("" inside quoted
# fields) -- confirm against the file.
df = spark.read.csv(
    "twcs.csv",
    header=True,
    inferSchema=True,
    multiLine=True,  # allow line breaks inside quoted fields
    quote='"',
    escape='"',      # a doubled quote is an escaped quote, not a field end
)

# Keep only the rows flagged as inbound.
df = df.filter(col('inbound') == True)

df.show()

+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+
|tweet_id|author_id|inbound|          created_at|                text|response_tweet_id|in_response_to_tweet_id|
+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+
|       2|   115712|   True|Tue Oct 31 22:11:...|@sprintcare and h...|             NULL|                      1|
|       3|   115712|   True|Tue Oct 31 22:08:...|@sprintcare I hav...|                1|                      4|
|       5|   115712|   True|Tue Oct 31 21:49:...|  @sprintcare I did.|                4|                      6|
|       8|   115712|   True|Tue Oct 31 21:45:...|@sprintcare is th...|           9,6,10|                   NULL|
|      12|   115713|   True|Tue Oct 31 22:04:...|@sprintcare You g...|         11,13,14|                     15|
|      16|   115713|   True|Tue Oct 31 20:00:...|@sprintcare Since...|               15|        

In [3]:
@udf(StringType())
def clean_text(text):
    """
    Normalize a text value: lowercase it, drop URLs and special characters,
    and collapse whitespace.

    Args:
        text: Raw string, or None.

    Returns:
        The cleaned string; an empty string when `text` is None.
    """
    if text is None:
        return ""
    lowered = text.lower()
    # Remove anything starting with "http" up to the next whitespace.
    without_urls = re.sub(r"http\S+", "", lowered)
    # Keep only ASCII letters, digits and whitespace.
    alnum_only = re.sub(r"[^a-zA-Z0-9\s]", "", without_urls)
    # Squeeze whitespace runs into single spaces and trim both ends.
    return re.sub(r"\s+", " ", alnum_only).strip()

In [4]:
# Add the normalized text next to the raw `text` column.
raw_text_column = col("text")
df = df.withColumn("clean_text", clean_text(raw_text_column))

df.show()

+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+--------------------+
|tweet_id|author_id|inbound|          created_at|                text|response_tweet_id|in_response_to_tweet_id|          clean_text|
+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+--------------------+
|       2|   115712|   True|Tue Oct 31 22:11:...|@sprintcare and h...|             NULL|                      1|sprintcare and ho...|
|       3|   115712|   True|Tue Oct 31 22:08:...|@sprintcare I hav...|                1|                      4|sprintcare i have...|
|       5|   115712|   True|Tue Oct 31 21:49:...|  @sprintcare I did.|                4|                      6|    sprintcare i did|
|       8|   115712|   True|Tue Oct 31 21:45:...|@sprintcare is th...|           9,6,10|                   NULL|sprintcare is the...|
|      12|   115713|   True|Tue Oct 31 22:04:...|@sprintcare Y

In [5]:
@udf(IntegerType())
def count_token(text, token):
    """
    Count non-overlapping occurrences of `token` in `text`.

    Args:
        text: String to search; may be None.
        token: Substring to look for; may be None or empty.

    Returns:
        The number of non-overlapping occurrences, or 0 when `text` is None
        or `token` is None/empty.
    """
    # A None token would raise TypeError inside str.count, and an empty token
    # makes str.count return len(text) + 1, which is not a meaningful count.
    if text is None or not token:
        return 0
    return text.count(token)

In [6]:
# Count punctuation on the raw `text` column: clean_text() strips every
# non-alphanumeric character, so "?" and "!" never survive into `clean_text`
# and counting there always yields 0.
df = df.withColumn("num_question_marks", count_token(col("text"), lit("?")))
df = df.withColumn("num_exclamations", count_token(col("text"), lit("!")))

df.show()

+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+--------------------+------------------+----------------+
|tweet_id|author_id|inbound|          created_at|                text|response_tweet_id|in_response_to_tweet_id|          clean_text|num_question_marks|num_exclamations|
+--------+---------+-------+--------------------+--------------------+-----------------+-----------------------+--------------------+------------------+----------------+
|       2|   115712|   True|Tue Oct 31 22:11:...|@sprintcare and h...|             NULL|                      1|sprintcare and ho...|                 0|               0|
|       3|   115712|   True|Tue Oct 31 22:08:...|@sprintcare I hav...|                1|                      4|sprintcare i have...|                 0|               0|
|       5|   115712|   True|Tue Oct 31 21:49:...|  @sprintcare I did.|                4|                      6|    sprintcare i did|                 

In [7]:
# Keywords
critical_words = ['problem', 'issue', 'not working', 'refund', 'error', 'fail', 'help', 'wtf', 'worst', 'urgent', 'bad']

# Add one 0/1 indicator column per keyword, named has_<keyword> with spaces
# replaced by underscores. Matching is a plain substring test on `clean_text`
# (so 'bad' also matches e.g. 'badge'), same as the previous per-keyword UDF.
# Column.contains is evaluated natively by Spark, which avoids shipping every
# row to a Python worker once per keyword.
for word in critical_words:
    flag_column = f"has_{word.replace(' ', '_')}"
    df = df.withColumn(flag_column, col("clean_text").contains(word).cast(IntegerType()))

# A null `clean_text` produces a null flag; report it as 0 instead.
df = df.fillna(0, subset=[f"has_{word.replace(' ', '_')}" for word in critical_words])

In [17]:
### Prepare the data to be saved
# Start with the text and punctuation features, then append one has_<keyword>
# column name per entry in critical_words.
output_cols = ['clean_text', 'num_question_marks', 'num_exclamations']
output_cols.extend(f"has_{word.replace(' ', '_')}" for word in critical_words)

df_processed = df.select(*output_cols)

df_processed.show()

+--------------------+------------------+----------------+-----------+---------+---------------+----------+---------+--------+--------+-------+---------+----------+-------+
|          clean_text|num_question_marks|num_exclamations|has_problem|has_issue|has_not_working|has_refund|has_error|has_fail|has_help|has_wtf|has_worst|has_urgent|has_bad|
+--------------------+------------------+----------------+-----------+---------+---------------+----------+---------+--------+--------+-------+---------+----------+-------+
|sprintcare and ho...|                 0|               0|          0|        0|              0|         0|        0|       0|       0|      0|        0|         0|      0|
|sprintcare i have...|                 0|               0|          0|        0|              0|         0|        0|       0|       0|      0|        0|         0|      0|
|    sprintcare i did|                 0|               0|          0|        0|              0|         0|        0|       0|       0|

In [None]:
# Write the selected feature columns as Parquet, replacing any previous output.
# Hadoop resolves the filesystem implementation from the URI scheme exactly as
# written, so the scheme must be lowercase ("s3://", not "S3://").
# NOTE(review): "s3://" is the scheme used on Glue/EMR; a plain Spark install
# with hadoop-aws uses "s3a://" -- confirm for the target runtime.
df_processed.write.mode("overwrite").parquet("s3://your-bucket-name/processed_data/")


In [20]:
# Use only for local testing

# df_sample = df_processed.limit(500000).toPandas()
# df_sample.to_parquet("sample_data.parquet", index=False)

In [15]:
### Logging

# Timestamp of this run, formatted as "YYYY-MM-DD HH:MM:SS".
run_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Assemble the log one line at a time; the empty first and last entries give
# the text its leading and trailing newline.
# NOTE(review): `df` has already been filtered to inbound rows by the time it
# is counted here -- confirm that is the number meant by "Total input records".
log_text = "\n".join([
    "",
    "Preprocessing job run: 0",
    f"Run date: {run_date}",
    "Source S3: S3://your-bucket-name/raw_data/twcs.csv",
    "Output S3: S3://your-bucket-name/processed_data/",
    f"Total input records: {df.count()}",
    "",
])

In [16]:
# Save the log as text in S3 (through Spark): wrap it in a one-row DataFrame.
log_df = spark.createDataFrame(data=[(log_text,)], schema=["log"])
# log_df.write.mode('overwrite').text("S3://your-bucket-name/logs/preprocessing_log.txt")
# Save the log as a local text file.
with open("preprocessing_log.txt", "w") as log_file:
    print(log_text, end="", file=log_file)

print("Preprocessing finished and logs saved.")

Preprocessing finished and logs saved.
