In [None]:
import pandas as pd
import numpy as np
import os
import math


from pyspark.sql.session import SparkSession
# Spark session shared by every cell below (getOrCreate reuses an existing one).
spark = (SparkSession.builder
         .appName('master')
         .getOrCreate())

from pyspark.sql.types import *
from pyspark.sql.functions import desc, asc
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [None]:
from pywebhdfs.webhdfs import PyWebHdfsClient
from pprint import pprint
 

# path안에 있는 파일 및 폴더이름을 리스트로 반환한다
# 단 path는 'user/hadoop/data_analysis/coupang_data/' 와 같이 http를 기입하지 않는다
def get_dir(path):
    hdfs = PyWebHdfsClient(host='localhost',port='50070', user_name='master')  # your Namenode IP & username here
    my_dir = path
    
    result = []
    
    for i in range(len(hdfs.list_dir(my_dir)['FileStatuses']['FileStatus'])):
        result.append(hdfs.list_dir(my_dir)['FileStatuses']['FileStatus'][i]['pathSuffix'])
    
    return result

def open_pd_csv(path):
    """Read a CSV file stored on HDFS and return it as a pandas DataFrame.

    Args:
        path: HDFS path relative to the namenode root; it must start with
            'user/' (the namenode URI prefix is added here).

    Returns:
        pandas.DataFrame converted from the Spark DataFrame with ``toPandas()``.
        The first CSV line is used as the header. No schema is supplied, so
        Spark reads every column as a string.
    """
    spark_frame = spark.read.csv('hdfs://192.168.0.9:9000/' + path, header=True)
    return spark_frame.toPandas()

def save_pd_csv(df, path):
    """Write a pandas DataFrame to HDFS as CSV with a header row, overwriting.

    Args:
        df: pandas DataFrame to save.
        path: HDFS path relative to the namenode root; it must start with
            'user/', e.g. 'user/hadoop/test.csv'.

    Note: Spark writes ``path`` as a *directory*. ``coalesce(1)`` makes that
    directory hold a single part-*.csv file instead of one file per partition.
    """
    destination = 'hdfs://192.168.0.9:9000/' + path
    writer = spark.createDataFrame(df).coalesce(1).write
    writer.mode("overwrite").option("header", "true").csv(destination)
    
def ad_schema(df):
    """Cast the known columns of a Spark DataFrame to their intended types.

    Integer columns: rank, category, price, discount_percentage,
    rating_total_count, reviews_for_last1year, sales.
    Double column: rating.
    Boolean columns: rocket_delivery, is_out_of_stock.
    Every other column is cast to string. Listed columns that are absent
    from ``df`` are simply not touched.

    After casting, ``fillna(0)`` replaces nulls. Spark applies a numeric fill
    value only to numeric columns, so string and boolean nulls are kept.

    Returns:
        A new Spark DataFrame with the same columns in the same order.
    """
    type_for = {}
    for name in ('rank', 'category', 'price', 'discount_percentage',
                 'rating_total_count', 'reviews_for_last1year', 'sales'):
        type_for[name] = IntegerType
    type_for['rating'] = DoubleType
    for name in ('rocket_delivery', 'is_out_of_stock'):
        type_for[name] = BooleanType

    typed = df
    for name in df.columns:
        # Anything not listed above defaults to string.
        target_type = type_for.get(name, StringType)()
        typed = typed.withColumn(name, typed[name].cast(target_type))
    return typed.fillna(0)

In [None]:
# Estimate sales for each merged category CSV and save the result as parquet.
basic_folder = 'user/hadoop/data_analysis/merged_data/'
file_list = get_dir(basic_folder)

# The original run only processed the first file; set to None to process all files.
MAX_FILES = 1
# How many of the smallest positive review counts are averaged into the baseline.
BASELINE_SAMPLE = 10
# Rank offset in the sales formula.
# NOTE(review): presumably one past the largest rank in the crawled ranking — TODO confirm.
RANK_BASE = 1080

