In [118]:
import sys
import os
os.environ['SPARK_HOME'] = "spark"
sys.path.append("spark/python")
sys.path.append("spark/python/lib")

In [119]:
# get coordinates of a given city
import httplib2
import json


def cityPos(name):
    url = "https://maps.googleapis.com/maps/api/geocode/json?" + \
          "key=AIzaSyBsZErhxaT1oVgMrT-xGLcAN5nK3UHeGBU&address=" + name
    req = httplib2.Http(".cache")
    resp, content = req.request(url, "GET")
    res = json.loads(content)
    return res["results"][0]["geometry"]

In [120]:
# reform the data preparing for fitting the model
import csv
import json
import codecs
import time
from couch import Couch

COUCHDB_NAME = "classified2"
REFORMED_FILE = "data/output0.csv"

food_dict = {}
rev_dict = {}

def trans(path):
    con = Couch(COUCHDB_NAME)
    jsonData = con.query_all()

    csvfile = open(REFORMED_FILE, 'w', newline='')
    writer = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_ALL)
    keys=['id', 'time', 'lat', 'lng', 'polarity', 'followers', 'following', 'food']
    writer.writerow(keys)
    
    i = 0
    for dic in jsonData:
        if dic['location']['coordinates'] is None:
            city = dic['location']['place_name']
            city = city.replace(" ","%20")
            coor = cityPos(city)
            lng = coor['location']['lng']
            lat = coor['location']['lat']
        else:
            lng = dic['location']['coordinates'][0]
            lat = dic['location']['coordinates'][1]
            
        dt = dic['created_at']['year']+'-'+trans_month(dic['created_at']['month'])+'-'+dic['created_at']['day']+\
                ' '+dic['created_at']['time']
        timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
        timestamp = time.mktime(timeArray)
        
        foods = dic['food_list']
        if foods is None or len(foods) == 0:
            writer.writerow([i, timestamp, lat, lng, dic['polarity'], dic['user']['followers'], dic['user']\
                                 ['following'], "-1"])
        else:
            for food in foods:
                food_class = get_food_class(food)
                writer.writerow([i, timestamp, lat, lng, dic['polarity'], dic['user']['followers'], dic['user']\
                                 ['following'], food_class])
        i += 1
    csvfile.close()
    
def trans_month(month):
    month_dic = {'Jan': '1', 'Feb': '2', 'Mar': '3', 'Apr': '4', 'May': '5', 'Jun': '6', \
                 'Jul': '7', 'Aug': '8', 'Sep': '9', 'Oct': '10', 'Nov': '11', 'Dec': '12'}
    return month_dic[month]

def get_food_class(food):
    if not food in food_dict.keys():
        food_dict[food] = str(len(food_dict))
    return food_dict[food]

def get_rev_dict():
    for key,value in food_dict.items():
        rev_dict[value] = key

def get_food_type(food_class):
    the_class = str(int(food_class))
    if the_class in rev_dict.keys():
        return rev_dict[the_class]
    return None

In [121]:
APP_NAME = "random forest test"
SPARK_URL = "local[*]"
RANDOM_SEED = 12345
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 3
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [122]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName(APP_NAME) \
    .master(SPARK_URL) \
    .getOrCreate()

In [123]:
# read data from couchdb and reform them into a dataframe
trans(REFORMED_FILE)

df = spark.read \
    .options(header = "true", inferschema = "true") \
    .csv(REFORMED_FILE)

print("Total number of rows: %d" % df.count())

Total number of rows: 49




In [124]:
df.show()

+---+-------------+------------+-----------+--------+---------+---------+----+
| id|         time|         lat|        lng|polarity|followers|following|food|
+---+-------------+------------+-----------+--------+---------+---------+----+
|  0|1.524673187E9| -37.7760072|144.9708071|  0.8883|      380|      564|   0|
|  1|1.524710653E9| -33.8688197|151.2092955|  0.4019|      766|      552|   0|
|  2|1.524712536E9| -37.8136276|144.9630576|     0.0|      853|      558|   0|
|  3|1.524713009E9|-33.86642251|151.2012542|     0.0|       37|      169|   0|
|  4|1.524699928E9|    -28.0183|   153.3921|   0.926|       84|      143|   0|
|  5|1.524705058E9| -33.8688197|151.2092955| -0.2484|      154|      219|   0|
|  5|1.524705058E9| -33.8688197|151.2092955| -0.2484|      154|      219|   1|
|  6|1.524675517E9|   -33.88888|  151.27759|     0.0|     4111|     4550|   0|
|  7| 1.52489073E9| -37.8136276|144.9630576| -0.3078|       93|      477|   0|
|  8|1.524941428E9| -33.8688197|151.2092955|  0.7543

