### Problem statement


### Import all needed libs

In [1]:
# basically spark
import findspark
findspark.init()
import pyspark
import operator
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType
from pyspark.sql.functions import udf, row_number,column

# processing
import re
from datetime import datetime

# text preprocessing
import re
import nltk
from nltk.stem import WordNetLemmatizer 
from pyspark.ml.feature import CountVectorizer,StopWordsRemover, HashingTF, IDF, Tokenizer
nltk.download('stopwords')
nltk.download('wordnet')

#staff for LDA
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector as oldVector, Vectors as oldVectors
from pyspark.ml.linalg import Vector as newVector, Vectors as newVectors

# import hardcoded variables
from variables import channels_not_to_consider

#for debug purpose only
import time

#pytrends - for acquiring google trends
from get_google_trends_data.pytrends.pytrends.request import TrendReq


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/dmytro.babenko/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/dmytro.babenko/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


### Global variables definition

**User-specific variables**  
Feel free to adjust these variables. For example, you can set how many of the most recent hours are used to find the hottest topics.

In [2]:
# When True, only tweets whose place is listed in `locations_to_consider` are kept.
get_from_location = True

# Place names used to filter relevant tweets.
# Every entry ends with a comma on purpose: Python silently concatenates
# adjacent string literals, which would fuse two locations into one.
locations_to_consider = [
    'Manhattan, NY',
    'Brooklyn, NY',
    'Queens, NY',
    'Bronx, NY',
    'Staten Island, NY',
    'New York, USA',
]

number_of_hours_to_get_topics = 2
num_of_top_interest = 15  # number of rows shown in the Google Trends tables

geo = "US-NY" #US for USA

# Time window of interest, in the tweet timestamp format
# '%a %b %d %H:%M:%S %z %Y' (see str_tweet_to_datetime).
# The weekday must agree with the date: 27 June 2019 was a Thursday.
frame_start_datetime = "Thu Jun 27 00:00:00 +0000 2019"
frame_finish_datetime = "Sun Jun 30 23:00:00 +0000 2019"

**Technical variables**  
These variables configure the LDA model and the data sources (the CSV path and the MongoDB collection).

In [3]:
# LDA params
num_of_topics_LDA = 10  # passed as `k` to LDA.train
max_iterations_LDA = 100  # passed as `maxIterations` to LDA.train
nomber_of_words_to_for_topic = 15  # number of words per topic (name is misspelled, but topic_render reads it under this name)

# path to CSV
historical_tweets_data = './get-tweets-by-geolocation/training_tweets.csv'
# MongoDB table
# Appended to the mongodb://localhost:27017/ URI when the connector session is built.
real_time_tweets_table = "usa_training_tweets_02_07.training_tweets_collection"

### Create spark session

In [4]:
# Create the low-level SparkContext first, then wrap it in a SparkSession,
# which provides the DataFrame/SQL API used in the rest of the notebook.
sc = SparkContext()
spark = SparkSession(sc)

### Handy functions

**Text preprocessing and filtering**

In [5]:
def filter_tweet(tweet, channels_not_to_consider=None):
    """Tell whether a raw tweet is worth keeping for topic modelling.

    Args:
        tweet: The raw tweet text. Anything that is not a ``str``
            (e.g. ``None`` from a missing CSV field) is rejected.
        channels_not_to_consider: Unused. Kept, now optional, so that both
            one-argument and two-argument calls work.

    Returns:
        bool: ``True`` when the tweet is a string with at least three
        whitespace-separated words, ``False`` otherwise.
    """
    if not isinstance(tweet, str):
        return False

    # split() without an argument ignores runs of whitespace, so doubled
    # spaces do not count as extra (empty) words.
    return len(tweet.split()) >= 3
         
def process_tweet(tweet):
    """Normalise a tweet and return its lemmatised, stop-word-free tokens.

    Relies on the module-level ``tokenizer``, ``lemmatizer`` and
    ``stop_word_list`` objects.

    Args:
        tweet: Raw tweet text (a ``str``).

    Returns:
        list: Lemmatised tokens in their original order.
    """
    # (pattern, replacement) pairs, applied in this exact order.
    cleanup_steps = (
        (r'@\w+', ''),              # mentions
        (r'http://\S{,280}', ''),   # http links
        (r'https://\S{,280}', ''),  # https links
        (r'[^A-Za-z]', ' '),        # everything that is not a letter
        (r'\s{2,}', ' '),           # runs of whitespace
        (r'(.)\1{2,}', r'\1'),      # 3+ repeats of a character -> one ("greeeeat" -> "great")
    )

    cleaned = tweet.lower()
    for pattern, replacement in cleanup_steps:
        cleaned = re.sub(pattern, replacement, cleaned)
    cleaned = cleaned.strip()

    # Tokenise, drop stop words, lemmatise what is left.
    lemmas = []
    for token in tokenizer.tokenize(cleaned):
        if token in stop_word_list:
            continue
        lemmas.append(lemmatizer.lemmatize(token))

    return lemmas

