SparkSession是Spark 2.0引入的新概念。SparkSession为用户提供了统一的切入点，来让用户学习spark的各项功能。 在spark的早期版本中，SparkContext是spark的主要切入点，由于RDD是主要的API，我们通过sparkcontext来创建和操作RDD。对于每个其他的API，我们需要使用不同的context。例如，对于Streming，我们需要使用StreamingContext；对于sql，使用sqlContext；对于hive，使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API，就需要为他们建立接入点。所以在spark2.0中，引入SparkSession作为DataSet和DataFrame API的切入点。SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext)，所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext，所以计算实际上是由SparkContext完成的。

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext

In [2]:
conf = SparkConf().setAppName("sparkApp1").setMaster("local")
sc = SparkContext.getOrCreate(conf)
# sqlContext = SQLContext(sc)

In [3]:
spark = SparkSession.builder.master('local').appName('sparkApp1')\
    .getOrCreate()

In [4]:
#createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
#可以由RDD, list, pandas.DataFrame 转化为DateFrame
#当schema由列名的列表充当时，每一列的类型由data推断得出
#当schema不存在，schema（列名和类型）将从RDD of Row，命名元组，字典中推断出来
#当schema是pyspark.sql.types.DataType 或是数据类型的字符串，必须跟真是数据相匹配或者运行时出现异常
#如果给定的模式不是pyspark.sql.types.StructType，他将被包装称 pyspark.sql.types.StructType
# data from list
l = [('Alice', 1)]
print(spark.createDataFrame(l).collect())
# 
print(spark.createDataFrame(l, ['name', 'age']).collect())

[Row(_1='Alice', _2=1)]
[Row(name='Alice', age=1)]


In [5]:
#  schema from dict
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()



[Row(age=1, name='Alice')]

In [6]:
# data from rdd
rdd = sc.parallelize(l)
print(spark.createDataFrame(rdd).collect())
df = spark.createDataFrame(rdd, ['name', 'age'])
df.collect()

[Row(_1='Alice', _2=1)]


[Row(name='Alice', age=1)]

In [7]:
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
df2.collect()

[Row(name='Alice', age=1)]

In [8]:
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df3 = spark.createDataFrame(rdd, schema)
df3.collect()

[Row(name='Alice', age=1)]

In [9]:
# from pandas.df
import pandas
print(spark.createDataFrame(df.toPandas()).collect())
print(spark.createDataFrame(pandas.DataFrame([1, 2])).collect())

[Row(name='Alice', age=1)]
[Row(0=1), Row(0=2)]


In [10]:
spark.createDataFrame(rdd, "a: string, b: int").collect()

[Row(a='Alice', b=1)]

In [11]:
rdd = rdd.map(lambda row: row[1])
spark.createDataFrame(rdd, 'int').collect()

[Row(value=1)]

In [222]:
# spark.createDataFrame(rdd, "boolean").collect()

In [13]:
# range(start, end=None, step=1, numPartitions=None)
# 使用单个名为id的pyspark.sql.types.LongType列创建一个DataFrame
# 一个参数则为最终值
print(spark.range(1, 7, 2).collect())
print(spark.range(3).collect())

[Row(id=1), Row(id=3), Row(id=5)]
[Row(id=0), Row(id=1), Row(id=2)]


In [223]:
# sql(sqlQuery)
# 返回使用sql语言操作结果的DataFrame
# df.createOrReplaceTempView("table1")
# df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
# df2.collect()

In [15]:
# table(tableName)
df.createOrReplaceTempView("table1")
df2 = spark.table("table1")
sorted(df.collect()) == sorted(df2.collect())

True

In [16]:
# SQLContext 使用基本同 SparkSession

In [17]:
# register(name, f, returnType=None)
# 将函数注册为sql函数
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()

[Row(stringLengthString(test)='4')]

In [18]:
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()

[Row(stringLengthString(text)='3')]

In [19]:
from pyspark.sql.types import IntegerType
_ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()

[Row(stringLengthInt(test)=4)]

In [20]:
# f是用户定义的函数
# Spark使用给定用户定义函数的返回类型作为注册用户定义函数的返回类型。不应该指定returnType。在这种情况下，这个API的工作方式就像注册（name，f）一样
# pyspark.sql.functions.udf 产生一个用户定义函数
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
_ = spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()

[Row(slen(test)=4)]

In [21]:
# registerJavaFunction(name, javaClassName, returnType=None)
# 定义一个用java编写的用户定义函数作为一个SQL函数
# 除了名称和函数本身之外，还可以指定返回类型。当没有指定返回类型时，我们会通过反射来推断它。
from pyspark.sql.types import IntegerType
#spark.udf.registerJavaFunction(
#    "javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType())
#spark.sql("SELECT javaStringLength('test')").collect()

