# Lab №2 made by Pinaev Zakhar J41322c

## Setting workspace

#### Import required packages

In [632]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, udf, size, length, when, col, array_contains, row_number
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, IntegerType, ArrayType, StringType, StructType, StructField
from pyspark.sql.window import Window
from pyspark.sql.functions import pandas_udf

import pyarrow as pa
import emoji
import re
import socket
import pandas as pd

#### Getting LOCAL_IP of current port to make executors able to find us

In [549]:
LOCAL_IP = socket.gethostbyname(socket.gethostname())

#### Configuring Spark Session

In [171]:
spark = (
    SparkSession
    .builder
    .master('k8s://https://10.32.7.103:6443')
    .config("spark.driver.host", LOCAL_IP)
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config('spark.executor.instences', '2')
    .config('spark.executor.cores', '2')
    .config('spark.cores.max', '4')
    .config('spark.executor.memory', '4g')
    .config('spark.sql.execution.arrow.enabled', 'true')
    .config('spark.kubernetes.namespace', 'zpinaev-244202')
    .config('spark.kubernetes.container.image', 'node03.st:5000/spark-executor:zpinaev-244202')
    .config("spark.kubernetes.container.image.pullPolicy", "Always")
    .config('spark.kubernetes.executor.deleteOnTermination', 'false')
    .getOrCreate()
)

#### Reading all data

In [550]:
posts_df = spark.read.json("hdfs:///shared/bigdata20/posts_api.json")

posts_likes_df = spark.read.parquet("hdfs:///shared/bigdata20/posts_likes.parquet")
followers_df = spark.read.parquet("hdfs:///shared/bigdata20/followers.parquet")
followers_posts_df = spark.read.json("hdfs:///shared/bigdata20/followers_posts_api_final.json")
followers_posts_likes_df = spark.read.parquet("hdfs:///shared/bigdata20/followers_posts_likes.parquet")

#### Caching all input data de bene esse

In [633]:
posts_df = posts_df.cache()
posts_likes_df = posts_likes_df.cache()
followers_df = followers_df.cache()
followers_posts_df = followers_posts_df.cache()
followers_posts_likes_df = followers_posts_likes_df.cache()

## Task 1
### Find the top 20 posts in the group

#### Selecting subset of needed info for task 1

In [616]:
task_1_df = posts_df.select("id", "text",
                            col("likes.count").name("likecount"),
                            col("reposts.count").name("repcount"),
                            col("comments.count").name("comcount")).cache()

### (a) by likes

In [617]:
task_1_df.orderBy("likecount", ascending=False).show(20)

