In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import random
import math
from pytrends.request import TrendReq

from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from pytrends.request import TrendReq
import numpy as np
import warnings
import random # Choose random list element
import time
import math
from random import randint
from time import sleep

In [2]:
# --- Google Trends query settings ---------------------------------------------
input_country_code   = ''                       # passed to pytrends as `geo`; '' presumably means worldwide
input_language       = 'en'                     # NOTE(review): not referenced by the functions in this section
input_keyword_column = 'keyword'                # column of the keyword DataFrame holding the search terms
input_gt_period      = '2018-01-01 2018-02-01'  # pytrends `timeframe`: "<start date> <end date>"
input_gt_category    = '0'                      # pytrends `cat`
nr_batches           = 1                        # number of keyword batches ranked separately in phase 1
nr_top_keywords      = None                     # keep only this many top keywords per batch (None = all)
limit_final_list     = None                     # cap on the final keyword list (None = no cap)

# --- Trend analysis settings ----------------------------------------------------
# NOTE(review): none of these are used in this part of the notebook; presumably later cells read them.
input_period         = 12                       # interval period in months
input_data_col       = 'week'
input_group_feature  = 'keyword'
input_rank_feature   = 'rank'
input_value_feature  = 'scores'
input_period_feature = 'period'
input_start_period, input_end_period = -2, -1

input_growth_measure = 'trend_scores'           # either trend_scores OR trend_rank
input_size_measure   = 'end_value_scores'       # either avg_scores / end_value_scores / avg_rank / end_value_rank

In [3]:
# Candidate food keywords to query in Google Trends. Every entry is sent verbatim
# as a search term, so spelling and surrounding whitespace matter
# (fixed: "pumpking leaves" -> "pumpkin leaves", "white icicle radish " -> no trailing space).
future_50_foods = ["laver seaweed","wakame seaweed","adzuki beans","black turtle beans","broad beans","fava beans","bambara groundnuts","bambara beans","cowpeas","lentils","marama beans",
                   "mung beans","soy beans","nopales","amaranth","buckwheat","finger millet","fonio","khorasan wheat","quinoa","spelt","teff","wild rice","pumpkin flowers","okra","orange tomatoes",
                   "beet greens","broccoli rabe","kale","moringa","pak-choi","bok-choy","pumpkin leaves","red cabbage","spinach","watercress","enoki mushrooms","maitake mushrooms",
                   "saffron milk cap mushrooms","flax seeds","hemp seeds","sesame seeds","walnuts","black salsify","parsley root","white icicle radish","winter radish",
                   "alfalfa sprouts","sprouted kidney beans","sprouted chickpeas","lotus root","ube","purple yam","yam bean root","jicama","red indonesian sweet potatoes","cilembu sweet potatoes"]

# Shorter keyword list; this (not future_50_foods) is what the keyword DataFrame below is built from.
# NOTE(review): "vegetarian butcher" is not part of future_50_foods — confirm it is intentional.
temp_future_50_foods = ["quinoa","vegetarian butcher", "lentils", "black salsify","walnuts"]

# Single-column Spark DataFrame of unique keywords. A plain StringType() column is
# named 'value', so it is renamed to 'keyword' for the downstream functions.
df_future_50_foods = (
    spark.createDataFrame(temp_future_50_foods, StringType())
         .dropDuplicates()
         .withColumnRenamed('value', 'keyword')
)

In [4]:
def partition(lst, n):
    """Split ``lst`` into ``n`` contiguous chunks whose sizes differ by at most one.

    Chunk ``k`` spans ``lst[ceil(k * L / n) : ceil((k + 1) * L / n)]`` with
    ``L = len(lst)``, so larger chunks come first when the split is uneven.
    Raises ZeroDivisionError when ``n`` is 0.
    """
    step = len(lst) / float(n)
    cuts = [int(math.ceil(step * k)) for k in range(n + 1)]
    return [lst[start:stop] for start, stop in zip(cuts, cuts[1:])]

