### SPARK INIT

In [2]:
import pyspark 
from pyspark import broadcast
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType

# Local Spark session with 2 worker threads; reuses an existing session if one is running.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('ILDAR-HW')
    .getOrCreate()
)

### LOADING DATA

In [4]:
# All input datasets live under one directory, relative to the notebook's working dir.
DATA_DIR = './bigdata20'

followers = spark.read.parquet(f'{DATA_DIR}/followers.parquet')

followers_posts = spark.read.json(f'{DATA_DIR}/followers_posts_api_final.json')
posts = spark.read.json(f'{DATA_DIR}/posts_api.json')
posts_likes = spark.read.parquet(f'{DATA_DIR}/posts_likes.parquet')

followers_posts_likes = spark.read.parquet(f'{DATA_DIR}/followers_posts_likes.parquet')

### TASK №1: top-20 posts by likes, comments and reposts

In [6]:
# Top-20 posts by like count: the group's posts and the followers' posts.
# The result is stored under a new name so the `posts_likes` dataset loaded from
# parquet in the loading cell is not silently overwritten.
top_posts_likes = (
    posts
    .select('key', 'likes.count')
    .sort(desc('count'))
    .limit(20)
)
follow_posts_likes = (
    followers_posts
    .select('key', 'likes.count')
    .sort(desc('count'))
    .limit(20)
)
# The default save mode fails if the path exists; overwrite keeps the cell re-runnable.
top_posts_likes.write.mode('overwrite').json('./Ildar/1_task/posts_likes.json')
follow_posts_likes.write.mode('overwrite').json('./Ildar/1_task/follow_posts_likes.json')

In [7]:
# Top-20 posts by comment count: the group's posts and the followers' posts.
posts_comm = (
    posts
    .select('key', 'comments.count')
    .sort(desc('count'))
    .limit(20)
)
follow_posts_comm = (
    followers_posts
    .select('key', 'comments.count')
    .sort(desc('count'))
    .limit(20)
)
# The default save mode fails if the path exists; overwrite keeps the cell re-runnable.
posts_comm.write.mode('overwrite').json('./Ildar/1_task/posts_comm.json')
follow_posts_comm.write.mode('overwrite').json('./Ildar/1_task/follow_posts_comm.json')

In [8]:
# Top-20 posts by repost count: the group's posts and the followers' posts.
posts_rep = (
    posts
    .select('key', 'reposts.count')
    .sort(desc('count'))
    .limit(20)
)
follow_posts_rep = (
    followers_posts
    .select('key', 'reposts.count')
    .sort(desc('count'))
    .limit(20)
)
# The default save mode fails if the path exists; overwrite keeps the cell re-runnable.
posts_rep.write.mode('overwrite').json('./Ildar/1_task/posts_rep.json')
follow_posts_rep.write.mode('overwrite').json('./Ildar/1_task/follow_posts_rep.json')

### TASK №2: top-20 likers and top-20 reposted owners

In [9]:
# Top-20 likers by number of rows in followers_posts_likes.
followers_likes = (
    followers_posts_likes
    .groupby('likerId')
    .agg(count('*').alias('likes_count'))
    .sort(desc('likes_count'))
    .limit(20)
)
followers_likes.write.mode('overwrite').json('./Ildar/2_task/followers_likes.json')

# Top-20 owners whose posts were reposted by followers.
# `copy_history.owner_id` is an array, so grouping on it directly keys on the whole
# array. Take the first entry instead, as the repost join in task 3 does.
followers_own = (
    followers_posts
    .select(col('copy_history').getItem(0).getField('owner_id').alias('owner_id'))
    .filter(col('owner_id').isNotNull())  # non-reposts have no copy_history
    .groupby('owner_id')
    .agg(count('*').alias('own_count'))
    .sort(desc('own_count'))
    .limit(20)
)
followers_own.write.mode('overwrite').json('./Ildar/2_task/followers_own.json')

### TASK №3: followers who reposted each group post