**Datetime handling**

In [6]:
def str_tweet_to_datetime(frame_datetime):
    """Parse a tweet-style timestamp string into a timezone-aware datetime.

    Args:
        frame_datetime: Text such as ``"Sun Jun 30 23:00:00 +0000 2019"``.

    Returns:
        datetime: The parsed moment, carrying the UTC offset from the text.
    """
    return datetime.strptime(frame_datetime, '%a %b %d %H:%M:%S %z %Y')

def datetime_to_tweet_str(frame_datetime):
    """Format a datetime in the tweet timestamp layout.

    Args:
        frame_datetime: A ``datetime`` instance.

    Returns:
        str: Text such as ``"Sun Jun 30 23:00:00 +0000 2019"``.
    """
    return datetime.strftime(frame_datetime, '%a %b %d %H:%M:%S %z %Y')

# How to call this block with functions?

In [7]:
def tweet2google_timeframe(frame_start_datetime, frame_finish_datetime):
    """Convert a tweet-style time window into a Google Trends timeframe string.

    Args:
        frame_start_datetime: Window start in the tweet timestamp format
            accepted by ``str_tweet_to_datetime``.
        frame_finish_datetime: Window end in the same format.

    Returns:
        str: ``"YYYY-MM-DD YYYY-MM-DD"`` (start date, a space, end date),
        the same layout this notebook passes to ``pytrend.build_payload``.
    """
    start_date = str_tweet_to_datetime(frame_start_datetime)
    end_date = str_tweet_to_datetime(frame_finish_datetime)
    return f"{start_date.strftime('%Y-%m-%d')} {end_date.strftime('%Y-%m-%d')}"
    
def get_google_trends_by_geo(geo):
    """Pick the pre-loaded Google Trends frames for a region code.

    The frames are module-level names that are looked up at call time.

    Args:
        geo: Region code, ``'US'`` or ``'US-NY'``.

    Returns:
        tuple: ``(search topics, search queries)`` for the region, or
        ``(None, None)`` for any other code.
    """
    if geo == 'US-NY':
        return google_trends_search_topics_us_ny, google_trends_search_queries_us_ny

    if geo == 'US':
        return google_trends_search_topics_us, google_trends_search_queries_us

    return None, None

In [8]:
#TODO: move this function to utils
def str_rising_to_float(str):
    """Convert a Google Trends "rising" label such as ``"+1,500%"`` to a float.

    The parameter keeps its original name (which shadows the builtin inside
    this function) so that keyword callers keep working.

    Args:
        str: The label text, or ``None``.

    Returns:
        float: The percentage as a number (``"+1,500%"`` -> ``1500.0``).
        ``None``, the empty string and ``"Breakout"`` all map to ``0.0``.

    Raises:
        ValueError: If the numeric part cannot be parsed.
    """
    if str is None or str == '' or str == 'Breakout':
        return 0.0

    # Keep what precedes the percent sign, then drop a leading plus sign.
    str_value = str.split('%')[0]
    if '+' in str_value:
        str_value = str_value.split('+')[1]

    # Commas are thousands separators. Removing them handles any number of
    # groups and avoids multiplying a float by 1000.
    return float(str_value.replace(',', ''))

In [9]:
#TODO: move this function to utils
def _rank_by_mean(totals):
    """Turn ``{name: [sum, count]}`` into ``[(name, rounded mean), ...]``, highest mean first."""
    means = {name: round(total / count) for name, (total, count) in totals.items()}
    return sorted(means.items(), key=operator.itemgetter(1), reverse=True)


