In [None]:
## Notebook property setup.
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf, col

import sys
import time
import os.path
import json
from datetime import datetime
from operator import add

from pyspark.sql import functions as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import preprocessor as p
import string

## Enable inline graphs
%matplotlib inline

## Display precision for pandas dataframes.
## Use the fully-qualified option key: the bare 'precision' pattern is ambiguous on
## pandas versions that also define 'styler.format.precision' and raises OptionError there.
pd.set_option('display.precision', 10)

## Set up the language classifier, used to filter out non-English text.
## NOTE(review): set_languages presumably restricts langid's candidate set to the
## language codes listed below -- confirm against the langid documentation.
import langid
langid.set_languages(['de','fr','it','en','zh','ar','ja','ko', 'es','ms','tr','hi','bn','pa'])

## Base directory for the intermediate parquet files read back later in the notebook.
## NOTE: `workdir` is rebound to a different mount near the end of the notebook.
workdir = "/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/parquet_out"

## Reading DATA

###### Raw data are saved in json.gz format. We need to load and parse these data into a Spark RDD. Note that the input path passed to sc.textFile can be either a single file or a directory. The Spark context creates the partitions automatically.

In [None]:
# 2013 data

# The monthly 2013 backups are spread over several mounts; the globs are handed to
# sc.textFile as a single comma-separated string.
# NOTE(review): no 2013-01 glob is listed -- confirm that this is intentional.
data_2013_raw = sc.textFile(",".join([
    "/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_7/2013-07/2013-07-*",
    "/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_8/2013-08/2013-08-*",
    "/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Backup_tw_2013_9/2013-09/2013-09-*",
    "/home/danielshi/Backup_tw_2013_2/2013-02/2013-02-*",
    "/home/danielshi/Backup_tw_2013_3/2013-03/2013-03-*",
    "/home/danielshi/Backup_tw_2013_6/2013-06/2013-06-*",
    "/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_10/2013-10/2013-10-*",
    "/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_11/2013-11/2013-11-*",
    "/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Backup_tw_2013_12/2013-12/2013-12-*",
    "/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_4/2013-04/2013-04-*",
    "/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2013_5/2013-05/2013-05-*",
]))

In [None]:
# 2014 data

# The monthly 2014 backups are spread over several mounts; the globs are handed to
# sc.textFile as a single comma-separated string.
data_2014_raw = sc.textFile(",".join([
    "/mnt/73dc2fdb-c49c-484c-bef8-7a6fc6abbc70/Backup_tw_2014_1/2014-01/2014-01-*",
    "/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_2/2014-02/2014-02-*",
    "/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_3/2014-03/2014-03-*",
    "/mnt/381c2633-4d72-4555-9be8-19e922cce4a1/Backup_tw_2014_4/2014-04/2014-04-*",
    "/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_5/2014-05/2014-05-*",
    "/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_6/2014-06/2014-06-*",
    "/mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Backup_tw_2014_7/2014-07/2014-07-*",
    "/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_8/2014-08/2014-08-*",
    "/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_9/2014-09/2014-09-*",
    "/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/Backup_tw_2014_10/2014-10/2014-10-*",
    "/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_11/2014-11/2014-11-*",
    "/mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Backup_tw_2014_12/2014-12/2014-12-*",
]))

In [None]:
# Helper function to keep track of the run time of a Spark operation.
def getTime(start):
    """Print the wall-clock time elapsed since ``start``.

    start: a ``time.time()`` value captured before the operation began.
    Prints the duration as h:mm:ss together with the total number of seconds;
    returns None.
    """
    elapsed = time.time() - start
    minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)
    report = 'Spark operation takes - %d:%02d:%02d which is %d seconds in total' % (
        hours, minutes, seconds, elapsed)
    print(report)
    
    
# Drop invalid tweets: raw lines whose length is 1000 or less are treated as invalid.
def ValidJson(d):
    """Return True when ``len(d)`` is strictly greater than 1000."""
    line_length = len(d)
    return line_length > 1000

# Load a JSON object; any line that cannot be used is replaced by an empty dict (len() == 0).
def loadJson(d):
    """Parse one line of text into a dict.

    d: the raw line to parse.

    Returns the decoded JSON object. An empty dict is returned instead when the
    line cannot be parsed, or when it parses to something that is not a JSON
    object (a list, number, string, ...). The pipelines in this notebook apply
    ``len(record)`` and ``key in record`` to the result, which only behave as
    intended on dicts.
    """
    try:
        js = json.loads(d)
    except Exception:
        # Covers malformed JSON (ValueError) as well as non-text input (TypeError).
        return {}

    if not isinstance(js, dict):
        return {}
    return js

# Some tweets do not contain the 'lang' key; those are removed as invalid.
def containsLang(d):
    """Return True when the record ``d`` has a 'lang' key."""
    has_lang = 'lang' in d
    return has_lang

