# Chirps Preprocessing

In [1]:
import pyspark

# Spark settings for this notebook (local mode, all cores).
sparkConfig = {
    'spark.executor.memory': '60g',
    'spark.driver.memory': '30g',
    'spark.master': 'local[*]',
    'spark.default.parallelism': '30',
}
# Pass the settings to Spark explicitly: a SparkContext built without a
# SparkConf ignores the dictionary above and runs with default settings.
sparkConf = pyspark.SparkConf() \
    .setAppName('Chirp Dataset') \
    .setAll(list(sparkConfig.items()))
# getOrCreate lets this cell be re-run without raising because a context
# already exists. Note that an already-running context keeps its own settings.
sc = pyspark.SparkContext.getOrCreate(conf=sparkConf)

In [2]:
import datetime
import importlib
import numpy as N
import numpy.random as NR
import matplotlib.pyplot as pyplot
import seaborn
import pandas as P
from pathlib import Path
import pyspark.mllib as M

# Apply seaborn's 'whitegrid' style to the figures drawn in this notebook.
seaborn.set_style('whitegrid')

In [3]:
import common.twitter
# Directory holding the Chirps dataset; the raw instances file is read from
# here and the filtered train/test splits are written here.
basepath = Path('datasets/Chirps')

## Stage 1: Time range filter

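From Exploration 1, most of the tweets occur between 2017/01/01 and 2019/07/01. This range is split into a training window, from 2017/01/01 up to (but not including) 2019/01/01, and a test window, from 2019/01/01 up to (but not including) 2019/07/01.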

In [4]:
# Training window boundaries.
startTrain, endTrain = (
    datetime.datetime(year=2017, month=1, day=1),
    datetime.datetime(year=2019, month=1, day=1),
)
# The test window begins exactly where the training window ends.
startTest, endTest = endTrain, datetime.datetime(year=2019, month=7, day=1)

# Output locations of the filtered splits, both under the dataset directory.
pathTrainInstances = basepath.joinpath('instances_train.tsv')
pathTestInstances = basepath.joinpath('instances_test.tsv')

In [5]:
def readline(l):
    """Parse one line of the instances file into its two instances.

    Parameters
    ----------
    l : str
        A single line, passed to ``common.twitter.chirps_instance_readline``.

    Returns
    -------
    list
        ``[i1, i2]`` when ``tokenised_substitute_string`` can be read from
        both instances, otherwise ``[]``. Returning a list lets the caller
        use ``flatMap`` to drop the unusable samples.
    """
    i1, i2 = common.twitter.chirps_instance_readline(l)
    try:
        # This would fail on 4 samples because of mismatched }'s.
        # Remove these samples now rather than later.
        # The attributes are read only to trigger that failure early.
        i1.tokenised_substitute_string
        i2.tokenised_substitute_string
    except Exception:
        # Catch ordinary errors only, so KeyboardInterrupt / SystemExit
        # still propagate instead of being turned into a dropped sample.
        return []
    return [i1, i2]
    
def instances_filter(startTime, endTime, filename, collect=False):
    """Write the distinct tweets inside a time window to ``filename``.

    Reads ``instances.tsv`` under ``basepath``, parses every line with
    ``readline``, keeps the instances whose ``datetime`` lies in
    ``[startTime, endTime)``, keeps one instance per ``tweetId``, sorts by
    ``timestamp`` and writes each instance's ``serialise`` value on its own
    line. The number of samples is printed before writing.

    Parameters
    ----------
    startTime : datetime.datetime
        Inclusive lower bound of the window.
    endTime : datetime.datetime
        Exclusive upper bound of the window.
    filename : path-like
        Output location. With ``collect=False`` it is passed to
        ``saveAsTextFile``; with ``collect=True`` it is opened as one file.
    collect : bool, optional
        If true, gather all records on the driver and write a single file.
    """
    def time_range_filter(x):
        # Half-open window: the start is included, the end is excluded.
        return startTime <= x.datetime < endTime

    def to_tweet_id(x):
        # Key by tweet id so reduceByKey can keep one instance per tweet.
        return (x.tweetId, x)

    # NOTE(review): `serialise` is read as an attribute, not called;
    # presumably it is a property on the instance type -- confirm.
    rdd = sc.textFile(str(basepath / 'instances.tsv'), 90) \
        .flatMap(readline) \
        .filter(time_range_filter) \
        .map(to_tweet_id) \
        .reduceByKey(lambda a, b: a, 90) \
        .values() \
        .sortBy(lambda x: x.timestamp) \
        .map(lambda x: x.serialise)

    # count() and the write below are two separate actions. Without caching
    # the whole pipeline runs twice, and since reduceByKey keeps an arbitrary
    # duplicate, the second run is not guaranteed to reproduce the first.
    rdd.cache()
    try:
        print(f"{rdd.count()} samples read")
        if collect:
            # Explicit UTF-8 so tweet text does not depend on the platform's
            # default encoding.
            with open(filename, 'w', encoding='utf-8') as f:
                for x in rdd.collect():
                    f.write("%s\n" % x)
        else:
            rdd.saveAsTextFile(str(filename))
    finally:
        # Release the cached partitions once the output has been written.
        rdd.unpersist()

In [6]:
    
# Build the training split only when its output path does not exist yet,
# so re-running this cell skips the Spark job.
if not pathTrainInstances.exists():
    instances_filter(startTrain, endTrain, pathTrainInstances)

6809312 samples read


In [7]:
    
# Build the test split only when its output path does not exist yet,
# so re-running this cell skips the Spark job.
if not pathTestInstances.exists():
    instances_filter(startTest, endTest, pathTestInstances)

1950076 samples read
