# The New Pipeline

This is a rough draft of our new code. We are re-implementing our existing work, mostly with PySpark's DataFrame and Pipeline APIs, before moving forward. So far this approach has been much more efficient in terms of human time spent, though not necessarily in terms of time or space complexity.

## Basics

In [3]:
import pyspark.sql as sql

ss = sql.SparkSession.builder.appName("TwitterTokenizing")\
                             .getOrCreate()

## Importing Tweet Data

In [4]:
import pyspark.sql.types as types

tweets_schema = types.StructType([
  types.StructField('id', types.LongType()),
  types.StructField('timestamp', types.LongType()),
  types.StructField('postalCode', types.StringType()),
  types.StructField('lon', types.DoubleType()),
  types.StructField('lat', types.DoubleType()),
  types.StructField('tweet', types.StringType()),
  types.StructField('user_id', types.LongType()),
  types.StructField('application', types.StringType()),
  types.StructField('source', types.StringType())
])
tweets_df = ss.read.csv('tweets2.csv',
                        escape='"',
                        header=True,
                        schema=tweets_schema,
                        mode='DROPMALFORMED')
# Keep only the columns used downstream.
tweets_df = tweets_df.select('timestamp', 'lon', 'lat', 'tweet')

print('Dataframe columns:')
print(tweets_df.columns)
print('Sample row:')
print(tweets_df.take(1))
print('Number of tweets:')
print(tweets_df.count())

Dataframe columns:
['timestamp', 'lon', 'lat', 'tweet']
Sample row:
[Row(timestamp=1435723208, lon=-73.951206, lat=40.79435, tweet=u'Incident on #VariousLocalExpressBuses SB from 5th Avenue:106th Street to 5th Avenue: 57th Street http://t.co/KrLOmkAqcE')]
Number of tweets:
38012483


## Importing Tokenizer

In [5]:
import os
import sys

# From https://stackoverflow.com/a/36218558 .
def sparkImport(module_name, module_directory):
    """
    Convenience function.

    Tells the SparkContext of the session ss (must already
    exist) to ship module module_name to every computational
    node so that it can be imported inside Spark tasks.

    Args:
        module_name: the name of the module, without ".py".
        module_directory: the path, absolute or relative, to
                          the directory containing module
                          module_name.

    Returns: None.
    """
    module_path = os.path.abspath(
        os.path.join(module_directory, module_name + ".py"))
    ss.sparkContext.addPyFile(module_path)

# Add all scripts from repository to local path. 
# From https://stackoverflow.com/a/35273613 .
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

import twokenize
sparkImport("twokenize", "..")

print("Original tweet:")
example_tweet = u':( :( :( Incident on #VariousLocalExpressBuses SB from 5th Avenue:106th Street to 5th Avenue: 57th Street http://t.co/KrLOmkAqcE'
print(example_tweet)
print("Tokenized tweet:")
print(twokenize.tokenize(example_tweet))

Original tweet:
:( :( :( Incident on #VariousLocalExpressBuses SB from 5th Avenue:106th Street to 5th Avenue: 57th Street http://t.co/KrLOmkAqcE
Tokenized tweet:
[u':(', u':(', u':(', u'Incident', u'on', u'#VariousLocalExpressBuses', u'SB', u'from', u'5th', u'Avenue', u':', u'106th', u'Street', u'to', u'5th', u'Avenue', u':', u'57th', u'Street', u'http://t.co/KrLOmkAqcE']


## Tokenize the Data

In [6]:
import pyspark.sql.functions as functions

# Wrap the tokenizer in a UDF that returns an array of strings.
sql_tokenize = functions.udf(
    lambda tweet: twokenize.tokenize(tweet),
    returnType=types.ArrayType(types.StringType()))
# Add the token list as a new 'tokens' column, keeping 'tweet'.
tweets_df = tweets_df \
    .withColumn("tokens", sql_tokenize(tweets_df.tweet))

print(tweets_df.columns)
print(tweets_df.take(1))

['timestamp', 'lon', 'lat', 'tweet', 'tokens']
[Row(timestamp=1435723208, lon=-73.951206, lat=40.79435, tweet=u'Incident on #VariousLocalExpressBuses SB from 5th Avenue:106th Street to 5th Avenue: 57th Street http://t.co/KrLOmkAqcE', tokens=[u'Incident', u'on', u'#VariousLocalExpressBuses', u'SB', u'from', u'5th', u'Avenue', u':', u'106th', u'Street', u'to', u'5th', u'Avenue', u':', u'57th', u'Street', u'http://t.co/KrLOmkAqcE'])]
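
Spark passes `None` to a Python UDF for null column values, so a tokenizer that expects a string may fail on rows whose tweet text is missing. The following is a minimal sketch of a null-safe wrapper, not the notebook's actual code: `safe_tokenize` and `whitespace_tokenize` are hypothetical names, and the whitespace splitter is only a stand-in for `twokenize.tokenize` so that the example runs without the module or a Spark session.

```python
def whitespace_tokenize(text):
    """Stand-in tokenizer (assumption): splits on whitespace only."""
    return text.split()


def safe_tokenize(text, tokenizer=whitespace_tokenize):
    """Return an empty token list for missing text instead of raising."""
    if text is None:
        return []
    return tokenizer(text)


# A null tweet yields no tokens; a normal tweet is tokenized as usual.
print(safe_tokenize(None))
print(safe_tokenize(u"Incident on #VariousLocalExpressBuses SB"))
```

If this behavior is wanted for null tweets, the lambda passed to `functions.udf` could call `safe_tokenize(tweet, twokenize.tokenize)` instead of calling the tokenizer directly.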


## Filter by Date

In [8]:
date_column = tweets_df['timestamp'].cast(types.TimestampType()) \
                                    .cast(types.DateType())

tweets_df = tweets_df.withColumn('date', date_column) \
                     .drop('timestamp')
print(tweets_df.columns)
print(tweets_df.take(1))

['lon', 'lat', 'tweet', 'tokens', 'date']
[Row(lon=-73.951206, lat=40.79435, tweet=u'Incident on #VariousLocalExpressBuses SB from 5th Avenue:106th Street to 5th Avenue: 57th Street http://t.co/KrLOmkAqcE', tokens=[u'Incident', u'on', u'#VariousLocalExpressBuses', u'SB', u'from', u'5th', u'Avenue', u':', u'106th', u'Street', u'to', u'5th', u'Avenue', u':', u'57th', u'Street', u'http://t.co/KrLOmkAqcE'], date=datetime.date(2015, 7, 1))]
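
Casting a `LongType` column to `TimestampType` treats the value as seconds since the Unix epoch, and the second cast to `DateType` drops the time of day. As a rough cross-check outside Spark, the sketch below does the same conversion in plain Python for the sample row's timestamp. The helper name `epoch_seconds_to_utc_date` is hypothetical, and it assumes UTC, whereas Spark uses the session time zone, so dates for tweets posted near midnight may differ.

```python
import datetime

# Unix epoch as a naive datetime, interpreted as UTC (assumption).
EPOCH = datetime.datetime(1970, 1, 1)


def epoch_seconds_to_utc_date(seconds):
    """Convert seconds since the Unix epoch to a UTC calendar date."""
    return (EPOCH + datetime.timedelta(seconds=seconds)).date()


# Timestamp taken from the sample row shown in the output above.
print(epoch_seconds_to_utc_date(1435723208))
```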


In [10]:
import datetime

# Keep tweets from the 31 days before 2016-03-03: the start date is
# included and the end date is excluded.
date_to_column = functions.lit(datetime.date(2016, 3, 3))
date_from_column = functions.date_sub(date_to_column, 31)
filtered_tweets_df = tweets_df.filter(
    (tweets_df.date >= date_from_column)
    & (tweets_df.date < date_to_column))

print(filtered_tweets_df.count())
print(filtered_tweets_df.take(1))

151306
[Row(lon=-74.27470828, lat=40.59844873, tweet=u'This is me, YL a voice from JERSEY.. And im pushin this #blackpowermovement ! Links in my bio! #BlackHistoryMonth ! https://t.co/MLvAS9jIqj', tokens=[u'This', u'is', u'me', u',', u'YL', u'a', u'voice', u'from', u'JERSEY', u'..', u'And', u'im', u'pushin', u'this', u'#blackpowermovement', u'!', u'Links', u'in', u'my', u'bio', u'!', u'#BlackHistoryMonth', u'!', u'https://t.co/MLvAS9jIqj'], date=datetime.date(2016, 2, 1))]
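
The filter above keeps a half-open window: dates on or after the start date and strictly before the end date. The sketch below reproduces that window logic with plain `datetime` objects, which may help when checking boundary dates without a Spark session. The helper `in_window` is a hypothetical name introduced only for this illustration.

```python
import datetime


def in_window(day, end, days=31):
    """True if day falls in the half-open interval [end - days, end)."""
    start = end - datetime.timedelta(days=days)
    return start <= day < end


end = datetime.date(2016, 3, 3)

# First day of the window, 31 days before the end date.
print(end - datetime.timedelta(days=31))
# The start date is included; the end date itself is excluded.
print(in_window(datetime.date(2016, 2, 1), end))
print(in_window(end, end))
```

The start of the window falls on 2016-02-01, which matches the date of the sample row in the filtered output.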
