In [1]:
import os
import socket
# from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, length, when, col
# from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField
import pyspark.sql.functions as F
# from pyspark.sql.functions import pandas_udf
# from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader

In [2]:
# setting constants
APP_NAME = "LAB-1-ANTONOV-A"
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
EXTRA_JAVA_OPTIONS = "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1 -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"\
    .format(LOG4J_PROP_FILE)

LOCAL_IP = socket.gethostbyname(socket.gethostname())

# preparing configuration files from templates
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    if not os.path.exists(directory):
        os.makedirs(directory)

env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template\
    .stream(logfile=LOG_FILE)\
    .dump(LOG4J_PROP_FILE)

#running spark
SPARK_ADDRESS = "local[4]"

spark = SparkSession\
    .builder\
    .appName("lab-1-antonov-app")\
    .master(SPARK_ADDRESS)\
    .config("spark.ui.port", "4040")\
    .config("spark.memory.fraction", "0.8")\
    .config("spark.memory.storageFraction", "0.6")\
    .config("spark.driver.memory", "4g")\
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)\
    .config("spark.executor.memory", "6g")\
    .getOrCreate()

In [3]:
# printing important urls and pathes
print("Web UI: {}".format(spark.sparkContext.uiWebUrl))
print("\nlog4j file: {}".format(LOG4J_PROP_FILE))
print("\ndriver log file: {}".format(LOG_FILE))

Web UI: http://lab-1-antonov-74ff5f8d58-87xbt:4040

log4j file: /home/jovyan/work/conf/pyspark-log4j-LAB-1-ANTONOV-A.properties

driver log file: /home/jovyan/work/logs/pyspark-LAB-1-ANTONOV-A.log


## Step 0 
Read the data

In [4]:
posts_df = spark.read.json("file:///home/jovyan/datasets/bigdata20/posts_api.json")
posts_likes_df = spark.read.parquet("file:///home/jovyan/datasets/bigdata20/posts_likes.parquet")
followers_df = spark.read.parquet("file:///home/jovyan/datasets/bigdata20/followers.parquet")
followers_posts_df = spark.read.json("file:///home/jovyan/datasets/bigdata20/followers_posts_api_final.json")
followers_posts_likes_df = spark.read.parquet("file:///home/jovyan/datasets/bigdata20/followers_posts_likes.parquet")

## Step 1

Top 20 posts in the group: (a) by likes; (b) by comments; (c) by reposts.

### a) Top 20 posts by likes

In [5]:
posts_df.select('id', col('likes.count').name('likes'))\
.orderBy("likes", ascending = False)\
.limit(20).toPandas()

Unnamed: 0,id,likes
0,32022,1637
1,35068,1629
2,17492,1516
3,18526,1026
4,19552,955
5,41468,952
6,19419,868
7,29046,824
8,32546,786
9,24085,765


### b) Top 20 posts by comments

In [6]:
posts_df.groupby("id")\
.agg(F.count('comments').name("comments_count"))\
.withColumnRenamed("id", "post_id")\
.orderBy("comments_count", ascending = False)\
.limit(20)\
.toPandas()

Unnamed: 0,post_id,comments_count
0,1463,1
1,18880,1
2,34618,1
3,4072,1
4,1532,1
5,3120,1
6,28241,1
7,37117,1
8,12402,1
9,3034,1


### c) Top 20 posts by reposts

In [7]:
posts_df.select('id', col('reposts.count').name('reposts'))\
.orderBy('reposts', ascending = False)\
.limit(20)\
.toPandas()

Unnamed: 0,id,reposts
0,17492,334
1,19552,246
2,32022,210
3,11842,129
4,19419,126
5,13532,110
6,17014,105
7,35068,101
8,41266,92
9,12593,90


## Step 2
Top 20 users by (a) likes and (b) reposts they have made

### a) Top 20 users by likes

In [8]:
posts_likes_df.groupby('likerId')\
.agg(F.count('itemId').name('likes'))\
.withColumnRenamed("likerId", "user_id")\
.orderBy('likes', ascending = False)\
.limit(20)\
.toPandas()

Unnamed: 0,user_id,likes
0,2070090,4801
1,2397858,2055
2,1475301,1829
3,18239,1569
4,546612,1245
5,6371,907
6,1841959,746
7,78440957,709
8,120248,699
9,40981497,611


### b) Top 20 users by the number of reposts