In [10]:
# For every group post: the set of followers who reposted it.
# A follower post is treated as a repost of the post described by the first
# `copy_history` entry. The match must be on owner AND post id: matching on owner
# alone pairs every group post with every repost of any post by that owner.
# NOTE(review): assumes copy_history entries carry the source post `id` — confirm schema.
reposts = (
    followers_posts
    .select(
        col('owner_id').alias('user_id'),
        col('copy_history').getItem(0).getField('owner_id').alias('src_owner_id'),
        col('copy_history').getItem(0).getField('id').alias('src_post_id'),
    )
    .filter(col('src_post_id').isNotNull())
)
group_posts = posts.select('owner_id', col('id').alias('post_id'))
posts_followers_posts = (
    group_posts
    .join(
        reposts,
        (group_posts.owner_id == reposts.src_owner_id)
        & (group_posts.post_id == reposts.src_post_id),
    )
    .groupby('post_id')
    .agg(collect_set('user_id'))
)
posts_followers_posts.write.mode('overwrite').json('./Ildar/3_task/posts_followers_posts.json')

### TASK №4: emojis used in group posts

In [11]:
!pip install emoji



In [12]:
import emoji

In [13]:
sc = spark.sparkContext

# Build a plain set of emoji strings, independent of the installed `emoji` version:
#  - emoji >= 2.0 removed UNICODE_EMOJI; EMOJI_DATA is keyed by emoji string;
#  - emoji 1.x UNICODE_EMOJI is keyed by language ({'en': {...}, ...}), so testing a
#    character against it directly never matches;
#  - emoji < 1.0 UNICODE_EMOJI is a flat {emoji: name} dict.
if hasattr(emoji, 'EMOJI_DATA'):
    emoji_chars = set(emoji.EMOJI_DATA)
else:
    unicode_emoji = emoji.UNICODE_EMOJI
    emoji_chars = set(unicode_emoji.get('en', unicode_emoji))
emojis = sc.broadcast(emoji_chars)

# Per-character lookup: only emoji that are a single code point are detected
# (multi-code-point sequences such as flags or ZWJ sequences are not).
emoji_udf = udf(
    lambda text: [symb for symb in text if symb in emojis.value] if text else [],
    ArrayType(StringType()),
)

posts_text = posts.filter(posts.text != '').select('id', 'text')
text_emoji = posts_text.select("id", emoji_udf("text").alias("emojis"))
text_emoji.write.mode('overwrite').json('./Ildar/4_task/text_emoji.json')

### TASK №5: friends (users who liked each other's posts)

In [15]:
# "Friends" = pairs of users who liked each other's posts. Self-likes are dropped first.
likes_without_self = followers_posts_likes.filter(followers_posts_likes.ownerId != followers_posts_likes.likerId)

# For every liker: the set of owners whose posts they liked.
likes = (
    likes_without_self
    .groupBy('likerId')
    .agg(collect_set('ownerId').alias('ownerIdSet'))
    .select(col('likerId').alias('likerUserId'), 'ownerIdSet')
)

# Attach to each like (owner <- liker) the set of owners that the post owner liked.
# A left join keeps likes on owners who never liked anyone; their ownerIdSet is null.
# An inner join would silently drop them, and task 6 would lose their fans.
final_likes = likes_without_self.join(likes, likes_without_self.ownerId == likes.likerUserId, 'left')
# Mutual like <=> the owner also liked the liker; a null set means "not friends".
aggregated_likes = final_likes.withColumn(
    "IsFriends", coalesce(expr("array_contains(ownerIdSet, likerId)"), lit(False))
)
friends = (
    aggregated_likes
    .filter(col('IsFriends'))
    .groupBy(col('ownerId').alias('user'))
    .agg(collect_set('likerId').alias('friends'))
)
friends.write.mode('overwrite').json('./Ildar/5_task/friends.json')

### TASK №6: fans (likers whose likes are not returned)

In [16]:
# "Fans" = users who liked an owner's posts without the owner liking theirs back.
# `~col` drops null IsFriends rows exactly like `== False` does.
fans = (
    aggregated_likes
    .filter(~col('IsFriends'))
    .groupBy(col('ownerId').alias('user'))
    .agg(collect_set('likerId').alias('fans'))
)
fans.write.mode('overwrite').json('./Ildar/6_task/fans.json')