In [1]:
import sys
import csv

from pyspark import SparkContext
from pyspark.sql import Window
from functools import reduce
import re
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime


In [2]:
# Load the two raw tab-separated WeChat dumps (no header row) and persist each
# one as a Spark table, overwriting any table of the same name.
raw_sources = [
    ("/Users/maksim/Downloads/wechat_data_medium/weixin_biz", 'weixin_biz'),
    ("/Users/maksim/Downloads/wechat_data_medium/weixin_click", 'weixin_click'),
]
for source_path, table_name in raw_sources:
    raw_frame = spark.read.csv(source_path, sep='\t', header=False)
    raw_frame.write.saveAsTable(table_name, mode='overwrite')


In [3]:
# Read the two persisted tables back as DataFrames.
total_weixin_biz, total_weixin_click = (
    spark.read.table(saved_table) for saved_table in ('weixin_biz', 'weixin_click')
)



In [4]:
# Print the schema of each raw table (biz first, then click).
for raw_table in (total_weixin_biz, total_weixin_click):
    raw_table.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [5]:
# Neither of these columns is mentioned in the task description (email),
# so they are discarded before the columns are renamed.
total_weixin_biz = total_weixin_biz.drop("_c6")
total_weixin_click = total_weixin_click.drop("_c5")

# Print both schemas again to confirm the columns are gone.
for trimmed_table in (total_weixin_biz, total_weixin_click):
    trimmed_table.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c7: string (nullable = true)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c6: string (nullable = true)



In [6]:
# Column names before the rename (auto-generated by the CSV reader).
old_Names_biz = total_weixin_biz.schema.names
# Replacement names, listed in the same order as the remaining columns.
new_Names_biz = ["ID", "Biz ID", "Biz Name", "Biz Code", "Biz Description", "QRcode", "Timestamp"]

# toDF renames every column positionally in a single call. It replaces the
# reduce/withColumnRenamed fold, whose lambda parameter shadowed the outer
# DataFrame name and which depended on which `reduce` is in scope after the
# wildcard import of pyspark.sql.functions.
# The number of names must equal the number of columns, otherwise Spark raises.
df_biz = total_weixin_biz.toDF(*new_Names_biz)
df_biz.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Biz ID: string (nullable = true)
 |-- Biz Name: string (nullable = true)
 |-- Biz Code: string (nullable = true)
 |-- Biz Description: string (nullable = true)
 |-- QRcode: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [7]:
# Column names before the rename (auto-generated by the CSV reader).
old_Names_click = total_weixin_click.schema.names
print(old_Names_click)
# Replacement names, listed in the same order as the remaining columns.
new_Names_click = ["ID", "URL", "Title", "Read Number", "Like Number", "Timestamp"]

# toDF renames every column positionally in a single call. It replaces the
# reduce/withColumnRenamed fold, whose lambda parameter shadowed the outer
# DataFrame name and which depended on which `reduce` is in scope after the
# wildcard import of pyspark.sql.functions.
# The number of names must equal the number of columns, otherwise Spark raises.
df_click = total_weixin_click.toDF(*new_Names_click)
df_click.printSchema()


['_c0', '_c1', '_c2', '_c3', '_c4', '_c6']
root
 |-- ID: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Read Number: string (nullable = true)
 |-- Like Number: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [20]:
# Drop duplicate click rows according to the task: keep exactly one row per URL.
#
# The previous orderBy(...).dropDuplicates(["URL"]) did not control which row
# survived: dropDuplicates does not honour a preceding sort, the sort was led
# by "ID" (so "Read Number" was only a late tie-breaker), and "Read Number" is
# a string column, so it sorted lexicographically ("999" > "10000").
# Ranking inside a per-URL window makes the choice explicit and deterministic.
# NOTE(review): assumes the intent is to keep the row with the highest read
# count for each URL -- confirm against the task description.
url_rank_window = Window.partitionBy("URL").orderBy(
    col("Read Number").cast("long").desc(),
    col("Like Number").cast("long").desc(),  # remaining keys only break ties
    col("Timestamp").desc(),
    col("ID"),
)
clean_weixin_click = (
    df_click
    .withColumn("_url_rank", row_number().over(url_rank_window))
    .filter(col("_url_rank") == 1)
    .drop("_url_rank")
)

