In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.window import Window
NoneType = type(None)
import os
import socket
import hashlib
import string

import time
from osgeo import ogr
import geopandas as gpd
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [2]:
def createMd5(text):
    return hashlib.md5(text.encode('utf-8')).hexdigest()
md5Udf= udf(lambda z: createMd5(z),StringType())

def clean_lower(text):
    sentence = text.translate(str.maketrans('', '', '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~-_”“«»‘')).lower()
    return " ".join(sentence.split())
cleanLowerUdf= udf(lambda z: clean_lower(z),StringType())

def get_site_from_url(text):
    return text.split("/")[2]
getUrl= udf(lambda z: get_site_from_url(z),StringType())    


In [3]:
minio_ip = socket.gethostbyname('minio')
spark = SparkSession. \
    builder. \
    appName("Python Spark S3"). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.executor.memory", "80g"). \
    config("spark.driver.memory", "80g"). \
    config('spark.dirver.maxResultSize', '5g'). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.hadoop.fs.s3a.endpoint', 'http://'+minio_ip+':9000'). \
    config("spark.hadoop.fs.s3a.access.key", "minio-access-key"). \
    config("spark.hadoop.fs.s3a.secret.key", "minio-secret-key"). \
    config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0'). \
    getOrCreate()
SedonaRegistrator.registerAll(spark)

True

In [4]:

st= StructType([
    StructField("abstract", StringType()),
    StructField("authors", StringType()),
    StructField("image", StringType()),
    StructField("metadata", StringType()),
    StructField("publish_date", TimestampType()),
    StructField("text", StringType()),
    StructField("title", StringType()),
    StructField("url", StringType()),
])

In [29]:
df_news_covid_mexico = spark \
                        .read.schema(st).option("timestampFormat", "dd-MM-yyyy") \
                        .json("s3a://news/covid_mexico/*.json")

In [6]:
df_news_covid_mexico.count()

2110

In [7]:
df_news_covid_mexico.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- publish_date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [8]:
df_news_covid_mexico.show(10)

+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|            abstract|             authors|               image|            metadata|       publish_date|                text|               title|                 url|
+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|Caída del peso y ...|     Milenio Digital|https://www.milen...|Milenio.com, 29 m...|2020-03-29 00:00:00|Milenio Digital  ...|Caída del peso y ...|https://www.milen...|
|La Secretaría de ...|Fernando Damián;M...|https://www.milen...|Milenio.com, 29 m...|2020-03-29 00:00:00|Fernando Damián, ...|Coronavirus en Mé...|https://www.milen...|
|Han muerto 20 per...|Milenio Digital Y...|https://www.milen...|Milenio.com, 29 m...|2020-03-29 00:00:00|Milenio Digital y...|Coronavirus en Mé...|https://

In [9]:
df_news_covid_mexico_date_text = df_news_covid_mexico.select(md5Udf("url").alias("article_id"),"title","url","publish_date",cleanLowerUdf("text").alias("clean_text"),getUrl("url").alias("site")).filter("length(text) >= 2")

In [10]:
df_news_covid_mexico_date_text.show(15)

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+
|          article_id|               title|                 url|       publish_date|          clean_text|                site|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+
|9ce703a7bf3423248...|Caída del peso y ...|https://www.milen...|2020-03-29 00:00:00|milenio digital l...|     www.milenio.com|
|cdd17bbabcd40bb3b...|Coronavirus en Mé...|https://www.milen...|2020-03-29 00:00:00|fernando damián m...|     www.milenio.com|
|cf44530bae9e92f21...|Coronavirus en Mé...|https://www.milen...|2020-03-29 00:00:00|milenio digital y...|     www.milenio.com|
|a1625d0cc6177f2f0...|El gobernador de ...|https://politica....|2020-03-29 00:00:00|de la 1 a la 3 co...|politica.expansio...|
|4f093d45e3e2bbc68...|#Testimonio | "En...|https://politica....|2020-03-29 00:00:00|la joven llegó a ...|politi