In [22]:
#spark.udf.registerJavaFunction(
#    "javaStringLength2", "test.org.apache.spark.sql.JavaStringLength")
#spark.sql("SELECT javaStringLength2('test')").collect()

In [23]:
# registerJavaUDAF(name, javaClassName)
#　将java用户自己定义的函数作为sql函数
#df = spark.udf.registerJavaUDAF("javaUDAF", "test.org.spark.MyDoubleAvg")
#df.createOrReplaceTempView("df")
#spark.sql("SELECT name, javaUFAD(id) as avg from df group by name").collect()

In [24]:
# pyspark.sql.DaraFrameReader
# csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, \
# maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None)
# inferSchema inferSchema - 从数据中自动推断输入模式。它需要额外的数据传递。如果设置为None，则使用默认值false。
# header 使用第一行作为列的名称。如果设置为None，则使用默认值false。
df = spark.read.csv('HR.csv')
df.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string')]

In [25]:
df1 = spark.read.csv('HR.csv', header=True)
df1.dtypes

[('satisfaction_level', 'string'),
 ('last_evaluation', 'string'),
 ('number_project', 'string'),
 ('average_montly_hours', 'string'),
 ('time_spend_company', 'string'),
 ('Work_accident', 'string'),
 ('left', 'string'),
 ('promotion_last_5years', 'string'),
 ('sales', 'string'),
 ('salary', 'string')]

In [26]:
df2= spark.read.csv('HR.csv', header=True, inferSchema=True)
df2.dtypes

[('satisfaction_level', 'double'),
 ('last_evaluation', 'double'),
 ('number_project', 'int'),
 ('average_montly_hours', 'int'),
 ('time_spend_company', 'int'),
 ('Work_accident', 'int'),
 ('left', 'int'),
 ('promotion_last_5years', 'int'),
 ('sales', 'string'),
 ('salary', 'string')]

In [27]:
rdd = sc.textFile("HR.csv")
df2 = spark.read.csv(rdd)
df2.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string')]

In [28]:
# format(source)
# 指定数据源格式
# load(path=None, format=None, schema=None, **options)
# 从数据源加载数据并将其作为 DataFrame 返回
df = spark.read.format('text').load("book.txt")
df.dtypes

[('value', 'string')]

In [29]:
# json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None）
# 加载json数据作为dataframe
# 用法基本同csv

In [30]:
# JDBC（Java DataBase Connectivity,java数据库连接）
# 通过url访问jdbc table 构建 dataframe

In [31]:
# text(paths, wholetext=False)
# 加载文本文件并返回一个DataFrame，该DataFrame的架构以名为“value”的字符串列开始，如果存在任何分区列，则返回分区列。
#　文本文件中的每一行都是生成的DataFrame中的新行。
df = spark.read.text('example.txt')
df.collect()

[Row(value='1'), Row(value='2'), Row(value='3')]

In [32]:
df = spark.read.text('example.txt', wholetext=True)
df.collect()

[Row(value='1\n2\n3\n')]

In [33]:
# orc(path)
# hive 相关格式加载成dataframe目前ORC支持仅与Hive支持一起提供。

In [34]:
# parquet(*paths)
# Parquet是Hadoop上的一种支持列式存储文件格式。
# mergeSchema：设置我们是否应合并从所有Parquet零件文件收集的模式。这将覆盖spark.sql.parquet.mergeSchema。缺省值在spark.sql.parquet.mergeSchema中指定。

In [35]:
# schema(schema)
# 指定输入模式
# 某些数据源（例如JSON）可以根据数据自动推断输入模式。通过在这里指定模式，底层数据源可以跳过模式推断步骤，从而加速数据加载。
s = spark.read.schema("col0 INT, col1 DOUBLE")

In [36]:
# table(tableNmae)
# 以dataframe的形式返回指定的表

## class pyspark.sql.DataFrameWriter(df)

用于将DataFrame写入外部存储系统（例如文件系统，键值存储等）的接口。使用DataFrame.write（）来访问它
read()的反向函数

In [37]:
import tempfile
import os
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))

In [38]:
os.path.join(tempfile.mkdtemp(), 'data')

'/tmp/tmpao3i7i5b/data'

In [39]:
path = "hdfs://0.0.0.0:9000/test/input"
df = spark.read.csv('HR.csv')

In [40]:
os.path.join(path,'HR')

'hdfs://0.0.0.0:9000/test/input/HR'

In [41]:
df.show(2)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+--------------------+-----+------+
|               _c0|            _c1|           _c2|                 _c3|               _c4|          _c5| _c6|                 _c7|  _c8|   _c9|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+--------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5y...|sales|salary|
|              0.38|           0.53|             2|                 157|                 3|            0|   1|                   0|sales|   low|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+--------------------+-----+------+
only showing top 2 rows



