In [0]:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoder


// Spark session for this notebook, with Hive metastore support. Dynamic partitioning is
// enabled in non-strict mode so that INSERT OVERWRITE ... PARTITION(base_date) statements
// can take every partition value from the selected data.
val spark: SparkSession = {
  val builder = SparkSession.builder()
    .appName("StatsAnalyzer")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
  builder.getOrCreate()
}

// Raw app-log lines (gzip files under the logs/ prefix), one RDD element per line.
val log1 = spark.sparkContext.textFile("s3://dataeng-handson/logs/*.gz")
 

/** One parsed app-log event, as built by `parseRawJson`.
  *
  * Every field is a String. `base_date` holds the first 10 characters of `timestamp`;
  * the remaining fields after `gtmTimes` come from the line's JSON payload.
  */
case class Cflog(
    base_date: String,
    adid: String,
    uuid: String,
    name: String,
    timestamp: String,
    gtmTimes: String,
    screen_name: String,
    item_id: String,
    content_type: String,
    item_category: String,
    is_zb_agent: String,
    building_id: String,
    area_type_id: String,
    agent_id: String,
    status: String,
    button_name: String
)

/** Parses one pipe-delimited raw log line into a [[Cflog]] row.
  *
  * Fields read (0-based, after splitting on `|`): 1 = adid, 2 = uuid, 3 = name,
  * 7 = gtmTimes, 8 = timestamp, 9 = JSON payload holding the event attributes.
  *
  * A JSON attribute that is absent, cannot be extracted as a String, or is exactly the
  * literal "nil" is stored as the string "NULL".
  *
  * @param line one raw log line
  * @return the parsed event; `base_date` is the first 10 characters of `timestamp`
  *         (presumably `yyyy-MM-dd`, since downstream SQL parses it with that pattern)
  *
  * Throws if the line has fewer than 10 `|`-separated fields, if field 9 is not valid
  * JSON, or if the timestamp is shorter than 10 characters. Nothing here catches these,
  * so one malformed line fails the Spark task.
  */
def parseRawJson(line: String): Cflog = {
  val pieces = line.split("\\|")

  val adid      = pieces(1)
  val uuid      = pieces(2)
  val name      = pieces(3)
  val gtmTimes  = pieces(7)
  val timestamp = pieces(8)

  implicit val formats: Formats = DefaultFormats
  val payload = parse(pieces(9))

  // Reads one JSON attribute as a String, using "NULL" when it is missing.
  // Only an exact "nil" (presumably a client-side encoding of "no value") is mapped to
  // "NULL". A substring replaceAll would corrupt legitimate values that merely contain
  // "nil", e.g. "vanilla" -> "vaNULLla".
  def attr(key: String): String =
    (payload \ key).extractOpt[String] match {
      case None | Some("nil") => "NULL"
      case Some(value)        => value
    }

  // Named arguments keep the 16 same-typed fields from being silently misordered.
  Cflog(
    base_date     = timestamp.substring(0, 10),
    adid          = adid,
    uuid          = uuid,
    name          = name,
    timestamp     = timestamp,
    gtmTimes      = gtmTimes,
    screen_name   = attr("screen_name"),
    item_id       = attr("item_id"),
    content_type  = attr("content_type"),
    item_category = attr("item_category"),
    is_zb_agent   = attr("is_zb_agent"),
    building_id   = attr("building_id"),
    area_type_id  = attr("area_type_id"),
    agent_id      = attr("agent_id"),
    status        = attr("status"),
    button_name   = attr("button_name")
  )
}

// Parse every raw line into a Cflog row, expose the result to SQL as the temp view
// `logs`, and register that view for caching.
val logsDFAll = log1.map(parseRawJson).toDF()
logsDFAll.createOrReplaceTempView("logs")
sqlContext.cacheTable("logs")

In [1]:
%sql
-- Target database for the silver tables; IF NOT EXISTS keeps the paragraph re-runnable.
create database if not exists story_data

In [2]:
// Preview the first rows of the parsed log DataFrame.
logsDFAll.show()

In [3]:
%sql

-- Total number of parsed log events.
SELECT count(*) FROM logs