In [9]:
posts_df.groupby(col('copy_history.owner_id').alias('user_id'))\
.agg(F.count(col('copy_history')).alias('reposts_count'))\
.orderBy('reposts_count', ascending = False)\
.select("user_id", "reposts_count")\
.where(col("user_id")[0] > 0 )\
.limit(20)\
.toPandas()

Unnamed: 0,user_id,reposts_count
0,[18239],20
1,[1398],3
2,[53083705],2
3,[252574],2
4,[1700430],1
5,[3716921],1
6,[3776500],1
7,[19069415],1
8,[14170688],1
9,[156180717],1


## Step 3
Get reposts of the original posts of the itmo group (posts.json) from user posts

In [10]:
itmo_posts = followers_posts_df.select('owner_id', col('copy_history')['id'][0].name('orig_post_id'))\
.where(col('copy_history')['owner_id'][0] == -94)\
.withColumnRenamed('owner_id', 'user_id')\
.groupby('orig_post_id')\
.agg(F.collect_list('user_id').name('users'))\

itmo_posts.printSchema()


itmo_posts.limit(10).toPandas()

root
 |-- orig_post_id: long (nullable = true)
 |-- users: array (nullable = false)
 |    |-- element: long (containsNull = false)



Unnamed: 0,orig_post_id,users
0,41424,"[1475301, 282843035, 172808182]"
1,42388,[180907432]
2,39407,"[8082648, 2031644]"
3,39719,[89417157]
4,38963,"[6591522, 15641504, 172808182, 377805819, 4778..."
5,40011,"[1546152, 1098272, 317799]"
6,39259,"[86937823, 159562593, 135556, 28405519, 253204..."
7,40084,[268247082]
8,42009,"[443525857, 281951154]"
9,41506,"[139081799, 29899117, 537403451, 527580876, 52..."


## Step 4
Emojies

In [11]:
# download emoji library
import sys
!{sys.executable} -m pip install --user --trusted-host pypi-registry.supplementary-services.svc.cluster.local --index http://pypi-registry.supplementary-services.svc.cluster.local:8080 emoji

Looking in indexes: http://pypi-registry.supplementary-services.svc.cluster.local:8080


In [12]:
import emoji
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

### Extracting emojies from posts

Lets first of all look at emojies by posts -- each row of the DataFrame is a post and all of the emojies in that post

In [13]:
# A UDF function that searchs for emojies in post's text
@udf(returnType=ArrayType(StringType())) 
def emojies_in_text(text):
    results = []
    for match in emoji.get_emoji_regexp().finditer(text):
        results.append(match.group())
    return results

emojies_in_posts = posts_df\
.where("text <> ''").select("id", 'text')\
.withColumn('emojies', emojies_in_text("text"))\
.where("size(emojies) > 0")\
.select(col("id").alias("Post id"), "emojies", F.size("emojies").alias("Emojies count"))\
.orderBy(F.desc(F.size("emojies")))

emojies_in_posts.limit(10).toPandas()

Unnamed: 0,Post id,emojies,Emojies count
0,30016,"[‼, ❗, ❗, ‼, 🇫🇷, 🇫🇷, 🔴, 🔴, 🇨🇱, 🇨🇱, 🔵, 🇨🇳, 🇨🇳, ...",18
1,33011,"[❗, ❗, ❗, 🔆, 📸, ⛱, ❤, 👨, 👩, 👧, 👦, 🍦, 🎹, 🎷]",14
2,33749,"[🌎, 👩🏼‍💻, 👨🏻‍💻, 🖋, ✔, ✔, ✔, ✔, 🔵, 🔴, 🔴, 🔷, ⁉]",13
3,34251,"[📽, 🎞, 🎬, ⬇, ⬇, ⬇, 🕖, 📍, 🎬, 🕖, 📍]",11
4,33557,"[❗, ❗, 🔹, 🔹, 🔹, 🔹, ▪, ▪, ▪, ▪, ▪]",11
5,15697,"[✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔]",11
6,15550,"[✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔, ✔]",10
7,39682,"[🤔, 📌, 🔹, 🔹, 🔹, 🔹, 🔹, 🔹, 🔹, 🔹]",10
8,34896,"[✒, 📉, 🎓, 🔴, 🔴, 📩, ✏, 📈, 🎓, 🔵]",10
9,34359,"[🔴, 🔴, 🔴, 🔵, 🔵, 🔵, 🔴, 🔴, 🔴]",9


### Top 10 most common emojies in all of the posts

In [14]:
emojies_in_posts\
.select(F.explode("emojies").alias("emoji"))\
.groupBy("emoji").count()\
.orderBy(F.desc("count")).limit(10).toPandas()

