In [1]:
from pymongo import MongoClient
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Build (or reuse) the Spark session used by the whole notebook. The MongoDB
# connector package is requested here, together with the input and output URIs.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/bigdata.raw")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/bigdata.pairWise")
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')
    .getOrCreate()
)

In [3]:
# Read the source data through the MongoDB Spark connector into a DataFrame.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .load()
)

In [4]:
# Keep only the two columns the following cells work with.
df = df.select(F.col("emoji"), F.col("sentence"))

In [5]:
def divide_emoji(x):
    """Split the raw ``emoji`` field into its individual entries.

    Parameters
    ----------
    x : str
        Space-separated entries (the pipeline treats each entry as
        ``"<emoji>,<position>"``).

    Returns
    -------
    list of str
        The non-empty entries, in their original order. An empty or
        all-space field yields ``[]``.
    """
    # Splitting on a single space turns an empty field, or leading, trailing
    # or doubled spaces, into '' entries. Such entries carry no ',<position>'
    # part and make separate_position() raise IndexError, so drop them here.
    return [entry for entry in x.split(" ") if entry]

def formed_(x):
    """Expand ``(entries, sentence)`` into one ``[entry, sentence]`` per entry.

    ``x[0]`` is an iterable of entries and ``x[1]`` is the value attached to
    each of them. The returned list follows the iteration order of ``x[0]``
    and is empty when ``x[0]`` is empty.
    """
    return [[emoji, x[1]] for emoji in x[0]]

def separate_emoji(x):
    """Return the part of ``x`` before its first comma (all of ``x`` if none)."""
    head, _, _ = x.partition(',')
    return head

def separate_position(x):
    """Return the second comma-separated field of ``x`` (still a string).

    Raises IndexError when ``x`` contains no comma.
    """
    # Only field 1 is needed, so stop splitting once it has been isolated.
    return x.split(',', 2)[1]

In [6]:
# Turn every DataFrame row into one (emoji, position, sentence) record per
# entry of the row's first column; the position is converted to int.
rdd = (
    df.rdd
    .map(list)
    .map(lambda row: (divide_emoji(row[0]), row[1]))
    .flatMap(formed_)
    .map(lambda rec: (separate_emoji(rec[0]), int(separate_position(rec[0])), rec[1]))
)
rdd.take(5)

[(':red_heart:',
  18,
  'No object is so beautiful that under certain conditions it will not look ugly Oscar Wilde ↺ RT :red_heart: …'),
 (':person_shrugging:',
  13,
  'Cant expect different results doing the same thingdoing stuff different from now on :person_shrugging: 🏻 \u200d :female_sign: ️'),
 (':female_sign:',
  15,
  'Cant expect different results doing the same thingdoing stuff different from now on :person_shrugging: 🏻 \u200d :female_sign: ️'),
 (':face_with_tears_of_joy:',
  14,
  '“ Lets go Marcus ” “ Shiiit where we goin Home ” Marcus Peters :face_with_tears_of_joy:'),
 (':face_with_tears_of_joy:',
  14,
  'Asahd really is a grown man in the body of a 1 year old :face_with_tears_of_joy:')]

In [7]:
def get_pair(x):
    """Pair an emoji occurrence with the word immediately before it.

    Parameters
    ----------
    x : sequence
        ``(emoji, position, sentence)``; ``position`` is used as an index
        into the space-separated tokens of ``sentence``.

    Returns
    -------
    tuple of (str, str)
        ``(word, token)`` where ``word`` is the preceding token reduced to
        its alphabetic characters and lower-cased, and ``token`` is the token
        found at ``position``. ``word`` is ``''`` when there is no preceding
        token, and both items are ``''`` when ``position`` lies outside the
        sentence. pair_frequency() skips pairs whose word is ``''`` or whose
        token differs from the emoji, so both fallbacks are ignored
        downstream.
    """
    pos = x[1]
    words = x[2].split(' ')
    # A position outside the sentence used to raise IndexError (or, when
    # negative, silently index from the end); return an empty pair instead.
    if pos < 0 or pos >= len(words):
        return ('', '')
    # At position 0 there is no previous token: words[pos - 1] would wrap
    # around to the LAST token and fabricate a pair.
    if pos == 0:
        return ('', words[0])
    word = ''.join(ch for ch in words[pos - 1] if ch.isalpha()).lower()
    return (word, words[pos])

def pair_frequency(x):
    """Count the (word, emoji) pairs recorded for one emoji.

    ``x`` is ``(emoji, pairs)``. A pair is counted only when its second item
    equals ``emoji`` and its first item is not the empty string. Returns a
    list of ``(pair, count)`` tuples ordered by count, highest first.
    """
    emoji, pairs = x[0], x[1]
    counts = {}
    for pair in pairs:
        if pair[1] == emoji and pair[0] != '':
            counts[pair] = counts.get(pair, 0) + 1
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)

In [8]:
# Attach to every record the (previous word, token) pair found at its position.
pairs = rdd.map(lambda rec: (rec[0], get_pair(rec)))

# Gather, per emoji, the list of (emoji, pair) rows ...
group_pairs = (
    pairs
    .groupBy(lambda kv: kv[0])
    .map(lambda grp: (grp[0], list(grp[1])))
)
# ... and keep only the pair from each row.
temp = group_pairs.map(lambda grp: (grp[0], [row[1] for row in grp[1]]))

# Count the pairs per emoji, drop emojis left without any pair, and order the
# emojis by how many distinct pairs they have (most first).
result_2 = (
    temp
    .map(lambda grp: (grp[0], pair_frequency(grp)))
    .filter(lambda res: len(res[1]) != 0)
    .sortBy(lambda res: len(res[1]), ascending=False)
)
result_2.take(2)

[(':face_with_tears_of_joy:',
  [(('this', ':face_with_tears_of_joy:'), 7063),
   (('me', ':face_with_tears_of_joy:'), 5371),
   (('it', ':face_with_tears_of_joy:'), 4971),
   (('you', ':face_with_tears_of_joy:'), 3329),
   (('up', ':face_with_tears_of_joy:'), 2401),
   (('now', ':face_with_tears_of_joy:'), 2346),
   (('shit', ':face_with_tears_of_joy:'), 2314),
   (('today', ':face_with_tears_of_joy:'), 2074),
   (('lol', ':face_with_tears_of_joy:'), 1966),
   (('that', ':face_with_tears_of_joy:'), 1954),
   (('day', ':face_with_tears_of_joy:'), 1922),
   (('time', ':face_with_tears_of_joy:'), 1798),
   (('out', ':face_with_tears_of_joy:'), 1634),
   (('him', ':face_with_tears_of_joy:'), 1505),
   (('too', ':face_with_tears_of_joy:'), 1498),
   (('again', ':face_with_tears_of_joy:'), 1488),
   (('lmao', ':face_with_tears_of_joy:'), 1451),
   (('crying', ':face_with_tears_of_joy:'), 1450),
   (('funny', ':face_with_tears_of_joy:'), 1410),
   (('video', ':face_with_tears_of_joy:'), 1373

In [9]:
# Convert the result RDD to a DataFrame; no schema is given, so Spark infers it.
result_df = spark.createDataFrame(result_2)

In [10]:
# Write the result through the MongoDB connector. The save mode is "append",
# so rows are added to whatever the target already holds.
(
    result_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save()
)