In [1]:
import os
import sys
import socket
import re
import numpy as np
import string
import warnings
from timeit import default_timer as timer
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,desc,row_number,col,year,month,dayofmonth,dayofweek,to_timestamp,size,isnan,lit,lower
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField, FloatType, ArrayType

# Config

In [2]:
# Country / language selection used to build the data paths, the language
# filter, the keyword folder and the output name in the cells below.
country_code, country_name, language_code = "US", "united-states", "en"
print(country_code)

US


In [3]:
hostname = socket.gethostname()
# A hostname containing 'samuel' selects the local setup; any other host is
# treated as the cluster.
is_local = 'samuel' in hostname.lower()

# Reuse a session already living in this kernel instead of rebuilding it on re-run.
try:
    spark
except NameError:
    builder = SparkSession.builder
    if is_local:
        print('Create Local SparkSession')
        builder = builder.config("spark.driver.host", "localhost")
    else:
        print('Create Cluster SparkSession')
    spark = builder.appName("get-tweets-mentions").getOrCreate()

print('Hostname:', hostname)
path_to_data = '../../data' if is_local else '/user/spf248/twitter/data'

path_to_tweets = os.path.join(path_to_data, 'decahose/parsed/tweets/tweets-with-identified-location-' + country_code)
path_to_timelines = os.path.join(path_to_data, 'timelines', 'extract', country_name)
path_to_mentions = os.path.join(path_to_data, 'mentions')
path_to_keywords = os.path.join(path_to_data, 'keywords/labor/lang')
for path in (path_to_tweets, path_to_timelines, path_to_mentions, path_to_keywords):
    print(path)

Hostname: Samuels-MacBook-Pro.local
../../data/decahose/parsed/tweets/tweets-with-identified-location-US
../../data/timelines/extract/united-states
../../data/mentions
../../data/keywords/labor/lang


# Data Processing

In [4]:
print('Import Datasets')
# Read both parquet sources (tweets first, then timelines) and stack them.
# unionByName matches columns by name rather than by position.
tweets, timelines = (spark.read.parquet(source) for source in (path_to_tweets, path_to_timelines))
df = tweets.unionByName(timelines)

Import Datasets


In [5]:
print("CACHE")
# cache() marks this DataFrame itself for persistence (it also returns it),
# so no rebinding is needed here.
df.cache()

print("REPARTITION")
# DataFrames are immutable: repartition() returns a NEW frame. The result must be
# rebound to `df`, otherwise the repartition is silently discarded.
df = df.repartition(2000)

CACHE
REPARTITION


DataFrame[tweet_id: string, created_at: timestamp, text: string, tweet_lang: string, user_id: string, user_location: string, place_id: string, tweet_longitude: string, tweet_latitude: string]

In [6]:
print("DROP DUPLICATES")
# The same tweet can appear in both sources; keep one row per tweet_id.
df = df.dropDuplicates(['tweet_id'])

print("LOWERCASE")
# Lowercasing here makes the substring lookups done later case-insensitive
# (assumes the mention phrases are lowercase too — TODO confirm against the CSV).
df = df.withColumn('text', F.lower('text'))

print("SELECT LANGUAGE")
df = df.filter(col('tweet_lang') == language_code)

print("EXTRACT YEAR AND MONTH")
# Calendar parts are cast to strings. NOTE(review): presumably this stops the later
# no-argument groupBy(...).sum() from treating them as numeric columns to sum.
for part_name, extract in (('year', year), ('month', month), ('day', dayofmonth)):
    df = df.withColumn(part_name, extract('created_at').cast("string"))

DROP DUPLICATES
LOWERCASE
SELECT LANGUAGE
EXTRACT YEAR AND MONTH


In [7]:
print('IMPORT MENTIONS')
# Headered CSV (folder per language) whose 'mention' column lists the phrases to look up.
mentions_table = spark.read.option('header','true').csv(os.path.join(path_to_keywords,language_code))
mentions = mentions_table.toPandas()['mention'].tolist()
print('# Mentions:',len(mentions))
print('\n'.join(mentions))

IMPORT MENTIONS
# Mentions: 22
anyone hiring?
i am unemployed
i got fired
i got laid off
i have been fired
i have been laid off
i have gotten laid off
i was fired
i was laid off
i've been fired
i've been laid off
i've gotten laid off
looking for a job
looking for a new job
lost my job
need a job
need a new job
need help finding a job
searching for a job
searching for a new job
who is hiring
who’s hiring


In [8]:
print('LOOKUP MENTIONS')
# One 0/1 indicator column per mention phrase: 1 when the (lowercased) text
# contains the phrase as a substring.
# All columns are added in a single select(). Calling withColumn in a loop adds one
# projection per call, which bloats the plan and can overflow the stack.
# A dict keyed by column name reproduces withColumn's semantics when two phrases
# map to the same name: the last expression wins, and it keeps the first position.
mention_columns = {}
for mention in mentions:
    field_mention = 'n_'+mention.replace(' ','_').replace('?','_')
    mention_columns[field_mention] = col('text').contains(mention).cast("int").alias(field_mention)
df = df.select('*', *mention_columns.values())

LOOKUP MENTIONS


In [9]:
print('APPEND CONSTANT')
# A literal 1 on every row; summing it during aggregation yields the tweet count.
df = df.withColumn('n_tweets', lit(1))

unused_columns = ['tweet_id', 'created_at', 'tweet_lang', 'place_id', 'tweet_longitude', 'tweet_latitude', 'text']
df = df.drop(*unused_columns)

print("COUNT TWEETS AND MENTIONS BY YEAR, MONTH, LOCATION, AND USER")
# With no arguments, sum() aggregates every remaining numeric column (n_tweets and
# the n_* indicators); the output columns are named like 'sum(n_tweets)'.
group_keys = ['year', 'month', 'day', 'user_location', 'user_id']
df = df.groupBy(*group_keys).sum()

APPEND CONSTANT
COUNT TWEETS AND MENTIONS BY YEAR, MONTH, LOCATION, AND USER


In [None]:
print('SAVE')
start = timer()

# A single output file is wanted. coalesce(1) is a narrow dependency, so it would also
# squeeze the upstream groupBy's final aggregation into ONE task. repartition(1) keeps
# that aggregation parallel and only shuffles its result (presumably much smaller
# than the raw tweets) into one partition before writing.
df.repartition(1).write.mode("overwrite").json(os.path.join(path_to_mentions,country_code+'-json'))

end = timer()
print('DONE IN', round(end - start), 'SEC')

In [7]:
# Hard-coded millisecond epoch timestamps from a past run.
# TODO(review): record where they come from (presumably job start/end from the Spark UI).
run_started_ms, run_finished_ms = 1580138670777, 1580139947404
print('Computing Time (in hour):', round((run_finished_ms - run_started_ms) / (1000 * 3600), 2))

Computing Time (in hour): 0.35