In [44]:
df2 = spark.read.json(os.path.join(path,'people.json'))

In [45]:
df2.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  24|    Jim|
|  22| Justin|
|  22|   Mike|
|  25|   Dick|
|  26|Johnson|
|  22|   John|
|  27|  Cindy|
|  23|  Windy|
+----+-------+



## class pyspark.sql.DataFrame(jdf,sql_ctx)
分组到已命名列中的分布式数据集合。

In [46]:
df0 = spark.read.csv('HR.csv', header=True, inferSchema=True)
df0.show(2)
df0.dtypes

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|              0.38|           0.53|             2|                 157|                 3|            0|   1|                    0|sales|   low|
|               0.8|           0.86|             5|                 262|                 6|            0|   1|                    0|sales|medium|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
only showing top 2 rows



[('satisfaction_level', 'double'),
 ('last_evaluation', 'double'),
 ('number_project', 'int'),
 ('average_montly_hours', 'int'),
 ('time_spend_company', 'int'),
 ('Work_accident', 'int'),
 ('left', 'int'),
 ('promotion_last_5years', 'int'),
 ('sales', 'string'),
 ('salary', 'string')]

In [47]:
# change typoe
# df = df.withColumn('average_montly_hours', df.average_montly_hours.cast('float'))

In [48]:
# agg(*exprs)
# 在dataframe上集合不通过groups（是df.group.agg()的简写)
df0.agg({"average_montly_hours":"min"}).collect()

[Row(min(average_montly_hours)=96)]

In [49]:
from pyspark.sql import functions as F
df0.agg(F.max(df0.average_montly_hours)).collect()

[Row(max(average_montly_hours)=310)]

In [50]:
# alias()
# 复制dataframe
from pyspark.sql.functions import *
df_as1 = df0.alias('df_as1')
df_as2 = df0.alias('df_as2')
joined_df = df_as1.join(df_as2, col("df_as1.average_montly_hours") == col("df_as2.average_montly_hours"), 'inner')
joined_df.show(1)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|              0.38|           0.53|             2|                 157|                 3|            0|   1|                    

In [51]:
# approxQuantile(col, probabilities, relativeError)
# df0.approxQuantile('left',(0.5,1) , 0.5)

In [52]:
# df0.coalesce(1).rdd.getNumPartitions()

In [53]:
# colRegex(colName)
# 根据正则选取列名
df = spark.createDataFrame([("a", 1), ("b", 2), ("c",  3)], ["Col1", "Col2"])
df.select(df.colRegex("`(Col2)?+.+`")).show()

+----+
|Col1|
+----+
|   a|
|   b|
|   c|
+----+



In [54]:
# columns返回列名 lsit
df0.columns

['satisfaction_level',
 'last_evaluation',
 'number_project',
 'average_montly_hours',
 'time_spend_company',
 'Work_accident',
 'left',
 'promotion_last_5years',
 'sales',
 'salary']

In [55]:
#corr(col1, col2, method=None)
#计算一个DataFrame的两列的相关性作为一个双值。目前只支持皮尔逊相关系数。DataFrame.corr（）和DataFrameStatFunctions.corr（）是彼此的别名。
#皮尔逊相关:是用于度量两个变量X和Y之间的相关（线性相关），其值介于-1与1之间 越接近正负1,相关度越大
df0.corr("time_spend_company", "left")

0.14482217493938632

In [56]:
# cov()
# 协方差,观察两列相关性 大于0:正相关 等于0:不相关 小于0:负相关
df0.cov("time_spend_company", "left")

0.09006595461255852

In [57]:
# count()
# 返回这个DataFrame中的行数。
df0.count()

14999

In [58]:
# createGlobalTempView(name)
# 如果你想拥有一个临时的view不光可以在不同的Session中共享，而且在application的运行周期内可用，那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀，因为全局的临时view是绑定到系统保留的数据库global_temp上。
df0.createGlobalTempView("HR")
df2 = spark.sql("select * from global_temp.HR")
df2.show(2)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|              0.38|           0.53|             2|                 157|                 3|            0|   1|                    0|sales|   low|
|               0.8|           0.86|             5|                 262|                 6|            0|   1|                    0|sales|medium|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
only showing top 2 rows



In [59]:
#df2.createGlobalTempView('HR')

In [60]:
spark.catalog.dropGlobalTempView("HR")

In [61]:
# createOrReplaceGlobalTempView(name)
# 使用给定的名称创建或替换全局临时视图。

In [62]:
# createTempView(name)
# 用这个DataFrame创建一个本地临时视图。
# 这个临时表的生命周期与用来创建这个DataFrame的SparkSession绑定在一起。如果视图名称已经存在于目录中，则抛出temptablealready存在异常。
df0.createTempView("HR")
df2 = spark.sql("select * from HR")
sorted(df0.collect()) == sorted(df2.collect())