# First-pass filter using Twitter's own language label. Its accuracy is very low, so a
# second-level language detection is applied later to remove the remaining non-English tweets.
def Eng_Label(d):
    """Return True when the record's 'lang' value equals 'en'."""
    twitter_lang = d['lang']
    return twitter_lang == 'en'

# Convert a tweet timestamp to unix time (seconds since the epoch); useful when finding
# hashtag birthdates later.
def getUnixTimeStamp(stamp):
    """Convert a 'created_at' style timestamp into unix time.

    stamp: text in the form '%a %b %d %H:%M:%S +0000 %Y', i.e. a UTC timestamp.

    Returns the number of seconds since 1970-01-01 00:00:00 UTC as a float.

    The previous implementation used time.mktime, which interprets the parsed
    fields as *local* time, so the result shifted with the machine's timezone
    and DST rules. The difference against the epoch is computed directly here,
    so the value is the same on every machine.
    """
    parsed = datetime.strptime(stamp, '%a %b %d %H:%M:%S +0000 %Y')
    epoch = datetime(1970, 1, 1)
    return (parsed - epoch).total_seconds()


# Parse out the relevant attributes from a raw tweet to save memory, also converting
# hashtags and mentions to space-separated strings.
def RawParser(d):
    """Reduce a decoded tweet dict ``d`` to the attributes used downstream.

    Returns a dict with the keys from_user, from_id, tweet_id, hashtag, term,
    location, mention and create_time. Hashtag texts and mentioned screen names
    are each joined with single spaces; the tweet text and the user location are
    copied unchanged; create_time is getUnixTimeStamp(d['created_at']).
    """
    user = d['user']

    processed = {}
    processed["from_user"] = user['screen_name']
    processed["from_id"] = user['id']
    processed["tweet_id"] = d['id']

    # Only the hashtag text is kept; the other fields of each hashtag entry are discarded.
    hashtag_texts = [tag['text'] for tag in d['entities']['hashtags']]
    processed["hashtag"] = " ".join(hashtag_texts)

    # The tweet text is stored as-is (no splitting or newline removal happens here).
    processed["term"] = d['text']

    # The location is stored as-is; no 'loc_' prefixing is done at this stage.
    processed["location"] = user['location']

    # Screen names of the mentioned users.
    mention_names = [mention['screen_name'] for mention in d['entities']['user_mentions']]
    processed["mention"] = " ".join(mention_names)

    processed["create_time"] = getUnixTimeStamp(d['created_at'])
    return processed



In [None]:
# 2013: keep well-formed English-labelled tweets and reduce each to the parsed attributes.
FeatureRDD_2013 = (
    data_2013_raw
    .filter(ValidJson)                          # drop lines that are too short
    .map(loadJson)                              # unparsable lines become {}
    .filter(lambda record: len(record) > 1)     # drop records with fewer than two keys
    .filter(containsLang)
    .filter(Eng_Label)
    .map(RawParser)
)

In [None]:
# 2014: keep well-formed English-labelled tweets and reduce each to the parsed attributes.
FeatureRDD_2014 = (
    data_2014_raw
    .filter(ValidJson)                          # drop lines that are too short
    .map(loadJson)                              # unparsable lines become {}
    .filter(lambda record: len(record) > 1)     # drop records with fewer than two keys
    .filter(containsLang)
    .filter(Eng_Label)
    .map(RawParser)
)

In [None]:
# Report how many partitions Spark created for each year's input.
num_partitions_2013 = FeatureRDD_2013.getNumPartitions()
num_partitions_2014 = FeatureRDD_2014.getNumPartitions()
print("Year 2013 contains " + str(num_partitions_2013) + " file partitions")
print("Year 2014 contains " + str(num_partitions_2014) + " file partitions")

In [None]:
## Define the DataFrame schema, then convert the parsed RDDs to DataFrames.
## Each .add() call is (column name, Spark type, nullable).
schema = (
    StructType()
    .add('create_time', DoubleType(), False)
    .add('from_id', StringType(), False)
    .add('from_user', StringType(), False)
    .add('hashtag', StringType(), True)
    .add('location', StringType(), True)
    .add('mention', StringType(), True)
    .add('term', StringType(), True)
    .add('tweet_id', StringType(), False)
)
Feature_df_2013 = sqlContext.createDataFrame(FeatureRDD_2013, schema=schema)
Feature_df_2014 = sqlContext.createDataFrame(FeatureRDD_2014, schema=schema)

In [None]:
# Peek at the first two parsed 2013 rows.
Feature_df_2013.show(n=2)

In [None]:
# Peek at the first two parsed 2014 rows.
Feature_df_2014.show(n=2)

