In [2]:
#Hey I've added this so we can see properly SQL Views from Spark (Simon)
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [3]:
import os
import sys


os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [5]:
#First: filter the restaurant
biz_df = sqlContext.read.json("original_data/business.json")

In [6]:
biz_df.createOrReplaceTempView('biz_table')
biz_res = sqlContext.sql('SELECT * FROM biz_table WHERE categories LIKE \'%Restaurants%\'')
biz_res.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|              city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|30 Eglinton Avenue W|[,, u'full_bar', ...|QXAEGFB4oINsVuTFx...|Specialty Food, R...|       Mississauga|[9:0-1:0, 9:0-0:0...|      1|43.6054989743|  -79.652288909|Emerald Chinese R...|    L5R 3E7|         128|  2.5|   ON|
|10110 Johnston Rd...|[,, u'beer_and_wi...|gnKjwL_1w79qoiV3I...|Sushi Bars, Resta...|         Charlotte|[17:30-2

In [7]:
biz_res.createOrReplaceTempView('biz_res_table')

In [8]:
def filter_res(parent,filename,key):
    df = sqlContext.read.json("original_data/"+filename)
    table_name=filename[:-5]+'_table'
    df.createOrReplaceTempView(filename[:-5]+'_table')
    sqlquery = 'SELECT c.* FROM {} c LEFT JOIN {} b ON c.{} = b.{} WHERE b.{} IS NOT NULL'.format(table_name,parent,key,key,key)
    print(sqlquery)
    df_res=sqlContext.sql(sqlquery)
    return df_res

In [9]:
checkin_res = filter_res('biz_res_table','checkin.json','business_id')
review_res = filter_res('biz_res_table','review.json','business_id')
review_res.createOrReplaceTempView('review_res_table')
tip_res = filter_res('review_res_table','tip.json','user_id') #since tip is on individual level, match them with users
user_res = filter_res('review_res_table','user.json','user_id') 

SELECT c.* FROM checkin_table c LEFT JOIN biz_res_table b ON c.business_id = b.business_id WHERE b.business_id IS NOT NULL
SELECT c.* FROM review_table c LEFT JOIN biz_res_table b ON c.business_id = b.business_id WHERE b.business_id IS NOT NULL
SELECT c.* FROM tip_table c LEFT JOIN review_res_table b ON c.user_id = b.user_id WHERE b.user_id IS NOT NULL
SELECT c.* FROM user_table c LEFT JOIN review_res_table b ON c.user_id = b.user_id WHERE b.user_id IS NOT NULL


In [12]:
#sample  business into dev_set and test_set
import pyspark.sql.functions as f
review_res = review_res.withColumn('index_1', f.monotonically_increasing_id())
review_res.createOrReplaceTempView('review_res_table')
sqlquery = 'SELECT * FROM review_res_table ORDER BY RAND(42) LIMIT {}'.format(1000)
review_sample = sqlContext.sql(sqlquery)
review_sample.createOrReplaceTempView('review_sample_table')

sqlquery = 'SELECT a.* FROM review_res_table a LEFT JOIN review_sample_table b ON a.index_1 = b.index_1 WHERE b.index_1 IS NULL '
review_train = sqlContext.sql(sqlquery)
review_train_all = sqlContext.sql(sqlquery)
# 


In [11]:
#sample 500 into dev_set and 500 into test_set
from pyspark.sql.functions import desc
review_sample = review_sample.withColumn('index_2', f.monotonically_increasing_id())
review_dev = review_sample.limit(500)
review_test = review_sample.sort(desc("index_2")).limit(500)

In [13]:
review_train = review_train_all.limit(2500)

In [18]:
df_biz_useful=biz_res.select('business_id','review_count')
ind_useful=review_res.select('user_id','useful','business_id') #aggregate ind
mapped_test = ind_useful.rdd.map(lambda x: (x[0],x[1]))
ind_total_useful = mapped_test.reduceByKey(lambda a,b:a+b)
mapped_test = ind_useful.rdd.map(lambda x: (x[0],1))
ind_total_review =  mapped_test.reduceByKey(lambda a,b:a+b)

In [19]:
# convert to spark data frame
df_ind_total_useful = ind_total_useful.toDF(["user_id", "total_useful"])
df_ind_total_review = ind_total_review.toDF(["user_id", "total_reviews"])
df_ind_useful = df_ind_total_useful.join(df_ind_total_review, df_ind_total_useful.user_id == df_ind_total_review.user_id)
df_ind_useful = df_ind_useful.withColumn('avg_useful' , df_ind_useful.total_useful/df_ind_useful.total_reviews)
df_ind_useful.show()

+--------------------+------------+--------------------+-------------+-------------------+
|             user_id|total_useful|             user_id|total_reviews|         avg_useful|
+--------------------+------------+--------------------+-------------+-------------------+
|-0Ji0nOyFe-4yo8BK...|           0|-0Ji0nOyFe-4yo8BK...|            1|                0.0|
|-1KKYzibGPyUX-Mwk...|           1|-1KKYzibGPyUX-Mwk...|            1|                1.0|
|-1zQA2f_syMAdA04P...|           0|-1zQA2f_syMAdA04P...|            2|                0.0|
|-2Pb5d2WBPtbyGT_b...|           0|-2Pb5d2WBPtbyGT_b...|            1|                0.0|
|-3bsS2i9xqjNnIA1f...|           3|-3bsS2i9xqjNnIA1f...|            2|                1.5|
|-3i9bhfvrM3F1wsC9...|          22|-3i9bhfvrM3F1wsC9...|            9| 2.4444444444444446|
|-47g7LR58tpHlm7Bm...|           0|-47g7LR58tpHlm7Bm...|            1|                0.0|
|-4Anvj46CWf57KWI9...|           1|-4Anvj46CWf57KWI9...|            1|                1.0|

In [20]:
df_biz_useful=df_biz_useful.toPandas()
df_ind_useful=df_ind_useful.toPandas()
df_ind_useful = df_ind_useful.loc[:,~df_ind_useful.columns.duplicated()]

import pandas as pd
from pyspark.sql import SparkSession
from snorkel.labeling import PandasLFApplier,LFAnalysis
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pd_dev = pd.read_csv("review_dev_labelled.csv",header=0, index_col=0)
df_dev = spark.createDataFrame(pd_dev)


In [21]:
#Threshold Analysis: Low Useful Rate
temp=pd_dev.merge(df_ind_useful, on= 'user_id', how='left')
temp.groupby('label')['avg_useful'].agg('mean')
def q(x):
            return x.quantile(0.85)
temp.groupby('label')['total_reviews'].agg('median')
temp.groupby('label')['total_reviews'].agg(q)

label
0    1.021650
1    1.457707
Name: avg_useful, dtype: float64

label
0     8.0
1    10.0
Name: total_reviews, dtype: float64

label
0    52.6
1    86.0
Name: total_reviews, dtype: float64

In [73]:
#Try to apply Labeling function
import pyspark.sql.functions as F
from snorkel.labeling import LabelModel
from snorkel.labeling.apply.spark import SparkLFApplier
import pandas as pd
import numpy as np
#from snorkel.labeling import ,LFAnalysis
from snorkel.labeling import LFAnalysis
from pyspark.sql import Row
from snorkel.labeling.lf import labeling_function
#from snorkel.labeling.lf.nlp_spark import spark_nlp_labeling_function
from snorkel.preprocess import preprocessor

ABSTAIN = -1
NEGATIVE = 0
POSITIVE = 1

@labeling_function()
def high_useful(x):
    return POSITIVE if x.useful > 8 else ABSTAIN

dict_biz_useful = dict(zip(df_biz_useful.business_id, df_biz_useful.stars))

@labeling_function(resources=dict(dict_biz_useful=dict_biz_useful))
def high_useful_biz(x,dict_biz_useful):
    if x.business_id!='#NAME?':
        a = dict_biz_useful[x.business_id]
    else:
        a = 0
    return POSITIVE if (x.useful > 8 and a < x.useful ) else ABSTAIN


In [64]:
com_info = zip(df_ind_useful.avg_useful, df_ind_useful.total_reviews)
dict_ind_useful = dict(zip(df_ind_useful.user_id, com_info ))
@labeling_function(resources=dict(dict_biz_useful=dict_biz_useful))
def high_useful_ind(x,dict_ind_useful):
    if x.userid_id!='#NAME?':
        if x.user_id in dict_ind_useful.keys():
            a = dict_ind_useful[x.user_id][0]
            b = dict_ind_useful[x.user_id][1]
        else:
            a = 0
            b = 0 
    else:
        a = 0
        b = 0
    return NEGATIVE if (x.useful > a) else ABSTAIN

In [71]:
@labeling_function(resources=dict(dict_ind_useful=dict_ind_useful))
def picky_indiv(x,dict_ind_useful):
    if x.user_id!='#NAME?':
        if x.user_id in dict_ind_useful.keys():
            a = dict_ind_useful[x.user_id][0]
            b = dict_ind_useful[x.user_id][1]
        else:
            a = 0
            b = 0 
    else:
        a = 0
        b = 0
    return NEGATIVE if a < 1 else ABSTAIN

In [59]:
lfs = []
applier = SparkLFApplier(lfs)
L_dev = applier.apply(df_dev.rdd)
g_label =np.array(df_dev.select('label').collect())
LFAnalysis(L_dev, lfs).lf_summary(g_label) 

Unnamed: 0,j,Polarity,Coverage,Overlaps,Conflicts,Correct,Incorrect,Emp. Acc.
picky_indiv,0,[1],0.596,0.0,0.0,117,181,0.392617


In [74]:
# Build label matrix
import re
lfs = [high_useful,high_useful_biz,picky_indiv]
applier = SparkLFApplier(lfs)
L_dev = applier.apply(df_dev.rdd)
g_label =np.array(df_dev.select('label').collect())
LFAnalysis(L_dev, lfs).lf_summary(g_label) 

Unnamed: 0,j,Polarity,Coverage,Overlaps,Conflicts,Correct,Incorrect,Emp. Acc.
high_useful,0,[1],0.012,0.012,0.0,4,2,0.666667
high_useful_biz,1,[1],0.012,0.012,0.0,4,2,0.666667
picky_indiv,2,[0],0.596,0.0,0.0,181,117,0.607383
