# 연결

In [3]:
import findspark
import pandas as pd

findspark.init()
from pyspark.sql import SparkSession

#데이터를 분석의 성능을 높이기위해 분산처리인 yarn을 적용
spark = SparkSession.builder.master("yarn").\
	appName("mypyspark").getOrCreate()

#모두 하둡파일 시스템의 경로로 지정해 두었습니다.
ProductFilePath  = "hdfs:///user/s21410781/finance/01.Product.csv"
Search1FilePath  = "hdfs:///user/s21410781/finance/02.Search1.csv"
Search2FilePath  = "hdfs:///user/s21410781/finance/03.Search.csv"
CustomFilePath  = "hdfs:///user/s21410781/finance/04.Custom.csv"
SessionFilePath  = "hdfs:///user/s21410781/finance/05.Session.csv"
MasterFilePath  = "hdfs:///user/s21410781/finance/06.Master.csv"

# csv파일을 파이썬으로 읽어옵니다.
Product = spark.read.csv(ProductFilePath, header='true', sep=',')
Product.createOrReplaceTempView("product")

Search1 = spark.read.csv(Search1FilePath, header='true', sep=',')
Search1.createOrReplaceTempView("search1")

Search2 = spark.read.csv(Search2FilePath, header='true', sep=',')
Search2.createOrReplaceTempView("search2")

Custom = spark.read.csv(CustomFilePath, header='true', sep=',')
Custom.createOrReplaceTempView("custom")

Session = spark.read.csv(SessionFilePath, header='true', sep=',')
Session.createOrReplaceTempView("session")

Master = spark.read.csv(MasterFilePath, header='true', sep=',')
Master.createOrReplaceTempView("master")



# search2파일의 검색수를 문자열 -> 숫자로 바꿈 (과정)

In [26]:
# 데이터가 숫자의 데이터여야할것이 문자열로 저장이 되어있었어서 숫자로 바꾸어주는것 (ex. 23,000 -> 230000)
search = spark.sql("""
            select SESS_DT ,KWD_NM ,replace(SEARCH_CNT,',','') as SEARCH_CNT 
            from search2   
            """)

search.createOrReplaceTempView("search")

# search파일의 일별-> 월별로 고침(과정)

In [27]:
#월별로 분석을 위해서 월별로 고침
search_1 = spark.sql("""
            select substr(SESS_DT,3,4) as SESS_DT ,KWD_NM ,sum(SEARCH_CNT) as SEARCH_CNT 
            from search
            group by SESS_DT,KWD_NM
            order by SESS_DT,SEARCH_CNT""")

search_1.createOrReplaceTempView("search_1")

# 월별 전체 검색어량

In [33]:
search_3 = spark.sql("""
            select SESS_DT ,round(sum(SEARCH_CNT),0) as SEARCH_CNT
            from search_2
            group by SESS_DT""")

search_3.createOrReplaceTempView("search_3")
p_search_3=search_3.toPandas()
p_search_3.to_csv("p_search_3.csv",mode="w")

+-------+-----------+
|SESS_DT| SEARCH_CNT|
+-------+-----------+
|   1808|1.0128466E7|
|   1805|1.1737245E7|
|   1804|1.1481214E7|
|   1809|   1.1089E7|
|   1806| 1.125495E7|
|   1807|1.1559653E7|
+-------+-----------+



# 월별 검색어별 수량

In [28]:
search_2 = spark.sql("""
            select SESS_DT ,KWD_NM ,sum(SEARCH_CNT) as SEARCH_CNT
            from search_1
            group by SESS_DT,KWD_NM
            order by SESS_DT,SEARCH_CNT desc""")

search_2.createOrReplaceTempView("search_2")

p_search_2=search_2.toPandas()
p_search_2.to_csv("p_search_2.csv",mode="w")

# 4,5,6,7,8,9월 검색어 TOP20

In [42]:
Top20_4=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1804
            order by SEARCH_CNT desc
            limit 20""")

Top20_4.createOrReplaceTempView("Top20_4")
p_Top20_4=Top20_4.toPandas()
p_Top20_4.to_csv("p_Top20_4.csv",mode="w")

Top20_5=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1805
            order by SEARCH_CNT desc
            limit 20""")