Unnamed: 0,emoji,count
0,🔥,76
1,⚡,68
2,📍,63
3,🚀,50
4,❤,47
5,❗,45
6,⬇,40
7,🔵,40
8,✔,40
9,💙,38


### Counting count of emojies per post

Lets count all the emojies inside each post. In the DataFrame below you can see how many times each emoji apeared in the given post.

In [15]:
# A UDF function that counts how many times each emoji apeared in the post
# Return type here is String, that we will parse later
@udf
def count_by_post(emojies):
    counts = {}
    for emoji in emojies:
        if emoji in counts.keys(): counts[emoji] += 1
        else: counts[emoji] = 1
    result = ""
    for key, value in counts.items():
        result += f"{key}:{value},"
    return result[:len(result)-1]

counts_by_posts = emojies_in_posts\
.select("Post Id", count_by_post("emojies").alias("counts_per_post"))\
.orderBy(F.desc(F.length("counts_per_post")))

counts_by_posts.limit(10).toPandas()

Unnamed: 0,Post Id,counts_per_post
0,33011,"❗:3,🔆:1,📸:1,⛱:1,❤:1,👨:1,👩:1,👧:1,👦:1,🍦:1,🎹:1,🎷:1"
1,33749,"🌎:1,👩🏼‍💻:1,👨🏻‍💻:1,🖋:1,✔:4,🔵:1,🔴:2,🔷:1,⁉:1"
2,30016,"‼:2,❗:2,🇫🇷:2,🔴:3,🇨🇱:2,🔵:1,🇨🇳:2,🔷:4"
3,34896,"✒:1,📉:1,🎓:2,🔴:2,📩:1,✏:1,📈:1,🔵:1"
4,34495,"🇦🇹:1,🏔:1,📚:1,🎉:1,🇬🇧:1,🇩🇪:1,🗯:1"
5,36315,"📢:1,🎁:1,☝:1,⏰:1,📍:1,📝:1,📌:1"
6,30952,"👓:1,🍦:1,🎯:1,🔧:1,✋:1,📍:1,🕛:1"
7,34575,"🇩🇪:3,🔎:1,💿:1,⏰:1,👌🏻:1,👍🏼:1"
8,33562,"💥:1,🏆:1,🎁:1,🎮:1,💃🏼:1,❗:1"
9,34211,"✈:1,🇰🇷:1,🏫:1,🌏:1,🥋:1,☀:1"


### Counting frequency, avg count per post and difference between count and frequency

Now that we have all the counts, we can count frequency of each emoji, average count per post and difference between ovarall count and frequency. We loop through the dataframe and parse preiously mentioned 'counts' string. 

'overall_count' dictionary stores how many times given emoji was in the text, 'freqs' dictionary stores frequencies for each emoji.

Later we create a DataFrame that stores emoji, overall count, frequency, average count per post and the difference between overall count and frequency.

In [16]:
overall_count = {}
freqs = {}

for index, row in counts_by_posts.toPandas().iterrows():
    counts = [(itm[0], itm[2]) for itm in row[1].split(",")] 
    for e, c in counts:
        try: c_int = int(c)
        except: continue
        
        if e in overall_count.keys(): overall_count[e] += c_int
        else: overall_count[e] = c_int
            
        if e in freqs.keys(): freqs[e] += 1
        else: freqs[e] = 1

post_freqs = pd.DataFrame(columns=['Emoji', 'Count', 'Frequency', 'Average count per post', 'Difference'])


for key in overall_count.keys():
    e = key
    c = overall_count[key]
    f = freqs[key]
    a = overall_count[key] / freqs[key]
    
    post_freqs = post_freqs.append({
        'Emoji': e, 
        'Count': c, 
        'Frequency': f, 
        'Average count per post': a, 
        'Difference':c-f }, ignore_index=True)

### Top 5 emojies with greatest difference between overall count and frequency.

In [17]:
post_freqs.sort_values('Difference', ascending=False).head(5)

Unnamed: 0,Emoji,Count,Frequency,Average count per post,Difference
0,❗,45,20,2.25,25
55,⬇,40,16,2.5,24
34,📍,63,39,1.615385,24
15,🔵,40,17,2.352941,23
132,🔹,30,7,4.285714,23


### Top 5 emojies with greatest average count per post

In [18]:
post_freqs.sort_values('Average count per post', ascending=False).head(5)

Unnamed: 0,Emoji,Count,Frequency,Average count per post,Difference
132,🔹,30,7,4.285714,23
133,▪,21,5,4.2,16
20,✒,7,2,3.5,5
207,📷,3,1,3.0,2
242,🥧,3,1,3.0,2


