In [114]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.window import Window
NoneType = type(None)
import os
import socket
import hashlib
import string

def createMd5(text):
    return hashlib.md5(text.encode('utf-8')).hexdigest()
md5Udf= udf(lambda z: createMd5(z),StringType())

def clean_lower(text):
    sentence = text.translate(str.maketrans('', '', '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~-_”“«»‘')).lower()
    return " ".join(sentence.split())
cleanLowerUdf= udf(lambda z: clean_lower(z),StringType())

def get_site_from_url(text):
    return text.split("/")[2]
getUrl= udf(lambda z: get_site_from_url(z),StringType())

def get_metadata_date(text):
    NoneType = type(None)
    switcher = {
    "ene.": "01",
    "feb.": "02",
    "mar.": "03",
    "abr.": "04",
    "may.": "05",
    "jun.": "06",
    "jul.": "07",
    "ago.": "08",
    "sep.": "09",
    "oct.": "10",
    "nov.": "11",
    "dic.": "12"
    }
    
    date_array = text.split(" ")
    if len(date_array) == 3:        
        DAY = date_array[0].zfill(2)
        YEAR = date_array[2]
        MONTH = ' ' if type(switcher.get(date_array[1])) == NoneType else switcher.get(date_array[1])
        DATE = YEAR+"-"+MONTH+"-"+DAY
        return DATE
    else:
        return " "
getDateString= udf(lambda z: get_metadata_date(z),StringType())    

os.environ['PYSPARK_SUBMIT_ARGS'] = 'pyspark-shell'

In [2]:
minio_ip = socket.gethostbyname('minio')
spark = SparkSession \
    .builder \
    .appName("Python Spark S3") \
    .config('spark.hadoop.fs.s3a.endpoint', 'http://'+minio_ip+':9000') \
    .config("spark.hadoop.fs.s3a.access.key", "changeme1234") \
    .config("spark.hadoop.fs.s3a.secret.key", "changeme1234") \
    .config('spark.hadoop.fs.s3a.path.style.access', 'true') \
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .getOrCreate()

In [3]:
st= StructType([
    StructField("abstract", StringType()),
    StructField("authors", StringType()),
    StructField("image", StringType()),
    StructField("metadata", StringType()),
    StructField("publish_date", TimestampType()),
    StructField("text", StringType()),
    StructField("title", StringType()),
    StructField("url", StringType()),
])

In [4]:
df_news_covid_mexico = spark.read.schema(st).option("timestampFormat", "dd-MM-yyyy").json("s3a://news/covid_mexico/*.json")

In [99]:
df_news_covid_mexico.count()

1686

In [100]:
df_news_covid_mexico.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- publish_date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [115]:
df_news_covid_mexico_date_text = df_news_covid_mexico.select(md5Udf("url").alias("article_id"),"title","url","publish_date",cleanLowerUdf("text").alias("clean_text"),getUrl("url").alias("site"),getDateString("metadata").alias("string_date")).filter("length(text) >= 2 and length(string_date) >= 2")

In [116]:
df_news_covid_mexico_date_text.show(15)

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-----------+
|          article_id|               title|                 url|       publish_date|          clean_text|                site|string_date|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-----------+
|6a07a98230ad362ef...|Actualización: Mé...|https://www.forbe...|2020-05-07 00:00:00|nota este texto s...|   www.forbes.com.mx| 2020-05-07|
|e7f7687b96e97d253...|A unas horas del ...|https://www.infob...|2020-05-07 00:00:00|las cifras de ‌ló...|     www.infobae.com| 2020-05-07|
|249038fa0de68d615...|México: mapa de d...|https://www.milen...|2020-07-05 00:00:00|alejandro gonzále...|     www.milenio.com| 2020-05-07|
|346e32556fdc0471f...|Reportan contagio...|https://www.forbe...|2020-05-07 00:00:00|reuters al menos ...|   www.forbes.com.mx| 2020-05-07|
|d80203ca9120a820d...|Suben

In [103]:
df_news_covid_mexico_date_text.select("title").show(15,False)

