# Setup

In [312]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from scipy.special import gdtr
import uuid
from datetime import datetime as dt

In [313]:
# master_url = "spark://master:7077"
master_url = 'local[*]' # to see print()s in imported modules

spk = SparkSession.builder.master(master_url).getOrCreate()
sc = spk.sparkContext
sc.setLogLevel('WARN')

In [314]:
from pymongo import MongoClient

uri_str = 'mongodb://mongo:27017'
mongo = MongoClient(uri_str)
db_name = 'rancor'
mongo_ds = 'com.mongodb.spark.sql'

In [344]:
# current window
start_time = 1491751418000
end_time = 1491752017999

# prior start time
priors_start_time = start_time - 604800000

min_posts = 10
threshold = 0.7

# case sensitive?
posts_name = 'socialMediaPost'
priors_name = 'postsCluster'

posts_uri = dict(uri=uri_str, database=db_name, collection=posts_name)
priors_uri = dict(uri=uri_str, database=db_name, collection=priors_name)

In [380]:
# load smposts
df = spk.read.load(format=mongo_ds, **posts_uri).select('_id', 'featurizer', 'hashtags', 'lang', 'post_id', 'timestamp_ms')
df = df\
    .where(df['timestamp_ms'] >= start_time)\
    .where(df['timestamp_ms'] <= end_time)\
    .where(df['featurizer'] == 'hashtag')

df.count()

316

In [381]:
# load postsclusters
pr_df = spk.read.load(format=mongo_ds, **priors_uri)\
.select('_id', 'end_time_ms', 'data_type', 'term', 'similar_post_ids', 'similar_ids', 'stats')

pr_df = pr_df\
    .where(pr_df['end_time_ms'] >= priors_start_time)\
    .where(pr_df['end_time_ms'] <= start_time)\
    .where(pr_df['data_type'] == 'hashtag')

pr_df.count()

3

In [37]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- featurizer: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- lang: string (nullable = true)
 |-- post_id: string (nullable = true)
 |-- timestamp_ms: double (nullable = true)



# Playground for existing code

In [38]:
from hashtag_similarity import HashtagClusters
hash_clust = HashtagClusters(10, 'http://172.17.0.1:3000/api/postsclusters', start_time)

In [39]:
bc_hc = sc.broadcast(hash_clust)

In [40]:
def process_vector(doc_id, post_id, tags, hc):
    hc.value.process_vector(doc_id, post_id, tags)
    return hc.value.get_clusters()

In [41]:
r = df.rdd.map(lambda post: process_vector(post._id, post.post_id, post.hashtags, bc_hc))

In [42]:
r.count()

227

In [43]:
r.first()