def unique_google_trends_by_time_frame(df):
    """Collapse multi-day Google Trends rows into one ranking per time frame.

    Every input row holds one "rising" entry and one "top" entry side by
    side. The two kinds are aggregated independently: values of the same name
    are averaged, the averages are rounded, and each list is sorted from the
    highest value down. The shorter list is padded with ``''`` / ``None``.

    Args:
        df: Spark DataFrame whose columns are, by position: date, rising
            name, top name, rising value (text such as ``"+950%"``), top
            value, and a column named ``geo``.

    Returns:
        DataFrame: Columns 1-5 of ``df`` (same names), one row per rank.
        Empty when ``df`` has no usable rows.
    """
    columns = df.columns
    data = df.collect()
    if not data:
        # Nothing in the requested window: hand back an empty frame that
        # still carries the output column names.
        return df.select(*columns[1:6]).limit(0)

    geo = data[0]['geo']
    rising_totals = {}
    top_totals = {}

    for record in data:
        # Rows padded with '' / None (the convention this function itself
        # emits for the shorter list) are not real trends; skip them.
        rising_name = record[columns[1]]
        if rising_name not in (None, ''):
            entry = rising_totals.setdefault(rising_name, [0.0, 0])
            entry[0] += str_rising_to_float(record[columns[3]])
            entry[1] += 1

        top_name = record[columns[2]]
        top_score = record[columns[4]]
        if top_name not in (None, '') and top_score is not None:
            entry = top_totals.setdefault(top_name, [0.0, 0])
            entry[0] += float(top_score)
            entry[1] += 1

    top_ranked = _rank_by_mean(top_totals)
    rising_ranked = _rank_by_mean(rising_totals)
    if not top_ranked and not rising_ranked:
        # Only padding rows were present; avoid building a frame from no rows.
        return df.select(*columns[1:6]).limit(0)

    row = Row(columns[1], columns[2], columns[3], columns[4], columns[5])
    len_top = len(top_ranked)
    len_rising = len(rising_ranked)

    seq = []
    for i in range(max(len_top, len_rising)):
        rising = rising_ranked[i][0] if i < len_rising else ''
        rising_val = f"+{rising_ranked[i][1]}%" if i < len_rising else None

        top = top_ranked[i][0] if i < len_top else ''
        top_val = top_ranked[i][1] if i < len_top else None

        seq.append(row(rising, top, rising_val, top_val, geo))

    return spark.createDataFrame(seq)

In [10]:
def get_geo_name(geo):
    """Return the display name for a region code.

    Args:
        geo: Region code such as ``"US-NY"`` or ``"US"``.

    Returns:
        str: ``"New York"``, ``"United States"``, or ``""`` for an unknown code.
    """
    known_regions = (
        ("US-NY", "New York"),
        ("US", "United States"),
    )
    for code, label in known_regions:
        if geo == code:
            return label
    return ""

def print_google_trend_title(start_date, finish_date, name):
    """Print a heading line for a Google Trends table.

    The region shown comes from the module-level ``geo`` variable.

    Args:
        start_date: First moment of the window (datetime).
        finish_date: Last moment of the window (datetime).
        name: Label of the table, e.g. ``"Search topics"``.
    """
    start_date_str = start_date.strftime("%Y-%m-%d")

    if start_date != finish_date:
        # A real range: show both ends.
        finish_date_str = finish_date.strftime("%Y-%m-%d")
        print(f"\nGoogle trends {name} in {get_geo_name(geo)} during {start_date_str} - {finish_date_str}")
        return

    print(f"\nGoogle trends {name} in {get_geo_name(geo)} during {start_date_str}")

In [11]:
def convert_datetime_in_interesting_google(df):
    """Return a copy of ``df`` whose date column is ``YYYY-MM-DD`` text.

    Only the first six columns are kept. The date is read from the column
    named ``"Date"`` and written under the name of the first column.

    Args:
        df: Spark DataFrame with a ``"Date"`` column and at least six columns.

    Returns:
        DataFrame: Six columns with the original names.
    """
    columns = df.columns
    output_names = [columns[position] for position in range(6)]
    passthrough_names = output_names[1:]

    def reformat(record):
        # Date first, then the remaining five values unchanged.
        day = record["Date"].strftime("%Y-%m-%d")
        return (day,) + tuple(record[name] for name in passthrough_names)

    return df.rdd.map(reformat).toDF(output_names)

# Load the data


## Here should be "magic IF" (Yevhen)

