In [1]:
import findspark
import pandas as pd
import numpy as np
import csv
findspark.init('/home/hadoop/spark-2.2.2-bin-hadoop2.7')

from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

sc = SparkContext()
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

In [2]:
def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

pd_df = pd.read_csv('./wadiz_remove_duplicate.csv')
spark_df = pandas_to_spark(pd_df)

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit

soop = udf(lambda category, makerName, summary: (category + ' ')*1 + (makerName + ' ')*2 + summary)
spark_df = spark_df.withColumn("id", spark_df['id'].cast('integer'))
spark_df = spark_df.withColumn("achievementRate", spark_df['achievementRate'].cast('integer'))
spark_df = spark_df.withColumn("totalAmount", spark_df['totalAmount'].cast('integer'))
spark_df = spark_df.withColumn("totalSupporter", spark_df['totalSupporter'].cast('integer'))
spark_df = spark_df.withColumn("totalLike", spark_df['totalLike'].cast('integer'))
spark_df = spark_df.withColumn("rewardSatisfaction", spark_df['rewardSatisfaction'].cast('double'))
spark_df = spark_df.withColumn("makerSatisfaction", spark_df['makerSatisfaction'].cast('double'))
spark_df = spark_df.withColumn("soop", soop(spark_df['category'], spark_df['makerName'], spark_df['summary']).cast('string'))
spark_df.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- Unnamed: 0.1: string (nullable = true)
 |-- Unnamed: 0.1.1: string (nullable = true)
 |-- Unnamed: 0.1.1.1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- makerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- achievementRate: integer (nullable = true)
 |-- totalAmount: integer (nullable = true)
 |-- totalSupporter: integer (nullable = true)
 |-- totalLike: integer (nullable = true)
 |-- rewardSatisfaction: double (nullable = true)
 |-- makerSatisfaction: double (nullable = true)
 |-- detailUrl: string (nullable = true)
 |-- campaigncomments: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- soop: string (nullable = true)



In [4]:
def dividePrice(totalAmount, totalSupporter):
    if totalSupporter == 0:
        return 0
    price = int(totalAmount / totalSupporter)
    if 0 <= price and price <30000:
        return 0
    elif 30000<= price and price <50000:
        return 1
    elif 50000<= price and price <70000:
        return 2
    elif 70000<= price and price <100000:
        return 3
    elif 100000<= price and price <200000:
        return 4
    elif 200000<= price and price< 300000:
        return 5
    elif 300000<= price and price <400000:
        return 6
    elif 400000<= price and price <500000:
        return 7
    else:
        return 8
    
price_udf = udf(dividePrice, IntegerType())
spark_df = spark_df.withColumn('rangeAmount', price_udf(spark_df['totalAmount'], spark_df['totalSupporter']))
spark_df.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- Unnamed: 0.1: string (nullable = true)
 |-- Unnamed: 0.1.1: string (nullable = true)
 |-- Unnamed: 0.1.1.1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- makerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- achievementRate: integer (nullable = true)
 |-- totalAmount: integer (nullable = true)
 |-- totalSupporter: integer (nullable = true)
 |-- totalLike: integer (nullable = true)
 |-- rewardSatisfaction: double (nullable = true)
 |-- makerSatisfaction: double (nullable = true)
 |-- detailUrl: string (nullable = true)
 |-- campaigncomments: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- soop: string (nullable = true)
 |-- rangeAmount: integer (nullable = true)



### Text Processing and Featurization

In [5]:
spark_df.registerTempTable('wadiz')
res= sqlContext.sql('SELECT id, name, summary, category, makerName, rangeAmount, soop from wadiz')
res.show()

+-----+--------------------+--------------------+--------+-------------------+-----------+--------------------+
|   id|                name|             summary|category|          makerName|rangeAmount|                soop|
+-----+--------------------+--------------------+--------+-------------------+-----------+--------------------+
|10038|당신의 공간에 이야기를 불어넣는...|
    "사람들은 모두 각자 ...|   디자인소품|           with yoo|          3|디자인소품 with yoo wi...|
|10047|세상을 바꾸는 작은선물 밸리스 ...|
    본 프로젝트의 수익금 ...|      푸드|                밸리스|          0|푸드 밸리스 밸리스 
    본...|
