<h3>Init Spark</h3>

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql.functions import col, isnan, when, trim, sort_array, collect_list, explode

<h3>Define data path</h3>

In [4]:
import os

data_dir = 'Set your path here'
followers = os.path.join(data_dir, 'followers.parquet')
followers_posts = os.path.join(data_dir, 'followers_posts_api_final.json')
followers_posts_likes = os.path.join(data_dir, 'followers_posts_likes.parquet')
posts = os.path.join(data_dir, 'posts_api.json')
posts_likes = os.path.join(data_dir, 'posts_likes.parquet')

<h3>Load dataset</h3>

In [5]:
posts_df = sqlContext.read.json(posts)

In [6]:
posts_likes_df = spark.read.load(posts_likes)

In [7]:
followers_df = spark.read.load(followers)

In [8]:
followers_posts_df = sqlContext.read.json(followers_posts)

In [9]:
followers_posts_likes_df = spark.read.load(followers_posts_likes)

<h1>Task 1</h1>

<h3>Get top posts</h3>

In [9]:
posts_df = posts_df.na.drop(subset=["date"])

In [10]:
prefix = 'task1_posts'

column = 'likes'
answer = posts_df.orderBy(posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)
    
column = 'reposts'
answer = posts_df.orderBy(posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)
    
column = 'comments'
answer = posts_df.orderBy(posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)

In [11]:
prefix = 'task1_followers'

column = 'likes'
answer = followers_posts_df.orderBy(followers_posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)
    
column = 'reposts'
answer = followers_posts_df.orderBy(followers_posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)
    
column = 'comments'
answer = followers_posts_df.orderBy(followers_posts_df[column].desc(), asc=False).limit(20).toPandas()
answer_json = answer.to_json()
with open('{}_{}.json'.format(prefix, column), 'w') as f:
    f.write(answer_json)

<h1>Task 2</h1>

<h3>Get top followers</h3>

In [14]:
likes_count = posts_likes_df.groupby('likerId').count()
answer = likes_count.orderBy(col('count').desc(), asc=False).limit(20)
answer_json = answer.toPandas().to_json()
with open('task2_by_likes.json', 'w') as f:
    f.write(answer_json)

In [15]:
df_exploded = followers_posts_df.withColumn('copy_history', explode('copy_history'))
answer = df_exploded.filter(df_exploded.copy_history.owner_id == '-94').groupby('owner_id').count().sort('count', ascending=False).limit(20)
answer_json = answer.toPandas().to_json()
with open('task2_by_reposts.json', 'w') as f:
    f.write(answer_json)

<h1>Task 3</h1>

<h3>Get posts and reposts</h3>

In [16]:
df_exploded = followers_posts_df.withColumn('copy_history', explode('copy_history'))

In [17]:
filtered = df_exploded.filter(df_exploded.copy_history.owner_id == '-94')
selected = filtered.select(filtered.copy_history.id.alias("group_post_id"), filtered.id.alias("user_post_id"))
selected = selected.sort('copy_history.id')

In [18]:
answer = selected.groupby("group_post_id").agg(collect_list("user_post_id")).sort('group_post_id')
answer_json = answer.toPandas().to_json()
with open('task3_answer.json', 'w') as f:
    f.write(answer_json)

<h1>Task 4</h1>

<h3>Load emojis</h3>

In [10]:
import emoji
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
import pandas as pd

emojis_df = pd.read_csv('emoji_sentiment.csv')
emojis_df['Sentiment'] = emojis_df[['Negative', 'Neutral', 'Positive']].idxmax(axis=1)
emojis_df['unicode_emoji'] = [chr(int(emoji, 16)) for emoji in emojis_df['Unicode codepoint'].tolist()]
positive_emojis = emojis_df[emojis_df['Sentiment'] == 'Positive']['unicode_emoji'].values.tolist()
negative_emojis = emojis_df[emojis_df['Sentiment'] == 'Negative']['unicode_emoji'].values.tolist()