## Step 5
Probable “fans”. For each user find the top 10 other users whose posts this user likes.

In [19]:
# Defining a function to display top n fans of the given user
def fans_of(user, n=10):
    return followers_posts_likes_df.groupBy("likerId", "ownerId")\
        .agg(F.count("ownerId").name("likes"))\
        .where('ownerId == {}'.format(user))\
        .withColumnRenamed("likerId", 'fan')\
        .orderBy('likes', ascending = False)\
        .limit(n)

In [20]:
# picking a particular user_id
user_id = 27419

# getting a list of fans for that user
fans_of(user_id, 10).toPandas()

Unnamed: 0,fan,ownerId,likes
0,1925168,27419,6
1,529276371,27419,4
2,9383,27419,4
3,1622246,27419,3
4,95884146,27419,3
5,100187585,27419,3
6,6866116,27419,3
7,24147,27419,3
8,291831320,27419,3
9,422720,27419,3


## Step 6
Probable friends. If two users like each others posts they may be friends. Find pairs of users where both users are top likers of each other.

Probable friends are a pair of users such that they are in top N of each other fan lists. Lets write a function to check that.

Looking at the lists of fans for user1 and user 2:

In [21]:
user1 = 24147
user2 = 27419

In [22]:
fans_of(user1, 10).toPandas()

Unnamed: 0,fan,ownerId,likes
0,331853691,24147,49
1,31609798,24147,44
2,529276371,24147,43
3,431349700,24147,33
4,24147,24147,25
5,34587765,24147,24
6,188799,24147,21
7,13898859,24147,19
8,70922560,24147,15
9,10260310,24147,15


In [23]:
fans_of(user2, 10).toPandas()

Unnamed: 0,fan,ownerId,likes
0,1925168,27419,6
1,529276371,27419,4
2,9383,27419,4
3,1622246,27419,3
4,95884146,27419,3
5,100187585,27419,3
6,6866116,27419,3
7,24147,27419,3
8,291831320,27419,3
9,422720,27419,3


Final function to check if two users are friends:

In [24]:
def areFriends(user1, user2, n=10):
    top_fans_of_user_1 = [row.asDict()['fan'] for row in fans_of(user1, n).select('fan').collect()]
    top_fans_of_user_2 = [row.asDict()['fan'] for row in fans_of(user2, n).select('fan').collect()]
    
    return user1 in top_fans_of_user_2 and user2 in top_fans_of_user_1

Lets check the function behaviour:

In [25]:
areFriends(user1, user2)

False

Indeed user2 has user1 in his list of fans, but not the other way around!

In [26]:
from pyspark.sql.window import *
from pyspark.sql.functions import row_number

Lets create a Dataframe where we have top 10 likers for each user

In [27]:
w = Window.partitionBy("ownerId").orderBy(F.desc('count'), 'likerId')

dfTop = followers_posts_likes_df.groupBy('ownerId', 'likerId')\
.agg(F.count('likerId').name('count'))\
.withColumn("rn", row_number().over(w))\
.where(col("rn") < 11)

Here are first two people and their top 10 fans

In [28]:
dfTop.limit(20).toPandas()

Unnamed: 0,ownerId,likerId,count,rn
0,13832,216785,14,1
1,13832,135660,13,2
2,13832,71831300,10,3
3,13832,218958,6,4
4,13832,155630,5,5
5,13832,14964847,5,6
6,13832,25211791,5,7
7,13832,519411613,5,8
8,13832,17661,4,9
9,13832,97492,3,10


Lets now join the DataFrame onto itself, getting only people who are in top 10 lists of one another i.e. who are probable friends. 

In [29]:
prob_friends_df = dfTop.alias("a")\
.join(dfTop.alias("b"), [(col('b.likerId') == col('a.ownerId')) & (col('a.likerId') == col('b.ownerId'))], 'inner')\
.select(col('a.likerId'), col('a.ownerId'))\
.where(col('a.likerId') != col('a.ownerId'))

prob_friends_df.limit(5).toPandas()

Unnamed: 0,likerId,ownerId
0,168438070,90898752
1,209077977,272076217
2,49894967,56706631
3,40147706,173546700
4,180062188,205353671


We can check the results by calling our 'areFriends' function

In [30]:
areFriends(168438070, 90898752)

True

In [31]:
areFriends(209077977, 272076217)

True

In [32]:
areFriends(180062188, 205353671)

True

In [33]:
spark.stop()