In [1]:
import findspark
findspark.init()


In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import sys
from settings import data_folder,parquet_extension
import os
import pyspark.sql.functions as f

sc = SparkContext("local", "test")
sql_context = SQLContext(sc)

# Part 1

In [None]:
def read_parquet(x):
    file = x+parquet_extension
    path = os.path.join(data_folder,file)
    return sql_context.read.parquet(path)

1) count of comments, posts (all), original posts, reposts and likes made by user


In [6]:
file = 'userWallPosts.parquet'
path = os.path.join(data_folder,file)
posts = sql_context.read.parquet(path)
all_posts_count_by_user = posts.groupBy("from_id").count()
print('Count posts (all):')
all_posts_count_by_user.show()

print('Count original post:')
original_posts = posts.filter(f.col("from_id") ==  f.col("owner_id")).groupBy("from_id").count()
original_posts.show()

print('Count likes:')
file = 'userWallLikes.parquet'
path = os.path.join(data_folder,file)
likes = sql_context.read.parquet(path)
like_group = likes.groupBy("likerId").count()
like_group.show()

print('Count comments:')
file = 'userWallComments.parquet'
path = os.path.join(data_folder,file)
comments = sql_context.read.parquet(path)
comment_group = comments.groupBy("from_id").count()
comment_group.show()


Count posts (all):
+---------+-----+
|  from_id|count|
+---------+-----+
| 35891340|    2|
|351757168|  374|
|  1086513|    1|
| 63009707|    3|
|346826732|    1|
| 31132484|    1|
|329708654|    1|
|263602644|    1|
| 19298137|    1|
| 50426099|    1|
|190211473|    1|
| 20903647|    2|
| 10455365|    1|
|321746548|    3|
|167801567|    1|
|179213185|    2|
|189314728|    2|
|  1268702|    2|
|   477628|    1|
|177025789|    1|
+---------+-----+
only showing top 20 rows

Count original post:
+-------+-----+
|from_id|count|
+-------+-----+
|  71097|   41|
| 158464|   12|
| 248753|   85|
| 294695|   12|
| 340803|  225|
| 370791|   66|
| 374522|   44|
| 430369|  105|
| 470076|   63|
| 562162|   57|
| 931861|   59|
| 955394|  169|
| 982718|   86|
|1003101|   28|
|1042947|   70|
|1085213|  123|
|1463020|   73|
|1498648|   50|
|1673954|   54|
|1824008|    7|
+-------+-----+
only showing top 20 rows

Count likes:
+---------+-----+
|  likerId|count|
+---------+-----+
|186421714|    7|
| 15216

2) count of friends, groups, followers

In [11]:
parts = ['groupsProfiles','friends','followers']
paths = list(map(lambda x:os.path.join(data_folder, x + parquet_extension), parts))

def get_part(path):
    parquet = sql_context.read.parquet(path)
    if path.find(parts[0]) !=-1:
        return parquet.select('id').count()
    else:
        return parquet.select('profile').count()


parts_count = list(map(lambda x:get_part(x),paths))
for i,p in enumerate(parts_count):
    print('Count of '+parts[i]+' '+str(p))

Count of groupsProfiles 2082472
Count of friends 100439465
Count of followers 92567996


3) count of videos, audios, photos, gifts

In [15]:
from pyspark.sql.functions import col

ile = 'friendsProfiles.parquet'
path = os.path.join(data_folder,file)
parquet = sql_context.read.parquet(path)

def select(x):
    return parquet.where(col('counters.'+x).isNotNull()).count()


names = 'videos audios photos gifts'
names = names.split()


result = list(map(select,names))
for i,p in enumerate(result):
    print('Count of '+names[i]+' '+str(p))

Count of videos 56
Count of audios 56
Count of photos 56
Count of gifts 33


4) count of "incoming" (made by other users) comments, max and mean "incoming" comments per post

In [26]:
from pyspark.sql import functions as F

info = 'userWallComments'
parquet = sql_context.read.parquet(os.path.join(data_folder,info+parquet_extension))
parquet = parquet.where(col('from_id')!=col('post_owner_id'))

