In [None]:
import json
from os.path import abspath
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.window as W
from pyspark.sql.types import MapType,StringType,ArrayType


from datetime import datetime
from datetime import timedelta

today=datetime.now().strftime("%Y-%m-%d")
yesterday=(datetime.now()-timedelta(1)).strftime("%Y-%m-%d")

print(today,yesterday)

pd.set_option('display.max_rows', 1000)
pd.options.display.float_format = '{:.2f}'.format

warehouse_location_path = '/home/jovyan/work/spark-warehouse'

warehouse_location = abspath(warehouse_location_path)

print(warehouse_location)

spark = SparkSession \
    .builder \
    .appName("shopee-category-search-extract") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.debug.maxToStringFields",200) \
    .config("spark.sql.debug.maxToStringFields",2000) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true -Xms4096m") \
    .config("spark.driver.extraJavaOptions","-Dio.netty.tryReflectionSetAccessible=true -Xms4096m") \
    .enableHiveSupport() \
    .getOrCreate()

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")


In [None]:
import prestodb

cols=['product_no','day_has_session','last_day_has_session','impression','daily_impression',\
      'clicks','users','cost','gmv','ctr','roi','avg_user_gmv','image_link','product_type','is_zero_suppliyers']

def get_presto_data(sql):
    conn=prestodb.dbapi.connect(
        host='ec2-54-218-99-163.us-west-2.compute.amazonaws.com',
        port=8889,
        user='root',
        catalog='hive',
        schema='marketing',
    )
    cur = conn.cursor()
    cur.execute(sql)
    rows = cur.fetchall()

    conn.close()
    return rows

In [None]:
sql="""
select a.*
    ,b.image_link
    ,b.product_type
    ,c.is_suppliyers
from
(select 
    product_no,
    count(distinct dt) as day_has_session,
    max(dt) as last_day_has_session,
    sum(cast(impression as int)) as impression,
    sum(cast(impression as int))/count(distinct dt) as daily_impression,
    sum(cast(click_pv as int)) as clicks,
    sum(cast(users as int)) as users,
    sum(cast(cost as double)) as cost,
    sum(cast(gmv as double)) as gmv,
    round(100.000*sum(cast(click_pv as int))/sum(cast(impression as int)),4) as ctr,
    round(1.000*sum(cast(gmv as double))/sum(cast(cost as double)),2) as roi,
    round(1.000*sum(cast(gmv as double))/sum(cast(users as int)),3) as avg_user_gmv
from marketing.ad_report_analysis_base a
where dt >= date_format(current_date - interval '7' day, '%Y-%m-%d')
    and ad_channel = 'facebook'
    and lower(campaign_name) not like '%_shopify_%'
    and lower(campaign_name) not like '%_deshopify_%'
    and lower(campaign_name) not like '%独立站%'
    and lower(campaign_name) like '%_msite_%'
    and cast(cost as double) > 0
    and cast(impression as int) > 0
group by 1)a
left join(
    select id,image_link,product_type
    from marketing.facebook_catalog_app_main
    where dt=date_format(current_date - interval '1' day, '%Y-%m-%d')
)b on a.product_no = b.id
left join(
    select pno,
        case when write_uid = 8 then 'zero-suppliyers' else 'other-source' end as is_suppliyers,
        count(1) cnt
    from jiayundw_dm.product_profile_df
    where date_id=date_format(current_date - interval '1' day, '%Y-%m-%d')
    group by 1,2
)c on a.product_no = c.pno
where impression>=100
"""

rows = get_presto_data(sql)
print("how many data:", len(rows),"\n sample line:", rows[0])

rdd = spark.sparkContext.parallelize(rows)
df=rdd.toDF(cols)

df=df.withColumn("date_written", F.lit(datetime.now().strftime('%Y-%m-%d')))

df.printSchema()
df.write.format("parquet").mode("overwrite").partitionBy("date_written").save(warehouse_location_path + "/fb-data/")

In [None]:
# import requests
# import urllib
# import json

# def is_nsfw(row_of_data, img_index, retry = 2):
#     if retry <= 0:
#         print("error: too many retry:", retry)
#         row_of_data.append(0.01010101)
#         return row_of_data
#     img = row_of_data[img_index].replace('.jpg', '_350x350.jpg')
    
#     try:
#         url = 'http://192.168.129.233:5001?url={img}'.format(img = img)
#         # print("running url:\n", url,'\n\nimg:',row_of_data[img_index])
#         r = requests.get(url)
#         r = json.loads(r.text)
#         row_of_data.append(r['score'])
#         return row_of_data
#     except Exception as e:
#         print("error may returned:", retry, e)
#         return is_nsfw(img, retry-1)

