In [1]:
# Package import
import pyspark
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.pandas.config import set_option, reset_option, get_option

import matplotlib.pyplot as plt



In [2]:
# Start (or reuse an already running) Spark session for this task on the YARN master.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName('Task_2')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-02-10 03:57:46,974 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load every JSON file of the COVID-19 tweet dataset into one Spark DataFrame.
all_json = spark.read.format('json').load("/datasets/covid19/*.json")

# Keep only tweets posted by verified accounts.
all_json = all_json.filter(all_json.verified == "TRUE")

# Keep rows whose followers_count is a NON-EMPTY run of digits, so the counts
# can later be cast to int. `+` (rather than `*`) matters: `^[0-9]*$` also
# matches the empty string, which would slip through and cast to null.
all_json = all_json.filter(all_json.followers_count.rlike("^[0-9]+$")).cache()

# Switch to the pandas API on Spark for the rest of the analysis.
df = all_json.to_pandas_on_spark()
df.head()

2022-02-10 03:59:08,926 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-02-10 03:59:08,961 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,account_created_at,account_lang,country_code,created_at,favourites_count,followers_count,friends_count,is_quote,is_retweet,lang,place_full_name,place_type,reply_to_screen_name,reply_to_status_id,reply_to_user_id,retweet_count,screen_name,source,status_id,text,user_id,verified
0,2009-11-20T21:36:41Z,,,2020-03-31T00:00:00Z,2207,87295,830,False,False,es,,,,,,3,NewsweekEspanol,TweetDeck,1244776422079705092,“Tú eres tu pareja sexual más segura”: la guía...,91430932,True
1,2011-05-16T14:51:19Z,,,2020-03-31T00:00:00Z,18675,809756,1611,False,False,es,,,,,,4,tvnnoticias,Twitter Media Studio,1244776422209536002,Los taxis tienen un nuevo horario de circulaci...,299693451,True
2,2009-11-11T16:42:28Z,,,2020-03-31T00:00:00Z,9573,632332,1111,False,False,es,,,,,,0,TUDNUSA,TweetDeck,1244776421953806337,Sin futbol hasta que todos los jugadores se va...,89225092,True
3,2009-06-04T21:26:24Z,,,2020-03-31T00:00:00Z,5450,4873740,776,False,False,en,,,,,,3,ANCALERTS,Twitter Media Studio,1244776422582833154,Here are some tips on how you can best protect...,44728980,True
4,2009-06-04T21:26:24Z,,,2020-03-31T00:00:00Z,5450,4873740,776,False,False,en,,,,,,5,ANCALERTS,TweetDeck,1244776421257629698,New York welcomes hospital ship as coronavirus...,44728980,True


In [4]:
# Record each user's follower count at their first and at their last tweet of the period.
#
# In the sample output every `created_at` is at 00:00:00Z, so many tweets tie on
# it. `status_id` is added as a secondary sort key so the first/last pick among
# same-day tweets is deterministic instead of arbitrary.
# NOTE(review): assumes status_id increases with posting time (Twitter snowflake
# IDs do) and that drop_duplicates(keep=...) honours this sort order — verify.
sorted_df = df.sort_values(['created_at', 'status_id'], ascending=True)


def _followers_snapshot(frame, keep):
    """Return followers_count from each user's first/last row of `frame`, indexed by user_id.

    keep: 'first' for the earliest tweet, 'last' for the latest (given frame's order).
    """
    return (
        frame.drop_duplicates(subset=['user_id'], keep=keep)[['user_id', 'followers_count']]
        .set_index('user_id')
        .sort_index()
    )


start = _followers_snapshot(sorted_df, keep='first')
end = _followers_snapshot(sorted_df, keep='last')

end.head()

                                                                                

Unnamed: 0_level_0,followers_count
user_id,Unnamed: 1_level_1
1000010898,6096
1000061352726224896,39454
100006904,55462
1000070043198160896,2235
1000093272,4409


In [6]:
# Screen name of each user as of their latest tweet (same `keep='last'` pick
# used for the end-of-period follower count), indexed by user_id.
user_name = (
    sorted_df
    .drop_duplicates(subset=['user_id'], keep='last')[['user_id', 'screen_name']]
    .set_index('user_id')
    .sort_index()
)

# Let pandas-on-Spark combine columns coming from different frames in the
# following cells. This is a session-wide setting and is never reset here.
set_option("compute.ops_on_diff_frames", True)

In [7]:
# Put each user's start/end follower counts side by side (suffixes _start/_end),
# then attach the latest screen name.
combined_followers = start.join(end, how="inner", lsuffix='_start', rsuffix='_end')
combined_followers = combined_followers.join(user_name, how="inner")

# Number of distinct tweets per user (Series named 'status_id', indexed by user_id).
# Selecting the column before nunique() avoids computing a distinct count for
# every column of `df` only to keep one of them.
active_users = df.groupby('user_id')['status_id'].nunique()

# Inner joins on user_id; sort once at the end for a stable display order.
combined_followers = combined_followers.join(active_users, how="inner").sort_index()
combined_followers.head()

                                                                                

Unnamed: 0_level_0,followers_count_start,followers_count_end,screen_name,status_id
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1000061352726224896,39454,39454,RemziyeTosunHDP,1
100006904,49956,55462,AlexLopezMaya,12
1000126860104216576,6567,6567,estadaoverifica,1
100017027,9637,9637,YuriQuinones,1
100024370,40324,44964,OphirGottlieb,18


In [8]:
# Follower increase over the period; both counts are cast to int before subtracting.
combined_followers['Increase'] = (
    combined_followers['followers_count_end'].astype(int)
    - combined_followers['followers_count_start'].astype(int)
)

# Flag the top 1000 most active users (by distinct tweet count).
# `most_active_user` is the tweet count of the 1000th most active user, i.e. the
# cut-off value. nlargest(1000).min() equals that value when there are at least
# 1000 users, and unlike a positional `.iloc[999]` it does not require that many.
# Users tied at the cut-off are all flagged, so slightly more than 1000 may get 1.
most_active_user = active_users.nlargest(1000).min()
combined_followers['Top1000'] = (combined_followers['status_id'] >= int(most_active_user)).astype(int)

combined_followers.head()

                                                                                

Unnamed: 0_level_0,followers_count_start,followers_count_end,screen_name,status_id,Increase,Top1000
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1000061352726224896,39454,39454,RemziyeTosunHDP,1,0,0
100006904,49956,55462,AlexLopezMaya,12,5506,0
1000126860104216576,6567,6567,estadaoverifica,1,0,0
100017027,9637,9637,YuriQuinones,1,0,0
100024370,40324,44964,OphirGottlieb,18,4640,0


In [11]:
# Take the 1000 users with the largest follower increase and collect them to the
# driver as plain pandas, indexed by screen name. Export to Task2.csv only those
# that are also flagged as top-1000 active; the display below shows all 1000.
result = (
    combined_followers[['screen_name', 'Increase', 'Top1000']]
    .sort_values('Increase', ascending=False)
    .head(1000)
)
result_pd = result.to_pandas().set_index('screen_name', drop=True)
result_pd.loc[result_pd['Top1000'] == 1].to_csv('Task2.csv')

result_pd.head()

                                                                                

Unnamed: 0_level_0,Increase,Top1000
screen_name,Unnamed: 1_level_1,Unnamed: 2_level_1
RahulGandhi,775966,0
WHO,770134,1
CNN,701252,0
HLGatell,642789,0
RajatSharmaLive,624546,0


In [None]:
# Release cluster resources held by this notebook's Spark session.
spark.stop()