comment_group = parquet.groupby('post_id').agg(F.count('id'))
print('count all')
comment_group.agg(F.sum('count(id)')).show()
print('mean')
comment_group.agg(F.mean('count(id)')).show()
print('min')
comment_group.agg(F.min('count(id)')).show()
print('max')
comment_group.agg(F.max('count(id)')).show()


count all
+--------------+
|sum(count(id))|
+--------------+
|        206158|
+--------------+

mean
+-----------------+
|   avg(count(id))|
+-----------------+
|5.797143017828018|
+-----------------+

min
+--------------+
|min(count(id))|
+--------------+
|             1|
+--------------+

max
+--------------+
|max(count(id))|
+--------------+
|          1658|
+--------------+



5) count of "incoming" likes, max and mean "incoming" likes per post

In [27]:
file = 'userWallLikes.parquet'
path = os.path.join(data_folder,file)
parquet = sql_context.read.parquet(path)

data = parquet.filter(parquet.likerId != parquet.ownerId)
data_group = data.groupby('itemId').agg(F.count('itemType'))

print('count all')
data_group.agg(F.sum('count(itemType)')).show()
print('mean')
data_group.agg(F.mean('count(itemType)')).show()
print('min')
data_group.agg(F.min('count(itemType)')).show()
print('max')
data_group.agg(F.max('count(itemType)')).show()


count all
+--------------------+
|sum(count(itemType))|
+--------------------+
|             6466783|
+--------------------+

mean
+--------------------+
|avg(count(itemType))|
+--------------------+
|   52.76120815553942|
+--------------------+

min
+--------------------+
|min(count(itemType))|
+--------------------+
|                   1|
+--------------------+

max
+--------------------+
|max(count(itemType))|
+--------------------+
|               35649|
+--------------------+



6) count of geo tagged posts

In [29]:
file = 'userWallPosts.parquet'
path = os.path.join(data_folder,file)
parquet = sql_context.read.parquet(path)

parquet.filter(parquet.geo.isNotNull()).count()

14603

7) count of open / closed (e.g. private) groups a user participates in

In [33]:
file = 'groupsProfiles.parquet'
path = os.path.join(data_folder,file)
groupsProfiles = sql_context.read.parquet(path)

file = 'userGroupsSubs.parquet'
path = os.path.join(data_folder,file)
userGroupsSubs = sql_context.read.parquet(path)

groupsProfiles = groupsProfiles.select('key','is_closed')

df1_r = groupsProfiles.select(*(col(x).alias(x + '_df1') for x in groupsProfiles.columns))
df2_r = userGroupsSubs.select(*(col(x).alias(x + '_df2') for x in userGroupsSubs.columns))

temp = df2_r.join(df1_r, col('group_df2') == col('key_df1'))  #q
print('Count groups per user where is_closed = 1')
temp.groupby('user_df2').agg(F.sum('is_closed_df1')).show()
print('Count groups per user where is_closed = 0')
temp.groupby('user_df2').agg(F.count('is_closed_df1')-F.sum('is_closed_df1')).show()

Count groups per user where is_closed = 1
+---------+------------------+
| user_df2|sum(is_closed_df1)|
+---------+------------------+
|231446297|                 0|
|324793578|               315|
| 31599746|                 0|
|283155188|                 0|
| 61368718|                 1|
| 70063923|                36|
|278576875|                 0|
|  3937539|                 0|
| 46798936|                 0|
|281287603|                 0|
|  7117016|                 0|
| 34540597|                 0|
| 87449362|                 0|
|141719570|                 0|
|142459547|                16|
|192959832|                 0|
| 49368505|                 0|
| 32932278|                 0|
| 22790730|                 0|
|176822243|                 0|
+---------+------------------+
only showing top 20 rows

Count groups per user where is_closed = 0
+---------+-------------------------------------------+
| user_df2|(count(is_closed_df1) - sum(is_closed_df1))|
+---------+-----------------------

