# Concert Tweet Classifier

## Import Necesary Packages

In [1]:
import sparknlp
from pyspark.sql.types import *
from pyspark.sql.functions import count, when, col
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline
from nltk.corpus import stopwords
import pyspark.sql.functions as F
import pandas as pd
from sparknlp.pretrained import PretrainedPipeline

## Start the spark-NLP session

In [2]:
spark = sparknlp.start()

In [3]:
spark.sparkContext.defaultParallelism

4

In [163]:
# adjust show output format to pandas-like
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# support converting pandas to spark
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

## Read the data

In the first go, there were 25k rows of null - where the schema did not match the data. I decided to do some quick cleaning.

In [5]:
def remove_extra_seps(in_file, out_file, sep):
    """removes newline characters that come before the line reaches four segments(3 separators)
    and combines "middle sections" with extra separators into a single segment by removing the separators.
    
    Args:
        in_file: path to read file
        out_file: path to write file
        sep: separator/delimitor
    """
    n_chunks = 4
    
    with open(in_file, 'r') as rf:
        with open(out_file, 'w') as wf:
            while True:
                line = rf.readline()
                
                # if end of file
                if line == '':
                    break
                    
                # if line has less than n_sep, strip the newline and add the next line
                if len(line.split(sep)) < n_chunks:
                    line = line.strip('\n')
                    line += rf.readline()
                
                wf.write(line)

In [6]:
remove_extra_seps('../../data/test_set_tweets.txt',
                      '../../data/test_set_tweets_clean.txt',
                     '\t')
remove_extra_seps('../../data/training_set_tweets.txt',
                      '../../data/training_set_tweets_clean.txt',
                     '\t')

In [7]:
# set the schema
tweet_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("t_id", StringType(), True),
    StructField("t_text", StringType(), True),
    StructField("t_dt", TimestampType(), True)
    ])

In [8]:
tweets_test = spark.read.csv('../../data/test_set_tweets_clean.txt', 
                              sep='\t',
                              schema=tweet_schema,
                              header="false")

In [9]:
tweets_training = spark.read.csv('../../data/training_set_tweets_clean.txt', 
                                 sep="\t", 
                                 schema=tweet_schema,
                                 header='false')

Since our data is unlabeled for our task, these test/train splits are not particularly useful, but a vestige of the original data set and purpose. We'll combine them.

In [10]:
tweets = tweets_test.union(tweets_training)

#### Future: Consider reading the data as a single column and then parsing. Compare outcome / number of tweets retrieved to that with the csv reading

## Basic Info About the Data Set

### Tweets

In [13]:
tweets.select('*').show(5)

+--------+----------+--------------------+-------------------+
| user_id|      t_id|              t_text|               t_dt|
+--------+----------+--------------------+-------------------+
|36287076|7277196841|@CHELLEYCHELLEZ w...|2010-01-01 13:52:38|
|36287076|7276402546| @iDejaTia lol u are|2010-01-01 13:18:14|
|36287076|7276054760|@iDejaTia same he...|2010-01-01 13:03:09|
|36287076|7276049365| @iDejaTia same here|2010-01-01 13:02:56|
|36287076|7274735472|@EmpressNRG Wah g...|2010-01-01 12:06:18|
+--------+----------+--------------------+-------------------+
only showing top 5 rows



In [14]:
tweets.select([count(when(col(c).isNull(), c)).alias(c) for c in 
        tweets.columns]).show()

# print("""+-------+-----+------+-----+
# |user_id| t_id|t_text| t_dt|
# +-------+-----+------+-----+
# |  33289|33179| 32631|53805|
# +-------+-----+------+-----+""")

+-------+-----+------+-----+
|user_id| t_id|t_text| t_dt|
+-------+-----+------+-----+
|  34555|34490| 34232|54671|
+-------+-----+------+-----+



In [15]:
tweets.count()

# print(8884863)

8884863

In [16]:
# tweets.distinct().count()
print(8850656)

8850656


It looks like the time stamp can be parsed from the end of the tweet text for many of these "null" datetimes.

In [17]:
tweets.filter(col('t_dt').isNull()).take(5)

[Row(user_id=25513575, t_id='10334442280', t_text='', t_dt=None),
 Row(user_id=None, t_id=None, t_text=None, t_dt=None),
 Row(user_id=25513575, t_id='10333612651', t_text='', t_dt=None),
 Row(user_id=None, t_id=None, t_text=None, t_dt=None),
 Row(user_id=None, t_id=None, t_text=None, t_dt=None)]

