In [102]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import types as T, functions as F, SparkSession
from pyspark.sql.window import Window
from pyspark import StorageLevel
from jinja2 import Environment, FileSystemLoader
import datetime
import json

In [2]:
# User-specific settings: fill these in before running the notebook.
LOGIN: str = ""  # Your gateway.st login (also used as the Kubernetes namespace and image tag)
APP_NAME: str = "LAB1"  # Any name for your Spark-app

In [3]:
# File-name-safe variant of APP_NAME: '/', ':', ' ' and '\' are each replaced by '_'.
NORMALIZED_APP_NAME = APP_NAME.translate(str.maketrans({char: "_" for char in "/: \\"}))

# Working directories, all rooted at the notebook's current working directory.
APPS_TMP_DIR, APPS_CONF_DIR, APPS_LOGS_DIR = [
    os.path.join(os.getcwd(), subdir) for subdir in ("tmp", "conf", "logs")
]
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, f"pyspark-log4j-{NORMALIZED_APP_NAME}.properties")
LOG_FILE = os.path.join(APPS_LOGS_DIR, f"pyspark-{NORMALIZED_APP_NAME}.log")

# Driver JVM options: the rendered log4j config, HDFS replication factor 1,
# and the set of TLS protocol versions the JVM may use.
EXTRA_JAVA_OPTIONS = " ".join([
    f"-Dlog4j.configuration=file://{LOG4J_PROP_FILE}",
    "-Dspark.hadoop.dfs.replication=1",
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3",
])

# IP address this host's name resolves to; passed to Spark as spark.driver.host below.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

In [4]:
# Create the working directories; existing ones are left as they are.
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    # exist_ok=True avoids the check-then-create race of os.path.exists + os.makedirs,
    # and still raises if the path exists but is not a directory.
    os.makedirs(directory, exist_ok=True)

# Render the log4j properties file from the template under /opt,
# passing LOG_FILE as the template's `logfile` variable.
env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

In [5]:
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    # Master URI/configuration: the Kubernetes API server acts as cluster manager
    .master("k8s://https://10.32.7.103:6443")
    .config(conf=SparkConf(loadDefaults=False).setAll([
        # Driver networking
        ("spark.driver.host", LOCAL_IP),
        ("spark.ui.port", "4040"),  # Web-UI port for your Spark-app
        ("spark.driver.bindAddress", "0.0.0.0"),

        # Driver resources: CPU cores and RAM for the driver process
        ("spark.driver.cores", "2"),
        ("spark.driver.memory", "4g"),

        # Executor resources: number of executors, CPU cores and RAM for each one
        ("spark.executor.instances", "3"),
        ("spark.executor.cores", "2"),
        ("spark.executor.memory", "4g"),

        # Extra RAM per executor pod for JVM overheads:
        # total pod RAM = spark.executor.memory * (1 + spark.kubernetes.memoryOverheadFactor)
        ("spark.kubernetes.memoryOverheadFactor", "0.2"),

        # How much of the memory pool is used for execution/storage
        # Additional info: https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
        ("spark.memory.fraction", "0.6"),
        ("spark.memory.storageFraction", "0.5"),

        ("spark.network.timeout", "180s"),
        ("spark.sql.autoBroadcastJoinThreshold", "-1"),  # -1 disables automatic broadcast joins
        ("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS),

        # Namespace for executor pods. You may create pods only in your own namespace
        ("spark.kubernetes.namespace", LOGIN),

        # Extra labels on the driver/executor pods in Kubernetes
        ("spark.kubernetes.driver.label.appname", APP_NAME),
        ("spark.kubernetes.executor.label.appname", APP_NAME),

        # Spark executor image
        ("spark.kubernetes.container.image", f"node03.st:5000/spark-executor:{LOGIN}"),
        ("spark.kubernetes.container.image.pullPolicy", "Always"),

        # If "true", delete completed/failed pods.
        # If your executors go down, set "false" to inspect their logs and troubleshoot.
        ("spark.kubernetes.executor.deleteOnTermination", "true"),

        ("spark.local.dir", "/tmp/spark"),
    ]))
    .getOrCreate()
)

