In [1]:
import csv
import os
import re

import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

# Spark configuration: app name "miniProject", running locally on all available cores.
conf = (
    SparkConf()
    .setAppName("miniProject")
    .setMaster("local[*]")
)
# Reuse an already-running SparkContext if there is one (safe when the cell is re-run).
sc = SparkContext.getOrCreate(conf=conf)

# 数据载入

In [2]:
# Load the raw CSV as an RDD of text lines and count them.
# NOTE: the count includes the CSV header line (the first line of the file).
DATA_PATH = './data/spark-data.csv'  # relative path -- no machine-specific absolute path
raw_content = sc.textFile(DATA_PATH)
raw_content.count()

421970

In [3]:
# Peek at the first five raw lines of raw_content (the CSV header comes first).
raw_content.take(num=5)

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 '"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1',
 '"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2',
 '"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1',
 '"2015-12-12","13:42:01",266724,"3.2.2","i386","mingw32","gridExtra","2.0.0","CZ",1']

In [4]:
# Draw 3 random records (without replacement) from raw_content to eyeball the data.
# A fixed seed makes the sample -- and therefore this cell's output -- reproducible
# under Restart & Run All; without it every run shows different rows.
SAMPLE_SEED = 42
raw_content.takeSample(withReplacement=False, num=3, seed=SAMPLE_SEED)

['"2015-12-12","14:10:31",39913,NA,NA,NA,"labeling","0.3","US",16501',
 '"2015-12-12","10:32:55",66867,"3.1.2","x86_64","mingw32","muhaz","1.2.6","ES",3232',
 '"2015-12-12","07:52:27",148193,"3.2.3","x86_64","linux-gnu","dichromat","2.0-0","HK",6894']

# 数据处理

## 清洗

In [5]:
# Clean the raw lines: parse each CSV line into a list of field strings -> content_rdd.
# The header row is kept (downstream cells filter it out).
# csv.reader is quote-aware. Stripping every '"' and splitting on ',' would delete quote
# characters inside values and split quoted fields that contain commas. For the plain
# lines shown above, both approaches yield the same lists.
content_rdd = raw_content.mapPartitions(lambda lines: csv.reader(lines))
content_rdd.take(3)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:24:37',
  '1236751',
  '3.2.2',
  'x86_64',
  'mingw32',
  'RJSONIO',
  '1.3-0',
  'DE',
  '2']]

In [6]:
# Task: given the `text` list below, use flatMap to get a list of all the letters.
# Tokens may be separated by whitespace, '#', ',' or '%', in any mix within one string.

# One or more consecutive separator characters.
_SEPARATOR_RE = re.compile(r'[\s#,%]+')


def remove_symbol(x):
    """Split ``x`` into tokens on any run of separator characters.

    Separators are whitespace, ``#``, ``,`` and ``%``. They may be mixed within a
    single string, e.g. ``"a,b#c"`` -> ``['a', 'b', 'c']``. Empty tokens produced by
    leading, trailing or repeated separators are dropped, so ``""`` -> ``[]``.

    Args:
        x: the string to split.

    Returns:
        list of str: the non-empty tokens, in their original order.
    """
    # re.split yields '' at the edges when x starts/ends with a separator; filter those out.
    return [token for token in _SEPARATOR_RE.split(x) if token]


text = ["a b c", "d e", "f#g#h", "m n q", "r,q,w", "j%r%q"]

# Distribute the strings and flatten each one into its individual tokens.
text_rdd = sc.parallelize(text).flatMap(remove_symbol)
# print() keeps the collected list on a single line instead of the multi-line rich repr.
print(text_rdd.collect())

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'm', 'n', 'q', 'r', 'q', 'w', 'j', 'r', 'q']


## 统计

In [7]:
# 从刚才的content_rdd中取出第7列，对不同的package类别进行统计计数
package_rdd = content_rdd.map(lambda s:str(s).split(',')[6]).filter(lambda s:s!='package').cache()
package_count_rdd = package_rdd.map(lambda s:(s,1)).reduceByKey(lambda x,y:x+y)
package_count_rdd.take(10)

[(" 'gridExtra'", 1758),
 (" 'testthat'", 1178),
 (" 'maps'", 1586),
 (" 'geosphere'", 284),
 (" 'mgcv'", 1402),
 (" 'gtools'", 1544),
 (" 'matrixcalc'", 204),
 (" 'lubridate'", 626),
 (" 'doParallel'", 1328),
 (" 'abind'", 538)]

## 排序

In [8]:
# 取出数量最多的10个package和出现的频次
package_count_sort_rdd = package_count_rdd.map(lambda t:(t[1],t[0])).sortByKey(ascending=False).cache()
package_count_sort_rdd.take(10)

[(4783, " 'Rcpp'"),
 (3913, " 'ggplot2'"),
 (3748, " 'stringi'"),
 (3449, " 'stringr'"),
 (3436, " 'plyr'"),
 (3265, " 'magrittr'"),
 (3223, " 'digest'"),
 (3205, " 'reshape2'"),
 (3046, " 'RColorBrewer'"),
 (3007, " 'scales'")]

## 过滤

In [9]:
# 取出top5的package对应的数据记录
top5_package = [tup[1] for tup in package_count_sort_rdd.take(5)] # 先取出top5 package对应的名字
top5_package_rdd = content_rdd.filter(lambda s:str(s).split(',')[6] in top5_package).cache()
top5_package_rdd.take(5)

[['2015-12-12',
  '20:56:29',
  '392860',
  '3.1.3',
  'x86_64',
  'mingw32',
  'plyr',
  '1.8.3',
  'US',
  '10'],
 ['2015-12-12',
  '20:55:19',
  '512',
  'NA',
  'NA',
  'NA',
  'plyr',
  '1.6',
  'CN',
  '23'],
 ['2015-12-12',
  '20:56:29',
  '35401',
  '3.2.2',
  'x86_64',
  'mingw32',
  'stringr',
  '1.0.0',
  'US',
  '10'],
 ['2015-12-12',
  '20:56:28',
  '2370487',
  '3.2.2',
  'i386',
  'mingw32',
  'Rcpp',
  '0.12.2',
  'US',
  '10'],
 ['2015-12-12',
  '20:56:27',
  '3643527',
  '3.2.1',
  'x86_64',
  'linux-gnu',
  'stringi',
  '1.0-1',
  'US',
  '10']]