{'egyptian': {'similar_ids': [Row(oid='58ea4d53d645272bcbbeade8'),
   Row(oid='58ea4d62d645272bcbbeae1d'),
   Row(oid='58ea4da7d645272bcbbeaf68'),
   Row(oid='58ea4dd3d645272bcbbeb04b'),
   Row(oid='58ea4e29d645272bcbbeb18c'),
   Row(oid='58ea4e6dd645272bcbbeb29b'),
   Row(oid='58ea4e99d645272bcbbeb325'),
   Row(oid='58ea4eb0d645272bcbbeb37c'),
   Row(oid='58ea4ee7d645272bcbbeb465'),
   Row(oid='58ea4ef4d645272bcbbeb490')],
  'similar_post_ids': ['851088256687120385',
   '851088315315040257',
   '851088609142861825',
   '851088793037885440',
   '851089153576009728',
   '851089439879184384',
   '851089624344666112',
   '851089717735174144',
   '851089947817906177',
   '851090004646481922'],
  'stats': {'is_unlikely': 1,
   'likelihood': 0.99997407097566393,
   'prior_alpha': 1e-05,
   'prior_beta': 1,
   'total_posts': 228}},
 'palmsunday': {'similar_ids': [Row(oid='58ea4d53d645272bcbbeade8'),
   Row(oid='58ea4da7d645272bcbbeaf68'),
   Row(oid='58ea4dd3d645272bcbbeb04b'),
   Row(oid='58

# Use Spark SQL instead

In [382]:
# one row for each hashtag in smposts
df1 = df.select(col('_id.oid').alias('_id'), 
                'lang', 
                'post_id', 
                explode(df['hashtags']).alias('term'))
df1.show(10)

+--------------------+----+------------------+----------------+
|                 _id|lang|           post_id|            term|
+--------------------+----+------------------+----------------+
|58ea51fed645272bc...|  en|851093268754554881|         Lunchrm|
|58ea51fed645272bc...|  en|851093268754554881|             MSF|
|58ea51fed645272bc...|  en|851093268754554881|     Leveluplife|
|58ea5204d645272bc...|  en|851093291034701825|KathradaMemorial|
|58ea5204d645272bc...|  en|851093290804027396|      PalmSunday|
|58ea5204d645272bc...|  en|851093290804027396|           AMJOY|
|58ea5204d645272bc...|  en|851093290804027396|        Egyptian|
|58ea5204d645272bc...|  en|851093291454038017|      StanleyCup|
|58ea5207d645272bc...|  en|851093306700419075|             김준수|
|58ea5207d645272bc...|  en|851093306700419075|             XIA|
+--------------------+----+------------------+----------------+
only showing top 10 rows



In [383]:
# collect post_ids, _ids for each term 
df2 = df1\
    .groupBy(lower(df1['term']).alias('lterm'))\
    .agg(collect_set('post_id').alias('similar_post_ids'),
        collect_set('_id').alias('similar_ids'))

df2.show(100)

+--------------------+--------------------+--------------------+
|               lterm|    similar_post_ids|         similar_ids|
+--------------------+--------------------+--------------------+
|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|
|               assad|[851094449887232000]|[58ea5318d645272b...|
|           australia|[851094146932555776]|[58ea52d0d645272b...|
|           stockholm|[8510939917561036...|[58ea52abd645272b...|
|    islamicterrorism|[851095389457580034]|[58ea53f8d645272b...|
|          milesdavis|[851095728005033984]|[58ea5449d645272b...|
|       photoshooting|[851095480218071042]|[58ea540dd645272b...|
|                 rss|[851095271442460679]|[58ea53dcd645272b...|
|            arras100|[851094392266006530]|[58ea530ad645272b...|
| chfilsummerroadtour|[851093908142473216]|[58ea5297d645272b...|
|       stagepresence|[851093792841224196]|[58ea527bd645272b...|
|                army|[851094187210661889]|[58ea52d9d645272b...|
|              erotic|[85

In [384]:
df2

DataFrame[lterm: string, similar_post_ids: array<string>, similar_ids: array<string>]

In [385]:
# join hashtag->posts with postsclusters on term
df3 = df2.join(pr_df, df2['lterm'] == pr_df['term'], 'left_outer')\
.select('lterm', 
        pr_df['similar_post_ids'].alias('pr_sim_post_ids'), 
        col('stats.total_posts').alias('pr_posts_count'))

df3.show(10)

df3.where(df3['pr_posts_count'] > 0).show()

+--------------------+---------------+--------------+
|               lterm|pr_sim_post_ids|pr_posts_count|
+--------------------+---------------+--------------+
|educrisisconverntion|           null|          null|
|               assad|           null|          null|
|           australia|           null|          null|
|           stockholm|           null|          null|
|    islamicterrorism|           null|          null|
|          milesdavis|           null|          null|
|       photoshooting|           null|          null|
|                 rss|           null|          null|
|            arras100|           null|          null|
| chfilsummerroadtour|           null|          null|
+--------------------+---------------+--------------+
only showing top 10 rows

+----------+--------------------+--------------+
|     lterm|     pr_sim_post_ids|pr_posts_count|
+----------+--------------------+--------------+
|  egyptian|[8510882566871203...|           273|
|  egyptian|[851091071

In [389]:
# term -> count priors posts
df4 = df3.select('lterm', 
                 col('pr_posts_count').cast(IntegerType()), 
                 size('pr_sim_post_ids').alias('pr_sim_posts_count'))\
    .na.fill(0)
    
df4.sort('lterm').show(10)

+-------------+--------------+------------------+
|        lterm|pr_posts_count|pr_sim_posts_count|
+-------------+--------------+------------------+
| 13reasonswhy|             0|                -1|
|    17century|             0|                -1|
|         1mdb|             0|                -1|
|      aag2017|             0|                -1|
|        abc15|             0|                -1|
|aceuvalvagyok|             0|                -1|
|        actor|             0|                -1|
|    actualite|             0|                -1|
|          afd|             0|                -1|
|     airforce|             0|                -1|
+-------------+--------------+------------------+
only showing top 10 rows



In [390]:
# term -> priors counts
df5 = df4.groupBy('lterm').sum('pr_posts_count', 'pr_sim_posts_count')
df5.show()

+--------------------+-------------------+-----------------------+
|               lterm|sum(pr_posts_count)|sum(pr_sim_posts_count)|
+--------------------+-------------------+-----------------------+
|educrisisconverntion|                  0|                     -1|
|               assad|                  0|                     -1|
|           australia|                  0|                     -1|
|           stockholm|                  0|                     -1|
|    islamicterrorism|                  0|                     -1|
|          milesdavis|                  0|                     -1|
|       photoshooting|                  0|                     -1|
|                 rss|                  0|                     -1|
|            arras100|                  0|                     -1|
| chfilsummerroadtour|                  0|                     -1|
|       stagepresence|                  0|                     -1|
|                army|                  0|                    

In [391]:
# join hashtag->posts with priors counts
df6 = df5\
.join(df2, df5['lterm'] == df2['lterm'], 'left_outer')\
.withColumn('curr_posts_count', lit(df.count()))\
.drop(df5['lterm'])

df6.show(10)

+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|
|                  0|                     -1|           australia|[851094146932555776]|[58ea52d0d645272b...|             316|
|                  0|                     -1|           stockholm|[8510939917561036...|[58ea52abd645272b...|             316|
|                  0|                     -1|    islamicterrorism|[851095389457580034]|[58ea53f8d645272b...|          

In [392]:
# count curr sim posts
df61 = df6.withColumn('curr_sim_posts_count', size(df6['similar_post_ids']))
df61.show(10)

+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|                   1|
|                  0|                     -1|           australia|[851094146932555776]|[58ea52d0d645272b...|             316|                   1|
|                  0|                     -1|           stockholm|[8510939917561036...|[58ea52abd645272b...|          

In [393]:
# calc lam
def lam(a, b):
    return str(float(a)/b)

u_lam = udf(lam)

df62 = df61.withColumn('lam', u_lam(df61['curr_sim_posts_count'], df61['curr_posts_count']))
df62.show()

+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|                 lam|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|0.003164556962025...|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|                   1|0.003164556962025...|
|                  0|                     -1|           australia|[851094146932555776]|[58ea52d0d645272b...|             316|                   1|0.003164556962

In [394]:
# calc likelihood scores. wrapper for gdtr scipy func.
def likelihood(a, b, c):
    # edge case: no previous posts (prob on 1st clustering run in system)
    if a is None:
        return '1'
    print(a, b, c)
    out = gdtr(a, b, float(c))
    # stringify numpy output for pickling
    return str(out)
    
u_likelihood = udf(likelihood)

df7 = df62\
.withColumn('likelihood', u_likelihood(df62['sum(pr_posts_count)'], 
                                       df62['sum(pr_sim_posts_count)'], 
                                       df62['lam']))

df7.show()

+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|                 lam|likelihood|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|0.003164556962025...|       0.0|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|                   1|0.003164556962025...|       0.0|
|                  0|                     -1|           australia|[851094146932555776]|[58ea52d0d645272b.

In [395]:
# unlikely?
def unlikely(v):
    if float(v) > threshold:
        return 1
    else:
        return 0
u_unlikely = udf(unlikely)

df8 = df7\
.withColumn('is_unlikely', u_unlikely(df7['likelihood']))

df8.show(10)

+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|                 lam|likelihood|is_unlikely|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|0.003164556962025...|       0.0|          0|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|                   1|0.003164556962025...|       0.0|          0|
|                  0|                     -1|

In [396]:
df8.where(df8['is_unlikely']!='1').show()


+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|                 lam|likelihood|is_unlikely|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|0.003164556962025...|       0.0|          0|
|                  0|                     -1|               assad|[851094449887232000]|[58ea5318d645272b...|             316|                   1|0.003164556962025...|       0.0|          0|
|                  0|                     -1|

In [397]:
# build stats nested object
def get_stats(a, b, c, d, e):
    return {'likelihood': (a), 'is_unlikely': (b), 
            'total_posts': (c), 'prior_alpha': (d), 'prior_beta': (e or 0)}

# string -> string map, to play nicely with mongo. (FloatType has no matching BsonValue)
u_get_stats = udf(get_stats, MapType(StringType(), StringType()))

df9 = df8.withColumn('stats', (u_get_stats(df8['likelihood'], 
                                           df8['is_unlikely'], 
                                           df8['curr_posts_count'], 
                                           df8['sum(pr_sim_posts_count)'], 
                                           df8['sum(pr_posts_count)'])))
print(df9.take(1)[0].stats)
df9.show()

{'prior_alpha': '-1', 'likelihood': '0.0', 'total_posts': '316', 'prior_beta': '0', 'is_unlikely': '0'}
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+--------------------+
|sum(pr_posts_count)|sum(pr_sim_posts_count)|               lterm|    similar_post_ids|         similar_ids|curr_posts_count|curr_sim_posts_count|                 lam|likelihood|is_unlikely|               stats|
+-------------------+-----------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+-----------+--------------------+
|                  0|                     -1|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|             316|                   1|0.003164556962025...|       0.0|          0|Map(likelihood ->...|
|                  0|                     -1|   

In [398]:
# cleanup all records before saving
df10 = df9.withColumnRenamed('lterm', 'term')\
.drop('sum(pr_posts_count)',
    'sum(pr_sim_posts_count)',
    'curr_sim_posts_count',
    'lam',
    'curr_posts_count',
    'is_unlikely',
    'likelihood')\
.withColumn('start_time_ms', lit(start_time))\
.withColumn('end_time_ms', lit(end_time))\
.withColumn('data_type', lit('hashtag'))\
.withColumn('created', lit(dt.now()))\
.withColumn('_id', lit(str(uuid.uuid4())))

# .withColumn('job_monitor_id', lit('TODO'))\
# .withColumn('similar_ids', lit('TODO'))\

df10.show()

+--------------------+--------------------+--------------------+--------------------+-------------+-------------+---------+--------------------+--------------------+
|                term|    similar_post_ids|         similar_ids|               stats|start_time_ms|  end_time_ms|data_type|             created|                 _id|
+--------------------+--------------------+--------------------+--------------------+-------------+-------------+---------+--------------------+--------------------+
|educrisisconverntion|[851095206732734465]|[58ea53ccd645272b...|Map(likelihood ->...|1491751418000|1491752017999|  hashtag|2017-04-21 18:56:...|4039b73e-80dc-429...|
|               assad|[851094449887232000]|[58ea5318d645272b...|Map(likelihood ->...|1491751418000|1491752017999|  hashtag|2017-04-21 18:56:...|4039b73e-80dc-429...|
|           australia|[851094146932555776]|[58ea52d0d645272b...|Map(likelihood ->...|1491751418000|1491752017999|  hashtag|2017-04-21 18:56:...|4039b73e-80dc-429...|
|   

In [399]:
df11 = df10.filter(size(df10['similar_post_ids']) >= min_posts)
df11.limit(1).show()
df11.count()

+--------+--------------------+--------------------+--------------------+-------------+-------------+---------+--------------------+--------------------+
|    term|    similar_post_ids|         similar_ids|               stats|start_time_ms|  end_time_ms|data_type|             created|                 _id|
+--------+--------------------+--------------------+--------------------+-------------+-------------+---------+--------------------+--------------------+
|egyptian|[8510937091395256...|[58ea5234d645272b...|Map(likelihood ->...|1491751418000|1491752017999|  hashtag|2017-04-21 18:56:...|4039b73e-80dc-429...|
+--------+--------------------+--------------------+--------------------+-------------+-------------+---------+--------------------+--------------------+



5

In [400]:
df11.limit(1).write.format(mongo_ds).mode('append').options(**priors_uri).save()