### 核心概念
spark可以分为一个driver（笔记本电脑或集群网关机器）和若干个executor（各个节点），通过SparkContext连接Spark集群、创建RDD、累加器（accumlator）、广播变量（broadcast variables），简单认为SparkContext是Spark程序的根本。

<img src="../imgs/img01.png" width = "250" height = "250" />

在Spark中所有的处理和计算任务都被组织成一系列Resilient Distributed Dataset（弹性分布式数据集，简称RDD）上的transformations（转换）和actions（动作）。

### 初始化RDD方法1

如果你本地内存中已经有一份序列数据（python中的list），你可以通过sc.parallelize去初始化一个RDD。当你执行这个操作时，list中的元素会被自动分块（partitioned），并把每一块送到不同的机器上。

    from pyspark import SparkContext
    sc = SparkContext('local', 'pyspark')
    
    rdd = sc.parallelize([1,2,3,4,5])
    rdd
    
    # 查看在多少个分区
    rdd.getNumPartitions()
    
    # 查看分区状况
    rdd.glom().collect()
    
    # tips：使用sc.parallelize，可以把list，numpy array或pandas series或Pandas Dataframe转成Spark RDD。
   
### 初始化RDD方法2
第2种方法当然是直接读文本到RDD中。
文本中的每一行都被当做item，需要注意的是Spark一般都默认你的路径指向HDFS，若要从本地读取文件的话，需要使用file://开头的全局路径。
    
    import os
    cwd = os.getcwd()
    cwd
    rdd = sc.textFile("file://" + cwd + "/Desktop/spark-example/data/names/yob1880.txt")
    rdd
    rdd.first()
    
其余初始化RDD的方法
    
    1. HDFS上的文件
    2. Hive中的数据库和表
    3. Spark SQL得到的结果
    
#### 练习作业
把names/yob1880.txt文件读成RDD格式的对象，命名成rdd，取出第一行（tips：rdd.first()) 统计行数（tips：rdd.count()）
    
    from pyspark import SparkContext
    sc = SparkContext('local', 'pyspark')
    rdd = sc.textFile("file://" + cwd + "/Desktop/spark-example/data/names/yob1880.txt")
    rdd.first()
    rdd.count()

### RDD transformation
RDD可以经过一系列变化得到新的RDD，RDD上最常用到transformation：
    
    map() 对RDD的每一个item都执行同一个操作
    flatMap() 对RDD中的item执行同一个操作以后得到一个list，然后以平铺的方式把所有的结果组成新的list
    filter() 筛选出来满足条件的item
    distinct() 对RDD中的item去重
    sample() 从RDD中的item中采样一部分，有放回或无放回
    sortBy() 对RDD中的item进行排序
   
    ps: collect()的action可以把item转化为list，Transformation，可以一个接一个地串联。
    
    def odd_opt(x):
        if x % 2 == 1:
            return 2 * x
        else:
            return x
            
    result_rdd = (rdd.map(odd_opt).filter(lambda x: x > 6).distinct())
    result_rdd.collect()
    
#### 练习作业

使用`sc.textFile()`载入`"names/yob1880.txt"`到RDD中<br>
练习`map`, `filter` 和 `distinct`操作<br>
进行组合操作得到所有以M开头的名字组成的RDD<br>
tips：`first()` 这个action也许可以在中间的某些部分帮助到你

rdd.filter(lambda x: x[0] == 'M').distinct().collect()

### RDD间的操作

    rdd1.union(rdd2) 所有rdd1和rdd2中的item组合
    rdd1.intersection(rdd2) rdd1和rdd2的交集
    rdd1.substract(rdd2) 所有在rdd1但不在rdd2中的item（差集）
    rdd1.cartesian(rdd2) rdd1和rdd2中所有元素的笛卡尔乘积



### 惰性计算

Spark中的一个核心概念是惰性计算。当把一个RDD转换为另一个RDD的时候，这个转换不会立即生效。

Spark会把它先记在心里，等到真的需要拿到转换结果的时候，才会重新组织你的transformations。

### 更复杂的transform和action
以元组形式组织的k-v对(key, value)，我们把它叫做pair RDDs，这种item结构的数据，定义了一系列的transform和action。

    reduceByKey() 对所有有着相同key的items执行reduce操作
    groupByKey() 返回类似(key, listOfValues)元组的RDD，后面的value list是同一个key下面的
    sortByKey() 按照key排序
    countByKey() 按照key去对item个数进行统计
    collectAsMap() 和collect有些类似，但返回的是k-v字典
    
下面是spark的一些例子，如何使用spark如何做统计？

    rdd = sc.parallelize(["Hello hello", "Hello New Bob", "York says hello"])
    result_rdd = (rdd.flatMap(lambda x: x.split(" ")).map(lambda word: word.lower()).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y))
    result_rdd.collect()
    
    [('hello', 4), ('new', 1), ('bob', 1), ('york', 1), ('says', 1)]
    
    我们将结果以k-v字典形式返回
    result = result_rdd.collectAsMap()
    result
    
    {'hello': 4, 'new': 1, 'bob': 1, 'york': 1, 'says': 1}

    如果你想要出现频次最高的2个词，可以这样做：
    
    result_rdd.sortBy(keyfunc=lambda(word, count): count, ascending=False).take(2)
    

