In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
import pyspark.sql.functions as F
import json
import hashlib

In [2]:
# Build (or reuse) the notebook's Spark session: Hive support enabled,
# running on YARN and submitting to the "tianwang" queue.
spark = (
    SparkSession.builder
    .appName("CommunityDetection")
    .config("spark.yarn.queue", "tianwang")
    .master("yarn")
    .enableHiveSupport()
    .getOrCreate()
)
#         .config("spark.sql.hive.convertMetastoreParquet", "false") \
#         .config("spark.ui.port", "8892") \
#         .config('spark.executor.instances', 20)\
#         .config('spark.executor.cores', 8)\

# Format the Web data

In [11]:
# Input location of the raw records, read below as JSON.
path = '/user/dingsiwei/douyu_web_0521'

# The schema is kept in a JSON file and rebuilt into a StructType, so Spark
# reads the input with an explicit schema.
with open('schema_douyu_web.json', 'r') as schema_file:
    schema = StructType.fromJson(json.load(schema_file))

df = spark.read.json(path, schema=schema)

# Expose the frame to Spark SQL under the name used by the queries below.
df.createOrReplaceTempView("df_table")

# ANDROID_FP_SCHEMA = StructType([
#     StructField("data", StructType([
#         StructField("t", LongType(), True),
#         StructField("apps", ArrayType(StringType()), True),
#         StructField("aps", ArrayType(StringType()), True),
#         StructField("cpuFreq", LongType(), True),
#         StructField("cpuVendor", StringType(), True),
#         StructField("cpuModel", StringType(), True),
#         StructField("cpuCount", LongType(), True),
#         StructField("mem", LongType(), True),
#         StructField("name", StringType(), True),
#         StructField("cell", MapType(StringType(), StringType()), True),
#         StructField("sys", StructType([
#             StructField("manufacturer", StringType(), True),
#             StructField("serial", StringType(), True),
#             StructField("fingerprint", StringType(), True),
#             ]), True),
#         StructField("ainfo", StructType([
#             StructField("root", StringType(), True),
#             ]), True),
#         StructField("props", StructType([
#             StructField("persist.sys.language", StringType(), True),
#             StructField("persist.sys.country", StringType(), True),
#             ]), True),
#         ]), True),
#     StructField("org", StringType(), True),
# ])

In [12]:
# Print the schema of the loaded frame as a sanity check.
df.printSchema()

root
 |-- clearText: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- Header: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- smid: string (nullable = true)



In [13]:
def cut_clearText(item, fields=None):
    """Extract the configured fields from a ``key=value&key=value`` string.

    The result always has exactly one entry per requested field, in the order
    of ``fields``, so that ``cut_clearText(x)[i]`` is guaranteed to belong to
    ``fields[i]``. (Appending only the fields that happen to be present would
    shift every later index as soon as one key is missing or repeated.)

    Args:
        item: The raw clearText string, or None.
        fields: Field names to extract. Defaults to the notebook-level
            ``fields_clearText`` list, looked up when the function is called.

    Returns:
        None when ``item`` is None (a null array in Spark). Otherwise a list
        with one entry per field: the text after the first ``=`` of the first
        pair whose key equals the field name, or None when the key is absent.
        The value of ``plugins`` is replaced by its MD5 hex digest.
    """
    if item is None:
        # The UDF is registered as ArrayType(StringType()); a null array is
        # the type-correct "no data" value (a plain string is not an array).
        return None
    if fields is None:
        fields = fields_clearText

    # Parse every pair once. Keys are matched exactly (not by prefix) and the
    # first occurrence of a key wins.
    values = {}
    for pair in item.split("&"):
        name, separator, value = pair.partition("=")
        if separator and name not in values:
            # partition keeps everything after the first "=", so values that
            # themselves contain "=" are not truncated.
            values[name] = value

    format_result = []
    for name in fields:
        value = values.get(name)
        if value is None:
            format_result.append(None)
        elif name == "plugins":
            # In Python 3, str and bytes are distinct types: the text must be
            # encoded to bytes before hashing.
            format_result.append(hashlib.md5(value.encode("utf-8")).hexdigest())
        else:
            format_result.append(value)
    return format_result


