In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fc
spark = SparkSession.builder.appName('dsci551').getOrCreate()

In [2]:
df = spark.read.option("header",True).json('./testout.json')

In [3]:
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- screen_name: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- user_id: long (nullable = true)



In [4]:
df.take(1)

[Row(_id='603782cfa19bf221e69ec569', city=None, country=None, country_code=None, created_at=datetime.datetime(2021, 2, 25, 2, 58, 18), hashtags=[], screen_name='averismusic', tweet='Pfizer gang. My arm really hurts tired face', tweet_id=1364892472753741800, user_id=36044017)]

In [5]:
df.count()

656438

# 每日推文数量

In [6]:
df[df.created_at.isNull()][['created_at']].count()

0

In [7]:
df[['created_at']].take(10)

[Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 18)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 19)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 19)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 19)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 20)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 20)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 21)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 21)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 21)),
 Row(created_at=datetime.datetime(2021, 2, 25, 2, 58, 22))]

In [8]:
df[df.created_at.rlike('2021-02-25')].rdd.count()

97967

In [9]:
def date_str_int(n):
    res = int(n.replace('-', ''))
    return res

def date_int_str(n):
    res = list(str(n))
    res.insert(4, '-')
    res.insert(7, '-')
    res = ''.join(res)
    return res

In [10]:
for date in range(20210225, 20210229):
    date_str = date_int_str(date)
    cnt = df[df.created_at.rlike(date_str)].count()
    print(date_str, cnt)
for date in range(20210301, 20210306):
    date_str = date_int_str(date)
    cnt = df[df.created_at.rlike(date_str)].count()
    print(date_str, cnt)

2021-02-25 97967
2021-02-26 89463
2021-02-27 87522
2021-02-28 46316
2021-03-01 82514
2021-03-02 75871
2021-03-03 66403
2021-03-04 71407
2021-03-05 38975


In [11]:
97967 + 89463 + 87522 + 46316 + 82514 + 75871 + 66403 + 71407 + 38975

656438

# tweet word cnt

In [12]:
df[['tweet']].take(1)

[Row(tweet='Pfizer gang. My arm really hurts tired face')]

In [13]:
tweet_df = df[['tweet']]

In [14]:
tweet_df.take(2)

[Row(tweet='Pfizer gang. My arm really hurts tired face'),
 Row(tweet='Me when i attended a weekend retreat with my batman pajamas...and my batman toothbrush\n\nI was 20.')]

In [15]:
tweet_df.show()

+--------------------+
|               tweet|
+--------------------+
|Pfizer gang. My a...|
|Me when i attende...|
|@pabloLID07 @king...|
|Nope. That's not ...|
|@Riaanafrica @BT_...|
|loudspeaker Major...|
|They want to give...|
|Experts worry var...|
|Does it feel weir...|
|...good question....|
|Hey, check out th...|
|@SarahJTodd @chri...|
|Plans to launch a...|
|No Vaccine, No Fo...|
|@suzseddon Once t...|
|@QuigleyCatriona ...|
|@idgiveuakidney A...|
|We are urging our...|
|@DrAmarMOH Meanwh...|
|So thankful to be...|
+--------------------+
only showing top 20 rows



In [16]:
test_df = tweet_df.withColumn('wordCount', fc.size(fc.split(fc.col('tweet'), ' ')))
test_df.show()

+--------------------+---------+
|               tweet|wordCount|
+--------------------+---------+
|Pfizer gang. My a...|        8|
|Me when i attende...|       16|
|@pabloLID07 @king...|       27|
|Nope. That's not ...|       15|
|@Riaanafrica @BT_...|       47|
|loudspeaker Major...|       31|
|They want to give...|       24|
|Experts worry var...|       19|
|Does it feel weir...|       34|
|...good question....|       36|
|Hey, check out th...|       35|
|@SarahJTodd @chri...|       18|
|Plans to launch a...|       24|
|No Vaccine, No Fo...|       14|
|@suzseddon Once t...|       53|
|@QuigleyCatriona ...|       39|
|@idgiveuakidney A...|       26|
|We are urging our...|       24|
|@DrAmarMOH Meanwh...|        6|
|So thankful to be...|       45|
+--------------------+---------+
only showing top 20 rows



In [17]:
 tweet_res = tweet_df.withColumn('word', fc.explode(fc.split(fc.col('tweet'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)
tweet_res.show()

+-------+------+
|   word| count|
+-------+------+
|    the|583435|
|     to|418294|
|     of|278235|
|    and|263064|
|      a|255965|
|vaccine|226552|
|     in|200967|
|     is|190475|
|    for|176946|
|      I|152786|
|       |119379|
|    are|119118|
|   that|115509|
|    you|113757|
|   with|107141|
|   have|105128|
|     be|103526|
|     on| 98398|
|     it| 87367|
|    get| 85986|
+-------+------+
only showing top 20 rows



# hashtag

In [18]:
hashtag_df = df[['hashtags']]

In [19]:
hashtag_df

DataFrame[hashtags: array<string>]

In [20]:
hashtag_df.show()

+--------------------+
|            hashtags|
+--------------------+
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|[covid, covid19va...|
|                  []|
|[covid19, covidva...|
|                  []|
|                  []|
|[topic, eu, merke...|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|    [covid19vaccine]|
|                  []|
|                  []|
+--------------------+
only showing top 20 rows



In [21]:
hashtag_df.select("*", fc.explode("hashtags"))\
    .agg(fc.count("hashtags"))\
    .show()

+---------------+
|count(hashtags)|
+---------------+
|         235412|
+---------------+



In [22]:
with open('./vaccine_key_words.txt') as f:
    keyword_data = f.readlines()

In [23]:
for keyword in keyword_data:
    tmp = keyword.replace('\n', '')
    hashtag_set.add(tmp)

NameError: name 'hashtag_set' is not defined

In [None]:
hashtag_set

In [None]:
for hashtag in hashtag_set:
    num = hashtag_df[fc.array_contains('hashtags', hashtag)].count()
    print(hashtag, num)