+-----------------------------------------------------------------+
|title                                                            |
+-----------------------------------------------------------------+
|Actualización: México, quinto lugar global por nuevas muertes ...|
|A unas horas del temido pico máximo de COVID-19, las ...         |
|México: mapa de defunciones por covid-19                         |
|Reportan contagio de Covid-19 de casi 50 personas en asilo ...   |
|Suben a 29616 los casos positivos de Covid-19 en México          |
|Ex secretarios de Salud cuestionan cifras de Covid-19 en ...     |
|El 10 de mayo la conferencia es para las mamás: así pueden ...   |
|Reinicio de actividades en CDMX será escalonado y desde ...      |
|Manuel Negrete da positivo por coronavirus COVID-19              |
|'Que no te cargue el payaso': así refuerza el Metro su ...       |
|Mutaciones del Covid-19 en México                                |
|Cierran asilo de ancianos en norte de México tr

In [117]:
df_news_covid_mexico_date_text.count()

1281

In [11]:
url = "jdbc:postgresql://postgres/shared"
mode="overwrite"
properties = {
    "user": "shared",
    "password": os.environ['SHARED_PASSWORD']
}

In [12]:
df_news_covid_mexico_date_text.write.jdbc(url=url, table="tb_news_covid_mexico_date_text", mode=mode, properties=properties)

In [13]:
df_news_covid_mexico_palabras = df_news_covid_mexico_date_text.select("article_id","publish_date",explode(split(df_news_covid_mexico_date_text.clean_text, "\s")).alias("palabra")).where(length('palabra') > 1)

In [14]:
df_news_covid_mexico_palabras.show(30)

+--------------------+-------------------+----------+
|          article_id|       publish_date|   palabra|
+--------------------+-------------------+----------+
|6a07a98230ad362ef...|2020-05-07 00:00:00|      nota|
|6a07a98230ad362ef...|2020-05-07 00:00:00|      este|
|6a07a98230ad362ef...|2020-05-07 00:00:00|     texto|
|6a07a98230ad362ef...|2020-05-07 00:00:00|        se|
|6a07a98230ad362ef...|2020-05-07 00:00:00| actualizó|
|6a07a98230ad362ef...|2020-05-07 00:00:00|       las|
|6a07a98230ad362ef...|2020-05-07 00:00:00|      1245|
|6a07a98230ad362ef...|2020-05-07 00:00:00|        de|
|6a07a98230ad362ef...|2020-05-07 00:00:00|      mayo|
|6a07a98230ad362ef...|2020-05-07 00:00:00|        de|
|6a07a98230ad362ef...|2020-05-07 00:00:00|      2020|
|6a07a98230ad362ef...|2020-05-07 00:00:00|     luego|
|6a07a98230ad362ef...|2020-05-07 00:00:00|        de|
|6a07a98230ad362ef...|2020-05-07 00:00:00|       que|
|6a07a98230ad362ef...|2020-05-07 00:00:00|reportamos|
|6a07a98230ad362ef...|2020-0

In [15]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf/
df_news_covid_mexico_palabras.groupBy('article_id', 'palabra','publish_date')\
    .count()\
    .orderBy('count', ascending=False)\
    .show(25)

+--------------------+-------+-------------------+-----+
|          article_id|palabra|       publish_date|count|
+--------------------+-------+-------------------+-----+
|e9cd5af96de5238b8...|     de|2020-05-09 00:00:00|  377|
|e007af30319332790...|     de|2020-05-07 00:00:00|  352|
|249038fa0de68d615...|     de|2020-07-05 00:00:00|  339|
|a501f551553465c3f...|     de|2020-05-06 00:00:00|  333|
|cdd17bbabcd40bb3b...|     de|2020-03-29 00:00:00|  331|
|3678d188ceeb03e6b...|     de|2020-06-05 00:00:00|  327|
|acd5384fe0192a18f...|     de|2020-05-13 00:00:00|  320|
|a38a67e29e23b582d...|     de|2020-05-11 00:00:00|  308|
|eb0a283d495866e69...|     de|2020-05-10 00:00:00|  304|
|2b7a3fb53b9905a69...|     de|2020-03-14 00:00:00|  278|
|8d6050c8ce801550d...|     de|2020-03-28 00:00:00|  238|
|63113c50ad7d9bba9...|     de|2020-04-05 00:00:00|  235|
|cedc44bfb6180a9ac...|     de|2020-04-26 00:00:00|  229|
|30d9911177d838db9...|     de|2020-04-09 00:00:00|  225|
|a501f551553465c3f...|     la|2