True

In [63]:
# 删除本地视图
spark.catalog.dropTempView('HR')

In [64]:
# crossJoin(other)
# 用另一个dataframe产生笛卡尔积
# df.crossJoin(df2.select("height"))

In [65]:
# cube(*col)
# 使用指定的列创建当前DataFrame的多维多维数据集，因此我们可以在它们上运行聚合。
df0.cube(df0.number_project, df0.left).count().orderBy('number_project', 'left').show()

+--------------+----+-----+
|number_project|left|count|
+--------------+----+-----+
|          null|null|14999|
|          null|   0|11428|
|          null|   1| 3571|
|             2|null| 2388|
|             2|   0|  821|
|             2|   1| 1567|
|             3|null| 4055|
|             3|   0| 3983|
|             3|   1|   72|
|             4|null| 4365|
|             4|   0| 3956|
|             4|   1|  409|
|             5|null| 2761|
|             5|   0| 2149|
|             5|   1|  612|
|             6|null| 1174|
|             6|   0|  519|
|             6|   1|  655|
|             7|null|  256|
|             7|   1|  256|
+--------------+----+-----+



In [66]:
# describe(*cols)
# 计算数字和字符串列的基本统计信息。
# 这包括count，mean，stddev，min和max。如果未给出列，则此函数将计算所有数字或字符串列的统计信息。
df0.describe(['number_project', 'left']).show()

+-------+------------------+-------------------+
|summary|    number_project|               left|
+-------+------------------+-------------------+
|  count|             14999|              14999|
|   mean|  3.80305353690246| 0.2380825388359224|
| stddev|1.2325923553183513|0.42592409938029885|
|    min|                 2|                  0|
|    max|                 7|                  1|
+-------+------------------+-------------------+



In [67]:
# distinct()
# 删除重复行
df0.distinct().count()

11991

In [68]:
# drop(*cols)
# 返回一个新的DataFrame掉落指定的列。如果模式不包含给定的列名（s），那么这是一个no-op。

In [263]:
# dropDuplicates(subset=None)
# 返回一个新的DataFrame，去掉重复的行，可选地只考虑某些列。
# 对于静态批处理DataFrame，它只是删除重复的行。对于流式DataFrame，它将把所有数据跨触发器保存为中间状态，以删除重复的行。您可以使用withWatermark()来限制重复数据的延迟，系统将相应地限制状态。此外，为了避免重复的可能性，将会删除比水印更早的数据。
# 同drop_duplicates()
from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Bob', age=5, height=80), \
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
+---+------+-----+



In [70]:
df.drop_duplicates(['name', 'height']).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+



In [83]:
# dropna(how='any', thresh=None, subset=None)
#返回一个新的DataFrame来省略带有null值的行。
#dataframe.dropna（）和dataFrameNafuntions.drop（）是彼此的别名。
#how: any:有一个就删除 all:全是才删除
#thresh: 设置超过多少个na删除行,会重置how
#subset: 对选定列名
df3 = spark.read.csv(os.path.join(path, "null.csv"), header=True, inferSchema=True)
df3.show()
df3.dropna().show()
# drop全为null的行
df3.dropna("all").show()
# drop多于2的行
df3.na.drop(thresh=2).show()
# drop掉name列为null的行
df3.na.drop(subset=["name"]).show()
df3.na.drop("all", subset=("name", "height")).show()

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
|null|  null| null|
+----+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|alice|
+---+------+-----+

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
+----+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|alice|
|  5|  null|  bob|
+---+------+-----+

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
+----+------+-----+

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
+----+------+-----+



In [87]:
# fillna(value, subset=None)
# 替换na.fill（）的空值，别名。 DataFrame.fillna（）和DataFrameNaFunctions.fill（）是彼此的别名。
# value:int，long，float，string，bool或dict。用来替换空值的值。如果值是字典，则子集将被忽略，并且值必须是从列名（字符串）到替换值的映射。
# subset:可选的列名称要考虑。子集中指定的不具有匹配数据类型的列将被忽略。例如，如果value是一个字符串，并且子集包含一个非字符串列，则非字符串列将被忽略。
df3.na.fill(50).show()
df3.na.fill(False).show()
df3.na.fill({'age':50, 'name': 'unknow'}).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|alice|
|  5|    50|  bob|
| 50|    50|  tom|
| 50|    50| null|
+---+------+-----+

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
|null|  null| null|
+----+------+-----+

+---+------+------+
|age|height|  name|
+---+------+------+
| 10|    80| alice|
|  5|  null|   bob|
| 50|  null|   tom|
| 50|  null|unknow|
+---+------+------+



