# Spark Lab

## General info

Using `pyspark` you'll have to complete 6 tasks (the tasks are listed below). This lab is **REQUIRED**.

This lab also will be checked automatically. Thus, please be careful and **read task conditions attentively**.


<font size="5" style="background-color:orange;"><b>PLAGIARISM IS PROHIBITED! <br> PUNISHMENT UP TO NON-ADMISSION TO THE EXAM!</b></font>

---

The lab deadline is ***19.05 23:59***. To submit the lab, please put the completed notebook in your **NFS home directory** in the following path:

(path in gateway.st) `/nfs/home/<your-login>/spark-lab.ipynb`

(path in jupyter containers) `/home/jovyan/nfs-home/spark-lab.ipynb`

---

This notebook is available in your Jupyter servers by following path `/home/jovyan/shared-data/notebooks/BDML/SparkLab-Template.ipynb`.

---

## Lab description

During the lab you'll have to analyze social network (VK) data. The data contain user posts, group posts, user likes.
Detailed datasets info and tasks description is provided below.


---

## Tasks list

The detailed information about each task is provided below.

- 1. Find top posts:
    - 1a. by likes count (1 point)
    - 1b. by comments count (1 point)
    - 1c. by reposts count (1 point)
- 2. Find top users:
    - 2a. by likes are **received** (1 point)
    - 2b. by reposts are **made** (1 point)
- 3. Extract made users' reposts from the **ITMO group** (3 points)
- 4. Extrace emojis in user posts and find top positive emojis, top neutral emojis and top negative emojis. (3 points)
- 5. Probable fans (4 points)
- 6. Probable friends (5 points)

---

## How to complete a task

For each task you will have a predefined function signature. ***DO NOT MODIFY SIGNATURES!***

You'll have to put the code to transform Spark DataFrame and return modified dataframe from the function.

Limitations:
- Imports inside the functions are not allowed (except task 4).
- Access to global variables are not allowed. If you'll use global variable inside your functions, Lab-checker will not provide the variable to the execution context and the task will fail with `NameError` exception.
- Dynamically generated code execution is not allowed (`eval`, `exec`, `compile`, etc)
- Inner functions and closures are not allowed (except task 4)

---

## FAQ

FAQ section is located in the end of the notebook

In [170]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import types as T, functions as F, SparkSession
from pyspark.sql.window import Window
from pyspark import StorageLevel
from jinja2 import Environment, FileSystemLoader
import datetime
import json

In [97]:
LOGIN = "YOUR_LOGIN"  # Your gateway.st login
APP_NAME = "PySparkLab_APP"  # Any name for your Spark-app

In [98]:
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_').replace(' ', '_').replace('\\', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
EXTRA_JAVA_OPTIONS = (
    "-Dlog4j.configuration=file://{} "
    "-Dspark.hadoop.dfs.replication=1 "
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"
    .format(LOG4J_PROP_FILE)
)

LOCAL_IP = socket.gethostbyname(socket.gethostname())

In [99]:
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    if not os.path.exists(directory):
        os.makedirs(directory)

env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

In [100]:
spark = (
    SparkSession
    .builder
    .appName(APP_NAME)
    
    # Master URI/configuration
    .master("k8s://https://10.32.7.103:6443")
    
    .config("spark.driver.host", LOCAL_IP)
    
    # Web-UI port for your Spark-app
    .config("spark.ui.port", "4040")
    .config("spark.driver.bindAddress", "0.0.0.0")
    
    # How many CPU cores allocate to driver process
    .config("spark.driver.cores", "2")
    
    # How many RAM allocate to driver process
    .config("spark.driver.memory", "4g")
    
    # How many executors to create
    .config("spark.executor.instances", "3")
    
    # How many CPU cores allocate to each executor
    .config("spark.executor.cores", '2')
    
    # How many RAM allocate to each executor
    .config("spark.executor.memory", "4g")
    
    # How many extra RAM allocate to each executor pod to handle with JVM overheads
    # Total pod RAM = 'spark.executor.memory' + ('spark.executor.memory' * 'spark.kubernetes.memoryOverheadFactor')
    .config("spark.kubernetes.memoryOverheadFactor", "0.2")
    
    # How many RAM from the pool allocate to store the data
    # Additional info: https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    
    .config("spark.network.timeout", "180s")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)
    
    # Namespace to create executor pods. You are allowed to create pods only in your own namespace
    .config("spark.kubernetes.namespace", LOGIN)
    
    # Extra labels to your driver/executor pods in Kubernetes
    .config("spark.kubernetes.driver.label.appname", APP_NAME)
    .config("spark.kubernetes.executor.label.appname", APP_NAME)
    
    # Spark executor image
    .config("spark.kubernetes.container.image", f"node03.st:5000/spark-executor:{LOGIN}")

    .config("spark.kubernetes.container.image.pullPolicy", "Always")
    
    # If true - delete completed/failed pods. 
    # If your executors goes down you can set 'false' to check logs and troubleshoot your app.
    .config("spark.kubernetes.executor.deleteOnTermination", "true")
    
    .config("spark.local.dir", "/tmp/spark")
    .getOrCreate()
)

