# spark sql study

In [15]:
sc?

In [27]:
lines=sc.parallelize(["pan","i like pan"])

In [36]:
lines.first()

'pan'

In [37]:
lines.count()

2

In [2]:
for i in range(1):
    print i*i

0


In [6]:
from pyspark.sql import HiveContext,Row

In [7]:
from pyspark.sql import SQLContext,Row

In [None]:
sqlctx=sqlContextl(sc)

In [8]:
hivectx=HiveContext(sc)

入门

In [21]:
from pyspark.sql import SparkSession
spark=SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option","some-value") \
    .getOrCreate()

In [1]:
df=spark.read.json("../../examples/src/main/resources/people.json")
#df=spark.read.json("./examples/src/main/resources/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [2]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
df.select(df['name'],df['age'],df['age']+1).show()

+-------+----+---------+
|   name| age|(age + 1)|
+-------+----+---------+
|Michael|null|     null|
|   Andy|  30|       31|
| Justin|  19|       20|
+-------+----+---------+



In [9]:
df.filter(df['age']>21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [11]:
df.groupby('age').count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [12]:
df.createOrReplaceTempView('people')

In [13]:
sqlDF=spark.sql("select * from people")

In [14]:
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



Global Temporary View

In [15]:
df.createGlobalTempView('people')

In [17]:
spark.sql("select * from global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [18]:
spark.newSession().sql("select * from global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



创建数据集(Dataset)

In [16]:
from pyspark.sql import Row

sc=spark.sparkContext

# Load a text file and convert each line to a Row.
lines=sc.textFile("examples/src/main/resources/people.txt")
parts=lines.map(lambda l:l.split(","))
people=parts.map(lambda p: Row(name=p[0],age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople =spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
#spark.sql("select * from people").show()
#SQL can be run over DataFrames that have been registered as a table.
teenagers=spark.sql("select name from people where age>=13 and age <=19")
teenagers.show()
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.

teenNames=teenagers.rdd.map(lambda p: "Name: " +p.name).collect()
for name in teenNames:
    print(name)

# for name in teenagers:
#     print(name)




+------+
|  name|
+------+
|Justin|
+------+

Name: Justin


In [26]:
#import data types
from pyspark.sql.types import *

sc=spark.sparkContext

lines=sc.textFile("examples/src/main/resources/people.txt")
parts=lines.map(lambda l:l.split(","))
# Each line is converted to a tuple.
people=parts.map(lambda p:(p[0],p[1].strip()))

# The schema is encoded in a string.
schemaString="name age"

fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema=StructType(fields)

# Apply the schema to the RDD.
schemaPeople=spark.createDataFrame(people,schema)

schemaPeople.createOrReplaceTempView("people")

#results=spark.sql("select name from people")
results=spark.sql("select * from people")
results.show()





+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [30]:
df=spark.read.load("examples/src/main/resources/users.parquet")
df.select("name","favorite_color").write.save("namesAndFavColors.parquet")
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [35]:
df=spark.sql("select * from parquet.`examples/src/main/resources/users.parquet`")
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



JSON Datasets

In [36]:
sc=spark.sparkContext
path="examples/src/main/resources/people.json"
peopleDF=spark.read.json(path)

peopleDF.printSchema()

peopleDF.createOrReplaceTempView("people")

teenagerNamesDF=spark.sql("select name from people where age between 13 and 19")
teenagerNamesDF.show()

jsonString=['{"name":"yin","adress":{"city":"Beijing","state":"xizhimen"}}']
otehpeopleRDD=sc.parallelize(jsonString)
otherpeople=spark.read.json(otehpeopleRDD)
otherpeople.show()




root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+------+
|  name|
+------+
|Justin|
+------+

+------------------+----+
|            adress|name|
+------------------+----+
|[Beijing,xizhimen]| yin|
+------------------+----+



# JDBC To Other Databases

In [None]:
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

分布式SQL引擎

In [None]:
#Thrift JDBC / ODBC服务器：
./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000

# 开始测试

In [111]:
sc=spark.sparkContext
path="../../spark-test.csv"
financeDF=spark.read.csv(path,header=True)
financeDF.printSchema()

# financeDF.createGlobalTempView("finance")
#df.registerTempTable("people");  --注册临时表
# teenagerNamesDF=spark.sql("select * from global_temp.finance limit 10 ")
# teenagerNamesDF.cache("fa")
# teenagerNamesDF.show()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- subject_code: string (nullable = true)
 |-- dept_code: string (nullable = true)
 |-- group_organ_code: string (nullable = true)
 |-- money: string (nullable = true)
 |-- area: string (nullable = true)
 |-- large_area: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- items: string (nullable = true)
 |-- data_type: string (nullable = true)
 |-- subject_detail: string (nullable = true)
 |-- subject_type: string (nullable = true)
 |-- subject_detail_type: string (nullable = true)
 |-- subject_type_8: string (nullable = true)
 |-- subject_type_15: string (nullable = true)
 |-- total_subject_type: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- create_by: string (nullable = true)
 |-- create_datetime: string (nullable = true)
 |-- update_by: string (nullable = true)
 |-- update_datetime: string (nullable = true)



In [None]:
financeDF.createGlobalTempView("finance")

In [112]:
teenagerNamesDF=spark.sql("""select date as `中文` from global_temp.finance limit 1""").show()      
# teenagerNamesDF.show()

+--------+
|      中文|
+--------+
|2016/4/1|
+--------+



In [40]:
teenagerNamesDF=spark.sql(""" select date  ,substring(data_type,5) as data_type,
sum( case when subject_type='收入' then money else 0 end )  `收入`
from global_temp.finance 
group by date,data_type
order by date,data_type
  """)
teenagerNamesDF.show()


+--------+---------+-----------------+
|    date|data_type|               收入|
+--------+---------+-----------------+
|2016/4/1|       实际|       1310081.17|
|2016/5/1|       实际|5463629.550000001|
|2016/6/1|       实际|             40.0|
+--------+---------+-----------------+



In [6]:
teenagerNamesDF.createGlobalTempView("finance_basic")
fbDF=spark.sql("""select * from global_temp.finance_basic""")
fbDF.show()

# teenagerNamesDF.createOrReplaceTempView("finance_basic")
# fbDF=spark.sql("""select * from finance_basic""")
# fbDF.show()

+--------+---------+-----------------+
|    date|data_type|               收入|
+--------+---------+-----------------+
|2016/4/1|       实际|       1310081.17|
|2016/5/1|       实际|5463629.550000001|
|2016/6/1|       实际|             40.0|
+--------+---------+-----------------+



In [121]:
fbDF=spark.sql("""select * from global_temp.finance_basic limit 1""")
fbDF.show()

+--------+---------+----------+
|    date|data_type|        收入|
+--------+---------+----------+
|2016/4/1|       实际|1310081.17|
+--------+---------+----------+



In [171]:
spark.sql(""" select t.date,CAST(t.date as string) ,CAST(t.date as DATE)-- ,datediff("2009-03-01", "2009-02-27")
--,datediff("2018-01-01",CAST(t.date as string) )
,trunc(date,"yyyy-MM-dd"),trunc("2018-01-03","YYYY-MM")
from global_temp.finance t limit 2 """).show()

+--------+--------+----+-------------------------------------+----------------------------------------+
|    date|    date|date|trunc(CAST(date AS DATE), yyyy-MM-dd)|trunc(CAST(2018-01-03 AS DATE), YYYY-MM)|
+--------+--------+----+-------------------------------------+----------------------------------------+
|2016/4/1|2016/4/1|null|                                 null|                                    null|
|2016/4/1|2016/4/1|null|                                 null|                                    null|
+--------+--------+----+-------------------------------------+----------------------------------------+



In [229]:

spark.sql(""" select date 
--,to_date('2018-01-03') ,cast(regexp_replace(date,"/","-") as date ),CURRENT_DATE
--,date_format(cast(regexp_replace(date,"/","-") as date),"yyyy-MM") as `月份`
--,from_unixtime(unix_timestamp(date,'yyyy/MM/dd'),'yyyy-MM')
--,trunc(regexp_replace(date,"/","-") ,"YYYY"),trunc(regexp_replace(date,"/","-") ,"MM")
--,add_months(regexp_replace(date,"/","-"),-12)
,date_format(add_months(regexp_replace(date,"/","-"),-12),"yyyy-MM") as `去年同期月份`
,date_format(add_months(regexp_replace(date,"/","-"),-3),"yyyy")  as `财年`
,subject_type,case when subject_type='收入' then money else 0 end  as `钱`
from global_temp.finance where subject_type='收入' limit 5 """) #.show()

# teenagerNamesDF1=spark.sql(""" select date from global_temp.finance  limit 10 """)  

DataFrame[date: string, 去年同期月份: string, 财年: string, subject_type: string, 钱: string]

In [520]:
#支持 with 
spark.sql(""" with aa as (select * from global_temp.finance  )
  ,bb as ( select a1.`区域`,a1.`数据类型2`
  from aa a1 
  left join aa a0 on a1.`区域`=a0.`区域` and a1.`数据类型2`=a0.`数据类型2` and a1.`去年同期月份`=a0.`月份` )
select * from bb where bb.`区域`="北京"  
""").show()

ERROR:root:An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line string', (1, 0))



AnalysisException: u"cannot resolve '`a1.\u533a\u57df`' given input columns: [company_name, subject_detail, create_datetime, id, money, date, update_datetime, subject_type_15, subject_detail, subject_detail_type, subject_type_8, data_type, date, update_datetime, large_area, subject_detail_type, area, data_type, total_subject_type, area, subject_type_8, dept, money, update_by, subject_type_15, update_by, create_by, id, group_organ_code, items, total_subject_type, create_by, dept, subject_code, subject_type, subject_code, items, subject_type, group_organ_code, dept_code, company_name, dept_code, create_datetime, large_area]; line 4 pos 21;\n'Project [*]\n+- 'Filter ('bb.\u533a\u57df = \u5317\u4eac)\n   +- 'SubqueryAlias bb\n      +- 'Project ['a1.\u533a\u57df, 'a1.\u6570\u636e\u7c7b\u578b2]\n         +- 'Join LeftOuter, ((('a1.\u533a\u57df = 'a0.\u533a\u57df) && ('a1.\u6570\u636e\u7c7b\u578b2 = 'a0.\u6570\u636e\u7c7b\u578b2)) && ('a1.\u53bb\u5e74\u540c\u671f\u6708\u4efd = 'a0.\u6708\u4efd))\n            :- SubqueryAlias a1\n            :  +- SubqueryAlias aa\n            :     +- Project [id#0, date#1, subject_code#2, dept_code#3, group_organ_code#4, money#5, area#6, large_area#7, dept#8, items#9, data_type#10, subject_detail#11, subject_type#12, subject_detail_type#13, subject_type_8#14, subject_type_15#15, total_subject_type#16, company_name#17, create_by#18, create_datetime#19, update_by#20, update_datetime#21]\n            :        +- SubqueryAlias finance, `global_temp`.`finance`\n            :           +- Relation[id#0,date#1,subject_code#2,dept_code#3,group_organ_code#4,money#5,area#6,large_area#7,dept#8,items#9,data_type#10,subject_detail#11,subject_type#12,subject_detail_type#13,subject_type_8#14,subject_type_15#15,total_subject_type#16,company_name#17,create_by#18,create_datetime#19,update_by#20,update_datetime#21] csv\n            +- SubqueryAlias a0\n               +- SubqueryAlias aa\n                  +- Project [id#8026, date#8027, subject_code#8028, dept_code#8029, group_organ_code#8030, money#8031, area#8032, large_area#8033, dept#8034, items#8035, data_type#8036, subject_detail#8037, subject_type#8038, subject_detail_type#8039, subject_type_8#8040, subject_type_15#8041, total_subject_type#8042, company_name#8043, create_by#8044, create_datetime#8045, update_by#8046, update_datetime#8047]\n                     +- SubqueryAlias finance, `global_temp`.`finance`\n                        +- Relation[id#8026,date#8027,subject_code#8028,dept_code#8029,group_organ_code#8030,money#8031,area#8032,large_area#8033,dept#8034,items#8035,data_type#8036,subject_detail#8037,subject_type#8038,subject_detail_type#8039,subject_type_8#8040,subject_type_15#8041,total_subject_type#8042,company_name#8043,create_by#8044,create_datetime#8045,update_by#8046,update_datetime#8047] csv\n"

In [485]:
# 报错了先查看 表是否写的正确   ,注意 别名 为中文的字段，引用时也要加 限定符 ``  ex:select * from aa where aa.`区域`="北京"
teenagerNamesDF=spark.sql(""" with aa as ( select date_format(add_months(regexp_replace(date,"/","-"),-3),"yyyy")  as `财年`
,date_format(cast(regexp_replace(date,"/","-") as date),"yyyy-MM") as `月份`
,date_format(add_months(regexp_replace(date,"/","-"),-12),"yyyy-MM") as `去年同期月份`
,date `日期`,area `区域`,data_type `数据类型`,substring(data_type,5) `数据类型2`,
sum( case when subject_type='收入' then money else 0 end ) as `全收入`,
sum( case when subject_type='收入' and subject_code='60011108' then money else 0 end ) as `齿科双算`, -- 60011108收入 
sum( case when subject_detail_type='体检收入' then money else 0 end ) as `体检收入`,
sum( case when subject_code='64010201' or subject_code='64010202' then money else 0 end ) as  `变动成本1` ,   --64010201 and 64010202
sum( case when subject_detail_type='疾病检测收入' then money else 0 end ) as `疾病检测收入`,
sum( case when subject_detail_type='齿科收入' then money else 0 end ) as `齿科收入`,
sum( case when subject_code='64010221' then money else 0 end ) as `变动成本3`   ,  --64010221
sum( case when subject_detail_type='门诊收入' then money else 0 end ) as `门诊收入`,
sum( case when subject_code='64010222' then money else 0 end ) as `变动成本4`  , --64010222
sum( case when subject_detail_type='医疗管理收入' then money else 0 end ) as `医疗管理收入`,
sum( case when subject_code='64010211' then money else 0 end ) as `变动成本5`,   --64010211
sum( case when subject_detail_type='销售商品收入' then money else 0 end ) as `销售商品收入`,
sum( case when subject_detail_type='其他收入' then money else 0 end ) as `其他收入`,
sum( case when subject_detail_type='落关联成本' then money else 0 end ) as `落关联成本`,
sum( case when subject_type='成本费用' then money else 0 end ) as `成本费用`, 
nvl( sum( case when subject_type='收入' then money else 0 end ),0)-nvl(sum( case when subject_type='收入' and subject_code='60011108' then money else 0 end ),0) as `收入`,
nvl( sum( case when subject_type='收入' then money else 0 end ),0)-nvl(sum( case when subject_type='成本费用' then money else 0 end ) ,0) as `税前利润`,
case when area in ('西康','华检','元化医疗','健维管理','臻景','香港','BVI') then '非一体化'  else '一体化' end as `是否一体化`
from global_temp.finance
group by date_format(add_months(regexp_replace(date,"/","-"),-3),"yyyy")
    ,date_format(cast(regexp_replace(date,"/","-") as date),"yyyy-MM")
    ,date_format(add_months(regexp_replace(date,"/","-"),-12),"yyyy-MM")
    ,date ,area ,data_type,substring(data_type,5)
    ,case when area in ('西康','华检','元化医疗','健维管理','臻景','香港','BVI') then '非一体化' else '一体化' end 
)
,a1 as ( --今年和去年同期 处理为一条
select a1.`财年`,a1.`月份`,a1.`区域`,a1.`数据类型`,a1.`数据类型2`,a1.`是否一体化`,a1.`体检收入`,a0.`体检收入`  as `去年同期体检收入`
from aa a1
left join aa a0 on a1.`区域`=a0.`区域` and a1.`数据类型2`=a0.`数据类型2` and a1.`去年同期月份`=a0.`月份`
)
--select * from aa where aa.`区域`="北京"
select * from a1 where a1.`区域`="北京"
""")

teenagerNamesDF.show()

+----+-------+---+------+-----+-----+----------+--------+
|  财年|     月份| 区域|  数据类型|数据类型2|是否一体化|      体检收入|去年同期体检收入|
+----+-------+---+------+-----+-----+----------+--------+
|2016|2016-05| 北京|2016实际|   实际|  一体化|5268518.68|    null|
|2016|2016-04| 北京|2016实际|   实际|  一体化|    1220.0|    null|
+----+-------+---+------+-----+-----+----------+--------+



In [None]:
,a1 as ( --今年和去年同期 处理为一条
select 
a1.财年,a1.月份,a1.区域,a1.数据类型,a1.数据类型2,a1.是否一体化, 
a1.体检收入,a1.变动成本1,a1.疾病检测收入,a1.齿科收入,a1.变动成本3,a1.门诊收入,a1.变动成本4,a1.医疗管理收入,
a1.变动成本5,a1.销售商品收入,a1.其他收入,a1.落关联成本,a1.收入,
a0.体检收入  as 去年同期体检收入,  a0.变动成本1 as 去年同期变动成本1 ,  a0.疾病检测收入 as 去年同期疾病检测收入,
a0.齿科收入 as 去年同期齿科收入,    a0.变动成本3 as 去年同期变动成本3,   a0.门诊收入 as 去年同期门诊收入,    
a0.变动成本4  as 去年同期变动成本4,   a0.医疗管理收入  as 去年同期医疗管理收入,a0.变动成本5 as 去年同期变动成本5,   
a1.销售商品收入 as 去年同期销售商品收入,a1.其他收入 as 去年同期其他收入,    a1.落关联成本 as 去年同期落关联成本,  
a1.收入 as 去年同期收入
from a0 a1
left join a0 on a1.区域=a0.区域 and a1.数据类型2=a0.数据类型2 and a1.去年同期月份=a0.月份
)
select * from a1 

In [275]:
# spark.sql(""" select date `日期` ,substring(data_type,5) as data_type,
# sum( case when subject_type='收入' then money else 0 end )  `收入`
# ,sum( case when subject_type='收入' and subject_code='60011108' then money else 0 end ) as `齿科双算` 
# from global_temp.finance 
# group by date ,area ,data_type,substring(data_type,5)
# order by date,data_type
#   """).show()

spark.sql("""select date `日期`,area `区域`,data_type `数据类型`,substring(data_type,5) `数据类型2`,
sum( case when subject_type='收入' then money else 0 end ) as `全收入`,
sum( case when subject_type='收入' and subject_code='60011108' then money else 0 end ) as `齿科双算`
from global_temp.finance
group by date ,area ,data_type,substring(data_type,5)
    """).show()


# spark.sql("""select date `日期`,area `区域`,data_type `数据类型`,substring(data_type,5) `数据类型2`,
# sum( case when subject_type='收入' then money else 0 end ) as `全收入`,
# sum( case when subject_type='收入' and subject_code='60011108' then money else 0 end ) as `齿科双算`
# from global_temp.finace
# group by date ,area ,data_type,substring(data_type,5)
#     """).show()

+--------+---+------+-----+---------+----+
|      日期| 区域|  数据类型|数据类型2|      全收入|齿科双算|
+--------+---+------+-----+---------+----+
|2016/5/1| 江阴|2016实际|   实际|      0.0| 0.0|
|2016/4/1| 江阴|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 广州|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 杭州|2016实际|   实际|      0.0| 0.0|
|2016/4/1| 常州|2016实际|   实际|      0.0| 0.0|
|2016/6/1| 电商|2016实际|   实际|      0.0| 0.0|
|2016/4/1| 潍坊|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 芜湖|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 深圳|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 佛山|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 沈阳|2016实际|   实际|150239.03| 0.0|
|2016/4/1| 天津|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 天津|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 苏州|2016实际|   实际|      0.0| 0.0|
|2016/6/1| 总部|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 重庆|2016实际|   实际|      0.0| 0.0|
|2016/5/1| JT|2016实际|   实际|      0.0| 0.0|
|2016/6/1| 杭州|2016实际|   实际|      0.0| 0.0|
|2016/5/1| 威海|2016实际|   实际|      0.0| 0.0|
|2016/6/1| 武汉|2016实际|   实际|     40.0| 0.0|
+--------+-

In [None]:
>>> df = spark.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]