In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
import re
import pandas as pd
from dateutil.parser import parse
from datetime import datetime, timedelta

def udf_wrapper(returntype):
    """Build a decorator that turns a plain function into a Spark UDF.

    Args:
        returntype: value forwarded to ``udf`` as its ``returnType`` argument.

    Returns:
        A one-argument decorator; applying it to ``func`` returns
        ``udf(func, returnType=returntype)``.
    """
    def decorate(plain_function):
        # `udf` is only looked up here, when a function is actually decorated.
        wrapped = udf(plain_function, returnType=returntype)
        return wrapped

    return decorate

@udf_wrapper(StringType())
def tweet_source(s):
    """Return the text between the first '>' and the last '<' on a line of ``s``.

    Wrapped as a string-returning Spark UDF by ``udf_wrapper``.
    NOTE(review): the input is presumably an HTML anchor such as
    '<a href="...">client name</a>' - confirm against the data.

    Args:
        s: the raw value for one row, or None when the column is null.

    Returns:
        The captured text, or None when ``s`` is None or contains no
        '>...<' span (the previous version raised in both cases).
    """
    if s is None:
        return None
    match = re.search(r'\>(.+)\<', s)
    return match.group(1) if match else None

@udf_wrapper(StringType())
def blank_as_null(x):
    """Map blank strings to None and pass every other value through.

    Wrapped as a string-returning Spark UDF by ``udf_wrapper``.  The body
    receives a plain Python value per row, not a Column, so Column helpers
    such as ``when``/``col`` cannot be used here.

    Args:
        x: the value for one row, or None when the column is null.

    Returns:
        None when ``x`` is None, empty, or only whitespace; otherwise ``x``
        unchanged.
    """
    if x is None or not x.strip():
        return None
    return x

# Lookup CSVs, listed in the order they are searched.
_LOOKUP_FILES = ('lookup/city_lookup.csv',
                 'lookup/province_lookup.csv',
                 'lookup/sub_district_lookup.csv')
# Filled on first use by _load_lookup_tables() so the CSVs are not re-read
# for every row.
_LOOKUP_TABLES = []


def _load_lookup_tables():
    """Return one list of (KEYWORD, city, state) tuples per lookup CSV."""
    if not _LOOKUP_TABLES:
        loaded = []
        for csv_path in _LOOKUP_FILES:
            table = pd.read_csv(csv_path)
            loaded.append(list(zip(table['keyword'].str.upper(),
                                   table['city'],
                                   table['state'])))
        # Publish only after every file was read, so a failed read never
        # leaves a partially filled cache behind.
        _LOOKUP_TABLES.extend(loaded)
    return _LOOKUP_TABLES


@udf_wrapper(StructType([StructField('user_city', StringType()),
                         StructField('user_province', StringType())
                        ]))
def location_lookup(loc):
    """Resolve a free-text location to a city and province.

    Wrapped as a struct-returning Spark UDF by ``udf_wrapper``.  The lookup
    tables are searched in order (city, province, sub-district); within a
    table the first row whose keyword is a substring of ``loc`` wins.  A
    later table is only consulted while both results are still 'Undefined'.

    Args:
        loc: the location text for one row, or None when the column is null.

    Returns:
        A dict with keys 'user_city' and 'user_province'.  Both are
        'Undefined' when ``loc`` is None, equals 'UNDEFINED' (ignoring
        case), or matches no keyword.
    """
    city = 'Undefined'
    prov = 'Undefined'
    if loc is not None:
        # Keywords are upper-cased when loaded, so upper-case the location
        # as well; otherwise only locations typed in capitals could match.
        loc = loc.upper()
        if loc != 'UNDEFINED':
            for table in _load_lookup_tables():
                for keyword, table_city, table_state in table:
                    if keyword in loc:
                        city = table_city
                        prov = table_state
                        break
                if city != 'Undefined' or prov != 'Undefined':
                    break
    return {
            'user_city': city,
            'user_province': prov
           }

def _tweets_of_kind(tweets, is_quote, is_reply, is_retweet, tweet_type):
    """Filter ``tweets`` to one quote/reply/retweet combination and label it.

    Each flag says whether the matching id column ('quoted_from_tweet_id',
    'reply_to_tweet_id', 'retweeted_from_tweet_id') must be non-null (True)
    or null (False).  The subset gets a literal 'tweet_type' column and its
    'tweet_source' column is passed through the ``tweet_source`` UDF.
    Retweet subsets additionally have both counters overwritten with 0.
    """
    def presence(column_name, expected):
        column = col(column_name)
        return column.isNotNull() if expected else column.isNull()

    subset = tweets.where(presence('quoted_from_tweet_id', is_quote))\
                   .where(presence('reply_to_tweet_id', is_reply))\
                   .where(presence('retweeted_from_tweet_id', is_retweet))\
                   .withColumn('tweet_type', lit(tweet_type))\
                   .withColumn('tweet_source', tweet_source('tweet_source'))
    if is_retweet:
        subset = subset.withColumn('retweet_count', lit(0))\
                       .withColumn('favorite_count', lit(0))
    return subset