In [None]:
# @@@@@@@@@@@@@@@@@@@ test @@@@@@@@@@@@@@@@@@@ -- begin

In [37]:
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), 
             (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), 
             (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])
employees.show()

+------+--------+---+
|emp_id|    name|age|
+------+--------+---+
|     1|    John| 25|
|     2|     Ray| 35|
|     3|    Mike| 24|
|     4|    Jane| 28|
|     5|   Kevin| 26|
|     6| Vincent| 35|
|     7|   James| 38|
|     8|   Shane| 32|
|     9|   Larry| 29|
|    10|Kimberly| 29|
|    11|    Alex| 28|
|    12|   Garry| 25|
|    13|     Max| 31|
+------+--------+---+



In [38]:
salary=[(1,1000),(2,2000),(3,3000),(4,4000)]
salary=spark.createDataFrame(salary, schema=["emp_id","salary"])
salary.show()

+------+------+
|emp_id|salary|
+------+------+
|     1|  1000|
|     2|  2000|
|     3|  3000|
|     4|  4000|
+------+------+



In [39]:
department=[(1,1000),(2,2000),(3,3000),(4,4000)]
department=spark.createDataFrame(department, schema=["emp_id","departement"])
department.show()

+------+-----------+
|emp_id|departement|
+------+-----------+
|     1|       1000|
|     2|       2000|
|     3|       3000|
|     4|       4000|
+------+-----------+



In [41]:
final_data = employees.join(salary, on='emp_id', how='left')\
    .join(department, on='emp_id', how='left')
final_data.show()

+------+--------+---+------+-----------+
|emp_id|    name|age|salary|departement|
+------+--------+---+------+-----------+
|     7|   James| 38|  null|       null|
|     6| Vincent| 35|  null|       null|
|     9|   Larry| 29|  null|       null|
|     5|   Kevin| 26|  null|       null|
|     1|    John| 25|  1000|       1000|
|    10|Kimberly| 29|  null|       null|
|     3|    Mike| 24|  3000|       3000|
|    12|   Garry| 25|  null|       null|
|     8|   Shane| 32|  null|       null|
|    11|    Alex| 28|  null|       null|
|     2|     Ray| 35|  2000|       2000|
|     4|    Jane| 28|  4000|       4000|
|    13|     Max| 31|  null|       null|
+------+--------+---+------+-----------+



In [None]:
# @@@@@@@@@@@@@@@@@@@ test @@@@@@@@@@@@@@@@@@@ -- end

In [125]:
# filter dataframe based on whether the data in them have food or not
df_no_food = df.filter(df['food'] == -1)
df_with_food = df.filter(df['food'] >= 0)

print("Number of rows without food: %d" % df_no_food.count())
print("number of rows with food: %d" % df_with_food.count())

Number of rows without food: 3
number of rows with food: 46


In [126]:
# transform dataframe into RDD and split reformed data into tranning data and test data
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

transformed_df = df_with_food.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[1:-1])))

splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())

Number of training set rows: 35
Number of test set rows: 11


In [127]:
# train the model using training data
from pyspark.mllib.tree import RandomForest
from time import *

start_time = time()
num_classes = len(food_dict)

model = RandomForest.trainClassifier(training_data, numClasses=num_classes, categoricalFeaturesInfo={}, \
    numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", impurity="gini", \
    maxDepth=RF_MAX_DEPTH, maxBins=32, seed=RANDOM_SEED)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 0.296 seconds


In [128]:
# make predictions using test data and calculate the accuracy
predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)
acc = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Model accuracy: %.3f%%" % (acc * 100))