In [16]:
#https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf-continued/
w = Window.partitionBy(df_news_covid_mexico_palabras['article_id'])

article_tf = df_news_covid_mexico_palabras.groupBy('article_id', 'palabra', 'publish_date')\
    .agg(count('*').alias('n_w'),sum(count('*')).over(w).alias('n_d'),(count('*')/sum(count('*')).over(w)).alias('tf'))\
    .orderBy('n_w', ascending=False)\
    .cache()

article_tf.show(truncate=15)

+---------------+-------+---------------+---+----+---------------+
|     article_id|palabra|   publish_date|n_w| n_d|             tf|
+---------------+-------+---------------+---+----+---------------+
|e9cd5af96de5...|     de|2020-05-09 0...|377|4735|0.0796198521...|
|e007af303193...|     de|2020-05-07 0...|352|4994|0.0704845814...|
|249038fa0de6...|     de|2020-07-05 0...|339|4057|0.0835592802...|
|a501f5515534...|     de|2020-05-06 0...|333|4688|0.0710324232...|
|cdd17bbabcd4...|     de|2020-03-29 0...|331|4066|0.0814067879...|
|3678d188ceeb...|     de|2020-06-05 0...|327|3699|0.0884022708...|
|acd5384fe019...|     de|2020-05-13 0...|320|3621|0.0883733775...|
|a38a67e29e23...|     de|2020-05-11 0...|308|3653|0.0843142622...|
|eb0a283d4958...|     de|2020-05-10 0...|304|4275|0.0711111111...|
|2b7a3fb53b99...|     de|2020-03-14 0...|278|2867|0.0969654691...|
|8d6050c8ce80...|     de|2020-03-28 0...|238|2885|0.0824956672...|
|63113c50ad7d...|     de|2020-04-05 0...|235|2967|0.0792045837

In [17]:
w = Window.partitionBy('palabra')

c_d = df_news_covid_mexico_palabras.select('article_id').distinct().count()

article_idf = df_news_covid_mexico_palabras.groupBy('palabra', 'article_id','publish_date').agg(
        lit(c_d).alias('c_d'),
        count('*').over(w).alias('i_d'),
        log(lit(c_d)/count('*').over(w)).alias('idf')
    )\
    .orderBy('idf', ascending=False)\
    .cache()

In [18]:
article_idf.show(150, truncate=15)

+---------------+---------------+---------------+----+---+---------------+
|        palabra|     article_id|   publish_date| c_d|i_d|            idf|
+---------------+---------------+---------------+----+---+---------------+
|           201’|038da904a144...|2020-01-11 0...|1294|  1|7.1654934750...|
|           2136|370853b8e0eb...|2020-01-21 0...|1294|  1|7.1654934750...|
|           2162|70ffaf20effa...|2020-01-02 0...|1294|  1|7.1654934750...|
|        3267184|906f7950de61...|2020-05-03 0...|1294|  1|7.1654934750...|
|           3414|ccbf475555be...|2020-05-06 0...|1294|  1|7.1654934750...|
|           4821|a78b641ef212...|2020-02-06 0...|1294|  1|7.1654934750...|
|            691|377b116e873e...|2020-05-06 0...|1294|  1|7.1654934750...|
|     arbitrajes|a760a8f1cf3b...|2020-01-19 0...|1294|  1|7.1654934750...|
|    artesanales|eb0a283d4958...|2020-05-10 0...|1294|  1|7.1654934750...|
|     asistiendo|722156625331...|2020-02-17 0...|1294|  1|7.1654934750...|
|         azpiri|d7f19c1e

In [19]:
article_tfidf = article_tf.join(article_idf, ['article_id', 'palabra', 'publish_date'])\
    .withColumn('tf_idf', col('tf') * col('idf'))\
    .cache()

In [20]:
article_tfidf.orderBy('tf_idf', ascending=False).show(150,truncate=12)