In [18]:
tweets = tweets.withColumn('datetime', 
                           F.when(F.col('t_dt').isNull(), 
                                  F.to_date(F.substring('t_text', -19, 19)))
                           .otherwise(F.col('t_dt'))
                          )

In [19]:
tweets = tweets.withColumn('t_text', 
                           F.when(F.col('t_dt').isNull(), 
                                  F.expr('substring(t_text, 1, length(t_text)-20)'))
                           .otherwise(F.col('t_text'))
                           )

In [20]:
tweets = tweets.withColumn('t_dt', F.col('datetime')).drop('datetime')

In [21]:
# save as parquet and reload
# tweets.write.parquet('../../data/tweets.parquet')
tweets = spark.read.parquet('../../data/tweets.parquet')

In [22]:
tweets.select([count(when(col(c).isNull(), c)).alias(c) for c in 
        tweets.columns]).show()

+-------+-----+------+-----+
|user_id| t_id|t_text| t_dt|
+-------+-----+------+-----+
|  34555|34490| 34232|54671|
+-------+-----+------+-----+



In [23]:
tweets.count()

8884863

In [24]:
tweets = tweets.dropna(how='any', subset=['t_text'])

In [25]:
tweets.filter(col('t_dt').isNull()).take(5)

[Row(user_id=25513575, t_id='10334442280', t_text='', t_dt=None),
 Row(user_id=25513575, t_id='10333612651', t_text='', t_dt=None),
 Row(user_id=16198727, t_id='6899029209', t_text='This vid cracked me up! haha I w', t_dt=None),
 Row(user_id=20106865, t_id='10362030419', t_text="Ladies and gentlemen... come and join me.  It'", t_dt=None),
 Row(user_id=20106865, t_id='10005503765', t_text='I am talking #Survivor RIGHT NOW in stickam', t_dt=None)]

Clearly I could do some more/better data engineering here, but for this exercise, I'm going to move on, dropping any records with null values or t_text with empty strings

In [26]:
tweets = tweets.dropna(how='any')

In [27]:
tweets = tweets.filter(~(tweets.t_text == ""))

In [28]:
tweets.count()

8829912

# Concert tweets - Classifier

I am deciding to focus on english tweets for now. (may add spanish, others in the future based on presence in the data set).

In [29]:
eng_stopwords = stopwords.words('english')

setting up the pieces of my pipeline to extract text info from the tweets (we'll use a pretrained pipeline later)

In [30]:
documentAssembler = DocumentAssembler() \
     .setInputCol('t_text') \
     .setOutputCol('document')
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token')
normalizer = Normalizer() \
     .setInputCols(['token']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemma')
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemma']) \
     .setOutputCol('clean_lemma') \
     .setCaseSensitive(False) \
     .setStopWords(eng_stopwords)
finisher = Finisher() \
     .setInputCols(['clean_lemma']) \
     .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [31]:
pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           normalizer,
           lemmatizer,
           stopwords_cleaner,
           finisher
     ])

In [32]:
tweets = pipeline.fit(tweets).transform(tweets)

In [33]:
tweets.columns

