In [2]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import urllib.parse
import pyspark.sql.functions as sqlf
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinHashLSH, NGram, Tokenizer, CountVectorizer
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, ArrayType
from pyspark.sql.functions import monotonically_increasing_id

from DisjointSet import DisjointSet
import sys
sys.setrecursionlimit(3000000)

normalized_file = 'normalizedTweetsWithoutLinks_2019_test.txt'
results_file = 'results/TweetsAlike_2019_final.txt'
beautify_results_file = 'results/niceResults_2019_final.txt'

credPath = 'credentialsMongo.txt'
credFile = open(credPath, 'r')
loginMA = urllib.parse.quote_plus(credFile.readline().strip())
passwordMA = urllib.parse.quote_plus(credFile.readline()).strip()

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("twitter") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.0') \
    .config("spark.mongodb.input.uri", "mongodb+srv://" +
            loginMA + ":" +
            passwordMA +
            "@doublesearchintwitter-m3qge.mongodb.net/twitter_march2019.tweets") \
    .config("spark.mongodb.output.uri", "mongodb+srv://" +
            loginMA + ":" +
            passwordMA +
            "@doublesearchintwitter-m3qge.mongodb.net/") \
    .getOrCreate()


In [3]:
df1 = spark.read.text(normalized_file).withColumn("id", monotonically_increasing_id()) # .limit(500)
df1.count()
df1.show()

