In [13]:
import findspark

In [14]:
findspark.init()

In [1]:
import pyspark
import random

### set master

* 连接到启动好的spark的master上

In [2]:
conf = pyspark.SparkConf().setMaster("spark://192.170.5.172:7077").setAppName("zhanghui_first").set("spark.ui.port","8080")

In [3]:
sc = pyspark.SparkContext(conf=conf)

In [4]:
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)

3.14114128


### don't set master

In [7]:
conf = pyspark.SparkConf().setAppName("zhanghui_first").set("spark.ui.port","8080")

In [None]:
sc = pyspark.SparkContext(conf=conf)

In [732]:
num_samples = 100000000
def inside(p):     
    x, y = random.random(), random.random()
    return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)

3.1414768


### stop

In [733]:
sc.stop()

### hdfs文件

In [3]:
conf = pyspark.SparkConf().setMaster("spark://192.170.5.172:7077").setAppName("count_log").set("spark.ui.port","8080")
sc = pyspark.SparkContext(conf=conf)

In [4]:
d = sc.textFile("hdfs://192.170.5.172:9000/log_to_jp_2")

In [360]:
d.count()

3756306

In [361]:
d.first()

'2017-07-16 14:34:39,570 - dynamic_content_loader - 146 - 20655 - 140032765228800 - INFO - user_id                                           A0000033961'

# follow book

In [362]:
import re

In [363]:
s = re.compile('2017-*')

In [364]:
begin_with_2017 = d.filter(lambda line: s.match('2017'))

* 转换操作

In [365]:
begin_with_2017.first()

'2017-07-16 14:34:39,570 - dynamic_content_loader - 146 - 20655 - 140032765228800 - INFO - user_id                                           A0000033961'

* 行动操作

### RDD take

In [130]:
begin_with_2017.union(begin_with_2017)

UnionRDD[46] at union at NativeMethodAccessorImpl.java:0

In [133]:
begin_with_2017.take(0)

[]

In [134]:
begin_with_2017.take(1)

['2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index                                                                        A0000488530']

In [136]:
begin_with_2017.take(4)

['2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index                                                                        A0000488530',
 'Action                                                                        initAmount',
 'customer_id                                                                  A0000488530',
 'level                                                                               1001']

In [137]:
begin_with_2017.saveAsTextFile('test_saveAsTextFile.csv')

### parallelize使用

In [151]:
nums = sc.parallelize([1,2,3,4])

In [270]:
nums

ParallelCollectionRDD[61] at parallelize at PythonRDD.scala:480

* create a RDD

In [152]:
nums.map(lambda x : x**2).collect()

[1, 4, 9, 16]

In [153]:
lines = sc.parallelize(['hi zhanghui', 'saiya'])

In [154]:
words = lines.flatMap(lambda x:x.split(' ') )

In [155]:
words

PythonRDD[64] at RDD at PythonRDD.scala:48

In [156]:
words.collect()

['hi', 'zhanghui', 'saiya']

In [168]:
lines1 = sc.parallelize(['hi zhanghui', 'saiya', 'zhanghui'])

In [169]:
words1 = lines1.map(lambda x:x.split(' ') )

In [170]:
words1

PythonRDD[84] at RDD at PythonRDD.scala:48

In [174]:
words1.collect()

[['hi', 'zhanghui'], ['saiya'], ['zhanghui']]

In [175]:
# words1.distinct().collect() TypeError: unhashable type: 'list'

In [177]:
words.cartesian(words1).collect()

[('hi', ['hi', 'zhanghui']),
 ('zhanghui', ['hi', 'zhanghui']),
 ('hi', ['saiya']),
 ('zhanghui', ['saiya']),
 ('hi', ['zhanghui']),
 ('zhanghui', ['zhanghui']),
 ('saiya', ['hi', 'zhanghui']),
 ('saiya', ['saiya']),
 ('saiya', ['zhanghui'])]

### map reduce使用

In [208]:
s = re.compile('2017-*')
key_num = 0

def return_key_and_01(a_string):
    global key_num
    if s.match(a_string):
        key_num = key_num+1
        return (key_num, (a_string,1))
    else:
        return (key_num,(a_string,1))

In [209]:
lines_with_tag = begin_with_2017.map(return_key_and_01)

In [210]:
lines_with_tag.take(1)

[(1,
  ('2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index                                                                        A0000488530',
   1))]

In [228]:
def return_reduce(a,b):
    return (a[0], a[1]+1)

In [229]:
key_line_count = lines_with_tag.reduceByKeyLocally(return_reduce)

In [260]:
key_line_count.take(1)

[('2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index                                                                        A0000488530',
  1)]

