In [1]:
!pip install nltk



In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every cell below, or reuse one that is
# already running under the application name "test".
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

21/08/27 06:18:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# HDFS location of the collected tweets; the preview below shows the load
# yields a single string column `value` with one JSON document per row.
COVID_TWEETS_PATH = "hdfs://namenode:8020/tmp/data/covid"
df = spark.read.json(COVID_TWEETS_PATH)

                                                                                

In [4]:
# Preview the raw load (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+--------------------+
|               value|
+--------------------+
|{"userID": 320881...|
|{"userID": 926309...|
|{"userID": 447289...|
|{"userID": 472445...|
|{"userID": 567289...|
|{"userID": 102953...|
|{"userID": 787182...|
|{"userID": 223501...|
|{"userID": 149409...|
|{"userID": 724245...|
|{"userID": 838192...|
|{"userID": 985121...|
|{"userID": 852189...|
|{"userID": 119300...|
|{"userID": 134640...|
|{"userID": 154605...|
|{"userID": 824736...|
|{"userID": 530009...|
|{"userID": 140798...|
|{"userID": 105685...|
+--------------------+
only showing top 20 rows



In [5]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# One representative tweet document, used only to infer the JSON schema.
SAMPLE_TWEET_JSON = """{
   "userID": 724245818299969500,
   "tweetText": "Text",
   "hashTags": [
      "propaganda",
      "China",
      "ChinaLiedPeopleDied",
      "COVID",
      "ChinaVirus"
   ],
   "location_full_name": "Merica",
   "favoriteCount": 0,
   "reTweetCount": 0,
   "created_at": "Fri Aug 27 03:21:20 +0000 2021"
}"""

# `schema_of_json` only looks at the literal sample, so evaluate it on a
# one-row frame rather than on `df`: selecting it from `df` and collecting
# ships one identical result row per tweet to the driver, and raises
# IndexError when `df` is empty.
schema = spark.range(1).select(F.schema_of_json(SAMPLE_TWEET_JSON)).collect()[0][0]

                                                                                

In [6]:
# Display the inferred schema (a DDL-style string) so it can be checked before parsing.
schema

'STRUCT<`created_at`: STRING, `favoriteCount`: BIGINT, `hashTags`: ARRAY<STRING>, `location_full_name`: STRING, `reTweetCount`: BIGINT, `tweetText`: STRING, `userID`: BIGINT>'

In [7]:
# Parse the JSON string in `value` into a struct using the inferred schema,
# then promote the struct's fields to top-level columns in a fixed order.
# NOTE: this rebinds `df`; afterwards it no longer has a `value` column, so
# the load cell must be re-run before this cell can be executed again.
tweet_fields = [
    "userID",
    "tweetText",
    "hashTags",
    "location_full_name",
    "favoriteCount",
    "reTweetCount",
    "created_at",
]
parsed_tweets = df.withColumn("value", F.from_json("value", schema))
df = parsed_tweets.select(*[f"value.{field}" for field in tweet_fields])

In [8]:
# Preview the parsed tweets (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+-------------------+--------------------+--------------------+--------------------+-------------+------------+--------------------+
|             userID|           tweetText|            hashTags|  location_full_name|favoriteCount|reTweetCount|          created_at|
+-------------------+--------------------+--------------------+--------------------+-------------+------------+--------------------+
|         3208811578|Babita Deokaran h...|[BabitaDeokaran, ...|Umhlanga, South A...|            0|           0|Fri Aug 27 06:00:...|
| 926309384615088128|The COVID-19 pand...|           [COVID19]|            Zimbabwe|            0|           0|Fri Aug 27 06:00:...|
|           44728980|DOH says several ...|           [COVID19]|         Philippines|            0|           0|Fri Aug 27 06:00:...|
|         4724452756|HMRC issues new S...|[SEISS5, coronavi...|Scotland, United ...|            0|           0|Fri Aug 27 06:00:...|
|          567289542|A crisis of incre...|[Afghanistan, COV...|      

# Tweet statistics by location

In [9]:
# Number of tweets per reported location, most frequent first.
location_counts = df.groupBy('location_full_name').count()
location_counts.orderBy('count', ascending=False).show()



+--------------------+-----+
|  location_full_name|count|
+--------------------+-----+
|       United States|  246|
|               India|  229|
|    Bengaluru, India|  159|
|     Los Angeles, CA|  124|
|    New Delhi, India|   98|
|           Australia|   94|
|        New York, NY|   66|
|       Mumbai, India|   63|
|     California, USA|   61|
|              Canada|   60|
|           Worldwide|   57|
|               Earth|   49|
|           New Delhi|   46|
|              Mumbai|   41|
|           Sri Lanka|   38|
|      Washington, DC|   36|
|                 USA|   36|
|        South Africa|   35|
|Sydney, New South...|   34|
|      United Kingdom|   32|
+--------------------+-----+
only showing top 20 rows



                                                                                

# Hashtag statistics

In [10]:
# Flatten each tweet's hashtag array to one row per tag, then count how often
# each tag occurs (case-sensitive) and list the most frequent first.
hashtag_counts = (
    df.select(F.explode(df.hashTags).alias('tag'))
    .groupBy(F.col('tag').alias('key'))
    .count()
    .orderBy('count', ascending=False)
)
hashtag_counts.show()



+--------------------+-----+
|                 key|count|
+--------------------+-----+
|             COVID19| 4031|
|               COVID|  711|
|             covid19|  473|
|             Covid19|  468|
|         coronavirus|  335|
|               Covid|  291|
|               covid|  185|
|   IndiaFightsCorona|  181|
|             Vaccine|  162|
|               CoWIN|  142|
|WhatsHappeningInM...|  137|
|      cowinblore1844|  136|
|               India|  134|
|           Aug27Coup|  134|
|                BBMP|  115|
|             vaccine|  112|
|              Kerala|  109|
|     MyanmarCovidSOS|  108|
|        DeltaVariant|  104|
|          COVISHIELD|   97|
+--------------------+-----+
only showing top 20 rows



                                                                                

# Clean text and remove stopwords

In [11]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

In [12]:
# Clean and remove hashtag
df_clean = df.select('tweetText', (F.lower(F.regexp_replace('tweetText', "@[A-Za-z0-9_]+", "")).alias('text')))

# Tokenize text
tokenizer = Tokenizer(inputCol='tweetText', outputCol='words_token')
df_words_token = tokenizer.transform(df_clean).select('words_token')

# Remove stop words
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('words_clean')

# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = F.udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('words_stemmed')

# Filter length word > 3
filter_length_udf = F.udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(F.col('words_stemmed')))

In [13]:
# Compare the original tweet text with its cleaned counterpart (first 20 rows).
df_clean.show(n=20, truncate=True)

+--------------------+--------------------+
|           tweetText|                text|
+--------------------+--------------------+
|Babita Deokaran h...|babita deokaran h...|
|The COVID-19 pand...|the covid-19 pand...|
|DOH says several ...|doh says several ...|
|HMRC issues new S...|hmrc issues new s...|
|A crisis of incre...|a crisis of incre...|
|ICYMI: @CrabbBren...|icymi:  from  app...|
|As we continue th...|as we continue th...|
|RT @NidhiTanejaa:...|rt : 26 students ...|
|"de-wormer" for a...|"de-wormer" for a...|
|Claiming that's i...|claiming that's i...|
|RT @AntonyRobart:...|rt : the debate o...|
|RT @EuroELSO: Lat...|rt : latest updat...|
|#Most worrisome i...|#most worrisome i...|
|RT @DrAmbrishMith...|rt : start the da...|
|@KavalAuthorActs ...| take ivermectin ...|
|RT @SantaKlauSchw...|rt : a lot of goo...|
|RT @SamirShahMD: ...|rt : 💸😟financia...|
|This expectation ...|this expectation ...|
|Watch the grand M...|watch the grand m...|
|More than 61 cror...|more than 61

In [14]:
# Compare the stemmed tokens with the length-filtered tokens (first 20 rows).
df_final_words.show(n=20, truncate=True)

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|       words_stemmed|               words|
+--------------------+--------------------+
|[babita, deokaran...|[babita, deokaran...|
|[covid-19, pandem...|[covid-19, pandem...|
|[doh, say, sever,...|[doh, say, sever,...|
|[hmrc, issu, new,...|[hmrc, issu, new,...|
|[crisi, incred, p...|[crisi, incred, p...|
|[icymi:, @crabbbr...|[icymi:, @crabbbr...|
|[continu, fight, ...|[continu, fight, ...|
|[rt, @nidhitaneja...|[@nidhitanejaa:, ...|
|["de-wormer", who...|["de-wormer", who...|
|[claim, western, ...|[claim, western, ...|
|[rt, @antonyrobar...|[@antonyrobart:, ...|
|[rt, @euroelso:, ...|[@euroelso:, late...|
|[#most, worrisom,...|[#most, worrisom,...|
|[rt, @drambrishmi...|[@drambrishmithal...|
|[@kavalauthoract,...|[@kavalauthoract,...|
|[rt, @santaklausc...|[@santaklauschwab...|
|[rt, @samirshahmd...|[@samirshahmd:, ?...|
|[expect, tepid, #...|[expect, tepid, #...|
|[watch, grand, ms...|[watch, grand, ms...|
|[61, crore, 22, l...|[crore, la

                                                                                

# Top word statistics

In [15]:
# Flatten the per-tweet token arrays to one row per token, count how often
# each token occurs, and list the most frequent first.
exploded_words = df_final_words.select(F.explode(df_final_words.words).alias('words'))
word_counts = exploded_words.groupBy(F.col('words').alias('key')).count()
word_counts.orderBy('count', ascending=False).show()



+------------+-----+
|         key|count|
+------------+-----+
|    #covid19| 4442|
|      vaccin| 1297|
|       death| 1276|
|      #covid| 1004|
|        case|  869|
|         new|  859|
|       blame|  726|
|       covid|  567|
|       &amp;|  558|
|         get|  510|
|     #vaccin|  503|
|       state|  496|
|        dose|  448|
|       peopl|  430|
|       india|  406|
|#coronavirus|  397|
|     control|  393|
|      report|  392|
|        last|  392|
|       biden|  391|
+------------+-----+
only showing top 20 rows



                                                                                