+------------+------------+------------+---+---+------------+----+---+------------+------------+
|  article_id|     palabra|publish_date|n_w|n_d|          tf| c_d|i_d|         idf|      tf_idf|
+------------+------------+------------+---+---+------------+----+---+------------+------------+
|8605697c7...|    palenque|2020-02-0...|  1| 11|0.0909090...|1294|  1|7.1654934...|0.6514084...|
|8605697c7...|       toros|2020-02-0...|  1| 11|0.0909090...|1294|  2|6.4723462...|0.5883951...|
|8605697c7...|     barbosa|2020-02-0...|  1| 11|0.0909090...|1294| 12|4.6805868...|0.4255078...|
|f2bdf750a...|mexicoaer...|2020-05-0...|  2| 34|0.0588235...|1294|  1|7.1654934...|0.4214996...|
|8605697c7...|     anuncia|2020-02-0...|  1| 11|0.0909090...|1294| 14|4.5264361...|0.4114941...|
|8605697c7...|       plaza|2020-02-0...|  1| 11|0.0909090...|1294| 16|4.3929047...|0.3993549...|
|70ffaf20e...|     premium|2020-01-0...| 19|296|0.0641891...|1294|  7|5.2195833...|0.3350408...|
|70ffaf20e...|      diesel|202

In [21]:
w = Window.partitionBy('article_id').orderBy(col('tf_idf').desc())

article_tfidf_top_15=article_tfidf.withColumn('rank', rank().over(w))\
    .where('rank <= 15')\
    .drop('rank')\
    .orderBy('article_id', 'tf_idf','n_w')\
    .select('article_id','publish_date','palabra','n_w','tf_idf')

In [22]:
article_tfidf_top_15.show(truncate=12, n=30)

+------------+------------+------------+---+------------+
|  article_id|publish_date|     palabra|n_w|      tf_idf|
+------------+------------+------------+---+------------+
|0000210f7...|2020-02-0...|      zuñiga|  2|0.0335354...|
|0000210f7...|2020-02-0...|  campamento|  2|0.0335354...|
|0000210f7...|2020-02-0...|     managua|  2|0.0371269...|
|0000210f7...|2020-02-0...| selecciones|  2|0.0371269...|
|0000210f7...|2020-02-0...|     acevedo|  2|0.0371269...|
|0000210f7...|2020-02-0...|     octavos|  2|0.0371269...|
|0000210f7...|2020-02-0...|     academy|  2|0.0371269...|
|0000210f7...|2020-02-0...|       tyron|  2|0.0371269...|
|0000210f7...|2020-02-0...|     jicaral|  2|0.0371269...|
|0000210f7...|2020-02-0...|       sub17|  2|0.0371269...|
|0000210f7...|2020-02-0...|     fenifut|  2|0.0371269...|
|0000210f7...|2020-02-0...|       costa|  4|0.0422222...|
|0000210f7...|2020-02-0...|        rica|  4|0.0448940...|
|0000210f7...|2020-02-0...|       sub20|  3|0.0471519...|
|0000210f7...|

In [118]:
article_tfidf_top_15_site = article_tfidf_top_15.join(df_news_covid_mexico_date_text, ['article_id','publish_date']).select('article_id','publish_date','site','palabra','n_w','tf_idf','string_date')

In [119]:
article_tfidf_top_15_site.show(15)

+--------------------+-------------------+-------------------+-----------+---+--------------------+-----------+
|          article_id|       publish_date|               site|    palabra|n_w|              tf_idf|string_date|
+--------------------+-------------------+-------------------+-----------+---+--------------------+-----------+
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni| campamento|  2| 0.03353547302850208| 2020-02-06|
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni|     zuñiga|  2| 0.03353547302850208| 2020-02-06|
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni|    academy|  2| 0.03712690919720645| 2020-02-06|
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni|    fenifut|  2| 0.03712690919720645| 2020-02-06|
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni|    octavos|  2| 0.03712690919720645| 2020-02-06|
|0000210f7e639a06c...|2020-02-06 00:00:00|www.laprensa.com.ni|      tyron|  2| 0.03712690919720645| 2020

In [120]:
article_tfidf_top_15_site.write.jdbc(url=url, table="tb_news_covid_mexico_palabras_top_tfidf", mode=mode, properties=properties)