unicode_positive_emojis = {}
unicode_negative_emojis = {}
unicode_neutral_emojis = {}

for key, value in emoji.UNICODE_EMOJI.items():
    if key in positive_emojis:
        unicode_positive_emojis[key] = value
    elif key in negative_emojis:
        unicode_negative_emojis[key] = value
    else:
        unicode_neutral_emojis[key] = value

<h3>Extract emojis without brodcasing</h3>

In [11]:
def extract_positive_emojis(_str):
    return ''.join(c for c in _str if c in unicode_positive_emojis)

def extract_negative_emojis(_str):
    return ''.join(c for c in _str if c in unicode_negative_emojis)

def extract_neutral_emojis(_str):
    return ''.join(c for c in _str if c in unicode_neutral_emojis)

extract_positive_emojis_udf = udf(lambda _str: extract_positive_emojis(_str), StringType())
extract_negative_emojis_udf = udf(lambda _str: extract_negative_emojis(_str), StringType())
extract_neutral_emojis_udf = udf(lambda _str: extract_neutral_emojis(_str), StringType())

<h3>Extract emojis with brodcasing</h3>

In [12]:
def extract_emojis(_str, emojis_list):
    return ''.join(c for c in _str if c in emojis_list)

def udf_extract_emojis(emojis_list):
    return udf(lambda x : extract_emojis(x, emojis_list))


b_positive_emojis = sc.broadcast(sc.parallelize(unicode_positive_emojis).collect())
b_negative_emojis = sc.broadcast(sc.parallelize(unicode_negative_emojis).collect())
b_neutral_emojis = sc.broadcast(sc.parallelize(unicode_neutral_emojis).collect())
b_emojis = sc.broadcast(sc.parallelize(emoji.UNICODE_EMOJI).collect())

<h3>Process posts</h3>

In [18]:
processed_df = posts_df.filter(posts_df.text != '')
processed_df = processed_df.withColumn('positive_emojis', udf_extract_emojis(b_positive_emojis.value)(col('text')))
processed_df = processed_df.withColumn('negative_emojis', udf_extract_emojis(b_negative_emojis.value)(col('text')))
processed_df = processed_df.withColumn('neutral_emojis', udf_extract_emojis(b_neutral_emojis.value)(col('text')))
processed_df = processed_df.select('id', 'text', 'positive_emojis', 'negative_emojis', 'neutral_emojis')
processed_df.show()