In [11]:
df_news_covid_mexico_date_text.count()

1912

In [12]:
df_news_covid_mexico_date_text.select("title").show(15,False)

+-------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------+
|Caída del peso y suspensión de clases: cronología del coronavirus en México                                                    |
|Coronavirus en México, noticias del 29 de marzo                                                                                |
|Coronavirus en México. Reportan 20 muertes por Covid-19                                                                        |
|El gobernador de Coahuila reporta 16 médicos y enfermeras con COVID-19                                                         |
|#Testimonio | "En Perú, la gente no entró en pánico como en México"                      

In [13]:
url = "jdbc:postgresql://postgres/shared"
mode="overwrite"
properties = {
    "user": "shared",
    "password": os.environ['SHARED_PASSWORD']
}

In [14]:
df_news_covid_mexico_date_text.write.jdbc(url=url, table="tb_news_covid_mexico_date_text", mode=mode, properties=properties)

In [15]:
df_news_covid_mexico_palabras = df_news_covid_mexico_date_text.select("article_id","publish_date",explode(split(df_news_covid_mexico_date_text.clean_text, "\s")).alias("palabra")).where(length('palabra') > 1)

In [16]:
df_news_covid_mexico_palabras.show(30)

+--------------------+-------------------+------------------+
|          article_id|       publish_date|           palabra|
+--------------------+-------------------+------------------+
|9ce703a7bf3423248...|2020-03-29 00:00:00|           milenio|
|9ce703a7bf3423248...|2020-03-29 00:00:00|           digital|
|9ce703a7bf3423248...|2020-03-29 00:00:00|                la|
|9ce703a7bf3423248...|2020-03-29 00:00:00|            muerte|
|9ce703a7bf3423248...|2020-03-29 00:00:00|                de|
|9ce703a7bf3423248...|2020-03-29 00:00:00|               una|
|9ce703a7bf3423248...|2020-03-29 00:00:00|           persona|
|9ce703a7bf3423248...|2020-03-29 00:00:00|                la|
|9ce703a7bf3423248...|2020-03-29 00:00:00|        suspensión|
|9ce703a7bf3423248...|2020-03-29 00:00:00|                de|
|9ce703a7bf3423248...|2020-03-29 00:00:00|            clases|
|9ce703a7bf3423248...|2020-03-29 00:00:00|             caída|
|9ce703a7bf3423248...|2020-03-29 00:00:00|                de|
|9ce703a

In [17]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf/
df_news_covid_mexico_palabras.groupBy('article_id', 'palabra','publish_date')\
    .count()\
    .orderBy('count', ascending=False)\
    .show(25)

+--------------------+-------+-------------------+-----+
|          article_id|palabra|       publish_date|count|
+--------------------+-------+-------------------+-----+
|8b0cf48512581df5b...|     de|2020-06-30 00:00:00|  405|
|dd755391269cad294...|     de|2020-04-21 00:00:00|  395|
|c436a8d90969dd3b3...|     de|2020-07-01 00:00:00|  390|
|99caa557807376a6a...|     de|2020-06-20 00:00:00|  374|
|b83cdf964da7930fb...|     de|2020-07-09 00:00:00|  369|
|8c4f10eedc1022f82...|     de|2020-07-21 00:00:00|  354|
|dbf80d5e3a74dd1f7...|     de|2020-04-30 00:00:00|  349|
|9fb6aad8918cbddc8...|     de|2020-06-07 00:00:00|  349|
|cdd17bbabcd40bb3b...|     de|2020-03-29 00:00:00|  331|
|6b24a43e24bb55800...|     de|2020-05-25 00:00:00|  254|
|d5244ccef7fe552a3...|     de|2020-07-19 00:00:00|  251|
|17daba1b37d7fcdd2...|     de|2020-02-29 00:00:00|  245|
|8d6050c8ce801550d...|     de|2020-03-28 00:00:00|  238|
|f223424e8afd7e4a7...|     de|2020-05-27 00:00:00|  228|
|dd755391269cad294...|     la|2

