In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("Spark_CLOB_Split") \
.config("hive.metastore.sasl.enabled", "true") \
.enableHiveSupport() \
.getOrCreate()

In [5]:
# 定义 sparkContext,这个环境被包含在了 SparkSession 之中。
sc = spark.sparkContext

In [4]:
#1.创建RDD，json数据
stringJsonRDD = sc.parallelize(("""{"id":1,"name":"A","age":19,"eyeColor":"brown"}""",
                               """{"id":2,"name":"B","age":21,"eyeColor":"blue"}""",
                               """{"id":3,"name":"C","age":18,"eyeColor":"green"}"""))

In [5]:
#2.利用spark.read.json()的方法将其转换为DataFrame
swimmersJSON = spark.read.json(stringJsonRDD)

In [6]:
#3.创建一个临时表
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [7]:
#4.使用DataFrame API来查询 show()
swimmersJSON.show()

+---+--------+---+----+
|age|eyeColor| id|name|
+---+--------+---+----+
| 19|   brown|  1|   A|
| 21|    blue|  2|   B|
| 18|   green|  3|   C|
+---+--------+---+----+



In [10]:
#5.对DataFrame的SQL查询
spark.sql("select * from swimmersJSON").collect()
#spark.sql("select * from swimmersJSON").take(2)
#spark.sql("select * from swimmersJSON").show(2)

[Row(age=19, eyeColor='brown', id=1, name='A'),
 Row(age=21, eyeColor='blue', id=2, name='B'),
 Row(age=18, eyeColor='green', id=3, name='C')]

In [11]:
#6.编程指定模式
#1）打印数据模式
swimmersJSON.printSchema() #age:long type

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [13]:
#2）指定模式
from pyspark.sql.types import *
stringCSVRDD = sc.parallelize([(1,"A",19,"brown"),(2,"B",21,"blue"),(3,"C",18,"green")])
#设定模式
schema = StructType([
    StructField("id",LongType(),True),
    StructField("name",StringType(),True),
    StructField("age",LongType(),True),
    StructField("eyeColor",StringType(),True) 
]) #字段名字，字段类型，字段是否为空

In [16]:
#对RDD应用该模式，并创建DataFrame:spark.createDataFrame(RDD,schema)
swimmers = spark.createDataFrame(stringCSVRDD,schema)
#利用DataFrame创建临时视图，才可以应用SQL查询
swimmers.createOrReplaceTempView("swimmers")
swimmers.show()

+---+----+---+--------+
| id|name|age|eyeColor|
+---+----+---+--------+
|  1|   A| 19|   brown|
|  2|   B| 21|    blue|
|  3|   C| 18|   green|
+---+----+---+--------+



In [17]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [18]:
#7.利用DataFrame API 查询
# 得到DataFrame 行数
swimmers.count()

3

In [20]:
#运行筛选语句，使用filter筛选语句
#获取age=21的id
swimmers.select("id","age").filter("age=21").show()

+---+---+
| id|age|
+---+---+
|  2| 21|
+---+---+



In [21]:
#另一种写法
swimmers.select(swimmers.id,swimmers.age).filter(swimmers.age == 21).show()

+---+---+
| id|age|
+---+---+
|  2| 21|
+---+---+



In [23]:
#若想获得眼睛颜色以b开头的运动员名字，可以使用类似于SQL语句的like，用法为 like 'b%'
swimmers.select("name","eyeColor").filter("eyeColor like 'b%'").show()

+----+--------+
|name|eyeColor|
+----+--------+
|   A|   brown|
|   B|    blue|
+----+--------+



In [26]:
#7.对DataFrame利用SQL查询
#注：DataFrame是可以访问的，因为我们对swimmers执行了createOrReplaceTempView

#行数
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [28]:
#利用where子句运行筛选语句
spark.sql("select id,age from swimmers where age=21").show()

+---+---+
| id|age|
+---+---+
|  2| 21|
+---+---+



In [29]:
spark.sql("select name,eyeColor from swimmers where eyeColor like 'b%'").show()

+----+--------+
|name|eyeColor|
+----+--------+
|   A|   brown|
|   B|    blue|
+----+--------+



In [10]:
#8.DataFrame 场景-实时飞行性能

#获得数据集，并创建视图
flightPerfFilePath = 'file:///home/yanglab/lianhaimiao/PySpark/spark_data/flight-data/departuredelays.csv'
airpathsFilePath = 'file:///home/yanglab/lianhaimiao/PySpark/spark_data/flight-data/airport-codes-na.txt'

airports = spark.read.csv(airpathsFilePath,header = 'true', inferSchema = 'true', sep = '\t')
airports.createOrReplaceTempView("airports")

flightPerf = spark.read.csv(flightPerfFilePath,header = 'true')
flightPerf.createOrReplaceTempView("FlightPerformance")

#cache
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [12]:
airports.show(10)

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+
only showing top 10 rows



In [13]:
flightPerf.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



In [19]:
#通过 城市City 和 起飞代码origin 查询 华盛顿州WA 的 航班延误总数Delays
#1)对某一组内的delay求和
#2) 根据a.IATA = f.origin 连接两个表
#3) 筛选WA的数据
#4) 根据a.City, f.origin分组
#5) 按照sum(f.delay)降序排序，没有desc默认为升序
spark.sql("""
select a.City, f.origin, sum(f.delay) as Delays  
       from FlightPerformance f
       join airports a
        on a.IATA = f.origin                    
        where a.State = 'WA'                    
        group by a.City, f.origin               
        order by sum(f.delay) desc              
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [22]:
#分析美国大陆上的所有州
spark.sql("""
select a.State, sum(f.delay) as Delays
      from FlightPerformance f
       join airports a
        on a.IATA = f.origin  
        where a.Country = 'USA'
        group by a.State
        order by sum(f.delay) desc 
""").show(10)

+-----+---------+
|State|   Delays|
+-----+---------+
|   TX|1994943.0|
|   CA|1891919.0|
|   IL|1630792.0|
|   FL|1531877.0|
|   GA|1191014.0|
|   CO| 963061.0|
|   NY| 878929.0|
|   NV| 474208.0|
|   NJ| 452791.0|
|   AZ| 401793.0|
+-----+---------+
only showing top 10 rows

