## Spark Transformations: Summary and Examples

### 1. map(func)
- Applies the function `func` to every element of the dataset and returns the result as a new distributed dataset.

In [1]:
from pyspark.sql import SparkSession
import os, time

os.environ['SPARK_HOME'] = "D:/Spark"
spark = SparkSession.builder.appName("learning_test").getOrCreate()
sc = spark.sparkContext

# map(func): apply func to every element of the dataset and return a new distributed dataset
a = sc.parallelize(('a', 'b', 'c'))
a.map(lambda x: x + '1').collect()

['a1', 'b1', 'c1']

### 2. filter(func)
- Selects the elements for which `func` returns True and returns them as a new dataset.

In [6]:
a = sc.parallelize(range(10))
a.filter(lambda x: x%2 == 0).collect()

[0, 2, 4, 6, 8]

### 3. flatMap(func) 
- Similar to map, but each input item can be mapped to zero or more output items, so `func` should return a sequence rather than a single item.

In [7]:
l = ['Hello Python','Hello spark','I am learning']
a = sc.parallelize(l,3)
a.flatMap(lambda line: line.split()).collect()

['Hello', 'Python', 'Hello', 'spark', 'I', 'am', 'learning']
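
The difference between map and flatMap can be seen without a cluster. The sketch below is a plain-Python model, not Spark code: a list comprehension stands in for map, and `itertools.chain.from_iterable` stands in for the flattening step that flatMap adds. The helper name `split_words` is introduced here only for illustration.

```python
from itertools import chain

lines = ['Hello Python', 'Hello spark', 'I am learning']

def split_words(line):
    """Return the list of whitespace-separated words in one line."""
    return line.split()

# map keeps one output element per input element, so the result is nested.
mapped = [split_words(line) for line in lines]

# flatMap applies the same function, then concatenates the returned sequences.
flat_mapped = list(chain.from_iterable(split_words(line) for line in lines))

print(mapped)
print(flat_mapped)
```

With map the three input lines stay three (nested) elements; with flatMap they become seven words, matching the output of the cell above.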

### 4. mapPartitions(func) 
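- Similar to map, but the function passed to mapPartitions runs separately on each partition (block) of the RDD, so `func` must accept an iterator and return an iterator. 

    eg. Suppose an RDD holds the ten elements 0-9 split into three partitions, and we want the square of each element. With map the function is called 10 times; with mapPartitions it is called only 3 times, once per partition.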

In [8]:
def squareFunc(a):
    for i in a:
        yield i*i
a = sc.parallelize(range(10),3)
a.mapPartitions(squareFunc).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
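
The call counts from the example (10 calls versus 3) can be checked with a small plain-Python model. This is only a sketch: `split_into_partitions` is a hypothetical helper that cuts a list into contiguous chunks, and Spark's own slicing may place elements differently. Only the number of partitions matters here.

```python
def split_into_partitions(data, num_partitions):
    """Hypothetical helper: cut a list into at most num_partitions contiguous chunks."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

calls = {"map": 0, "mapPartitions": 0}

def square(x):
    """Per-element function, as it would be passed to map."""
    calls["map"] += 1
    return x * x

def square_partition(iterator):
    """Per-partition function, as it would be passed to mapPartitions."""
    calls["mapPartitions"] += 1
    for x in iterator:
        yield x * x

partitions = split_into_partitions(list(range(10)), 3)

# Model of map: the function runs once per element.
map_result = [square(x) for part in partitions for x in part]

# Model of mapPartitions: the function runs once per partition.
map_partitions_result = [y for part in partitions for y in square_partition(iter(part))]

print(calls)
```

Because the function runs once per partition, mapPartitions is a natural place for setup work that is expensive to repeat for every element, such as opening a connection.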

### 6. sample(withReplacement, fraction, seed) 
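- Samples the data. `withReplacement` selects sampling with replacement (True) or without replacement (False). `fraction` is the sampling rate: without replacement it is the probability that each element is chosen (0 <= fraction <= 1); with replacement it is the expected number of times each element is chosen (fraction >= 0). `seed` is the random seed.

    eg. Sample from the integers 1 to 100 with fraction = 0.2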

In [9]:
data = sc.parallelize(range(1,101),2)
sample = data.sample(True, 0.2)
sample.count()

16

In [10]:
sample.collect()

[7, 18, 22, 27, 31, 34, 42, 46, 46, 51, 55, 64, 69, 73, 78, 99]

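__Note:__ Spark's sample uses Poisson sampling when withReplacement = True and Bernoulli sampling when withReplacement = False. In neither case is fraction the exact share of the original data that ends up in the sample. With withReplacement = False, fraction = 0.2 does not mean that 20% of the 100 numbers are drawn; each number is drawn independently with probability 0.2. The numbers are treated as coming from the same population, so the sample size is not fixed but follows a binomial distribution. With withReplacement = True, as in the example above, each number is drawn a Poisson-distributed number of times with mean 0.2, which is why 46 appears twice in the output.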
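
The varying sample size can be reproduced with a plain-Python simulation. This is a sketch of the two sampling schemes named in the note, not Spark's implementation: `bernoulli_sample`, `poisson_draw`, and `poisson_sample` are names introduced here, the Poisson draw uses Knuth's multiplication method, and the seed 42 is arbitrary.

```python
import math
import random

def bernoulli_sample(data, fraction, rng):
    """Without replacement: keep each element independently with probability `fraction`."""
    return [x for x in data if rng.random() < fraction]

def poisson_draw(mean, rng):
    """Draw one Poisson-distributed integer (Knuth's multiplication method)."""
    limit = math.exp(-mean)
    count, product = 0, rng.random()
    while product > limit:
        count += 1
        product *= rng.random()
    return count

def poisson_sample(data, fraction, rng):
    """With replacement: repeat each element a Poisson(fraction) number of times."""
    return [x for x in data for _ in range(poisson_draw(fraction, rng))]

rng = random.Random(42)  # arbitrary seed, used only for repeatability
data = list(range(1, 101))

bernoulli_sizes = [len(bernoulli_sample(data, 0.2, rng)) for _ in range(2000)]
poisson_sizes = [len(poisson_sample(data, 0.2, rng)) for _ in range(2000)]

print("without replacement: min", min(bernoulli_sizes), "max", max(bernoulli_sizes),
      "mean", sum(bernoulli_sizes) / len(bernoulli_sizes))
print("with replacement:    min", min(poisson_sizes), "max", max(poisson_sizes),
      "mean", sum(poisson_sizes) / len(poisson_sizes))
```

In both schemes the sample size averages about 20 for 100 elements but changes from run to run, so a result such as the 16 elements shown above is unsurprising.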

### 7. union(otherDataset) 
- Union: combines the source dataset with the dataset passed to union. Duplicate elements are kept by default; to drop them, apply distinct, which is described below.

In [11]:
data1 = sc.parallelize( range(10) )
data2 = sc.parallelize( range(6,15) )
data1.union(data2).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

### 8. intersection(otherDataset) 
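- Intersection: returns a new dataset containing the elements that appear in both the source dataset and the dataset passed to intersection.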

In [12]:
data1 = sc.parallelize( range(10) )
data2 = sc.parallelize( range(6,15) )
data1.intersection(data2).collect()

[8, 9, 6, 7]

### 9. distinct([numTasks]) 
- Removes duplicate elements from the dataset.

In [13]:
data1 = sc.parallelize( range(10) )
data2 = sc.parallelize( range(6,15) )
data1.union(data2).distinct().collect()

[0, 8, 1, 9, 2, 10, 3, 11, 4, 12, 5, 13, 6, 14, 7]

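- The transformations that follow rely on the notion of a key. The dataset used for these key-based operations is a table recording schools and pupil numbers for each London district (called a ward), available at: 
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685# 
- After downloading, save the data in the sheet named WardtoSecSchool_LDS_2015 as CSV, delete the header row, and rename the file to school.csv 
- __Data format__: 
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 
- Note that the rows printed later in this section have the shape (school URN, school name, total pupils, flow id, area code, area name, pupil count), so the column order of the file actually loaded differs from the list above.

First, some preprocessing of the data: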

In [37]:
school = sc.textFile("C:/Users/Berlin/Python/Spark_Python/school.csv")
school.count()

49288

In [42]:
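# Import Python's regular-expression module
import re 
# re.subn returns a (new_string, count) tuple, so every element of rows is a tuple
rows = school.map(lambda line: re.subn(r',[\s]+', ': ', line))
# RDDs have no save() method; saveAsTextFile writes a directory of part files
rows.map(lambda r: r[0]).saveAsTextFile('C:/Users/Berlin/Python/Spark_Python/mycsv.csv')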
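
The preprocessing cell uses `re.subn`, which returns a `(new_string, count)` tuple rather than a string. That is why later cells index `r[0]` before splitting, and why rows printed further down appear as `(text, 0)` pairs. A minimal sketch of the difference from `re.sub`, using one row copied from that output and one made-up string:

```python
import re

# One row in the shape of the rows printed later in this section.
line = "100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow1,E01000005,City of London 001E,41"

with_count = re.subn(r',\s+', ': ', line)  # (new_string, number_of_replacements)
text_only = re.sub(r',\s+', ': ', line)    # new_string only

# Made-up string containing a comma followed by a space.
example = re.subn(r',\s+', ': ', "Smith, John,42")

print(type(with_count).__name__, with_count[1])
print(text_only == with_count[0])
print(example)
```

If only the rewritten text is needed, `re.sub` avoids the tuple entirely; the cells in this section keep `re.subn`, so they unpack the string with `r[0]`.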

### 10. groupByKey([numTasks]) 
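- Operates on a dataset of (K, V) pairs, collects the values that share a key, and returns a dataset of (K, Iterable) pairs. 

__Note__: 1. If the grouping is only a step toward a per-key aggregation such as a sum or an average, reduceByKey or aggregateByKey is more efficient. 2. By default the level of parallelism of the output depends on the number of partitions of the RDD, but the optional numTasks argument can be used to set a different number of tasks.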
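
The efficiency remark in point 1 comes down to how many records cross the shuffle. The sketch below is a plain-Python model under simplifying assumptions, not Spark internals: `partitions` holds two made-up partitions, `group_by_key` ships every record, and `reduce_by_key` combines values inside each partition before anything is shipped.

```python
from collections import defaultdict

# Two hypothetical partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 5), ("b", 6)],
]

def group_by_key(parts):
    """Model of groupByKey: every record is shuffled, then grouped by key."""
    shuffled = [pair for part in parts for pair in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return dict(groups), len(shuffled)

def reduce_by_key(parts, func):
    """Model of reduceByKey: combine inside each partition first, then merge."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)

groups, grouped_records = group_by_key(partitions)
sums_from_groups = {key: sum(values) for key, values in groups.items()}
sums, reduced_records = reduce_by_key(partitions, lambda x, y: x + y)

print(sums_from_groups, grouped_records)
print(sums, reduced_records)
```

Both routes give the same sums, but the reduce model ships at most one record per key per partition (4 here) instead of all 6.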

In [16]:
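newRows = rows.map(lambda r: r[0].split(','))  # r[0] is the string part of the tuple returned by re.subn
ward_schoolname = newRows.map(lambda r: (r[1], r[5])).groupByKey()  # in the file loaded here, r[1] is the school name and r[5] is the area name
ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # list every area name recorded for each school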

[{"Sir John Cass's Foundation Primary School": ['City of London 001E',
   'Tower Hamlets 015E',
   'Tower Hamlets 015B',
   'City of London 001C',
   'Tower Hamlets 026C',
   'Tower Hamlets 026A',
   'Tower Hamlets 027B',
   'Tower Hamlets 009D',
   'City of London 001A',
   'Tower Hamlets 027A',
   'Tower Hamlets 015A',
   'Tower Hamlets 015C',
   'Tower Hamlets 015D',
   'Tower Hamlets 021F',
   'Tower Hamlets 007B',
   'Tower Hamlets 016A',
   'Tower Hamlets 016C',
   'Tower Hamlets 027C',
   'Tower Hamlets 021A',
   'Tower Hamlets 017B']},
 {'Argyle Primary School': ['Camden 024C',
   'Camden 025C',
   'Camden 025A',
   'Camden 022B',
   'Camden 027A',
   'Camden 025B',
   'Camden 024D',
   'Camden 022E',
   'Camden 024A',
   'Camden 024B',
   'Camden 025D',
   'Camden 027B',
   'Camden 027C',
   'Camden 028A',
   'Camden 019D',
   'Camden 026C',
   'Camden 022C',
   'Camden 022D',
   'Camden 019B',
   'Camden 028D',
   'Camden 025E',
   'Camden 019E']},
 {'Beckford Primary School'

### 11. reduceByKey(func, [numTasks]) 
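- Operates on (K, V) pairs: groups them by key and combines all values of the same key with func into a single value. Note that func must be of type (V, V) => V, i.e. it takes two values and returns one value of the same type.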

In [32]:
pupils = newRows.map(lambda r: [r[1], int(r[6])])  # r[1] is the name used as the key, r[6] is the pupil count of the row
pupils.collect()
# ward_pupils = pupils.reduceByKey(lambda x, y: x + y)   # sum the pupil counts for each key
# ward_pupils.collect()  # output the total for each key

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in stage 32.0 (TID 87, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "D:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "D:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-32-4e34a2147efd>", line 1, in <lambda>
ValueError: invalid literal for int() with base 10: 'Hounslow 001A'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "D:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "D:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-32-4e34a2147efd>", line 1, in <lambda>
ValueError: invalid literal for int() with base 10: 'Hounslow 001A'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
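
The `ValueError` above says that, for at least one row, field 6 holds an area name ('Hounslow 001A') instead of a number. A likely cause, stated here as an assumption, is an extra comma inside the school name that shifts every later field by one position. One way around it is to read the trailing fields relative to the end of the row. In the sketch below, `parse_school_line` is a hypothetical helper, and the third sample row is made up to reproduce the shift.

```python
from collections import defaultdict

def parse_school_line(line):
    """Return (school_name, area_name, pupil_count) for one CSV line.

    Fields are taken relative to the end of the row, so extra commas inside
    the school name no longer shift the pupil count out of position.
    """
    fields = line.split(',')
    school_name = ','.join(fields[1:-5]).strip()
    area_name = fields[-2].strip()
    pupil_count = int(fields[-1])
    return school_name, area_name, pupil_count

lines = [
    # Rows copied from the output shown in this section.
    "100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow1,E01000005,City of London 001E,41",
    "100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow2,E01004310,Tower Hamlets 015E,22",
    # Hypothetical row with a comma inside the school name.
    "999999,Example School, Hounslow,30,Pri2LSOA_Flow1,E00000000,Hounslow 001A,12",
]

# Naive positional indexing picks up the area name on the shifted row.
naive_field = lines[2].split(',')[6]

# Local equivalent of reduceByKey(lambda x, y: x + y) on (school, count) pairs.
totals = defaultdict(int)
for school_name, _, pupil_count in map(parse_school_line, lines):
    totals[school_name] += pupil_count

print(naive_field)
print(dict(totals))
```

Assuming every row follows this layout, the same function could be used on the RDD as `school.map(parse_school_line).map(lambda t: (t[0], t[2])).reduceByKey(lambda x, y: x + y)`. If the file quotes fields that contain commas, Python's `csv` module is the more general option.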


In [27]:
ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
ward_pupils.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 83, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 370, in func
    return f(iterator)
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 1876, in combineLocally
    merger.mergeValues(iterator)
  File "D:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 237, in mergeValues
    for k, v in iterator:
  File "D:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-26-22bfc5225d30>", line 1, in <lambda>
ValueError: invalid literal for int() with base 10: 'Hounslow 001A'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "D:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 370, in func
    return f(iterator)
  File "D:\Anaconda3\lib\pyspark\rdd.py", line 1876, in combineLocally
    merger.mergeValues(iterator)
  File "D:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 237, in mergeValues
    for k, v in iterator:
  File "D:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-26-22bfc5225d30>", line 1, in <lambda>
ValueError: invalid literal for int() with base 10: 'Hounslow 001A'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
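
The failure above comes from the same `int()` conversion in the earlier lambda, not from aggregateByKey itself. For reference, `aggregateByKey(zeroValue, seqFunc, combFunc)` starts each key at `zeroValue`, folds values into an accumulator with `seqFunc` inside every partition, and merges the per-partition accumulators with `combFunc`. The plain-Python model below sketches those three roles on made-up data; `aggregate_by_key` is a local stand-in, not Spark code. It computes a (sum, count) pair per key, a case where the accumulator type differs from the value type and reduceByKey alone would not fit.

```python
def aggregate_by_key(parts, zero_value, seq_func, comb_func):
    """Local model of aggregateByKey over a list of partitions."""
    per_partition = []
    for part in parts:
        acc = {}
        for key, value in part:
            # seq_func folds one value into the accumulator of its key.
            acc[key] = seq_func(acc.get(key, zero_value), value)
        per_partition.append(acc)
    result = {}
    for acc in per_partition:
        for key, partial in acc.items():
            # comb_func merges accumulators built in different partitions.
            result[key] = comb_func(result[key], partial) if key in result else partial
    return result

# Two hypothetical partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 6)],
]

sum_and_count = aggregate_by_key(
    partitions,
    (0, 0),                                   # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # fold one value into (sum, count)
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # merge two (sum, count) pairs
)
averages = {key: total / count for key, (total, count) in sum_and_count.items()}

print(sum_and_count)
print(averages)
```

With `0` as the zero value and addition for both functions, as in the cell above, the result is the same per-key sum that reduceByKey would give.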


In [36]:
from pyspark.sql import SparkSession
import os, time
import re 

os.environ['SPARK_HOME'] = "D:/Spark"
spark = SparkSession.builder.appName("learning_test").getOrCreate()
sc = spark.sparkContext

school = sc.textFile("C:/Users/Berlin/Python/Spark_Python/school.csv")
rows = school.map(lambda line: re.subn(r',[\s]+', ': ', line))

rows.collect()
# pupils = rows.map(lambda r: [r[1], int(r[6])])  # r[1] is the name used as the key, r[6] is the pupil count of the row
# ward_pupils = pupils.reduceByKey(lambda x, y: x + y)   # sum the pupil counts for each key
# ward_pupils.collect()  # output the total for each key



[("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow1,E01000005,City of London 001E,41",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow2,E01004310,Tower Hamlets 015E,22",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow3,E01004307,Tower Hamlets 015B,11",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow4,E01000003,City of London 001C,9",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow5,E01004295,Tower Hamlets 026C,6",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow6,E01004292,Tower Hamlets 026A,5",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow7,E01004293,Tower Hamlets 027B,5",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow8,E01004317,Tower Hamlets 009D,5",
  0),
 ("100000,Sir John Cass's Foundation Primary School,206,Pri2LSOA_Flow9,E01000001,City of London 001A,4",
  0),
 ("1