In [18]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf-continued/
w = Window.partitionBy(df_news_covid_mexico_palabras['article_id'])

article_tf = df_news_covid_mexico_palabras.groupBy('article_id', 'palabra', 'publish_date')\
    .agg(count('*').alias('n_w'),sum(count('*')).over(w).alias('n_d'),(count('*')/sum(count('*')).over(w)).alias('tf'))\
    .orderBy('n_w', ascending=False)\
    .cache()

article_tf.show(truncate=15)

+---------------+-------+---------------+---+----+---------------+
|     article_id|palabra|   publish_date|n_w| n_d|             tf|
+---------------+-------+---------------+---+----+---------------+
|8b0cf4851258...|     de|2020-06-30 0...|405|5383|0.0752368567...|
|dd755391269c...|     de|2020-04-21 0...|395|5425|0.0728110599...|
|c436a8d90969...|     de|2020-07-01 0...|390|4986|0.0782190132...|
|99caa5578073...|     de|2020-06-20 0...|374|4616|0.0810225303...|
|b83cdf964da7...|     de|2020-07-09 0...|369|5333|0.0691918244...|
|8c4f10eedc10...|     de|2020-07-21 0...|354|4512|0.0784574468...|
|dbf80d5e3a74...|     de|2020-04-30 0...|349|4173|0.0836328780...|
|9fb6aad8918c...|     de|2020-06-07 0...|349|4242|0.0822725129...|
|cdd17bbabcd4...|     de|2020-03-29 0...|331|4066|0.0814067879...|
|6b24a43e24bb...|     de|2020-05-25 0...|254|2579|0.0984877859...|
|d5244ccef7fe...|     de|2020-07-19 0...|251|3489|0.0719403840...|
|17daba1b37d7...|     de|2020-02-29 0...|245|2745|0.0892531876

In [19]:
w = Window.partitionBy('palabra')

c_d = df_news_covid_mexico_palabras.select('article_id').distinct().count()

article_idf = df_news_covid_mexico_palabras.groupBy('palabra', 'article_id','publish_date').agg(
        lit(c_d).alias('c_d'),
        count('*').over(w).alias('i_d'),
        log(lit(c_d)/count('*').over(w)).alias('idf')
    )\
    .orderBy('idf', ascending=False)\
    .cache()

In [20]:
article_idf.show(150, truncate=15)

+---------------+---------------+---------------+----+---+---------------+
|        palabra|     article_id|   publish_date| c_d|i_d|            idf|
+---------------+---------------+---------------+----+---+---------------+
|          14157|15db600b5cc6...|2020-09-28 0...|1912|  1|7.5559050936...|
|           1512|6b24a43e24bb...|2020-05-25 0...|1912|  1|7.5559050936...|
|          16320|0cccaf8df9f8...|2020-06-03 0...|1912|  1|7.5559050936...|
|          16974|d01d519971c6...|2020-09-18 0...|1912|  1|7.5559050936...|
|          17427|6e8dbba33509...|2021-03-05 0...|1912|  1|7.5559050936...|
|          20428|d6f5d07e217f...|2020-02-04 0...|1912|  1|7.5559050936...|
|           2136|0cccaf8df9f8...|2020-06-03 0...|1912|  1|7.5559050936...|
|          23918|eba398de9058...|2020-08-08 0...|1912|  1|7.5559050936...|
|           2904|d01d519971c6...|2020-09-18 0...|1912|  1|7.5559050936...|
|           4032|d96678dab28d...|2021-01-13 0...|1912|  1|7.5559050936...|
|         404000|6b7d6d4f

In [21]:
article_tfidf = article_tf.join(article_idf, ['article_id', 'palabra', 'publish_date'])\
    .withColumn('tf_idf', col('tf') * col('idf'))\
    .cache()

In [22]:
article_tfidf.orderBy('tf_idf', ascending=False).show(150,truncate=12)