In [4]:
%sql
 -- Silver-layer copy of the parsed logs, stored as Parquet with one partition per base_date.
 -- "parquet.compression" is the property key Hive/Spark read for the Parquet codec
 -- ("parquet.compress" is not a recognised Parquet key). IF NOT EXISTS keeps this re-runnable.
 create external table if not exists story_data.applog
 (
    adid string,
    uuid string,
    name string,
    timestamp string,
    gtmTimes string,
    screen_name string,
    item_id string,
    content_type string,
    item_category string,
    is_zb_agent string,
    building_id string,
    area_type_id string,
    agent_id string,
    status  string,
    button_name string
 )
 partitioned by (base_date date)
 STORED AS PARQUET
 LOCATION 's3://dataeng-handson/silver/applog'
 tblproperties ("parquet.compression"="SNAPPY" ,"classification"="parquet")

In [5]:
%sql
-- Inspect the contents of the silver applog table.
SELECT * FROM story_data.applog

In [6]:
%sql
-- List the tables in story_data.
SHOW TABLES IN story_data;

In [7]:
%sql

-- Copy the parsed logs into story_data.applog. With dynamic partitioning, base_date
-- (the last select column, parsed to DATE) decides the target partition of each row.
INSERT OVERWRITE TABLE story_data.applog PARTITION (base_date)
SELECT adid, uuid, name, timestamp, gtmTimes,
       screen_name, item_id, content_type, item_category, is_zb_agent,
       building_id, area_type_id, agent_id, status, button_name,
       to_date(base_date, 'yyyy-MM-dd') AS base_date
FROM logs

In [8]:
%sql
-- Columns: base date, ADID, apartment-complex ID, event timestamp
-- (apartment-category events only).
SELECT base_date,
       adid,
       building_id AS apart_id,
       timestamp   AS base_dt
FROM story_data.applog
WHERE item_category = '아파트'

In [9]:
%sql
-- Per-user apartment-complex view events, used to count views per complex.
-- "parquet.compression" is the Parquet codec property Hive/Spark read ("parquet.compress"
-- is not a recognised key). IF NOT EXISTS keeps this paragraph re-runnable.
-- NOTE(review): the LOCATION suffix "danji_user_view_silver" differs from the table name;
-- it is left as is because existing data may already live at that path.
CREATE EXTERNAL TABLE IF NOT EXISTS story_data.apart_user_view_silver
(
  adid string,
  apart_id string,
  base_dt string
)
PARTITIONED BY (
  base_date date )
STORED AS PARQUET
LOCATION 's3://dataeng-handson/silver/danji_user_view_silver'
tblproperties ("parquet.compression"="SNAPPY" ,"classification"="parquet")
;

In [10]:
%sql
-- Load apartment-complex view events. Rows whose building_id is '0' or the 'NULL'
-- placeholder (or SQL NULL) are dropped.
INSERT OVERWRITE TABLE story_data.apart_user_view_silver PARTITION (base_date)
SELECT adid,
       building_id AS apart_id,
       timestamp   AS base_dt,
       to_date(base_date, 'yyyy-MM-dd') AS base_date
FROM story_data.applog
WHERE item_category = '아파트'
  AND building_id NOT IN ('0', 'NULL')

In [11]:
%sql
-- Inspect the loaded apartment-complex view events.
SELECT * FROM story_data.apart_user_view_silver

In [12]:
%sql
-- Daily view count per apartment complex, most-viewed first.
SELECT base_date,
       apart_id,
       count(*) AS cnt
FROM story_data.apart_user_view_silver
GROUP BY base_date, apart_id
ORDER BY cnt DESC

In [13]:
%sql
-- Inspect the silver applog table again.
SELECT * FROM story_data.applog

In [14]:
%sql
-- Columns: ADID, event timestamp, apartment-complex ID, listing item ID — the same
-- column set as story_data.apart_item_view_silver. The first column is adid; selecting
-- item_id there would duplicate the last column and lose the user identifier.
select adid,
       timestamp   as base_dt,
       building_id as apart_id,
       item_id
from logs
where item_category = '아파트'
  and item_id != 'NULL'
  and building_id != '0'

In [15]:
%sql
-- Per-user views of apartment listing items, used to count views per listing.
-- "parquet.compression" is the Parquet codec property Hive/Spark read ("parquet.compress"
-- is not a recognised key). IF NOT EXISTS keeps this paragraph re-runnable.
CREATE EXTERNAL TABLE IF NOT EXISTS story_data.apart_item_view_silver
(
  adid string,
  base_dt string,
  apart_id string,
  item_id  string
)
PARTITIONED BY (
  base_date date )
STORED AS PARQUET
LOCATION 's3://dataeng-handson/silver/apart_item_view_silver'
tblproperties ("parquet.compression"="SNAPPY" ,"classification"="parquet")
;