# Capture group 1 is the text between "_biz=" and the next "&", so no
# follow-up regexp_replace passes are needed to strip the delimiters.
# URLs without that pattern get an empty string.
c_w_c = clean_weixin_click.withColumn("Biz ID", regexp_extract("URL", '_biz=(.*?)&', 1))
c_w_c.cache()
c_w_c.show(10)
c_w_c.count()

+--------+--------------------+--------------------+-----------+-----------+----------+----------------+
|      ID|                 URL|               Title|Read Number|Like Number| Timestamp|          Biz ID|
+--------+--------------------+--------------------+-----------+-----------+----------+----------------+
|54519242|http://mp.weixin....|5岁男孩近视一年间飙升至900度,...|       2120|          4|2016-01-31|MTE1OTE0MzU2MQ==|
|54324257|http://mp.weixin....|中国人是怎样暴富的（过去靠房子，现...|       8099|         15|2016-01-31|MTQzMDIyODg2MQ==|
|54602445|http://mp.weixin....|南京下雪了！看南京人是怎么庆祝这场...|      10910|         60|2016-01-31|MjE3MDEzMzg2MQ==|
|54473599|http://mp.weixin....|周末鄂尔多斯新一股冷空气来袭&nb...|       2043|          2|2016-01-31|MjM5MDAwNDY1Mg==|
|54608008|http://mp.weixin....|为什么你的男朋友总是不理你？中五条...|       4561|          3|2016-01-31|MjM5MDAxMTU2MA==|
|54499236|http://mp.weixin....|           【安全防火过新年】|          6|          2|2016-01-31|MjM5MDAzNTI2NQ==|
|54472329|http://mp.weixin....| 这条裤子，今年春天可能要霸占时尚圈了！|   

257749

In [21]:
# The biz table contains repeated records; keep a single row for each
# (ID, Biz ID) pair and cache the result because it is reused below.
clean_weixin_biz = df_biz.dropDuplicates(["ID", "Biz ID"]).cache()
clean_weixin_biz.show(10)
print(clean_weixin_biz.count())

+-------+----------------+-------------+--------------------+--------------------+--------------------+---------+
|     ID|          Biz ID|     Biz Name|            Biz Code|     Biz Description|              QRcode|Timestamp|
+-------+----------------+-------------+--------------------+--------------------+--------------------+---------+
|1000019|MzAwMTYwMjAwNg==|       新注册公众号|                null|                服务大众|http://mp.weixin....|     null|
|1000549|MzAwNjI4MTg4Mw==|        河源万仟堂|              hywqt8|&nbsp;&nbsp;&nbsp...|http://mp.weixin....|     null|
|1000579|MzAxODA1Mjk4OQ==|       华东师大考研|  huadongshidakaoyan|及时为报考华东师范大学的考生提供考...|http://mp.weixin....|     null|
|1000829|MjM5ODA2MjAwMA==|          墨入水|     weixin_morushui|墨入水的情、色、诗、画，生活即是这...|http://mp.weixin....|     null|
|1000855|MzAwNjU5NzQ4OQ==|         AF珠宝|                null|专注彩色宝石，天然水晶，珠宝，银饰...|http://mp.weixin....|     null|
|1001025|MzAwNTYwNzc2Mw==|LifeMagicians|       LifeMagicians|生活魔法师帮助追求生活，面对晚餐有...|http:/

In [19]:
logFile = "/Users/maksim/Downloads/wechat_data_medium/weixin_page_test"  
# One row per line of the page dump, in a single string column named "value".
logData = spark.read.text(logFile).cache()