# Part 2

1) count of reposts from subscribed and not-subscribed groups


In [20]:
from pyspark.sql.functions import countDistinct

userGroupsSubs = read_parquet('userGroupsSubs')
userGroupsSubs = userGroupsSubs.withColumn("group_int", userGroupsSubs["group"].cast("int"))

userWallPosts = read_parquet('userWallPosts')
userWallPosts = userWallPosts.filter(userWallPosts.is_reposted).filter(userWallPosts.repost_info.orig_owner_id<0)
userWallPosts = userWallPosts.select('repost_info.orig_owner_id','repost_info.orig_from_id','is_reposted','id')


answer = userWallPosts.join(userGroupsSubs,col('orig_owner_id')==col('group_int'),how='leftouter')
answer_sub = answer.filter(answer.orig_from_id==answer.user)
# answer_sub.show()
print('count of reposts from not-subscribed groups')
answer_sub = answer_sub.select('id').distinct().count()
print(answer_sub)
print('count of reposts from subscribed groups')
print(answer.select('id').distinct().count()-answer_sub)


count of reposts from not-subscribed groups
87
count of reposts from subscribed groups
100115


2) count of deleted users in friends and followers

In [37]:
friends_profiles = read_parquet('friendsProfiles')
friends = read_parquet('friends')

deleted_friends = friends_profiles.filter(friends_profiles.deactivated == 'deleted').select('id')
answer = friends.join(deleted_friends, col('follower') == col('id'))
print('count of deleted users in friends and followers')
answer.count()

count of deleted users in friends and followers


1261990

3) aggregate (e.g. count, max, mean) characteristics for comments and likes (separtely) made by (a) friends and (b) followers per post

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col


In [16]:
user_wall_comments = read_parquet('userWallComments')
friends = read_parquet('friends')

changed_user_wall_comments = user_wall_comments.withColumn("post_owner_id_int", user_wall_comments["post_owner_id"].cast("int"))
changed_user_wall_comments = changed_user_wall_comments.withColumn("from_id_int", changed_user_wall_comments["from_id"].cast("int"))
changed_friends = friends.withColumn("profile_int", friends["profile"].cast("int"))
changed_friends = changed_friends.withColumn("follower_int", changed_friends["follower"].cast("int"))
changed_friends = changed_friends.select('profile_int','follower_int')
temp = changed_user_wall_comments.select('post_owner_id_int','from_id_int','post_id')
answer = temp.join(changed_friends, col('post_owner_id_int') == col('profile_int'),how='leftouter')

# Only friends and followers
answer = answer.filter(answer.from_id_int==answer.follower_int)

changed_friends = changed_friends.withColumn('new_profile',changed_friends['profile_int'])
changed_friends = changed_friends.withColumn('new_follower',changed_friends['follower_int'])
changed_friends = changed_friends.select('new_profile','new_follower')
final_answer = answer.join(changed_friends, col('from_id_int') == col('new_profile'),how='leftouter')
# friends answer
friends_final_answer_comments = final_answer.filter(final_answer.new_follower == final_answer.post_owner_id_int)
# followers answer
follower_final_answer_comments = final_answer.filter(final_answer.new_follower != final_answer.post_owner_id_int)


#Aggregate
friends_final_answer_group = friends_final_answer_comments.groupby('post_id')
friend_post = friends_final_answer_group.agg(F.count('post_owner_id_int'))
friend_post = friend_post.withColumn('result',friend_post['count(post_owner_id_int)'])

follower_final_answer_group = follower_final_answer_comments.groupby('post_id')
follower_post = follower_final_answer_group.agg(F.count('post_owner_id_int'))
follower_post = follower_post.withColumn('result',follower_post['count(post_owner_id_int)'])


print('max comments made by friends')
row1 = friend_post.agg({"result": "max"}).collect()[0]
print(row1)
print('min comments made by friends')
row1 = friend_post.agg({"result": "min"}).collect()[0]
print(row1)

