In [1]:
import collections
import os
from itertools import combinations
from operator import add

from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the SparkSession. The MongoDB input/output URIs and the
# connector package used by every read/write in this notebook are set here.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/bigdata.raw")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/bigdata.t3")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')
    .getOrCreate()
)

In [4]:
# Load the collection configured in spark.mongodb.input.uri and keep at most
# 100000 rows for the analyses below.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .load()
    .limit(100000)
)
df.show()

+--------------------+--------------------+--------------------+
|                 _id|               emoji|            sentence|
+--------------------+--------------------+--------------------+
|[5ea4eddb59a37f98...|      :red_heart:,18|No object is so b...|
|[5ea4eddb59a37f98...|:person_shrugging...|Cant expect diffe...|
|[5ea4eddb59a37f98...|:face_with_tears_...|“ Lets go Marcus ...|
|[5ea4eddb59a37f98...|:face_with_tears_...|Asahd really is a...|
|[5ea4eddb59a37f98...|:face_with_tears_...|Yoongi Tweet Hell...|
|[5ea4eddb59a37f98...|:backhand_index_p...|we cannot afford ...|
|[5ea4eddb59a37f98...|:party_popper:,8 ...|ranks 6th in Janu...|
|[5ea4eddb59a37f98...|:person_facepalmi...|Ok people are rea...|
|[5ea4eddb59a37f98...|:smiling_face_wit...|Cant wait to meet...|
|[5ea4eddb59a37f98...| :clapping_hands:,11|Congratulations M...|
|[5ea4eddb59a37f98...|:face_with_tears_...|Met orlando brown...|
|[5ea4eddb59a37f98...|      :weary_face:,4|Im goin to bed :w...|
|[5ea4eddb59a37f98...|  :

## Find the appearance frequency of every emoji.

In [None]:
# Only the `emoji` column is needed to count emoji occurrences.
emojis = df.select("emoji")
def split_str(line):
    """Strip the ``,position`` suffix from every entry of ``line.emoji``.

    ``line.emoji`` is a space-separated list of ``name,position`` entries
    (e.g. ``":red_heart:,18 :fire:,3"``). The result keeps only the names,
    still separated by single spaces (``":red_heart: :fire:"``).
    """
    entries = line.emoji.split(" ")
    names = (entry.split(',')[0] for entry in entries)
    return " ".join(names)

# Replace each row with one space-separated string of emoji names.
emojis = emojis.rdd.map(split_str)
emojis.take(3)

In [None]:
# Count every emoji occurrence and order the (emoji, count) pairs so the
# most frequent emoji comes first.
result = (
    emojis
    .flatMap(lambda names: names.split(" "))
    .map(lambda name: (name, 1))
    .reduceByKey(add)
    .sortBy(lambda pair: pair[1], ascending=False)
)
result.take(10)

In [None]:
# Turn the (emoji, count) pairs into a DataFrame and normalise the counts.
# RDD.toDF takes the column names as ONE list argument (schema); unpacking
# them would pass 'fre' as the sampleRatio and 'emoji' as a schema string.
new_names = ['emoji', 'fre']
# The counts come from `df`, which is capped with limit(), so normalise by the
# number of rows (sentences) actually analysed instead of a hard-coded total.
# NOTE(review): if "fre" should be a share of all emoji occurrences instead,
# divide by the sum of the counts — confirm the intended definition.
total_sentences = df.count()
result = result.toDF(new_names)
result = result.withColumn('fre', result['fre'] / total_sentences)

In [None]:
# Disabled: would append the normalised frequency table to the MongoDB output
# collection configured in spark.mongodb.output.uri.
# result.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

## For every emoji, find the 3 other emojis that are used most frequently with it.

In [None]:
# Only the emoji column is needed for the co-occurrence analysis.
# (`combinations` is imported in the first cell with the other imports.)
emojis = df.select('emoji')

def split_arr(line):
    """Return the emoji names of ``line.emoji`` as a list, dropping positions.

    Each space-separated ``name,position`` entry contributes its ``name``;
    repeated names are kept.

    NOTE(review): a different ``split_arr`` (returning only repeated names) is
    defined later in this notebook; whichever definition ran last is used.
    """
    return [entry.split(',')[0] for entry in line.emoji.split(" ")]

# Emojis tracked individually in the co-occurrence analysis; any other emoji
# is folded into the single bucket 'others'.
_TOP10_EMOJIS = frozenset([
    ':face_with_tears_of_joy:', ':red_heart:', ':loudly_crying_face:', ':fire:',
    ':smiling_face_with_heart-eyes:', ':female_sign:', ':clapping_hands:',
    ':folded_hands:', ':male_sign:', ':backhand_index_pointing_right:',
])


def combination(line, top=_TOP10_EMOJIS):
    """Return directed co-occurrence pairs for the emojis of one sentence.

    Every unordered pair of entries in ``line`` is considered. Names not in
    ``top`` are replaced by ``'others'``. Pairs whose two (bucketed) names are
    equal are skipped. Every remaining pair is emitted in both directions so
    each emoji can later serve as a key.

    Args:
        line: list of emoji names for one sentence.
        top: collection of emoji names kept individually; defaults to the ten
            emojis hard-coded for this analysis.

    Returns:
        list of ``(emoji, other)`` tuples.
    """
    res = []
    for first, second in combinations(line, 2):
        p0 = first if first in top else 'others'
        p1 = second if second in top else 'others'
        if p0 != p1:
            res.append((p0, p1))
            res.append((p1, p0))
    return res

# Count how often each (emoji, other) pair co-occurs across sentences with at
# least two emojis, then re-key by the first emoji: (emoji, (other, count)).
emojis_comb = (
    emojis.rdd
    .map(split_arr)
    .filter(lambda names: len(names) > 1)
    .flatMap(combination)
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
)

def sort_func(x):
    """Sort key: return the second element of ``x`` (the count in a pair)."""
    count = x[1]
    return count

def top_10(line, n=10):
    """Rank the co-occurring partners of one emoji by frequency.

    Args:
        line: ``(emoji, partners)`` as produced by ``groupByKey``, where
            ``partners`` is an iterable of ``(other_emoji, count)`` pairs.
        n: maximum number of partners to keep (default 10, matching the name).

    Returns:
        ``(emoji, ranked)``, where ``ranked`` is a tuple of at most ``n``
        ``(other_emoji, count)`` pairs, highest count first. Ties are broken
        by emoji name, because ``groupByKey`` does not guarantee the order of
        the grouped values and the output should be deterministic.
    """
    emoji, partners = line
    ranked = sorted(partners, key=lambda pair: (-pair[1], pair[0]))
    return (emoji, tuple(ranked[:n]))

# Group the (other, count) pairs by emoji and summarise each group with top_10.
result = emojis_comb.groupByKey().map(top_10)

result.take(1)

In [None]:
# Convert the (emoji, partners) tuples to a DataFrame and name its columns.
result = result.toDF().selectExpr("_1 as emoji", "_2 as col")
result.show()

In [None]:
# Append the co-occurrence table to the collection in spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## For every emoji, determine whether it is used more with words beginning with a lowercase letter or with words beginning with an uppercase letter.

In [5]:
def check_case(line):
    """Classify, for each emoji of a row, the case of the word it is paired with.

    ``line.emoji`` is a space-separated list of ``name,place`` entries and
    ``line.sentence`` is the space-separated sentence. The word examined is
    ``sentence[place - 1]`` (presumably the token just before the emoji —
    TODO confirm how ``place`` is indexed upstream).

    Returns:
        list of ``(emoji, (upper, lower))`` tuples:
        ``(1, 0)`` if the word starts with an uppercase letter,
        ``(0, 1)`` if it starts with a lowercase letter, and
        ``(0, 0)`` otherwise. The ``(0, 0)`` case also covers a missing word
        (``place`` is 0 or past the end of the sentence) and an empty token,
        such as one produced by consecutive spaces.
    """
    res = []
    sentence = line.sentence.split(' ')
    emojis = line.emoji.split(' ')

    for emoji in emojis:
        e, place = emoji.split(',')
        idx = int(place) - 1
        # Python would wrap a negative index to the end of the sentence and
        # raise on one past the end; both mean "no such word" here.
        word = sentence[idx] if 0 <= idx < len(sentence) else ''

        if not word:
            res.append((e, (0, 0)))
        elif word[0].isupper():
            res.append((e, (1, 0)))
        elif word[0].islower():
            res.append((e, (0, 1)))
        else:
            res.append((e, (0, 0)))
    return res


# Sum the (upper, lower) indicator pairs per emoji in a single shuffle and
# flatten to (emoji, upper, lower). Reducing the two components separately
# and joining them shuffled three times and evaluated check_case twice, since
# the shared upstream RDD was not cached.
result = (
    df.rdd
    .flatMap(check_case)
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
)

result.take(10)

[(':red_heart:', 2315, 5469),
 (':person_shrugging:', 226, 2081),
 (':female_sign:', 119, 61),
 (':face_with_tears_of_joy:', 2809, 15592),
 (':backhand_index_pointing_down:', 306, 607),
 (':party_popper:', 771, 532),
 (':person_facepalming:', 231, 1627),
 (':smiling_face_with_heart-eyes:', 1216, 4118),
 (':clapping_hands:', 827, 1289),
 (':weary_face:', 322, 2248)]

In [6]:
# Name the (emoji, upper, lower) tuple columns.
result = result.toDF().selectExpr("_1 as emoji", "_2 as upper", "_3 as lower")
result.show()

+--------------------+-----+-----+
|               emoji|upper|lower|
+--------------------+-----+-----+
|         :red_heart:| 2315| 5469|
|  :person_shrugging:|  226| 2081|
|       :female_sign:|  119|   61|
|:face_with_tears_...| 2809|15592|
|:backhand_index_p...|  306|  607|
|      :party_popper:|  771|  532|
|:person_facepalming:|  231| 1627|
|:smiling_face_wit...| 1216| 4118|
|    :clapping_hands:|  827| 1289|
|        :weary_face:|  322| 2248|
|         :male_sign:|  121|   47|
|:loudly_crying_face:| 1399| 5868|
|      :folded_hands:|  734| 1977|
|    :hundred_points:|  464| 1246|
|:rolling_on_the_f...|  257| 1369|
|     :flexed_biceps:|  336|  925|
|:backhand_index_p...| 1039| 1051|
|       :crying_face:|  154|  701|
|      :purple_heart:|  388|  966|
|      :yellow_heart:|  183|  509|
+--------------------+-----+-----+
only showing top 20 rows



In [7]:
# Append the upper/lower-case counts to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## Find the average number of emojis used in a sentence (via the distribution of emoji counts per sentence).

In [None]:
# Only the `emoji` column is needed to count emojis per sentence.
emojis = df.select("emoji")
def cal_count(line):
    """Return the number of emoji entries in ``line.emoji``.

    Entries are separated by single spaces. Like ``str.split(" ")``, an empty
    string still counts as one entry.
    """
    return len(line.emoji.split(" "))

# Histogram: for each emoji count `num`, how many sentences contain exactly
# that many emoji entries, ordered by `num`.
total_count = (
    emojis.rdd
    .map(cal_count)
    .map(lambda num: (num, 1))
    .reduceByKey(add)
    .sortByKey()
)
total_count.take(10)

In [None]:
# Name the (num, counts) histogram columns.
result = total_count.toDF().selectExpr("_1 as num", "_2 as counts")
result.show()

In [None]:
# Append the emoji-count histogram to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## For every emoji, find the position (head, middle, end) where the emoji occurs most often in a sentence.

Position codes: 0 = head (first third of the sentence), 1 = middle, 2 = end (last third).

In [None]:
def position(line):
    """Bucket each emoji of a row by where it sits in the sentence.

    The relative position is ``place / number_of_words``:
    below 1/3 is the head (0), above 2/3 is the end (2), and anything else is
    the middle (1). Exactly 1/3 and exactly 2/3 both count as the middle.

    Returns:
        list of ``(emoji, bucket)`` tuples, one per entry in ``line.emoji``.
    """
    words = line.sentence.split(' ')
    entries = line.emoji.split(' ')
    n_words = len(words)

    buckets = []
    for entry in entries:
        name, place = entry.split(',')
        ratio = int(place) / n_words
        if ratio < 1/3:
            bucket = 0
        elif ratio > 2/3:
            bucket = 2
        else:
            bucket = 1
        buckets.append((name, bucket))
    return buckets

def sort_func(x):
    """Sort key: the count stored at index 1 of a ``(bucket, count)`` pair."""
    second = x[1]
    return second

def most_position(line):
    """Return ``(emoji, bucket)`` for the position bucket with the highest count.

    ``line`` is ``(emoji, iterable of (bucket, count))``. On a tie, the pair
    that comes first in the grouped input wins, because ``sorted`` is stable.
    """
    ranked = sorted(line[1], key=lambda pair: pair[1], reverse=True)
    best_bucket = ranked[0][0]
    return (line[0], best_bucket)


# Count (emoji, bucket) occurrences, group the counts per emoji, and keep the
# most frequent bucket.
# NOTE: this rebinds `position` from the function defined above to the
# resulting RDD, so re-run the whole cell (not just these lines) to recompute.
position = (
    df.rdd
    .flatMap(position)
    .map(lambda pair: (pair, 1))
    .reduceByKey(add)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .groupByKey()
    .map(most_position)
)

position.take(3)

In [None]:
# Name the (emoji, pos) columns.
result = position.toDF().selectExpr("_1 as emoji", "_2 as pos")
result.show()

In [None]:
# Append the most-frequent-position table to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## Analyze the relation between the length of a sentence and the number of emojis used in it.

In [None]:
def length_relation(line):
    """Return ``(number of words, number of emoji entries)`` for one row.

    Both counts come from splitting on single spaces.
    """
    return (len(line.sentence.split(' ')), len(line.emoji.split(' ')))

# Count sentences per (sentence length, emoji count) key, ordered by that key,
# and flatten to (sent_len, emoji_len, count).
relation = (
    df.rdd
    .map(length_relation)
    .map(lambda key: (key, 1))
    .reduceByKey(add)
    .sortByKey()
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
)

relation.take(10)

In [None]:
# Name the (sent_len, emoji_len, count) columns.
result = relation.toDF().selectExpr("_1 as sent_len", "_2 as emoji_len", "_3 as count")
result.show()

In [None]:
# Append the length/emoji-count relation to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## For every emoji, count how often it is used more than once in a single sentence.

If an emoji is repeated in a sentence, that sentence counts once, no matter how many times the emoji repeats.

In [None]:
# Only the `emoji` column is needed to find repeated emojis.
emojis = df.select('emoji')

def split_arr(line):
    """Return the emoji names that occur more than once in ``line.emoji``.

    Positions are stripped from each ``name,position`` entry. Each repeated
    name is reported once, in order of first appearance.

    NOTE(review): this redefines the ``split_arr`` used by the co-occurrence
    section earlier in the notebook, which returns all names.
    """
    occurrences = collections.Counter(
        entry.split(',')[0] for entry in line.emoji.split(" ")
    )
    return [name for name, times in occurrences.items() if times > 1]

# For each emoji, count the sentences in which it appears more than once.
emojis = emojis.rdd.map(split_arr)
emojis_filter = (
    emojis
    .filter(lambda names: len(names) > 0)
    .flatMap(lambda names: names)
    .map(lambda name: (name, 1))
    .reduceByKey(add)
)

emojis_filter.take(3)

In [None]:
# Name the (emoji, count) columns.
result = emojis_filter.toDF().selectExpr("_1 as emoji", "_2 as count")
result.show()

In [None]:
# Append the repeated-emoji counts to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())

## For every emoji, find the average length (in words) of the sentences that contain it.

In [None]:
def average_length(line):
    """Pair each emoji entry of a row with the row's sentence length in words.

    Returns a list of ``(emoji, n_words)`` tuples. An emoji that appears
    several times in the sentence yields several tuples.
    """
    n_words = len(line.sentence.split(' '))
    return [(entry.split(',')[0], n_words) for entry in line.emoji.split(' ')]
    
# Average sentence length (in words) per emoji, rounded to a whole number.
# Accumulate (total words, sentence count) per emoji in a single reduce. The
# previous version counted (emoji, length) pairs and then joined two further
# reductions, which meant three shuffles and recomputing the uncached
# upstream RDD once per branch. The resulting averages are identical.
ave_result = (
    df.rdd
    .flatMap(average_length)
    .map(lambda pair: (pair[0], (pair[1], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .map(lambda kv: (kv[0], round(kv[1][0] / kv[1][1], 0)))
)

ave_result.take(10)

In [None]:
# Name the (emoji, ave_len) columns.
result = ave_result.toDF().selectExpr("_1 as emoji", "_2 as ave_len")
result.show()

In [None]:
# Append the average-sentence-length table to the collection in
# spark.mongodb.output.uri.
# NOTE(review): every write cell in this notebook appends to that same
# collection, and re-running this cell duplicates the rows.
(result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save())