def extract_quoted_value(lines, pattern, alias):
    """Return capture group 1 of `pattern` for every line that matches it.

    Lines that do not match are dropped; matching lines keep their relative
    order. The result has a single string column named `alias`.
    """
    return (lines
            .filter(col("value").rlike(pattern))
            .select(regexp_extract("value", pattern, 1).alias(alias)))


def extract_tag_text(lines, pattern, alias):
    """Return the matched `<TAG>...</TAG>` span with every `<...>` piece removed.

    Lines that do not match `pattern` are dropped; matching lines keep their
    relative order. The result has a single string column named `alias`.
    """
    matched_span = regexp_extract("value", pattern, 0)
    return (lines
            .filter(col("value").rlike(pattern))
            .select(regexp_replace(matched_span, '<(.*?)>', '').alias(alias)))


# Values embedded in the page JavaScript: var biz = "...", var ct = "...".
biz_num = extract_quoted_value(logData, 'var biz = "(.*?)"', "Biz ID")

# Values wrapped in <URL>, <TITLE> and <DATE> tags.
url = extract_tag_text(logData, '<URL>(.*?)</URL>', 'Url')
title = extract_tag_text(logData, '<TITLE>(.*?)</TITLE>', 'Title')

# `ct` is formatted as a timestamp from epoch seconds. The built-in
# from_unixtime replaces the former Python UDF around datetime.fromtimestamp,
# which raised on None (i.e. whenever the numeric cast failed) and paid a
# Python round-trip per row. A value that cannot be cast now yields a null
# "Publish Date" instead of failing the job; the long cast also avoids
# 32-bit overflow.
# The string is rendered in Spark's current time zone.
ct = extract_quoted_value(logData, 'var ct = "(.*?)"', "ct") \
        .select(from_unixtime(col("ct").cast("long"), 'yyyy-MM-dd HH:mm:ss').alias("Publish Date"))

date2 = extract_tag_text(logData, '<DATE>(.*?)</DATE>', 'Timestamp')

  
    
#content = logData.withColumn("content", regexp_extract("value", '<HTML>(.*?)</HTML>', 0)) \
#                 .select(regexp_replace('content', '<(.*?)>', '').alias('content'))
#content.take(5)

# .filter("content != ''") \

def with_column_index(sdf):
    """Return `sdf` with an extra non-nullable long column named "ColumnIndex".

    The value comes from RDD.zipWithIndex(), i.e. the position of each row in
    the DataFrame's underlying RDD. All original columns are kept, in order,
    ahead of the new one.
    """
    index_field = StructField("ColumnIndex", LongType(), False)
    indexed_schema = StructType(sdf.schema.fields + [index_field])
    # zipWithIndex yields (row, index) pairs; append the index to the row values.
    indexed_rows = sdf.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    return indexed_rows.toDF(schema=indexed_schema)

# Attach a positional index to each single-column frame so that they can be
# stitched back together row by row.
df_biz_num = with_column_index(biz_num)
df_url = with_column_index(url)
df_title = with_column_index(title)
df_ct = with_column_index(ct)
df_date2 = with_column_index(date2)

# The joins below pair values purely by position, which is only meaningful if
# every field was extracted the same number of times. With differing counts
# the inner joins silently drop the surplus rows and the remaining rows may
# combine fields from different pages, so surface the mismatch.
field_counts = {
    "Biz ID": df_biz_num.count(),
    "Url": df_url.count(),
    "Title": df_title.count(),
    "Publish Date": df_ct.count(),
    "Timestamp": df_date2.count(),
}
if len(set(field_counts.values())) > 1:
    print("WARNING: extracted field counts differ, positional join may misalign rows: {}".format(field_counts))

