In [1]:
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, DoubleType

schema = StructType([StructField("review_id", StringType(), True), StructField("date", TimestampType(), True), StructField("user_id", StringType(), True), StructField("business_id", StringType(), True), StructField("stars", DoubleType(), True)])

data_set = spark.read.json("/FileStore/tables/mini_review.txt", schema=schema, timestampFormat="yyyy-MM-dd HH:mm:ss")

In [2]:
import pyspark.sql

In [3]:
data_set.createOrReplaceTempView("reviews")
last_data_set = spark.sql("SELECT R1.review_id, R1.date, R1.user_id, R1.business_id, R1.stars FROM reviews AS R1 JOIN (SELECT max(R2.date) AS last_date, R2.user_id, R2.business_id FROM reviews AS R2 GROUP BY R2.user_id, R2.business_id) table2 ON R1.date=table2.last_date AND R1.user_id=table2.user_id AND R1.business_id=table2.business_id")

In [4]:
data_set.select("business_id", "user_id").count()

In [5]:
last_data_set.select("business_id", "user_id").count()

In [6]:
data_set.select("business_id", "user_id").distinct().count()

In [7]:
last_data_set.select("business_id", "user_id").distinct().count()

In [8]:
num_users = last_data_set.select("user_id").distinct().count()
num_business = last_data_set.select("business_id").distinct().count()

In [9]:
from scipy import sparse
import numpy as np

In [10]:
unique_users = np.concatenate(
        last_data_set.select("user_id").distinct().rdd.glom().map(
          lambda x: np.array([elem[0] for elem in x]))
        .collect())

dictOfUsers = { unique_users[i] : i for i in range(0, len(unique_users) ) }

unique_business = np.concatenate(
        last_data_set.select("business_id").distinct().rdd.glom().map(
          lambda x: np.array([elem[0] for elem in x]))
        .collect())

dictOfBusiness = { unique_business[i] : i for i in range(0, len(unique_business) ) }

In [11]:
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())

last_data_set2 = last_data_set.withColumn('int_id_user', translate(dictOfUsers)('user_id'))
last_data_set2 = last_data_set2.withColumn('int_id_business', translate(dictOfBusiness)('business_id'))

In [12]:
rows = np.concatenate(
        last_data_set2.select("int_id_user").rdd.glom().map(
          lambda x: np.array([int(elem[0]) for elem in x]))
        .collect())

cols = np.concatenate(
        last_data_set2.select("int_id_business").rdd.glom().map(
          lambda x: np.array([int(elem[0]) for elem in x]))
        .collect())

datas = np.concatenate(
        last_data_set2.select("stars").rdd.glom().map(
          lambda x: np.array([int(elem[0]) for elem in x]))
        .collect())

In [13]:
sparse_matrix = sparse.coo_matrix((datas, (rows, cols)))
sparse_matrix = sparse_matrix.tocsr()

In [14]:
num_users

In [15]:
num_business

In [16]:
sparse_matrix.shape

In [17]:
#select R1.review_id, R1.date, R1.user_id, R1.business_id, R1.stars
#from 
#WHERE  (CM_PLAN_ID,Individual_ID)
#IN
#(
# Select CM_PLAN_ID, Individual_ID
# From CRM_VCM_CURRENT_LEAD_STATUS
# Where Lead_Key = :_Lead_Key
#)

#SELECT R1.review_id, R1.date, R1.user_id, R1.business_id, R1.stars FROM reviews AS R1 LEFT JOIN 
#(
#  SELECT max(R2.date), R2.user_id, R2.business_id 
#  FROM reviews AS R2 
#  GROUP BY R2.user_id, R2.business_id
#) table2
#ON 
#   R1.date=last_date
#   AND R1.user_id=R2.user_id
#   AND R1.business_id=R2.business_id 
#WHERE table2.CM_PLAN_ID IS NOT NULL

#"SELECT R1.review_id, R1.date, R1.user_id, R1.business_id, R1.stars FROM reviews as R1 WHERE R1.date, R1.user_id, R1.business_id IN (SELECT max(R2.date), R2.user_id, #R2.business_id FROM reviews as R2 GROUP BY R2.user_id, R2.business_id)"