In [91]:
# filter(condition)
# 使用给定的条件过滤行。
# 同 where()
df3.filter(df3.age > 3).collect()
df.where(df.age >= 10).collect()

[Row(age=10, height=80, name='Alice')]

In [92]:
# first()
# 返回第一行
df.first()

Row(age=5, height=80, name='Alice')

In [93]:
# foreach(f)
# 对dataframe的每一行应用f函数
# 是df.rdd.foreach()的缩写
def f(person):
    print(person.name)
df.foreach(f)

In [95]:
# foreachPartition(f)
# 用法同foreach()只不过是作用于每个分区
# def f(people):
#     for person in people:
#         peint(person.name)
# df.foreachPartition(f)

In [98]:
# groupby()
# 使用指定的列对DataFrame进行分组，因此我们可以对它们运行聚合。有关所有可用聚合函数，请参阅分组数据。
# 同groupBy()
print(df.groupBy().avg().collect())
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())

[Row(avg(age)=6.666666666666667, avg(height)=80.0)]


[Row(name='Alice', avg(age)=6.666666666666667)]

In [99]:
sorted(df.groupBy(df.name).avg().collect())

[Row(name='Alice', avg(age)=6.666666666666667, avg(height)=80.0)]

In [100]:
sorted(df.groupBy(["name", df.age]).count().collect())

[Row(name='Alice', age=5, count=2), Row(name='Alice', age=10, count=1)]

In [102]:
# head(n=None)
# 返回前n行
# 所有数据将被加载到内存,所以数组很小时使用
df.head(2)

[Row(age=5, height=80, name='Alice'), Row(age=5, height=80, name='Alice')]

In [103]:
# intersect(other)
# 返回一个新的dataframe包含相同的行

In [108]:
df3.show()

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
|null|  null| null|
+----+------+-----+



In [112]:
# join(other, on=None, how=None)
# 使用给定的连接表达式与另一个DataFrame进行连接。
# on: join条件
# how: join的方式inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
df.join(df3, df.name == df3.name, 'outer').select(df.name, df3.height).collect()

[Row(name=None, height=None),
 Row(name=None, height=80),
 Row(name='Alice', height=None),
 Row(name='Alice', height=None),
 Row(name='Alice', height=None),
 Row(name=None, height=None),
 Row(name=None, height=None)]

In [114]:
cond = [df.name == df3.name, df.age == df3.age]
df.join(df3, cond, 'outer').select(df.name, df3.age).collect()

[Row(name=None, age=5),
 Row(name=None, age=None),
 Row(name=None, age=10),
 Row(name='Alice', age=None),
 Row(name=None, age=None),
 Row(name='Alice', age=None),
 Row(name='Alice', age=None)]

In [116]:
df.join(df3, 'name').select(df.name, df3.height).collect()

[]

In [117]:
# limit(num)
# 将结果技术限制为制定的数字
df.limit(1).collect()

[Row(age=5, height=80, name='Alice')]

In [118]:
# orderBy()
# 返回按指定列排序的新DataFrame。
# ascending:布尔值或布尔值列表（默认值为True）。按升序排列与降序排列。指定多个排序顺序的列表。如果指定了列表，则列表的长度必须等于列的长度。
df.sort(df.age.desc()).collect()