Model accuracy: 81.818%


In [129]:
# deal with data without food
transformed_df_no_food = df_no_food.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[1:-1])))
predict_foods = model.predict(transformed_df_no_food.map(lambda x: x.features))

In [142]:
# combine id with predicted food class
rdd_predict_foods = df_no_food.rdd.map(lambda row: row[0]).zip(predict_foods.map(int))
list_predict_food = rdd_predict_foods.collect()

In [143]:
# transform predicted rdd to dataframe and join it to original data that without food
df_predict_foods = spark.createDataFrame(list_predict_food, schema=["id","food"])
df_no_food = df_no_food.drop('food')

concat_df = df_no_food.join(df_predict_foods, on='id')
concat_df.show()

+---+-------------+-----------+-----------+--------+---------+---------+----+
| id|         time|        lat|        lng|polarity|followers|following|food|
+---+-------------+-----------+-----------+--------+---------+---------+----+
| 13|1.524934916E9|-37.8136276|144.9630576|     0.0|     3965|     4321|   0|
| 14|1.524935043E9|-37.8136276|144.9630576|   0.658|      849|     3987|   0|
| 30|1.524928576E9|-37.8136276|144.9630576|  0.9366|     1444|      664|   0|
+---+-------------+-----------+-----------+--------+---------+---------+----+



In [144]:
# two dataframes: df_with_food and concat_df
df_with_food.show()
#concat_df.first()

+---+-------------+------------+-----------+--------+---------+---------+----+
| id|         time|         lat|        lng|polarity|followers|following|food|
+---+-------------+------------+-----------+--------+---------+---------+----+
|  0|1.524673187E9| -37.7760072|144.9708071|  0.8883|      380|      564|   0|
|  1|1.524710653E9| -33.8688197|151.2092955|  0.4019|      766|      552|   0|
|  2|1.524712536E9| -37.8136276|144.9630576|     0.0|      853|      558|   0|
|  3|1.524713009E9|-33.86642251|151.2012542|     0.0|       37|      169|   0|
|  4|1.524699928E9|    -28.0183|   153.3921|   0.926|       84|      143|   0|
|  5|1.524705058E9| -33.8688197|151.2092955| -0.2484|      154|      219|   0|
|  5|1.524705058E9| -33.8688197|151.2092955| -0.2484|      154|      219|   1|
|  6|1.524675517E9|   -33.88888|  151.27759|     0.0|     4111|     4550|   0|
|  7| 1.52489073E9| -37.8136276|144.9630576| -0.3078|       93|      477|   0|
|  8|1.524941428E9| -33.8688197|151.2092955|  0.7543

In [147]:
# reform the dataframe to prepare for tranforming to json
concat_df = concat_df.drop('id')
concat_df.show()

+-------------+-----------+-----------+--------+---------+---------+----+
|         time|        lat|        lng|polarity|followers|following|food|
+-------------+-----------+-----------+--------+---------+---------+----+
|1.524934916E9|-37.8136276|144.9630576|     0.0|     3965|     4321|   0|
|1.524935043E9|-37.8136276|144.9630576|   0.658|      849|     3987|   0|
|1.524928576E9|-37.8136276|144.9630576|  0.9366|     1444|      664|   0|
+-------------+-----------+-----------+--------+---------+---------+----+



In [146]:
concat_df.toJSON().first()

'{"id":13,"time":1.524934916E9,"lat":-37.8136276,"lng":144.9630576,"polarity":0.0,"followers":3965,"following":4321,"food":0}'

In [27]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

start_time = time()

metrics = BinaryClassificationMetrics(labels_and_predictions)
print("Area under Precision/Recall (PR) curve: %.f" % (metrics.areaUnderPR * 100))
print("Area under Receiver Operating Characteristic (ROC) curve: %.3f" % (metrics.areaUnderROC * 100))

end_time = time()
elapsed_time = end_time - start_time
print("Time to evaluate model: %.3f seconds" % elapsed_time)

Area under Precision/Recall (PR) curve: 5
Area under Receiver Operating Characteristic (ROC) curve: 45.000
Time to evaluate model: 0.183 seconds