In [None]:
df_pd = spark.read.parquet(warehouse_location_path + "/fb-data/")\
.where("date_written = '{day}'".format(day=datetime.now().strftime('%Y-%m-%d')))\
.drop("date_written")

df_pd.groupBy("is_zero_suppliyers").agg(F.countDistinct("product_no"),F.sum("cost"),F.sum("gmv")).show()

rows = []
for pd_row in df_pd.toPandas().iterrows():
    r = []
    for col in cols:
        r.append(pd_row[1][col])
    rows.append(r)
    
print('data length:',len(rows),'\nSample:', rows[len(rows)-1])

In [8]:
# import random
# from multiprocessing import Pool

# pool = Pool(20)

# tasks = []
# final_result = []

# running_index = 0
# for row in rows:
#     running_index = running_index+1
#     # if running_index <=4151:
#     #     continue
    
#     task = pool.apply_async(is_nsfw, args=(row, 12))
#     tasks.append(task)
#     if len(tasks) >= 80:
#         print(datetime.now(), "running_index:", running_index, 'length of data:', len(final_result))
#         for t in tasks:
#             try:
#                 final_result.append(t.get(timeout=20))
#             except Exception as e:
#                 print("unable to get task result:", e)
#         tasks = []
#         # break

# if len(tasks) > 0:
#     for t in tasks:
#         final_result.append(t.get())
#     tasks = []
    
# if len(final_result) > 0:
#     print("length of finacountta:",len(final_result),"\nsample:",final_result[len(final_result)-1])
    
# pool.close()

2022-10-18 05:53:56.836696 running_index: 80 length of data: 0
2022-10-18 05:54:15.944820 running_index: 160 length of data: 80
2022-10-18 05:54:34.215761 running_index: 240 length of data: 160
2022-10-18 05:54:52.585144 running_index: 320 length of data: 240
2022-10-18 05:55:10.467027 running_index: 400 length of data: 320
2022-10-18 05:55:28.721398 running_index: 480 length of data: 400
2022-10-18 05:55:48.441581 running_index: 560 length of data: 480
2022-10-18 05:56:08.047796 running_index: 640 length of data: 560
2022-10-18 05:56:28.243560 running_index: 720 length of data: 640
2022-10-18 05:56:47.809303 running_index: 800 length of data: 720
2022-10-18 05:57:09.436515 running_index: 880 length of data: 800
2022-10-18 05:57:28.578204 running_index: 960 length of data: 880
2022-10-18 05:57:48.387306 running_index: 1040 length of data: 960
2022-10-18 05:58:07.114627 running_index: 1120 length of data: 1040
2022-10-18 05:58:25.949891 running_index: 1200 length of data: 1120
2022-10-1

In [11]:
from pyspark.sql.window import Window

def create_spark_df(final_result):
    cols=['product_no','day_has_session','last_day_has_session','impression','daily_impression',\
      'clicks','users','cost','gmv','ctr','roi','avg_user_gmv','image_link','product_type','is_zero_suppliyers','nsfw_score']
    rdd = spark.sparkContext.parallelize(final_result)
    df=rdd.toDF(cols)

    df=df.withColumn("date_written", F.lit(datetime.now().strftime('%Y-%m-%d')))

    df.printSchema()
    return df

def save_as_path(df):
    df.write.format("parquet").mode("overwrite").partitionBy("date_written").save(warehouse_location_path + "/fb-data-with-nsfw-score/")

new_df=create_spark_df(final_result)

exist_df = None
try:
    exist_df=spark.read.parquet(warehouse_location_path + "/fb-data-with-nsfw-score/")
except Exception as e:
    print(e)
    
if exist_df is not None and exist_df.count() >0 :
    all_df=exist_df.unionAll(new_df)
else:
    all_df=new_df
    
all_df.withColumn("rnk", F.row_number().over(Window.partitionBy("product_no").orderBy('date_written')))\
.where('rnk = 1').drop('rnk')


save_as_path(all_df)

root
 |-- product_no: string (nullable = true)
 |-- day_has_session: long (nullable = true)
 |-- last_day_has_session: string (nullable = true)
 |-- impression: long (nullable = true)
 |-- daily_impression: long (nullable = true)
 |-- clicks: long (nullable = true)
 |-- users: long (nullable = true)
 |-- cost: double (nullable = true)
 |-- gmv: double (nullable = true)
 |-- ctr: string (nullable = true)
 |-- roi: double (nullable = true)
 |-- avg_user_gmv: double (nullable = true)
 |-- image_link: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- is_zero_suppliyers: string (nullable = true)
 |-- nsfw_score: double (nullable = true)
 |-- date_written: string (nullable = false)

Unable to infer schema for Parquet. It must be specified manually.


In [12]:
df=spark.read.parquet(warehouse_location_path + "/fb-data-with-nsfw-score/")
df.printSchema()
print(df.count())