def cut_data_Header(item):
    """Extract a fixed set of HTTP headers from a JSON-encoded header object.

    Args:
        item: JSON text of the request headers, or None.

    Returns:
        A four-element list ordered as Referer, User-Agent, X-Forwarded-For,
        X-Real-Ip. A header that is absent is reported as the string "null".
        For Referer, the first element of the stored value is taken and
        everything from the first "?" onwards is dropped; the other headers
        are returned exactly as stored in the JSON.
        A None input, a JSON document that is not an object, and an empty or
        null Referer value all yield "null" placeholders instead of raising.

    Raises:
        ValueError: If ``item`` is not valid JSON (propagated from json.loads).
    """
    fields_data_Header = ["Referer", "User-Agent", "X-Forwarded-For", "X-Real-Ip"]
    placeholder = "null"

    if item is None:
        # The column is nullable; json.loads(None) would raise TypeError.
        return [placeholder] * len(fields_data_Header)

    item_dict = json.loads(item)
    if not isinstance(item_dict, dict):
        # e.g. the JSON literal null, which has no keys to look up.
        return [placeholder] * len(fields_data_Header)

    format_result = []
    for name in fields_data_Header:
        if name not in item_dict:
            format_result.append(placeholder)
        elif name == 'Referer':
            referers = item_dict[name]
            if referers:
                # Keep only the part before the query string.
                format_result.append(referers[0].split('?')[0])
            else:
                format_result.append(placeholder)
        else:
            # NOTE(review): the Referer indexing suggests header values are
            # lists; other headers are passed through unchanged - confirm the
            # downstream string representation is the one wanted.
            format_result.append(item_dict[name])

    return format_result


def ip_format(item):
    """Return the first three dot-separated parts of ``item``, re-joined by dots.

    A None input is reported as the string 'null'.
    """
    if item is not None:
        parts = item.split('.')
        return '.'.join(parts[:3])
    return 'null'

# Register the helpers as Spark SQL functions so they can be called by name
# inside spark.sql(...). The third argument is the declared return type: the
# two cut_* functions return arrays of strings, ip_format returns a string.
spark.udf.register("cut_clearText", cut_clearText, ArrayType(StringType()))
spark.udf.register("cut_data_Header", cut_data_Header, ArrayType(StringType()))
spark.udf.register("ip_format", ip_format, StringType())

In [15]:
# Keys extracted from clearText; the position in this list is also the array
# index used to address each value in the generated SQL.
fields_clearText = ["plugins", "ua", "canvas", "timezone", "platform", "url", "res", "status",
                    "clientSize", "appCodeName", "appName", "area", "sdl"]
# fields_data_Header = ["Referer", "User-Agent", "X-Forwarded-For", "X-Real-Ip"]
# sql_data_Header = ', '.join(["cut_data_Header(data.Header)[{}] as {}".format(i, fields_data_Header[i]) for i in range(len(fields_data_Header))])

# The header columns are not generated from fields_data_Header because of an
# escape-character problem (presumably the hyphens in names such as User-Agent
# are not valid in an unquoted SQL alias), so their aliases are written by hand.

# One "cut_clearText(clearText)[i] as <field>" select item per clearText field.
sql_clearText = ', '.join(
    "cut_clearText(clearText)[{}] as {}".format(position, column)
    for position, column in enumerate(fields_clearText)
)
sql = "select smid, ip, ip_format(ip) as ip_seg, cut_data_Header(data.Header)[0] as Referer, cut_data_Header(data.Header)[1] as UserAgent, cut_data_Header(data.Header)[2] as XForwardedFor, cut_data_Header(data.Header)[3] as XRealIp, {} from df_table".format(sql_clearText)

key = spark.sql(sql)

In [16]:
# Print the schema of the formatted frame to confirm the generated columns.
key.printSchema()

root
 |-- smid: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- ip_seg: string (nullable = true)
 |-- Referer: string (nullable = true)
 |-- UserAgent: string (nullable = true)
 |-- XForwardedFor: string (nullable = true)
 |-- XRealIp: string (nullable = true)
 |-- plugins: string (nullable = true)
 |-- ua: string (nullable = true)
 |-- canvas: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- url: string (nullable = true)
 |-- res: string (nullable = true)
 |-- status: string (nullable = true)
 |-- clientSize: string (nullable = true)
 |-- appCodeName: string (nullable = true)
 |-- appName: string (nullable = true)
 |-- area: string (nullable = true)
 |-- sdl: string (nullable = true)



In [18]:
# Persist the formatted web records as JSON. No save mode is set, so Spark's
# default applies and the write fails if the target path already exists.
key.write.json('douyu_web_0521_format')

# Format the iOS data

In [29]:
def ip_format(item):
    """Return the first three dot-separated parts of ``item``, re-joined by dots.

    A None input is reported as the string 'missing'.
    """
    if item is not None:
        parts = item.split('.')
        return '.'.join(parts[:3])
    return 'missing'