['user_id',
 't_id',
 't_text',
 't_dt',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma']

## Basic Classifier: contains the word concert

In [34]:
concert_tweets = tweets.withColumn('concert', F.array_contains('finished_clean_lemma', 'concert'))
concert_tweets = concert_tweets.filter(concert_tweets['concert'] == 'true')

In [35]:
concert_tweets.select('t_text').take(3)

[Row(t_text="@herRoyalStarnes I just thought of the history broke down bmw's on bdays free concert tickets in the nose bleeds p (cont) http://tl.gd/4pp7k"),
 Row(t_text='Y is me @RandiICandy, @EpitomeOfADiva, and Leila Bunny n here singing Mary J like we Mary J. We in concert yall buy a ticket yall.'),
 Row(t_text="@beccalexis sup Bee? How'd the shoot go? Will you be at the concert tonight?")]

In [36]:
# concert_tweets.count()

print(12477)

12477


In [37]:
concert_tweets.columns

['user_id',
 't_id',
 't_text',
 't_dt',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma',
 'concert']

In [38]:
concert_tweets.select("t_text").show(30, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|t_text                                                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|@herRoyalStarnes I just thought of the history broke down bmw's on bdays free concert tickets in the nose bleeds p (cont) http://tl.gd/4pp7k|
|Y is me @RandiICandy, @EpitomeOfADiva, and Leila Bunny n here singing Mary J like we Mary J. We in concert yall buy a ticket yall.          |
|@beccalexis sup Bee? How'd the shoot go? Will you be at the concert tonight?                                                                |
|RT @BoomKack: Janet was at Lady Gaga concert tonight she is everything!!!!!! Can't touch her!                                               |

## Basic Classifier: contains the word concert or similar words

In [39]:
concert_plus = tweets.withColumn('concert', F.array_contains('finished_clean_lemma', 'concert'))\
                     .withColumn('tour', F.array_contains('finished_clean_lemma', 'tour'))\
                     .withColumn('gig', F.array_contains('finished_clean_lemma', 'gig'))\
                     .withColumn('show', F.array_contains('finished_clean_lemma', 'show'))
concert_plus = concert_plus.withColumn('concert_like', col('concert')|col('tour')|col('gig'))
concert_plus = concert_plus.filter(concert_plus.concert_like == True)

In [40]:
concert_plus.select("t_text").show(30, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|t_text                                                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|@Lauralu2u yeps I had curve than the tour.   Love my Droid                                                                                  |
|@herRoyalStarnes I just thought of the history broke down bmw's on bdays free concert tickets in the nose bleeds p (cont) http://tl.gd/4pp7k|
|Y is me @RandiICandy, @EpitomeOfADiva, and Leila Bunny n here singing Mary J like we Mary J. We in concert yall buy a ticket yall.          |
|@joeymcintyre You've got to be a LITTLE bit silly on tour or you wouldn't be YOU! ;)                                                        |

Looking at this super small sample, it doesn't seem like these alternate words are adding a lot to our classifier.

#### Future: maybe combination of show/tour/gig and musician/group name in addition to the concert

Since we don't have labeled data, and I'm not sure the best technique for clustering text data in this situation. Or how we would evaluate which techniqes are doing the best job identifying our concert tweets, and whether they are worth the extra complexity/computational requirements.

For now, I'm going to move on using the "concert" lemma classifier

In [41]:
df = concert_tweets.select('user_id', 't_text', 't_dt')

In [42]:
df.cache()

user_id,t_text,t_dt
85691996,@herRoyalStarnes ...,2010-01-22 10:17:15
85691996,Y is me @RandiICa...,2010-01-15 16:22:28
25611870,@beccalexis sup B...,2010-01-30 00:00:00
25611870,RT @BoomKack: Jan...,2010-01-24 00:00:00
30387809,Concert tonight a...,2009-12-09 15:06:12
71702459,They Played #FLEX...,2010-02-22 19:59:44
71702459,My First Concert....,2010-02-22 19:26:40
71702459,In The Library Wi...,2010-02-22 11:32:56
49483366,@RockStarRenRen l...,2009-07-30 11:56:06
28528232,Sooo go b4 u wet ...,2010-01-13 16:37:44


In [43]:
# df.count()
print(12444)

12444


In [44]:
df.show(1)

+--------+--------------------+-------------------+
| user_id|              t_text|               t_dt|
+--------+--------------------+-------------------+
|85691996|@herRoyalStarnes ...|2010-01-22 10:17:15|
+--------+--------------------+-------------------+
only showing top 1 row



In [476]:
sample = df.sample(withReplacement=None, fraction=0.01, seed=5)

In [477]:
sample = sample.withColumnRenamed('t_text', 'text')

In [478]:
sample.columns

['user_id', 'text', 't_dt']

In [479]:
sample.take(20)

[Row(user_id=71702459, text='In The Library With @NickAustinG... He Tryin To Get On His Jigga Shit... Oh We Goin To The Concert Tonite', t_dt=datetime.datetime(2010, 2, 22, 11, 32, 56)),
 Row(user_id=49483366, text='@RockStarRenRen lol is we going to this concert', t_dt=datetime.datetime(2009, 7, 30, 11, 56, 6)),
 Row(user_id=19688989, text='@bizymare   My son lives in Kenosha, I"m down there about a dozen times a year.    The concert was for the Kenosha area home schoolers.', t_dt=datetime.datetime(2009, 12, 11, 22, 16, 24)),
 Row(user_id=20019157, text='@itSHOWTIME how was the concert?', t_dt=datetime.datetime(2010, 1, 19, 0, 0)),
 Row(user_id=49477598, text="@audiobebop what time? I'm suppose to go to a concert tonight with Juda. uhh", t_dt=datetime.datetime(2010, 1, 15, 9, 26, 7)),
 Row(user_id=33814590, text='@MizzDania How was the concert?', t_dt=datetime.datetime(2010, 1, 22, 0, 0)),
 Row(user_id=29299184, text='The A was poppin @S_C_ @SongzYuuup and jeezy held the concert down.

## Entity Recognition

### Who

For the sake of time, I focused on pop and hip hop artists from 2009/2010 (data from wikipedia). This is extra tricky when tweeters use the artist handles (eg @JonasBrothers), again this is an area for future iteration

In [480]:
# import artist list
with open('../../data/musicians.txt', 'r') as f:
     artists = f.read().splitlines()
        
artists = list(set(artists))

In [481]:
pipeline = PretrainedPipeline("explain_document_dl", lang="en")

annotation = pipeline.transform(sample)

annotation.show(1)

explain_document_dl download started this may take some time.
Approx size to download 167.3 MB
[OK!]
+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| user_id|                text|               t_dt|            document|            sentence|               token|             checked|               lemma|                stem|                 pos|          embeddings|                 ner|            entities|
+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|71702459|In The Library Wi...|2010-02-22 11:32:56|[[document, 0, 10...|[[document, 0, 34...|[[token, 0, 1, In...

In [482]:
annotation.select("entities.result").show(20, truncate=False)

+-------------------------------------------------------+
|result                                                 |
+-------------------------------------------------------+
|[The Library With @NickAustinG, The Concert Tonite]    |
|[@RockStarRenRen]                                      |
|[Kenosha, I"m, Kenosha]                                |
|[]                                                     |
|[Juda]                                                 |
|[@MizzDania]                                           |
|[@S_C_ @SongzYuuup]                                    |
|[]                                                     |
|[I'm, Chile, Haiti, USA]                               |
|[Fred, Radio One, PAJAM]                               |
|[Decemberists]                                         |
|[]                                                     |
|[&]                                                    |
|[@JonasBrothers +, !!!!!*******]                       |
|[Lem, Chrissy

In [483]:
artists

['Uncle Kracker',
 'Warren G',
 'Pitbull',
 'Sa-Ra Creative Partners',
 'Linkin Park',
 'Grandmaster Flash',
 'Asher Roth',
 'Paul Wall',
 'Kelly Clarkson',
 "Ol' Dirty Bastard",
 'DJ Drama',
 'Tyga',
 'Busta Rhymes',
 'Big Scoob',
 'Rivers Cuomo',
 'The Alchemist',
 'Clipse',
 'Romeo',
 'Bow Wow',
 'Hurricane Chris',
 'Shinedown',
 'Kurupt',
 'T-Pain',
 'Juvenile',
 "Cam'ron",
 'Freeway',
 'The Band Perry',
 'Stoupe the Enemy of Mankind',
 'Dorrough',
 'Sammie',
 'Slim Thug',
 'CRUNK23',
 'Classified',
 'Eyedea',
 'Jordin Sparks',
 'Timbaland',
 'The Sounds',
 'La Coka Nostra',
 'Bruno Mars',
 'Sean Kingston',
 'Juicy J',
 'Adam Lambert',
 'Playaz Circle',
 'Skull Gang',
 'JAY Z',
 'Soap Nation',
 'Cobra Starship',
 'Gorilla Zoe',
 'Lady Antebellum',
 'DJ Green Lantern',
 'Chipmunk',
 'Kid Cudi',
 'Young Jeezy',
 'k-os',
 'Crunk Chris',
 'Travie McCoy',
 'KRS-One & Buckshot',
 'N.O.R.E.',
 'Decemberists',
 'Kris Allen',
 'Dead Prez',
 'Maino',
 'Chico DeBarge',
 'Noah23 & Madadam',
 '

In [484]:
# ugh. pyspark.sql.functions.typedLit doesn't exist yet in pyspark to pass the artists to a udf. 
# So I'm going to switch to pandas for this step

entities_pd = annotation.select('entities.result', 'text').toPandas()

In [485]:
entities_pd['who'] = [[entity for entity in e_list if entity in artists] for e_list in entities_pd['result']]

In [486]:
entities_pd.head(20)

Unnamed: 0,result,text,who
0,"[The Library With @NickAustinG, The Concert To...",In The Library With @NickAustinG... He Tryin T...,[]
1,[@RockStarRenRen],@RockStarRenRen lol is we going to this concert,[]
2,"[Kenosha, I""m, Kenosha]","@bizymare My son lives in Kenosha, I""m down ...",[]
3,[],@itSHOWTIME how was the concert?,[]
4,[Juda],@audiobebop what time? I'm suppose to go to a ...,[]
5,[@MizzDania],@MizzDania How was the concert?,[]
6,[@S_C_ @SongzYuuup],The A was poppin @S_C_ @SongzYuuup and jeezy h...,[]
7,[],@melodyxxx LOL u ladies go hard! Are u guys go...,[]
8,"[I'm, Chile, Haiti, USA]","Look, I'm sorry about Chile and Haiti, but for...",[]
9,"[Fred, Radio One, PAJAM]",U r! had to get Fred cause he was doing a Rad...,[]


In [487]:
who_schema = StructType([
                StructField("result", ArrayType(StringType()), True),
                StructField("text", StringType(), True),
                StructField("who", ArrayType(StringType()), True)
                ])

who = spark.createDataFrame(entities_pd, schema=who_schema)

In [488]:
who.select('*').take(10)

[Row(result=['The Library With @NickAustinG', 'The Concert Tonite'], text='In The Library With @NickAustinG... He Tryin To Get On His Jigga Shit... Oh We Goin To The Concert Tonite', who=[]),
 Row(result=['@RockStarRenRen'], text='@RockStarRenRen lol is we going to this concert', who=[]),
 Row(result=['Kenosha', 'I"m', 'Kenosha'], text='@bizymare   My son lives in Kenosha, I"m down there about a dozen times a year.    The concert was for the Kenosha area home schoolers.', who=[]),
 Row(result=[], text='@itSHOWTIME how was the concert?', who=[]),
 Row(result=['Juda'], text="@audiobebop what time? I'm suppose to go to a concert tonight with Juda. uhh", who=[]),
 Row(result=['@MizzDania'], text='@MizzDania How was the concert?', who=[]),
 Row(result=['@S_C_ @SongzYuuup'], text='The A was poppin @S_C_ @SongzYuuup and jeezy held the concert down.. Back in tally 2 papers and 2 midterms due tues. # backtoboredom', who=[]),
 Row(result=[], text='@melodyxxx LOL u ladies go hard! Are u guys goin

In [489]:
sample = sample.join(who, on='text', how='outer')

In [490]:
sample = sample.drop('result')

In [491]:
sample.show(40, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+--------+-------------------+------------+
|text                                                                                                                                        |user_id |t_dt               |who         |
+--------------------------------------------------------------------------------------------------------------------------------------------+--------+-------------------+------------+
|For free - add your event to our calendar! http://bit.ly/T3lbr #seattle #music #events #event #bands #concerts #calendar #blog              |80949495|2009-11-02 15:10:01|[]          |
|Jason Mraz was terrific at Red Rocks Sat night.  It's a truly unique & amazing concert venue.  @http://twitpic.com/ilfxr                    |42220821|2009-09-21 15:27:57|[Jason Mraz]|
|My Daughter&#39;s First Orchestra Concert | Opensource, Nonprofits ... htt

### WHEN: looking for date-related words

In [492]:
# I'm going to start with the date matcher pretrained pipeline

date_pipe = PretrainedPipeline("match_datetime", lang="en")

date_annotation = date_pipe.transform(sample)

match_datetime download started this may take some time.
Approx size to download 12.8 KB
[OK!]


In [493]:
date_annotation.columns

['text', 'user_id', 't_dt', 'who', 'document', 'sentence', 'token', 'date']

In [494]:
date_annotation.select('text', 't_dt', 'date.result').show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------+
|text                                                                                                                                        |t_dt               |result      |
+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------+
|For free - add your event to our calendar! http://bit.ly/T3lbr #seattle #music #events #event #bands #concerts #calendar #blog              |2009-11-02 15:10:01|[]          |
|Jason Mraz was terrific at Red Rocks Sat night.  It's a truly unique & amazing concert venue.  @http://twitpic.com/ilfxr                    |2009-09-21 15:27:57|[]          |
|My Daughter&#39;s First Orchestra Concert | Opensource, Nonprofits ... http://bit.ly/1mcbz8                            

This is cool! It is using day-oriented words, like yesterday! I wonder if there is a way to set a reference date (as opposed to today). At least for the "Radio One concert" tweet... Doesn't look like there is, but I can use the date it outputs, get their relation with today, and apply to the date.

I'm not sure how it got 12/06 from the "Decemberists concert tonight" tweet. - maybe december + the 6 hours later?

In [495]:
date_annotation = date_annotation.select('text', F.col('date.result').alias('date_result'))

In [496]:
date_annotation.select('date_result').collect()

[Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/09/08']),
 Row(date_result=['2020/05/20']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/11/27']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/05/19']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/11/03']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/05/19']),
 Row(date_result=['2020/03/09']),
 Row(date_result=['2020/05/20']),
 Row(date_result=[]),
 Row(date_result=['2020/06/19']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=['2020/05/19']),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),
 Row(date_result=[]),


In [497]:
date_annotation.columns

['text', 'date_result']

In [498]:
date_annotation.printSchema()

root
 |-- text: string (nullable = true)
 |-- date_result: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [499]:
date_annotation.select(F.size("date_result").alias("no_of_dates")).agg({"no_of_dates": "max"}).show()

+----------------+
|max(no_of_dates)|
+----------------+
|               1|
+----------------+



I'm deciding to take the first date, since in my small sample, no tweet had more than one.

In [500]:
date_annotation = date_annotation.withColumn('date_result', F.col('date_result')[0])

In [501]:
date_annotation.select('date_result').show(10)

+-----------+
|date_result|
+-----------+
|       null|
|       null|
| 2020/09/08|
| 2020/05/20|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
+-----------+
only showing top 10 rows



In [502]:
sample = sample.join(date_annotation, on='text')

In [503]:
sample.printSchema()

root
 |-- text: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- t_dt: timestamp (nullable = true)
 |-- who: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date_result: string (nullable = true)



In [504]:
sample = sample.withColumn('date_result', F.to_date(sample['date_result'],'yyyy/MM/dd'))

In [505]:
sample = sample.withColumn('date_diff', F.datediff(F.current_timestamp(), sample['date_result']))

In [506]:
# if date_result is within two weeks of today, get difference, and apply it to timestamp
# elif date_result has this year's date. reset the year to match the year of the tweet 
# (hardcoeded as 10 years)

sample = sample.withColumn('when', F.when((col('date_diff') > -14),
                                      F.expr("date_add(t_dt, date_diff)"))\
                          .when((F.col('date_diff') < -14) 
                                & (F.year('date_result') == F.year(F.current_timestamp())), 
                                F.date_sub('date_result', 3652))
            )

In [507]:
sample.select('*').show(4)

+--------------------+--------+-------------------+------------+-----------+---------+----------+
|                text| user_id|               t_dt|         who|date_result|date_diff|      when|
+--------------------+--------+-------------------+------------+-----------+---------+----------+
|For free - add yo...|80949495|2009-11-02 15:10:01|          []|       null|     null|      null|
|Jason Mraz was te...|42220821|2009-09-21 15:27:57|[Jason Mraz]|       null|     null|      null|
|My Daughter&#39;s...|18518302|2009-11-18 17:20:16|          []| 2020-09-08|     -112|2010-09-09|
|I have 2 tickets ...|21316433|2009-09-14 20:09:00|[The Sounds]| 2020-05-20|       -1|2009-09-13|
+--------------------+--------+-------------------+------------+-----------+---------+----------+
only showing top 4 rows



In [508]:
sample = sample.drop('date_result', 'date_diff')

In [510]:
sample.show(20)

+--------------------+--------+-------------------+------------+----------+
|                text| user_id|               t_dt|         who|      when|
+--------------------+--------+-------------------+------------+----------+
|For free - add yo...|80949495|2009-11-02 15:10:01|          []|      null|
|Jason Mraz was te...|42220821|2009-09-21 15:27:57|[Jason Mraz]|      null|
|My Daughter&#39;s...|18518302|2009-11-18 17:20:16|          []|2010-09-09|
|I have 2 tickets ...|21316433|2009-09-14 20:09:00|[The Sounds]|2009-09-13|
|music news: Washi...|12135162|2009-11-16 08:37:38|          []|      null|
|Headed over to Bi...|38184021|2009-08-22 11:01:16|          []|      null|
|Zach Deputy = Ama...|13445142|2009-08-29 10:47:30|          []|      null|
|@Stony419 Whoops ...|18991162|2009-11-17 10:42:51|          []|      null|
|@q100Brittany OMG...|73886845|2009-11-13 13:10:54|          []|      null|
|*dizzy* RT @jeske...|17494046|2009-11-05 21:56:26|          []|      null|
|Fri Nov 27 

#### Future: update "when" to have a non-hard-coded version of setting the year.

## WHERE