In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType

In [0]:
# Load the raw hit-level export (tab-separated, first row is a header) and keep
# only the columns used by later cells.
INPUT_PATH = "dbfs:/FileStore/tables/data.tsv"

raw_hits_df = (
    spark.read.format("csv")
    .option("delimiter", "\t")
    .option("header", True)
    .load(INPUT_PATH)
)

# Without inferSchema every CSV column is a string; hit_time_gmt is cast so that
# time arithmetic works downstream. A 64-bit type is used because epoch seconds
# no longer fit a 32-bit int after 2038-01-19 (the cast would yield NULL, or fail
# under ANSI mode).
# `pagename` is kept because the revenue cell filters on it. Previously it was
# dropped here and only resolved because Spark re-adds attributes that a Filter
# needs from below a projection.
src_df = raw_hits_df.select(
    col("hit_time_gmt").cast("long").alias("hit_time_gmt"),
    "ip",
    "user_agent",
    "pagename",
    "product_list",
    "referrer",
)

In [0]:
# Revenue lines: purchase-confirmation hits only, one row per product entry.
# product_list holds comma-separated entries whose fields are separated by ';';
# the line revenue is field [2] (cast to int) times field [3] (cast to double).
# NOTE(review): confirm against the data spec whether field [3] is a unit price or
# already the line total (every sample row has quantity 1, so both readings give
# the same numbers here), and whether a purchase should be identified by a
# purchase event rather than by the page name.
revenue_record_df = src_df.filter("pagename=='Order Complete'")

product_details_without_explode_df = revenue_record_df.withColumn(
    "product_detail",
    split(col("product_list"), ","),
)

product_details_derived_df = product_details_without_explode_df.select(
    "hit_time_gmt",
    "user_agent",
    "ip",
    explode("product_detail").alias("product_sale"),
)

# Build the ';' split once and reuse it for both fields.
sale_fields = split(col("product_sale"), ";")
product_details_revenue_df = product_details_derived_df.withColumn(
    "revenue",
    sale_fields[2].cast("int") * sale_fields[3].cast("double"),
)

product_details_revenue_df.createOrReplaceTempView("revenue_record")
product_details_revenue_df.display()

hit_time_gmt,user_agent,ip,product_sale,revenue
1254034666,"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_4_11; en) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1",23.8.61.21,Electronics;Zune - 32GB;1;250;,250.0
1254034963,"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_4_11; en) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1",44.12.96.2,Electronics;Ipod - Nano - 8GB;1;190;,190.0
1254035260,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.10) Gecko/2009042316 Firefox/3.0.10,67.98.123.1,Electronics;Ipod - Touch - 32GB;1;290;,290.0


In [0]:
# Leads: hits that arrived from an external site, i.e. whose referrer is not the
# shop itself. NULL referrers are dropped too (NOT LIKE on NULL is never true).
lead_df = src_df.filter("referrer not like '%esshopzilla%'")

# Search-engine name = second-to-last label of the referrer host
# (www.google.com -> google, search.yahoo.com -> yahoo, www.bing.com -> bing).
# The host is read with parse_url instead of splitting the whole URL on ".com".
# split() takes a regex, so its unescaped "." matched any character
# (e.g. "www.telecom.com" was cut at "ecom", giving "tel"). Its "\." / "\?"
# patterns were also invalid escapes in non-raw Python strings.
# NOTE(review): assumes a single-label TLD such as .com (www.google.co.uk -> "co").
lead_with_name_df = (
    lead_df
    .withColumn("referrer_host", lower(expr("parse_url(referrer, 'HOST')")))
    .withColumn(
        "lead_base_name",
        substring_index(substring_index(col("referrer_host"), ".", -2), ".", 1),
    )
)

# Keyword = value of the engine's search parameter (`p` for Yahoo, `q` otherwise).
# parse_url matches the exact parameter name. The old substring split on 'q='/'p='
# could pick up look-alike parameters (`aq=`, `oq=`, `cop=`) when they came first.
# It also indexed [1] past the end of the array when the parameter was missing;
# an absent parameter now yields NULL.
# NOTE: the value is not URL-decoded ("cd+player" stays as-is).
# NOTE(review): with ANSI mode on, parse_url raises on a malformed referrer URL;
# verify against the runtime configuration.
lead_with_name_keyword_df = (
    lead_with_name_df
    .withColumn(
        "keyword",
        expr(
            "lower(CASE WHEN lead_base_name = 'yahoo' "
            "THEN parse_url(referrer, 'QUERY', 'p') "
            "ELSE parse_url(referrer, 'QUERY', 'q') END)"
        ),
    )
    .select("hit_time_gmt", "user_agent", "ip", "lead_base_name", "keyword")
)
lead_with_name_keyword_df.display()
lead_with_name_keyword_df.createOrReplaceTempView("lead_record")