root
 |-- product_no: string (nullable = true)
 |-- day_has_session: long (nullable = true)
 |-- last_day_has_session: string (nullable = true)
 |-- impression: long (nullable = true)
 |-- daily_impression: long (nullable = true)
 |-- clicks: long (nullable = true)
 |-- users: long (nullable = true)
 |-- cost: double (nullable = true)
 |-- gmv: double (nullable = true)
 |-- ctr: string (nullable = true)
 |-- roi: double (nullable = true)
 |-- avg_user_gmv: double (nullable = true)
 |-- image_link: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- is_zero_suppliyers: string (nullable = true)
 |-- nsfw_score: double (nullable = true)
 |-- date_written: date (nullable = true)

7748


In [13]:
df.groupBy("is_zero_suppliyers",\
           F.expr("case when roi<0.5 then 'roi-lower:0.5' when roi > 1.5 then 'roi-higher:1.5' else 'roi:0.5-1.5' end").alias("roi_type"))\
.agg(F.countDistinct("product_no").alias("products"),\
     F.round(F.sum("cost"),2).alias("cost"),\
     F.round(F.sum("gmv"),2).alias("gmv"),\
     F.round(F.sum("gmv")/F.sum("cost"),2).alias("roi"))\
.withColumn("cost_%", F.round(100.00*F.col('cost')/F.sum("cost").over(Window.partitionBy(F.lit(1))),2))\
.withColumn("gmv_%", F.round(100.00*F.col('gmv')/F.sum("gmv").over(Window.partitionBy(F.lit(1))),2))\
.orderBy("is_zero_suppliyers","roi_type")\
.show(truncate=False)

+------------------+--------------+--------+---------+---------+----+------+-----+
|is_zero_suppliyers|roi_type      |products|cost     |gmv      |roi |cost_%|gmv_%|
+------------------+--------------+--------+---------+---------+----+------+-----+
|other-source      |roi-higher:1.5|1652    |134237.61|343536.71|2.56|30.1  |53.41|
|other-source      |roi-lower:0.5 |3646    |43187.81 |5277.8   |0.12|9.68  |0.82 |
|other-source      |roi:0.5-1.5   |711     |169058.88|174615.07|1.03|37.91 |27.15|
|zero-suppliyers   |roi-higher:1.5|329     |25306.74 |65894.54 |2.6 |5.67  |10.25|
|zero-suppliyers   |roi-lower:0.5 |1230    |28725.8  |5696.89  |0.2 |6.44  |0.89 |
|zero-suppliyers   |roi:0.5-1.5   |180     |45464.54 |48148.9  |1.06|10.19 |7.49 |
+------------------+--------------+--------+---------+---------+----+------+-----+



In [61]:
         # F.expr("case when roi<0.5 then 'roi-lower:0.5' when roi > 1.5 then 'roi-higher:1.5' else 'roi:0.5-1.5' end").alias("roi_type"),\
df.where("product_type not like 'Shoes%'").\
groupBy(F.expr("split(product_type,' > ')[0]").alias("cat1"),\
        F.expr("case when nsfw_score > 0.3 then 'nsfw > 0.3' else 'normal' end").alias("is_nsfw"))\
.agg(F.countDistinct("product_no").alias("products"),\
     F.round(F.sum("cost"),2).alias("cost"),\
     F.round(F.sum("gmv"),2).alias("gmv"),\
     F.round(F.sum("gmv")/F.sum("cost"),2).alias("roi"))\
.withColumn("cost_%", F.round(100.00*F.col('cost')/F.sum("cost").over(Window.partitionBy(F.lit(1))),2))\
.withColumn("gmv_%", F.round(100.00*F.col('gmv')/F.sum("gmv").over(Window.partitionBy(F.lit(1))),2))\
.orderBy("cat1","is_nsfw")\
.show(100,truncate=False)

+-------------------------------------+----------+--------+---------+---------+----+------+-----+
|cat1                                 |is_nsfw   |products|cost     |gmv      |roi |cost_%|gmv_%|
+-------------------------------------+----------+--------+---------+---------+----+------+-----+
|Auto Accessories                     |normal    |179     |3620.5   |3677.65  |1.02|1.23  |0.88 |
|Auto Accessories                     |nsfw > 0.3|6       |116.2    |293.97   |2.53|0.04  |0.07 |
|Auto Parts & Accessories             |normal    |17      |159.55   |428.01   |2.68|0.05  |0.1  |
|Bags                                 |normal    |282     |14445.63 |18684.6  |1.29|4.91  |4.45 |
|Bags                                 |nsfw > 0.3|1       |1.01     |0.0      |0.0 |0.0   |0.0  |
|Beauty & Skin Care & Make-Up         |normal    |76      |1238.48  |1755.61  |1.42|0.42  |0.42 |
|Beauty & Skin Care & Make-Up         |nsfw > 0.3|16      |433.15   |370.3    |0.85|0.15  |0.09 |
|Bedding            

