In [1]:
%%bash
# Restart the local Spark Connect server so the notebook talks to a fresh server process.
source ~/.profile # Make sure environment variables are loaded.
# Stop any server left running by a previous execution of this cell.
stop-connect-server.sh
# Abort with a clear message if SPARK_VERSION is unset or empty; otherwise the
# package coordinate would end in a bare ':' and the server would be launched
# with an invalid --packages value.
start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:${SPARK_VERSION:?SPARK_VERSION must be set (e.g. exported from ~/.profile)}"

no org.apache.spark.sql.connect.service.SparkConnectServer to stop
starting org.apache.spark.sql.connect.service.SparkConnectServer, logging to /opt/spark/logs/spark-jesus-org.apache.spark.sql.connect.service.SparkConnectServer-1-Karbranth.out


In [2]:
from pyspark.sql import SparkSession

# Create (or fetch) a local session and stop it right away.
# NOTE(review): presumably this ensures no regular local session is still
# active when the Spark Connect session below is created - confirm.
(
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
    .stop()
)

# Session used by the following cells; it connects to the Spark Connect
# endpoint at sc://localhost:15002.
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .getOrCreate()
)

In [5]:
# Load the tweets JSON file and the keyword list as temporary views.

# `tweets`: selects the tweet text, author, like/retweet counters and the
# country / full name of the attached place directly from the JSON file.
tweets_view_sql = """
CREATE OR REPLACE TEMP VIEW tweets AS
SELECT content, user.username AS username, likeCount, retweetCount, 
       place.country AS country, place.fullName AS fullName
FROM json.`twitter/us_news_tweets.json`
"""

# `vio_keywords`: exposes the `value` column of the keyword text file.
keywords_view_sql = """
CREATE OR REPLACE TEMP VIEW vio_keywords AS
SELECT value
FROM text.`vio_keywords.txt`
"""

spark.sql(tweets_view_sql)
spark.sql(keywords_view_sql)

DataFrame[]

In [7]:
# Preview the first 10 rows of the `tweets` view and then of the
# `vio_keywords` view. show() is called with its default display settings,
# so long values (e.g. the tweet text) appear shortened in the printed table.
for view_query in ("SELECT * FROM tweets", "SELECT * FROM vio_keywords"):
    spark.sql(view_query).show(10)

+--------------------+---------------+---------+------------+-------+--------+
|             content|       username|likeCount|retweetCount|country|fullName|
+--------------------+---------------+---------+------------+-------+--------+
|Pagaya Technologi...|JacekWierzbicki|        0|           0|   NULL|    NULL|
|Nasdaq, S&amp;P, ...|JacekWierzbicki|        0|           0|   NULL|    NULL|
|Kaixin Auto stock...|JacekWierzbicki|        0|           0|   NULL|    NULL|
|Sea Non-GAAP EPS ...|JacekWierzbicki|        0|           0|   NULL|    NULL|
|Asia Cup 2022: St...|      Scrilling|        0|           0|   NULL|    NULL|
|#Cannabis #News #...|     DonPorrero|        0|           0|   NULL|    NULL|
|#Cannabis #News #...|     DonPorrero|        0|           0|   NULL|    NULL|
|🤩This is a good ...|    CARPE_DIGEM|        0|           0|   NULL|    NULL|
|Partnering with #...|  cryptovinesco|        0|           0|   NULL|    NULL|
|Today Update : Am...|   lifendaynews|        0|     

In [9]:
# Filter by country: `paises_tweets` keeps only the rows of `tweets` whose
# place country is exactly 'United States'.
us_tweets_view_sql = """
CREATE OR REPLACE TEMP VIEW paises_tweets AS
SELECT *
FROM tweets
WHERE country = 'United States'
"""
spark.sql(us_tweets_view_sql)

# Show the first 10 rows of the filtered view.
us_tweets_preview = spark.sql("SELECT * FROM paises_tweets")
us_tweets_preview.show(10)

+--------------------+--------------+---------+------------+-------------+----------------+
|             content|      username|likeCount|retweetCount|      country|        fullName|
+--------------------+--------------+---------+------------+-------------+----------------+
|#News #Singularit...|    TheGodSeek|        1|           0|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The Singularity|
|#News #Singularit...|    TheGodSeek|        1|           1|United States| The S

In [11]:
# Build the cross product of US tweets and keywords, then keep the
# (tweet, keyword) pairs where the tweet text contains the keyword.
spark.sql("""
CREATE OR REPLACE TEMP VIEW producto_cruz AS
SELECT *
FROM paises_tweets
CROSS JOIN vio_keywords
""")