def check_volume(keyword_list, country_code, period, google_cat):
    """Keep only the keywords for which Google Trends returns any data.

    Each keyword is queried on its own, after a random 1-3 second pause that
    throttles the requests. A keyword whose interest-over-time frame comes back
    empty triggers a warning and is dropped.

    Args:
        keyword_list: keywords to test.
        country_code: passed to pytrends as ``geo``.
        period: passed to pytrends as ``timeframe``.
        google_cat: passed to pytrends as ``cat``.

    Returns:
        The keywords with data, in their original order.
    """
    # hl = host language; tz = timezone offset as used by pytrends
    # (e.g. 360 for US CST, 60 for Western European Summer Time).
    client = TrendReq(hl='en-US', tz=60, timeout=(10, 25))

    kept = []
    for keyword in keyword_list:
        sleep(randint(1, 3))
        client.build_payload([keyword], cat=google_cat, geo=country_code, gprop='', timeframe=period)
        if not client.interest_over_time().empty:
            kept.append(keyword)
            continue
        warnings.warn('Not sufficient data available for keyword %s in %s' % (keyword, country_code))
    return kept
  
def search_pairs(list_pairs, country_code, period, baseline, google_cat):
    """Score each keyword pair by the ratio of its aggregated search volumes.

    Each ``[keyword, reference]`` pair is queried together. Its score is
    ``sum(keyword) / sum(reference)`` over the returned interest-over-time frame.
    The ``[baseline, baseline]`` pair is scored 1 without contacting Google,
    because the ratio of a term with itself is 1 by definition.

    Args:
        list_pairs: list of two-element keyword lists.
        country_code: passed to pytrends as ``geo``.
        period: passed to pytrends as ``timeframe``.
        baseline: the reference keyword shared by the pairs.
        google_cat: passed to pytrends as ``cat``.

    Returns:
        A list of scores aligned with ``list_pairs``. ``np.nan`` is used where
        Google Trends returned no data. A reference sum of 0 yields inf/nan from
        the numpy division.
    """
    # hl = host language; tz = timezone offset as used by pytrends
    # (e.g. 360 for US CST, 60 for Western European Summer Time).
    pytrends = TrendReq(hl='en-US', tz=60)

    scores = []
    for term in list_pairs:
        if term == [baseline, baseline]:
            # Self-comparison: the score is 1 by definition, so skip the throttled API call.
            scores.append(1)
            continue
        sleep(randint(1, 3))  # throttle consecutive Google Trends requests
        pytrends.build_payload(term, cat=google_cat, timeframe=period, geo=country_code, gprop='')  # up to 5 items at a time
        tmp_df_gtrends = pytrends.interest_over_time()
        if tmp_df_gtrends.empty:
            warnings.warn('Not sufficient data available for the combination %s in %s' % (term, country_code))
            # np.nan, not np.NaN: the capitalised alias was removed in NumPy 2.0.
            scores.append(np.nan)
        else:
            scores.append(tmp_df_gtrends[term[0]].sum() / tmp_df_gtrends[term[1]].sum())
    return scores
  
def make_pairs_with_baseline(keyword_list, baseline):
    """Pair every keyword with ``baseline``, print the pairs and return them.

    Returns a list of two-element lists ``[keyword, baseline]`` in the order of
    ``keyword_list``.
    """
    pairs = [[keyword, baseline] for keyword in keyword_list]
    print(pairs)
    return pairs

def make_pairs_shifted(keyword_list):
    """Return the consecutive keyword pairs ``[[k0, k1], [k1, k2], ...]``.

    A list of ``n`` keywords yields ``n - 1`` pairs. Empty and single-element
    lists yield ``[]``. The input list is not modified.
    """
    followers = keyword_list[1:]
    return [[current, following] for current, following in zip(keyword_list, followers)]
  
def _gtrends_pair_to_long(tmp_df_gtrends, term, country_code, pair_id):
    """Reshape one pytrends interest-over-time frame for ``term`` into one row per time point."""
    ref_word, keyword = term[0], term[1]
    # Format the DatetimeIndex directly. The old epoch -> time.localtime() route
    # shifted every date by one day on hosts west of UTC.
    weeks = pd.DatetimeIndex(tmp_df_gtrends.index).strftime('%Y%m%d')
    return pd.DataFrame({
        'week': list(weeks),
        'ref_word': ref_word,
        'keyword': keyword,
        'value_ref_word': tmp_df_gtrends[ref_word].to_numpy(),
        'value_keyword': tmp_df_gtrends[keyword].to_numpy(),
        'country_code': country_code,
        'id_ref_word': pair_id,
        'id_keyword': pair_id + 1,
    })


