In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.window import Window
NoneType = type(None)
import os
import socket
import hashlib
import string

import time
from osgeo import ogr
import geopandas as gpd
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [2]:
def createMd5(text):
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8.

    Args:
        text: String to hash, or ``None``.

    Returns:
        The 32-character lowercase hex digest, or ``None`` when ``text`` is
        ``None`` so that a null input yields a null result instead of raising
        ``AttributeError`` on ``None.encode``.
    """
    # Guard first: a null column value reaches the function as None.
    if text is None:
        return None
    return hashlib.md5(text.encode('utf-8')).hexdigest()
# Spark UDF that applies createMd5 to a column and returns a string column.
md5Udf = udf(lambda value: createMd5(value), returnType=StringType())

def clean_lower(text):
    """Strip punctuation from ``text``, lowercase it and normalise whitespace.

    The characters listed in the translation table (ASCII punctuation plus a
    few typographic quote marks) are deleted, the result is lowercased, and
    every run of whitespace is collapsed to a single space with leading and
    trailing whitespace removed.

    Args:
        text: String to clean, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None`` so that a
        null input yields a null result instead of raising ``AttributeError``.
    """
    if text is None:
        return None
    without_punctuation = text.translate(str.maketrans('', '', '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~-_”“«»‘'))
    lowered = without_punctuation.lower()
    # split() with no argument drops empty tokens, so the join collapses
    # repeated whitespace and trims both ends.
    return " ".join(lowered.split())
# Spark UDF that applies clean_lower to a column and returns a string column.
cleanLowerUdf = udf(lambda value: clean_lower(value), returnType=StringType())

def get_site_from_url(text):
    """Return the third ``/``-separated component of ``text``.

    For a URL of the form ``scheme://host/path`` this is the ``host`` part.

    Args:
        text: URL string, or ``None``.

    Returns:
        The component at index 2 after splitting on ``/``, or ``None`` when
        ``text`` is ``None`` or has fewer than three components (for example
        a string without ``scheme://``), instead of raising.
    """
    if text is None:
        return None
    # Only the first three components are needed; cap the split accordingly.
    parts = text.split("/", 3)
    if len(parts) < 3:
        return None
    return parts[2]
# Spark UDF that applies get_site_from_url to a column and returns a string column.
getUrl = udf(lambda value: get_site_from_url(value), returnType=StringType())


In [3]:
# Resolve the 'minio' host name to an IP address for the S3A endpoint URL.
minio_ip = socket.gethostbyname('minio')

# Build (or reuse) the Spark session with Kryo/Sedona serialization and S3A
# access to the MinIO endpoint.
# NOTE(review): the S3A access and secret keys are hard-coded literals —
# confirm they are placeholders and consider reading them from the environment.
spark = (
    SparkSession.builder
    .appName("Python Spark S3")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.executor.memory", "80g")
    .config("spark.driver.memory", "80g")
    # Key was previously misspelled as 'spark.dirver.maxResultSize', which is
    # not a Spark setting, so the 5g limit was never applied.
    .config('spark.driver.maxResultSize', '5g')
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config('spark.hadoop.fs.s3a.endpoint', 'http://'+minio_ip+':9000')
    .config("spark.hadoop.fs.s3a.access.key", "minio-access-key")
    .config("spark.hadoop.fs.s3a.secret.key", "minio-secret-key")
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .config('spark.jars.packages',
            'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0')
    .getOrCreate()
)
# Register the Sedona functions and types on this session; kept as the last
# expression so the notebook displays its return value.
SedonaRegistrator.registerAll(spark)

True

In [4]:

# Explicit schema for the news JSON documents: every field is a string except
# publish_date, which is a timestamp.
st = (
    StructType()
    .add("abstract", StringType())
    .add("authors", StringType())
    .add("image", StringType())
    .add("metadata", StringType())
    .add("publish_date", TimestampType())
    .add("text", StringType())
    .add("title", StringType())
    .add("url", StringType())
)

In [5]:
# Read every JSON file under the covid_mexico prefix using the explicit schema
# `st`; timestamps are parsed with the "dd-MM-yyyy" pattern.
df_news_covid_mexico = (
    spark.read
    .schema(st)
    .option("timestampFormat", "dd-MM-yyyy")
    .json("s3a://news/covid_mexico/*.json")
)

In [6]:
# Row count of the raw news DataFrame.
df_news_covid_mexico.count()

128

In [7]:
# Print the schema of the raw news DataFrame.
df_news_covid_mexico.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- publish_date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [8]:
# Preview the first 10 raw rows.
df_news_covid_mexico.show(n=10)

+--------------------+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|            abstract|    authors|               image|            metadata|       publish_date|                text|               title|                 url|
+--------------------+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|CIUDAD DE MÉXICO....|           |https://cdn2.exce...|Excélsior, hace 6...|2021-03-21 00:00:00|Al hacer click en...|México se acerca ...|https://www.excel...|
|México lleva apli...|           |                    |Aristeguinoticias...|               null|                    |México lleva apli...|https://aristegui...|
|México ha aplicad...|           |                    |   MSN, hace 6 horas|               null|                    |México ha aplicad...|https://www.msn.c...|
|Los principales p...|           |      

In [9]:
# Keep only articles whose raw text has at least 2 characters, then derive the
# working columns: an MD5 article id from the URL, the cleaned text and the
# site taken from the URL.
# The filter is applied first because it references `text`, which the
# projection drops, and so that the UDFs are only applied to rows that pass it.
df_news_covid_mexico_date_text = (
    df_news_covid_mexico
    .filter("length(text) >= 2")
    .select(
        md5Udf("url").alias("article_id"),
        "title",
        "url",
        "publish_date",
        cleanLowerUdf("text").alias("clean_text"),
        getUrl("url").alias("site"),
    )
)

In [10]:
# Preview the first 15 cleaned articles.
df_news_covid_mexico_date_text.show(n=15)

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+
|          article_id|               title|                 url|       publish_date|          clean_text|                site|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+
|562d622a58c373745...|México se acerca ...|https://www.excel...|2021-03-21 00:00:00|al hacer click en...|www.excelsior.com.mx|
|b77cd49c6e490dc06...|Mapa del coronavi...|https://www.infob...|2021-03-21 00:00:00|gráfica jovani pé...|     www.infobae.com|
|70d59b3cac0a6cc24...|Las claves e incó...|https://www.anima...|2021-03-21 00:00:00|en los últimos dí...|www.animalpolitic...|
|e7d826dfb328b0d24...|“Abre mi escuela”...|https://www.infob...|2021-03-21 00:00:00|las instalaciones...|     www.infobae.com|
|d453f2247fda4ef5d...|Covid-19 afecta f...|https://www.eluni...|2021-03-21 00:00:00|la arquidiócesis ...|www.el

In [11]:
# Row count after the text-length filter.
df_news_covid_mexico_date_text.count()

85

In [12]:
# Show the first 15 titles without truncating the column.
df_news_covid_mexico_date_text.select("title").show(n=15, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------+
|México se acerca a los 198 mil decesos por covid                                                                           |
|Mapa del coronavirus en México 21 de marzo: Querétaro ya es el tercer estado con más contagios activos detrás CDMX y Edomex|
|Las claves e incógnitas del nuevo plan para frenar la ...                                                                  |
|“Abre mi escuela”: exigieron regresar a clases presenciales en 10 estados a un año del cierre por COVID-19                 |
|Covid-19 afecta festejos en iglesia | El Universal                                                                   

In [13]:
# JDBC target: database "shared" on host "postgres".
url = "jdbc:postgresql://postgres/shared"
# Save mode used for the JDBC writes.
mode = "overwrite"
# Connection properties; the password is taken from the SHARED_PASSWORD
# environment variable (raises KeyError if it is not set).
properties = dict(
    user="shared",
    password=os.environ['SHARED_PASSWORD'],
)

In [14]:
# Write the cleaned articles to the tb_news_covid_mexico_date_text table.
df_news_covid_mexico_date_text.write.jdbc(
    url=url,
    table="tb_news_covid_mexico_date_text",
    mode=mode,
    properties=properties,
)

In [15]:
# One row per word ("palabra") of each article: split the cleaned text on
# whitespace, explode the resulting array, and drop tokens of length 1 or less.
# The regex is a raw string: "\s" in a normal string literal is an invalid
# escape sequence and triggers a warning on recent Python versions.
df_news_covid_mexico_palabras = (
    df_news_covid_mexico_date_text
    .select(
        "article_id",
        "publish_date",
        explode(split(df_news_covid_mexico_date_text.clean_text, r"\s")).alias("palabra"),
    )
    .where(length('palabra') > 1)
)

In [16]:
# Preview the first 30 (article, word) rows.
df_news_covid_mexico_palabras.show(n=30)

+--------------------+-------------------+-----------+
|          article_id|       publish_date|    palabra|
+--------------------+-------------------+-----------+
|562d622a58c373745...|2021-03-21 00:00:00|         al|
|562d622a58c373745...|2021-03-21 00:00:00|      hacer|
|562d622a58c373745...|2021-03-21 00:00:00|      click|
|562d622a58c373745...|2021-03-21 00:00:00|         en|
|562d622a58c373745...|2021-03-21 00:00:00|     enviar|
|562d622a58c373745...|2021-03-21 00:00:00|   quedaras|
|562d622a58c373745...|2021-03-21 00:00:00|   regitrad|
|562d622a58c373745...|2021-03-21 00:00:00|    nuestro|
|562d622a58c373745...|2021-03-21 00:00:00|    boletín|
|562d622a58c373745...|2021-03-21 00:00:00|         el|
|562d622a58c373745...|2021-03-21 00:00:00|       cual|
|562d622a58c373745...|2021-03-21 00:00:00|     podrás|
|562d622a58c373745...|2021-03-21 00:00:00|   cancelar|
|562d622a58c373745...|2021-03-21 00:00:00|         en|
|562d622a58c373745...|2021-03-21 00:00:00|  cualquier|
|562d622a5

In [17]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf/
# Occurrences of each word per article and date, highest counts first.
(
    df_news_covid_mexico_palabras
    .groupBy('article_id', 'palabra', 'publish_date')
    .count()
    .orderBy('count', ascending=False)
    .show(n=25)
)

+--------------------+-------+-------------------+-----+
|          article_id|palabra|       publish_date|count|
+--------------------+-------+-------------------+-----+
|61f85cb23d9827940...|    the|2021-03-21 00:00:00|  129|
|9f123746139b1f0ea...|     de|2021-03-21 00:00:00|   92|
|6aa6971a33b988654...|     de|2021-03-21 00:00:00|   82|
|61f85cb23d9827940...|     in|2021-03-21 00:00:00|   77|
|61f85cb23d9827940...|    and|2021-03-21 00:00:00|   74|
|72236c94ac71feaf4...|     de|2021-03-22 00:00:00|   74|
|8327c6e59ed7f3df3...|     de|2021-03-21 00:00:00|   74|
|57c9f33b7ef72ab61...|     de|2021-03-27 00:00:00|   69|
|d5b1ce551006f587b...|     de|2021-03-21 00:00:00|   69|
|a082606587ab3227a...|     de|2021-03-21 00:00:00|   66|
|065df75dc352b9913...|     de|2021-03-21 00:00:00|   65|
|70d59b3cac0a6cc24...|     de|2021-03-21 00:00:00|   64|
|68e032f866fec4d26...|     de|2021-03-26 00:00:00|   64|
|61f85cb23d9827940...|     to|2021-03-21 00:00:00|   63|
|320a8502c488ae08c...|     de|2

In [18]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf-continued/
# Window covering all grouped rows that belong to the same article.
w = Window.partitionBy(df_news_covid_mexico_palabras['article_id'])

# Term frequency per (article, word, date):
#   n_w = occurrences of the word in the group,
#   n_d = sum of n_w over the article window,
#   tf  = n_w / n_d.
# `sum` and `count` are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
article_tf = (
    df_news_covid_mexico_palabras
    .groupBy('article_id', 'palabra', 'publish_date')
    .agg(
        count('*').alias('n_w'),
        sum(count('*')).over(w).alias('n_d'),
        (count('*') / sum(count('*')).over(w)).alias('tf'),
    )
    .orderBy('n_w', ascending=False)
    .cache()
)

article_tf.show(truncate=15)

+---------------+-------+---------------+---+----+---------------+
|     article_id|palabra|   publish_date|n_w| n_d|             tf|
+---------------+-------+---------------+---+----+---------------+
|61f85cb23d98...|    the|2021-03-21 0...|129|2788|0.0462697274...|
|9f123746139b...|     de|2021-03-21 0...| 92|1093|0.0841720036...|
|6aa6971a33b9...|     de|2021-03-21 0...| 82|1204|0.0681063122...|
|61f85cb23d98...|     in|2021-03-21 0...| 77|2788|0.0276183644...|
|8327c6e59ed7...|     de|2021-03-21 0...| 74| 707|0.1046676096...|
|61f85cb23d98...|    and|2021-03-21 0...| 74|2788|0.0265423242...|
|72236c94ac71...|     de|2021-03-22 0...| 74| 711|0.1040787623...|
|57c9f33b7ef7...|     de|2021-03-27 0...| 69| 765|0.0901960784...|
|d5b1ce551006...|     de|2021-03-21 0...| 69| 884|0.0780542986...|
|a082606587ab...|     de|2021-03-21 0...| 66| 846|0.0780141843...|
|065df75dc352...|     de|2021-03-21 0...| 65| 716|0.0907821229...|
|68e032f866fe...|     de|2021-03-26 0...| 64| 719|0.0890125173

In [21]:
# Window covering all grouped rows that share the same word.
w = Window.partitionBy('palabra')

# Number of distinct article ids among the word rows.
c_d = df_news_covid_mexico_palabras.select('article_id').distinct().count()

# Inverse document frequency per (word, article, date):
#   c_d = distinct article count (constant column),
#   i_d = number of grouped rows in the word window
#         (presumably the number of articles containing the word — confirm),
#   idf = log(c_d / i_d).
article_idf = (
    df_news_covid_mexico_palabras
    .groupBy('palabra', 'article_id', 'publish_date')
    .agg(
        lit(c_d).alias('c_d'),
        count('*').over(w).alias('i_d'),
        log(lit(c_d) / count('*').over(w)).alias('idf'),
    )
    .orderBy('idf', ascending=False)
    .cache()
)

In [22]:
# Preview the 150 first IDF rows, truncating cells to 15 characters.
article_idf.show(n=150, truncate=15)

+---------------+---------------+---------------+---+---+---------------+
|        palabra|     article_id|   publish_date|c_d|i_d|            idf|
+---------------+---------------+---------------+---+---+---------------+
|            296|99aab48771a6...|2021-03-21 0...| 85|  1|4.4426512564...|
|          49003|8327c6e59ed7...|2021-03-21 0...| 85|  1|4.4426512564...|
|         596354|b77cd49c6e49...|2021-03-21 0...| 85|  1|4.4426512564...|
|      advertido|713f2e2880e2...|2021-03-22 0...| 85|  1|4.4426512564...|
|           bamx|99aab48771a6...|2021-03-21 0...| 85|  1|4.4426512564...|
|   coincidiendo|4c2b8d97bb1c...|2021-03-21 0...| 85|  1|4.4426512564...|
|       consumen|9f123746139b...|2021-03-21 0...| 85|  1|4.4426512564...|
|           didi|7a3031279dfe...|2021-03-21 0...| 85|  1|4.4426512564...|
|         eligen|6aa6971a33b9...|2021-03-21 0...| 85|  1|4.4426512564...|
|     etiquetado|29a518094e7b...|2021-03-21 0...| 85|  1|4.4426512564...|
|            few|61f85cb23d98...|2021-

In [23]:
# Combine TF and IDF on their shared grouping keys and compute tf_idf = tf * idf.
# NOTE(review): publish_date is nullable and is part of the join key; rows with
# a null date would presumably not match in this join — confirm none are expected.
article_tfidf = (
    article_tf
    .join(article_idf, on=['article_id', 'palabra', 'publish_date'])
    .withColumn('tf_idf', col('tf') * col('idf'))
    .cache()
)

In [24]:
# Show the 150 rows with the highest tf_idf, truncating cells to 12 characters.
article_tfidf.orderBy('tf_idf', ascending=False).show(n=150, truncate=12)

+------------+------------+------------+---+----+------------+---+---+------------+------------+
|  article_id|     palabra|publish_date|n_w| n_d|          tf|c_d|i_d|         idf|      tf_idf|
+------------+------------+------------+---+----+------------+---+---+------------+------------+
|ed2a11140...|         the|2021-03-2...| 60| 937|0.0640341...| 85|  5|2.8332133...|0.1814224...|
|6ae38bfe4...|         the|2021-03-2...| 47| 788|0.0596446...| 85|  5|2.8332133...|0.1689860...|
|61f85cb23...|         the|2021-03-2...|129|2788|0.0462697...| 85|  5|2.8332133...|0.1310920...|
|5a9f0fdac...|     milenio|2021-03-2...|  2|  43|0.0465116...| 85|  7|2.4967411...|0.1161274...|
|ed2a11140...|          to|2021-03-2...| 32| 937|0.0341515...| 85|  3|3.3440389...|0.1142041...|
|ed2a11140...|          of|2021-03-2...| 31| 937|0.0330843...| 85|  3|3.3440389...|0.1106352...|
|5a9f0fdac...|         114|2021-03-2...|  1|  43|0.0232558...| 85|  1|4.4426512...|0.1033174...|
|5a9f0fdac...|         159|202

In [25]:
# Per-article window ordered by descending tf_idf, used to rank words.
w = Window.partitionBy('article_id').orderBy(col('tf_idf').desc())

# Keep the rows ranked 15 or better within each article, then sort by
# article_id, tf_idf and n_w (ascending) and project the output columns.
# NOTE(review): rank() presumably assigns the same rank to tied tf_idf values,
# so more than 15 rows per article may pass the filter — confirm this is intended.
article_tfidf_top_15 = (
    article_tfidf
    .withColumn('rank', rank().over(w))
    .where('rank <= 15')
    .drop('rank')
    .orderBy('article_id', 'tf_idf', 'n_w')
    .select('article_id', 'publish_date', 'palabra', 'n_w', 'tf_idf')
)

In [26]:
# Preview 30 rows of the per-article top words, truncating cells to 12 characters.
article_tfidf_top_15.show(n=30, truncate=12)

+------------+------------+------------+---+------------+
|  article_id|publish_date|     palabra|n_w|      tf_idf|
+------------+------------+------------+---+------------+
|039459da2...|2021-03-1...|      señalo|  1|0.0222132...|
|039459da2...|2021-03-1...|funcionarios|  1|0.0222132...|
|039459da2...|2021-03-1...|   agradezco|  1|0.0222132...|
|039459da2...|2021-03-1...|    impulsar|  1|0.0222132...|
|039459da2...|2021-03-1...|   ayudarnos|  1|0.0222132...|
|039459da2...|2021-03-1...|    homólogo|  1|0.0222132...|
|039459da2...|2021-03-1...|  telefónica|  1|0.0222132...|
|039459da2...|2021-03-1...|    abordado|  1|0.0222132...|
|039459da2...|2021-03-1...|        pedí|  1|0.0222132...|
|039459da2...|2021-03-1...|   enviarnos|  1|0.0222132...|
|039459da2...|2021-03-1...|     logrará|  1|0.0222132...|
|039459da2...|2021-03-1...|     calculo|  1|0.0222132...|
|039459da2...|2021-03-1...|        tema|  2|0.0224542...|
|039459da2...|2021-03-1...|       biden|  2|0.0236320...|
|039459da2...|

In [27]:
# Attach the article's site to each top word by joining back to the cleaned
# articles on article_id and publish_date.
# NOTE(review): publish_date is nullable; rows with a null date would presumably
# not match in this join — confirm none are expected.
article_tfidf_top_15_site = (
    article_tfidf_top_15
    .join(df_news_covid_mexico_date_text, on=['article_id', 'publish_date'])
    .select('article_id', 'publish_date', 'site', 'palabra', 'n_w', 'tf_idf')
)

In [28]:
# Preview the first 15 rows of the top words with their site.
article_tfidf_top_15_site.show(n=15)

+--------------------+-------------------+---------------+----------------+---+--------------------+
|          article_id|       publish_date|           site|         palabra|n_w|              tf_idf|
+--------------------+-------------------+---------------+----------------+---+--------------------+
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|correspondientes|  2| 0.02551621226311474|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|            5811|  1|0.025242336684604072|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|      acumulando|  1|0.025242336684604072|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|           66041|  1|0.025242336684604072|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|         2193639|  2|0.024318933676093987|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|       contagios|  5|  0.0236287881774458|
|e386a6062c63e081c...|2021-03-21 00:00:00|www.infobae.com|      fallecidas|  2|0.0232358634

In [29]:
# Write the per-article top words to the tb_news_covid_mexico_palabras_top_tfidf table.
article_tfidf_top_15_site.write.jdbc(
    url=url,
    table="tb_news_covid_mexico_palabras_top_tfidf",
    mode=mode,
    properties=properties,
)