In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark import SparkConf
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField
import numpy as np
import pandas as pd
import matplotlib.dates as mdate
from pyspark.sql.functions import udf, col

In [2]:
# Absolute path of the warehouse directory; it is passed to
# spark.sql.warehouse.dir when the SparkSession is built.
warehouse_location = abspath('/user/hive/warehouse')

In [3]:
# Build (or reuse) the SparkSession used by every later cell. It connects to
# the standalone master at app-13 and sets spark.sql.warehouse.dir to
# warehouse_location.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .master("spark://app-13:7077")
    .getOrCreate()
)

# Load the user-behaviour CSV with an explicit schema instead of inferring
# column types.
#
# The option key used to be "timestampFormat " (trailing space), so it never
# matched the reader's timestampFormat option, and its pattern used a 'T'
# separator although create_time values look like "2014-12-10 11".
# create_time is still read as a string here, so the corrected option only
# takes effect if that column is later declared as a timestamp.
df = (
    spark.read.format("CSV")
    .option("header", "true")
    .option("timestampFormat", "yyyy-MM-dd HH")
    .schema("user_id int,item_id int,behavior_type int,user_geohash string,item_category int,create_time string")
    .load("/user/hive/warehouse/test.db/tianchi/tianchi_mobile_recommend_train_user.csv")
)

# Register the frame under the table name the SQL cells query.
df.createOrReplaceTempView("taobao")

In [4]:
# Turn on the spark.sql.execution.arrow.enabled session setting.
# NOTE(review): presumably to speed up the later toPandas() call — confirm the
# key is still the right one for the Spark version in use.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [5]:
# Number of distinct item categories in the behaviour log.
distinct_category_df = spark.sql("SELECT count(distinct item_category) from taobao")
distinct_category_df.show()

+-----------------------------+
|count(DISTINCT item_category)|
+-----------------------------+
|                         8916|
+-----------------------------+



In [6]:
# Number of distinct items in the behaviour log.
distinct_item_df = spark.sql("SELECT count(distinct item_id) from taobao")
distinct_item_df.show()

+-----------------------+
|count(DISTINCT item_id)|
+-----------------------+
|                2876947|
+-----------------------+



In [11]:
# Event count per (item, behaviour type), listed from the highest behaviour
# code downwards and, within each code, from the most frequent item downwards.
item_behavior_counts_df = spark.sql(
    "SELECT item_id,behavior_type,count(*) as cnt from taobao group by item_id,behavior_type order by behavior_type desc,cnt desc"
)
item_behavior_counts_df.show()

+---------+-------------+---+
|  item_id|behavior_type|cnt|
+---------+-------------+---+
|303205878|            4| 50|
| 14087919|            4| 35|
|115124482|            4| 31|
|243091690|            4| 29|
|167074648|            4| 28|
|127914633|            4| 24|
|109259240|            4| 24|
|374214353|            4| 23|
|101795752|            4| 23|
|176556528|            4| 22|
|400291504|            4| 22|
| 17065447|            4| 22|
|383779671|            4| 22|
|330469986|            4| 22|
|331710542|            4| 21|
|390181058|            4| 21|
|188241513|            4| 21|
| 83098075|            4| 20|
| 72183675|            4| 20|
|387911330|            4| 19|
+---------+-------------+---+
only showing top 20 rows



In [12]:
# Cross-check the top row of the grouped result: count the type-4 events of
# item 303205878 directly.
top_item_check_df = spark.sql(
    "SELECT count(*) as cnt from taobao where item_id=303205878 and behavior_type=4"
)
top_item_check_df.show()

+---+
|cnt|
+---+
| 50|
+---+



In [16]:
# Preview the mapping from numeric behavior_type codes to labels
# (1 -> 'pv', 2 -> 'fav', 3 -> 'cart', 4 -> 'buy'); any other code yields NULL
# because the CASE has no ELSE branch.
spark.sql("""SELECT item_id,
(case behavior_type 
      when 1 then 'pv'
      when 2 then 'fav'
      when 3 then 'cart'
      when 4 then 'buy'
      END)b
FROM taobao""").show()