+-----+--------------------+---------+--------+--------+
|   id|                text|likecount|repcount|comcount|
+-----+--------------------+---------+--------+--------+
|32022|Стесняешься петь ...|     1637|     210|      27|
|35068|У нас для вас две...|     1629|     101|       4|
|17492|Настали снежные х...|     1516|     334|      42|
|18526|[Пригласи друзей ...|     1026|      31|       9|
|19552|Ура! Нас 20 000! ...|      955|     246|       3|
|41468|Добро пожаловать ...|      952|      85|      33|
|19419|[Университет ИТМО...|      868|     126|       7|
|29046|Я ПОСТУПИЛ В УНИВ...|      824|      87|      20|
|32546|WE ARE THE CHAMPI...|      786|      68|      20|
|24085|Студенты, сегодня...|      765|      57|     850|
|40180|Ура! Кубок ICPC с...|      759|       2|      10|
|33658|ITMO.GO! Я поступ...|      708|      60|      11|
|13532|Команда студентов...|      633|     110|      15|
|40842|Хочешь поздравить...|      631|      17|       2|
|35117|К концу года мы д...|   

In [618]:
task_1_df.orderBy("likecount", ascending=False).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_1a.parquet")

### (b) by comments

In [619]:
task_1_df.orderBy("comcount", ascending=False).show(20)

+-----+--------------------+---------+--------+--------+
|   id|                text|likecount|repcount|comcount|
+-----+--------------------+---------+--------+--------+
|24085|Студенты, сегодня...|      765|      57|     850|
|22540|Друзья, а давайте...|      212|      12|     250|
|27722|Upd: розыгрыш зав...|       23|       0|     192|
| 8285|Все знают, что в ...|        5|       1|     148|
|26860|18 мая состоится ...|      197|      23|     113|
|13571|15 российских вуз...|      114|      35|     107|
|39294|Информация по ава...|      231|      19|     104|
|36680|ОФИЦИАЛЬНО: СЕССИ...|       43|       2|      96|
|26006|Дорогие абитуриен...|       12|       1|      92|
|41739|ДОБРО ПОЖАЛОВАТЬ ...|       71|       0|      92|
|12426|Уважаемые студент...|        9|       6|      91|
|21499|Внимание! Розыгры...|      101|      11|      88|
|39163|Science Valentine...|       35|       3|      83|
|39407|Информация по ава...|      152|      12|      83|
|11267|                    |   

In [620]:
task_1_df.orderBy("comcount", ascending=False).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_1b.parquet")

### (c) by reposts

In [621]:
task_1_df.orderBy("repcount", ascending=False).show(20)

+-----+--------------------+---------+--------+--------+
|   id|                text|likecount|repcount|comcount|
+-----+--------------------+---------+--------+--------+
|17492|Настали снежные х...|     1516|     334|      42|
|19552|Ура! Нас 20 000! ...|      955|     246|       3|
|32022|Стесняешься петь ...|     1637|     210|      27|
|11842|8 октября пропал ...|      197|     129|       7|
|19419|[Университет ИТМО...|      868|     126|       7|
|13532|Команда студентов...|      633|     110|      15|
|17014|#ExamsAreComing #...|      581|     105|      10|
|35068|У нас для вас две...|     1629|     101|       4|
|41266|В преддверии выпу...|      433|      92|       7|
|12593|Подарок всем студ...|      483|      90|      48|
|29046|Я ПОСТУПИЛ В УНИВ...|      824|      87|      20|
|11999|Начало конкурса М...|      549|      85|      17|
|41468|Добро пожаловать ...|      952|      85|      33|
|19809|[Билет на GEEK PI...|      552|      84|      11|
|17167|🎄Выходные дни в ...|    

In [622]:
task_1_df.orderBy("repcount", ascending=False).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_1c.parquet")

#### drop cache of task 1 df

In [623]:
task_1_df.unpersist()

DataFrame[id: bigint, text: string, likecount: bigint, repcount: bigint, comcount: bigint]

## Task 2
### Find the top 20 users

### (a) by likes

#### We get df with likes for the public posts of the followers and count number of likes from each user - than sort by likes num and show top 20 of users by likes

In [634]:
followers_posts_likes_df\
                        .groupby("likerId")\
                        .agg(F.count("likerId").name("likes_count"))\
                        .orderBy("likes_count", ascending=False).show(20)

+---------+-----------+
|  likerId|likes_count|
+---------+-----------+
|194073434|       8104|
|150371150|       5332|
|271081114|       5261|
|  6524088|       4946|
|189597336|       3711|
|142999083|       3394|
|215686327|       3217|
|514404131|       2747|
|  2818498|       2350|
|419925361|       2212|
|493380857|       2162|
|424434709|       2122|
| 95783577|       1985|
| 94697255|       1975|
|  4448812|       1777|
|330771656|       1763|
|228571738|       1720|
|325927416|       1661|
|347711731|       1645|
|501177379|       1641|
+---------+-----------+
only showing top 20 rows



In [625]:
followers_posts_likes_df\
                        .groupby("likerId")\
                        .agg(F.count("likerId").name("likes_count"))\
                        .orderBy("likes_count", ascending=False).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_2a.parquet")

### (b) by reposts they have made

#### We check if field 'copy_history' is not null, to know, if post was reposted from somewhere. Then select id's of users, who reposted post and grouping by those id's counting, how many times each user reposted something.

In [635]:
followers_posts_df\
                .where(col("copy_history").isNotNull())\
                .select("from_id").groupby("from_id")\
                .agg(F.count("from_id").name("rep_count"))\
                .orderBy("rep_count", ascending=False).show(20)

+---------+---------+
|  from_id|rep_count|
+---------+---------+
|  2547211|    37742|
|357231922|    23349|
|168543860|    18429|
| 25646344|    11122|
|176861294|     9022|
|524656784|     7242|
|    29840|     7164|
|143207077|     7161|
|141687240|     6804|
|459339006|     6741|
|514384760|     6570|
|483715951|     6052|
|445159771|     5808|
|451211328|     5646|
|426396104|     5533|
|  8325325|     5532|
|452280411|     5458|
|464220898|     5318|
|440454268|     5304|
|461319529|     5240|
+---------+---------+
only showing top 20 rows



In [637]:
followers_posts_df\
                .where(col("copy_history").isNotNull())\
                .select("from_id").groupby("from_id")\
                .agg(F.count("from_id").name("rep_count"))\
                .orderBy("rep_count", ascending=False).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_2b.parquet")

## Task 3
### Get reposts of the original posts of the itmo group from user posts

#### First we look for reposts from itmo group (owner_id=-94) among all reposts of users. Then we group by post id ('copy_history.id') and collect user post ids ('id') to a list. Then we searh for group posts that were reposted and get their group id's. Finally we join those two df's by 'copy_history.id' and 'id' respectively.

In [638]:
users_posts_id_df = followers_posts_df\
                    .where(col("copy_history").isNotNull())\
                    .where(col("copy_history.owner_id").getItem(0) == -94)\
                    .select(col("copy_history.id")[0].name("id"), 
                            col("copy_history.text")[0].name("usr_text"), 
                            col("id").name("usr_post_id"))\
                    .groupby("id", "usr_text")\
                    .agg(F.collect_list(col("usr_post_id")).name("usrs_post_id"))\
                    .cache()


group_posts_id = posts_df\
                    .where(col("copy_history").isNotNull())\
                    .select(col("text").name("group_text"), col("id").name("group_post_id"))\
                    .cache()


group_posts_id.join(users_posts_id_df, 
                    on=[group_posts_id.group_post_id == users_posts_id_df.id])\
                    .drop("id")\
                    .show(5)
                    

+--------------------+-------------+--------------------+--------------+
|          group_text|group_post_id|            usr_text|  usrs_post_id|
+--------------------+-------------+--------------------+--------------+
|Университет ИТМО ...|        41402|Университет ИТМО ...|[212, 55, 405]|
|Так-так, где же в...|        41282|Так-так, где же в...|       [10669]|
|Первые по Петербу...|        38767|Первые по Петербу...|    [778, 364]|
|Лучшее чтение на ...|        42714|Лучшее чтение на ...|       [12501]|
|Стартовала регист...|        38877|Стартовала регист...|        [9722]|
+--------------------+-------------+--------------------+--------------+
only showing top 5 rows



In [639]:
group_posts_id.join(users_posts_id_df, 
                    on=[group_posts_id.group_post_id == users_posts_id_df.id])\
                    .drop("id").write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_3.parquet")

In [640]:
users_posts_id_df.unpersist()
group_posts_id.unpersist()

DataFrame[group_text: string, group_post_id: bigint]

## Task 4
### Emoticon task

In [562]:
# selecting only text column from followers posts
folowers_posts_text = followers_posts_df\
                        .where(col("text") != "")\
                        .select(col("text")).cache()
folowers_posts_text.show(10)

+--------------------+
|                text|
+--------------------+
|Я люблю Вас. Я вч...|
|call me by your n...|
|                  🦋|
|         Браво,Юра !|
|                  🕊|
|Самый неприятный ...|
|                  🔅|
|Пекло.Пушка.Вышка...|
|Тот неловкий моме...|
|Белые ночи + жарк...|
+--------------------+
only showing top 10 rows



In [563]:
# total number of posts
folowers_posts_text.count()

189848

In [564]:
# creating spark DF with emoji/sentiment info from external file
emoji_pdf = pd.read_excel("Emoji_Sentiment_Data2.xls")

emoji_sent_df = spark.createDataFrame(emoji_pdf).select("Emoji", "Sentiment")

emoji_sent_df.show(10)

+-----+---------+
|Emoji|Sentiment|
+-----+---------+
|   😂| positive|
|    ❤| positive|
|    ♥| positive|
|   😍| positive|
|   😭| negative|
|   😘| positive|
|   😊| positive|
|   👌| positive|
|   💕| positive|
|   👏| positive|
+-----+---------+
only showing top 10 rows



In [565]:
def extract_emojis(s):
    if s is None:
        return ''
    return ''.join(c for c in s if c in emoji.UNICODE_EMOJI['en'])

# udf to extract emojis from column texts
search_all_emojis = udf(extract_emojis, returnType=StringType())

# dataframe of one column containing emojis found in each row text
emojis_df = folowers_posts_text.withColumn("emoji_in_post", search_all_emojis(col("text")))

# list of found emojis where every element is a string of emojis from some post
listEmoji = [row["emoji_in_post"] for row in emojis_df.where(col("emoji_in_post")!='').collect()]

## dict with emojis and their sentiment
emojisDict = { row["Emoji"] : row["Sentiment"] for row in emoji_sent_df.collect() }

In [566]:
# dataframe with separate emojis
sepEmoji_df = emoji_sent_df.select("emoji")

In [567]:
# broadcasting dict with emojis and list of found emoijs from posts
emojisDictBcs = spark.sparkContext.broadcast(emojisDict)
listEmojiBcs = spark.sparkContext.broadcast(listEmoji)

In [568]:
def metricCalc(cur_emoji):
    emojiDict = emojisDictBcs.value
    emojisPostsList = listEmojiBcs.value
    
    overallCount = sum(strings.count(cur_emoji) for strings in emojisPostsList)
    
    postsNum = sum(1 for s in emojisPostsList if cur_emoji in s)
    if postsNum == 0:
        postsNum = 1
    
    frequency = postsNum / 189848
    
    avCountPost = overallCount / postsNum
    overFreqDiff = overallCount - frequency
    sentiment = emojiDict[cur_emoji]
    return Row("sentiment", "overallCount", "frequency",
               "avCountPost", "overallFreqDiff")(sentiment, overallCount,
                                                 frequency, avCountPost,
                                                 overFreqDiff)
    
outputSchema = StructType([
    StructField("sentiment", StringType(), False),
    StructField("overallCount", IntegerType(), False),
    StructField("frequency", FloatType(), False),
    StructField("avCountPost", FloatType(), False),
    StructField("overallFreqDiff", FloatType(), False),
])

metricCalcUdf = udf(metricCalc, outputSchema)

In [569]:
resultEmoji_df = sepEmoji_df.withColumn("output", metricCalcUdf(sepEmoji_df['emoji']))\
                            .select(*(sepEmoji_df.columns), "output.*")\
                            .where(col("overallCount") != 0)

In [570]:
resultEmoji_df.write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results.parquet")

### Additionally, for each sentiment print:
### 1) top 10 most popular emoticons by their overall count

In [571]:
windowSpec = Window.partitionBy("sentiment").orderBy(col("overallCount").desc())

#### negative:

In [572]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="negative")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|   📌| negative|        1427|0.0039768657|  1.8900663|       1426.996|
|   😭| negative|         483| 0.001232565|  2.0641026|      482.99878|
|    ⏳| negative|         412|0.0021332856|  1.0172839|      411.99786|
|   🚩| negative|         271|5.6887616E-4|  2.5092592|      270.99942|
|   😔| negative|         251|0.0011008807|   1.200957|       250.9989|
|   💔| negative|         251|0.0010166027|  1.3005182|      250.99898|
|    ➖| negative|         249|1.3695167E-4|   9.576923|      248.99986|
|   😨| negative|         241|0.0010218702|  1.2422681|      240.99898|
|   👎| negative|         231|0.0011219502|   1.084507|      230.99887|
|   😤| negative|         206| 9.797312E-4|  1.1075269|      205.99902|
+-----+---------+------------+------------+-----------+---------------+


#### neutral:

In [573]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="neutral")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|   🔥|  neutral|       11774| 0.022423202|  2.7657976|      11773.978|
|    ®|  neutral|        9144|0.0022860393|  21.069124|       9143.998|
|   💥|  neutral|        4669|  0.00968143|  2.5402613|        4668.99|
|    ❗|  neutral|        3563|0.0062260334|  3.0143824|      3562.9937|
|    ✨|  neutral|        2833|0.0060838144|  2.4528139|       2832.994|
|   👉|  neutral|        1637|  0.00420863|   2.048811|      1636.9958|
|   💰|  neutral|        1365| 0.004935527|   1.456777|      1364.9951|
|    ♀|  neutral|        1280|0.0046142177|  1.4611872|      1279.9954|
|    ©|  neutral|        1150|0.0030866798|  1.9624573|       1149.997|
|   😏|  neutral|        1132| 0.004292908|   1.388957|      1131.9957|
+-----+---------+------------+------------+-----------+--------------

#### positive:

In [574]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="positive")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|    ❤| positive|       15253| 0.033721715|  2.3825366|      15252.966|
|   👍| positive|        7773| 0.018040748|   2.269489|       7772.982|
|   😂| positive|        5554| 0.012262441|  2.3857388|       5553.988|
|   🌺| positive|        5470|0.0061575575|   4.679213|      5469.9937|
|   😍| positive|        4583|0.0144115295|  1.6750731|      4582.9854|
|   🔻| positive|        4115| 9.323248E-4|  23.248587|       4114.999|
|   🎀| positive|        3788|0.0032552357|    6.12945|      3787.9968|
|   😉| positive|        3687| 0.014764443|  1.3153764|      3686.9854|
|    ☀| positive|        3243|0.0069634654|  2.4531014|       3242.993|
|    ✅| positive|        2985| 0.005409591|   2.906524|      2984.9946|
+-----+---------+------------+------------+-----------+---------------+

### 2) top 5 emoticons which have the greatest difference between their overall count and frequency

In [575]:
windowSpec = Window.partitionBy("sentiment").orderBy(col("overallFreqDiff").desc())

#### negative:

In [576]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="negative")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|   📌| negative|        1427|0.0039768657|  1.8900663|       1426.996|
|   😭| negative|         483| 0.001232565|  2.0641026|      482.99878|
|    ⏳| negative|         412|0.0021332856|  1.0172839|      411.99786|
|   🚩| negative|         271|5.6887616E-4|  2.5092592|      270.99942|
|   💔| negative|         251|0.0010166027|  1.3005182|      250.99898|
|   😔| negative|         251|0.0011008807|   1.200957|       250.9989|
|    ➖| negative|         249|1.3695167E-4|   9.576923|      248.99986|
|   😨| negative|         241|0.0010218702|  1.2422681|      240.99898|
|   👎| negative|         231|0.0011219502|   1.084507|      230.99887|
|   😤| negative|         206| 9.797312E-4|  1.1075269|      205.99902|
+-----+---------+------------+------------+-----------+---------------+