* 42是log文件的总数，因此每一个log文件都是分开处理的，因此key的累加是每个文件级别的

In [231]:
key_line_count = lines_with_tag.reduceByKey(return_reduce)

In [None]:
key_line_count.collect()

### 单词计数

In [286]:
d.take(1)

['2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index                                                                        A0000488530']

In [470]:
word_counts = d.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)

In [None]:
word_counts.take(10)

### 根据key值定义分区

In [472]:
d

hdfs://192.170.5.172:9000/log_to_jp_2 MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [473]:
d.take(2)

['2017-07-16 14:34:39,570 - dynamic_content_loader - 146 - 20655 - 140032765228800 - INFO - user_id                                           A0000033961',
 'Action                                             initAmount']

In [476]:
d_partition = d.partitionBy(10).persist()

In [478]:
d_partition.partitioner

<pyspark.rdd.Partitioner at 0x7f88ed204940>

In [480]:
import urllib

In [488]:
from urllib.parse import urlparse

In [491]:
urlparse('http://www.baidu.com/zhanghui').netloc

'www.baidu.com'

In [483]:
hash('assasss')

3620816216146955347

In [492]:
def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

In [493]:
d.partitionBy(20, hash_domain)

MapPartitionsRDD[147] at mapPartitions at PythonRDD.scala:427

### try whole_textfiles

In [494]:
d_w = sc.wholeTextFiles("hdfs://192.170.5.172:9000/log_to_jp_2")

In [495]:
d_w.count()

37

In [496]:
d_w.take(1)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


* 总是报错IOPub data rate exceeded

In [501]:
d_w.keys().take(2)

['hdfs://192.170.5.172:9000/log_to_jp_2/debug.log.26',
 'hdfs://192.170.5.172:9000/log_to_jp_2/debug.log.29']

In [510]:
count_file_wordcount = d_w.flatMapValues(lambda y:y.split(' ') )

### 统计文件词数

In [514]:
count_file_wordcount.mapValues(lambda x:1).reduceByKey(lambda x, y:x+y).take(1)

[('hdfs://192.170.5.172:9000/log_to_jp_2/errors.log.46', 10535)]

In [522]:
nums = sc.parallelize([('1','ss'), ("a",'asd')])

In [523]:
nums.saveAsSequenceFile("hdfs://192.170.5.172:9000/s")

###  使用累加器

In [599]:
s = re.compile('2017-*')
key_num = sc.accumulator(0)

def return_key_and_01(a_string):
    global key_num
    if s.match(a_string):
        key_num +=1
        return (key_num, (a_string,1))
    else:
        return (key_num,(a_string,1))

In [None]:
d.take(10)

In [601]:
lines_with_tag = d.map(return_key_and_01)

In [602]:
def return_reduce(a,b):
    return (a[0], a[1]+1)

In [603]:
key_line_count = lines_with_tag.reduceByKey(return_reduce)

In [604]:
key_line_count.take(2)

[(Accumulator<id=8, value=0>,
  ("2017-07-29 03:35:00,268 - index - 69 - 24922 - 139811914508032 - INFO - basic parameters:{'userCode': '15394229555', 'Action': 'initAmount', 'customer_id': 'A0000279243', 'sysSource': 'dianshua', 'loanProductType': '1000', 'level': '1001'},4",
   130712)),
 (Accumulator<id=8, value=0>,
  ('2017-07-20 00:10:04,436 - sqlalchemy.engine.base.Engine - 109 - 6214 - 139995331426048 - INFO - INSERT INTO "RENWODAIBIZ"."T_AMOUNT_CHANGE" (id, customer_id, user_code, operator_type, create_time, credit_amount, remark, product_type, xjd_credit_amount, change_credit_amount, change_credit_reason, user_level) VALUES (:id, :customer_id, :user_code, :operator_type, :create_time, :credit_amount, :remark, :product_type, :xjd_credit_amount, :change_credit_amount, :change_credit_reason, :user_level)',
   134739))]

In [607]:
key_line_count.count()

37

* 有问题，显示结果是0。可能原因是：工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。

### mapPartitions

* mapPartitions 分区map，意思是分区之后还是一团，需要继续map
* 避免创建对象的开销

### 模拟使用pipe

In [34]:
sc.stop()

In [35]:
conf = pyspark.SparkConf().setMaster("spark://192.170.5.172:7077").setAppName("count_log").set("spark.ui.port","8080")
sc = pyspark.SparkContext(conf=conf)

In [36]:
d = sc.textFile("hdfs://192.170.5.172:9000/log_to_jp_2/debug.log.40")