In [101]:
DATA_PATH = "hdfs:///shared/bigdata20"

# Datasets

In [102]:
itmo_posts_df = spark.read.json(f"{DATA_PATH}/posts_api.json")

### ITMO posts

This dataset consists of posts from the [ITMO group](https://vk.com/club94) in social network VK.

Dataset structure is complex and the part of dataframe is provided below.

```
root
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
...
 |-- comments: struct (nullable = true)
 |    |-- can_post: long (nullable = true)
 |    |-- count: long (nullable = true)
 |    |-- groups_can_post: boolean (nullable = true)
 |-- copy_history: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachments: array (nullable = true)
...
 |    |    |-- date: long (nullable = true)
 |    |    |-- from_id: long (nullable = true)
 |    |    |-- geo: struct (nullable = true)
...
 |    |    |-- id: long (nullable = true)
 |    |    |-- owner_id: long (nullable = true)
 |    |    |-- post_source: struct (nullable = true)
...
 |    |    |-- post_type: string (nullable = true)
 |    |    |-- signer_id: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- copyright: struct (nullable = true)
...
 |-- date: long (nullable = true)
 |-- from_id: long (nullable = true)
 |-- geo: struct (nullable = true)
...
 |-- id: long (nullable = true)
 |-- key: string (nullable = true)
 |-- likes: struct (nullable = true)
 |    |-- can_like: long (nullable = true)
 |    |-- can_publish: long (nullable = true)
 |    |-- count: long (nullable = true)
 |    |-- user_likes: long (nullable = true)
 |-- marked_as_ads: long (nullable = true)
 |-- owner_id: long (nullable = true)
 |-- post_source: struct (nullable = true)
 |    |-- data: string (nullable = true)
 |    |-- platform: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- post_type: string (nullable = true)
 |-- reposts: struct (nullable = true)
 |    |-- count: long (nullable = true)
 |    |-- user_reposted: long (nullable = true)
 |-- signer_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- unavailable: string (nullable = true)
 |-- views: struct (nullable = true)
 |    |-- count: long (nullable = true)
```

To show the entire schema you can use dataframe's method `.printSchema()`.

- Field `owner_id` indicates post owner (group or user) id. 
- Filed `id` indicates post id. **Post ids are not unique!**
- Field `copy_history` represent the reposts chain.

In [103]:
followers_posts_likes_df = spark.read.parquet(f"{DATA_PATH}/followers_posts_likes.parquet")

### Followers posts likes

This dataset contains information about the likes are made by ITMO group followers.

The structure is quite simple:

```
root
 |-- itemType: string (nullable = true)
 |-- ownerId: integer (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- likerId: integer (nullable = true)
```

- Field `itemType` indicates the post type (original post or repost).
- Field `ownerId` indicates the group/user that **received** the like.
- Field `itemId` indicates object id (post/repost) that received the like.
- Filed `likerId` indicates id of the user/group **made** the like.

In [104]:
followers_posts_df = spark.read.json(f"{DATA_PATH}/followers_posts_api_final.json")

### Followers posts

The dataset contains posts of the ITMO group followers

The structure is the same with `itmo_posts_df`

# Tasks

### Task 1a

In this task you have to find the top of posts by its' likes count. 

If multiple posts have the same count - sort them by `id` in ascending order.

Dataset for the task: `itmo_posts_df`

In [105]:
def task_1a(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DataFrame here
    modified_df = df.select(
            F.col("id").name("post_id"), 
            F.col("likes.count").name("likes_count")
        ).orderBy(
            F.col("likes_count").desc(),
            F.col("post_id").asc()
    )
    
    return modified_df

In [106]:
result_1a = task_1a(
    df=itmo_posts_df,
    F=F
)
result_1a.show(10)

+-------+-----------+
|post_id|likes_count|
+-------+-----------+
|  32022|       1637|
|  35068|       1629|
|  17492|       1516|
|  18526|       1026|
|  19552|        955|
|  41468|        952|
|  19419|        868|
|  29046|        824|
|  32546|        786|
|  24085|        765|
+-------+-----------+
only showing top 10 rows



Reference output:

```
+-------+-----------+
|post_id|likes_count|
+-------+-----------+
|  32022|       1637|
|  35068|       1629|
|  17492|       1516|
|  18526|       1026|
|  19552|        955|
|  41468|        952|
|  19419|        868|
|  29046|        824|
|  32546|        786|
|  24085|        765|
+-------+-----------+
```

### Task 1b

In this task you have to find the top of posts by its' comments count. 

If multiple posts have the same count - sort them by `id` in ascending order.

Dataset for the task: `itmo_posts_df`

In [107]:
def task_1b(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DataFrame here
    modified_df = df.select(
            F.col("id").name("post_id"), 
            F.col("comments.count").name("comments_count")
        ).orderBy(
            F.col("comments_count").desc(),
            F.col("post_id").asc()
    )
    
    
    return modified_df

In [108]:
result_1b = task_1b(
    df=itmo_posts_df,
    F=F
)
result_1b.show(10)

+-------+--------------+
|post_id|comments_count|
+-------+--------------+
|  24085|           850|
|  22540|           250|
|  27722|           192|
|   8285|           148|
|  26860|           113|
|  13571|           107|
|  39294|           104|
|  36680|            96|
|  26006|            92|
|  41739|            92|
+-------+--------------+
only showing top 10 rows



Reference output:

```
+-------+--------------+
|post_id|comments_count|
+-------+--------------+
|  24085|           850|
|  22540|           250|
|  27722|           192|
|   8285|           148|
|  26860|           113|
|  13571|           107|
|  39294|           104|
|  36680|            96|
|  26006|            92|
|  41739|            92|
+-------+--------------+
```

### Task 1c

In this task you have to find the top of posts by its' reposts count. 

If multiple posts have the same count - sort them by `id` in ascending order.

Dataset for the task: `itmo_posts_df`

In [109]:
def task_1c(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DataFrame here
    modified_df = df.select(
            F.col("id").name("post_id"), 
            F.col("reposts.count").name("reposts_count")
        ).orderBy(
            F.col("reposts_count").desc(),
            F.col("post_id").asc()
    )
    
    return modified_df

In [110]:
result_1c = task_1c(
    df=itmo_posts_df,
    F=F
)
result_1c.show(10)

+-------+-------------+
|post_id|reposts_count|
+-------+-------------+
|  17492|          334|
|  19552|          246|
|  32022|          210|
|  11842|          129|
|  19419|          126|
|  13532|          110|
|  17014|          105|
|  35068|          101|
|  41266|           92|
|  12593|           90|
+-------+-------------+
only showing top 10 rows



Reference output:

```
+-------+-------------+
|post_id|reposts_count|
+-------+-------------+
|  17492|          334|
|  19552|          246|
|  32022|          210|
|  11842|          129|
|  19419|          126|
|  13532|          110|
|  17014|          105|
|  35068|          101|
|  41266|           92|
|  12593|           90|
+-------+-------------+
```

### Task 2a

In this task you have to find the top users by likes they are **received**. 

If multiple users have the same count - sort them by `id` in ascending order.

In this task you'll have to aggregate the data.

Dataset for the task: `followers_posts_likes_df`

In [111]:
def task_2a(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DataFrame here
    modified_df = df.groupBy("ownerId")\
        .agg(F.count("ownerId").name("count"))\
        .orderBy(
            F.col("count").desc(),
            F.col("ownerId").asc()
    )
    
    return modified_df

In [112]:
result_2a = task_2a(
    df=followers_posts_likes_df,
    F=F
)
result_2a.show(10)

+---------+-----+
|  ownerId|count|
+---------+-----+
|289390075|82297|
|   327458|57697|
|119920644|57084|
|273486249|54882|
| 25317378|48425|
|150371150|44686|
|187877260|35507|
|   715211|30346|
|180124822|25364|
| 17885170|24749|
+---------+-----+
only showing top 10 rows



Reference output:

```
+---------+-----+
|  ownerId|count|
+---------+-----+
|289390075|82297|
|   327458|57697|
|119920644|57084|
|273486249|54882|
| 25317378|48425|
|150371150|44686|
|187877260|35507|
|   715211|30346|
|180124822|25364|
| 17885170|24749|
+---------+-----+
```

### Task 2b

In this task you have to find the top users by reposts they are **made**. 

You can track the reposts (and check if the post if repost) using field `copy_history`.

If multiple users have the same count - sort them by `id` in ascending order.

In this task you'll have to aggregate the data.

Dataset for the task: `followers_posts_df`

In [113]:
def task_2b(df: "pyspark.sql.dataframe.DataFrame", 
            F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DaraFrame here
    modified_df = df.where(F.col("copy_history").isNotNull())\
                    .select(
                        F.col("owner_id"),
                        F.col("copy_history.id").name("src_post_id"),
                    ).groupBy("owner_id").agg(
                        F.count("src_post_id").name("count"))\
                        .orderBy(
                            F.col("count").desc(),
                            F.col("owner_id").asc()
                    )
    
    return modified_df

In [114]:
result_2b = task_2b(
    df=followers_posts_df,
    F=F
)
result_2b.show(10)

+---------+-----+
| owner_id|count|
+---------+-----+
|  2547211|37742|
|357231922|23349|
|168543860|18429|
| 25646344|11122|
|176861294| 9022|
|524656784| 7242|
|    29840| 7164|
|143207077| 7161|
|141687240| 6804|
|459339006| 6741|
+---------+-----+
only showing top 10 rows



Reference output:

```
+---------+-----+
| owner_id|count|
+---------+-----+
|  2547211|37742|
|357231922|23349|
|168543860|18429|
| 25646344|11122|
|176861294| 9022|
|524656784| 7242|
|    29840| 7164|
|143207077| 7161|
|141687240| 6804|
|459339006| 6741|
+---------+-----+
```

### Task 3

In this task you have to find users' posts that are reposted **from ITMO group** and collect ids of such posts in the list.

You can track the reposts using field `copy_history`. Only reposts **directly** from ITMO should be collected. Repost of the repost **shouldn't be collected**.

The final result should be `ITMO post id - [list users' post ids] - reposts count`.

Users' post ids in the list should be sorted in ascending order. Dataframe should be sorted by reposts count in descending order.

If multiple posts have the same count - sort them by `id` in ascending order.

In this task you'll have to filter and aggregate the data.

Dataset for the task: `followers_posts_df`

In [115]:
def task_3(df: "pyspark.sql.dataframe.DataFrame", 
           F: "pyspark.sql.functions") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DaraFrame here
    modified_df = df.where(F.col("copy_history").isNotNull())\
                    .select(
                        F.col("id").name("user_post_id"),
                        F.col("copy_history.id").getItem(0).name("group_post_id"))\
                    .where(F.col("copy_history.owner_id").getItem(0) == -94)\
                    .groupBy("group_post_id")\
                    .agg(F.sort_array(F.collect_list("user_post_id")).name("user_post_ids")) \
                    .withColumn("reposts_count", F.size("user_post_ids"))\
                    .orderBy(
                        F.col("reposts_count").desc(),
                        F.col("group_post_id").asc()
                    )
    
    return modified_df

In [116]:
result_3 = task_3(
    df=followers_posts_df,
    F=F
)
result_3.show(10)

+-------------+--------------------+-------------+
|group_post_id|       user_post_ids|reposts_count|
+-------------+--------------------+-------------+
|        41266|[26, 53, 87, 88, ...|           30|
|        41468|[89, 390, 400, 44...|           25|
|        42482|[264, 713, 1190, ...|           10|
|    456239414|[25, 1063, 1266, ...|           10|
|        40090|[32, 349, 463, 13...|            9|
|        38740|[185, 1060, 1133,...|            8|
|        39259|[822, 1205, 1492,...|            8|
|        41207|[958, 1288, 2960,...|            6|
|        41546|[666, 939, 1161, ...|            6|
|        41721|[8, 274, 2801, 38...|            6|
+-------------+--------------------+-------------+
only showing top 10 rows



Reference output:

```
+-------------+--------------------+-------------+
|group_post_id|       user_post_ids|reposts_count|
+-------------+--------------------+-------------+
|        41266|[26, 53, 87, 88, ...|           30|
|        41468|[89, 390, 400, 44...|           25|
|        42482|[264, 713, 1190, ...|           10|
|    456239414|[25, 1063, 1266, ...|           10|
|        40090|[32, 349, 463, 13...|            9|
|        38740|[185, 1060, 1133,...|            8|
|        39259|[822, 1205, 1492,...|            8|
|        41207|[958, 1288, 2960,...|            6|
|        41546|[666, 939, 1161, ...|            6|
|        41721|[8, 274, 2801, 38...|            6|
+-------------+--------------------+-------------+
```

### Task 4

In this task you have to find emojis in followers posts. Then, you'll have to find top positive emojis, top neutral emojis, top negative emojis.

These tops should be returned in separate dataframes.

For this task you are allowed to use `emoji==0.6.0` package (already installed in your images).
This package import should be done inside the function.

To recognize positive, negative and neutral emojis you can use provided `emojis_sentiment_dict` 
dictionary with the structure:

```
{
 '😂': 'positive',
 '❤': 'positive',
 '♥': 'positive',
 '😍': 'positive',
 '😭': 'negative',
 '😘': 'positive',
 '💗': 'positive',
 '★': 'neutral',
 '█': 'neutral',
 '☀': 'positive',
 ...
}
```

This dictionary is obtained from the [research](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0144296)
and the [shared results](https://www.clarin.si/repository/xmlui/handle/11356/1048).

Sort the each top by emojis count in descending order. If count for multiple emojis the same, sort them in alphabetical order.

In this task you'll have to create your own [UDF (User Defined Function)](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html).
Also, you'll have to [broadcast](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.Broadcast.html)
`emojis_sentiment_dict` to share the dictionary with your executors and enrich your dataframe with sentiment.

`broadcast_func` in task function signature is the method `.broadcast()` from `spark.sparkContext`.

Dataset for the task: `followers_posts_df`

In [117]:
with open("/home/jovyan/shared-data/bigdata20/emojis_sentiment.json", "r") as f:
    emojis_sentiment_dict = json.loads(f.read())

In [180]:
def task_4(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           T: "pyspark.sql.types",
           emojis_data: dict,
           broadcast_func: "spark.sparkContext.broadcast") -> 'Tuple["pyspark.sql.dataframe.DataFrame"]':
    # You are able to modify any code inside this function.
    # Only 'emoji' package import is allowed for this task.
    
    import emoji
    reg_exp = emoji.get_emoji_regexp()

    # 0) Helpers
    @F.udf(returnType=T.ArrayType(T.StringType()))
    def getEmojiSlice(text):
        emojis = []
        for reg_match in reg_exp.finditer(text):
            emojis.append(reg_match.group())
        return emojis

    
    broarcasted_dict = broadcast_func(emojis_data)
    @F.udf(returnType=T.StringType())
    def getSentiment(col):
        return broarcasted_dict.value.get(col, None)

    
    # 1) Gather emoji's from df
    raw_text_emoji_df = df.where(F.col("text").isNotNull())\
                            .select(
                                F.col("id"), 
                                F.col("text")
                            )\
                            .withColumn("emojis", getEmojiSlice("text"))\
                            .where(F.size("emojis") > 0)\
                            .select(F.col("id").name("postID"), F.col("emojis"))

    # 2) Get all emojis counted
    emoji_df = raw_text_emoji_df.select(F.explode("emojis").name("emoji"))\
                               .groupBy("emoji")\
                               .agg(F.count("emoji").name("count"))\
                                .withColumn("sentiment", getSentiment(F.col("emoji")))

    positive_emojis = emoji_df.where(F.col("sentiment") == "positive")\
                                .select(
                                    F.col("emoji"),
                                    F.col("count"))\
                                .orderBy(F.col("count").desc())
    
    neutral_emojis = emoji_df.where(F.col("sentiment") == "neutral")\
                                .select(
                                    F.col("emoji"),
                                    F.col("count"))\
                                .orderBy(F.col("count").desc())
    
    negative_emojis = emoji_df.where(F.col("sentiment") == "negative")\
                                .select(
                                    F.col("emoji"),
                                    F.col("count"))\
                                .orderBy(F.col("count").desc())
    
    return (positive_emojis, neutral_emojis, negative_emojis)

In [181]:
positive_emojis, neutral_emojis, negative_emojis = task_4(
    df=followers_posts_df,
    F=F,
    T=T,
    emojis_data=emojis_sentiment_dict,
    broadcast_func=spark.sparkContext.broadcast
)

In [182]:
positive_emojis.show(10)
neutral_emojis.show(10)
negative_emojis.show(10)

+-----+-----+
|emoji|count|
+-----+-----+
|    ❤|15238|
|   👍| 6538|
|   😂| 5554|
|   🌺| 5470|
|   😍| 4583|
|   🔻| 4115|
|   🎀| 3788|
|   😉| 3687|
|    ☀| 3243|
|    ✅| 2985|
+-----+-----+
only showing top 10 rows

+-----+-----+
|emoji|count|
+-----+-----+
|   🔥|11774|
|    ®| 9144|
|   💥| 4669|
|    ❗| 3563|
|    ✨| 2833|
|   💰| 1365|
|    ©| 1150|
|   💣| 1124|
|    ✔| 1086|
|    ⚠| 1040|
+-----+-----+
only showing top 10 rows

+-----+-----+
|emoji|count|
+-----+-----+
|   📌| 1427|
|   😭|  483|
|   😔|  251|
|   💔|  251|
|    ➖|  249|
|   😨|  241|
|   😤|  206|
|   🔫|  194|
|   😲|  192|
|   😾|  165|
+-----+-----+
only showing top 10 rows



Reference output:

```
+-----+-----+
|emoji|count|
+-----+-----+
|    ❤|15238|
|   👍| 6538|
|   😂| 5554|
|   🌺| 5470|
|   😍| 4583|
|   🔻| 4115|
|   🎀| 3788|
|   😉| 3687|
|    ☀| 3243|
|    ✅| 2985|
+-----+-----+
```

---

```
+-----+-----+
|emoji|count|
+-----+-----+
|   🔥|11774|
|    ®| 9144|
|   💥| 4669|
|    ❗| 3563|
|    ✨| 2833|
|   💰| 1365|
|    ©| 1150|
|   💣| 1124|
|    ✔| 1086|
|    ⚠| 1040|
+-----+-----+
```

---

```
+-----+-----+
|emoji|count|
+-----+-----+
|   📌| 1427|
|   😭|  483|
|   💔|  251|
|   😔|  251|
|    ➖|  249|
|   😨|  241|
|   😤|  206|
|   🔫|  194|
|   😲|  192|
|   😾|  165|
+-----+-----+
```

### Task 5

In this task you have to find users' probable fans.

User A fans are users B from whom A received the most likes.

You'll have to find top-N fans for each user A. 

The result should be ordered by user A `id` and received likes count from user B. If multiple users B gave the same likes count, order them by their `id`.

In this task you'll have to use [window-function](https://sparkbyexamples.com/pyspark/pyspark-window-functions/).

Dataset for the task: `followers_posts_likes_df`

In [185]:
def task_5(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           W: "pyspark.sql.window.Window",
           top_n_likers: int) -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DaraFrame here
    modified_df = df.groupBy(
                         F.col("ownerId"),
                         F.col("likerId"))\
                    .agg(F.count("ownerId").name("count"))
    
    # removing self likes
    modified_df = modified_df.where(modified_df.ownerId != modified_df.likerId)
    
    # using window function
    windowSpec  = W.partitionBy("ownerId")\
                    .orderBy(
                        F.col("ownerId").asc(), 
                        F.col("count").desc(), 
                        F.col("likerId").asc()
    )
    
    modified_df = modified_df.withColumn("row_number", F.row_number().over(windowSpec))
    
    modified_df = modified_df.where(modified_df.row_number <= top_n_likers)\
                             .drop("row_number")\
                             .orderBy(
                                F.col("ownerId").asc(),
                                F.col("count").desc(),
                                F.col("likerId").asc()
                             )
    
    return modified_df

In [186]:
result_5 = task_5(
    df=followers_posts_likes_df,
    F=F,
    W=Window,
    top_n_likers=10
)
result_5.show(15)

+-------+---------+-----+
|ownerId|  likerId|count|
+-------+---------+-----+
|    637|   422527|   24|
|    637| 13237321|   21|
|    637|   162706|   20|
|    637|   392577|   16|
|    637|151081369|   16|
|    637| 49547307|   15|
|    637|145422426|   15|
|    637|   407844|   14|
|    637|    94399|   13|
|    637|   359267|   13|
|   1087| 84798348|   48|
|   1087|354351777|   30|
|   1087|230753056|   26|
|   1087|255139140|   23|
|   1087|499354771|   22|
+-------+---------+-----+
only showing top 15 rows



Reference output:

```
+-------+---------+-----+
|ownerId|  likerId|count|
+-------+---------+-----+
|    637|   422527|   24|
|    637| 13237321|   21|
|    637|   162706|   20|
|    637|   392577|   16|
|    637|151081369|   16|
|    637| 49547307|   15|
|    637|145422426|   15|
|    637|   407844|   14|
|    637|    94399|   13|
|    637|   359267|   13|
|   1087| 84798348|   48|
|   1087|354351777|   30|
|   1087|230753056|   26|
|   1087|255139140|   23|
|   1087|499354771|   22|
+-------+---------+-----+
```

### Task 6

This task is similar to Task 5. You have to find probable friends.

User A and user B are friends if they gave each other maximum number of likes. I.e. user A is the top "fan" for user B and user B is the top "fan" for user A.

If multiple users B gave the same maximum number of likes to user A, they all may be friends for user A.

User A fans are users B from whom A received the most likes.

The result should be the dataframe `user_a - user_b - likes_from_a - likes_from_b - mutual_likes`. Mutual likes is just a sum of likes from A and likes from B.

The result shouldn't contain identical friend pairs (i.e. `user_a = 123, user_b = 456 and user_a = 456, user_b = 123` are identical pairs).

The result should be ordered by mutual likes in descending order. If the mutual likes count is the same for different pairs, then sort them by user A `id` and user B `id` in ascending order.

In this task you'll have to use joins.

Dataset for the task: `followers_posts_likes_df`

In [251]:
def task_6(df: "pyspark.sql.dataframe.DataFrame",
           F: "pyspark.sql.functions",
           W: "pyspark.sql.window.Window") -> "pyspark.sql.dataframe.DataFrame":
    # Place your code to transform DaraFrame here    
    userA = df.groupBy(
                    F.col("likerId"),
                    F.col("ownerId"))\
                .agg(F.count("ownerId").name("count"))

    # Removing self likes
    userA = userA.where(userA.ownerId != userA.likerId)

    # SEARCH LIKES
    window = W.partitionBy("likerId").orderBy(
            # F.col("ownerId").asc(),
            F.col("count").desc(),
            F.col("likerId").asc()
    )


    userA = userA.withColumn("max_likes", F.max("count").over(window))\
                 .where(F.col("count") == F.col("max_likes"))\
                 .drop("max_likes") 
    # Left only top likers (with "count" == topG count within "ownerId")
    
    userB = userA.alias("userB")
    
    modified_df = userA.alias("userA").join(
                                            userB, \
                                            (F.col("userA.ownerId") == F.col("userB.likerId")) &
                                            (F.col("userA.likerId") == F.col("userB.ownerId")) &
                                            (F.col("userA.ownerId") < F.col("userB.ownerId")),
                                            "inner")\
                                        .select(
                                            F.col("userA.ownerId").name("user_a"),
                                            F.col("userB.ownerId").name("user_b"),
                                            F.col("userB.count").name("likes_from_a"),
                                            F.col("userA.count").name("likes_from_b")
                                        )

    modified_df = modified_df.withColumn("mutual_likes", modified_df.likes_from_b + modified_df.likes_from_a)

    # Removing identical (user_a = 123, user_b = 456 and user_a = 456, user_b = 123) entries
    modified_df = modified_df.withColumn(
        "sorted_ids",
        F.concat(
            F.least(F.col("user_a"), F.col("user_b")),
            F.lit("_"),
            F.greatest(F.col("user_a"), F.col("user_b"))
        )
    )

    modified_df = modified_df.dropDuplicates(["sorted_ids"])

    modified_df = modified_df.select(
                                F.col("user_a"), 
                                F.col("user_b"), 
                                F.col("likes_from_a"), 
                                F.col("likes_from_b"), 
                                F.col("mutual_likes")
    )

    modified_df = modified_df.sort(F.col("mutual_likes").desc(), F.col("user_a").asc(), F.col("user_b").asc())
    
    return modified_df

In [252]:
result_6 = task_6(
    df=followers_posts_likes_df,
    F=F,
    W=Window
)

In [253]:
result_6.show(15)

+---------+---------+------------+------------+------------+
|   user_a|   user_b|likes_from_a|likes_from_b|mutual_likes|
+---------+---------+------------+------------+------------+
| 13675440|183535934|         161|         100|         261|
|207134315|208946862|          31|          52|          83|
|145105762|267301242|          52|          28|          80|
|155963006|162366815|          12|          57|          69|
|   135451| 18737802|          45|          18|          63|
| 53368685|322831238|          11|          52|          63|
|121608397|441534917|           3|          57|          60|
|101767883|188548515|          52|           6|          58|
|209077977|272076217|          40|          18|          58|
|   460957|   715211|          53|           2|          55|
|    45781|    58440|           4|          47|          51|
| 19261491|229861638|          23|          26|          49|
| 52612744| 53720099|          32|          17|          49|
|   667303|  1113545|   

Reference output:
```
+---------+---------+------------+------------+------------+
|   user_a|   user_b|likes_from_a|likes_from_b|mutual_likes|
+---------+---------+------------+------------+------------+
| 13675440|183535934|         161|         100|         261|
|207134315|208946862|          31|          52|          83|
|145105762|267301242|          52|          28|          80|
|155963006|162366815|          12|          57|          69|
|   135451| 18737802|          45|          18|          63|
| 53368685|322831238|          11|          52|          63|
|121608397|441534917|           3|          57|          60|
|101767883|188548515|          52|           6|          58|
|209077977|272076217|          40|          18|          58|
|   460957|   715211|          53|           2|          55|
|    45781|    58440|           4|          47|          51|
| 19261491|229861638|          23|          26|          49|
| 52612744| 53720099|          32|          17|          49|
|   667303|  1113545|           4|          44|          48|
| 66022003| 95356919|          23|          22|          45|
+---------+---------+------------+------------+------------+
```

# FAQ

---

- Q: Spark reading file infinitely. Why?
- A: Probably your executor haven't started yet or they cannot start. Check if your executors are running (`kubectl get pods`). If executors are running - check the logs.

---

- Q: I cannot create Spark app. It raise exception `Exception: Java gateway process exited before sending its port number`.
- A: Check the path to your notebook. It doesn't have to contain space character (" "). Rename your directories and notebook.

---

- Q: My executors goes down with `OOMKilled`. How to fix it?
- A: `OOMKilled` pod state means the pod is exceeded RAM limit. You can increase available executor RAM using config `spark.executor.memory`. Another reason - you do something wrong with your app and collect all of the data into one executor.

---

- Q: I got less executors than have been specified in config.
- A: You have exceeded resources limit for your namespace. Adjust your resource consumption.

---

- Q: My executor failing with `Error` state.
- A: Check the logs.

---

- Q: I cannot save my notebook/data and get `No space left on device` or `Quota exceeded` error from the Jupyter Server.
- A: You have exceeded NFS-server storage quota. Delete unnecessary files from you nfs home directory (for example, logs, old data, etc)

---

If your question is not listed above, please contact the course staff.