#### neutral:

In [577]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="neutral")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|   🔥|  neutral|       11774| 0.022423202|  2.7657976|      11773.978|
|    ®|  neutral|        9144|0.0022860393|  21.069124|       9143.998|
|   💥|  neutral|        4669|  0.00968143|  2.5402613|        4668.99|
|    ❗|  neutral|        3563|0.0062260334|  3.0143824|      3562.9937|
|    ✨|  neutral|        2833|0.0060838144|  2.4528139|       2832.994|
|   👉|  neutral|        1637|  0.00420863|   2.048811|      1636.9958|
|   💰|  neutral|        1365| 0.004935527|   1.456777|      1364.9951|
|    ♀|  neutral|        1280|0.0046142177|  1.4611872|      1279.9954|
|    ©|  neutral|        1150|0.0030866798|  1.9624573|       1149.997|
|   😏|  neutral|        1132| 0.004292908|   1.388957|      1131.9957|
+-----+---------+------------+------------+-----------+--------------

#### positive:

In [578]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 11)\
            .where(col("sentiment")=="positive")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|    ❤| positive|       15253| 0.033721715|  2.3825366|      15252.966|
|   👍| positive|        7773| 0.018040748|   2.269489|       7772.982|
|   😂| positive|        5554| 0.012262441|  2.3857388|       5553.988|
|   🌺| positive|        5470|0.0061575575|   4.679213|      5469.9937|
|   😍| positive|        4583|0.0144115295|  1.6750731|      4582.9854|
|   🔻| positive|        4115| 9.323248E-4|  23.248587|       4114.999|
|   🎀| positive|        3788|0.0032552357|    6.12945|      3787.9968|
|   😉| positive|        3687| 0.014764443|  1.3153764|      3686.9854|
|    ☀| positive|        3243|0.0069634654|  2.4531014|       3242.993|
|    ✅| positive|        2985| 0.005409591|   2.906524|      2984.9946|
+-----+---------+------------+------------+-----------+---------------+