In [None]:
def get_history_and_real_timeframe(requested_start, requested_finish):
    """Split a requested window into its historical and real-time parts.

    The historical data ends at ``"Fri Jul 05 00:00:00 +0000 2019"``.
    Anything from that moment on counts as real-time data.

    Args:
        requested_start: Window start in the tweet timestamp format.
        requested_finish: Window end in the same format; must be later
            than the start.

    Returns:
        tuple: ``(history_start, history_finish, realtime_start,
        realtime_finish)`` as datetimes. A pair is ``None, None`` when the
        window does not touch that source.

    Raises:
        AssertionError: If the finish is not later than the start.
    """
    window_start = str_tweet_to_datetime(requested_start)
    window_finish = str_tweet_to_datetime(requested_finish)
    history_end = str_tweet_to_datetime("Fri Jul 05 00:00:00 +0000 2019")

    assert window_finish > window_start, "Finish dataframe MUST be greater than start"

    # Entirely after the cut-off: real-time only.
    if window_start >= history_end and window_finish > history_end:
        return (None, None, window_start, window_finish)

    # Entirely before (or ending exactly at) the cut-off: history only.
    if window_start < history_end and window_finish <= history_end:
        return (window_start, window_finish, None, None)

    # The window straddles the cut-off: split it there.
    return (window_start, history_end, history_end, window_finish)

print('Example of usage!')
# Split the configured window; `times` is (csv start, csv finish, mongodb start, mongodb finish).
times = get_history_and_real_timeframe(requested_start=frame_start_datetime,
                                       requested_finish=frame_finish_datetime)
csv_start, csv_finish, mongodb_start, mongodb_finish = times

print("Range for csv: ", csv_start, csv_finish)
print("Time range for mongodb: ", mongodb_start, mongodb_finish)

In [None]:
# Load the historical tweets from CSV, using the header row and inferred column types.
# NOTE(review): `df` is rebound by the MongoDB load and by a second CSV read
# further down, so this frame is replaced before anything consumes it.
df = spark.read.csv(historical_tweets_data, inferSchema=True, header=True)

### Connecting to MongoDB for the real-time data
**Creating Spark connector**

In [None]:
# Build a session configured for the MongoDB Spark connector; the input and
# output URIs both point at the collection named by `real_time_tweets_table`.
# NOTE(review): `spark` was already created in the "Create spark session" cell,
# so getOrCreate() may hand back that existing session -- confirm that the
# connector package and these options actually take effect.
spark = SparkSession.builder.appName("pipeline") \
    .config('spark.mongodb.input.uri', 'mongodb://localhost:27017/'+real_time_tweets_table) \
    .config('spark.mongodb.output.uri', 'mongodb://localhost:27017/'+real_time_tweets_table) \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1') \
    .config('spark.mongodb.input.partitioner', 'MongoPaginateBySizePartitioner') \
    .getOrCreate()

In [None]:
# Read through the MongoDB connector source into a DataFrame (rebinds `df`).
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [None]:
# Register the frame as the temp view "recent_data" so it can be queried with spark.sql.
df.createOrReplaceTempView("recent_data")

In [None]:
# Inspect the schema of the loaded frame.
df.printSchema()

In [None]:
# Sample of at most ten tweet texts with lang "en" and place.country "US".
recent_data = spark.sql('SELECT text FROM recent_data WHERE lang="en" AND place.country="US" LIMIT 10')

In [None]:
# Preview the sampled tweets.
recent_data.show()

# Text preprocessing

In [None]:
# Re-read the historical CSV (comma-separated, with header), this time without
# schema inference. Rebinds `df`, replacing the MongoDB frame loaded above.
df = spark.read.csv(historical_tweets_data, header=True, sep=',')

In [None]:
# Module-level NLP helpers that process_tweet() reads by name.
tokenizer = nltk.WordPunctTokenizer()
lemmatizer = WordNetLemmatizer()
# English stop words from the NLTK 'stopwords' corpus downloaded in the import cell.
stop_word_list = nltk.corpus.stopwords.words('english')

In [None]:
# The pipeline works on the RDD of rows and stores its result under a new
# name. `df` stays a DataFrame because later cells still call DataFrame
# methods on it (e.g. `df.select('text')`).
# NOTE(review): positional columns are presumably 0=text, 1=country, 2=place,
# 4=channel -- confirm against the CSV header.

# filter nans
tweets_rdd = df.rdd.filter(
    lambda x: x[0] is not None and x[1] is not None and x[2] is not None and x[4] is not None
)

# filter out channels not to consider
tweets_rdd = tweets_rdd.filter(lambda x: x[4] not in channels_not_to_consider)

# filter by country (exact match; `in 'US'` would also accept '', 'U' and 'S')
tweets_rdd = tweets_rdd.filter(lambda x: x[1] == 'US')

# filter by precise location
if get_from_location:
    tweets_rdd = tweets_rdd.filter(lambda x: x[2] in locations_to_consider)

