In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
import pandas as pd

# Start (or attach to) a Hive-enabled Spark session, select the Yelp database
# and list its tables.
# HiveContext is deprecated since Spark 2.0 (removed in 3.0); enabling Hive
# support on the session itself lets every statement, including the later
# spark.sql(...) query, run through the same session and current database.
# NOTE(review): the app name 'Crimes' looks like a leftover from another
# notebook - kept unchanged, confirm before renaming.
spark = SparkSession.builder.appName('Crimes').enableHiveSupport().getOrCreate()
sc = spark.sparkContext
spark.sql("use dmp_yelp_rs")
spark.sql("show tables").show()

+-----------+--------------------+-----------+
|   database|           tableName|isTemporary|
+-----------+--------------------+-----------+
|dmp_yelp_rs|           biz_table|      false|
|dmp_yelp_rs|   business_ambience|      false|
|dmp_yelp_rs| business_attributes|      false|
|dmp_yelp_rs|  business_basicdata|      false|
|dmp_yelp_rs| business_bestnights|      false|
|dmp_yelp_rs|business_business...|      false|
|dmp_yelp_rs|business_dietaryr...|      false|
|dmp_yelp_rs|business_goodformeal|      false|
|dmp_yelp_rs|business_hairspec...|      false|
|dmp_yelp_rs|      business_music|      false|
|dmp_yelp_rs|             checkin|      false|
|dmp_yelp_rs|        geo_features|      false|
|dmp_yelp_rs|              review|      false|
|dmp_yelp_rs|                 tip|      false|
|dmp_yelp_rs|         ts_features|      false|
|dmp_yelp_rs|     ts_features_new|      false|
|dmp_yelp_rs|               users|      false|
+-----------+--------------------+-----------+



In [2]:
import numpy as np

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix,CoordinateMatrix,MatrixEntry
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
from pyspark.ml.feature import StringIndexer

In [4]:
import time
# Wall-clock timestamp (seconds since the epoch) taken when this cell runs.
# A matching `end` timestamp is taken right after the SVD call further down;
# NOTE(review): no visible cell computes or prints end - start.
start = time.time()

In [None]:
# Scratch list of two-letter codes, presumably the candidate values for
# `state` ('OH' is the one assigned in the query cell) - TODO confirm.
# These are bare string expressions: running the cell changes no state.
'BC'
'FL'
'GA'
'MA'
'WA'
'TX'
'OR'
'OH'

In [8]:
# Build the SQL text: review joined to biz_table on business_id, restricted
# in the join condition to rows whose state equals `state`, then averaged
# into a single stars value per (user_id, business_id) pair.
state = 'OH'
SQL = """
SELECT

a.user_id,
a.business_id,
avg(a.stars) as stars 

FROM

review a

INNER JOIN biz_table b ON b.business_id = a.business_id AND state = '{0}'

GROUP BY a.user_id, a.business_id
""".format(state)

In [9]:
# Run the query. The result is lazy and is consumed by several separate jobs
# below (two StringIndexer fits, then the matrix construction / SVD), so it is
# cached to avoid re-running the join and aggregation for each of them.
df = spark.sql(SQL).cache()

In [10]:
# Fit one StringIndexer per id column of df; each fitted model writes its
# numeric index to a column named after the input with a "_new" suffix.
def _fit_id_indexer(id_column):
    """Fit a StringIndexer on df that maps id_column to id_column + '_new'."""
    return StringIndexer(inputCol=id_column, outputCol=id_column + "_new").fit(df)

biz_indexer = _fit_id_indexer("business_id")
usr_indexer = _fit_id_indexer("user_id")

In [11]:
# Apply both fitted indexers: business_id_new is appended first, then
# user_id_new. df is rebound to the widened frame.
# NOTE(review): because df is overwritten, running this cell a second time
# presumably fails on the already-existing output columns - confirm.
df = usr_indexer.transform(biz_indexer.transform(df))

In [12]:
# df = df.select(['user_id_new','business_id_new','stars'])

In [13]:
# Build the distributed coordinate matrix of ratings:
# row = user_id_new, column = business_id_new, value = stars.
def _to_matrix_entry(row):
    """Turn one row of df into a (user index, business index, stars) entry."""
    return MatrixEntry(row.user_id_new, row.business_id_new, row.stars)

mat = CoordinateMatrix(df.rdd.map(_to_matrix_entry))

In [14]:
# Convert the coordinate matrix to a RowMatrix whose row order follows the
# user index.
# CoordinateMatrix.toRowMatrix() drops the row indices after grouping, so the
# rows of the matrix (and of the U factor computed from it) would come back in
# an arbitrary order that cannot be matched to usr_indexer.labels. Going
# through the indexed rows and sorting by index keeps row i == user index i.
# Every user index owns at least one entry (the indexer was fitted on the same
# df the entries were built from), so no row is missing after the conversion.
indexed_rows = mat.toIndexedRowMatrix().rows
mat = RowMatrix(
    indexed_rows
    .sortBy(lambda indexed_row: indexed_row.index)
    .map(lambda indexed_row: indexed_row.vector)
)

In [None]:
# Singular value decomposition of mat, keeping 200 singular values;
# computeU=True requests the U factor in addition to s and V.
svd = mat.computeSVD(k=200, computeU=True)
U, s, V = svd.U, svd.s, svd.V
# Timestamp taken once the decomposition call has returned.
end = time.time()

In [None]:
# Turn the three SVD factors into pandas DataFrames.
# U is distributed: wrap each row vector in a 1-tuple so it can become a
# single-column Spark DataFrame ('_1'), pull that to pandas, then expand every
# vector into one DataFrame row.
u_vectors = U.rows.map(lambda vec: (vec, )).toDF().toPandas()['_1']
U_df = pd.DataFrame([list(vec.toArray()) for vec in u_vectors])

# s and V expose toArray() directly, so they convert in one step.
V_df = pd.DataFrame(V.toArray())
s_df = pd.DataFrame(s.toArray())

In [None]:
# Saving to csv: one file per SVD factor, named <factor>_<state>.csv
U_df.to_csv('U_%s.csv'%state)
V_df.to_csv('V_%s.csv'%state)
# The pattern used to be '_%ss.csv' (e.g. "_OHs.csv"): prefix and placeholder
# were transposed, breaking the naming scheme shared with U and V.
s_df.to_csv('s_%s.csv'%state)

In [None]:
# Write each indexer's label list to <prefix>_labels<state>.csv so the numeric
# indices can be mapped back to the original string ids.
for prefix, indexer in (('usr', usr_indexer), ('biz', biz_indexer)):
    pd.DataFrame(indexer.labels).to_csv('%s_labels%s.csv' % (prefix, state))