print('max comments made by followers')
row1 = follower_post.agg({"result": "max"}).collect()[0]
print(row1)
print('min comments made by followers')
row1 = follower_post.agg({"result": "min"}).collect()[0]
print(row1)


max comments made by friends
Row(max(result)=15)
min comments made by friends
Row(min(result)=1)
max comments made by followers
Row(max(result)=5176)
min comments made by followers
Row(min(result)=1)


The same for likes

In [17]:
userWallLikes = read_parquet('userWallLikes')
friends = read_parquet('friends')

# changed_user_wall_likes = userWallLikes.withColumn("post_owner_id_int", userWallLikes["post_owner_id"].cast("int"))
# changed_user_wall_likes = changed_user_wall_likes.withColumn("from_id_int", changed_user_wall_comments["from_id"].cast("int"))
changed_friends = friends.withColumn("profile_int", friends["profile"].cast("int"))
changed_friends = changed_friends.withColumn("follower_int", changed_friends["follower"].cast("int"))
changed_friends = changed_friends.select('profile_int','follower_int')
temp = userWallLikes.select('likerId','ownerId','itemId')
answer = temp.join(changed_friends, col('ownerId') == col('profile_int'),how='leftouter')

# Only friends and followers
answer = answer.filter(answer.likerId==answer.follower_int)

changed_friends = changed_friends.withColumn('new_profile',changed_friends['profile_int'])
changed_friends = changed_friends.withColumn('new_follower',changed_friends['follower_int'])
changed_friends = changed_friends.select('new_profile','new_follower')
final_answer = answer.join(changed_friends, col('likerId') == col('new_profile'),how='leftouter')
# friends answer
friends_final_answer_likes = final_answer.filter(final_answer.new_follower == final_answer.ownerId)
# followers answer
follower_final_answer_likes = final_answer.filter(final_answer.new_follower != final_answer.ownerId)


#Aggregate
friends_final_answer_group = friends_final_answer_likes.groupby('itemId')
friend_post = friends_final_answer_group.agg(F.count('ownerId'))
friend_post = friend_post.withColumn('result',friend_post['count(ownerId)'])

follower_final_answer_group = follower_final_answer_likes.groupby('itemId')
follower_post = follower_final_answer_group.agg(F.count('ownerId'))
follower_post = follower_post.withColumn('result',follower_post['count(ownerId)'])


print('max likes made by friends')
row1 = friend_post.agg({"result": "max"}).collect()[0]
print(row1)
print('min likes made by friends')
row1 = friend_post.agg({"result": "min"}).collect()[0]
print(row1)

print('max likes made by followers')
row1 = follower_post.agg({"result": "max"}).collect()[0]
print(row1)
print('min likes made by followers')
row1 = follower_post.agg({"result": "min"}).collect()[0]
print(row1)


max likes made by friends
Row(max(result)=33)
min likes made by friends
Row(min(result)=1)
max likes made by followers
Row(max(result)=14519)
min likes made by followers
Row(min(result)=1)


4) aggregate (e.g. count, max, mean) characteristics for comments and likes (separtely) made by (a) friends and (b) followers per user


In [18]:
# Используя прошлый пункт 
#likes
friends_final_answer_group_user = friends_final_answer_likes.groupby('ownerId')
count = friends_final_answer_group_user.agg(F.count('itemId'))

print('max likes made by friends per user')
row1 = count.agg({"count(itemId)": "max"}).collect()[0]
print(row1)
print('min likes made by friends per user')
row1 = count.agg({"count(itemId)": "min"}).collect()[0]
print(row1)

followers_final_answer_group_user = follower_final_answer_likes.groupby('ownerId')
count = followers_final_answer_group_user.agg(F.count('itemId'))

print('max likes made by followers per user')
row1 = count.agg({"count(itemId)": "max"}).collect()[0]
print(row1)
print('min likes made by followers per user')
row1 = count.agg({"count(itemId)": "min"}).collect()[0]
print(row1)


max likes made by friends per user
Row(max(count(itemId))=4327)
min likes made by friends per user
Row(min(count(itemId))=1)
max likes made by followers per user
Row(max(count(itemId))=769510)
min likes made by followers per user
Row(min(count(itemId))=1)


