<img src="https://deecamp.chuangxin.com/assets/image/logo_nav_zh.jpg" width="40%">

# 此文代码为主线代码，主要负责提取原始数据和转换为可用数据
# 不要修改此文件，复制此文件到自己文件夹后再操作

## 1.spark 库和环境设定

In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly
os.environ['JAVA_HOME'] = '/usr/local/jdk1.8.0_162'#必须保留，否则无法启动JVM
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[16]").appName("spark-demo").getOrCreate()

## 2.读取总数据和标签数据

**raw_data为通过spark读取的原始数据**

格式为pyspark.sql.dataframe.DataFrame

In [2]:
%time raw_data=spark.read.parquet('/data/final_data/data/')#1G内存

CPU times: user 3 ms, sys: 0 ns, total: 3 ms
Wall time: 2.65 s


**label_raw_data为通过读取的带有标签的原始数据**

格式为pyspark.sql.dataframe.DataFrame

In [2]:
%time label_raw_data=spark.read.parquet('/data/final_data/label/user_name_label_give')#1G内存

CPU times: user 2.93 ms, sys: 821 µs, total: 3.75 ms
Wall time: 3.11 s


## 3.将两个原始数据转成对应的pandas数据

**raw_pd_data为转成pandas数据格式的总数据**

**label_pd_raw_data为转成pands数据格式的标签数据**

In [4]:
%time raw_pd_data=raw_data.toPandas()#运行时间非常长,读一次需要较大内存13G   需要用的时候再开启

CPU times: user 43.4 s, sys: 3.54 s, total: 46.9 s
Wall time: 1min 9s


In [3]:
%time label_pd_raw_data=label_raw_data.toPandas()#不到1G

CPU times: user 30.9 ms, sys: 3.78 ms, total: 34.7 ms
Wall time: 1.11 s


## 清理内存占用（保留）

In [6]:
#raw_data=None
#label_raw_data=None#释放1G内存

In [7]:
#raw_pd_data=None#释放4G内存
#label_pd_raw_data=None

In [4]:
label_pd_raw_data.to_csv('/data/csv/label.csv')

## 4.构建标签用户事件数据

### 4.1获取list[包含所有标签用户user_name]

**tmp_label_user_list 为临时变量**

**label_user_list 为 包含所有标签用户名的list**

In [8]:
tmp_label_user_list=label_raw_data.select('user_name').collect()
label_user_list=[int(row.user_name) for row in tmp_label_user_list]

### 4.2 获取user_label 字典

**dict_user_label为提取的标签字典 user:label**

In [9]:
dict_user_label=label_pd_raw_data.set_index('user_name')['label'].to_dict()

## 4.3 获取有标签用户的事件

**label_events为所有标签用户事件（带标签）**

In [10]:
%time label_event_tmp1=raw_data.filter(raw_data['user_name'].isin(label_user_list)) #使用df.filter 筛选 有标签用户在总数据集中的事件

CPU times: user 2.53 s, sys: 1.31 s, total: 3.83 s
Wall time: 23.4 s


In [11]:
label_event_tmp2=label_event_tmp1.toPandas()

In [12]:
label_event_tmp3=label_event_tmp2.copy()

In [13]:
label_event_tmp3['label'] = label_event_tmp3['user_name'].map(dict_user_label)

In [14]:
tmp_good_bad_dict={'good':0,'bad':1}

In [15]:
label_event_tmp4=label_event_tmp3.copy()

In [16]:
label_event_tmp4['label']=label_event_tmp3['label'].map(tmp_good_bad_dict)

## 4.4 IP分段

In [17]:
label_event_tmp4.replace(to_replace=r'^\s*$',value=np.nan,regex=True,inplace=True)   #将空字符替换为nan


label_event_tmp5=label_event_tmp4['ip'].str.split('.', expand=True,n = 3)
label_event_tmp5.columns=['ip_1','ip_2','ip_3','ip_4']
label_event_tmp6=pd.concat([label_event_tmp5,label_event_tmp4], axis=1) #数据合并

In [18]:
label_event_tmp6