join_on_index = (
    df_biz_num
    .join(df_url, ["ColumnIndex"])
    .join(df_title, ["ColumnIndex"])
    .join(df_ct, ["ColumnIndex"])
    .join(df_date2, ["ColumnIndex"])
    .drop("ColumnIndex")
)
join_on_index.show(10)
join_on_index.cache()



+----------------+--------------------+--------------------+-------------------+----------+
|          Biz ID|                 Url|               Title|       Publish Date| Timestamp|
+----------------+--------------------+--------------------+-------------------+----------+
|MzAwOTI5NjczNA==|http://mp.weixin....|【成都精锐教育推荐】2016寒假颁...|2016-01-30 17:35:43|2016-01-31|
|MzA5NTcyNzk2NA==|http://mp.weixin....|茂名大道电白段坑漕路面昨天报道后市...|2016-01-30 16:48:34|2016-01-31|
|MzA3NDU1MTYwNg==|http://mp.weixin....|   为生孩子问题，婆媳姑嫂大战火力全开|2016-01-30 16:50:30|2016-01-31|
|MzA5OTI2NjAyMA==|http://mp.weixin....|有白发的朋友快收藏啊！白发不见皮肤...|2016-01-30 17:18:06|2016-01-31|
|MzA3NTQ2MDAyNA==|http://mp.weixin....|  夫妻生活常做这些危害很大，赶紧戒了吧|2016-01-30 15:27:56|2016-01-31|
|MzAwNDcwMzY1MA==|http://mp.weixin....|             因为我十分爱你|2016-01-30 11:40:27|2016-01-31|
|MjM5NjkyNzM2MQ==|http://mp.weixin....|【单身交友】香港的味道，藏在每条大...|2016-01-30 17:21:49|2016-01-31|
|MzA4ODI4NTY4MA==|http://mp.weixin....|★下午刚发生，10000年难遇一次...|2016-01-30 14:34:59|

DataFrame[Biz ID: string, Url: string, Title: string, Publish Date: string, Timestamp: string]

In [13]:
# Print the column names and types of the joined page-metadata frame.
join_on_index.printSchema()

root
 |-- Biz ID: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Publish Date: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [18]:
# Combine the de-duplicated click rows with the page metadata; the page frame
# is broadcast to every executor for the join.
# NOTE(review): the join key is "Biz ID" (the account), not the article URL, so
# an account with several articles pairs each of its click rows with each of
# its page rows, and "Read Number"/"Like Number" come from the click row while
# the URL/title come from the page row -- confirm this is the intended match.
join_final_temp = c_w_c.join(broadcast(join_on_index), ["Biz ID"])
# URL and Title exist on both sides; take them from the page-metadata frame.
join_final = join_final_temp.select( "Biz ID", join_on_index["URL"].alias("Article URL"), "Read Number", "Like Number", join_on_index["Title"].alias("Article title"), "Publish Date")

# Add the account name and description from the de-duplicated biz table.
join_final_2_temp = join_final.join(clean_weixin_biz, ["Biz ID"])
join_final_2 = join_final_2_temp.select("Article URL", "Article title", "Read Number", "Like Number", "Biz ID", "Biz Description", "Biz Name", "Publish Date")

# Write a single pipe-separated part file with a header row. Without an
# explicit mode the writer raises if the output path already exists, which made
# this cell fail on every re-run; overwrite matches the saveAsTable calls used
# when loading the raw data.
join_final_2.repartition(1).write.csv("/Users/maksim/Downloads/cwc_out.csv", sep='|', header=True, mode='overwrite')
join_final_2.printSchema()


root
 |-- Article URL: string (nullable = true)
 |-- Article title: string (nullable = true)
 |-- Read Number: string (nullable = true)
 |-- Like Number: string (nullable = true)
 |-- Biz ID: string (nullable = true)
 |-- Biz Description: string (nullable = true)
 |-- Biz Name: string (nullable = true)
 |-- Publish Date: string (nullable = true)