In [37]:
sc.addFile('hdfs://192.170.5.172:9000/python_test_script.py')

In [38]:
d.take(1)

['2017-07-08 16:17:29,510 - dynamic_content_loader - INFO - index                                             A0000118047']

In [39]:
pyspark.SparkFiles.get('python_test_script.py')

'/tmp/spark-f445df8b-24d7-4640-ba10-0fcfd3fb3954/userFiles-d428c45e-dc5e-4b98-914b-43dc6f0369cb/python_test_script.py'

In [40]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [None]:
distData.pipe('/tmp/spark-f445df8b-24d7-4640-ba10-0fcfd3fb3954/userFiles-09a8f1f8-9b7b-40a9-829c-7b5d46f84ad9/python_test_script.py').take(1)

In [62]:
d.pipe('cat').take(2)

['2017-07-08 16:17:29,510 - dynamic_content_loader - INFO - index                                             A0000118047',
 'Action                                             initAmount']

* FileNotFoundError: [Errno 2] No such file or directory: '/home/zhanghui/python_test_script.py' 
* 总是出问题

### 使用广播实现

In [5]:
sc.broadcast('/home/zhanghui/python_test_script.py')

<pyspark.broadcast.Broadcast at 0x7f03f36622b0>

In [6]:
pyspark.SparkFiles.getRootDirectory()

'/tmp/spark-f445df8b-24d7-4640-ba10-0fcfd3fb3954/userFiles-09a8f1f8-9b7b-40a9-829c-7b5d46f84ad9'

In [17]:
pyspark.SparkFiles.get('python_test_script.py')

'/tmp/spark-f445df8b-24d7-4640-ba10-0fcfd3fb3954/userFiles-09a8f1f8-9b7b-40a9-829c-7b5d46f84ad9/python_test_script.py'

In [None]:
distData.pipe('python_test_script.py').take(1)

* FileNotFoundError: [Errno 2] No such file or directory: 'python_test_script.py'

# 实战：log分析

### 生成key的时候加上文件名称

In [234]:
from pyspark.sql.functions import input_file_name

In [241]:
s = re.compile('2017-*')
key_num = 0

def return_key_and_01(a_string):
    global key_num
    if s.match(a_string):
        key_num = key_num+1
        return (str(key_num)+input_file_name(), (a_string,1))
    else:
        return (str(key_num)+input_file_name(),(a_string,1))

In [242]:
# key_line_count = begin_with_2017.map(return_key_and_01).reduce(lambda x,y:(x[0],x[1]+1))

In [None]:
#key_line_count.take(1)

* 出了问题，但是想到了另一个办法

In [14]:
s = re.compile('2017-*')
key_string = ''

def return_key_and_01(a_string):
    global key_string
    if s.match(a_string):
        key_string = a_string[0:83]
    return (key_string, 1)

In [15]:
key_line_count = begin_with_2017.map(return_key_and_01).reduceByKey(lambda x,y:x+y)

In [None]:
key_line_count.take(100)

In [183]:
key_line_count

PythonRDD[132] at RDD at PythonRDD.scala:48

#### 正则使用的测试

In [19]:
len('2017-07-27 14:42:29,263')

23

In [61]:
extra_pid = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (.*?)')

* ?是匹配到后面的第一个字符串就停止的意思

In [62]:
test_regex = extra_pid.match('2017-08-03 18:16:35,864 - dynamic_content_loader - 140 - 26844 - 140401201641216 - ')

In [63]:
test_regex.groups()

('2017-08-03 18:16:35,864',
 'dynamic_content_loader',
 '140',
 '26844',
 '140401201641216',
 '')

In [366]:
before_info = re.compile('.* - (DEBUG|INFO|WARNING|ERROR) - $')

In [367]:
before_info_string = before_info.match('2017-07-22 15:46:40,831 - index - 148 - 2363 - 139994014394112 - INFO - ')
before_info_string.string

'2017-07-22 15:46:40,831 - index - 148 - 2363 - 139994014394112 - INFO - '

In [368]:
before_info_string = before_info.match('2017-07-22 15:46:40,831 - index - 148 - 2363 - 139994014394112 - INF- ')
before_info_string.string

AttributeError: 'NoneType' object has no attribute 'string'

In [369]:
before_info = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (DEBUG|INFO|WARNING|ERROR) - $')

In [370]:
before_info_string = before_info.match('2017-07-06 17:35:13,256 - dynamic_content_loader - INFO - user_id       A0000499633')
before_info_string.string

AttributeError: 'NoneType' object has no attribute 'string'

In [72]:
before_info = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (DEBUG|INFO|WARNING|ERROR) - $')

