# 0 初始化Spark

In [3]:
from pyspark import SparkConf, SparkContext

In [4]:
conf = SparkConf().setMaster("local").setAppName("My App") # master：运行模式，默认local[*]

In [5]:
sc = SparkContext(conf = conf)

# 方式一：RDD 操作数据

基本“转换”运算

In [44]:
# 读取外部数据
textfile = sc.textFile("Desktop/lexisnexis_pg_test1.csv")
print(lines)

Desktop/lexisnexis_worldcompliance_entities.csv MapPartitionsRDD[15] at textFile at NativeMethodAccessorImpl.java:0


In [54]:
stringRDD = textfile.flatMap(lambda line:line.split(",")) # 文件的每一行按照空格拆分
#countsRDD = stringRDD.map(lambda word: (word, 1)).reduceByKey(lambda x,y:x+y)

In [55]:
stringRDD.collect()

['"Ent_ID"',
 '"Name"',
 '"FirstName"',
 '"LastName"',
 '"Prefix"',
 '"Suffix"',
 '"Aka"',
 '"NameSource"',
 '"ParentID"',
 '"GovDesignation"',
 '"EntryType"',
 '"EntryCategory"',
 '"EntrySubCategory"',
 '"Organization"',
 '"Positions"',
 '"Remarks"',
 '"DOB"',
 '"POB"',
 '"Country"',
 '"ExpirationDate"',
 '"EffectiveDate"',
 '"PictureFile"',
 '"LinkedTo"',
 '"Related_ID"',
 '"SourceWebLink"',
 '"TouchDate"',
 '"DirectID"',
 '"PassportID"',
 '"NationalID"',
 '"OtherID"',
 '"DOB2"',
 '"EntLevel"',
 '"MasterID"',
 '"Watch"',
 '"Relationships"',
 '"PrimaryName"',
 '"OriginalName2"',
 '"Gender"',
 '"EY_DailyDeltaDate"',
 '"EY_DailyDeltaFilename"',
 '"1574372"',
 '"Skvortsova',
 ' Veronika Igorevna"',
 '"Veronika Igorevna"',
 '"Skvortsova"',
 '""',
 '""',
 '""',
 '"Website"',
 '"0"',
 '""',
 '"Individual"',
 '"PEP"',
 '"Govt Branch Member"',
 '""',
 '"Head of the Federal Biomedical Agency',
 ' effective from January 22',
 ' 2020."',
 '"Career: Head',
 ' effective from January 22',
 ' 2020',

In [21]:
s1 = sc.parallelize(list(range(10))) # 将数据并行分布到背后的分布式存储，透明的分布式处理与存储
s1.collect() # 转换运算不会立即运算（延迟运算），需要collect()等动作运算才会真正执行数据操作

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [22]:
s2 = sc.parallelize(["Apple", "Orange", "Banana", "Orange", "Apple"])
s2.collect()

['Apple', 'Orange', 'Banana', 'Orange', 'Apple']

In [25]:
def add_one(x):
    return x+2

s1.map(add_one).collect() # 划分任务，把不同任务分配到不同计算机上，对于大数据集有优势

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [27]:
s1.map(lambda x:x+2).collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [28]:
s2.map(lambda x: 'fruit:' + x).collect()

['fruit:Apple', 'fruit:Orange', 'fruit:Banana', 'fruit:Orange', 'fruit:Apple']

In [29]:
s2.filter(lambda x: 'ra' in x).collect()

['Orange', 'Orange']

In [30]:
s1.filter(lambda x: x>=3 and x <=8).collect()

[3, 4, 5, 6, 7, 8]

In [31]:
s2.distinct().collect()

['Apple', 'Orange', 'Banana']

In [33]:
s2.collect()

['Apple', 'Orange', 'Banana', 'Orange', 'Apple']

In [39]:
s3 = s1.randomSplit([0.4,0.6])

In [40]:
s3[0].collect()

[3, 4, 5, 6, 8]

In [42]:
s4 = s1.groupBy(lambda x:'偶数' if (x%2==0) else '奇数').collect()

In [46]:
sorted(s4[0][1])

[0, 2, 4, 6, 8]

In [47]:
a1 = sc.parallelize([3,1,2,5,6])
a2 = sc.parallelize([5,6])
a3 = sc.parallelize([2,7])

In [49]:
a1.union(a2).union(a3).collect() # 并集

[3, 1, 2, 5, 6, 5, 6, 2, 7]

In [51]:
a1.intersection(a3).collect() # 交集

[2]

In [52]:
a1.subtract(a3).collect() # 差集

[6, 3, 1, 5]

In [53]:
a2.cartesian(a3).collect() # 笛卡尔积 应用：交叉验证、网格搜索

[(5, 2), (5, 7), (6, 2), (6, 7)]

基本“动作”运算

In [54]:
a1.first()

3

In [56]:
a1.take(2) # 取前2个

[3, 1]

In [57]:
a1.takeOrdered(3) # 排序之后取前3个

[1, 2, 3]

In [58]:
a1.min()

1

In [59]:
a1.max()

6

In [60]:
a1.takeOrdered(3,key=lambda x:-x) # 倒序排序

[6, 5, 3]

In [61]:
a1.stats() # 描述性统计信息 机器学习需要推理性统计信息的获取

(count: 5, mean: 3.4, stdev: 1.854723699099141, max: 6.0, min: 1.0)

RDD Key-Value 基本“转换”运算

In [62]:
k1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
k1.collect()

[(3, 4), (3, 6), (5, 6), (1, 2)]

In [63]:
k1.keys().collect() # 获取键

[3, 3, 5, 1]

In [64]:
k1.values().collect() # 获取值

[4, 6, 6, 2]

In [65]:
k1.filter(lambda keyValue: keyValue[0] < 5).collect()

[(3, 4), (3, 6), (1, 2)]

In [66]:
k1.mapValues(lambda x: x * x).collect() # mapValues：针对值映射

[(3, 16), (3, 36), (5, 36), (1, 4)]

In [68]:
k1.sortByKey(ascending=False).collect()

[(5, 6), (3, 4), (3, 6), (1, 2)]

In [69]:
k1.sortByKey().collect()

[(1, 2), (3, 4), (3, 6), (5, 6)]

In [70]:
k1.reduceByKey(lambda x,y: x+y).collect() # 规约、聚集汇合 （键相同的值合并相加）

[(3, 10), (5, 6), (1, 2)]

In [71]:
k1.collect()

[(3, 4), (3, 6), (5, 6), (1, 2)]

In [6]:
k1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
k2 = sc.parallelize([(3,9),(8,9)])

In [80]:
k1.join(k2).collect()

[(3, (4, 9)), (3, (6, 9))]

In [81]:
k1.leftOuterJoin(k2).collect()

[(3, (4, 9)), (3, (6, 9)), (5, (6, None)), (1, (2, None))]

In [82]:
k1.rightOuterJoin(k2).collect()

[(8, (None, 9)), (3, (4, 9)), (3, (6, 9))]

In [83]:
k1.subtractByKey(k2).collect()

[(5, 6), (1, 2)]

In [7]:
k1.countByKey()

defaultdict(int, {3: 2, 5: 1, 1: 1})

In [8]:
k1.collectAsMap() # 如果同一个Key存在多个Value，则后边的Value会覆盖前边的Value，最终结果键唯一

{3: 6, 5: 6, 1: 2}

In [9]:
k1.lookup(3) # 取键为3的值组成一个列表

[4, 6]

In [10]:
k1.lookup(5)

[6]

广播变量 Broadcast
   是执行任务的多台计算机之间共享的方式

In [11]:
f1=sc.parallelize([(1, "apple"), (2, "orange"), (3, "bananas"), (4, "grape")])

In [15]:
f3 = f1.collectAsMap()

In [13]:
f2 = sc.parallelize([2,4,1,3])

In [16]:
f2.map(lambda x:f3[x]).collect()

['orange', 'grape', 'apple', 'bananas']

In [18]:
f4 = sc.broadcast(f3) # 设置广播变量 分布式共享变量，多台计算机之间通信，共享sc

In [20]:
f2.map(lambda x:f4.value[x]).collect()

['orange', 'grape', 'apple', 'bananas']

accumulator累加器   分布式累加

In [21]:
m1 = sc.parallelize([3,1,2,5,5])

In [35]:
total = sc.accumulator(0)
num = sc.accumulator(0)

In [36]:
m1.foreach(lambda i: [total.add(i), num.add(1)]) 

In [37]:
num.value

5

In [38]:
total.value

16

RDD Persistence 持久化
将数据缓存在内存，提高性能的措施，分布式缓存

In [39]:
from pyspark import StorageLevel

In [40]:
m1.persist(StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[10] at parallelize at PythonRDD.scala:195

In [41]:
m1.is_cached

True

# 方式二：DataFrame 操作数据

In [137]:
# 读取外部数据
textfile = sc.textFile("Desktop/lexisnexis_worldcompliance_entities.csv")
textfile.count()

7671461

In [138]:
textfile.take(5)

['"Ent_ID","Name","FirstName","LastName","Prefix","Suffix","Aka","NameSource","ParentID","GovDesignation","EntryType","EntryCategory","EntrySubCategory","Organization","Positions","Remarks","DOB","POB","Country","ExpirationDate","EffectiveDate","PictureFile","LinkedTo","Related_ID","SourceWebLink","TouchDate","DirectID","PassportID","NationalID","OtherID","DOB2","EntLevel","MasterID","Watch","Relationships","PrimaryName","OriginalName2","Gender","EY_DailyDeltaDate","EY_DailyDeltaFilename"',
 '"1574254","Quintanilla, Ricardo","Ricardo","Quintanilla","","","","US-AO-DMN","0","","Individual","Enforcement","Money Laundering","","Sentenced to 40 months in prison for money laundering - April 26, 2011","According to United States Attorney - District of Minnesota and U.S. Drug Enforcement Administration; January 06, 2012: On January 06, 2012, in federal court, Fairbault man was sentenced in connection with a Mexican-based drug cartel that distributed methamphetamine and cocaine throughout Minn

In [122]:
lnRDD = textfile.map(lambda line:line.split('","')) # 文件的每一行按照“,”拆分

11

In [133]:
df = lnRDD.filter(lambda x: x[0] != '"Ent_ID').toDF(lnRDD.first())

+--------+--------------------+-----------------+----------+------+------+---+----------+--------+--------------+----------+-------------+------------------+------------+--------------------+--------------------+-----------------+--------------------+------------------+--------------+-------------+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------+-----------+----+--------+--------+-----+-------------+--------------------+-------------+------+-----------------+----------------------+
| "Ent_ID|                Name|        FirstName|  LastName|Prefix|Suffix|Aka|NameSource|ParentID|GovDesignation| EntryType|EntryCategory|  EntrySubCategory|Organization|           Positions|             Remarks|              DOB|                 POB|           Country|ExpirationDate|EffectiveDate|         PictureFile|LinkedTo|Related_ID|       SourceWebLink|           TouchDate|            DirectID|PassportID|NationalID|    OtherID

In [139]:
df.count()

10

AnalysisException: 'filter expression \'`OriginalName2`\' of type string is not a boolean.;;\nFilter OriginalName2#944: string\n+- LogicalRDD ["Ent_ID#908, Name#909, FirstName#910, LastName#911, Prefix#912, Suffix#913, Aka#914, NameSource#915, ParentID#916, GovDesignation#917, EntryType#918, EntryCategory#919, EntrySubCategory#920, Organization#921, Positions#922, Remarks#923, DOB#924, POB#925, Country#926, ExpirationDate#927, EffectiveDate#928, PictureFile#929, LinkedTo#930, Related_ID#931, ... 16 more fields], false\n'

In [71]:
from pyspark.sql import SparkSession, Row

In [85]:
session = SparkSession.builder.getOrCreate()
session

In [86]:
%time 
ln_rows = lnRDD.map(lambda p:
                    Row(
                        Ent_ID=p[0].replace('"',''),
                        Name=p[1].replace('"',''),
                        FirstName=p[2].replace('"',''),
                        LastName=p[3].replace('"','')
                    )
                   )

Wall time: 0 ns


In [87]:
%time ln_rows.take(5)

Wall time: 1.88 s


[Row(Ent_ID='Ent_ID', FirstName='FirstName', LastName='LastName', Name='Name'),
 Row(Ent_ID='1574372', FirstName=' Veronika Igorevna', LastName='Veronika Igorevna', Name='Skvortsova'),
 Row(Ent_ID='1574701', FirstName=' José Jorge', LastName='José Jorge', Name='Alperovich'),
 Row(Ent_ID='1575309', FirstName=' Kevin', LastName='Kevin', Name='Erwin'),
 Row(Ent_ID='1575311', FirstName=' Doneisha Lanee', LastName='Doneisha Lanee', Name='Bruce')]

In [88]:
ln_df = session.createDataFrame(ln_rows)
ln_df.printSchema()

root
 |-- Ent_ID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Name: string (nullable = true)



In [90]:
ln_df.head(5)

[Row(Ent_ID='Ent_ID', FirstName='FirstName', LastName='LastName', Name='Name'),
 Row(Ent_ID='1574372', FirstName=' Veronika Igorevna', LastName='Veronika Igorevna', Name='Skvortsova'),
 Row(Ent_ID='1574701', FirstName=' José Jorge', LastName='José Jorge', Name='Alperovich'),
 Row(Ent_ID='1575309', FirstName=' Kevin', LastName='Kevin', Name='Erwin'),
 Row(Ent_ID='1575311', FirstName=' Doneisha Lanee', LastName='Doneisha Lanee', Name='Bruce')]

In [91]:
ln_df.show(5)

+-------+------------------+-----------------+----------+
| Ent_ID|         FirstName|         LastName|      Name|
+-------+------------------+-----------------+----------+
| Ent_ID|         FirstName|         LastName|      Name|
|1574372| Veronika Igorevna|Veronika Igorevna|Skvortsova|
|1574701|        José Jorge|       José Jorge|Alperovich|
|1575309|             Kevin|            Kevin|     Erwin|
|1575311|    Doneisha Lanee|   Doneisha Lanee|     Bruce|
+-------+------------------+-----------------+----------+
only showing top 5 rows



建立Spark SQL临时表

In [94]:
ln_df.registerTempTable("lexisnexis_temp_table") # 虚拟表

In [95]:
session.sql('select count(*) from lexisnexis_temp_table').show()

+--------+
|count(1)|
+--------+
|      11|
+--------+



In [142]:
lnRDD.map(lambda x: (x[2], x[3], x[0], x[1] + "，你好！")).take(5)

[('FirstName', 'LastName', '"Ent_ID', 'Name，你好！'),
 ('Veronika Igorevna',
  'Skvortsova',
  '"1574372',
  'Skvortsova, Veronika Igorevna，你好！'),
 ('José Jorge', 'Alperovich', '"1574701', 'Alperovich, José Jorge，你好！'),
 ('Kevin', 'Erwin', '"1575309', 'Erwin, Kevin，你好！'),
 ('Doneisha Lanee', 'Bruce', '"1575311', 'Bruce, Doneisha Lanee，你好！')]

In [153]:
ln_df.select("Ent_ID", "LastName", "FirstName","Name").show()

NameError: name 'concat' is not defined

In [99]:
ln_df.select(ln_df.Ent_ID, ln_df.LastName).show(5)

+-------+-----------------+
| Ent_ID|         LastName|
+-------+-----------------+
| Ent_ID|         LastName|
|1574372|Veronika Igorevna|
|1574701|       José Jorge|
|1575309|            Kevin|
|1575311|   Doneisha Lanee|
+-------+-----------------+
only showing top 5 rows



In [100]:
ln_df[ln_df["Ent_ID"], ln_df["LastName"], ln_df["FirstName"]].show(5)

+-------+-----------------+------------------+
| Ent_ID|         LastName|         FirstName|
+-------+-----------------+------------------+
| Ent_ID|         LastName|         FirstName|
|1574372|Veronika Igorevna| Veronika Igorevna|
|1574701|       José Jorge|        José Jorge|
|1575309|            Kevin|             Kevin|
|1575311|   Doneisha Lanee|    Doneisha Lanee|
+-------+-----------------+------------------+
only showing top 5 rows



In [152]:
sqlContext.sql("""
    select "Ent_ID", "LastName", "FirstName","Name",concat(Name, ',你好') from lexisnexis_temp_table
""").show()

+------+--------+---------+----+-------------------+
|Ent_ID|LastName|FirstName|Name|concat(Name, ,你好)|
+------+--------+---------+----+-------------------+
|Ent_ID|LastName|FirstName|Name|          Name,你好|
|Ent_ID|LastName|FirstName|Name|    Skvortsova,你好|
|Ent_ID|LastName|FirstName|Name|    Alperovich,你好|
|Ent_ID|LastName|FirstName|Name|         Erwin,你好|
|Ent_ID|LastName|FirstName|Name|         Bruce,你好|
|Ent_ID|LastName|FirstName|Name|        Wilson,你好|
|Ent_ID|LastName|FirstName|Name|       Leblanc,你好|
|Ent_ID|LastName|FirstName|Name|         Pratt,你好|
|Ent_ID|LastName|FirstName|Name|      Aldridge,你好|
|Ent_ID|LastName|FirstName|Name|       Kennard,你好|
|Ent_ID|LastName|FirstName|Name|     Tsarukyan,你好|
+------+--------+---------+----+-------------------+



In [155]:
session.sql('select "Ent_ID", "LastName", "FirstName","Name",concat(Name, ",你好") from lexisnexis_temp_table').show()

+------+--------+---------+----+-------------------+
|Ent_ID|LastName|FirstName|Name|concat(Name, ,你好)|
+------+--------+---------+----+-------------------+
|Ent_ID|LastName|FirstName|Name|          Name,你好|
|Ent_ID|LastName|FirstName|Name|    Skvortsova,你好|
|Ent_ID|LastName|FirstName|Name|    Alperovich,你好|
|Ent_ID|LastName|FirstName|Name|         Erwin,你好|
|Ent_ID|LastName|FirstName|Name|         Bruce,你好|
|Ent_ID|LastName|FirstName|Name|        Wilson,你好|
|Ent_ID|LastName|FirstName|Name|       Leblanc,你好|
|Ent_ID|LastName|FirstName|Name|         Pratt,你好|
|Ent_ID|LastName|FirstName|Name|      Aldridge,你好|
|Ent_ID|LastName|FirstName|Name|       Kennard,你好|
|Ent_ID|LastName|FirstName|Name|     Tsarukyan,你好|
+------+--------+---------+----+-------------------+



过滤列

In [160]:
lnRDD.filter(lambda r: r[3]=='Skvortsova' and r[16] == 'November 01, 1960').collect()

[['"1574372',
  'Skvortsova, Veronika Igorevna',
  'Veronika Igorevna',
  'Skvortsova',
  '',
  '',
  '',
  'Website',
  '0',
  '',
  'Individual',
  'PEP',
  'Govt Branch Member',
  '',
  'Head of the Federal Biomedical Agency, effective from January 22, 2020.',
  'Career: Head, effective from January 22, 2020, Agency: Federal Biomedical Agency; Head of the Federal Service for Surveillance in Healthcare (Roszdravnadzor), effective from January 21, 2020; Member of the Supervisory Board of the State Corporation Rostec, effective from September 2018; Acting Minister of Healthcare (January 15, 2020 - January 21, 2020); Member of the Executive Board of the World Health Organization (2014 - 2017); Minister of Healthcare (May 21, 2012 - January 15, 2020); Deputy Minister of Healthcare and Social Development (July 15, 2008 - May 22, 2012).  Company name: State Corporation Rostec.',
  'November 01, 1960',
  'Moscow, , Russia',
  'Russian Federation',
  '',
  '2020',
  '\\Russia\\1574372.png',