### 3) top 5 emoticons with average count per post

In [579]:
windowSpec = Window.partitionBy("sentiment").orderBy(col("avCountPost").desc())

#### negative:

In [580]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 6)\
            .where(col("sentiment")=="negative")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|    ➖| negative|         249|1.3695167E-4|   9.576923|      248.99986|
|    ✝| negative|          20|1.5802116E-5|  6.6666665|      19.999985|
|    ➰| negative|          14|1.5802116E-5|  4.6666665|      13.999984|
|   🚩| negative|         271|5.6887616E-4|  2.5092592|      270.99942|
|   😭| negative|         483| 0.001232565|  2.0641026|      482.99878|
+-----+---------+------------+------------+-----------+---------------+



#### neutral:

In [581]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 6)\
            .where(col("sentiment")=="neutral")\
            .drop("emojiTop").show()

+-----+---------+------------+------------+-----------+---------------+
|emoji|sentiment|overallCount|   frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+------------+-----------+---------------+
|    ®|  neutral|        9144|0.0022860393|  21.069124|       9143.998|
|   🔣|  neutral|          10|5.2673718E-6|       10.0|       9.999994|
|   🔹|  neutral|         943| 9.007206E-4|    5.51462|       942.9991|
|    ▪|  neutral|         258|2.6863595E-4|  5.0588236|      257.99973|
|   🔘|  neutral|          15|1.5802116E-5|        5.0|      14.999984|
+-----+---------+------------+------------+-----------+---------------+