Top20_5.createOrReplaceTempView("Top20_5")

p_Top20_5=Top20_5.toPandas()
p_Top20_5.to_csv("p_Top20_5.csv",mode="w")

Top20_6=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1806
            order by SEARCH_CNT desc
            limit 20""")

Top20_6.createOrReplaceTempView("Top20_6")
p_Top20_6=Top20_6.toPandas()
p_Top20_6.to_csv("p_Top20_6.csv",mode="w")

Top20_7=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1807
            order by SEARCH_CNT desc
            limit 20""")

Top20_7.createOrReplaceTempView("Top20_7")
p_Top20_7=Top20_7.toPandas()
p_Top20_7.to_csv("p_Top20_7.csv",mode="w")

Top20_8=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1808
            order by SEARCH_CNT desc
            limit 20""")

Top20_8.createOrReplaceTempView("Top20_8")

p_Top20_8=Top20_8.toPandas()
p_Top20_8.to_csv("p_Top20_8.csv",mode="w")

Top20_9=spark.sql("""
            select SESS_DT ,KWD_NM ,SEARCH_CNT
            from search_2
            where SESS_DT=1809
            order by SEARCH_CNT desc
            limit 20""")

Top20_9.createOrReplaceTempView("Top20_9")

p_Top20_9=Top20_9.toPandas()
p_Top20_9.to_csv("p_Top20_9.csv",mode="w")



# 이월된 TOP검색어에서 없는것들 추출과정


In [60]:
top4_5=spark.sql("""
            select a.KWD_NM as KWD_NM1 ,a.SEARCH_CNT as SEARCH_CNT1 , b.KWD_NM as KWD_NM2  ,b.SEARCH_CNT as SEARCH_CNT2
            from top20_4 as a right outer join top20_5 as b on a.KWD_NM = b.KWD_NM 
            """)

top4_5.createOrReplaceTempView("top4_5")

p_top4_5=top4_5.toPandas()
p_top4_5.to_csv("p_top4_5.csv",mode="w")

top5_6=spark.sql("""
            select a.KWD_NM as KWD_NM1 ,a.SEARCH_CNT as SEARCH_CNT1 , b.KWD_NM as KWD_NM2  ,b.SEARCH_CNT as SEARCH_CNT2
            from top20_5 as a right outer join top20_6 as b on a.KWD_NM = b.KWD_NM 
            """)

top5_6.createOrReplaceTempView("top5_6")

p_top5_6=top5_6.toPandas()
p_top5_6.to_csv("p_top5_6.csv",mode="w")

top6_7=spark.sql("""
            select a.KWD_NM as KWD_NM1 ,a.SEARCH_CNT as SEARCH_CNT1 , b.KWD_NM as KWD_NM2  ,b.SEARCH_CNT as SEARCH_CNT2
            from top20_6 as a right outer join top20_7 as b on a.KWD_NM = b.KWD_NM 
            """)

top6_7.createOrReplaceTempView("top6_7")

p_top6_7=top6_7.toPandas()
p_top6_7.to_csv("p_top6_7.csv",mode="w")

top7_8=spark.sql("""
            select a.KWD_NM as KWD_NM1 ,a.SEARCH_CNT as SEARCH_CNT1 , b.KWD_NM as KWD_NM2  ,b.SEARCH_CNT as SEARCH_CNT2
            from top20_7 as a right outer join top20_8 as b on a.KWD_NM = b.KWD_NM 
            """)

top7_8.createOrReplaceTempView("top7_8")

p_top7_8=top7_8.toPandas()
p_top7_8.to_csv("p_top7_8.csv",mode="w")

top8_9=spark.sql("""
            select a.KWD_NM as KWD_NM1 ,a.SEARCH_CNT as SEARCH_CNT1 , b.KWD_NM as KWD_NM2  ,b.SEARCH_CNT as SEARCH_CNT2
            from top20_8 as a right outer join top20_9 as b on a.KWD_NM = b.KWD_NM 
            """)

top8_9.createOrReplaceTempView("top8_9")

p_top8_9=top8_9.toPandas()
p_top8_9.to_csv("p_top8_9.csv",mode="w")

# 5,6,7,8,9월에 갑자기 나타난 top20검색어

In [62]:
outer_5=spark.sql("""
            select KWD_NM2
            from top4_5
            where KWD_NM1 is null
            """)


outer_5.createOrReplaceTempView("outer_5")
p_outer_5=outer_5.toPandas()
p_outer_5.to_csv("p_outer_5.csv",mode="w")

outer_6=spark.sql("""
            select KWD_NM2
            from top5_6
            where KWD_NM1 is null
            """)


outer_6.createOrReplaceTempView("outer_6")
p_outer_6=outer_6.toPandas()
p_outer_6.to_csv("p_outer_6.csv",mode="w")

outer_7=spark.sql("""
            select KWD_NM2
            from top6_7
            where KWD_NM1 is null
            """)


outer_7.createOrReplaceTempView("outer_7")
p_outer_7=outer_7.toPandas()
p_outer_7.to_csv("p_outer_7.csv",mode="w")

outer_8=spark.sql("""
            select KWD_NM2
            from top7_8
            where KWD_NM1 is null
            """)


outer_8.createOrReplaceTempView("outer_8")
p_outer_8=outer_8.toPandas()
p_outer_8.to_csv("p_outer_8.csv",mode="w")

outer_9=spark.sql("""
            select KWD_NM2
            from top8_9
            where KWD_NM1 is null
            """)


outer_9.createOrReplaceTempView("outer_9")
p_outer_9=outer_9.toPandas()
p_outer_9.to_csv("p_outer_9.csv",mode="w")




# 5,6,7,8,9월에 갑자기 나타난 검색어의 검색량을 나타내는 테이블

In [29]:
outer_search=spark.sql(""" select  SESS_DT,KWD_NM,SEARCH_CNT
                            from search_2
                           where KWD_NM = '크록스' or
                               KWD_NM = '나이키운동화' or
                               KWD_NM = '온앤온' or
                               KWD_NM = '조르지오아르마니' or
                               KWD_NM = '샌들' or
                               KWD_NM = '린넨 원피스' or
                               KWD_NM = '롱원피스' or
                               KWD_NM = '핏플랍' or
                               KWD_NM = '플라스틱아일랜드' or
                               KWD_NM = '라코스테' or
                               KWD_NM = '양산' or
                               KWD_NM = '래쉬가드' or
                               KWD_NM = '아쿠아슈즈' or
                               KWD_NM = '롱패딩' or
                               KWD_NM = '헤지스레이디스' or
                               KWD_NM = '찰스앤키스' or
                               KWD_NM = '샤넬' or
                               KWD_NM = '나스' or
                               KWD_NM = '쥬시쥬디' or
                               KWD_NM = '뉴발란스키즈' or
                               KWD_NM = '아디다스키즈' or
                               KWD_NM = '정관장' or
                               KWD_NM = '블루독베이비' or
                               KWD_NM = '블루독' or
                               KWD_NM = '베베드피노' or
                               KWD_NM = '지오다노'   
                               order by SESS_DT,KWD_NM
 """)

outer_search.createOrReplaceTempView("outer_search")
p_outer_search=outer_search.toPandas()
p_outer_search.to_csv("p_outer_search.csv",mode="w")

# product 와 master를 조인해서 대분류를 끼워넣음(과정)

In [7]:
middle_table = spark.sql("""select P.CLNT_ID,P.SESS_ID, P.PD_C, replace(P.PD_BUY_AM,',','') as PD_BUY_AM, M.CLAC1_NM
            from product as P inner join master as M
            where P.PD_C = M.PD_C
            order by CLNT_ID""")

middle_table.createOrReplaceTempView("middle")

# 일별을 월별로 substr해서 고친것(과정)

In [8]:
month = spark.sql("""
            select CLNT_ID , SESS_ID , substr(SESS_DT,3,4) as SESS_DT ,DVC_CTG_NM ,ZON_NM ,CITY_NM
            from session
            order by CLNT_ID""")


month.createOrReplaceTempView("month")

# 월별 상품 대분류테이블(과정)

In [9]:
pro_month = spark.sql(""" select p.CLNT_ID ,p.SESS_ID , p.PD_BUY_AM,p.PD_C,mon.SESS_DT ,mon.DVC_CTG_NM ,mon.ZON_NM ,mon.CITY_NM ,P.CLAC1_NM
                            from middle as p join month as mon using (CLNT_ID,SESS_ID) 
                            order by CLNT_ID""")


pro_month.createOrReplaceTempView("pro_month")

# 월별 상품 대분류와 성별 연령을 합친 테이블(과정)

In [10]:
pro_month_cus = spark.sql(""" select p.CLNT_ID ,p.SESS_ID ,p.PD_C, p.PD_BUY_AM,p.SESS_DT ,p.DVC_CTG_NM ,p.ZON_NM ,p.CITY_NM ,p.CLAC1_NM, c.CLNT_GENDER ,c.CLNT_AGE
                            from  pro_month as p join custom as c using (CLNT_ID)
                            order by CLNT_ID""")


pro_month_cus.createOrReplaceTempView("pro_month_cus")


# 월별 거래량과 소비가격

In [12]:
pro_month_count=spark.sql(""" select SESS_DT, count(CLAC1_NM), int(avg(PD_BUY_AM))
                            from pro_month
                            group by SESS_DT
                            order by SESS_DT""") 
pro_month_count.createOrReplaceTempView("pro_month_count")
p_pro_month_count=pro_month_count.toPandas()
p_pro_month_count.to_csv("p_pro_month_count.csv",mode="w")

# 월별 성별에 따른 거래량

In [13]:
mon_gen=spark.sql(""" select SESS_DT, CLNT_GENDER,count(CLNT_ID)
                            from pro_month_cus
                            group by SESS_DT,CLNT_GENDER
                            order by SESS_DT""")

mon_gen.createOrReplaceTempView("mon_gen")
p_mon_gen=mon_gen.toPandas()
p_mon_gen.to_csv("p_mon_gen.csv",mode="w")

# 월별 대분류별 거래량

In [14]:
pro_mon_calc1= spark.sql(""" select CLAC1_NM,SESS_DT, count(CLAC1_NM)
                            from pro_month
                            group by CLAC1_NM,SESS_DT
                            order by CLAC1_NM,SESS_DT""")
pro_mon_calc1.createOrReplaceTempView("pro_mon_calc1")
p_pro_mon_calc1=pro_mon_calc1.toPandas()
p_pro_mon_calc1.to_csv("p_pro_mon_calc1.csv",mode="w")

In [None]:
p_pro_mon_clac1 = pd.read_csv("")

# 패딩 연결 / 경량,숏,롱 패딩같은것 다 포함되어잇음

In [8]:
padingFilePath  = "hdfs:///user/s21410781/finance/pading.csv"
pading = spark.read.csv(padingFilePath, header='true', sep=',')
pading.createOrReplaceTempView("pading")

# 패딩이라는 이름이있는 상품을 모두 찾아 count하는 테이블

In [15]:
pading_join=spark.sql(""" select *
                            from pro_month_cus as a join pading as b using(PD_C)
                         """)

pading_join.createOrReplaceTempView("pading_join")

mon_pading=spark.sql(""" select  SESS_DT, count(CLNT_ID), avg(PD_BUY_AM)
                            from pading_join
                         group by SESS_DT
                            order by SESS_DT """)

mon_pading.createOrReplaceTempView("mon_pading")
p_mon_pading=mon_pading.toPandas()
p_mon_pading.to_csv("p_mon_pading.csv",mode="w")

# 롱패딩 연결후 롱패딩이라는 이름이 있는 
# 상품을 모두 찾아 count하는 테이블

In [18]:
longpadingFilePath  = "hdfs:///user/s21410781/finance/longpading.csv"
longpading = spark.read.csv(longpadingFilePath, header='true', sep=',')
longpading.createOrReplaceTempView("longpading")

longpading_join=spark.sql(""" select *
                            from pro_month_cus as a join longpading as b using(PD_C)
                         """)

longpading_join.createOrReplaceTempView("longpading_join")

mon_longpading=spark.sql(""" select  SESS_DT, count(CLNT_ID), round(avg(PD_BUY_AM),0)
                            from longpading_join
                         group by SESS_DT
                            order by SESS_DT """)

mon_longpading.createOrReplaceTempView("mon_longpading")
p_mon_longpading=mon_longpading.toPandas()
p_mon_longpading.to_csv("p_mon_longpading.csv",mode="w")
