In [7]:
import sys
import pandas as pd
import numpy as np
from math import radians, cos, sin, asin, sqrt,pi
import matplotlib.pyplot as plt
import math
import datetime
from scipy import stats

from datetime import date
import string, re

In [8]:
#set up pyspark
from pyspark.sql import SparkSession, functions, types
spark = SparkSession.builder.appName('Word Count').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3, 5) # make sure we have Python 3.5+
assert spark.version >= '2.3' # make sure we have Spark 2.3+

amenities_schema = types.StructType([
    types.StructField('lat', types.DoubleType()),
    types.StructField('lon', types.DoubleType()),
    types.StructField('timestamp', types.TimestampType() ),
    types.StructField('amenity', types.StringType()),
    types.StructField('name', types.StringType() ),
    types.StructField('tags', types.MapType(types.StringType(),types.StringType())),
])


In [9]:
df = spark.read.json('amenities-vancouver.json.gz', schema = amenities_schema)
df = df.filter(df['name'].isNotNull()).cache() #filter non name and cache dataframe

In [10]:
special_words_list = ['Agritourism', 'Gaming', 'Sports', 'Recreation', 'Motion Picture']#, 'Arts', 'Botanical Gardens', 'Zoos', 'Heritage', 'Park', 'Sculpture', 'Aquarium', 'Mountain']

import nltk
#nltk.download('wordnet')
from nltk.corpus import wordnet
synonyms = []

for word in special_words_list:
    for syn in wordnet.synsets(word):
        for l in syn.lemmas():
            synonyms.append(l.name())

def remove_(words):
    new_words = words.replace("_", " ")
    return new_words
res = list(map(remove_, synonyms))

print(res)

['gambling', 'gaming', 'play', 'bet on', 'back', 'gage', 'stake', 'game', 'punt', 'sport', 'athletics', 'sport', 'sport', 'summercater', 'sport', 'sport', 'sportsman', 'sportswoman', 'mutant', 'mutation', 'variation', 'sport', 'fun', 'play', 'sport', 'sport', 'feature', 'boast', 'frolic', 'lark', 'rollick', 'skylark', 'disport', 'sport', 'cavort', 'gambol', 'frisk', 'romp', 'run around', 'lark about', 'diversion', 'recreation', 'refreshment', 'recreation']


In [11]:
wordbreak = r'[%s\s]+' % (re.escape(string.punctuation),)

split_names = df.select(
    functions.explode(functions.split(functions.lower(df['name']),pattern = wordbreak, limit = -1 )).alias('split_name'),
    df['lat'],         
    df['lon'],          
    df['timestamp'],
    df['amenity']
).cache() #splits each tag into their own row


list_positive_names = res
def is_pos_name(name):
    return name in list_positive_names
are_positive_names = functions.udf(is_pos_name, returnType=types.BooleanType())
pos_names_locations = split_names.filter(are_positive_names(split_names['split_name']))


#attempting to match close enough words, currently unsuccessful
# import difflib

# def match_misspelled_words(df_keywords,  split_names_list):
#     matching_ratings = ratings[ratings['title'].isin(difflib.get_close_matches(df_keywords['key_words'], split_names_list['split_name']))]
#     #print(matching_ratings)
#     return round(matching_ratings['rating'].mean(),2)


# split_names_list = split_names.select('split_name').toPandas()
# pos_names_locations = split_names.filter(split_names['split_name'].isin())
# df_keywords = pd.DataFrame(res, columns = ['key_words'])
# df_keywords.head(10)
# difflib.get_close_matches(df_keywords['key_words'],split_names_list['split_name'])


# movie_list['rating'] = movie_list.apply(match_misspelled_words, axis = 1, raw =  False, result_type = 'reduce',  ratings = ratings_list)

In [12]:
pos_names_locations.count()

pos_names_locations_full = df.join(pos_names_locations, ['lat', 'lon', 'timestamp', 'amenity'])
pos_names_locations_full.coalesce(1).write.json('locations_pos_names', mode='overwrite')

In [13]:
##counts_name = df.groupby('name').agg(functions.count('lat').alias('count')) 
##counts_name = counts_name.filter(counts_name['count']<2) #filters non-unique names

##df = df.join(other = counts_name, on='name') #joins back

tags = df.select(
    df['name'],
    df['lat'],         
    df['lon'],          
    df['timestamp'],
    df['amenity'],
    #df['count'],
    functions.explode(df['tags'])
).cache() #splits each tag into their own row

In [14]:
keys_and_values = tags.groupBy(['key', 'value']).agg(functions.count('name').alias('count'))
keys_and_values.filter(keys_and_values['count'] > 5).coalesce(1).write.csv('keys_and_values_counts', mode='overwrite')

In [15]:
list_positive_tags = res
def is_pos_tag(tag):
    return tag in list_positive_tags
are_positive_tags = functions.udf(is_pos_tag, returnType=types.BooleanType())
pos_key_locations = tags.filter(are_positive_tags(tags['key']))
pos_value_locations = tags.filter(are_positive_tags(tags['value']))

In [16]:
df_keywords['key_words']