#### positive:

In [582]:
resultEmoji_df\
            .withColumn("emojiTop",row_number().over(windowSpec))\
            .where(col("emojiTop") < 6)\
            .where(col("sentiment")=="positive")\
            .drop("emojiTop").show()

+-----+---------+------------+-------------+-----------+---------------+
|emoji|sentiment|overallCount|    frequency|avCountPost|overallFreqDiff|
+-----+---------+------------+-------------+-----------+---------------+
|   🔻| positive|        4115|  9.323248E-4|  23.248587|       4114.999|
|    ↙| positive|          21|1.05347435E-5|       10.5|      20.999989|
|   🎀| positive|        3788| 0.0032552357|    6.12945|      3787.9968|
|    ▫| positive|         158| 1.4748641E-4|   5.642857|      157.99985|
|   🌺| positive|        5470| 0.0061575575|   4.679213|      5469.9937|
+-----+---------+------------+-------------+-----------+---------------+



In [583]:
folowers_posts_text.unpersist()

DataFrame[text: string]

## Task 5
### Find probable "fans"

#### First we create window to order data firstly by 'likerId' and secondly by 'likes_count'. Then we start working with data - firstly counting how many likes did liker put to posts of each 'ownerId'. Then we order collected data by "likerId", "likes_count" and apply window created in the begining, which allows us to create column with top of liked 'ownerId'. Then we save only columns with 'likersTop' < 11, since we need to get Top-10. Finally, we group by 'likerId', creating a column with lists of top-10 (or less) liked users for each liker

