In [179]:
# Bootstrap: findspark.init() must run before `import pyspark` so the local
# Spark installation's Python package becomes importable (presumably located
# via SPARK_HOME — confirm for your environment).
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [180]:
from pyspark.sql.functions import array_join, explode, col, split, lower, flatten, concat_ws, size, regexp_replace
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [4]:
import os

# Connection string for the CORD-19 full-text collection. Set the MONGO_URI
# environment variable to point at another server instead of editing the
# notebook (and committing a database host); the default keeps the original.
MONGO_URI = os.environ.get(
    "MONGO_URI", "mongodb://13.76.131.54:27017/cord19dataset.fulltexts"
)

# Spark session wired to MongoDB for both reads and writes, pulling the
# Mongo Spark connector (Scala 2.12 build) from Maven at startup.
my_spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

In [22]:
df = my_spark.read.load(format="mongo")  # Mongo-backed frame; URI comes from the session config

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:57569)
Traceback (most recent call last):
  File "D:\spark-3.0.0-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "D:\spark-3.0.0-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:57569)

In [14]:
%%time
# Keep only the paragraph array column and preview the first three documents.
all_body = df.select(['body_text'])
all_body.limit(3).show(n=20, truncate=True)

+--------------------+
|           body_text|
+--------------------+
|[It was a distinc...|
|[Tuberculosis is ...|
|[

, The city of ...|
+--------------------+



In [22]:
%%time
data_file_path = 'data.json'
# Spark writes a *directory* called data.json holding part-*.json files
# (JSON Lines). The former option("header", "false") was dropped: "header"
# is a CSV option and the JSON writer ignores it.
all_body.write.mode('overwrite').json(data_file_path)

Wall time: 1min 18s


In [43]:
%%time
data_file_path = 'data.json'
# NOTE: rebinds `df` — from here on it is the JSON snapshot, not the Mongo frame.
df = my_spark.read.format('json').load(data_file_path)

Wall time: 40.2 s


In [6]:
%%time
# Number of documents (rows) in `df`; count() is a Spark action, so it scans the data.
df.count()

Wall time: 22.6 s


176564

# Data checking

In [44]:
df.select(col('body_text')).show(n=20)  # eyeball the first 20 documents

+--------------------+
|           body_text|
+--------------------+
|[It was a distinc...|
|[Tuberculosis is ...|
|[

, The city of ...|
|[With dreadful gl...|
|[According to the...|
|[Public health su...|
|[Lors de la premi...|
|[After the first ...|
|[The novel 2019 c...|
|[Further informat...|
|[To the Editor,, ...|
|[The dramatic spr...|
|[This study explo...|
|[Infectious bronc...|
|[Grassland ecolog...|
|[Our hospital was...|
|[Influenza viruse...|
|[Donor hematopoie...|
|[The coronavirus ...|
|[Left ventricular...|
+--------------------+
only showing top 20 rows



# Pre-processing

In [123]:
# Join each document's paragraph array into one space-separated string.
df_article = df.withColumn('article', concat_ws(' ', 'body_text'))
df_article.select('article').show(n=3, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [175]:
# Normalise each article to single-spaced ASCII letters before tokenising:
#   1. line breaks -> a space (not deleted, so the last word of one line is
#      not glued onto the first word of the next);
#   2. parenthesised asides, e.g. "(Fig. 2)", and bracketed citations,
#      e.g. "[12]", are removed — replaced by a space so neighbours don't fuse;
#   3. every non-letter character -> a space;
#   4. runs of spaces collapsed and the ends trimmed, so the whitespace
#      tokenizer does not emit empty leading tokens.
df_filter = (
    df_article
    .withColumn('filtered_article', regexp_replace('article', r"[\r\n]+", " "))
    .withColumn('filtered_article', regexp_replace('filtered_article', r"\(.*?\)", " "))
    .withColumn('filtered_article', regexp_replace('filtered_article', r"\[.*?\]", " "))
    .withColumn('filtered_article', regexp_replace('filtered_article', r"[^a-zA-Z]", " "))
    .withColumn('filtered_article', F.trim(regexp_replace('filtered_article', r" +", " ")))
    .select('filtered_article')
)
df_filter.show(2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [172]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Split the cleaned text on whitespace (Tokenizer also lowercases).
tokenizer = Tokenizer().setInputCol('filtered_article').setOutputCol('words_token')
df_words_token = tokenizer.transform(df_filter).select(['filtered_article', 'words_token'])
df_words_token.show(n=2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [173]:
# Drop stop words from the token arrays using the remover's default word list.
remover = StopWordsRemover().setInputCol('words_token').setOutputCol('words_clean')
df_stop_words = remover.transform(df_words_token).select(['words_clean'])
df_stop_words.show(n=2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [177]:
def clean_words(x, min_length=3):
    """Filter a token list down to lowercase, purely alphabetic words.

    Used as the Python function behind the ``clean_up`` UDF that is applied
    to the ``words_clean`` column after stop-word removal.

    Args:
        x: Sequence of token strings. ``None`` (a null array in Spark) is
            treated as an empty list instead of raising ``TypeError`` and
            failing the whole Spark job.
        min_length: Minimum token length to keep. The default of 3 matches
            the original ``len(token) > 2`` rule.

    Returns:
        A new list containing, in input order, every token that is non-null,
        at least ``min_length`` characters long and made only of letters
        (``str.isalpha``), lowercased.
    """
    if x is None:
        return []
    return [
        token.lower()
        for token in x
        # Null array elements are skipped rather than crashing on len(None).
        if token is not None and len(token) >= min_length and token.isalpha()
    ]
# Wrap clean_words as a Spark UDF returning array<string>, then apply it in place.
clean_up = F.udf(f=clean_words, returnType=T.ArrayType(T.StringType()))

df_clean = df_stop_words.withColumn('words_clean', clean_up('words_clean'))
df_clean.select('words_clean').show(n=2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Analysis

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer 
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Hyper-parameters, named so they can be tuned in one place.
NUM_HASH_FEATURES = 2000  # size of the hashed term-frequency space
MIN_DOC_FREQ = 5          # terms seen in fewer documents are filtered by IDF
NUM_CLUSTERS = 20
KMEANS_SEED = 42

hashingTF = HashingTF(inputCol="words_clean", outputCol="rawFeatures", numFeatures=NUM_HASH_FEATURES)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=MIN_DOC_FREQ)

# An explicit seed makes the clustering reproducible; PySpark's default seed
# is derived from Python's hash() and may differ between sessions.
kmeans = KMeans(k=NUM_CLUSTERS, seed=KMEANS_SEED)

pipeline = Pipeline(stages=[hashingTF, idf, kmeans])
model = pipeline.fit(df_clean)
results = model.transform(df_clean)

# groupBy().count() is lazy: as a bare last expression the cell would only
# display the DataFrame schema. show() runs it and prints every cluster size.
results.groupBy("prediction").count().orderBy("prediction").show(NUM_CLUSTERS)