+--------------------+---+
|               value| id|
+--------------------+---+
|"- –û—Ö, –ê–Ω–µ—á–∫–∞, –µ—Å...|  0|
|—Ç–≤–æ–π –ø—É—Ç—å (–í–ª–∞–¥–∏–º...|  1|
|[user] –Ø —É–∂ —Ä—É–∫–∏ ...|  2|
|[user] [user] [us...|  3|
|[user] –£ –º–µ–Ω—è –Ω–µ—Ç...|  4|
|  [user] –¢–∞–∫ —á—Ç–æ, –¥–∞|  5|
|[user] [user] [us...|  6|
|–ï—â—ë –æ–¥–Ω–∞ –Æ–ª—è, —Ç–æ–ª...|  7|
|[user] –Ø –ø–µ—Ä–µ–º–∞–∑–∞...|  8|
|[user] –≠—Ç–æ —á—Ç–æ —â–∞...|  9|
|—á—É–≤–∞–∫ —Ö–æ—á–µ—Ç –∫–æ–∂–∞–Ω...| 10|
|                –û–ö–ï–ô| 11|
|—Å—É–∫–∞ –∑–Ω–∞–µ—Ç–µ —á—Ç–æ –∫...| 12|
|–£–∫—Ä–∞–∏–Ω—Ü–µ–≤ –∂–¥–µ—Ç –Ω–æ...| 13|
|    #–ó–¥–æ—Ä–æ–≤—å–µ [link]| 14|
|–ø–æ–±–µ–π—Ç–µ –º–µ–Ω—è –µ—Å–ª–∏...| 15|
|–Ω–∞–¥–µ—é—Å—å –≤—Å–µ –ø—Ä–∞–≤–∏...| 16|
|–ó–∞ 2 –Ω–µ–¥–µ–ª–∏ –¥–æ —Å–¥...| 17|
|–û–π –∑–Ω–∞–∫–æ–º–æ–µ —á—Ç–æ-—Ç...| 18|
|2 –º–∏–Ω –∏ –≤—ã–∫–ª. –ù—É ...| 19|
+--------------------+---+
only showing top 20 rows



In [4]:
tokenizer = Tokenizer(inputCol="value", outputCol="words")
ngram = NGram(n=1, inputCol="words", outputCol="ngrams")
cv = CountVectorizer(inputCol="ngrams", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, ngram, cv])
model = pipeline.fit(df1)
df2 = model.transform(df1)
df2.show()
df2.count()

+--------------------+---+--------------------+--------------------+--------------------+
|               value| id|               words|              ngrams|            features|
+--------------------+---+--------------------+--------------------+--------------------+
|"- –û—Ö, –ê–Ω–µ—á–∫–∞, –µ—Å...|  0|["-, –æ—Ö,, –∞–Ω–µ—á–∫–∞,...|["-, –æ—Ö,, –∞–Ω–µ—á–∫–∞,...|(262144,[1,3,5,6,...|
|—Ç–≤–æ–π –ø—É—Ç—å (–í–ª–∞–¥–∏–º...|  1|[—Ç–≤–æ–π, –ø—É—Ç—å, (–≤–ª–∞...|[—Ç–≤–æ–π, –ø—É—Ç—å, (–≤–ª–∞...|(262144,[1,40,340...|
|[user] –Ø —É–∂ —Ä—É–∫–∏ ...|  2|[[user], —è, —É–∂, —Ä...|[[user], —è, —É–∂, —Ä...|(262144,[0,5,293,...|
|[user] [user] [us...|  3|[[user], [user], ...|[[user], [user], ...|(262144,[0,1,3,7,...|
|[user] –£ –º–µ–Ω—è –Ω–µ—Ç...|  4|[[user], —É, –º–µ–Ω—è,...|[[user], —É, –º–µ–Ω—è,...|(262144,[0,2,14,2...|
|  [user] –¢–∞–∫ —á—Ç–æ, –¥–∞|  5|[[user], —Ç–∞–∫, —á—Ç–æ...|[[user], —Ç–∞–∫, —á—Ç–æ...|(262144,[0,17,41,...|
|[user] [user] [us...|  6|[[user], [user], ...|[[user], [user], ...|(262144

231279

In [5]:
def getsparsesize(v):
    return v.values.size

getsize_udf = udf(getsparsesize, IntegerType())
df2_with_lengths = df2.select("id", "value", "features", getsize_udf("features").alias("vec_size"))
df2_with_lengths.show()

df2NotNull = df2_with_lengths.filter(getsize_udf(df2["features"]) != 0)

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=128)
model2 = mh.fit(df2)
transformed_df2 = model2.transform(df2NotNull)
transformed_df2.cache()
transformed_df2.show()

+---+--------------------+--------------------+--------+
| id|               value|            features|vec_size|
+---+--------------------+--------------------+--------+
|  0|"- –û—Ö, –ê–Ω–µ—á–∫–∞, –µ—Å...|(262144,[1,3,5,6,...|      24|
|  1|—Ç–≤–æ–π –ø—É—Ç—å (–í–ª–∞–¥–∏–º...|(262144,[1,40,340...|       7|
|  2|[user] –Ø —É–∂ —Ä—É–∫–∏ ...|(262144,[0,5,293,...|       6|
|  3|[user] [user] [us...|(262144,[0,1,3,7,...|      12|
|  4|[user] –£ –º–µ–Ω—è –Ω–µ—Ç...|(262144,[0,2,14,2...|       9|
|  5|  [user] –¢–∞–∫ —á—Ç–æ, –¥–∞|(262144,[0,17,41,...|       4|
|  6|[user] [user] [us...|(262144,[0,4,1959...|       5|
|  7|–ï—â—ë –æ–¥–Ω–∞ –Æ–ª—è, —Ç–æ–ª...|(262144,[36,45,88...|       6|
|  8|[user] –Ø –ø–µ—Ä–µ–º–∞–∑–∞...|(262144,[0,1,3,5,...|      17|
|  9|[user] –≠—Ç–æ —á—Ç–æ —â–∞...|(262144,[0,7,10,7...|       5|
| 10|—á—É–≤–∞–∫ —Ö–æ—á–µ—Ç –∫–æ–∂–∞–Ω...|(262144,[26,272,1...|       6|
| 11|                –û–ö–ï–ô|(262144,[1145],[1...|       1|
| 12|—Å—É–∫–∞ –∑–Ω–∞–µ—Ç–µ —á—Ç–æ –∫...|(262144,[

In [6]:
def colour_tweets(rows_with_ids, edges):
    cnt = 1
    for row in rows_with_ids:
        # print(row)
        cnt += 1
        if cnt % 500 == 0:
            print("row number " + str(cnt))
        
        ids = row["ids"]
        for x in ids[1:]:
            edges.union(ids[0], x)

In [7]:

def getHashColumnsAll(df0):
    # print("get hash cols all: " + str(df0))
    # return [1, 2, 3]
    
    result = []
    for y in range(len(df0) // 4):
        sum_of_hashes =\
            int(df0[4 * y][0]) +\
            int(df0[4 * y + 1][0]) +\
            int(df0[4 * y + 2][0]) +\
            int(df0[4 * y + 3][0])
        result.append(sum_of_hashes)
    return result

gethashsumsall_udf = udf(getHashColumnsAll, ArrayType(IntegerType()))

def simple(df0):
    return 42

simple_udf = udf(simple, IntegerType())

df4_all_sums = transformed_df2.select(
    "value",
    "id",
    # simple_udf("id")
    gethashsumsall_udf("hashes").alias("hashes_sum")
)

df4_all_sums.cache()
#df4_all_sums.show()

DataFrame[value: string, id: bigint, hashes_sum: array<int>]

In [9]:
max_tweet_id = df1.agg({"id": "max"}).collect()[0][0]
edges = DisjointSet(max_tweet_id + 1)

with open(results_file, 'w') as outf:     
    for k in range(128 // 4):
        print("k = " + str(k))
        
        df4_group = df4_all_sums.groupBy(df4_all_sums.hashes_sum[k])\
            .agg(
                sqlf.count('id').alias("num_tweets"),
                sqlf.collect_list("value").alias("tweets_texts"),
                sqlf.collect_list("id").alias("ids")
            )\
            .filter(col("num_tweets") > 1)
        
        df4_group.cache()
        print("df4 cached")
                        
        colour_tweets(df4_group.select("ids").collect(), edges)
        print("We coloured some tweets! Way to go!")
                       
        outf.write("=========== k = " + str(k) +"===================\n")
        for row in df4_group.collect():
            for oneTweet in row.tweets_texts:
                outf.write(oneTweet + '\n')
            outf.write(str(row.num_tweets))
            outf.write('\n\n')
                            
        print('transformed')
        print()


k = 0
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
row number 6500
row number 7000
row number 7500
row number 8000
row number 8500
row number 9000
We coloured some tweets! Way to go!
transformed

k = 1
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
row number 6500
row number 7000
row number 7500
row number 8000
row number 8500
row number 9000
row number 9500
row number 10000
We coloured some tweets! Way to go!
transformed

k = 2
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
row number 6500
row number 7000
row number 7500
row n

row number 9000
We coloured some tweets! Way to go!
transformed

k = 23
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
row number 6500
row number 7000
row number 7500
row number 8000
row number 8500
row number 9000
row number 9500
We coloured some tweets! Way to go!
transformed

k = 24
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
row number 6500
row number 7000
row number 7500
row number 8000
row number 8500
row number 9000
We coloured some tweets! Way to go!
transformed

k = 25
df4 cached
row number 500
row number 1000
row number 1500
row number 2000
row number 2500
row number 3000
row number 3500
row number 4000
row number 4500
row number 5000
row number 5500
row number 6000
ro

In [13]:
import pickle
with open("edges1.pickle", 'wb') as f:
    pickle.dump(edges, f)
with open("edges1.vertices.pickle", 'wb') as f:
    pickle.dump(edges.vertices, f)

In [9]:
df1.show()

+--------------------+---+
|               value| id|
+--------------------+---+
|"- –û—Ö, –ê–Ω–µ—á–∫–∞, –µ—Å...|  0|
|—Ç–≤–æ–π –ø—É—Ç—å (–í–ª–∞–¥–∏–º...|  1|
|[user] –Ø —É–∂ —Ä—É–∫–∏ ...|  2|
|[user] [user] [us...|  3|
|[user] –£ –º–µ–Ω—è –Ω–µ—Ç...|  4|
|  [user] –¢–∞–∫ —á—Ç–æ, –¥–∞|  5|
|[user] [user] [us...|  6|
|–ï—â—ë –æ–¥–Ω–∞ –Æ–ª—è, —Ç–æ–ª...|  7|
|[user] –Ø –ø–µ—Ä–µ–º–∞–∑–∞...|  8|
|[user] –≠—Ç–æ —á—Ç–æ —â–∞...|  9|
|—á—É–≤–∞–∫ —Ö–æ—á–µ—Ç –∫–æ–∂–∞–Ω...| 10|
|                –û–ö–ï–ô| 11|
|—Å—É–∫–∞ –∑–Ω–∞–µ—Ç–µ —á—Ç–æ –∫...| 12|
|–£–∫—Ä–∞–∏–Ω—Ü–µ–≤ –∂–¥–µ—Ç –Ω–æ...| 13|
|    #–ó–¥–æ—Ä–æ–≤—å–µ [link]| 14|
|–ø–æ–±–µ–π—Ç–µ –º–µ–Ω—è –µ—Å–ª–∏...| 15|
|–Ω–∞–¥–µ—é—Å—å –≤—Å–µ –ø—Ä–∞–≤–∏...| 16|
|–ó–∞ 2 –Ω–µ–¥–µ–ª–∏ –¥–æ —Å–¥...| 17|
|–û–π –∑–Ω–∞–∫–æ–º–æ–µ —á—Ç–æ-—Ç...| 18|
|2 –º–∏–Ω –∏ –≤—ã–∫–ª. –ù—É ...| 19|
+--------------------+---+
only showing top 20 rows



In [10]:

def add_color(ind):
    return edges.find(ind) 

add_color_udf = udf(add_color, IntegerType())

df11 = df1\
    .select(
        df1.value.alias("tweet"),
        add_color_udf(df1.id).alias("color")
    )\
    .groupBy("color")\
    .agg(sqlf.collect_list("tweet").alias("tweets"))
df11.show()

+-----+--------------------+
|color|              tweets|
+-----+--------------------+
|  463|[–ù–µ —Å–ø—Ä–∞—à–∏–≤–∞–π—Ç–µ —Ñ...|
| 1829|[—Ö–æ—á—É –∑–µ–ª—ë–Ω—ã–µ –≤–æ–ª...|
| 2866|[–ó–∞–≤—Ç—Ä–∞ –±—ã –¥–æ–º–æ–π ...|
| 3749|        [–¶–µ–Ω–∞129–≥—Ä–Ω]|
| 4900|[[user] –¶–∞—Ä–∏–∑–º –Ω–µ...|
| 5518|[–î–æ —Å–ª—ë–∑, –∞—Ö–∞—Ö–∞—Ö ...|
| 6336|[–±–¥–æ —Å –Ω—É–ª—è.........|
| 6357|[–ß–µ–ª–æ–≤–µ–∫ –∏–Ω—Ç–µ—Ä–µ—Å–Ω...|
| 6620|[–∏ –≤–æ—Ç —ç—Ç–∞ –¥–≤–∏–Ω—É—Ç...|
| 7880|[–Ω–∞–¥–µ—é—Å—å, —è –∫–æ–≥–¥–∞...|
| 7982|[–Ω–∞—Å—Ç—Ä–æ–µ–Ω–∏–µ –æ—Ç—á–∏–ª...|
| 7993|[–ù–∏–∂–Ω–∏–π —Å–ø—Ä–∞–≤–∞ –º–æ...|
| 9852|[–ø—Ä–æ–±–ª–µ–º—ã –≤–µ–∑–¥–µ –æ...|
|10206|[–º–∞–º–∞ –≤—Å—Ç–∞–ª–∞ –≤ —Ç—É...|
|11458|[–ö–æ–ø–µ–π–∫–∏ –±—É–¥—É –Ω–µ ...|
|12027|        [(–ø—Ä–æ—Å—Ç–∏—Ç–µ)]|
|13285|[–û—Ç–Ω—ã–Ω–µ ‚Äì –ª–∏—á–Ω—ã–π ...|
|13289|[–ï—Å—Ç—å luxury girl...|
|15447|               [‚Äîü•Ä]|
|15790|[–∏ –∑–Ω–∞–∫–æ–º—ã–µ –∏ –Ω–µ–∑...|
+-----+--------------------+
only showing top 20 rows



In [11]:
with open("results/similar tweets.txt", "w") as outf:
    for row in df11.collect():
        tweetsCount = len(row.tweets)
        if tweetsCount <= 1:
            continue
        outf.write("============ color: " + str(row.color) + ", tweets: " + str(tweetsCount) + "===========\n")
        for tweet in row.tweets:
            outf.write(tweet)
            outf.write("\n")
        outf.write("\n\n")
        

In [1]:
df2_with_lengths.show()

NameError: name 'df2_with_lengths' is not defined

In [3]:
import matplotlib.pyplot as plt

dictOfDoubles = {}
with open("results/similar tweets.txt", "r") as doublesFile:
    for line in doublesFile:
        lineStripped = line.strip()
        if lineStripped.startswith("============ color: "):
            endpoint1 = lineStripped.find(", tweets:")
            #print(colour)
            colour = int(lineStripped[20:endpoint1])
            lineHalf = lineStripped[20:]
            startpoint2 = endpoint1 + 9
            endpoint2 = 20 + lineHalf.find("===========")
            tweets = int(lineStripped[startpoint2:endpoint2])
            #print(tweets)
            dictOfDoubles[colour] = tweets

keys = list(dictOfDoubles.keys())
vals = list(dictOfDoubles.values())
plt.hist(vals, bins=list(range(100))) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 50, 100])
plt.yscale('log')
plt.xscale('log')
plt.savefig("log plot.pdf")
plt.show()



ImportError: No module named 'matplotlib'