for csv_name in file_list[:MAX_FILES]:
    file_name = basic_folder + csv_name

    # Build a Spark DataFrame by reading the CSV file.
    df = spark.read.csv('hdfs://192.168.0.9:9000/' + file_name, header=True)
    df.show()

    # Apply the shared schema: identical casts to the block this replaces
    # ('sales' is not present yet, and it is overwritten below anyway).
    df = ad_schema(df)

    # Category name = file-name prefix before the first '_'. split() keeps the
    # whole name when there is no '_'. find() would return -1 and silently drop
    # the last character.
    category = csv_name.split('_', 1)[0]
    print('카테고리 : ', category)

    # Baseline: mean of the BASELINE_SAMPLE smallest positive review counts.
    # limit() keeps the work on the cluster instead of collecting the whole column.
    smallest = (df.select('reviews_for_last1year')
                  .filter(f.col('reviews_for_last1year') > 0)
                  .sort(asc('reviews_for_last1year'))
                  .limit(BASELINE_SAMPLE)
                  .collect())
    # Divide by the number of rows actually found. The original always divided
    # by 10, underestimating when fewer positive rows exist. Use 0 when none exist.
    baseline = (sum(r['reviews_for_last1year'] for r in smallest) / len(smallest)
                if smallest else 0)

    df = df.withColumn('sales',
                       (RANK_BASE - f.col('rank')) * baseline
                       + f.col('reviews_for_last1year') * 3)

    # Save the result as parquet (the CSV "header" option has no meaning for parquet).
    file_path = 'hdfs://192.168.0.9:9000/' + 'user/hadoop/data_analysis/result2/' + category + '.parquet'
    df.coalesce(1).write.mode("overwrite").format("parquet").save(file_path)

    

In [None]:
# Inputs: a category name and a list of product_ids. The code below prints the
# most common value of each feature column among these products.

category = 'bed'
# product_id values as strings; they are inner-joined on the product_id column below.
id_list = ['295552610', '60659732', '218912675', '327985427', '1370896429', 
           '4830312158', '2358790214', '46172654', '110925026', '1105311096', 
           '1401559740', '1096195704', '2627325', '1129835', '2083440667', 
           '2240204468', '1472027989', '1801552884', '1829493113', '1555433138', 
           '32023744', '5471866788', '1914536362', '51417867', '57063884', 
           '2627331', '1829493909', '1670223623', '145323302', '5335598611', 
           '1911251180', '4354081837', '5410866415', '1119670850', '78757060', 
           '4513411171', '1579676541', '5125772618', '1713348', '1539425008', 
           '5320324198', '5526939920', '4604849960', '286156541', '5359524617', 
           '133484986', '1790298133', '5417255169', '2035043294', '5125849391', 
           '1816492647', '5347898691', '169613853', '1141993983', '5369381450', 
           '133097891', '1358436262', '1458760', '313670425', '1336182884', 
           '1194148073', '1315500107', '90440244', '5301820049', '2087915123', 
           '51860502', '60951423', '5009754884', '1984949332', '30267111', '12372816', 
           '29865253', '4362832164', '115130948', '1758838803', '5339561980', 
           '2287140789', '285454538', '1960868118', '2035043478', '341306883', 
           '1357872956', '5072181289', '49300701', '41068685', '2354751719', 
           '174679218', '2082645126', '2196909053', '129157588', '4356581421', 
           '1713311', '2044209949', '136081082', '3226430', '4526702040', 
           '97627864', '120008773', '255068544']


# Load the result DataFrame for `category` (parquet written by the sales cell).
file_name = 'user/hadoop/data_analysis/result2/' + category + '.parquet'
df = spark.read.parquet('hdfs://192.168.0.9:9000/' + file_name)
df = ad_schema(df)

# Keep only the products listed in id_list.
df_target = spark.createDataFrame(id_list, StringType())
df_target = df_target.selectExpr("value as product_id")
df_target = df.join(df_target, 'product_id', how='inner')

# Candidate feature columns run from index FIRST_FEATURE_COL up to, but excluding,
# the last column.
# NOTE(review): join(on='product_id') moves product_id to the front, so the index
# refers to the joined frame's column order. Confirm that 13 still points at the
# first feature column. The last column is presumably 'sales' (appended by
# withColumn) — TODO confirm.
FIRST_FEATURE_COL = 13

# Most common value of each feature column.
commonFactors = []

# `feature` (not `col`) so the imported pyspark `col` function is not shadowed.
for feature in df_target.columns[FIRST_FEATURE_COL:-1]:
    result = (df_target.groupby(feature)
              .agg(f.count(feature).alias("개수"),
                   f.sum('sales').alias('판매량'))
              .sort(desc('개수')))
    # Compare as a string. Otherwise, on non-string columns, 'NULL' is implicitly
    # cast to the column type (giving null), and every row is dropped. Real
    # nulls are still filtered out, because null != 'NULL' evaluates to null.
    result = result.filter(f.col(feature).cast('string') != 'NULL')

    # first() returns None for an empty result: one Spark job instead of count() + take().
    top = result.first()
    if top is not None:
        # Read the value from the Row instead of slicing its repr. Slicing dropped
        # digits of numeric values and broke on values containing '=' or ')'.
        commonFactors.append(feature + ' : ' + str(top[feature]))

print("공통 요소 :", commonFactors)
print('-------------------------------------------------------------------------------------------------------------------')