In [73]:
before_info_string = before_info.match('2017-07-22 15:46:40,831 - index - 148 - 2363 - 139994014394112 - INFO - ')

In [77]:
before_info_string.groups()

('2017-07-22 15:46:40,831', 'index', '148', '2363', '139994014394112', 'INFO')

In [76]:
type(before_info_string.groups())

tuple

In [230]:
test_string = '2017-07-23 14:45:03,903 - dynamic_content_loader - 146 - 10210 - 139996224378624 - INFO - index A0000488530'

In [234]:
s_new = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (DEBUG|INFO|WARNING|ERROR) - .*')

In [235]:
s_new.match(test_string).groups()

('2017-07-23 14:45:03,903',
 'dynamic_content_loader',
 '146',
 '10210',
 '139996224378624',
 'INFO')

### 重新定义log信息抓取

In [44]:
import re

In [2]:
d = sc.textFile("hdfs://192.170.5.172:9000/log_to_jp_2")

NameError: name 'sc' is not defined

In [46]:
s_new = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (DEBUG|INFO|WARNING|ERROR) - .*')
s_old = re.compile("^2017.*")
key_string_groups = ()

def return_key_and_01(a_string):
    global key_string_groups
    try:
        key_string_groups = s_new.match(a_string).groups()
    except Exception as e:
        if s_old.match(a_string):
            key_string_groups = ('old_pattern','old_pattern','old_pattern','old_pattern','old_pattern','old_pattern',)
    return (key_string_groups, 1)

In [None]:
d.take(10)

In [48]:
log_groups_as_keys_0 = d.map(return_key_and_01)

In [49]:
log_groups_as_keys_0.take(2)

[(('2017-07-16 14:34:39,570',
   'dynamic_content_loader',
   '146',
   '20655',
   '140032765228800',
   'INFO'),
  1),
 (('2017-07-16 14:34:39,570',
   'dynamic_content_loader',
   '146',
   '20655',
   '140032765228800',
   'INFO'),
  1)]

In [50]:
log_groups_as_keys = d.map(return_key_and_01).reduceByKey(lambda x,y:x+y)

In [None]:
log_groups_as_keys.take(1000)

### 查找每一句log打出的长度

In [52]:
tuple(([34,6,756,867,8][i] for i in [1,2,]))

(6, 756)

In [53]:
want_return_index = [0,1,2,3,4,5]

In [54]:
def wrapper_return_new_key(want_return_index):
    def return_new_key(a):
        old_key = a[0]
        num_count = a[1]
        new_key = tuple((old_key[i] for i in want_return_index))
        return (new_key, num_count)
    return return_new_key

In [55]:
return_file_line = wrapper_return_new_key([1,2])

In [56]:
log_groups_as_keys

PythonRDD[16] at RDD at PythonRDD.scala:48

In [57]:
log_groups_as_keys.take(1)

[(('2017-08-04 18:31:15,327',
   'dynamic_contents.model_func',
   '32',
   '26875',
   '140402019313408',
   'DEBUG'),
  3)]

In [58]:
file_line_count = log_groups_as_keys.map(return_file_line).reduceByKey(lambda x,y:x+y).sortBy(lambda a:a[1], ascending=False)

In [None]:
file_line_count.take(200)

### 抽取单个流程

In [60]:
is_want_thread = 0
s_new = re.compile('^(.{23}) - (.*?) - (.*?) - (.*?) - (.*?) - (DEBUG|INFO|WARNING|ERROR) - .*')
s_old = re.compile("^2017.*")
key_string_groups = ()
pid = '23050'
tid = '139929902749504'

def get_single_thread(a_string):
    global key_string_groups
    global is_want_thread
    try:
        key_string_groups = s_new.match(a_string).groups()
        if key_string_groups[3]==pid and key_string_groups[4]==tid:
            is_want_thread =1
        else:
            is_want_thread =0
            
    except Exception as e:
        if s_old.match(a_string):
            key_string_groups = ('old_pattern','old_pattern','old_pattern','old_pattern','old_pattern','old_pattern',)
            is_want_thread = 0
    return (a_string, is_want_thread)

In [None]:
d.map(get_single_thread).filter(lambda x:x[1]==1).map(lambda x:x[0]).take(10000)

### 统计流程耗时

In [62]:
log_groups_as_keys.take(1)

[(('2017-08-04 18:31:15,327',
   'dynamic_contents.model_func',
   '32',
   '26875',
   '140402019313408',
   'DEBUG'),
  3)]

In [63]:
key_index=[ 3, 4, ]
value_index = [0, 1, 2]