def get_raw_scores(keyword_list_sorted, country_code, period, google_cat):
    """Download Google Trends series for consecutive keyword pairs, in long format.

    Keywords are compared pairwise in the given order: ``[k0, k1], [k1, k2], ...``.
    Pair ``i`` (1-based) gets ``id_ref_word = i`` and ``id_keyword = i + 1``.
    Pairs without data are skipped with a warning; their ids are still consumed.

    Args:
        keyword_list_sorted: keywords, expected to be ordered by search volume
            with the highest first (as returned by ``get_relative_rank``).
        country_code: passed to pytrends as ``geo``.
        period: passed to pytrends as ``timeframe``.
        google_cat: passed to pytrends as ``cat``.

    Returns:
        pandas DataFrame with the columns week (``YYYYMMDD`` string), ref_word,
        keyword, value_ref_word, value_keyword, id_ref_word, id_keyword and
        country_code. Rows that pair the first keyword (id 1) with itself are
        prepended. If no pair had data, the frame is empty and has those columns.
    """
    columns = ['week', 'ref_word', 'keyword', 'value_ref_word', 'value_keyword',
               'id_ref_word', 'id_keyword', 'country_code']
    double_terms = make_pairs_shifted(keyword_list_sorted)
    print('double terms:', double_terms)
    # One client (and session) is reused for all requests.
    pytrends = TrendReq(hl='en-US', tz=60)

    pair_frames = []
    for pair_id, term in enumerate(double_terms, start=1):
        sleep(randint(1, 3))  # throttle consecutive Google Trends requests
        pytrends.build_payload(term, cat=google_cat, timeframe=period, geo=country_code, gprop='')
        tmp_df_gtrends = pytrends.interest_over_time()
        if tmp_df_gtrends.empty:
            warnings.warn('Not sufficient data available for the combination %s in %s' % (term, country_code))
            continue
        pair_frames.append(_gtrends_pair_to_long(tmp_df_gtrends, term, country_code, pair_id))

    if not pair_frames:
        return pd.DataFrame(columns=columns)

    # Concatenate once instead of growing a DataFrame inside the loop.
    raw_data = pd.concat(pair_frames, ignore_index=True)

    # Keyword id 1 only ever appears as a ref_word, so add rows that also list it as
    # the keyword. .assign works on a copy, which avoids SettingWithCopyWarning.
    top_rows = raw_data[raw_data['id_ref_word'] == 1].assign(
        id_keyword=lambda d: d['id_ref_word'],
        keyword=lambda d: d['ref_word'],
        value_keyword=lambda d: d['value_ref_word'],
    )
    return pd.concat([top_rows, raw_data], ignore_index=True)[columns]
  
def get_relative_rank(keyword_list, country_code, period, google_cat):
    """Order keywords by Google Trends search volume, most searched first.

    A random keyword is chosen as the baseline. Every keyword is scored by its
    volume relative to that baseline via ``search_pairs``, and the keywords are
    sorted by that score in descending order. Keywords without data (NaN score)
    sort last.

    Args:
        keyword_list: keywords to rank.
        country_code: passed through to ``search_pairs``.
        period: passed through to ``search_pairs``.
        google_cat: passed through to ``search_pairs``.

    Returns:
        The keywords sorted by relative volume. An empty input returns ``[]``
        instead of letting ``random.choice`` raise IndexError.
    """
    if not keyword_list:
        print("No keywords to rank")
        return []

    baseline = random.choice(keyword_list)
    print("The baseline keyword is " + str(baseline))
    double_terms = make_pairs_with_baseline(keyword_list, baseline)
    scores = search_pairs(double_terms, country_code, period, baseline, google_cat)

    ranked = (pd.DataFrame({'search_term': keyword_list, 'score': scores})
              .sort_values('score', ascending=False)
              .reset_index(drop=True))
    sorted_list = ranked['search_term'].tolist()
    print('Search Terms Sorted by Volume')
    print(sorted_list)
    return sorted_list
  