In [51]:
html_df=df.where("nsfw_score > 0.1 and product_type not like 'Shoes%'")\
.selectExpr("product_no","round(cost,2) as cost","roi","round(nsfw_score, 4) as nsfw_score",\
            "replace(image_link,'.jpg','_350x350.jpg') as img_link","product_type")\
.orderBy(F.desc(F.col("cost"))).toPandas().head(100)

html_df.head(1)

from IPython.core.display import display, HTML
import re

def path_to_image_html(path):
    return '<img src="'+ path + '" width="60" />'

def pno_to_link(pno):
    link = 'https://www.wholeeshopping.com/product/'+re.findall(r'\d+', pno)[0]
    return '<a href="'+ link + '" target="_blank" >' + pno + '</a>'


html = html_df.to_html(escape=False, formatters=dict(img_link=path_to_image_html, product_no= pno_to_link))

display(HTML(html))

  from IPython.core.display import display, HTML


Unnamed: 0,product_no,cost,roi,nsfw_score,img_link,product_type
0,UPH022071969N,2647.9,0.72,0.42,,Underwear > Pajamas & Home Wear & Nightgowns & Bathrobes > Female Pajamas & Home Clothes
1,WDR025225077N,2002.83,0.46,0.27,,Women's Clothing > Dress
2,WDR025210602N,1912.05,0.37,0.27,,Women's Clothing > Dress
3,WDR023485340N,1513.51,0.44,0.68,,Women's Clothing > Dress
4,MTT023869005N,1030.27,0.59,0.89,,Men's Clothing > T-shirts
5,CHA017967694N,1003.29,0.23,0.39,,Clothing Accessories & Jewelry > Hair Accessories & Headwear > Dish Hair Devices
6,MBS022772648N,875.81,1.88,0.32,,Men's Clothing > Blazer & Suits > Men's Casual Suits
7,WWA023229690N,834.95,0.12,0.1,,Women's Clothing > Women's Casual Pants > Short Pants
8,UBR022894220N,672.74,1.37,0.13,,Underwear > Bra > Steel Ring Bra
9,MEB023311716N,568.45,1.39,0.44,,Men's Clothing > Casual Pants


In [48]:
df.groupBy("is_zero_suppliyers",\
         F.expr("case when roi<0.5 then 'roi-lower:0.5' when roi > 1.5 then 'roi-higher:1.5' else 'roi:0.5-1.5' end").alias("roi_type"),\
         F.expr("case when nsfw_score > 0.1 then 'nsfw > 0.1' else 'normal' end")\
         .alias("is_nsfw"))\
.agg(F.countDistinct("product_no").alias("products"),\
     F.round(F.sum("cost"),2).alias("cost"),\
     F.round(F.sum("gmv"),2).alias("gmv"),\
     F.round(F.sum("gmv")/F.sum("cost"),2).alias("roi"))\
.withColumn("cost_%", F.round(100.00*F.col('cost')/F.sum("cost").over(Window.partitionBy(F.lit(1))),2))\
.withColumn("gmv_%", F.round(100.00*F.col('gmv')/F.sum("gmv").over(Window.partitionBy(F.lit(1))),2))\
.orderBy("is_zero_suppliyers","roi_type")\
.show(truncate=False)

+------------------+--------------+----------+--------+---------+---------+----+------+-----+
|is_zero_suppliyers|roi_type      |is_nsfw   |products|cost     |gmv      |roi |cost_%|gmv_%|
+------------------+--------------+----------+--------+---------+---------+----+------+-----+
|other-source      |roi-higher:1.5|normal    |1423    |118741.96|304055.64|2.56|26.62 |47.27|
|other-source      |roi-higher:1.5|nsfw > 0.1|229     |15495.65 |39481.07 |2.55|3.47  |6.14 |
|other-source      |roi-lower:0.5 |nsfw > 0.1|532     |8293.71  |1010.75  |0.12|1.86  |0.16 |
|other-source      |roi-lower:0.5 |normal    |3114    |34894.1  |4267.05  |0.12|7.82  |0.66 |
|other-source      |roi:0.5-1.5   |nsfw > 0.1|119     |25716.92 |26415.05 |1.03|5.77  |4.11 |
|other-source      |roi:0.5-1.5   |normal    |592     |143341.96|148200.02|1.03|32.14 |23.04|
|zero-suppliyers   |roi-higher:1.5|nsfw > 0.1|33      |1066.18  |3404.7   |3.19|0.24  |0.53 |
|zero-suppliyers   |roi-higher:1.5|normal    |296     |24240