0        gambling
1          gaming
2            play
3          bet on
4            back
5            gage
6           stake
7            game
8            punt
9           sport
10      athletics
11          sport
12          sport
13    summercater
14          sport
15          sport
16      sportsman
17    sportswoman
18         mutant
19       mutation
20      variation
21          sport
22            fun
23           play
24          sport
25          sport
26        feature
27          boast
28         frolic
29           lark
30        rollick
31        skylark
32        disport
33          sport
34         cavort
35         gambol
36          frisk
37           romp
38     run around
39     lark about
40      diversion
41     recreation
42    refreshment
43     recreation
Name: key_words, dtype: object

In [17]:
pos_key_locations.show()

+--------------------+----------+------------+-------------------+-----------------+-----+---------+
|                name|       lat|         lon|          timestamp|          amenity|  key|    value|
+--------------------+----------+------------+-------------------+-----------------+-----+---------+
|Aikido Yoshinkai ...|49.2237917|-122.9418357|2019-09-02 22:08:26|             dojo|sport|   aikido|
|I Love Martial Ar...| 49.057546|-122.2738986|2018-06-20 15:11:24|             dojo|sport|   karate|
|Mamba Martial Art...|49.0468605|-122.3483787|2018-06-20 15:12:01|             dojo|sport|    multi|
|Yoga and Meditati...|49.0463884|-122.3485056|2016-06-13 22:36:29|meditation_centre|sport|     yoga|
|Jong Kim Martial ...|49.1852717| -122.798376|2019-03-07 14:56:23|             dojo|sport|taekwondo|
|             Re.pose|49.0497557|-122.2907591|2019-03-16 01:57:47|             dojo|sport|     yoga|
+--------------------+----------+------------+-------------------+-----------------+-----+-

In [18]:
list_positive_amenities = ['public_building', 'theatre', 'food_court', 'community_centre', 'cinema', 'casino', 'park' , 'university']
def is_pos_amenity(amenity):
    return amenity in list_positive_amenities
are_positive_amenities = functions.udf(is_pos_amenity, returnType=types.BooleanType())
pos_amenities_locations = df.filter(are_positive_amenities(df['amenity']))

In [19]:
amenities = df.groupBy('amenity').agg(functions.count('name').alias('count'))
amenities.coalesce(1).write.csv('amenties_counts', mode='overwrite') #safe to do as this only contains unique types of amenities

In [20]:
pos_amenities_locations.count()
pos_amenities_locations.coalesce(1).write.json('locations_pos_amenities', mode='overwrite')#safe to do as this only contains the tourist attractions in Vancouver (125 rows)

In [21]:
#finds all with wikidata entry, rename 'value' to 'wikidata'
tag_wikidata = tags.filter(tags['key'] == "brand:wikidata")
tag_wikidata = tag_wikidata.select(
    tag_wikidata['name'],
    tag_wikidata['lat'],         
    tag_wikidata['lon'],          
    tag_wikidata['timestamp'],
    tag_wikidata['amenity'],
    tag_wikidata['value'].alias('wikidata')
 ).cache()

#finds all with wikipedia entry, rename 'value' to 'wikipedia'
tag_wikipedia = tags.filter(tags['key'] == "wikipedia")
tag_wikipedia = tag_wikipedia.select(
    tag_wikipedia['name'],
    tag_wikipedia['lat'],         
    tag_wikipedia['lon'],          
    tag_wikipedia['timestamp'],
    tag_wikipedia['amenity'],
    tag_wikipedia['value'].alias('wikipedia')
 ).cache()

place_tourism = tags.filter(tags['key'] == "tourism")


In [22]:
counts_wikipedia = tag_wikipedia.groupBy(['wikipedia']).agg(functions.count('name').alias('count')) 
counts_wikipedia = counts_wikipedia.filter(counts_wikipedia['count']<2) #filters non-unique wikis
place_wikipedia = tag_wikipedia.join(counts_wikipedia, on = ['wikipedia'])

In [23]:
counts_wikidata = tag_wikidata.groupBy(['wikidata']).agg(functions.count('name').alias('count')) 
counts_wikidata = counts_wikidata.filter(counts_wikidata['count']<2) #filters non-unique wikis
place_wikidata = tag_wikidata.join(counts_wikidata, on = ['wikidata'])

In [24]:
#place_wikidata.count()
#place_wikipedia.count()
#place_tourism.count()
#place_tourism.show()
#place_wikidata.show()
#place_wikipedia.show()

In [25]:
#abandoned pandas code
data = pd.read_json('amenities-vancouver.json.gz', lines=True).dropna(subset=['name']) #drop rows where there is no name

#find the counts of names
counts = data.groupby(by = ['name']).aggregate('count')
#remove non-unique names
counts = counts[counts['lat'] < 2]
counts['count'] = counts['lat']
counts = counts.drop(labels= ['lat', 'lon',	'timestamp',	'amenity',	'tags'], axis = 1)

#join back for only columns with an unique name
data = data.join(counts, on=['name'], how = 'inner')

data['tags'].iloc[10]['brand:wikidata']
'brand:wikidata' in data['tags'].iloc[1]

False