In [19]:
#comments
friends_final_answer_group_user = friends_final_answer_comments.groupby('post_owner_id_int')
count = friends_final_answer_group_user.agg(F.count('post_id'))

print('max comments made by friends per post')
row1 = count.agg({"count(post_id)": "max"}).collect()[0]
print(row1)
print('min comments made by friends per post')
row1 = count.agg({"count(post_id)": "min"}).collect()[0]
print(row1)

followers_final_answer_group_user = follower_final_answer_comments.groupby('post_owner_id_int')
count = followers_final_answer_group_user.agg(F.count('post_id'))

print('max comments made by followers per post')
row1 = count.agg({"count(post_id)": "max"}).collect()[0]
print(row1)
print('min comments made by followers per post')
row1 = count.agg({"count(post_id)": "min"}).collect()[0]
print(row1)


max comments made by friends per post
Row(max(count(post_id))=330)
min comments made by friends per post
Row(min(count(post_id))=1)
max comments made by followers per post
Row(max(count(post_id))=91350)
min comments made by followers per post
Row(min(count(post_id))=1)


In [None]:
# emoji

5) find emoji (separately, count of: all, negative, positive, others) in (a) user's posts (b) user's comments  


In [90]:
from ast import literal_eval

with open('emoji.txt') as f:
    text = f.read()
    emoji = literal_eval(tew)

pos_em = filter(lambda x: x['polarity'] >0 ,emoji)
pos_em = list(map(lambda x:x['emoji'],pos_em))
neu_em = filter(lambda x: x['polarity'] ==0 ,emoji)
neu_em = list(map(lambda x:x['emoji'],neu_em))
neg_em = filter(lambda x: x['polarity'] <0 ,emoji)
neg_em = list(map(lambda x:x['emoji'],neg_em))




In [91]:
from pyspark.sql.functions import udf

@udf('int')
def count_neg(text):
    count = 0
    for char in text:
        if char in neg_em:
            count +=1
    return count

@udf('int')
def count_pos(text):
    count = 0
    for char in text:
        if char in pos_em:
            count +=1
    return count

@udf('int')
def count_neu(text):
    count = 0
    for char in text:
        if char in neu_em:
            count +=1
    return count



In [92]:
userWallPosts = read_parquet('userWallPosts')
userWallPosts = userWallPosts.withColumn('neg_count', count_neg(userWallPosts.text))
userWallPosts = userWallPosts.withColumn('neu_count', count_neu(userWallPosts.text))
userWallPosts = userWallPosts.withColumn('pos_count', count_pos(userWallPosts.text))

print('Count negative emojis in posts')
print(userWallPosts.select('neg_count').groupBy().sum().collect()[0][0])
print('Count positive emojis in posts')
print(userWallPosts.select('pos_count').groupBy().sum().collect()[0][0])
print('Count neutral emojis in posts')
print(userWallPosts.select('neu_count').groupBy().sum().collect()[0][0])

Count negative emojis in posts
10422
Count positive emojis in posts
116817
Count neutral emojis in posts
1628


In [94]:
user_wall_comments = read_parquet('userWallComments')
user_wall_comments = user_wall_comments.withColumn('neg_count', count_neg(user_wall_comments.text))
user_wall_comments = user_wall_comments.withColumn('neu_count', count_neu(user_wall_comments.text))
user_wall_comments = user_wall_comments.withColumn('pos_count', count_pos(user_wall_comments.text))

print('Count negative emojis in comments')
print(user_wall_comments.select('neg_count').groupBy().sum().collect()[0][0])
print('Count positive emojis in comments')
print(user_wall_comments.select('pos_count').groupBy().sum().collect()[0][0])
print('Count neutral emojis in comments')
print(user_wall_comments.select('neu_count').groupBy().sum().collect()[0][0])

Count negative emojis in comments
10329
Count positive emojis in comments
74293
Count neutral emojis in comments
1344