+---------+----+
|  item_id|   b|
+---------+----+
|232431562|  pv|
|383583590|  pv|
| 64749712|  pv|
|320593836|  pv|
|290208520|  pv|
|337869048|  pv|
|105749725|  pv|
| 76866650|  pv|
|161166643|  pv|
| 21751142|cart|
|266020206|cart|
|139144131|  pv|
|255365467|  pv|
|212072908|  pv|
|322736792|  pv|
|382807809|  pv|
|262661866|  pv|
|144902506|  pv|
|185630324|  pv|
| 76021805|  pv|
+---------+----+
only showing top 20 rows



In [18]:
# Event count per (item, behaviour), with the numeric code replaced by its
# label. Rows are ordered by the numeric behavior_type (so 'pv' comes first)
# and then by descending count.
spark.sql("""SELECT item_id,
(case behavior_type 
      when 1 then 'pv'
      when 2 then 'fav'
      when 3 then 'cart'
      when 4 then 'buy'
      END)behavior,
  count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""").show()

+---------+--------+----+
|  item_id|behavior| cnt|
+---------+--------+----+
|112921337|      pv|1431|
| 97655171|      pv|1249|
|387911330|      pv|1053|
|135104537|      pv| 916|
|  2217535|      pv| 792|
|  5685392|      pv| 767|
|128186279|      pv| 765|
|277922302|      pv| 763|
| 14087919|      pv| 740|
|209323160|      pv| 716|
|275450912|      pv| 665|
|374235261|      pv| 634|
|353381230|      pv| 606|
|211781109|      pv| 603|
|322554659|      pv| 593|
|  6703599|      pv| 589|
|217213194|      pv| 567|
| 21087251|      pv| 544|
|355292943|      pv| 538|
|303205878|      pv| 534|
+---------+--------+----+
only showing top 20 rows



In [5]:
# Long-format counts: one row per (item_id, behaviour label) with its event
# count. This is the input to the pivot in the next cell.
sqlDFRaw = spark.sql("""SELECT item_id,
(case behavior_type 
      when 1 then 'pv'
      when 2 then 'fav'
      when 3 then 'cart'
      when 4 then 'buy'
      END)behavior,
  count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""")

In [6]:
# Reshape the long counts to one row per item with a column per behaviour.
# The pivot values are listed explicitly, which fixes the column order.
sqlDF = (
    sqlDFRaw
    .groupBy("item_id")
    .pivot("behavior", ["pv", "fav", "cart", "buy"])
    .sum("cnt")
)

In [7]:
# Cache the pivoted frame; as the last expression of the cell, the call also
# echoes the frame's schema.
sqlDF.cache()

DataFrame[item_id: int, pv: bigint, fav: bigint, cart: bigint, buy: bigint]

In [22]:
# Peek at the pivoted per-item counts (show() prints the first 20 rows).
sqlDF.show()

+---------+---+----+----+----+
|  item_id| pv| fav|cart| buy|
+---------+---+----+----+----+
|268113218| 26|   1|null|null|
|371610827| 12|null|null|null|
|165468308|  8|null|null|null|
|162165691|  5|null|null|null|
|238016778|  5|null|null|null|
|112152446|  2|null|null|null|
|223363762|  2|null|null|null|
|384013655|  2|null|null|null|
| 40710676|  2|null|null|null|
|216232610|  2|null|null|null|
|327320824|  2|null|null|null|
|101545006|  2|null|null|null|
|396262783|  2|null|null|null|
|282167185|  2|null|null|null|
|281262595|  2|null|null|null|
|181282439|  2|null|null|null|
|301402635|  2|null|null|null|
| 65895144|  2|null|null|null|
|358392671|  2|null|null|null|
| 64684828|  2|null|null|null|
+---------+---+----+----+----+
only showing top 20 rows



In [8]:
# Raw events for item 268113218, to sanity-check its pivoted counts.
item_events_df = spark.sql("SELECT * FROM taobao where item_id=268113218")
item_events_df.show()

+---------+---------+-------------+------------+-------------+-------------+
|  user_id|  item_id|behavior_type|user_geohash|item_category|  create_time|
+---------+---------+-------------+------------+-------------+-------------+
|110844468|268113218|            2|     9r7ls4r|         8480|2014-12-10 11|
| 15987177|268113218|            1|        null|         8480|2014-11-26 00|
| 62118780|268113218|            1|     956o9oj|         8480|2014-11-24 21|
|110844468|268113218|            1|     9r7jdti|         8480|2014-12-10 11|
| 21562611|268113218|            1|        null|         8480|2014-12-08 05|
| 21562611|268113218|            1|        null|         8480|2014-12-05 20|
|110844468|268113218|            1|     9r7jdum|         8480|2014-12-10 11|
| 62118780|268113218|            1|     956oppa|         8480|2014-11-24 21|
|110844468|268113218|            1|     9r7ls9p|         8480|2014-12-10 11|
| 21562611|268113218|            1|        null|         8480|2014-12-08 05|