给定2个RDD后，可以通过一个类似SQL的方式join他们。

    ##### Home of different people
    homesRDD = sc.parallelize([
            ('Brussels', 'John'),
            ('Brussels', 'Jack'),
            ('Leuven', 'Jane'),
            ('Antwerp', 'Jill'),
        ])

    ##### Quality of life index for various cities
    lifeQualityRDD = sc.parallelize([
            ('Brussels', 10),
            ('Antwerp', 7),
            ('RestOfFlanders', 5),
        ])

    homesRDD.join(lifeQualityRDD).collect()
    
    homesRDD.leftOuterJoin(lifeQualityRDD).collect()
    
    homesRDD.rightOuterJoin(lifeQualityRDD).collect()

### Spark SQL

Spark SQL是架在Spark core之上允许我们用SQL去操作大型数据库的高级层

Spark SQL里有一种新RDD，叫做SchemaRDD，其实你可以把他看做一个table，SchemaRDD类似一种图表的RDD，每个item都是一个Row。

### <font color='red'>练习作业</font><br>
**回顾一下，我们可以在pandas里定义这样一个dataframe**
```
data = {
    'country': ['BE', 'BE', 'BE', 'NL', 'NL', 'NL'],
    'year': [1913, 1950, 2003, 1913, 1950, 2003],
    'gdp_per_capita': [4220, 5462, 21205, 4049, 5996, 21480]
}
frame = DataFrame(data)
```
**请大家在Spark中生成一个SchemaRDD，再做一个简单的SQL分析：求这3年各首都的平均GDP**

from pyspark import SQLContext, Row

sqlCtx = SQLContext(sc)

rdd = sc.parallelize([Row(country='BE', year=1913, gdp_per_capita=4220), Row(country='BE', year=1950, gdp_per_capita=5462), Row(country='BE', year=2003, gdp_per_capita=21205), Row(country='NL', year=1913, gdp_per_capita=4049), Row(country='NL', year=1950, gdp_per_capita=5996), Row(country='NL', year=2003, gdp_per_capita=21480)])

schemaRDD = sqlCtx.createDataFrame(rdd)

schemaRDD.registerTempTable("country")

avgGDP = sqlCtx.sql("select country, AVG(gdp_per_capita) as gdp from country group by country")

avgGDP.collect()

[Row(country='NL', gdp=10508.333333333334), Row(country='BE', gdp=10295.666666666666)]



### 练习作业

sc.textFile, union 和 map练习。 把提供的names数据集构建成一个SchemaRDD，包含year, name, sex和births，把它注册成一个表"names"

1.求从1939到1945年美国总出生的人口

2.统计从1880到2014每一年叫“Mary”的宝宝出生数目，并用matplotlib绘制成图像。

3.统计从1880到2014每一年男孩和女生的出生数，并绘制在一张图中。

4.统计出来每一年出生的宝宝频次最高的前1000个名字，以及它们的占比，并用绘图方式去展示出来。

In [None]:
import os
from pyspark import SQLContext, Row 

sqlCtx = SQLContext(sc)

files = os.listdir('/Users/hammer/Desktop/spark-example/data/names/')
for f in files:
    if f == 'yob1880.txt':
        y = int(f[3:7])
        text_file_path = '/Users/hammer/Desktop/spark-example/data/names/' + f 
        print(text_file_path)
        lines = sc.textFile(text_file_path)
        
        parts = lines.map(lambda l: l.split(","))
        people1 = parts.map(lambda p: Row(year=y, name=p[0], sex=p[1], births=p[2]))
    elif f[0] != '.':
        y = int(f[3:7])
        text_file_path = '/Users/hammer/Desktop/spark-example/data/names/' + f 
        print(text_file_path)
        lines = sc.textFile(text_file_path)
        
        parts = lines.map(lambda l: l.split(","))
        people2 = parts.map(lambda p: Row(year=y, name=p[0], sex=p[1], births=p[2]))
        people1 = people1.union(people2)
        
schemaRDD = sqlCtx.createDataFrame(people1)

schemaRDD.registerTempTable("names")
#print(people1.count(), people2.count())

# 1.求从1939到1945年美国总出生的人口
cnt = sqlCtx.sql("select Count(name) as cnt from names where year >= 1939 and year <= 1945")
cnt.collect() # [Row(cnt=63973)] 

# 2.统计从1880到2014每一年叫“Mary”的宝宝出生数目，并用matplotlib绘制成图像。
Mary_cnt = sqlCtx.sql("select Count(name) as mary_cnt from names where year >= 1880 and year <= 2014 and name = 'Mary' group by year ")
Mary_cnt.collect()

# 3.统计从1880到2014每一年男孩和女生的出生数
Mary_F_cnt = sqlCtx.sql("select Count(name) as name_cnt from names where sex='F' and year >= 1880 and year <= 2014 group by year")
Mary_M_cnt = sqlCtx.sql("select Count(name) as name_cnt from names where sex='M' and year >= 1880 and year <= 2014 group by year")

# 4.统计出来每一年出生的宝宝频次最高的前1000个名字
Top_1000 = sqlCtx.sql("select name, Count(name) as name_cnt from names where year >= 1880 and year <= 2014 group by year, name order by name_cnt desc  limit 1000")