# filter tweet itself
tweets_rdd = tweets_rdd.filter(lambda x: filter_tweet(x[0], channels_not_to_consider))

# process tweet: each element becomes a list of lemmatised tokens
tweets_rdd = tweets_rdd.map(lambda x: process_tweet(x[0]))

# final preprocessing: drop tweets that have no tokens left
tweets_rdd = tweets_rdd.filter(lambda x: len(x) > 0)

# Here should be Main dataframe already filtered

## Loading Google Trends data

In [12]:
# Pre-collected Google Trends exports: search queries and search topics for
# the whole US and for New York state. All four files are read the same way.
_trends_csv_options = dict(inferSchema=True, header=True)

google_trends_search_queries_us = spark.read.csv(
    'data/google-trends/google-trends-search-queries-US.csv', **_trends_csv_options)
google_trends_search_topics_us = spark.read.csv(
    'data/google-trends/google-trends-search-topics-US.csv', **_trends_csv_options)
google_trends_search_queries_us_ny = spark.read.csv(
    'data/google-trends/google-trends-search-queries-US-NY.csv', **_trends_csv_options)
google_trends_search_topics_us_ny = spark.read.csv(
    'data/google-trends/google-trends-search-topics-US-NY.csv', **_trends_csv_options)

In [13]:
#TODO: move this function to Handy function block
# NOTE(review): a function with this name and the same logic is already defined
# in the handy-functions section; this definition replaces it when the cell
# runs. Keep only one of the two.
def get_google_trends_by_geo(geo):
    """Return ``(search topics, search queries)`` frames for a region code.

    Args:
        geo: Region code, ``'US'`` or ``'US-NY'``.

    Returns:
        tuple: The two pre-loaded frames, or ``(None, None)`` for any other code.
    """
    topics, queries = None, None

    if geo == 'US':
        topics, queries = google_trends_search_topics_us, google_trends_search_queries_us
    elif geo == 'US-NY':
        topics, queries = google_trends_search_topics_us_ny, google_trends_search_queries_us_ny

    return topics, queries

# Data description (Женя)
To collect the data:
<br>- we registered a Twitter Developer account;
<br>- using the credentials from that account, we ran a script that collected tweets by geolocation and saved them to MongoDB;
<br>
<br><b>As a result:</b>
<br>- we collected 332548 tweets (10Gb in MongoDB, ~100Mb in CSV) from the New York geolocation between May 30 and June 15;
<br>- we have collected 6617029 tweets (~1.69Gb in CSV) from the USA geolocation from June 15 up to now.

In [None]:
#show Raw data

### Data preprocessing
# description

#### Selecting data from {{ frame_start_datetime }} to {{frame_finish_datetime}}

In [None]:
# English stop words from the NLTK corpus, reached through the already
# imported `nltk` package. A set keeps the membership test in the last step cheap.
StopWords = set(nltk.corpus.stopwords.words("english"))

# One list of tokens per tweet: split on single spaces, keep purely
# alphabetic tokens longer than three characters that are not stop words.
tokens = (
    df.select('text').rdd
    .map(lambda row: row[0])
    .filter(lambda text: text is not None)  # rows without text cannot be split
    .map(lambda document: re.split(" ", document))
    .map(lambda words: [w for w in words if w.isalpha()])
    .map(lambda words: [w for w in words if len(w) > 3])
    .map(lambda words: [w for w in words if w not in StopWords])
)


In [None]:
# Preview a few token lists; collect() would pull every tweet's tokens onto the driver.
tokens.take(10)

In [None]:
#show preprocessed data

### Topic modeling/Latent Dirichlet allocation(LDA)

In [None]:
# this block can be commented, it's just a mock

text_file = 'data/listings.csv'
df = spark.read.csv(text_file, inferSchema=True, header=True)
df = df.select("id", "name").dropna(subset="name")

print(time.strftime('%m%d%Y %H:%M:%S'))

# Use a dedicated name for the Spark ML tokenizer: the global `tokenizer` is the
# NLTK tokenizer that process_tweet() relies on and must not be overwritten.
name_tokenizer = Tokenizer(inputCol="name", outputCol="tokens")
df = name_tokenizer.transform(df)
print(time.strftime('%m%d%Y %H:%M:%S'))

In [None]:
# The timestamps printed before and after show how long the fit takes.
print(time.strftime('%m%d%Y %H:%M:%S'))