# Matching rules:
#   * both sides are lower-cased, so a keyword written with capitals still matches;
#   * the keyword is trimmed and blank keywords are skipped, because an empty
#     search term would otherwise match every tweet;
#   * INSTR performs a literal substring search, so '%' or '_' inside a keyword
#     are not interpreted as wildcards.
# NOTE(review): a tweet containing several keywords yields one row per matching
# keyword, so later per-state counts built on this view count (tweet, keyword)
# pairs rather than distinct tweets - confirm this is intended.
spark.sql("""
CREATE OR REPLACE TEMP VIEW tweets_violentos AS
SELECT *
FROM producto_cruz
WHERE TRIM(value) <> ''
  AND INSTR(LOWER(content), LOWER(TRIM(value))) > 0
""")

DataFrame[]

In [12]:
# Show the first 10 rows of the tweet x keyword cross product.
producto_cruz_preview = spark.sql("SELECT * FROM producto_cruz")
producto_cruz_preview.show(10)

+--------------------+----------+---------+------------+-------------+---------------+--------+
|             content|  username|likeCount|retweetCount|      country|       fullName|   value|
+--------------------+----------+---------+------------+-------------+---------------+--------+
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|  danger|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|  threat|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|   chaos|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|   anger|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|    rage|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|conflict|
|#News #Singularit...|TheGodSeek|        1|           0|United States|The Singularity|  attack|
|#News #Singularit...|TheGodSeek|       

In [13]:
# Extract the state and drop the columns that are no longer needed.
# The state is taken as the second ', '-separated part of the place name.
# TRY_ELEMENT_AT (1-based index) returns NULL when that part does not exist,
# independently of the ANSI setting, whereas the `[1]` index operator only
# returns NULL for a missing element when ANSI mode is disabled.
# NOTE(review): place names without ', ' therefore end up in a NULL state
# group, and names such as '<something>, USA' produce the state 'USA' -
# decide whether those groups should be kept.
spark.sql("""
CREATE OR REPLACE TEMP VIEW tweets_estados AS
SELECT TRY_ELEMENT_AT(SPLIT(fullName, ', '), 2) AS state, content, username, likeCount, retweetCount
FROM tweets_violentos
""")

# Group by state, collecting every matching tweet into an array of structs.
spark.sql("""
CREATE OR REPLACE TEMP VIEW tweets_agrupados AS
SELECT state, COLLECT_LIST(struct(content, username, likeCount, retweetCount)) AS tweets
FROM tweets_estados
GROUP BY state
""")

# Add the number of collected tweets per state next to the array itself.
spark.sql("""
CREATE OR REPLACE TEMP VIEW num_tweets AS
SELECT state, SIZE(tweets) AS cantidad_tweets, tweets
FROM tweets_agrupados
""")

DataFrame[]

In [14]:
# Final result: one row per state with its tweet count and the array of
# tweets (each element rebuilt as a struct of content, username, likeCount
# and retweetCount), ordered from the most to the fewest tweets.
ranking_sql = """
SELECT state, cantidad_tweets, 
       TRANSFORM(tweets, x -> struct(x.content, x.username, x.likeCount, x.retweetCount)) AS tweets
FROM num_tweets
ORDER BY cantidad_tweets DESC
"""
resultados = spark.sql(ranking_sql)

In [16]:
# Print the 10 states with the highest tweet count.
resultados.show(n=10)

+-----+---------------+--------------------+
|state|cantidad_tweets|              tweets|
+-----+---------------+--------------------+
| NULL|            103|[{#News #Singular...|
|   MD|             80|[{.@RexxRuga give...|
|   CA|             74|[{If you haven't ...|
|  USA|             49|[{#Singer-Songwri...|
|   NY|             41|[{On Aug 4th in t...|
|   FL|             25|[{National Modern...|
|   IL|             20|[{Morning jolt on...|
|   AZ|             18|[{#BREAKING: 8 Pe...|
|   DC|             14|[{BREAKING: @figh...|
|   TX|             14|[{Reflecting on #...|
+-----+---------------+--------------------+
only showing top 10 rows



In [17]:
# Save the results as JSON under the output directory.
# "overwrite" replaces the output of a previous run; with the default save
# mode the write fails as soon as the directory already exists, which breaks
# re-running the notebook.
resultados.write.mode("overwrite").json("salida_problema3_spark_sql")