def tweet_parser(tweets_data):
    """Flatten raw tweets, classify them, and write the result to CSV.

    Reads the module-level names ``path``, ``keyword_search``, ``since`` and
    ``until`` to build the output file name, so those must be defined before
    this function is called.

    Args:
        tweets_data: Spark DataFrame of raw tweets providing the fields
            named in the ``selectExpr`` call below.

    Returns:
        A pandas DataFrame with one row per distinct 'tweet_id'; the same
        frame is written to
        '<path>/spark_collection_of_tweets_<keyword>_<since>-<until>.csv'.
    """
    # Select used fields
    selected = tweets_data.selectExpr('created_at_local as created_at',
                                      'id_str as tweet_id',
                                      'user.id_str as user_id',
                                      'user.screen_name as user_screenname',
                                      'user.friends_count as user_followings_count',
                                      'user.followers_count as user_followers_count',
                                      'user.statuses_count as user_tweets_count',
                                      'user.location as user_location',
                                      'user.verified as is_verified',
                                      'full_text as tweet_text',
                                      'favorite_count',
                                      'retweet_count',
                                      'source as tweet_source',
                                      'place.full_name as place_fullname',
                                      'place.name as place_name',
                                      'place.place_type as place_type',
                                      'place.country as place_country',
                                      'quoted_status.id_str as quoted_from_tweet_id',
                                      'quoted_status.user.id_str as quoted_from_user_id',
                                      'quoted_status.user.screen_name as quoted_from_user_screenname',
                                      'in_reply_to_status_id_str as reply_to_tweet_id',
                                      'in_reply_to_user_id_str as reply_to_user_id',
                                      'in_reply_to_screen_name as reply_to_user_screenname',
                                      'retweeted_status.id_str as retweeted_from_tweet_id',
                                      'retweeted_status.user.id_str as retweeted_from_user_id',
                                      'retweeted_status.user.screen_name as retweeted_from_user_screenname',)

    # (is_quote, is_reply, is_retweet, tweet_type), in union order.  Rows
    # that are both a reply and a retweet match none of these and are dropped.
    kinds = [
        (False, False, False, 'tweet'),    # original tweets
        (False, True, False, 'reply'),     # replies
        (True, False, False, 'quote'),     # quotes
        (False, False, True, 'retweet'),   # retweets
        (True, False, True, 'retweet'),    # retweets of quoted tweets
        (True, True, False, 'reply'),      # replies that also quote
    ]
    combined = None
    for is_quote, is_reply, is_retweet, tweet_type in kinds:
        subset = _tweets_of_kind(selected, is_quote, is_reply, is_retweet, tweet_type)
        combined = subset if combined is None else combined.union(subset)

    tweets = combined.toPandas()

    # Prefix every id with '_'; null ids are left untouched.
    id_columns = ['tweet_id', 'user_id', 'quoted_from_tweet_id',
                  'quoted_from_user_id', 'reply_to_tweet_id',
                  'reply_to_user_id', 'retweeted_from_tweet_id',
                  'retweeted_from_user_id']
    for id_column in id_columns:
        tweets[id_column] = tweets[id_column].apply(
            lambda x: '_' + x if x is not None else x)

    tweets = tweets[['created_at', 'tweet_id', 'user_id', 'user_screenname',
                     'user_followings_count', 'user_followers_count', 'user_tweets_count',
                     'user_location', 'is_verified', 'tweet_text', 'favorite_count',
                     'retweet_count', 'tweet_source', 'tweet_type',
                     'place_fullname', 'place_name',
                     'place_type', 'place_country', 'quoted_from_tweet_id',
                     'quoted_from_user_id', 'quoted_from_user_screenname',
                     'reply_to_tweet_id', 'reply_to_user_id', 'reply_to_user_screenname',
                     'retweeted_from_tweet_id', 'retweeted_from_user_id',
                     'retweeted_from_user_screenname']]
    # Keep the first occurrence of each tweet id.
    tweets = tweets.drop_duplicates(subset=['tweet_id']).reset_index(drop=True)
    tweets.to_csv('{}/spark_collection_of_tweets_{}_{}-{}.csv'.format(path,
                                                         keyword_search,
                                                         since.replace('-',''),
                                                         until.replace('-','')),
                  index=False)
    return tweets