hit_time_gmt,user_agent,ip,lead_base_name,keyword
1254033280,Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.10) Gecko/2009042316 Firefox/3.0.10,67.98.123.1,google,ipod
1254033379,"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_4_11; en) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1",23.8.61.21,bing,zune
1254033478,"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_4_11; en) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1",112.33.98.231,yahoo,cd+player
1254033577,"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_4_11; en) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1",44.12.96.2,google,ipod


In [0]:

# Attribute purchase revenue to the search (lead) that brought the visitor in.
# A visitor is identified by (ip, user_agent). A lead qualifies for a purchase hit
# only if it happened no later than the purchase and less than 24 hours
# (3600 * 24 s) before it. The previous condition
# `r.hit_time_gmt - 3600 * 24 < l.hit_time_gmt` had no upper bound, so searches
# made *after* the purchase were credited as well.
# When several leads qualify, only the most recent one is credited (last touch).
# Joining revenue rows against every qualifying lead multiplied the revenue by the
# number of matches. Ranking is done per distinct purchase hit, and every revenue
# row of that hit is then joined to exactly one lead.
lead_to_revenue_df = spark.sql("""
    WITH purchase_hit AS (
        SELECT DISTINCT ip, user_agent, hit_time_gmt
        FROM revenue_record
    ),
    ranked_lead AS (
        SELECT p.ip,
               p.user_agent,
               p.hit_time_gmt,
               l.lead_base_name,
               l.keyword,
               ROW_NUMBER() OVER (
                   PARTITION BY p.ip, p.user_agent, p.hit_time_gmt
                   ORDER BY l.hit_time_gmt DESC, l.lead_base_name, l.keyword
               ) AS lead_rank
        FROM purchase_hit p
        JOIN lead_record l
          ON p.user_agent = l.user_agent
         AND p.ip = l.ip
         AND l.hit_time_gmt <= p.hit_time_gmt
         AND l.hit_time_gmt > p.hit_time_gmt - 3600 * 24
    )
    SELECT a.lead_base_name, a.keyword, SUM(r.revenue) AS total_revenue
    FROM revenue_record r
    JOIN ranked_lead a
      ON r.ip = a.ip
     AND r.user_agent = a.user_agent
     AND r.hit_time_gmt = a.hit_time_gmt
    WHERE a.lead_rank = 1
    GROUP BY a.lead_base_name, a.keyword
""")
lead_to_revenue_df.display()

lead_base_name,keyword,total_revenue
google,ipod,480.0
bing,zune,250.0


In [0]:
# Final report: one row per (search engine domain, keyword), highest revenue first.
# NOTE(review): the domain is rebuilt by appending ".com" to the engine name,
# which assumes every referring engine is a .com host.
final_df = (
    lead_to_revenue_df
    .select(
        concat("lead_base_name", lit(".com")).alias("Search Engine Domain"),
        col("keyword").alias("Search Keyword"),
        col("total_revenue").alias("Revenue"),
    )
    .orderBy(desc("Revenue"))
)
final_df.display()

# Write one tab-separated file sorted by revenue.
# The previous `.repartition(1).sort(...)` applied a global sort after the
# repartition. A global sort range-partitions (shuffles) its input, so the output
# could be split across several part files. Sorting within the single partition
# keeps both the one partition and the order.
# Spark writes a directory at this path containing the part-*.csv file.
(
    final_df
    .repartition(1)
    .sortWithinPartitions(desc("Revenue"))
    .write.format("csv")
    .option("delimiter", "\t")
    .option("header", True)
    .mode("overwrite")
    .save("dbfs:/FileStore/adobe/result.tsv")
)

Search Engine Domain,Search Keyword,Revenue
google.com,ipod,480.0
bing.com,zune,250.0