In [641]:
windowSpec = Window.partitionBy("likerId").orderBy(col("likes_count").desc())

followers_posts_likes_df\
                        .groupby("likerId", "ownerId")\
                        .agg(F.count("ownerId").name("likes_count"))\
                        .orderBy("likerId", "likes_count", ascending=False)\
                        .withColumn("likersTop",row_number().over(windowSpec))\
                        .where(col("likersTop") < 11)\
                        .groupby("likerId")\
                        .agg(F.collect_list(col("ownerId")).name("topLikedIds"))\
                        .show(20)

+-------+--------------------+
|likerId|         topLikedIds|
+-------+--------------------+
|    496|          [59139083]|
|   2142|   [5411213, 152962]|
|   3918|         [145254284]|
|   7880|           [2812004]|
|   9376|            [111195]|
|  12046|           [3824163]|
|  13832|             [15221]|
|  18944|             [15221]|
|  20135|  [75791, 174291546]|
|  20683|            [591512]|
|  20924|            [327458]|
|  23364|         [131719318]|
|  26087|            [482756]|
|  26623|            [216575]|
|  28577|    [471165, 167177]|
|  30654|             [55586]|
|  32445|            [715211]|
|  40515|[2070090, 27420, ...|
|  41751|            [770885]|
|  42468|              [1087]|
+-------+--------------------+
only showing top 20 rows



In [642]:
followers_posts_likes_df\
                        .groupby("likerId", "ownerId")\
                        .agg(F.count("ownerId").name("likes_count"))\
                        .orderBy("likerId", "likes_count", ascending=False)\
                        .withColumn("likersTop",row_number().over(windowSpec))\
                        .where(col("likersTop") < 11)\
                        .groupby("likerId")\
                        .agg(F.collect_list(col("ownerId")).name("topLikedIds")).write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_5.parquet")

## Task 6
### Find probable friends

In [643]:
likers = followers_posts_likes_df\
                        .where(col("likerId") != col("ownerId"))\
                        .groupby("likerId", "ownerId")\
                        .agg(F.count("ownerId").name("likes_count"))\
                        .orderBy("likerId", "likes_count", ascending=False)\
                        .withColumn("likersTop",row_number().over(windowSpec))\
                        .where(col("likersTop") < 2)\
                        .groupby(col("likerId").name("likerId_1"))\
                        .agg(F.collect_list(col("ownerId"))[0].name("topLikedId_1"))
                        

likers_2 = followers_posts_likes_df\
                        .where(col("likerId") != col("ownerId"))\
                        .groupby("likerId", "ownerId")\
                        .agg(F.count("ownerId").name("likes_count"))\
                        .orderBy("likerId", "likes_count", ascending=False)\
                        .withColumn("likersTop",row_number().over(windowSpec))\
                        .where(col("likersTop") < 2)\
                        .groupby(col("likerId").name("likerId_2"))\
                        .agg(F.collect_list(col("ownerId"))[0].name("topLikedId_2"))


friends = likers.join(likers_2, 
                    on=[likers.topLikedId_1 == likers_2.likerId_2, likers.likerId_1 == likers_2.topLikedId_2],
                    how = "inner")\
                .drop("topLikedId_1", "topLikedId_2")\
                .select(col("likerId_1").name("friend_1"), 
                        col("likerId_2").name("friend_2")).cache()

#### Showing probable friends without "self-likers"

In [644]:
friends.show(15)

+---------+---------+
| friend_1| friend_2|
+---------+---------+
|339234422|  1168939|
|152590008| 95578891|
|262884476|298700947|
|253967472|387540163|
|   394348| 50185099|
|176461282|173161528|
|145291328| 87779884|
| 71427292| 70730078|
| 99842535|245259928|
| 95811304|291661975|
|  3420917|   817770|
| 77495114|  7555534|
| 22829194|134347125|
|214091560|233174236|
|291309029|450562601|
+---------+---------+
only showing top 15 rows



In [645]:
friends.write.parquet("hdfs:///tmp/zpinaev-244202/lab2_results_6.parquet")

In [646]:
friends.unpersist()

DataFrame[friend_1: int, friend_2: int]

## Task 7
### Bonus task

In [604]:
# setting threshold to zero
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10)