+-----+--------------------+---------------+---------------+--------------+
|   id|                text|positive_emojis|negative_emojis|neutral_emojis|
+-----+--------------------+---------------+---------------+--------------+
|17273|[Технологические ...|               |               |              |
|17301|Дорогие друзья, м...|               |               |              |
|17295|[Первый выпуск Мо...|               |               |              |
|17293|[Александр Ишевск...|               |               |              |
|17277|Для желающих увид...|               |               |              |
|17276|[Профессоры Пауль...|               |               |              |
|17326|Подготовка к праз...|               |               |              |
|17323|[Татьяна Богомазо...|               |               |              |
|17322|[У европейской и ...|               |               |              |
|17365|[«Рождественские ...|               |               |              |
|17364|[Итог

In [29]:
answer_df = processed_df.filter((processed_df.positive_emojis != '') | (processed_df.negative_emojis != '') | (processed_df.neutral_emojis != ''))
answer_df.count()

739

In [30]:
answer_json = answer_df.toPandas().to_json()
with open('task4_answer.json', 'w') as f:
    f.write(answer_json)

Py4JJavaError: An error occurred while calling o1027.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 47, DESKTOP-P9LJEH2, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.0-preview2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 577, in main
  File "C:\spark-3.0.0-preview2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 835, in read_int
    length = stream.read(4)
  File "C:\Users\mstrHW\Miniconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:484)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:431)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.0-preview2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 577, in main
  File "C:\spark-3.0.0-preview2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 835, in read_int
    length = stream.read(4)
  File "C:\Users\mstrHW\Miniconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:484)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more


<h1>Task 5</h1>

<h3>Filter self-likes</h3>

In [31]:
followers_posts_likes_df = followers_posts_likes_df.select(
    followers_posts_likes_df.ownerId,
    followers_posts_likes_df.likerId
).where(followers_posts_likes_df.ownerId != followers_posts_likes_df.likerId).distinct()

<h3>Get friends ids</h3>

In [32]:
users_1 = followers_posts_likes_df.alias('users_1')
users_2 = followers_posts_likes_df.alias('users_2')

friends_data = users_1.join(users_2,
    (col('users_1.ownerId') == col('users_2.likerId')) &
    (col('users_1.likerId') == col('users_2.ownerId'))
)

In [34]:
import pyspark.sql.functions as pyspark_functions

In [35]:
friends_data = friends_data.select(
    col('users_1.ownerId').alias('user'),
    col('users_2.ownerId').alias('friends')
).groupby('user').agg(pyspark_functions.collect_list('friends').alias('friends'))

In [36]:
friends_data.show()

+-----+--------------------+
| user|             friends|
+-----+--------------------+
|  637|[1567, 94494, 815...|
| 1119|           [2004962]|
| 1127|[27857, 317799, 2...|
| 1174|[2134327, 139499389]|
| 1567|  [637, 2212, 94494]|
| 2212|              [1567]|
| 4023|[1548876, 1034920...|
| 7373|[180092, 317799, ...|
| 8909|   [27905, 12742533]|
|12671|[12977, 234753, 3...|
|12977|[12671, 269559, 3...|
|15221|[25554, 50601, 36...|
|18589|             [18751]|
|18751|             [18589]|
|18994|             [45781]|
|20972|             [29840]|
|21571|            [410199]|
|22304|[27857, 507824, 3...|
|24147|[27419, 81102, 42...|
|24770|           [4656597]|
+-----+--------------------+
only showing top 20 rows



In [37]:
answer_json = friends_data.toPandas().to_json()
with open('task5_answer.json', 'w') as f:
    f.write(answer_json)

<h1>Task 6</h1>

<h3>Get friends ids</h3>

In [38]:
users_1 = followers_posts_likes_df.alias('users_1')
users_2 = followers_posts_likes_df.alias('users_2')

friends_data = users_1.join(users_2,
    (col('users_1.ownerId') == col('users_2.likerId')) &
    (col('users_1.likerId') == col('users_2.ownerId')),
    'left_outer'
)

In [39]:
friends_data = friends_data.select(
    col('users_1.ownerId').alias('user'),
    col('users_1.likerId').alias('fan')
).groupby('user').agg(pyspark_functions.collect_list('fan').alias('fans'))

In [40]:
friends_data.show()

+----+--------------------+
|user|                fans|
+----+--------------------+
| 637|[143, 254, 450, 4...|
|1087|[662, 1818, 4177,...|
|1119|[1099, 4836, 2004...|
|1127|[1567, 1679, 2282...|
|1174|[7627, 19755, 272...|
|1567|[637, 655, 771, 1...|
|1632|[42940, 68535, 75...|
|1669|[1017, 1216, 6107...|
|2212|[1099, 1405, 1567...|
|2718|[1105, 15275, 409...|
|2976|[2541, 3448, 5304...|
|3420|[14, 17, 197, 243...|
|3768|[25068229, 386898...|
|3868|[19249, 23678, 26...|
|3972|[582, 1410, 34660...|
|4023|[4734, 21272, 283...|
|4107|[6707, 7439, 3349...|
|4990|[277835, 286893, ...|
|5630|[105815, 7696061,...|
|5648|[1614, 1991, 3757...|
+----+--------------------+
only showing top 20 rows



In [41]:
answer_json = friends_data.toPandas().to_json()
with open('task6_answer.json', 'w') as f:
    f.write(answer_json)