# Fit a vocabulary of at most 5000 terms over the "tokens" column (minDF=3.0).
cv = CountVectorizer(inputCol="tokens", outputCol="raw_features", vocabSize=5000, minDF=3.0)
cvmodel = cv.fit(df)

print(time.strftime('%m%d%Y %H:%M:%S'))

In [None]:
print(time.strftime('%m%d%Y %H:%M:%S'))
# Add the "raw_features" column produced by the fitted CountVectorizer model (rebinds `df`).
df = cvmodel.transform(df)
print(time.strftime('%m%d%Y %H:%M:%S'))

In [None]:
# Fit IDF on "raw_features" and add the result as "tf_idf_features" (minDocFreq=2).
idf = IDF(inputCol="raw_features", outputCol="tf_idf_features", minDocFreq=2)
idfModel = idf.fit(df)

df = idfModel.transform(df)


In [None]:
# Preview the first ten rows; the second argument turns on truncation of long cell values.
df.show(10, True)

In [None]:
#df = df.drop("name")
#df.show(10, False)

In [None]:
# Replace "id" with a consecutive row number (starting at 1) in the order of the original id.
w = Window().orderBy(column("id"))
df = df.withColumn("id", row_number().over(w))

In [None]:
# Pair each row's first field with its fifth field converted by oldVectors.fromML
# (the LDA used below comes from pyspark.mllib).
# NOTE(review): positions 0 and 4 are presumably "id" and "tf_idf_features" -- confirm the column order.
rs = df.rdd.map(lambda x: (x[0], oldVectors.fromML(x[4])))

In [None]:
# Turn the pairs into a DataFrame (the columns are selected as '_1' and '_2' below) and preview it untruncated.
rs_df = rs.toDF()
rs_df.show(10, False)

In [None]:
# Run the LDA Topic Modeler
# Note the time before and after is printed in order to find out how much time it takes to process x number of records
# NOTE(review): no `seed` is passed to LDA.train, so topics may differ between runs -- consider fixing one.

print(time.strftime('%m%d%Y %H:%M:%S'))
lda_model = LDA.train(rs_df['_1', '_2'].rdd.map(list), k=num_of_topics_LDA, maxIterations=max_iterations_LDA)
print(time.strftime('%m%d%Y %H:%M:%S'))

In [None]:
print(time.strftime('%m%d%Y %H:%M:%S'))
topics = lda_model.topicsMatrix()
vocabArray = cvmodel.vocabulary

# Ask for as many terms per topic as topic_render() will print. The limit is
# the configured `nomber_of_words_to_for_topic`.
topicIndices = sc.parallelize(
    lda_model.describeTopics(maxTermsPerTopic=nomber_of_words_to_for_topic))

def topic_render(topic):  # specify vector id of words to actual words
    """Render one topic as a list of ``"<weight>  <word>"`` strings.

    Args:
        topic: A pair ``(term indices, term weights)`` as produced by
            ``describeTopics``.

    Returns:
        list: At most ``nomber_of_words_to_for_topic`` strings, each the
        weight rounded to three decimals, two spaces, and the vocabulary word.
    """
    terms = topic[0]
    prob = topic[1]

    # A topic may carry fewer terms than requested (small vocabulary), so
    # never index past what is actually there.
    limit = min(nomber_of_words_to_for_topic, len(terms), len(prob))
    return [str(round(prob[i], 3)) + "  " + vocabArray[terms[i]] for i in range(limit)]
print(time.strftime('%m%d%Y %H:%M:%S'))

In [None]:
print(time.strftime('%m%d%Y %H:%M:%S'))
# Render every topic with topic_render and bring the resulting lists back to the driver.
topics_final = topicIndices.map(lambda topic:topic_render(topic)).collect()
print(time.strftime('%m%d%Y %H:%M:%S'))

# Topics

In [None]:
# based on the simple vectors(+number of words)

# Print every topic as a numbered heading followed by its rendered terms.
for topic_number, topic_terms in enumerate(topics_final, start=1):
    print("Topic #" + str(topic_number) + "")
    for rendered_term in topic_terms:
        print(rendered_term)
    print('\n')

In [None]:
#based on the tf-idf
# NOTE(review): this cell prints the same `topics_final` list as the previous
# cell; nothing here recomputes the topics from different features.

topic_total = len(topics_final)
for position in range(topic_total):
    heading = "Topic #" + str(position + 1) + ""
    print(heading)
    for rendered_term in topics_final[position]:
        print(rendered_term)
    print('\n')

### Hot topics in the USA from [Google trends](https://trends.google.com/trends/explore?geo=US)