|10048|배변자세 교정기 모닝스마일로 변...|
    배변자세 교정기 "모닝...|   디자인소품|       GFV Co,. Ltd|          0|디자인소품 GFV Co,. Lt...|
|10052|당신의 취미를 찾아주는 특별한 ...|
    당신의 취미를 찾아드립...|   디자인소품|               HOBY|          1|디자인소품 HOBY HOBY 
...|
|10056|건강한 두뇌를 위한 취미이야기,...|
    당신은 어떤 운동을 하...|   디자인소품|이에스 스튜디오(ES studio)|          0|디자인소품 이에스 스튜디오(ES...|
|10090|혼자있어 외로운 반려동물과 당신...|
    당신과 반려동물의 행복...|    반려동물|            (주)아이오텍|          3|반려동물 (주)아이오텍 (

In [6]:
from pyspark.ml.feature import Tokenizer,HashingTF, Word2Vec
tokenizer = Tokenizer(inputCol='soop', outputCol='keywords')
wordData = tokenizer.transform(res)
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='keywords', outputCol='word_vec', seed=123)
word2VecData = word2Vec.fit(wordData)
word2VecData = word2VecData.transform(wordData)

In [7]:
all_wadiz_vecs = word2VecData.select('id','word_vec').rdd.map(lambda x: (x[0], x[1])).collect()

In [8]:
all_wadiz_vecs[1]

(10047,
 DenseVector([0.0671, 0.0423, 0.0682, -0.0319, -0.0441, -0.0043, 0.0722, -0.0191, 0.043, 0.0044, 0.0248, -0.0543, -0.0116, -0.0228, -0.0022, 0.014, -0.0083, -0.0015, -0.0254, 0.0159, -0.0684, -0.0394, 0.078, 0.1482, -0.0218, -0.0021, -0.0602, 0.0266, 0.0263, -0.0261, 0.1046, -0.018, 0.0044, -0.0966, 0.0291, 0.1105, 0.0389, -0.0049, 0.0571, -0.0619, -0.0771, 0.0615, 0.0355, 0.0546, -0.0024, 0.0564, -0.0859, -0.0697, -0.0146, -0.0142, 0.0062, 0.0423, -0.0817, -0.0037, 0.1208, 0.0224, 0.0032, 0.0148, 0.1109, 0.0224, 0.007, 0.0646, -0.0861, 0.0254, 0.0006, 0.0128, -0.0978, 0.0124, -0.0792, -0.0799, 0.0497, 0.0191, -0.0668, 0.0457, -0.0247, 0.0202, 0.108, -0.0493, -0.0358, 0.026, 0.0915, 0.0364, -0.0389, -0.0303, -0.0117, 0.0071, 0.0949, -0.1168, 0.0497, -0.057, 0.0077, -0.0958, -0.0393, 0.0625, 0.0163, -0.0276, 0.1247, -0.0137, -0.0285, -0.0471]))

In [9]:
def CosineSim(vec1, vec2): 
    return np.dot(vec1, vec2) / np.sqrt(np.dot(vec1, vec1)) / np.sqrt(np.dot(vec2, vec2))

In [10]:
# input: 상품 id 리스트 > output: 추천 상품 (input_id(입력한 상품 id), id(입력한 상품과 코사인 유사도가 제일 높은 상품 id), score(코사인 유사도 계산)))
def getSimilarProduct(w_ids, sim_product_limit=10):
    schema = StructType([   
                        StructField("id", StringType(), True)
                        ,StructField("score", IntegerType(), True)
                        ,StructField("input_id", StringType(), True)
                        ])
    similar_products_df = spark.createDataFrame([], schema)
    for w_id in w_ids:
        w_id = int(w_id)
        input_vec = []
        for r in all_wadiz_vecs:
            if r[0] == w_id:
                input_vec = [(r[1])][0]
            else:
                continue
            similar_product_rdd = sc.parallelize((i[0], float(CosineSim(input_vec, i[1]))) for i in all_wadiz_vecs)
        
            similar_product_df = spark.createDataFrame(similar_product_rdd)\
                            .withColumnRenamed('_1', 'id')\
                            .withColumnRenamed('_2', 'score')\
                            .orderBy('score', ascending = False)
            similar_product_df = similar_product_df.filter(col("id") != w_id).limit(sim_product_limit)
            similar_product_df = similar_product_df.withColumn('input_id', lit(w_id))
            similar_products_df = similar_products_df \
                                        .union(similar_product_df)
    return similar_products_df

In [11]:
# input: 추천 상품 (input_id, id, score) > output: inner join (input_id, id, score, name, category, makerName, summary)
def getProductDetails(in_product):
    a = in_product.alias("a")
    b = spark_df.alias("b")
    
    return a.join(b, col("a.id") == col("b.id"), 'inner') \
             .select([col('a.'+xx) for xx in a.columns] + [col('b.name'),col('b.category'),
                                                           col('b.makerName'),col('b.summary')])
                                                        

### Test Recommend

In [12]:
import numpy as np
from pyspark.sql.functions import *

# 상품 id 리스트
wids = [54968, 53536, 42496, 30763, 34841]

# 상품 추천 및 추천 상품 디테일 
sims = getProductDetails(getSimilarProduct(wids))
sims.select('input_id','id','name','summary','category','makerName','score').toPandas()


  


Unnamed: 0,input_id,id,name,summary,category,makerName,score
0,34841,36468,봄을 닮은 아카시아 향수 : PE FU [FAIRY],"\n 도시의 매연, 자극적인 향수로 지친 후각을 자연의 향으로 힐링하세요.\r...",뷰티,코이컴퍼니,0.99183
1,53536,36898,"쾌적한 잠자리 환경을 위한 똑똑한 생활습관 ""굿베딩 라이트 스프레이""","\n 뷰티를 목적으로 하는 향수가 아닌, 자연을 닮은 향기로 안정감과 편안함을...",홈리빙,디노보(DENOVO),0.986791
2,34841,34175,"사랑하는 사람(U)을 위해 준비하는 선택, 유추프라카치아","\n 너 이름이 뭐니? 사랑하는 사람을 위해 준비하는 선택, 나의 [유추프라카...",뷰티,Ucuprakacia(유추프라카치아),0.992907
3,30763,11125,세상을 바꾼 이들의 얼굴과 철학을 담은 셀럽팝아트,"\n 2017년에는 꿈꾸는 이들이 그 꿈을 이룰 수 있도록, 'aboutfra...",디자인소품,aboutframe,0.995964
4,53536,37148,스크린 골프! 편하고 저렴하게 집에서 즐기세요 . [펀골프],\n [펀골프] 는 손 쉽게 어디서든 골프를 즐길 수 있는 골프 시뮬레이터입니...,테크·가전,(주)펀골프,0.98486
5,34841,58300,사랑스러운 내 반려견의 손그림 초상화 주문제작,"\n 아름다운 반려견과 반려견을 사랑하는 우리의 마음, 사랑받는 반려견의 행복...",반려동물,정유나,0.992129
6,54968,58713,[LTE와 가성비만남] 태블릿PC 고민? 레볼루션 G10하나로,\n 가심비 태블릿의 명가 아이뮤즈가 선보이는 첫번째 LTE 태블릿!\n필압조...,테크·가전,포유디지탈,0.99591
7,53536,49992,환경을생각하는 밀크티만들기KIT(이제는 집에서 만들수있어),\n 환경을 생각하는 밀크티만들기KIT 입니다. 환경도 생각하며 집에서도 쉽게...,푸드,오캄앤티,0.988023
8,53536,43280,간편하고 편리한! 윌빅종이그릇!,\n 윌빅종이그릇은 사이즈를 축소하여 자유로운 야외활동을 할 수 있습니다. 간...,반려동물,WilBiC,0.986404
9,54968,49818,"3형제의 제주 귀농, 청년감귤",\n 서울에서 내려온 청년농업인 형제가 일조량이 높은 제주 한경면에서 재배한 ...,푸드,청년감귤,0.994108


### User based Recomment

In [13]:
import csv
import ast
fr = open('users.csv', 'r', encoding='utf-8')
fw = open('users2.csv', 'w', encoding='utf-8')
rdr = csv.reader(fr)
final_list = []
final_list.append(['user_id', 'funding_id'])
for line in rdr:
    funding_list = ast.literal_eval(line[1])
    userid = line[0]
    for fund in funding_list:
      funding_id = fund[0]
      if int(userid) == 0:
          print(funding_id)
      final_list.append([userid, funding_id])
  
wtr = csv.writer(fw)
for row in final_list:
    wtr.writerow(row)
print(len(final_list))
fr.close()
fw.close()

457921


In [14]:
user_df = spark.read.csv(
    'users2.csv', header=True, inferSchema=True
)

In [15]:
user_df.select('user_id' ,'funding_id').show()
user_df.printSchema()

+--------+----------+
| user_id|funding_id|
+--------+----------+
|21910001|     63340|
|21910001|     65295|
|21910001|     66226|
|21910001|     65394|
|21910001|     64316|
|21910001|     64291|
|21910001|     64491|
|21910001|     62535|
|21910001|     46865|
|21910001|     56162|
|21910001|     61133|
|21910001|     58058|
|21910001|     57001|
|21910001|     58123|
|21910001|     55116|
|21910001|     57895|
|21910001|     54391|
|21910001|     58655|
|21910001|     55940|
|21910001|     56103|
+--------+----------+
only showing top 20 rows

root
 |-- user_id: integer (nullable = true)
 |-- funding_id: integer (nullable = true)



In [16]:
user_rdd = user_df.select('user_id','funding_id').rdd.map(lambda x: (x[0], x[1])).collect()

In [17]:
def getUserBasedRecoms(u_id):
    funding_ids = []
    for u in user_rdd:
        if u[0] == u_id:
            funding_ids.append(u[1])
    print(funding_ids)
    return funding_ids

In [18]:
funding_ids = getUserBasedRecoms(41140101)
print(funding_ids)
sims = getProductDetails(getSimilarProduct(funding_ids))
sims.select('input_id','id','name','summary','category','makerName','score').toPandas()

[54968, 53536, 42496, 30763, 34841]
[54968, 53536, 42496, 30763, 34841]


  


Unnamed: 0,input_id,id,name,summary,category,makerName,score
0,34841,36468,봄을 닮은 아카시아 향수 : PE FU [FAIRY],"\n 도시의 매연, 자극적인 향수로 지친 후각을 자연의 향으로 힐링하세요.\r...",뷰티,코이컴퍼니,0.99183
1,53536,36898,"쾌적한 잠자리 환경을 위한 똑똑한 생활습관 ""굿베딩 라이트 스프레이""","\n 뷰티를 목적으로 하는 향수가 아닌, 자연을 닮은 향기로 안정감과 편안함을...",홈리빙,디노보(DENOVO),0.986791
2,34841,34175,"사랑하는 사람(U)을 위해 준비하는 선택, 유추프라카치아","\n 너 이름이 뭐니? 사랑하는 사람을 위해 준비하는 선택, 나의 [유추프라카...",뷰티,Ucuprakacia(유추프라카치아),0.992907
3,30763,11125,세상을 바꾼 이들의 얼굴과 철학을 담은 셀럽팝아트,"\n 2017년에는 꿈꾸는 이들이 그 꿈을 이룰 수 있도록, 'aboutfra...",디자인소품,aboutframe,0.995964
4,53536,37148,스크린 골프! 편하고 저렴하게 집에서 즐기세요 . [펀골프],\n [펀골프] 는 손 쉽게 어디서든 골프를 즐길 수 있는 골프 시뮬레이터입니...,테크·가전,(주)펀골프,0.98486
5,34841,58300,사랑스러운 내 반려견의 손그림 초상화 주문제작,"\n 아름다운 반려견과 반려견을 사랑하는 우리의 마음, 사랑받는 반려견의 행복...",반려동물,정유나,0.992129
6,54968,58713,[LTE와 가성비만남] 태블릿PC 고민? 레볼루션 G10하나로,\n 가심비 태블릿의 명가 아이뮤즈가 선보이는 첫번째 LTE 태블릿!\n필압조...,테크·가전,포유디지탈,0.99591
7,53536,49992,환경을생각하는 밀크티만들기KIT(이제는 집에서 만들수있어),\n 환경을 생각하는 밀크티만들기KIT 입니다. 환경도 생각하며 집에서도 쉽게...,푸드,오캄앤티,0.988023
8,53536,43280,간편하고 편리한! 윌빅종이그릇!,\n 윌빅종이그릇은 사이즈를 축소하여 자유로운 야외활동을 할 수 있습니다. 간...,반려동물,WilBiC,0.986404
9,54968,49818,"3형제의 제주 귀농, 청년감귤",\n 서울에서 내려온 청년농업인 형제가 일조량이 높은 제주 한경면에서 재배한 ...,푸드,청년감귤,0.994108