In [605]:
# get current Threshold
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

'10'

In [607]:
df1 = spark.range(10000)
df2 = spark.range(10000)

df1.join(df2, on=["id"]).count()

10000

<img src="Shuffle.png">

In [609]:
# setting threshold to zero
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)

# making join and see, that we have here SortMergeJoin and Exchanges
# which means - we have our data shuffled during join
df1.join(df2, on=["id"]).explain()

== Physical Plan ==
*(5) Project [id#61449L]
+- *(5) SortMergeJoin [id#61449L], [id#61451L], Inner
   :- *(2) Sort [id#61449L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#61449L, 200), ENSURE_REQUIREMENTS, [id=#11836]
   :     +- *(1) Range (0, 10000, step=1, splits=2)
   +- *(4) Sort [id#61451L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#61451L], Exchange hashpartitioning(id#61449L, 200), ENSURE_REQUIREMENTS, [id=#11836]




In [611]:
# now we create bucketed table which will allow to avoid exchanging thus 
# avoiding shuffling

df = spark.range(1, 16000, 1, 16).select(
        F.col("id").alias("key"), F.rand(12).alias("value")
    )

df.write.bucketBy(16, "key").sortBy("value").saveAsTable(
    "bucketed", format="parquet", mode="overwrite"
)

In [612]:
# now we join two bucketed tables

t1 = spark.table('bucketed')
t2 = spark.table('bucketed')

# so here we see no exchanges, meaning there is no shuffling now and also
# no SortMergeJoin
t2.join(t1, 'key').explain()

== Physical Plan ==
*(3) Project [key#61490L, value#61491, value#61497]
+- *(3) SortMergeJoin [key#61490L], [key#61496L], Inner
   :- *(1) Sort [key#61490L ASC NULLS FIRST], false, 0
   :  +- *(1) Filter isnotnull(key#61490L)
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.bucketed[key#61490L,value#61491] Batched: true, DataFilters: [isnotnull(key#61490L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/jovyan/nfs-home/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- *(2) Sort [key#61496L ASC NULLS FIRST], false, 0
      +- *(2) Filter isnotnull(key#61496L)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.bucketed[key#61496L,value#61497] Batched: true, DataFilters: [isnotnull(key#61496L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/jovyan/nfs-home/spark-warehouse/bucketed], PartitionFilters: [], PushedF

In [613]:
t2.join(t1, 'key').count()

0

#### Here on computational dag we see only exchange connected with count

<img src="No_Shuffle.png">

In [647]:
posts_df.unpersist()
posts_likes_df.unpersist()
followers_df.unpersist()
followers_posts_df.unpersist()
followers_posts_likes_df.unpersist()

DataFrame[itemType: string, ownerId: int, itemId: int, likerId: int]