In [14]:
# Parse the configured window bounds into timezone-aware datetimes.
start_date = str_tweet_to_datetime(frame_start_datetime)
finish_date = str_tweet_to_datetime(frame_finish_datetime)

In [15]:
# Pick the pre-loaded topic and query frames for the configured region (both are None for an unknown code).
google_trends_topics, google_trends_queries = get_google_trends_by_geo(geo) 

##### Google trends search topics

In [16]:
# Keep only the topic rows whose Date lies inside the window (both bounds inclusive).
interesting_google_topics = google_trends_topics.filter(
    (google_trends_topics.Date >= start_date) & (google_trends_topics.Date <= finish_date))

In [17]:
# Print the heading, format the dates as YYYY-MM-DD, and show the per-day rising/top topics untruncated.
print_google_trend_title(start_date, finish_date, "Search topics")
interest_google_topics = convert_datetime_in_interesting_google(interesting_google_topics)
interest_google_topics.select("Date","Search topics - rising", "Search topics - top").show(num_of_top_interest, False)


Google trends Search topics in New York during 2019-06-27 - 2019-06-30
+----------+--------------------------------------------------------+----------------------------------------+
|Date      |Search topics - rising                                  |Search topics - top                     |
+----------+--------------------------------------------------------+----------------------------------------+
|2019-06-28|Kamala Harris - United States Senator                   |New York - City in New York             |
|2019-06-28|Marianne Williamson - American author                   |New York - US State                     |
|2019-06-28|United States women's national soccer team - Soccer team|2019 - Topic                            |
|2019-06-28|Pete Buttigieg - Mayor of South Bend                    |Weather - Topic                         |
|2019-06-28|Joe Biden - Former Vice President of the United States  |Google Search - Topic                   |
|2019-06-28|FIFA Women's World Cup - Foo

When the time frame spans more than one day, aggregate the Google Trends rows so that each topic appears only once.

In [19]:
# Collapse the per-day rows into one ranking over the whole window, then show the top entries.
# NOTE(review): "interesing" in the variable name is a typo for "interesting".
interesing_google_topics_unique= unique_google_trends_by_time_frame(interesting_google_topics)
print_google_trend_title(start_date, finish_date, "Search topics")
interesing_google_topics_unique.select("Search topics - rising", "Search topics - top").show(num_of_top_interest, False)


Google trends Search topics in New York during 2019-06-27 - 2019-06-30
+--------------------------------------------------------+----------------------------------------+
|Search topics - rising                                  |Search topics - top                     |
+--------------------------------------------------------+----------------------------------------+
|Kamala Harris - United States Senator                   |New York - City in New York             |
|Marianne Williamson - American author                   |New York - US State                     |
|Brooklyn Nets - Basketball team                         |2019 - Topic                            |
|United States women's national soccer team - Soccer team|Weather - Topic                         |
|Pete Buttigieg - Mayor of South Bend                    |YouTube - Video sharing company         |
|Joe Biden - Former Vice President of the United States  |Film - Topic                            |
|Kevin Durant - American bas

##### Google trends search queries

In [20]:
# Keep only the query rows whose Date lies inside the window (both bounds inclusive).
interesting_google_queries = google_trends_queries.filter(
    (google_trends_queries.Date >= start_date) & (google_trends_queries.Date <= finish_date))

In [21]:
# Print the heading, format the dates as YYYY-MM-DD, and show the per-day rising/top queries untruncated.
print_google_trend_title(start_date, finish_date, "Search queries")
interest_google_queries = convert_datetime_in_interesting_google(interesting_google_queries)
interest_google_queries.select("Date", "Search queries - rising", "Search queries - top").show(num_of_top_interest, False)


Google trends Search queries in New York during 2019-06-27 - 2019-06-30
+----------+---------------------------+--------------------+
|Date      |Search queries - rising    |Search queries - top|
+----------+---------------------------+--------------------+
|2019-06-28|shay mitchell              |weather             |
|2019-06-28|marianne williamson        |google              |
|2019-06-28|kamala harris              |facebook            |
|2019-06-28|argentina vs venezuela 2019|youtube             |
|2019-06-28|usa france                 |world cup           |
|2019-06-28|brazil vs paraguay         |news                |
|2019-06-28|usa vs france              |amazon              |
|2019-06-28|colombia vs chile          |copa america        |
|2019-06-28|alex morgan                |debate              |
|2019-06-28|michael bennet             |instagram           |
|2019-06-28|argentina vs venezuela     |craigslist          |
|2019-06-28|megan rapinoe              |walmart            