In [64]:
def wrapper_return_new_key(key_index, value_index):
    def return_new_key(a):
        old_key = a[0]
        num_count = a[1]
        new_key = tuple((old_key[i] for i in key_index))
        new_value = tuple((old_key[i] for i in value_index))
        return (new_key, new_value)
    return return_new_key

In [65]:
return_file_line_time = wrapper_return_new_key(key_index, value_index)

In [66]:
log_groups_as_keys.map(return_file_line_time).take(2)

[(('26875', '140402019313408'),
  ('2017-08-04 18:31:15,327', 'dynamic_contents.model_func', '32')),
 (('2363', '139993628526336'),
  ('2017-07-22 16:26:11,722', 'dynamic_content_loader', '140'))]

In [67]:
thread_time_consumption = log_groups_as_keys.map(return_file_line_time)

In [68]:
thread_time_consumption_sorted = thread_time_consumption.sortBy(lambda x:x[1][0]).sortByKey()

In [69]:
thread_time_consumption_sorted.take(2)

[(('13827', '139812627453696'), ('2017-07-31 08:27:34,653', 'index', '68')),
 (('13827', '139812627453696'), ('2017-07-31 08:27:34,654', 'index', '69'))]

In [70]:
from dateutil.parser import parse

In [71]:
parse('2017-07-23 15:50:03,680')

datetime.datetime(2017, 7, 23, 15, 50, 3, 680000)

In [72]:
parse('2017-07-23 15:50:03,680').timestamp() - parse('2017-07-23 15:50:02,000').timestamp()  

1.6800000667572021

In [73]:
def return_time_process(x,y):
    if x[1][0] == '' or y[0]=='':
        return x[0]+'==>'+y[1], (y[0], 0)
    time_span = parse(y[0]).timestamp() - parse(x[1][0]).timestamp()  
    return x[0]+'==>'+y[1], (y[0], x[1][1]+time_span)

In [74]:
def combine_return_time_process(x,y):
    #if y[1][0]>x[1][0]
    return x[0]+'==>'+y[0], (y[1][0],x[1][1]+y[1][1] )

In [None]:
thread_time_consumption_sorted.take(10)

In [76]:
process_lines = thread_time_consumption_sorted.aggregateByKey(('', ('', 0)),return_time_process, combine_return_time_process)

In [None]:
process_lines.take(10)

* log耗时能否查询与log的质量相关

In [None]:
process_lines.toDebugString()

### 减少数据分区

In [82]:
thread_time_consumption_sorted.getNumPartitions()

37

In [84]:
thread_time_consumption_sorted = thread_time_consumption_sorted.coalesce(15).cache()

In [85]:
thread_time_consumption_sorted.getNumPartitions()

15

### spark_sql

In [5]:
from pyspark.sql import HiveContext, Row

In [6]:
from pyspark.sql import SQLContext, Row

In [7]:
sql_cx = SQLContext(sc) 

In [11]:
level_data_df = sql_cx.read.format('csv').option('header', 'true').load('hdfs://192.170.5.172:9000/level_data.csv')

In [14]:
level_data_df.registerTempTable('level_data')

In [16]:
level_data_df.take(5)

[Row(_c0='0', USER_CODE='B0000006812', USER_LEVEL='1004'),
 Row(_c0='1', USER_CODE='B0000006879', USER_LEVEL='1004'),
 Row(_c0='2', USER_CODE='A0000722714', USER_LEVEL='1004'),
 Row(_c0='3', USER_CODE='A0000634304', USER_LEVEL='1004'),
 Row(_c0='4', USER_CODE='B0000001674', USER_LEVEL='1001')]

In [19]:
sql_cx.sql("SELECT user_code, user_level FROM level_data WHERE user_level='1004' ").take(5)

[Row(user_code='B0000006812', user_level='1004'),
 Row(user_code='B0000006879', user_level='1004'),
 Row(user_code='A0000722714', user_level='1004'),
 Row(user_code='A0000634304', user_level='1004'),
 Row(user_code='A0000822080', user_level='1004')]

In [20]:
sql_cx.cacheTable('level_data')

In [21]:
sql_cx.registerFunction('len_string', lambda x:len(x))

In [22]:
sql_cx.sql("SELECT len_string(user_code) FROM level_data WHERE user_level='1004' ").take(5)

[Row(len_string(user_code)='11'),
 Row(len_string(user_code)='11'),
 Row(len_string(user_code)='11'),
 Row(len_string(user_code)='11'),
 Row(len_string(user_code)='11')]

### 多计算节点

* 用sbin/start-slave.sh spark://192.170.5.172:7077启动到本机的master上即可；
* 可以部署计算任务；但使用hadoop分布式文件，remote spark worker会显示找不到文件block