[Row(age=10, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice')]

In [119]:
df.sort("age", ascending=False).collect()

[Row(age=10, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice')]

In [123]:
df.orderBy(df.age.desc()).collect()

[Row(age=10, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice')]

In [121]:
from pyspark.sql.functions import *
df.sort(asc('age')).collect()

[Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=10, height=80, name='Alice')]

In [124]:
df.orderBy(desc("age"), "name").collect()

[Row(age=10, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice')]

In [132]:
df.orderBy(["age", "name"], ascending=[False, True]).collect()

[Row(age=10, height=80, name='Alice'),
 Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Bob')]

In [133]:
# printSchema()
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)



In [139]:
# randomSplit(weights, seed=None)
# 根据比重随机分配dataframe
# weights: 如果和不是1的话,会被标准话为1
split = df0.randomSplit([1.0, 2.0, 3.0], 10)
for item in split:
    print(item.count()/split[0].count())

1.0
1.988614055751865
2.900274833137024


In [140]:
# repartition(numPartitions, *cols)
# 返回由给定分区表达式分区的新DataFrame。生成的DataFrame是散列分区的。
# numPartitions可以是一个int值，用于指定分区或列的目标数量。如果它是一个列，它将被用作第一个分区列。如果未指定，则使用默认的分区数量。
df.repartition(10).rdd.getNumPartitions()

10

In [143]:
data = df.union(df)
data.show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
+---+------+-----+



In [142]:
data = df.union(df).repartition('age')
data.show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
|  5|    80|  Bob|
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [144]:
data = data.repartition(7, "age")
data.show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
+---+------+-----+



In [146]:
data = data.repartition("name", "age")
data.show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|  Bob|
|  5|    80|  Bob|
| 10|    80|Alice|
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|Alice|
+---+------+-----+



In [147]:
# replace(to_replace, value=<no value>, subset=None)
# 返回一个新的DataFrame，用另一个值替换一个值。 DataFrame.replace（）和DataFrameNaFunctions.replace（）是彼此的别名。
# 替换类型相同
df3.show()

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|   5|  null|  bob|
|null|  null|  tom|
|null|  null| null|
+----+------+-----+



In [148]:
df3.na.replace(5, 10).show()

+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|    80|alice|
|  10|  null|  bob|
|null|  null|  tom|
|null|  null| null|
+----+------+-----+



In [150]:
df3.replace('alice', None).show()

+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|null|
|   5|  null| bob|
|null|  null| tom|
|null|  null|null|
+----+------+----+



In [154]:
df3.replace({'alice': None}).show()

+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|null|
|   5|  null| bob|
|null|  null| tom|
|null|  null|null|
+----+------+----+



In [157]:
df3.replace(['alice', 'bob'], ['a', 'b'], 'name').show()

+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   a|
|   5|  null|   b|
|null|  null| tom|
|null|  null|null|
+----+------+----+



In [159]:
# sample(withReplacement=None, fraction=None, seed=None)
# 返回此DataFrame的采样子集。
# withReplacement: 是否重复采样
# fraction: 采样占比范围[0.0, 1.0],大概
# 种子是必须的
df0.sample(0.5, 3).count()/df0.count()

0.5012334155610374

In [160]:
df0.sample(True, 0.5, 3).count()

7402

In [163]:
# sampleBy(col, fractions, seed=None)
# 根据每层中给出的分数返回未更换的分层样本。\
from pyspark.sql.functions import col
dataset = spark.range(0, 100).select((col("id") % 3).alias("key"))
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().orderBy("key").show()

+---+-----+
|key|count|
+---+-----+
|  0|    3|
|  1|    7|
+---+-----+



In [164]:
# schema
# 以pyspark.sql.types.StructType的形式返回此DataFrame的模式。
df.schema

StructType(List(StructField(age,LongType,true),StructField(height,LongType,true),StructField(name,StringType,true)))

In [165]:
# select(*cols)
# 投影一组表达式并返回一个新的DataFrame。
df.select('*').show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
|  5|    80|  Bob|
| 10|    80|Alice|
+---+------+-----+



In [166]:
df.select('name', 'age').show()

+-----+---+
| name|age|
+-----+---+
|Alice|  5|
|  Bob|  5|
|Alice| 10|
+-----+---+



In [168]:
df.select(df.name, (df.age + 10).alias('age').cast('float')).show()

+-----+----+
| name| age|
+-----+----+
|Alice|15.0|
|  Bob|15.0|
|Alice|20.0|
+-----+----+



In [169]:
# selectExpr(*expr)
# 投影一组SQL表达式并返回一个新的DataFrame。
# 这是接受SQL表达式的select（）的变体。
df.selectExpr("age * 2", "abs(age)").show()

+---------+--------+
|(age * 2)|abs(age)|
+---------+--------+
|       10|       5|
|       10|       5|
|       20|      10|
+---------+--------+



In [170]:
# show(n=20, truncate=True, vertical=False)
# 将前n行打印给控制台
df.show(vertical=True)

-RECORD 0-------
 age    | 5     
 height | 80    
 name   | Alice 
-RECORD 1-------
 age    | 5     
 height | 80    
 name   | Bob   
-RECORD 2-------
 age    | 10    
 height | 80    
 name   | Alice 



In [177]:
# sort(*cols, **kwargs)
# 返回按指定列排序的新DataFrame。
df.sort(df.age.desc()).show()
df.sort("age", ascending=False).show()
df.orderBy(df.age.desc()).show()
from pyspark.sql.functions import *
df.sort(asc('age')).show()
df.orderBy(desc('age'), 'name').show()
df.orderBy(['age', 'name'], ascending=[0, 1]).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|  Bob|
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+



In [178]:
# sortWithinPartitions(*cols, **kwargs)
# 根据分区排序
df.sortWithinPartitions('age', ascending=False).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    80|Alice|
|  5|    80|  Bob|
+---+------+-----+



In [181]:
# storageLevel
# 获取存储级别
df.storageLevel

StorageLevel(False, False, False, False, 1)

In [187]:
# summary(*statistics)
# 计算数字和字符串列的指定统计信息。可用统计数据为： - count - mean - stddev - min - max - 以百分比形式指定的任意近似百分比（例如，75％）
# 如果未给出统计数据，则此函数会计算count，mean，stddev，min，近似四分位数（25％，50％和75％处的百分位数）以及最大值。
print(df0.columns)
df0.select('satisfaction_level', 'time_spend_company', 'average_montly_hours').summary().show()

['satisfaction_level', 'last_evaluation', 'number_project', 'average_montly_hours', 'time_spend_company', 'Work_accident', 'left', 'promotion_last_5years', 'sales', 'salary']
+-------+-------------------+------------------+--------------------+
|summary| satisfaction_level|time_spend_company|average_montly_hours|
+-------+-------------------+------------------+--------------------+
|  count|              14999|             14999|               14999|
|   mean| 0.6128335222348166| 3.498233215547703|   201.0503366891126|
| stddev|0.24863065106114257|1.4601362305354808|   49.94309937128406|
|    min|               0.09|                 2|                  96|
|    25%|               0.44|                 3|                 156|
|    50%|               0.64|                 3|                 200|
|    75%|               0.82|                 4|                 245|
|    max|                1.0|                10|                 310|
+-------+-------------------+------------------+-------

In [191]:
# toDF(*cols)
# 返回一个带有新列名的dataframe
df.select('name', 'age').toDF('f1', 'f2').show()

+-----+---+
|   f1| f2|
+-----+---+
|Alice|  5|
|  Bob|  5|
|Alice| 10|
+-----+---+



In [192]:
# toJSON(use_unicode=True)
# 将DataFrame转换为字符串的RDD。
# 每一行都被转换成JSON文档作为返回的RDD中的一个元素。
df.toJSON().take(2)

['{"age":5,"height":80,"name":"Alice"}', '{"age":5,"height":80,"name":"Bob"}']

In [193]:
# toLocalIterator()
# 返回包含此DataFrame中所有行的迭代器。迭代器将占用与此DataFrame中最大分区一样多的内存。
list(df.toLocalIterator())

[Row(age=5, height=80, name='Alice'),
 Row(age=5, height=80, name='Bob'),
 Row(age=10, height=80, name='Alice')]

In [194]:
# toPandas()
# 转pandasdataframe
df.toPandas().head()

Unnamed: 0,age,height,name
0,5,80,Alice
1,5,80,Bob
2,10,80,Alice


In [195]:
# union(other)
# 两表格合并

In [196]:
# unionByName(other)
# 按照名字合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



In [197]:
# withColumn(colName, col)
# 通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
df.withColumn('age2', df.age + 2).collect()

[Row(age=5, height=80, name='Alice', age2=7),
 Row(age=5, height=80, name='Bob', age2=7),
 Row(age=10, height=80, name='Alice', age2=12)]

In [198]:
# withColumnRenamed(existing, new)
#通过重命名现有列来返回新的DataFrame。如果模式不包含给定的列名，则这是一个无操作。
# 更换列名的操作
df.withColumnRenamed('age', 'age2').show()

+----+------+-----+
|age2|height| name|
+----+------+-----+
|   5|    80|Alice|
|   5|    80|  Bob|
|  10|    80|Alice|
+----+------+-----+



## class pyspark.sql.GroupedData(jgd, df)
由DataFrame.groupBy（）创建的DataFrame上的一组聚合方法。

In [152]:
# apply(udf)
# 使用pandas udf映射当前DataFrame的每个组，并将结果作为DataFrame返回。
# 使用pyspark.sql.functions.pandas_udf() , 用法跟pandas.apply一样
# 该函数不支持部分聚合，并且需要对DataFrame中的所有数据进行混洗。
from pyspark.sql.functions import pandas_udf,PandasUDFType
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean())) / v.std()
df.groupby("id").apply(normalize).show()


+---+-------------------+
| id|                  v|
+---+-------------------+
|  1|-0.7071067811865475|
|  1| 0.7071067811865475|
|  0|-0.8320502943378437|
|  0|-0.2773500981126146|
|  0| 1.1094003924504583|
+---+-------------------+



In [165]:
# avg(*cols)
# 计算平均值 同mean()
df0.groupBy().avg('left').collect()

[Row(avg(left)=0.2380825388359224)]

In [199]:
# count()
# 计数

In [200]:
# pivot(pivot_col, values=None)
# 透视功能,两个版本:1、制定不同值的列表 2、另一种不支持。后者简洁但是效率低，因为spark需要先计算内部不同值的列表
df0.columns

['satisfaction_level',
 'last_evaluation',
 'number_project',
 'average_montly_hours',
 'time_spend_company',
 'Work_accident',
 'left',
 'promotion_last_5years',
 'sales',
 'salary']

In [217]:
df1 = df0.select("number_project", "average_montly_hours", "left", 'promotion_last_5years').toDF('numbers', 'hours', 'left', 'promotion')
df1.show(2)

+-------+-----+----+---------+
|numbers|hours|left|promotion|
+-------+-----+----+---------+
|      2|  157|   1|        0|
|      5|  262|   1|        0|
+-------+-----+----+---------+
only showing top 2 rows



In [221]:
df1.groupBy('left').pivot('numbers',['3', '5']).sum("hours").show()

+----+------+------+
|left|     3|     5|
+----+------+------+
|   1| 15767|149051|
|   0|785126|436451|
+----+------+------+



## class pyspark.sql.Column(jc)

In [229]:
# alias(*alias, **kwargs)
# 使用新名称返回此列的别名（在返回多个列的表达式（例如爆炸）的情况下）。
df.select(df.age.alias('age2')).show()
df.select(df.age.alias('age3', metadata={'max': 99})).schema['age3'].metadata['max']

+----+
|age2|
+----+
|   5|
|   5|
|  10|
+----+



99

In [230]:
# asc()
# 返回升序
# desc()
# 返回降序

In [231]:
# between(lowerBound, upperBound)
# 一个布尔表达式，如果此表达式的值位于给定列之间，则该表达式的值为true。
df.select(df.name, df.age.between(2, 4)).show()

+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice|                      false|
|  Bob|                      false|
|Alice|                      false|
+-----+---------------------------+



In [232]:
# contains(other)
# 包含其他元素。基于字符串匹配返回一个布尔列。
df.filter(df.name.contains('o')).collect()

[Row(age=5, height=80, name='Bob')]

In [238]:
# endswith(other)
# 返回通过匹配结尾的
df.filter(df.name.endswith('b')).collect()

[Row(age=5, height=80, name='Bob')]

In [240]:
# getField(name)
# 在StructField中通过名称获取字段的表达式。
from pyspark.sql import Row
df = spark.createDataFrame([Row(r=Row(a=1, b='b'))])
df.select(df.r.getField('b')).show()
df.select(df.r.a).show()

+---+
|r.b|
+---+
|  b|
+---+

+---+
|r.a|
+---+
|  1|
+---+



In [242]:
# getIrem(key)
# 一个表达式，用于从列表中获取序号位置处的项目，或者通过键名从字典中获取项目。
df = spark.createDataFrame([([1, 2], {'key':'value'})], ['l', 'd'])
df.select(df.l.getItem(0), df.d.getItem('key')).show()
df.select(df.l[0], df.d['key']).show()

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+



In [243]:
# isNotNull()
# 如果当前表达式不为null，则为true。
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.filter(df.height.isNotNull()).collect()

[Row(height=80, name='Tom')]

In [244]:
# isNull()
# 如果为null这为真
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.filter(df.height.isNull()).collect()

[Row(height=None, name='Alice')]

In [259]:
# isin(*cols)
df.show()
df[df.name.isin('Tom', 'Mike')].collect()

+------+-----+
|height| name|
+------+-----+
|    80|  Tom|
|  null|Alice|
+------+-----+



[Row(height=80, name='Tom')]

In [260]:
df[df.height.isin([1, 2, 80])].collect()

[Row(height=80, name='Tom')]

In [262]:
# like(other)
# SQL像表达式一样。基于SQL LIKE匹配返回一个布尔型列。
df.filter(df.name.like('Al%')).collect()

[Row(height=None, name='Alice')]

In [264]:
# otherwise(value)
# 评估条件列表并返回多个可能的结果表达式之一。如果未调用Column.otherwise（），则不匹配条件返回None。有关示例用法，请参阅pyspark.sql.functions.when（）。
from pyspark.sql.functions import *
df.select(df.name, when(df.age > 3, 1).otherwise(0)).show()

+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    1|
|  Bob|                                    1|
|Alice|                                    1|
+-----+-------------------------------------+



In [273]:
# rlike(other)
# SQL RLIKE表达式（LIKE with Regex）。基于正则表达式匹配返回一个布尔列。
df.filter(df.name.rlike('ice$')).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [274]:
# startswith(other)
# 字符串以。开头。基于字符串匹配返回一个布尔列。
df.filter(df.name.startswith('Al')).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [276]:
# substr(startPos, length)
# 返回一列，该列是该列的子字符串。
df.select(df.name.substr(1,2).alias('col')).show()

+---+
|col|
+---+
| Al|
| Bo|
| Al|
+---+



In [277]:
# when(condition, value)
# 评估条件列表并返回多个可能的结果表达式之一。如果Column.otherwise（）未被调用，则不匹配条件返回None。
from pyspark.sql.functions import *
df.select(df.name, when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()

+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                           1|
|  Bob|                                                           1|
|Alice|                                                           1|
+-----+------------------------------------------------------------+