Unnamed: 0,ip_1,ip_2,ip_3,ip_4,ip,ip_city,email_prefix,email_provider,event_type,mobile_prefix_3,...,user_name,user_agent,os_version,resource_owner,register_type,category,status,resource_type,resource_category,label
0,,,,,,,,,1,,...,1348648,0,0,183663,,,,1069,126,0
1,,,,,,,,,1,,...,1238148,0,0,353041,,,,1515,6,0
2,,,,,,,,,1,,...,20487,0,0,260127,,,,401,85,1
3,,,,,,,,,1,,...,1097570,0,0,563412,,,,1167,72,1
4,,,,,,,,,1,,...,479461,0,0,1224337,,,,1515,11,1
5,,,,,,,,,1,,...,1348648,0,0,1625352,,,,102,86,0
6,,,,,,,,,1,,...,1097570,0,0,1672519,,,,1167,33,1
7,,,,,,,,,1,,...,1348648,0,0,1630966,,,,1069,137,0
8,,,,,,,,,1,,...,1097570,0,0,391718,,,,1167,33,1
9,,,,,,,,,1,,...,1321969,0,0,557703,,,,102,110,0


In [19]:
label_event_tmp6.describe(include='O')

Unnamed: 0,ip_1,ip_2,ip_3,ip_4,ip,ip_city,email_prefix,email_provider,event_type,mobile_prefix_3,...,time_stamp,user_name,user_agent,os_version,resource_owner,register_type,category,status,resource_type,resource_category
count,30747,30747,30747,30747,30747,29683,35,35,70327,243,...,70327,70327,70327,70327,36765,335,3542,3542,37425,37425
unique,146,256,256,273,17365,325,34,5,5,34,...,67876,9502,389,40,26023,4,16,6,300,129
top,113,25,242,100,175.25.242.100,北京,25783,819,1,186,...,2017-10-13 15:08:12,1324185,0,0,1212321,7,0,0,1167,91
freq,1463,905,817,674,565,5218,2,18,36765,22,...,28,428,62389,62474,183,299,2812,2072,8957,5660


In [20]:
label_event_tmp6.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 70327 entries, 0 to 70326
Data columns (total 22 columns):
ip_1                 30747 non-null object
ip_2                 30747 non-null object
ip_3                 30747 non-null object
ip_4                 30747 non-null object
ip                   30747 non-null object
ip_city              29683 non-null object
email_prefix         35 non-null object
email_provider       35 non-null object
event_type           70327 non-null object
mobile_prefix_3      243 non-null object
mobile_city          243 non-null object
time_stamp           70327 non-null object
user_name            70327 non-null object
user_agent           70327 non-null object
os_version           70327 non-null object
resource_owner       36765 non-null object
register_type        335 non-null object
category             3542 non-null object
status               3542 non-null object
resource_type        37425 non-null object
resource_category    37425 non-null object
la

In [21]:
for feature in ("ip_1","ip_2","ip_3","ip_4","email_prefix","email_provider","event_type","mobile_prefix_3","user_name","user_agent","os_version",
           'resource_owner','register_type','category','status','resource_type','resource_category'):
    label_event_tmp6[feature]=pd.to_numeric(label_event_tmp6[feature],downcast='signed')
label_event_tmp6['time_stamp']=pd.to_datetime(label_event_tmp6['time_stamp'])
label_event_tmp6.head()

ValueError: Unable to parse string "unknown, 123" at position 34089

In [None]:
label_event_tmp6.info()

In [None]:
label_events=label_event_tmp6#label_events为所有标签用户事件（带标签）

## 5.几个简单的数据分析图

### 5.1用户与资源所有者的分布

In [None]:
plt.scatter(label_events['user_name'],label_events['resource_owner'])
plt.title('user_name and resource_owner')

In [None]:
label_events[label_events['event_type'] == 2].head(4)

In [None]:
label_events[label_events['event_type'] == 1].head(4)

In [None]:
#pdf.replace(to_replace=r'^\s*$',value=np.nan,regex=True,inplace=True)   #将空字符替换为nan
#pdf_notnull = pdf_notnull[pdf['ip'].notnull()]   #取出ip不为空的行
pd.concat([pdf['ip'].str.split('.', expand=True,n = 3),pdf], axis=1) #数据合并

In [None]:
label_events[label_events['event_type'] == 1].tcshrco

In [None]:
from IPy import IP

In [None]:
df_clean=raw_pd_data.fillna({'ip':"-1"})

In [None]:
for i in df_clean['ip']:
    if i != '-1':
        try:
            df_clean.loc[df_clean['ip']== i, 'ip'] = IP(i).int()
        except:
            print(i)

In [None]:
label_events.to_csv('/data/jupyter_root/label_events.csv',encoding="utf_8_sig")

In [None]:
raw_date.filter([])

In [None]:
raw_data.filter(raw_data['ip_city'] == '局域网')

In [None]:
label_pd_raw_data.to_csv('/data/jupyter_root/dcube_data/evaluate.txt',sep=',',header=None,index=None)