In [1]:
import findspark
findspark.init()

import pyspark
# Build (or reuse) the session through the public builder API. Constructing a
# SparkContext directly raises a ValueError when this cell is executed a second
# time in the same kernel, because only one context may be active at a time;
# getOrCreate() hands back the already-running session instead.
spark = pyspark.sql.SparkSession.builder.appName("sample").getOrCreate()
# Keep `sc` bound to the SparkContext that backs the session.
sc = spark.sparkContext

In [2]:
# Load the raw log as plain text; the printed schema shows a single string
# column named `value` holding one log line per row.
origin_df = spark.read.format("text").load("log.gz")

# Inspect the schema and a sample of the raw lines.
origin_df.printSchema()
origin_df.show()

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|116.45.156.236	-	...|
|118.39.115.34	-	-...|
|118.131.135.106	-...|
|121.133.199.109	-...|
|59.6.114.215	-	-	...|
|175.213.201.55	-	...|
|112.155.248.202	-...|
|211.220.237.157	-...|
|121.191.68.57	-	-...|
|121.166.155.96	-	...|
|211.205.29.147	-	...|
|1.245.107.108	-	-...|
|59.16.75.171	-	-	...|
|125.180.10.53	-	-...|
|107.134.135.133	-...|
|211.223.26.110	-	...|
|115.40.30.227	-	-...|
|115.91.139.157	-	...|
|112.165.119.229	-...|
|218.234.98.76	-	-...|
+--------------------+
only showing top 20 rows



In [3]:
from pyspark.sql.functions import *
from pyspark.sql.functions import monotonically_increasing_id 
# Split every raw log line on tabs into positional columns (_1, _2, ...).
# The result is stored back into `origin_df`, so only split while the frame
# still has its raw single `value` column: without this guard, executing the
# cell a second time fails because the already-split rows have no `value`.
if origin_df.columns == ["value"]:
    origin_df = origin_df.rdd.map(lambda line: line.value.split("\t")).toDF()

origin_df.printSchema()
origin_df.show()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nullable = true)
 |-- _12: string (nullable = true)