In [22]:
# Collapse the per-day rows into one ranking over the whole window and show every output column.
# NOTE(review): "interesing" in the variable name is a typo for "interesting".
interesing_google_queries_unique= unique_google_trends_by_time_frame(interesting_google_queries)
print_google_trend_title(start_date, finish_date, "Search queries")
interesing_google_queries_unique.show(num_of_top_interest, False)


Google trends Search queries in New York during 2019-06-27 - 2019-06-30
+---------------------------+--------------------+------+---+-----+
|Search queries - rising    |Search queries - top|Rising|Top|geo  |
+---------------------------+--------------------+------+---+-----+
|yy                         |weather             |+4950%|100|US-NY|
|shay mitchell              |pride               |+3700%|62 |US-NY|
|darren collison            |facebook            |+2950%|59 |US-NY|
|marianne williamson        |google              |+2750%|55 |US-NY|
|kamala harris              |youtube             |+2400%|48 |US-NY|
|deandre jordan             |news                |+2400%|44 |US-NY|
|india vs england           |amazon              |+1050%|44 |US-NY|
|argentina vs venezuela 2019|world cup           |+900% |42 |US-NY|
|usa france                 |debate              |+900% |35 |US-NY|
|brazil vs paraguay         |yankees             |+900% |34 |US-NY|
|usa vs france              |pride parade  

#### Hot topics - google trends (directly) (probably this will be removed)

In [23]:
# Query Google Trends directly for the same window; the timeframe is passed as
# "YYYY-MM-DD YYYY-MM-DD" (start date, a space, end date).
start_date_str = start_date.strftime("%Y-%m-%d")
finish_date_str = finish_date.strftime("%Y-%m-%d")
pytrend = TrendReq()
# NOTE(review): the single blank keyword is presumably a way to ask for region-wide trends -- confirm.
pytrend.build_payload(kw_list=[' '], geo=geo, timeframe=f"{start_date_str} {finish_date_str}")

##### Search topics

In [24]:
# Fetch the related search topics through the bundled pytrends copy; the Spark
# session is passed in and the result is used as a Spark DataFrame below.
topics_df = pytrend.related_top_search_topics(spark)

In [30]:
# Heading plus the rising/top topic names returned by the live request.
print_google_trend_title(start_date, finish_date, "Search topics")
topics_df.select("Search topics - rising", "Search topics - top").show(num_of_top_interest, False)


Google trends Search topics in New York during 2019-06-27 - 2019-06-30
+-------------------------------------------------------------+---------------------------------------+
|Search topics - rising                                       |Search topics - top                    |
+-------------------------------------------------------------+---------------------------------------+
|Pride parade - Topic                                         |New York - City in New York            |
|Parade - Topic                                               |New York - US State                    |
|Gay pride - Topic                                            |2019 - Topic                           |
|Debate - Topic                                               |Weather - Topic                        |
|2016 Democratic Party presidential debates and forums - Topic|YouTube - Video sharing company        |
|Fireworks - Topic                                            |Google - Technology company      

##### Search queries

In [26]:
# Fetch the related search queries through the bundled pytrends copy; the Spark
# session is passed in and the result is used as a Spark DataFrame below.
queries_df = pytrend.related_top_search_queries(spark)

In [28]:
# Heading plus every column of the live query result.
print_google_trend_title(start_date, finish_date, "Search queries")
queries_df.show(num_of_top_interest, False)


Google trends Search queries in New York during 2019-06-27 - 2019-06-30
+-------------------------+--------------------+-------+---+-----+
|Search queries - rising  |Search queries - top|Rising |Top|geo  |
+-------------------------+--------------------+-------+---+-----+
|marianne williamson      |weather             |+1,500%|100|US-NY|
|kamala harris            |facebook            |+1,450%|61 |US-NY|
|yankees vs red sox       |google              |+1,200%|60 |US-NY|
|yy                       |youtube             |+1,150%|52 |US-NY|
|tulsi gabbard            |amazon              |+1,050%|46 |US-NY|
|mexico vs costa rica     |news                |+950%  |46 |US-NY|
|kemba walker             |world cup           |+850%  |38 |US-NY|
|usa vs france            |craigslist          |+850%  |27 |US-NY|
|mackenzie lueck          |instagram           |+500%  |27 |US-NY|
|pride parade 2019 nyc    |yankees             |+450%  |25 |US-NY|
|pride parade             |movies              |+400%  |

### Conclusion