In [25]:
# Items that have no 'pv' count at all in the pivoted frame.
items_without_pv = sqlDF.filter(col("pv").isNull())
items_without_pv.show()

+---------+----+----+----+----+
|  item_id|  pv| fav|cart| buy|
+---------+----+----+----+----+
| 91956974|null|null|   1|null|
|373451095|null|null|   1|null|
|216512615|null|null|null|   2|
| 26146891|null|null|   2|   1|
| 40247840|null|   1|null|null|
|278001730|null|null|null|   1|
| 75118405|null|   1|null|null|
|  9433020|null|null|   1|null|
|229965456|null|null|null|   2|
|275466788|null|null|null|   1|
|327664315|null|null|   1|   1|
|112987715|null|null|null|   1|
| 45647804|null|null|   1|   2|
| 13797050|null|null|null|   1|
|399186166|null|null|   1|null|
|339437804|null|null|null|   1|
|  4742759|null|null|   1|null|
|306900623|null|null|null|   1|
|316582757|null|null|   1|null|
| 16105308|null|null|null|   1|
+---------+----+----+----+----+
only showing top 20 rows



In [9]:
# Drop everything cached in this session before rebuilding the pivot below.
spark.catalog.clearCache()

In [10]:
# Rebuild the long-format counts: one row per (item_id, behaviour label) with
# its event count. Same query as the earlier sqlDFRaw cell.
sqlDFRaw = spark.sql("""SELECT item_id,
(case behavior_type 
      when 1 then 'pv'
      when 2 then 'fav'
      when 3 then 'cart'
      when 4 then 'buy'
      END)behavior,
  count(*) as cnt
FROM taobao group by item_id,behavior_type order by behavior_type, cnt desc
""")

In [11]:
# Wide per-item table: one row per item, one count column per behaviour.
sqlDFCol = (
    sqlDFRaw
    .groupBy("item_id")
    .pivot("behavior", ["pv", "fav", "cart", "buy"])
    .sum("cnt")
)

In [12]:
# Cache the wide per-item table; the bare call also echoes its schema.
sqlDFCol.cache()

DataFrame[item_id: int, pv: bigint, fav: bigint, cart: bigint, buy: bigint]

In [13]:
# Make the wide per-item table queryable from Spark SQL as "pbcf".
sqlDFCol.createOrReplaceTempView("pbcf")

In [14]:
# Buy-to-view ratio per item, restricted to items that have both a pv and a
# buy count, highest ratio first.
buy_per_pv_query = "SELECT item_id, buy/pv as frac from pbcf where pv is not null and buy is not null order by frac desc"
sqlDF = spark.sql(buy_per_pv_query)

In [34]:
# Show the items with the highest buy/pv ratio (first 20 rows).
sqlDF.show()

+---------+----+
|  item_id|frac|
+---------+----+
| 95756656|12.0|
|375587792| 9.0|
|331730418| 8.0|
|228419454| 8.0|
|333934205| 6.0|
| 28106123| 6.0|
|250846439| 6.0|
| 46229046| 5.0|
|151223032| 5.0|
|326182036| 5.0|
| 61608277| 5.0|
| 14750488| 5.0|
| 70766921| 5.0|
|283631641| 5.0|
|174121868| 5.0|
|309245674| 4.5|
|281010457| 4.5|
| 73126790| 4.0|
|158083270| 4.0|
| 77668759| 4.0|
+---------+----+
only showing top 20 rows



In [35]:
# Pivoted counts for the item with the highest buy/pv ratio.
top_ratio_counts_df = spark.sql("SELECT * FROM pbcf where item_id=95756656")
top_ratio_counts_df.show()

+--------+---+----+----+---+
| item_id| pv| fav|cart|buy|
+--------+---+----+----+---+
|95756656|  1|null|   1| 12|
+--------+---+----+----+---+