+---------------+---+---+--------------------+----+--------------------+--------+---+---+--------------------+--------------------+--------------------+
|             _1| _2| _3|                  _4|  _5|                  _6|      _7| _8| _9|                 _10|                 _11|                 _12|
+---------------+---+---+--------------------+----+--------------------+--------+---+---+--------------------+--------------------+--------------------+
| 116.45.156.236|  -|  -|[01/Dec/2020:16:2...| GET|https://estat.zum...|HTTP/1.1|200|  0|https://zum.com/?...|Mozilla/5.0 (W

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType

# Layout of one parsed log record as produced by `process`: the twelve
# tab-separated log fields in order, followed by five derived fields
# (data, time, zuid, cookie_zuid, query). Every column is a string except
# status_code and length, which are integers.
schema = StructType([
    StructField(name, IntegerType() if name in ("status_code", "length") else StringType())
    for name in (
        "host",
        "_",
        "user_id",
        "date",
        "http_method",
        "url",
        "http_version",
        "status_code",
        "length",
        "referrer",
        "user_agent",
        "cookie",
        "data",
        "time",
        "zuid",
        "cookie_zuid",
        "query",
    )
])

# Struct that the JSON text in the `data` column is parsed into.
data_schema = StructType([
    StructField(name, StringType()) for name in ("event", "properties")
])

# Columns of the tab-separated company synonym file.
company_synonym_schema = StructType([
    StructField(name, StringType()) for name in ("market", "company", "code", "synonym")
])

# Columns of the synonym table after suffix keywords have been appended.
suffix_added_schema = StructType([
    StructField(name, StringType()) for name in ("company", "code", "synonym_with_suffix")
])

In [5]:
import urllib

def process(k):
    """Turn one split log row into the value list expected by `schema`.

    Args:
        k: a row whose fields are named ``_1`` .. ``_N`` (``N == len(k)``) and
            can be read with ``k["_i"]``. Field ``_6`` is the request URL,
            ``_10`` the referrer URL and ``_12`` the raw cookie header.

    Returns:
        A list holding the N original fields in order, with fields 8 and 9
        converted to ``int`` (``None`` when the text is not a valid integer),
        followed by five derived strings:
        the ``data``, ``time`` and ``_ZUID`` query parameters of the request
        URL, the ``_ZUID`` cookie value, and the ``query`` parameter of the
        referrer URL. A missing parameter or cookie yields ``""``.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import parse_qs, urlparse

    def first_values(url):
        # Map each query-string parameter of `url` to its first value.
        return {key: values[0] for key, values in parse_qs(urlparse(url).query).items()}

    def to_int(text):
        # Non-numeric placeholders such as "-" become None (a null in the
        # nullable integer column) instead of raising and failing the job.
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    url_params = first_values(k["_6"])
    referrer_params = first_values(k["_10"])

    # Cookie header: "name=value; name=value". Only the first "=" separates the
    # name from the value, so values that contain "=" are kept intact.
    cookie_dict = {}
    for pair in k["_12"].split(";"):
        name, _, value = pair.strip().partition("=")
        cookie_dict[name] = value

    fields = [
        to_int(k[f"_{i}"]) if i in (8, 9) else k[f"_{i}"]
        for i in range(1, len(k) + 1)
    ]
    derived = [
        url_params.get("data", ""),
        url_params.get("time", ""),
        url_params.get("_ZUID", ""),
        cookie_dict.get("_ZUID", ""),
        referrer_params.get("query", ""),
    ]
    return fields + derived


# Parse every split log row with `process` and attach the named, typed schema.
# The `data` column is then base64-decoded to text and parsed as JSON into a
# struct shaped by `data_schema`.
zoomlog_df = (
    origin_df.rdd.map(process).toDF(schema)
    .withColumn("data", unbase64(col("data")).cast(StringType()))
    .withColumn("data", from_json(col("data"), data_schema))
)

# Keep a row only when all of the following hold:
#   - the host is not the excluded address,
#   - the decoded event is a page view,
#   - the referrer starts with the search site (http or https),
#   - a ZUID cookie value is present.
filtered_zoomlog_df = zoomlog_df.filter(
    (col("host") != "112.216.127.98")
    & (col("data")["event"] == "@PageView")
    & (
        col("referrer").startswith("http://search.zum.com")
        | col("referrer").startswith("https://search.zum.com")
    )
    & (col("cookie_zuid") != "")
)

filtered_zoomlog_df.printSchema()
filtered_zoomlog_df.show()


root
 |-- host: string (nullable = true)
 |-- _: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- http_method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- http_version: string (nullable = true)
 |-- status_code: integer (nullable = true)
 |-- length: integer (nullable = true)
 |-- referrer: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- cookie: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- properties: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zuid: string (nullable = true)
 |-- cookie_zuid: string (nullable = true)
 |-- query: string (nullable = true)

+---------------+---+-------+--------------------+-----------+--------------------+------------+-----------+------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------

In [6]:
# Preview a narrow subset of columns from the filtered log rows.
(
    filtered_zoomlog_df
    .select("host", "data", "referrer", "cookie_zuid", "query")
    .show()
)

+---------------+--------------------+--------------------+--------------------+-----------------------+
|           host|                data|            referrer|         cookie_zuid|                  query|
+---------------+--------------------+--------------------+--------------------+-----------------------+
|    59.4.110.83|[@PageView, {"ver...|http://search.zum...|5F2927F0-8BB6-42E...|     광주은행인터넷뱅킹|
|   59.23.16.122|[@PageView, {"ver...|http://search.zum...|7DDD9C98-8C54-407...|                 네이버|
|  125.130.38.20|[@PageView, {"ver...|http://search.zum...|255FD4DC-E9E6-4A7...|               핑크라벨|
|121.186.217.237|[@PageView, {"ver...|http://search.zum...|48D102BF-82A5-43D...|  퍼스널컬러 자가진단법|
|  124.53.247.63|[@PageView, {"ver...|http://search.zum...|32AA0121-19ED-4A7...|백종원 막내 세은 뿔났다|
|    183.97.22.5|[@PageView, {"ver...|http://search.zum...|D0208919-1D1F-4F0...|     대통령 추미애 면담|
|121.150.236.142|[@PageView, {"ver...|http://search.zum...|7CD45006-46DE-4AA...|                 안정

In [7]:
# For each search query, count how many distinct ZUID cookies issued it,
# discard the empty query, and rank the queries from most to fewest.
query_count_df = (
    filtered_zoomlog_df
    .groupBy("query")
    .agg(countDistinct("cookie_zuid").alias("count"))
    .filter(col("query") != "")
    .orderBy(col("count").desc())
)

# Cached because the frame is shown here and joined against later.
query_count_df.cache()
query_count_df.show()

+----------------------+-----+
|                 query|count|
+----------------------+-----+
|                네이버|  872|
|                  다음|  550|
|                유튜브|  378|
|       공인인증서 폐지|  316|
|         코로나 확진자|  213|
|       예스터데이 진성|  207|
|       윤석열 직무정지|  191|
|대한항공 아시아나 인수|  180|
|         국세청 홈택스|  174|
|         하프클럽 특가|  159|
|     ebs 온라인 클래스|  155|
|          레이디스코드|  149|
|                권리세|  139|
|               e학습터|  134|
|                홍수아|  115|
|                임영웅|  114|
|                  구글|  110|
|                최하민|  103|
|              우리은행|   90|
|       터닝포인트 옥희|   87|
+----------------------+-----+
only showing top 20 rows



In [8]:

# Load the headerless, tab-separated synonym file with an explicit schema
# (market, company, code, synonym).
company_synonym_df = (
    spark.read
    .schema(company_synonym_schema)
    .options(header="false", delimiter="\t")
    .csv("company_synonym")
)

company_synonym_df.printSchema()
company_synonym_df.show()

root
 |-- market: string (nullable = true)
 |-- company: string (nullable = true)
 |-- code: string (nullable = true)
 |-- synonym: string (nullable = true)

+------+----------+------+----------------+
|market|   company|  code|         synonym|
+------+----------+------+----------------+
| kospi|  동화약품|000020|        동화약품|
| kospi|  동화약품|000020|          000020|
| kospi|  KR모터스|000040|  에쓰엔티모터스|
| kospi|  KR모터스|000040|    효성기계공업|
| kospi|  KR모터스|000040|          000040|
| kospi|  KR모터스|000040|  에쓰앤티모터스|
| kospi|  KR모터스|000040|       S&T모터스|
| kospi|  KR모터스|000040|  에스앤티모터스|
| kospi|  KR모터스|000040|        KR모터스|
| kospi|  KR모터스|000040|  에스엔티모터스|
| kospi|      경방|000050|            경방|
| kospi|      경방|000050|          000050|
| kospi|메리츠화재|000060|      메리츠화재|
| kospi|메리츠화재|000060|동양화재해상보험|
| kospi|메리츠화재|000060|          000060|
| kospi|메리츠화재|000060|        동양화재|
| kospi|메리츠화재|000060|      매리츠보험|
| kospi|메리츠화재|000060|  메리츠화재보험|
| kospi|메리츠화재|000060|    동양화재보험|
| kospi|삼양홀딩스|000070|     

In [9]:
# Read the suffix keywords (one per line) into a single column named `suffix`.
suffix_keyword_df = spark.read.text("suffix_keyword").withColumnRenamed("value", "suffix")
suffix_keyword_df.show()

# Bring the keywords to the driver as a plain Python list; each collected row
# is a one-element tuple, so it unpacks directly.
suffix_array = [suffix for (suffix,) in suffix_keyword_df.select("suffix").collect()]
suffix_array

+------+
|suffix|
+------+
|  공시|
|  매출|
|  실적|
|  이익|
|  종목|
|  주가|
|  주식|
+------+



['공시', '매출', '실적', '이익', '종목', '주가', '주식']

In [10]:
# Every ending that gets appended to a synonym: nothing at all, each suffix
# keyword glued on directly, and each suffix keyword after a single space.
# Built once here rather than once per row inside the lambda.
suffix_variants = [""] + suffix_array + [" " + keyword for keyword in suffix_array]

suffix_added_df = (
    company_synonym_df.rdd
    # A null synonym cannot be concatenated (None + str raises TypeError and
    # would abort the job), so such rows are skipped.
    .filter(lambda row: row.synonym is not None)
    # Emit one (company, code, synonym + ending) record per variant.
    .flatMap(lambda row: [[row.company, row.code, row.synonym + variant] for variant in suffix_variants])
    .toDF(suffix_added_schema)
)
suffix_added_df.show()

+--------+------+-------------------+
| company|  code|synonym_with_suffix|
+--------+------+-------------------+
|동화약품|000020|           동화약품|
|동화약품|000020|       동화약품공시|
|동화약품|000020|       동화약품매출|
|동화약품|000020|       동화약품실적|
|동화약품|000020|       동화약품이익|
|동화약품|000020|       동화약품종목|
|동화약품|000020|       동화약품주가|
|동화약품|000020|       동화약품주식|
|동화약품|000020|      동화약품 공시|
|동화약품|000020|      동화약품 매출|
|동화약품|000020|      동화약품 실적|
|동화약품|000020|      동화약품 이익|
|동화약품|000020|      동화약품 종목|
|동화약품|000020|      동화약품 주가|
|동화약품|000020|      동화약품 주식|
|동화약품|000020|             000020|
|동화약품|000020|         000020공시|
|동화약품|000020|         000020매출|
|동화약품|000020|         000020실적|
|동화약품|000020|         000020이익|
+--------+------+-------------------+
only showing top 20 rows



In [11]:
# Keep only the search queries that exactly equal a synonym-with-suffix string,
# pairing each matching query (and its count) with the company and code.
# The join key column is dropped afterwards because `query` carries the same text.
suffix_count_df = (
    suffix_added_df
    .join(
        query_count_df,
        on=query_count_df["query"] == suffix_added_df["synonym_with_suffix"],
        how="inner",
    )
    .drop("synonym_with_suffix")
)

suffix_count_df.cache()
suffix_count_df.show()

+------------+------+------------+-----+
|     company|  code|       query|count|
+------------+------+------------+-----+
|    동화약품|000020|    동화약품|    1|
|    KR모터스|000040|    KR모터스|    1|
|  메리츠화재|000060|  메리츠화재|    7|
|    유한양행|000100|    유한양행|    3|
|        두산|000150|        두산|    1|
|    대림산업|000210|    대림산업|    1|
|  일동홀딩스|000230|    일동제약|    1|
|      기아차|000270|  기아자동차|    4|
|      기아차|000270|  기아차주가|    1|
|한화손해보험|000370|한화손해보험|    1|
|롯데손해보험|000400|롯데손해보험|    1|
|    대동공업|000490|    대동공업|    1|
|    흥국화재|000540|    흥국화재|    1|
|  SK하이닉스|000660|    하이닉스|    2|
|    현대건설|000720|현대건설주가|    1|
|    삼성화재|000810|    삼성화재|    7|
|유진투자증권|001200|유진투자증권|    1|
|    동국제강|001230|    동국제강|    1|
|    현대해상|001450|    현대해상|    6|
|    삼부토건|001470|    삼부토건|    1|
+------------+------+------------+-----+
only showing top 20 rows



In [12]:
# Import Spark's aggregate under an explicit alias. The bare name `sum` only
# resolves to the Spark function because an earlier wildcard import shadows the
# Python builtin; with the builtin in scope, sum(col("count")) raises TypeError
# instead of building an aggregate expression.
from pyspark.sql.functions import sum as spark_sum

# Total the per-query counts for each (company, code) pair, highest first.
result_df = (
    suffix_count_df
    .groupby(col("company"), col("code"))
    .agg(spark_sum(col("count")).alias("count"))
    .orderBy(desc("count"))
)
result_df.show()

# Write the ranking as a single CSV part file with a header row, replacing any
# previous contents of the "result" directory.
result_df.coalesce(1).write.format("csv").option("header","true").option("encoding","UTF-8").mode("overwrite").save("result")

+------------+------+-----+
|     company|  code|count|
+------------+------+-----+
|       NAVER|035420|  878|
|      카카오|035720|  555|
|      KB금융|105560|   67|
|    신한지주|055550|   61|
|하나금융지주|086790|   53|
|      한진칼|180640|   42|
|    기업은행|024110|   34|
|      유니크|011320|   25|
|종근당바이오|063160|   20|
|아시아나항공|020560|   17|
|    삼성전자|005930|   17|
|    경보제약|214390|   16|
|    대한항공|003490|   15|
|      예스24|053280|   15|
|    대웅제약|069620|   14|
|  메가스터디|072870|   14|
|    삼성생명|032830|   12|
|      다나와|119860|   11|
|    삼성카드|029780|   10|
|      파미셀|005690|   10|
+------------+------+-----+
only showing top 20 rows