def battery_format(item):
    """
    Format a battery level as the label of its tenth-wide bin.

    Returns 'missing' for None, "-1" for values outside [0.0, 1.0], "1" for
    exactly 1.0, and otherwise the lower edge of the bin containing the value
    ("0.9" for [0.9, 1.0), ..., "0.0" for [0.0, 0.1)).
    """
    if item is None:
        return 'missing'
    if item > 1.0 or item < 0.0:
        return "-1"
    if item == 1.0:
        return "1"

    # From here on item is below 1.0, so walking the bin floors from high to
    # low and stopping at the first one not above item selects its bin.
    bins = (
        (0.9, "0.9"),
        (0.8, "0.8"),
        (0.7, "0.7"),
        (0.6, "0.6"),
        (0.5, "0.5"),
        (0.4, "0.4"),
        (0.3, "0.3"),
        (0.2, "0.2"),
        (0.1, "0.1"),
        (0.0, "0.0"),
    )
    for floor, label in bins:
        if item >= floor:
            return label
    # Only reached when every comparison above is false (e.g. NaN).
    return None


brightness_format = battery_format  # brightness uses the same bins as battery

def boot_format(item):
    """
    Format a boot time, rounded to a whole number of hours.

    The value is divided by 3600 and rounded to the nearest integer (i.e.
    whole hours when ``item`` is expressed in seconds). None gives 'missing'.
    """
    if item is not None:
        hours = round(item / 3600)
        return str(hours)
    return 'missing'

def totalSpace_format(item):
    """
    Format the total storage space as a nominal capacity label.

    ``item`` is divided by 1024**3 and rounded; the result is mapped to one of
    "8", "16", "32", "64", "128", "256", "512" when it falls in that tier's
    accepted range, and to "-1" otherwise. None gives 'missing'.
    """
    if item is None:
        return 'missing'

    space = round(item / pow(1024, 3))

    # (exclusive lower bound, inclusive upper bound == reported capacity)
    tiers = (
        (7, 8),
        (15, 16),
        (30, 32),
        (50, 64),
        (120, 128),
        (240, 256),
        (500, 512),
    )
    for lower, capacity in tiers:
        if lower < space <= capacity:
            return str(capacity)
    return "-1"

# t_format is an alias: the t value is formatted exactly like the boot value.
t_format = boot_format

def memory_format(item):
    """
    Format a memory size as a rounded multiple of 1024**3.

    Any value whose text form is at most four characters long (which includes
    None, printed as 'None') is reported as 'missing'; everything else is
    converted with int(), divided by 1024**3 and rounded.
    """
    text = str(item)
    if len(text) > 4:
        size = int(item) / pow(1024, 3)
        return str(round(size))
    return 'missing'


useSpace_format = memory_format

def operator(x, y):
    """Join the text forms of the two arguments as ``<y>_<x>`` (second argument first)."""
    return '_'.join((str(y), str(x)))

def is_vpn(item):
    """Map a boolean-like flag to "1" (true), "0" (false) or "missing".

    String values are compared case-insensitively, so 'true'/'True' and
    'false'/'False' are all recognised (matching only 'false' and 'True'
    would report a lowercase 'true' as "missing"). Real booleans are accepted
    as well. Anything else, including None, gives "missing".
    """
    if isinstance(item, bool):
        return "1" if item else "0"
    if isinstance(item, str):
        flag = item.lower()
        if flag == 'false':
            return "0"
        if flag == 'true':
            return "1"
    return "missing"


# The root flag uses the same encoding as the VPN flag.
is_root = is_vpn

def app_format(item):
    """Return "0" when ``item`` is None or prints as 'null', otherwise "1"."""
    absent = item is None or str(item) == 'null'
    return "0" if absent else "1"
    
def exist(item):
    """Return ``item`` unchanged, substituting the string "0" for None."""
    return "0" if item is None else item


# Register every formatting helper as a Spark SQL function returning a string,
# so the query below can call them by name. Several names are aliases of the
# same Python function (brightness_format, t_format, useSpace_format, is_root).
# Registering "ip_format" again replaces the earlier registration with the
# version that reports 'missing' for None.
spark.udf.register("battery_format", battery_format, StringType())
spark.udf.register("brightness_format", brightness_format, StringType())
spark.udf.register("boot_format", boot_format, StringType())
spark.udf.register("totalSpace_format", totalSpace_format, StringType())
spark.udf.register("t_format", t_format, StringType())
spark.udf.register("ip_format", ip_format, StringType())
spark.udf.register("memory_format", memory_format, StringType())
spark.udf.register("useSpace_format", useSpace_format, StringType())
spark.udf.register("operator", operator, StringType())
spark.udf.register("is_vpn", is_vpn, StringType())
spark.udf.register("is_root", is_root, StringType())
spark.udf.register("app_format", app_format, StringType())
spark.udf.register("exist", exist, StringType())

