In [1]:
import findspark
findspark.init("C:\Spark")
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext("local", "Simple")

In [2]:
spark=SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.json("people.json")

In [4]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# 打印模式信息

In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# 选择多列

In [8]:
df.select(df.name, df.age + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [9]:
# 条件过滤

In [10]:
df.filter(df.age > 20).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [11]:
# 分组聚合

In [12]:
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [13]:
# 排序

In [14]:
df.sort(df.age.desc()).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+



In [15]:
# 多列排序

In [16]:
df.sort(df.age.desc(), df.name.asc()).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+



In [17]:
# 对列进行重命名

In [18]:
df.select(df.name.alias("username"), df.age).show()

+--------+----+
|username| age|
+--------+----+
| Michael|null|
|    Andy|  30|
|  Justin|  19|
+--------+----+



In [19]:
# 利用反射机制推断RDD模式

In [20]:
from pyspark.sql.types import Row

In [21]:
def f(x):
    rel = {}
    rel['name'] = x[0]
    rel['age'] = x[1]
    return rel

In [22]:
peopleDF = sc.textFile("people.txt").map(lambda line: line.split(",")).map(lambda x: Row(**f(x))).toDF()

In [23]:
# 必须注册为临时表才能提供下面的查询

In [24]:
peopleDF.createOrReplaceTempView("people")

In [25]:
personsDF = spark.sql("select * from people")

In [26]:
personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)

In [27]:
for i in personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).collect():
    print(i)

Name: 29,Age:Michael
Name: 30,Age:Andy
Name: 19,Age:Justin


In [28]:
# 使用编程方式定义RDD模式

In [29]:
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

In [30]:
peopleRDD = sc.textFile("people.txt")

In [31]:
# 定义一个模式字符串

In [32]:
schemaString = "name age"

In [33]:
# 根据模式字符串生成模式

In [34]:
fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))

In [35]:
schema = StructType(fields)

In [36]:
rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))

In [40]:
peopleDF = spark.createDataFrame(rowRDD, schema)

In [41]:
# 必须注册成为临时表才能供下面查询使用

In [42]:
results  = spark.sql("SELECT * FROM people")

In [43]:
results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)

In [45]:
for i in results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).collect():
    print(i)

name:  29,age:Michael
name:  30,age:Andy
name:  19,age:Justin


In [46]:
# 把RDD保存成文件

In [47]:
peopleDF = spark.read.format("json").load("people.json")

In [48]:
 peopleDF.select("name", "age").write.format("csv").save("newpeople.csv")

In [49]:
peopleDF.rdd.saveAsTextFile("newpeople.txt")

In [50]:
# 创建StreamingContext对象

In [51]:
from pyspark import SparkContext

In [53]:
from pyspark.streaming import StreamingContext

In [54]:
ssc = StreamingContext(sc, 1)

In [55]:
# spark sql的运行机制

Spark SQL对SQL语句的处理和关系型数据库类似，即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成一棵树，然后使用规则(Rule)对Tree进行绑定、优化等处理过程。

Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成：

Core: 负责处理数据的输入和输出，如获取数据，查询结果输出成DataFrame等

Catalyst: 负责处理整个查询过程，包括解析、绑定、优化等

Hive: 负责对Hive数据进行处理

Hive-ThriftServer: 主要用于对hive的访问


In [None]:
Spark SQL运行原理

1. 使用SessionCatalog保存元数据

在解析SQL语句之前，会创建SparkSession，或者如果是2.0之前的版本初始化SQLContext，SparkSession只是封装了SparkContext和SQLContext的创建而已。会把元数据保存在SessionCatalog中，涉及到表名，字段名称和字段类型。创建临时表或者视图，其实就会往SessionCatalog注册

2. 解析SQL,使用ANTLR生成未绑定的逻辑计划

当调用SparkSession的sql或者SQLContext的sql方法，我们以2.0为准，就会使用SparkSqlParser进行解析SQL. 使用的ANTLR进行词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan：

3. 使用分析器Analyzer绑定逻辑计划

在该阶段，Analyzer会使用Analyzer Rules，并结合SessionCatalog，对未绑定的逻辑计划进行解析，生成已绑定的逻辑计划。

4. 使用优化器Optimizer优化逻辑计划

优化器也是会定义一套Rules，利用这些Rule对逻辑计划和Exepression进行迭代处理，从而使得树的节点进行和并和优化

5. 使用SparkPlanner生成物理计划

SparkSpanner使用Planning Strategies，对优化后的逻辑计划进行转换，生成可以执行的物理计划SparkPlan.

6. 使用QueryExecution执行物理计划

此时调用SparkPlan的execute方法，底层其实已经再触发JOB了，然后返回RDD
--------------------- 
参考原文链接：https://blog.csdn.net/zhanglh046/article/details/78360980