In [None]:
# Saving parsed 2013 data to parquet, save space, better performance
#Feature_df_2013.write.save(workdir+"/2013_Raw_Eng.parquet", format="parquet")

In [None]:
# Saving parsed 2014 data to parquet, save space, better performance
#Feature_df_2014.write.save(workdir+"/2014_Raw_Eng.parquet", format="parquet")

#### Reading parquet into rdd again for non-english filter.

In [None]:
#Ref: https://www.mail-archive.com/user@spark.apache.org/msg28820.html    changing user permission.
# Reload the parsed 2013 data from parquet (the result is a DataFrame, despite the name).
All_RDD2013 = spark.read.load(workdir + "/2013_Raw_parquet", format="parquet")

In [None]:
# Reload the parsed 2014 data from parquet (the result is a DataFrame, despite the name).
All_RDD2014 = spark.read.load(workdir + "/2014_Raw_parquet", format="parquet")

##### The dataframes above are saved as intermediate JSON files. Unless you need additional attributes, this is the data that you should work with in the later processing steps; the raw data is no longer relevant at this point.
##### It is much more difficult to perform a custom map-reduce on a dataframe; it is easier to work with RDDs. Also, it is easier to save the data as JSON and then load it into an RDD than to convert the dataframe to an RDD directly (which yields Row objects, not an RDD of primitive types). Therefore, we save the same data in JSON format as well.

In [None]:
#All_RDD2013.write.json(workdir_eng+"/2013_Raw")

In [None]:
#All_RDD2014.write.json(workdir_eng+"/2014_Raw")

# Utilizing the langid package to filter out tweets which contain non-English characters in the tweet terms.

##### There are various language detection libraries for Python; the problem comes down to speed and accuracy. Out of all the packages I tested (Apache Tika, langid, lang detect, guess-language, textblob), two stand out the most: langid and textblob.

##### Langid utilizes multithreading and works great on short text (e.g. tweet terms); however, its accuracy decreases when multiple languages are mixed in the text. It takes 0.0003 seconds to check one line. It has a major drawback: the multi-threading module in this package does not seem to work well with Spark. In other words, if we try to run multiple Python jobs concurrently with this library, it will create deadlocks. It would be interesting to look into the source code of this library to understand why.

##### Textblob is based on NLTK, and it delivers the best accuracy among the available packages. When multiple languages appear in the text, the majority wins (Bayesian). However, it takes an average of 0.1 seconds to process one line, which is far too slow for big-data practice.

##### Considering accuracy and efficiency, I used langid here. Since it does not run well with Spark, I created a separate bash script that triggers 2 Python instances to process the data for 2013 and 2014.

##### SHOULD CONSIDER RUNNING SPARK WITH TEXTBLOB FOR BETTER ACCURACY IF TIME PERMITS. (~ Takes 8 days)

#### Run this command to execute 2013_Eng_Filter.py and 2014_Eng_Filter.py. It takes 5 full days to finish the parsing. (If the line magic does not work, run it in a terminal instead.)

In [None]:
## !bash Eng_Filter.sh

##### This script filters the JSON data and returns English tweets only. Note that the location and hashtag fields may still contain non-English words.

In [None]:
# 2013 data
# Load the 2013 JSON files under the Eng_Json directory, one text line per RDD element.
data_2013_Eng = sc.textFile(name="/mnt/1e69d2b1-91a9-473c-a164-db90daf43a3d/Eng_Json/*")

In [None]:
# 2014 data
# Load the 2014 JSON files under the Eng_Json directory, one text line per RDD element.
data_2014_Eng = sc.textFile(name="/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Eng_Json/*")

#### The 2013 data contains 360,048,691 valid English tweets and the 2014 data contains 455,285,530 valid English tweets. We have a total of 815,334,221, compared to 829,026,458 in the paper.

# Now we have English data, let's go ahead and clean it a little bit.

In [None]:
# 1. Check for tweets that are missing the location field (the next cells count the 2013 records that have no "location" key).

In [None]:
# 2013 records that have no "location" key at all (unparsable lines become {} and land here too).
invalid = (
    data_2013_Eng
    .map(loadJson)
    .filter(lambda record: not ("location" in record))
)

In [None]:
# Count the 2013 records without a "location" key and report how long the Spark job took.
loading = time.time()

# Keep the result: a bare `invalid.count()` in the middle of a cell is computed and then
# thrown away, because only a cell's last expression is displayed.
invalid_count = invalid.count()

getTime(loading)

# Last expression of the cell, so the notebook displays the count.
invalid_count

In [None]:
import preprocessor as p
import string