In [16]:
%sql
-- Load per-user apartment-listing views. INSERT OVERWRITE maps columns by position, so the
-- select list must follow the table's declared order (adid, base_dt, apart_id, item_id),
-- with the partition column base_date last. Putting apart_id before base_dt would store
-- the building id in base_dt and the timestamp in apart_id.
insert overwrite table story_data.apart_item_view_silver
    PARTITION(base_date)
select
adid,
timestamp as base_dt,
building_id as apart_id,
item_id,
to_date(base_date, 'yyyy-MM-dd') as base_date
from story_data.applog
where item_category = '아파트'
and item_id != 'NULL'
and building_id != '0'

In [17]:
%sql
-- List the tables in story_data after loading.
SHOW TABLES IN story_data

In [18]:
%sql
-- Inspect the cached temp view of parsed logs.
SELECT * FROM logs

In [19]:
%sql

    -- Apartment-detail item views bucketed by hour of day.
    SELECT hour(timestamp) AS h,
           count(adid)     AS count
    FROM logs
    WHERE item_category = '아파트'
      AND name = 'view_item'
      AND screen_name = '아파트 상세'
    GROUP BY hour(timestamp)
    ORDER BY h

In [20]:
%sql


    -- Apartment-detail item views, one row per event.
    SELECT base_date,
           adid,
           timestamp   AS base_dt,
           building_id AS danji_id
    FROM logs
    WHERE item_category = '아파트'
      AND name = 'view_item'
      AND screen_name = '아파트 상세'
 
    
  

In [21]:
%sql
    -- Apartment events carrying a listing item_id on the listing-detail screen
    -- ('아파트 매물상세'); the name = 'view_item' predicate is commented out.
    SELECT *
    FROM logs
    WHERE item_category = '아파트'
      AND item_id != 'NULL'
      -- AND name = 'view_item'
      AND screen_name = '아파트 매물상세'

In [22]:
// Apartment-detail item views, one row per event (not yet aggregated despite the name).
val danjiQuery = """

    select base_date,
           adid,
           timestamp as base_dt,
           building_id as danji_id
    from logs 
    where item_category = '아파트'
    and name = 'view_item'
    and screen_name = '아파트 상세'

"""
val danjiCountDF = sqlContext.sql(danjiQuery)

// Save to S3 as Parquet, replacing any previous output at that path.
danjiCountDF.write
  .mode("overwrite")
  .parquet("s3://fc-class/silver/danji_user_view")


In [23]:
// Read the saved Parquet output back and preview it.
val danji_viewDF = spark.read.format("parquet").load("s3://fc-class/silver/danji_user_view")
danji_viewDF.show()



In [24]:
%sql
-- Per-user apartment-complex view events with typed columns, partitioned by base_date.
-- "parquet.compression" is the Parquet codec property Hive/Spark read ("parquet.compress"
-- is not a recognised key). IF NOT EXISTS keeps this paragraph re-runnable.
-- NOTE(review): the `class` database is not created anywhere in this notebook (only
-- story_data is); confirm it exists before running.
CREATE EXTERNAL TABLE IF NOT EXISTS class.danji_user_view_silver
(
  base_dt timestamp,
  adid string,
  danji_id int
)
PARTITIONED BY (
  base_date date
  )
STORED AS PARQUET
LOCATION 's3://fc-class/hive/silver/danji_user_view'
tblproperties ("parquet.compression"="SNAPPY" ,"classification"="parquet")



In [25]:
// Load apartment-detail view events into class.danji_user_view_silver.
// INSERT maps columns by position: base_dt, adid, danji_id, then the partition column
// base_date. The source columns are strings while the table declares timestamp / int /
// date, so the conversions are explicit. Spark 3's default ANSI store-assignment policy
// rejects implicit string -> int/timestamp casts on insert; explicit CASTs behave the same
// as Spark 2's implicit ones (unparseable values become NULL). base_date is parsed with
// to_date like the other inserts in this notebook.
sqlContext.sql("""

   insert overwrite table class.danji_user_view_silver
    PARTITION(base_date)
    select
           cast(`timestamp` as timestamp) as base_dt,
           adid,
           cast(building_id as int) as danji_id,
           to_date(base_date, 'yyyy-MM-dd') as base_date
    from logs
    where item_category = '아파트'
    and name = 'view_item'
    and screen_name = '아파트 상세'

""")

In [26]:
%sql

-- Daily view count per apartment complex, most-viewed first.
SELECT base_date,
       danji_id,
       count(*) AS danji_view_count
FROM class.danji_user_view_silver
GROUP BY base_date, danji_id
ORDER BY danji_view_count DESC