def user_parser(tweets_data):
    """Extract one profile row per user and write the result to CSV.

    Reads the module-level names ``path``, ``keyword_search``, ``since`` and
    ``until`` to build the output file name, so those must be defined before
    this function is called.

    Args:
        tweets_data: Spark DataFrame of raw tweets with a nested ``user``
            struct providing the fields selected below.

    Returns:
        A pandas DataFrame of users; the same frame is written to
        '<path>/spark_collection_of_users_<keyword>_<since>-<until>.csv'.
    """
    profile_fields = ('user.id_str as user_id',
                      'user.name as user_fullname',
                      'user.screen_name as user_screenname',
                      'user.created_at as user_created_at',
                      'user.friends_count as user_followings_count',
                      'user.followers_count as user_followers_count',
                      'user.statuses_count as user_tweets_count',
                      'user.location as user_location',
                      'user.verified as is_verified')
    profiles = tweets_data.selectExpr(*profile_fields)
    profiles = profiles.dropDuplicates(subset=['user_id'])

    # Replace the free-text location with the struct returned by the
    # location_lookup UDF, then unpack that struct into two flat columns.
    profiles = profiles.withColumn('user_location', location_lookup('user_location'))
    flattened_fields = ('user_id',
                        'user_fullname',
                        'user_screenname',
                        'user_created_at',
                        'user_followings_count',
                        'user_followers_count',
                        'user_tweets_count',
                        'user_location.user_city as user_city',
                        'user_location.user_province as user_province',
                        'is_verified')
    profiles = profiles.selectExpr(*flattened_fields)

    def prefix_id(raw_id):
        return '_'+raw_id

    def shift_timestamp(raw_timestamp):
        # Drop the timezone information and add a fixed 7 hours.
        # NOTE(review): presumably converts UTC to UTC+7 local time - confirm.
        shifted = parse(raw_timestamp, ignoretz=True) + timedelta(hours=7)
        return shifted.strftime('%Y-%m-%d %H:%M:%S')

    users = profiles.toPandas()
    users['user_id'] = users['user_id'].apply(prefix_id)
    users['user_created_at'] = users['user_created_at'].apply(shift_timestamp)

    output_columns = ['user_id','user_screenname','user_fullname','user_created_at',
                      'user_followings_count','user_followers_count','user_tweets_count',
                      'user_city','user_province','is_verified']
    users = users[output_columns]

    output_file = '{}/spark_collection_of_users_{}_{}-{}.csv'.format(path,
                                                                    keyword_search,
                                                                    since.replace('-',''),
                                                                    until.replace('-',''))
    users.to_csv(output_file, index=False)
    return users

if __name__ == '__main__':
    # Create spark session
    conf = pyspark.SparkConf().setAll([('spark.executor.memory', '8g'),
                                       ('spark.driver.memory','10g'),
                                       ('spark.driver.maxResultSize','20g'),
                                       ("spark.ui.showConsoleProgress", "false")])
    sc = SparkSession.builder.config(conf=conf).master('local[6]').appName('Spark Tweets Parser').getOrCreate()

    try:
        # keyword_search, since, until and path stay module-level names:
        # tweet_parser and user_parser read them to build output file names.
        keyword_search = input('searched keyword : ')
        since = input('since date [YYYY-MM-DD]: ')
        until = input('until date [YYYY-MM-DD]: ')

        # Load Tweets Data
        print('Load data...', end='\r')
        path = '{}/tweet_search_result/{}'.format(os.getcwd(), keyword_search)
        # Pass the glob as a one-element list; splitting it on ',' would
        # break the path whenever the directory or keyword contains a comma.
        tweets_data = sc.read.json(['{}/*.json'.format(path)])
        print('Data {} loaded!'.format(keyword_search))

        # Filter by Date (both bounds inclusive)
        # NOTE(review): assumes created_at_local is formatted as
        # 'YYYY-MM-DD HH:MM:SS' - confirm against the input files.
        tweets_data = tweets_data.filter(col("created_at_local") >= "{} 00:00:00".format(since))\
                                 .filter(col("created_at_local") <= "{} 23:59:59".format(until))

        # Parsing required fields for analysis
        print('Parsing tweets...', end='\r')
        tweets = tweet_parser(tweets_data)
        print('Data {} tweets parsed!'.format(keyword_search))

        print('Parsing users...', end='\r')
        users = user_parser(tweets_data)
        print('Data {} users parsed!'.format(keyword_search))
    finally:
        # Always stop the session, even when loading or parsing fails.
        sc.stop()

searched keyword : bawaslu
since date [YYYY-MM-DD]: 2019-05-14
until date [YYYY-MM-DD]: 2019-05-27
Data bawaslu loaded!
Data bawaslu tweets parsed!
Data bawaslu users parsed!