+------------+------------+------------+---+----+------------+----+---+------------+------------+
|  article_id|     palabra|publish_date|n_w| n_d|          tf| c_d|i_d|         idf|      tf_idf|
+------------+------------+------------+---+----+------------+----+---+------------+------------+
|59827c83a...|       email|2020-10-0...|  2|  36|0.0555555...|1912|  6|5.7641456...|0.3202303...|
|c084f0fa8...|  contenidos|2020-04-2...|  2|  41|0.0487804...|1912|  9|5.3586805...|0.2613990...|
|88cd1e24b...|    hospital|2020-11-2...|103|1036|0.0994208...|1912|156|2.5060490...|0.2491535...|
|d94f7e7fc...|     meteoro|2020-01-1...|  3|  96|     0.03125|1912|  1|7.5559050...|0.2361220...|
|a4520e70b...|  balnearios|2020-06-2...|  7| 196|0.0357142...|1912|  3|6.4572928...|0.2306176...|
|59827c83a...|   infórmate|2020-10-0...|  1|  36|0.0277777...|1912|  1|7.5559050...|0.2098862...|
|59827c83a...|      ¡falta|2020-10-0...|  1|  36|0.0277777...|1912|  1|7.5559050...|0.2098862...|
|59827c83a...|      

In [23]:
w = Window.partitionBy('article_id').orderBy(col('tf_idf').desc())

article_tfidf_top_15=article_tfidf.withColumn('rank', rank().over(w))\
    .where('rank <= 15')\
    .drop('rank')\
    .orderBy('article_id', 'tf_idf','n_w')\
    .select('article_id','publish_date','palabra','n_w','tf_idf')

In [24]:
article_tfidf_top_15.show(truncate=12, n=30)

+------------+------------+------------+---+------------+
|  article_id|publish_date|     palabra|n_w|      tf_idf|
+------------+------------+------------+---+------------+
|0030acce3...|2021-01-1...|      sector|  3|0.0153563...|
|0030acce3...|2021-01-1...|   constante|  2|0.0158418...|
|0030acce3...|2021-01-1...|        eran|  2|0.0158418...|
|0030acce3...|2021-01-1...|     mayoría|  3|0.0165288...|
|0030acce3...|2021-01-1...|profesion...|  2|0.0165435...|
|0030acce3...|2021-01-1...|   empleados|  2|0.0166395...|
|0030acce3...|2021-01-1...|represent...|  1|0.0168658...|
|0030acce3...|2021-01-1...|transmisores|  1|0.0168658...|
|0030acce3...|2021-01-1...| enfrentaban|  1|0.0168658...|
|0030acce3...|2021-01-1...|corresponden|  2|0.0170456...|
|0030acce3...|2021-01-1...|trabajadores|  4|0.0209990...|
|0030acce3...|2021-01-1...|    personal|  5|0.0218469...|
|0030acce3...|2021-01-1...|         580|  2|0.0222810...|
|0030acce3...|2021-01-1...|  enfermería|  3|0.0290426...|
|0030acce3...|

In [26]:
article_tfidf_top_15_site = article_tfidf_top_15.join(df_news_covid_mexico_date_text, ['article_id','publish_date']).select('article_id','publish_date','site','palabra','n_w','tf_idf')

In [27]:
article_tfidf_top_15_site.show(15)

+--------------------+-------------------+--------------------+-----------+---+--------------------+
|          article_id|       publish_date|                site|    palabra|n_w|              tf_idf|
+--------------------+-------------------+--------------------+-----------+---+--------------------+
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...| acumularon|  2| 0.08556067886585966|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|       5023|  1| 0.05435902945044134|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|      11208|  1| 0.05435902945044134|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|      52477|  1| 0.05435902945044134|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|  apoyarlos|  1| 0.05435902945044134|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|      23365|  1| 0.05435902945044134|
|082df15321be3547d...|2020-09-25 00:00:00|politica.expansio...|     518204|  1| 0.049372359

In [28]:
article_tfidf_top_15_site.write.jdbc(url=url, table="tb_news_covid_mexico_palabras_top_tfidf", mode=mode, properties=properties)