# Pandas DF to Spark DF
def pandas_to_spark(pandas_df):
    """Convert a pandas DataFrame into a Spark DataFrame.

    Spark infers the schema from the pandas data. The notebook's ``spark``
    session is used, the same one that builds the keyword DataFrame, instead of
    the legacy ``sqlContext`` entry point (superseded by SparkSession since Spark 2.0).
    """
    return spark.createDataFrame(pandas_df)

  
def get_raw_google_trends_batch_search(df, keyword_feature, country_code, period, google_cat, nr_batches, nr_top_keywords=None, limit_final_list=None):
    """Rank keywords by Google Trends volume in batches, then fetch their raw series.

    Phase 1: the unique keywords are shuffled and split into ``nr_batches``
    batches. Within each batch, keywords without data are dropped, the rest are
    ranked, and optionally only the ``nr_top_keywords`` best are kept.

    Phase 2: when there are several batches, the survivors are re-ranked against
    each other. Only then is the list cut to ``limit_final_list``, so the limit
    keeps the overall top keywords and not just those from the first batches.
    The raw pairwise series are then downloaded.

    Args:
        df: Spark DataFrame containing the keywords.
        keyword_feature: name of the keyword column in ``df``.
        country_code: passed to pytrends as ``geo``.
        period: passed to pytrends as ``timeframe``.
        google_cat: passed to pytrends as ``cat``.
        nr_batches: number of batches used in phase 1.
        nr_top_keywords: keywords kept per batch (falsy = all).
        limit_final_list: maximum size of the final keyword list (falsy = no limit).

    Returns:
        Spark DataFrame produced from ``get_raw_scores``.

    Raises:
        ValueError: if no keyword has sufficient Google Trends data.
    """
    keyword_list = list(set(df.toPandas()[keyword_feature].tolist()))  # unique keywords
    random.shuffle(keyword_list)
    batch_lists = partition(keyword_list, nr_batches)
    print("Phase 1: Collect the keywords with a search volume and select the keywords with a large volume (if nr_top_keywords is not None)")

    keywords_with_volume = []
    for batch in batch_lists:
        batch_with_volume = check_volume(batch, country_code, period, google_cat)
        if not batch_with_volume:
            # Empty batch (more batches than keywords, or no data at all):
            # get_relative_rank cannot pick a baseline from an empty list.
            continue
        ordered_batch = get_relative_rank(batch_with_volume, country_code, period, google_cat)
        keywords_with_volume += ordered_batch[:nr_top_keywords] if nr_top_keywords else ordered_batch

    if not keywords_with_volume:
        raise ValueError('None of the keywords has sufficient Google Trends data in %r' % (country_code,))

    print("Phase 2: Get raw Google data for the keywords with large volume")
    print("The list of keywords with volume is ", str(keywords_with_volume))
    if nr_batches > 1:
        # Batches were ranked independently; rank them against each other first.
        ordered_search_terms = get_relative_rank(keywords_with_volume, country_code, period, google_cat)
    else:
        ordered_search_terms = keywords_with_volume

    if limit_final_list:
        # Cut only after the global ranking so the best keywords are the ones kept.
        ordered_search_terms = ordered_search_terms[:limit_final_list]

    raw_scores = get_raw_scores(ordered_search_terms, country_code, period, google_cat)
    return pandas_to_spark(raw_scores)

In [5]:
# Run the two-phase Google Trends download for the configured keyword DataFrame.
raw_google_trends_data_future_50_foods = get_raw_google_trends_batch_search(
    df=df_future_50_foods,
    keyword_feature=input_keyword_column,
    country_code=input_country_code,
    period=input_gt_period,
    google_cat=input_gt_category,
    nr_batches=nr_batches,
    nr_top_keywords=nr_top_keywords,
    limit_final_list=limit_final_list,
)

In [6]:
# Render the Spark DataFrame of raw Google Trends scores produced above.
# NOTE(review): `display` is not imported here; presumably it is the notebook runtime's builtin (e.g. Databricks) — confirm.
display(raw_google_trends_data_future_50_foods)