## PySpark Practice
* Basic Data handling
* Descriptive summary
* Feature extraction
* Build ML model

* References
    * https://spark.apache.org/docs/2.2.0/mllib-statistics.html
    * https://jaeyung1001.tistory.com/59
    * https://hendra-herviawan.github.io/pyspark-dataframe-row-columns.html
    * https://hendra-herviawan.github.io/pyspark-groupby-and-aggregate-functions.html

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

In [2]:
import pandas as pd
from datetime import datetime, date
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [3]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [4]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [5]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [6]:
df.repartition(1).count()

3

In [5]:
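# Load the clickstream data: header=True takes column names from the first row,
# inferSchema=True lets Spark guess each column's type
df = spark.read.csv('./dataset/clickstream/total_datapoint.csv', inferSchema=True, header=True)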

In [28]:
df.count()

8772446

In [6]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- CUS_ID: integer (nullable = true)
 |-- TIME_ID: integer (nullable = true)
 |-- SITE: string (nullable = true)
 |-- SITE_CNT: double (nullable = true)
 |-- ST_TIME: double (nullable = true)
 |-- SITE_NM: string (nullable = true)
 |-- BACT_NM: string (nullable = true)
 |-- MACT_NM: string (nullable = true)
 |-- ACT_NM: string (nullable = true)
 |-- AC_TIME: integer (nullable = true)
 |-- YY_MM_DD: integer (nullable = true)
 |-- DAY: integer (nullable = true)



In [7]:
df.show()

+---+------+----------+--------------------+--------+-------+-----------------------+-------------+------------+-------------+-------+--------+---+
|_c0|CUS_ID|   TIME_ID|                SITE|SITE_CNT|ST_TIME|                SITE_NM|      BACT_NM|     MACT_NM|       ACT_NM|AC_TIME|YY_MM_DD|DAY|
+---+------+----------+--------------------+--------+-------+-----------------------+-------------+------------+-------------+-------+--------+---+
|  0|     1|2012070905|    search.naver.com|     3.0|  794.0|            네이버 검색|인터넷/컴퓨터|        검색|     포털검색|      5|20120709|  0|
|  1|     1|2012072507|     plus.google.com|     1.0|    1.0|              구글 Plus|     커뮤니티|  블로그/SNS|          SNS|      7|20120725|  2|
|  2|     1|2012081116|joongang.joinsmsn...|     2.0|    5.0|               중앙일보|  뉴스/미디어|      일간지|   종합일간지|     16|20120811|  5|
|  3|     1|2012090304|      news.naver.com|     5.0|  504.0|            네이버 뉴스|  뉴스/미디어|  인터넷신문|     포털뉴스|      4|20120903|  0|
|  4|     1|2012090506| ...

In [14]:
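from pyspark.sql.types import StringType
# TIME_ID (yyyyMMddHH, e.g. 2012070905) was inferred as an integer;
# cast it to a string so it can be parsed as a date/time
df = df.withColumn('TIME_ID', df['TIME_ID'].cast(StringType()))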

In [15]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- CUS_ID: integer (nullable = true)
 |-- TIME_ID: string (nullable = true)
 |-- SITE: string (nullable = true)
 |-- SITE_CNT: double (nullable = true)
 |-- ST_TIME: double (nullable = true)
 |-- SITE_NM: string (nullable = true)
 |-- BACT_NM: string (nullable = true)
 |-- MACT_NM: string (nullable = true)
 |-- ACT_NM: string (nullable = true)
 |-- AC_TIME: integer (nullable = true)
 |-- YY_MM_DD: integer (nullable = true)
 |-- DAY: integer (nullable = true)



In [16]:
from pyspark.sql import functions as F
# Parse TIME_ID ('yyyyMMddHH') into a timestamp, then keep only the date part
df = df.withColumn('new_date', F.to_date(F.unix_timestamp('TIME_ID', 'yyyyMMddHH').cast('timestamp')))

In [17]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- CUS_ID: integer (nullable = true)
 |-- TIME_ID: string (nullable = true)
 |-- SITE: string (nullable = true)
 |-- SITE_CNT: double (nullable = true)
 |-- ST_TIME: double (nullable = true)
 |-- SITE_NM: string (nullable = true)
 |-- BACT_NM: string (nullable = true)
 |-- MACT_NM: string (nullable = true)
 |-- ACT_NM: string (nullable = true)
 |-- AC_TIME: integer (nullable = true)
 |-- YY_MM_DD: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- new_date: date (nullable = true)



In [8]:
df.describe().show()

+-------+------------------+------------------+--------------------+-------------+------------------+------------------+-----------------+---------+-------+--------------------+-----------------+-------------------+------------------+
|summary|               _c0|            CUS_ID|             TIME_ID|         SITE|          SITE_CNT|           ST_TIME|          SITE_NM|  BACT_NM|MACT_NM|              ACT_NM|          AC_TIME|           YY_MM_DD|               DAY|
+-------+------------------+------------------+--------------------+-------------+------------------+------------------+-----------------+---------+-------+--------------------+-----------------+-------------------+------------------+
|  count|           8772446|           8772446|             8772446|      8772446|           8772446|           8772439|          8772446|  8772446|8772446|             8772446|          8772446|            8772446|           8772446|
|   mean|4066912.0803559236|1239.7318502730025|2.01252721501...

In [27]:
df.describe(['SITE_CNT']).show()

+-------+------------------+
|summary|          SITE_CNT|
+-------+------------------+
|  count|           8772446|
|   mean| 5.478712094665502|
| stddev|11.282212254464055|
|    min|               1.0|
|    max|            1158.0|
+-------+------------------+



In [29]:
df.describe(['ST_TIME']).show()

+-------+------------------+
|summary|           ST_TIME|
+-------+------------------+
|  count|           8772439|
|   mean|134.85187802388822|
| stddev| 299.9778640844884|
|    min|               0.0|
|    max|           25274.0|
+-------+------------------+



In [20]:
df.groupBy('SITE').count().orderBy('count', ascending=False).show()

+-----------------+-------+
|             SITE|  count|
+-----------------+-------+
|    www.naver.com|1169663|
| search.naver.com| 611978|
|     www.daum.net| 547982|
|     www.nate.com| 283289|
|   blog.naver.com| 239893|
|  search.daum.net| 184816|
|   cafe.naver.com| 177760|
|   media.daum.net| 171983|
|   mail.naver.com| 157803|
|sstatic.naver.com| 153910|
|    kin.naver.com| 149198|
|   mail2.daum.net| 141135|
|    cafe.daum.net| 114749|
|   news.naver.com| 109648|
|   mail3.nate.com| 100717|
| www.facebook.com|  83023|
|   www.11st.co.kr|  80509|
|www.gmarket.co.kr|  78882|
|    news.nate.com|  76211|
|    map.naver.com|  70262|
+-----------------+-------+
only showing top 20 rows



In [21]:
df.groupBy('SITE_NM').count().orderBy('count', ascending=False).show()

+---------------+-------+
|        SITE_NM|  count|
+---------------+-------+
|         네이버|1171030|
|    네이버 검색| 982165|
|           다음| 548509|
|         네이트| 283489|
|  네이버 블로그| 251179|
|      다음 검색| 208533|
|    네이버 카페| 182038|
|     미디어다음| 171983|
|      다음 메일| 163606|
|    네이버 메일| 161696|
|      다음 카페| 144828|
|    네이버 뉴스| 134934|
|    네이트 메일| 131739|
|    네이트 뉴스| 129788|
|          G마켓|  83757|
|       페이스북|  83268|
|  네이버 지식iN|  81095|
|         11번가|  80637|
|네이버 지식쇼핑|  62060|
|     KB국민은행|  53519|
+---------------+-------+
only showing top 20 rows



In [22]:
df.groupBy('BACT_NM').count().orderBy('count', ascending=False).show()

+--------------+-------+
|       BACT_NM|  count|
+--------------+-------+
| 인터넷/컴퓨터|4189770|
|   뉴스/미디어|1292845|
|          쇼핑|1128331|
|      커뮤니티| 975819|
|   금융/부동산| 317316|
|  엔터테인먼트| 205825|
|          게임| 124541|
| 비즈니스/경제|  87964|
|     정치/행정|  80489|
|    온라인교육|  60128|
|   정보통신/IT|  55450|
|        서비스|  46611|
|생활/가정/취미|  37293|
|          여행|  36060|
|     교육/학원|  29876|
|유통/판매/운송|  28359|
|          제조|  23227|
|     건강/의학|  17802|
|   스포츠/레저|  15501|
|사회/문화/종교|  12657|
+--------------+-------+
only showing top 20 rows



In [23]:
df.groupBy('MACT_NM').count().orderBy('count', ascending=False).show()

+-----------------+-------+
|          MACT_NM|  count|
+-----------------+-------+
|             포털|2080020|
|             검색|1363454|
|       인터넷신문| 560634|
|         종합쇼핑| 520596|
|       블로그/SNS| 511496|
|     커뮤니케이션| 486011|
|     커뮤니티포털| 388400|
|         전문뉴스| 339706|
|           일간지| 288721|
|      의류 쇼핑몰| 266511|
|             은행| 137752|
|         다운로드| 113847|
|         가격비교| 103431|
|멀티미디어/동영상|  85128|
|             방송|  76139|
|       온라인게임|  60406|
|         웹서비스|  53473|
|             취업|  51231|
|   분야별커뮤니티|  49216|
|         쇼핑정보|  48528|
+-----------------+-------+
only showing top 20 rows



In [24]:
df.groupBy('ACT_NM').count().orderBy('count', ascending=False).show()

+---------------+-------+
|         ACT_NM|  count|
+---------------+-------+
|       종합포털|2079071|
|       포털검색|1235966|
|       메일계정| 478821|
|       포털뉴스| 452782|
|   포털커뮤니티| 334921|
|     포털블로그| 304635|
|       오픈마켓| 255832|
| 여성의류쇼핑몰| 179458|
|     종합일간지| 166708|
|            SNS| 141637|
|     종합쇼핑몰| 130269|
|       시중은행| 129963|
|     스포츠신문| 122358|
|       경제신문| 121289|
|     소셜커머스| 114148|
| 종합인터넷신문| 106116|
|   포털지식검색|  93483|
|       포털쇼핑|  69933|
|  동영상/비디오|  68634|
|컨텐츠공유(P2P)|  56878|
+---------------+-------+
only showing top 20 rows



## Creating Features via Grouping
* PAGEVIEW
* DURATION
* PAGEVIEW / DURATION by hour of day
* PAGEVIEW / DURATION by day of week
* Top 50 BACT_NM: sum of DURATION / sum of PAGEVIEW
* Top 100 MACT_NM: sum of DURATION / sum of PAGEVIEW
* Top 500 ACT_NM: sum of DURATION / sum of PAGEVIEW