In [6]:
# Root HDFS directory that holds all lab datasets.
DATA_PATH: str = "hdfs:///shared/bigdata20"

# Datasets

In [7]:
# Load the ITMO group posts from JSON (schema is inferred by Spark).
itmo_posts_df = spark.read.json(f"{DATA_PATH}/posts_api.json")

In [8]:
# Inspect the schema Spark inferred for the ITMO posts.
itmo_posts_df.printSchema()

root
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album: struct (nullable = true)
 |    |    |    |-- created: long (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |-- size: long (nullable = true)
 |    |    |    |-- thumb: struct (nullable = true)
 |    |    |    |    |-- access_key: string (nullable = true)
 |    |    |    |    |-- album_id: long (nullable = true)
 |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- type: string (nullable = true)
 |   

In [9]:
# Likes on followers' posts (columns: itemType, ownerId, itemId, likerId; presumably one row per like).
followers_posts_likes_df = spark.read.parquet(f"{DATA_PATH}/followers_posts_likes.parquet")

In [10]:
# Inspect the likes dataset schema.
followers_posts_likes_df.printSchema()

root
 |-- itemType: string (nullable = true)
 |-- ownerId: integer (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- likerId: integer (nullable = true)



In [11]:
# Load the followers' posts from JSON (schema is inferred by Spark).
followers_posts_df = spark.read.json(f"{DATA_PATH}/followers_posts_api_final.json")

In [12]:
# Inspect the schema Spark inferred for the followers' posts.
followers_posts_df.printSchema()

root
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album: struct (nullable = true)
 |    |    |    |-- created: long (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |-- size: long (nullable = true)
 |    |    |    |-- thumb: struct (nullable = true)
 |    |    |    |    |-- access_key: string (nullable = true)
 |    |    |    |    |-- album_id: long (nullable = true)
 |    |    |    |    |-- date: long (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- lat: double (nullable = true)
 |    |    |    |    |-- long: double (nullable = true)
 |    |    |    |    |-- owner_id: long (nullable = true)
 |    |    |    |    |-- sizes: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    

# Tasks

### Task 1a  
Find the top posts by their likes count

In [254]:
def task_1a(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Rank posts by their number of likes.

    Returns columns ``post_id`` (from ``id``) and ``likes_count`` (from ``likes.count``),
    sorted by likes_count descending, then post_id ascending.
    """
    post_id = F.col("id").alias("post_id")
    likes_count = F.col("likes.count").alias("likes_count")
    return df.select(post_id, likes_count).orderBy(F.desc("likes_count"), F.asc("post_id"))

In [255]:
# Rank ITMO posts by likes and preview the top 10.
result_1a = task_1a(df=itmo_posts_df, F=F)
result_1a.show(10)

+-------+-----------+
|post_id|likes_count|
+-------+-----------+
|  32022|       1637|
|  35068|       1629|
|  17492|       1516|
|  18526|       1026|
|  19552|        955|
|  41468|        952|
|  19419|        868|
|  29046|        824|
|  32546|        786|
|  24085|        765|
+-------+-----------+
only showing top 10 rows



### Task 1b  
Find the top posts by their comments count

In [252]:
def task_1b(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Rank posts by their number of comments.

    Returns columns ``post_id`` (from ``id``) and ``comments_count`` (from ``comments.count``),
    sorted by comments_count descending, then post_id ascending.
    """
    post_id = F.col("id").alias("post_id")
    comments_count = F.col("comments.count").alias("comments_count")
    return df.select(post_id, comments_count).orderBy(F.desc("comments_count"), F.asc("post_id"))

In [253]:
# Rank ITMO posts by comments and preview the top 10.
result_1b = task_1b(df=itmo_posts_df, F=F)
result_1b.show(10)

+-------+--------------+
|post_id|comments_count|
+-------+--------------+
|  24085|           850|
|  22540|           250|
|  27722|           192|
|   8285|           148|
|  26860|           113|
|  13571|           107|
|  39294|           104|
|  36680|            96|
|  26006|            92|
|  41739|            92|
+-------+--------------+
only showing top 10 rows



### Task 1c  
Find the top posts by their reposts count

In [250]:
def task_1c(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Rank posts by their number of reposts.

    Returns columns ``post_id`` (from ``id``) and ``reposts_count`` (from ``reposts.count``),
    sorted by reposts_count descending, then post_id ascending.
    """
    post_id = F.col("id").alias("post_id")
    reposts_count = F.col("reposts.count").alias("reposts_count")
    return df.select(post_id, reposts_count).orderBy(F.desc("reposts_count"), F.asc("post_id"))

In [251]:
# Rank ITMO posts by reposts and preview the top 10.
result_1c = task_1c(df=itmo_posts_df, F=F)
result_1c.show(10)

+-------+-------------+
|post_id|reposts_count|
+-------+-------------+
|  17492|          334|
|  19552|          246|
|  32022|          210|
|  11842|          129|
|  19419|          126|
|  13532|          110|
|  17014|          105|
|  35068|          101|
|  41266|           92|
|  12593|           90|
+-------+-------------+
only showing top 10 rows



### Task 2a  
Find the top users by the likes they received

In [248]:
def task_2a(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Rank owners by the number of likes their items received.

    Every row of ``df`` counts as one like for its ``ownerId`` (self-likes are not excluded).
    Returns columns ``ownerId`` and ``count``, sorted by count descending, then ownerId ascending.
    """
    likes_per_owner = df.groupBy("ownerId").count()
    return likes_per_owner.orderBy(F.desc("count"), F.asc("ownerId"))

In [30]:
# Check which item types occur in the likes data before counting likes per owner.
followers_posts_likes_df.select('itemType').distinct().show()

+--------+
|itemType|
+--------+
|    post|
+--------+



In [249]:
# Rank owners by likes received and preview the top 10.
result_2a = task_2a(df=followers_posts_likes_df, F=F)
result_2a.show(10)

+---------+-----+
|  ownerId|count|
+---------+-----+
|289390075|82297|
|   327458|57697|
|119920644|57084|
|273486249|54882|
| 25317378|48425|
|150371150|44686|
|187877260|35507|
|   715211|30346|
|180124822|25364|
| 17885170|24749|
+---------+-----+
only showing top 10 rows



### Task 2b  
Find the top users by the reposts they made

In [232]:
def task_2b(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Rank users by the number of reposts they made.

    A post counts as a repost when ``copy_history.owner_id`` is not null.

    Args:
        df: posts DataFrame with ``owner_id`` and ``copy_history`` columns.
        F: the ``pyspark.sql.functions`` module.

    Returns:
        Columns ``owner_id`` and ``count``, sorted by count descending, then owner_id ascending.
    """
    # Operate on the `df` argument rather than a notebook global, so the function
    # works for any posts DataFrame it is given.
    modified_df = (
        df.select("owner_id", F.col("copy_history.owner_id").alias("copy_owner_id"))
        .filter("copy_owner_id is not null")
        .groupBy("owner_id")
        .count()
        .orderBy(F.col("count").desc(), F.col("owner_id").asc())
    )
    return modified_df

In [234]:
# Prototype of task 2b: count posts per user that carry a copy_history (i.e. reposts).
(
    followers_posts_df
    .select("owner_id", F.col("copy_history.owner_id").alias("copy_owner_id"))
    .where(F.col("copy_owner_id").isNotNull())
    .groupBy("owner_id")
    .count()
    .orderBy(F.col("count").desc())
    .show(10)
)

+---------+-----+
| owner_id|count|
+---------+-----+
|  2547211|37742|
|357231922|23349|
|168543860|18429|
| 25646344|11122|
|176861294| 9022|
|524656784| 7242|
|    29840| 7164|
|143207077| 7161|
|141687240| 6804|
|459339006| 6741|
+---------+-----+
only showing top 10 rows



In [235]:
# Rank users by reposts made and preview the top 10.
result_2b = task_2b(df=followers_posts_df, F=F)
result_2b.show(10)

+---------+-----+
| owner_id|count|
+---------+-----+
|  2547211|37742|
|357231922|23349|
|168543860|18429|
| 25646344|11122|
|176861294| 9022|
|524656784| 7242|
|    29840| 7164|
|143207077| 7161|
|141687240| 6804|
|459339006| 6741|
+---------+-----+
only showing top 10 rows



# Task 3

Find users' posts that are reposts of ITMO group posts, and collect the ids of such posts into a list

In [51]:
# Find the ITMO group's owner_id; task 3 matches reposts against it.
itmo_posts_df.select('owner_id').distinct().show()

+--------+
|owner_id|
+--------+
|    null|
|     -94|
+--------+



In [65]:
# Peek at the id of the first copy_history entry of follower posts that are reposts.
(
    followers_posts_df
    .select("owner_id", F.col("copy_history.id")[0].alias("post_id"))
    .where(F.col("post_id").isNotNull())
    .show(10)
)

+--------+---------+
|owner_id|  post_id|
+--------+---------+
|   94494|   182379|
|   45781|   141579|
|   45781|  1356537|
|   45781|456239168|
|   45781|   159808|
|   45781|   954308|
|   45781|        2|
|   45781|   220617|
|   45781|456239030|
|   45781|        2|
+--------+---------+
only showing top 10 rows



In [228]:
def task_3(df: "pyspark.sql.dataframe.DataFrame", 
           F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    """Group follower posts that repost an ITMO group post by the reposted post.

    Only the first ``copy_history`` entry of each post is inspected. A post is kept when
    that entry has a non-null id and owner_id -94, the owner_id seen in itmo_posts_df.

    Returns columns ``group_post_id``, ``user_post_ids`` (sorted list of follower post ids)
    and ``reposts_count``, sorted by reposts_count descending, then group_post_id ascending.
    """
    first_source_id = F.col("copy_history.id")[0].alias("group_post_id")
    first_source_owner = F.col("copy_history.owner_id")[0].alias("owner")
    itmo_reposts = (
        df.select("id", first_source_id, first_source_owner)
        .where(F.col("group_post_id").isNotNull() & (F.col("owner") == -94))
    )
    return (
        itmo_reposts
        .groupBy("group_post_id")
        .agg(
            F.array_sort(F.collect_list("id")).alias("user_post_ids"),
            F.count("id").alias("reposts_count"),
        )
        .orderBy(F.desc("reposts_count"), F.asc("group_post_id"))
    )

In [229]:
# Collect follower reposts of ITMO posts and preview the 10 most reposted.
result_3 = task_3(df=followers_posts_df, F=F)
result_3.show(10)

+-------------+--------------------+-------------+
|group_post_id|       user_post_ids|reposts_count|
+-------------+--------------------+-------------+
|        41266|[26, 53, 87, 88, ...|           30|
|        41468|[89, 390, 400, 44...|           25|
|        42482|[264, 713, 1190, ...|           10|
|    456239414|[25, 1063, 1266, ...|           10|
|        40090|[32, 349, 463, 13...|            9|
|        38740|[185, 1060, 1133,...|            8|
|        39259|[822, 1205, 1492,...|            8|
|        41207|[958, 1288, 2960,...|            6|
|        41546|[666, 939, 1161, ...|            6|
|        41721|[8, 274, 2801, 38...|            6|
+-------------+--------------------+-------------+
only showing top 10 rows



# Task 4

Find the top positive, top neutral, and top negative emojis

In [187]:
# Emoji -> sentiment label mapping; task 4 filters on the labels 'positive', 'neutral' and 'negative'.
# The keys are emoji characters, so decode explicitly as UTF-8 (the JSON interchange
# encoding) instead of relying on the platform's locale default.
with open("/home/jovyan/shared-data/bigdata20/emojis_sentiment.json", "r", encoding="utf-8") as f:
    emojis_sentiment_dict = json.load(f)

In [211]:
# Spot-check a single lookup in the sentiment mapping.
emojis_sentiment_dict['🔻']

'positive'

In [185]:
import emoji

# Find emojis in text; null text yields an empty list instead of failing inside the UDF.
@F.udf(returnType=T.ArrayType(T.StringType()))
def emojies_from_text(text):
    if text is None:
        return []
    return [token.group() for token in emoji.get_emoji_regexp().finditer(text)]

emojies_in_posts_df = (
    followers_posts_df
    .where("text <> ''")
    .select('text')
    .withColumn('emojies', emojies_from_text('text'))
    .drop('text')
)

# Keep the DataFrame itself. DataFrame.show() returns None, so it must not be
# chained into the assignment.
single_emoji_df = emojies_in_posts_df.select(F.explode(F.col('emojies')).alias("emoji"))
single_emoji_df.show(10)

+-----+
|emoji|
+-----+
|   🖤|
|   🦋|
|   🕊|
|   🖤|
|   🔅|
|   🔥|
|   😏|
|   🚣|
| 👋🏻|
|    ✅|
+-----+
only showing top 10 rows



In [223]:
def task_4(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           T: "pyspark.sql.types",
           emojis_data: dict,
           broadcast_func: "spark.sparkContext.broadcast") -> 'Tuple["pyspark.sql.dataframe.DataFrame"]':
    """Count emoji usage in post texts and split the counts by sentiment.

    Args:
        df: posts DataFrame with a string ``text`` column.
        F: the ``pyspark.sql.functions`` module.
        T: the ``pyspark.sql.types`` module.
        emojis_data: mapping emoji -> sentiment label ('positive' / 'neutral' / 'negative').
        broadcast_func: function used to ship ``emojis_data`` to executors.

    Returns:
        ``(positive, neutral, negative)`` DataFrames, each with columns ``emoji`` and ``count``,
        sorted by count descending, then emoji ascending. Emojis absent from
        ``emojis_data`` are dropped.
    """
    # You are able to modify any code inside this function.
    # Only 'emoji' package import is allowed for this task.

    import emoji

    @F.udf(returnType=T.ArrayType(T.StringType()))
    def emojies_from_text(text):
        if text is None:
            return []
        # get_emoji_regexp() was removed in emoji 2.0; emoji_list() is its replacement there.
        if hasattr(emoji, "get_emoji_regexp"):
            return [token.group() for token in emoji.get_emoji_regexp().finditer(text)]
        return [item["emoji"] for item in emoji.emoji_list(text)]

    sentiment = broadcast_func(emojis_data)

    @F.udf(returnType=T.StringType())
    def mood_of(symbol):
        return sentiment.value.get(symbol)

    # One row per (emoji, mood) with its number of occurrences; unknown emojis are removed.
    emoji_mood_df = (
        df.where("text <> ''")
        .select(F.explode(emojies_from_text(F.col('text'))).alias('emoji'))
        .withColumn('mood', mood_of(F.col('emoji')))
        .where(F.col('mood').isNotNull())
        .groupBy('emoji', 'mood')
        .agg(F.count('emoji').alias('count'))
    )

    def _ranked(mood):
        # Emojis of one sentiment; the secondary sort key makes tie order deterministic.
        return (
            emoji_mood_df.where(F.col('mood') == mood)
            .orderBy(F.col('count').desc(), F.col('emoji').asc())
            .drop('mood')
        )

    return (_ranked('positive'), _ranked('neutral'), _ranked('negative'))

In [224]:
# Split emoji usage in follower posts by sentiment.
positive_emojis, neutral_emojis, negative_emojis = task_4(
    df=followers_posts_df, F=F, T=T,
    emojis_data=emojis_sentiment_dict, broadcast_func=spark.sparkContext.broadcast,
)

In [225]:
# Top 10 emojis for each sentiment: positive, neutral, negative.
positive_emojis.show(10)
neutral_emojis.show(10)
negative_emojis.show(10)

+-----+-----+
|emoji|count|
+-----+-----+
|    ❤|15238|
|   👍| 6538|
|   😂| 5554|
|   🌺| 5470|
|   😍| 4583|
|   🔻| 4115|
|   🎀| 3788|
|   😉| 3687|
|    ☀| 3243|
|    ✅| 2985|
+-----+-----+
only showing top 10 rows

+-----+-----+
|emoji|count|
+-----+-----+
|   🔥|11774|
|    ®| 9144|
|   💥| 4669|
|    ❗| 3563|
|    ✨| 2833|
|   💰| 1365|
|    ©| 1150|
|   💣| 1124|
|    ✔| 1086|
|    ⚠| 1040|
+-----+-----+
only showing top 10 rows

+-----+-----+
|emoji|count|
+-----+-----+
|   📌| 1427|
|   😭|  483|
|   💔|  251|
|   😔|  251|
|    ➖|  249|
|   😨|  241|
|   😤|  206|
|   🔫|  194|
|   😲|  192|
|   😾|  165|
+-----+-----+
only showing top 10 rows



# Task 5

Find each user's probable fans.  
The fans of user A are the users B from whom A received the most likes.

In [246]:
def task_5(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           W: "pyspark.sql.window.Window",
           top_n_likers: int) -> "pyspark.sql.dataframe.DataFrame":
    """Find each owner's top likers ("fans").

    Args:
        df: likes DataFrame with ``ownerId`` and ``likerId`` columns.
        F: the ``pyspark.sql.functions`` module.
        W: the ``pyspark.sql.window.Window`` class.
        top_n_likers: how many likers to keep per owner.

    Returns:
        Columns ``ownerId``, ``likerId`` and ``count`` (likes from likerId to ownerId),
        at most ``top_n_likers`` rows per owner, self-likes excluded. Rows are ordered by
        ownerId ascending, count descending, then likerId ascending.
    """
    # Within each owner: most likes first; a smaller likerId wins ties.
    by_owner = W.partitionBy("ownerId").orderBy(F.desc("count"), "likerId")
    # Use the `df` argument rather than a notebook global.
    modified_df = (
        df.where(F.col("ownerId") != F.col("likerId"))
        .groupBy("ownerId", "likerId")
        .agg(F.count("likerId").alias("count"))
        .withColumn("rank", F.row_number().over(by_owner))
        .where(F.col("rank") <= top_n_likers)
        .drop("rank")
        # Explicit final ordering: a sort placed before the window does not
        # guarantee the order of the window's output.
        .orderBy(F.col("ownerId").asc(), F.col("count").desc(), F.col("likerId").asc())
    )
    return modified_df

In [247]:
# Top 10 fans per owner; preview the first 15 rows.
result_5 = task_5(df=followers_posts_likes_df, F=F, W=Window, top_n_likers=10)
result_5.show(15)

+-------+---------+-----+
|ownerId|  likerId|count|
+-------+---------+-----+
|    637|   422527|   24|
|    637| 13237321|   21|
|    637|   162706|   20|
|    637|   392577|   16|
|    637|151081369|   16|
|    637| 49547307|   15|
|    637|145422426|   15|
|    637|   407844|   14|
|    637|    94399|   13|
|    637|   359267|   13|
|   1087| 84798348|   48|
|   1087|354351777|   30|
|   1087|230753056|   26|
|   1087|255139140|   23|
|   1087|499354771|   22|
+-------+---------+-----+
only showing top 15 rows



# Task 6

Find probable friends.  
Users A and B are friends if each gave the other the maximum number of likes.  
That is, user A is the top "fan" of user B, and user B is the top "fan" of user A.

In [166]:
# Get user's favorites by maximum number of likes
window = Window.partitionBy("likerId")
modified_df = followers_posts_likes_df.where(F.col("ownerId") != F.col("likerId"))\
                    .groupBy('likerId', 'ownerId')\
                    .agg(F.count('ownerId').alias('count'))\
                    .withColumn("max", F.max('count').over(window))\
                    .where(F.col("max") == F.col('count'))\
                    .drop(F.col("max"))\
                    .sort('count', ascending=False)\
                    .show(10)

+---------+---------+-----+
|  likerId|  ownerId|count|
+---------+---------+-----+
|271081114|150371150| 5261|
|  6524088|150371150| 4945|
|189597336|150371150| 3711|
|142999083|150371150| 3386|
|215686327|227854450| 3217|
|514404131|357231922| 2747|
|493380857|419925361| 2162|
|424434709|273486249| 2122|
| 95783577|150371150| 1983|
| 94697255|150371150| 1974|
+---------+---------+-----+
only showing top 10 rows



In [173]:
def task_6(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           W: "pyspark.sql.window.Window") -> "pyspark.sql.dataframe.DataFrame":
    """Find pairs of users who are each other's top liker.

    For every liker, the favorite owner(s) are those receiving the liker's maximum number
    of likes (self-likes excluded, ties all kept). A pair (user_a, user_b), with
    user_a < user_b, is returned when each user is a favorite of the other.

    Returns columns ``user_a``, ``user_b``, ``likes_from_a``, ``likes_from_b`` and
    ``mutual_likes`` (their sum), sorted by mutual_likes descending, then user_a, user_b.
    """
    per_liker = W.partitionBy("likerId")
    favorites = (
        df.where(F.col("ownerId") != F.col("likerId"))
        .groupBy("likerId", "ownerId")
        .agg(F.count("ownerId").alias("count"))
        .withColumn("max", F.max("count").over(per_liker))
        .where(F.col("count") == F.col("max"))
        .drop("max")
    )

    # Row A: A.likerId's favorite is A.ownerId. Row B: the reverse direction.
    # A.ownerId < A.likerId keeps each unordered pair exactly once.
    is_mutual = (
        (F.col("A.ownerId") == F.col("B.likerId"))
        & (F.col("A.likerId") == F.col("B.ownerId"))
        & (F.col("A.ownerId") < F.col("A.likerId"))
    )
    likes_from_a = F.col("B.count")  # B.likerId == user_a
    likes_from_b = F.col("A.count")  # A.likerId == user_b

    return (
        favorites.alias("A")
        .join(favorites.alias("B"), is_mutual, "inner")
        .select(
            F.col("A.ownerId").alias("user_a"),
            F.col("A.likerId").alias("user_b"),
            likes_from_a.alias("likes_from_a"),
            likes_from_b.alias("likes_from_b"),
            (likes_from_a + likes_from_b).alias("mutual_likes"),
        )
        .orderBy(F.desc("mutual_likes"), F.asc("user_a"), F.asc("user_b"))
    )

In [174]:
# Mutual top-liker pairs.
result_6 = task_6(df=followers_posts_likes_df, F=F, W=Window)

In [175]:
# Preview the 15 pairs with the most mutual likes.
result_6.show(15)

+---------+---------+------------+------------+------------+
|   user_a|   user_b|likes_from_a|likes_from_b|mutual_likes|
+---------+---------+------------+------------+------------+
| 13675440|183535934|         161|         100|         261|
|207134315|208946862|          31|          52|          83|
|145105762|267301242|          52|          28|          80|
|155963006|162366815|          12|          57|          69|
|   135451| 18737802|          45|          18|          63|
| 53368685|322831238|          11|          52|          63|
|121608397|441534917|           3|          57|          60|
|101767883|188548515|          52|           6|          58|
|209077977|272076217|          40|          18|          58|
|   460957|   715211|          53|           2|          55|
|    45781|    58440|           4|          47|          51|
| 19261491|229861638|          23|          26|          49|
| 52612744| 53720099|          32|          17|          49|
|   667303|  1113545|   