def translating(x):
    """Lower-case one token and delete every ASCII punctuation character from it.

    x: a text token (unicode text).

    The work is done on the UTF-8 encoded bytes, so only ASCII letters are
    lower-cased and only the characters in string.punctuation are removed;
    non-ASCII characters pass through untouched.

    Returns a native ``str``: on Python 2 this is the same UTF-8 byte string the
    original implementation produced; on Python 3 (where the original raised
    TypeError because the punctuation table was text, not bytes) the bytes are
    decoded back to text.
    """
    lowered = x.encode('utf-8').lower()
    # The delete table must be bytes on Python 3; on Python 2 this encode is a no-op.
    stripped = lowered.translate(None, string.punctuation.encode('ascii'))
    if isinstance(stripped, str):
        # Python 2: bytes and str are the same type.
        return stripped
    # Removing ASCII bytes never splits a multi-byte UTF-8 sequence, so this decode is safe.
    return stripped.decode('utf-8')

def Cleansing(d):
    """Normalise the free-text fields of one tweet record.

    d: a dict with the keys term, location, from_user, from_id, tweet_id,
       hashtag, mention and create_time.

    Returns a new dict with the same keys where

    * ``term`` is the tweet text reduced to ASCII, passed through ``p.clean``
      (from the ``preprocessor`` package -- see its documentation for what it
      removes), lower-cased, with ':' and all ASCII punctuation deleted and
      surrounding spaces stripped. If nothing is left, or the text is missing,
      the sentinel "empty_tweet" is used.
    * ``location`` is "empty_location" when the location is missing or blank,
      otherwise 'loc_' followed by the cleaned, space-separated location tokens
      joined with '_'.
    * all other fields are copied unchanged.

    Changes from the previous version: a None term no longer raises
    AttributeError; a tweet made only of punctuation now yields "empty_tweet"
    rather than an empty string; the unreachable ``txt == None`` branch is gone.

    NOTE(review): the byte-string handling below (``str.translate(None, ...)``)
    relies on Python 2 semantics, as in the original.
    """
    raw_term = d['term']
    if raw_term is None:
        terms = "empty_tweet"
    else:
        txt = p.clean(raw_term.encode('ascii', 'ignore')).replace(":", "").lower()
        # Strip punctuation *before* the emptiness check so that punctuation-only
        # tweets are caught by the sentinel too.
        terms = txt.encode('utf-8').translate(None, string.punctuation).strip(' ')
        if terms == '':
            terms = "empty_tweet"

    location = d['location']
    if location is None or location.strip(' ') == '':
        loc_term = "empty_location"
    else:
        loc_term = 'loc_' + "_".join(map(translating, location.strip(' ').split(" ")))

    processed = {"from_user":d['from_user'],
                 "from_id":d['from_id'],
                 "tweet_id":d['tweet_id'],
                 "hashtag":d['hashtag'],
                 "term": terms,
                 "location":loc_term,
                 "mention":d['mention'],
                 "create_time":d['create_time']
                }
    return processed

In [None]:
# 2013: parse each line, keep the records that carry a "location" key, then clean them.
cleanRDD_2013 = (
    data_2013_Eng
    .map(loadJson)
    .filter(lambda record: "location" in record)
    .map(Cleansing)
)

In [None]:
# 2014: parse each line, keep the records that carry a "location" key, then clean them.
cleanRDD_2014 = (
    data_2014_Eng
    .map(loadJson)
    .filter(lambda record: "location" in record)
    .map(Cleansing)
)

In [None]:
## Define the DataFrame schema for the cleaned records and build the final DataFrames.
## Each .add() call is (column name, Spark type, nullable).
schema = (
    StructType()
    .add('create_time', DoubleType(), False)
    .add('from_id', StringType(), False)
    .add('from_user', StringType(), False)
    .add('hashtag', StringType(), True)
    .add('location', StringType(), True)
    .add('mention', StringType(), True)
    .add('term', StringType(), True)
    .add('tweet_id', StringType(), False)
)
Final_Feature_df_2013 = sqlContext.createDataFrame(cleanRDD_2013, schema=schema)
Final_Feature_df_2014 = sqlContext.createDataFrame(cleanRDD_2014, schema=schema)

In [None]:
# Peek at the first three cleaned 2014 rows.
Final_Feature_df_2014.show(n=3)

In [None]:
# Point workdir at the directory used for the final cleaned parquet output.
# NOTE: this rebinds the `workdir` assigned in the setup cell; cells that build paths from
# the earlier value (e.g. the parquet reads) will use this directory if re-run afterwards.
workdir = "/mnt/4e8ba653-f2f0-4e18-a51e-458026833dee/final_parquet"

In [None]:
# Saving 2013 data to parquet, save space, better performance
# Persist the cleaned 2013 data as parquet.
Final_Feature_df_2013.write.parquet(workdir + "/2013_Eng_parquet_clean")

In [None]:
# Saving 2014 data to parquet, save space, better performance
# Persist the cleaned 2014 data as parquet.
Final_Feature_df_2014.write.parquet(workdir + "/2014_Eng_parquet_fixed")