In [12]:
# Expose the frame to Spark SQL as "df_table". createOrReplaceTempView is the
# non-deprecated replacement for registerTempTable and is what the web section
# of this notebook already uses.
# NOTE(review): the query below reads columns such as data.smid that the web
# schema printed earlier does not contain, so `df` must have been rebound to
# the iOS data in a cell that is not part of this notebook - confirm.
df.createOrReplaceTempView("df_table")

In [13]:
# Build the formatted iOS frame: keep rows with requestType = 'all' and
# data.os = 'ios', select the raw columns and add the UDF-formatted ones.
# - explode(data.dns) produces one output row per DNS entry, so a record with
#   several DNS servers is repeated; records whose dns array is null or empty
#   produce no row at all.
# - uptime is boot_format applied to (data.t - data.boot).
key = spark.sql("""
select ip, data.smid, app_format(alterApps) as alterApps, data.apputm, data.appver, data.languages, explode(data.dns) as dns, data.battery, data.boot, data.bssid, data.countryIso, data.idfa, is_vpn(data.is_vpn) as vpn, data.memory, operator(data.mnc, data.mcc) as operator, data.model, data.name, data.osver, is_root(data.root) as root, data.smseq, data.t, boot_format(data.t - data.boot) as uptime, data.totalSpace, useSpace, data.useSpace as data_useSpace, data.first, exist(inlineHooked) as inlineHooked, app_format(vpnApps) as vpnApps, app_format(monkeyApps) as monkeyApps, fpX.adapter.headers.xforwardedfor, fpStat.deviceids_per_smid_1d, fpStat.smids_per_boot_1d, fpX.adapter.headers.contentlength, fpX.adapter.proxyType, exist(fpStat.smids_per_idfa_1d) as smids_per_idfa_1d, ip_format(ip) as ip_segement, battery_format(data.battery) as battery_format, boot_format(data.boot) as boot_format, brightness_format(data.brightness) as brightness_format, data.brightness, t_format(data.t) as t_format, useSpace_format(useSpace) as useSpace_format, useSpace_format(data.useSpace) as data_useSpace_format, totalSpace_format(data.totalSpace) as totalSpace_format, memory_format(data.memory) as memory_format from df_table 
where requestType = 'all' and data.os = 'ios'
""")

In [14]:
# Spot-check the UDF-derived columns on the first rows.
key.select('dns', 'vpn', 'operator', 'root', 'inlineHooked', 'vpnApps', 'monkeyApps', 'smids_per_idfa_1d').show()

+---------------+-------+---------+-------+------------+-------+----------+-----------------+
|            dns|    vpn| operator|   root|inlineHooked|vpnApps|monkeyApps|smids_per_idfa_1d|
+---------------+-------+---------+-------+------------+-------+----------+-----------------+
|    192.168.0.1|      0|   460_11|      0|           0|      0|         0|                0|
|        8.8.8.8|      0|   460_11|      0|           0|      0|         0|                0|
|    192.168.0.1|      0|   460_11|      0|           0|      0|         0|                0|
|    192.168.1.1|      0|   460_02|      0|           0|      0|         0|                0|
|    192.168.0.1|      0|   460_02|      0|           0|      0|         0|                0|
|  194.168.4.100|      0|   234_10|      0|           0|      0|         0|                0|
|  194.168.8.100|      0|   234_10|      0|           0|      0|         0|                0|
|  211.140.11.66|      0|   460_02|      0|           0|    

In [16]:
# Convert the boot timestamp to a date (the division by 1000 suggests it is in milliseconds)
# key.selectExpr('boot/1000 as ts').select(F.to_date(F.col('ts'))).show(truncate=False)

# Do not consider the memory- or storage-related features

In [9]:
# Persist the formatted iOS records as JSON. No save mode is set, so Spark's
# default applies and the write fails if the target path already exists.
key.write.json('global_ios_format_json_0518_update')

In [11]:
# Write only the rows whose bssid is not the literal string 'null'. Rows where
# bssid is a real SQL NULL are dropped too, because the comparison then
# evaluates to NULL rather than true.
key.where(key.bssid != 'null').write.json('tongzhuo0507-9_format_json_bssid')