In [36]:
# Raw events behind that item, to see where the repeated buys come from.
top_ratio_events_df = spark.sql("SELECT * FROM taobao where item_id=95756656")
top_ratio_events_df.show()

+---------+--------+-------------+------------+-------------+-------------+
|  user_id| item_id|behavior_type|user_geohash|item_category|  create_time|
+---------+--------+-------------+------------+-------------+-------------+
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            1|     9rqijno|         1346|2014-11-23 13|
|107209058|95756656|            3|     9rqijck|         1346|2014-11-23 13|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|95756656|            4|        null|         1346|2014-11-23 18|
|107209058|9

In [15]:
# For each behaviour, count the items whose pivoted column is non-null,
# i.e. the number of items that saw that behaviour at least once.
stage_counts_query = "SELECT count(pv) as cpv, count(cart) as ccart, count(fav) as cfav, count(buy) as cbuy from pbcf"
sqlDF = spark.sql(stage_counts_query)

In [16]:
# Display the single-row summary of per-behaviour item counts.
sqlDF.show()

+-------+------+------+-----+
|    cpv| ccart|  cfav| cbuy|
+-------+------+------+-----+
|2870604|250117|205158|92753|
+-------+------+------+-----+



In [17]:
from pyecharts import Funnel

In [18]:
# Pull the one-row summary to the driver as a pandas DataFrame for plotting.
pbcfPandasDF = sqlDF.toPandas()



In [19]:
# Display the pandas summary frame (columns cpv, ccart, cfav, cbuy).
pbcfPandasDF

Unnamed: 0,cpv,ccart,cfav,cbuy
0,2870604,250117,205158,92753


In [30]:
# Funnel stage labels; attr_value below is built in the same order.
attrs = ['pv', 'cart', 'fav', 'buy']

# The summary frame has a single row holding the item count of every stage.
stage_counts = pbcfPandasDF.iloc[0]
pv_count = stage_counts['cpv']

# Express every stage as a percentage of the pv stage so all four values share
# one baseline. Previously cart was divided by cpv while fav and buy were
# divided by ccart, so the numbers were not comparable with each other.
# Values are rounded to two decimals because they are rendered verbatim in the
# '{c}%' labels.
attr_value = [
    round(float(stage_counts[count_col] / pv_count * 100), 2)
    for count_col in ('cpv', 'ccart', 'cfav', 'cbuy')
]

funnel1 = Funnel("总体转化漏斗图一", width=800, height=400, title_pos='center')

funnel1.add(name="商品交易行环节",       # legend name
            attr=attrs,                # stage labels
            value=attr_value,          # value for each stage label
            is_label_show=True,        # show the value labels
            label_formatter='{c}%',    # label text format
            label_pos="inside",        # draw labels inside the funnel
            legend_orient='vertical',  # legend orientation
            legend_pos='left',         # legend position
            is_legend_show=True)       # show the legend

# render() is called with no arguments (so the library's default output target
# is used); the trailing bare name displays the chart inline in the notebook.
funnel1.render()
funnel1

In [22]:
# Scratch check: the Series round-trip just yields the plain list of labels.
pd.Series(['pv','cart','fav','buy']).tolist()

['pv', 'cart', 'fav', 'buy']

In [25]:
# Scratch check: read the cpv value from the first (only) row of the summary.
pbcfPandasDF.iloc[0]['cpv']

2870604

In [29]:
# Scratch check of the funnel percentages. Note the denominators differ:
# cart is divided by cpv, whereas fav and buy are both divided by ccart.
pd.Series([100,pbcfPandasDF.iloc[0]['ccart']/pbcfPandasDF.iloc[0]['cpv']*100,pbcfPandasDF.iloc[0]['cfav']/pbcfPandasDF.iloc[0]['ccart']*100,pbcfPandasDF.iloc[0]['cbuy']/pbcfPandasDF.iloc[0]['ccart']*100]).tolist()

[100.0, 8.713044362789155, 82.0248123878025, 37.08384476065201]

In [28]:
# Scratch check: without the *100 factor the cart/pv value is a plain ratio,
# not a percentage.
pd.Series([100,pbcfPandasDF.iloc[0]['ccart']/pbcfPandasDF.iloc[0]['cpv']]).tolist()

[100.0, 0.08713044362789155]