In [7]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder \
   .master("local") \
   .appName("Word Count") \
   .config("spark.some.config.option", "some-value") \
   .getOrCreate()

builder 

具有Builder构造SparkSession实例的class属性
类Builder
的构建器SparkSession


appName（名称）
设置应用程序的名称，该名称将显示在Spark Web UI中。

如果未设置应用程序名称，将使用随机生成的名称。


参量
名称 –应用程序名称

config（key = None，value = None，conf = None ）

设置配置选项。使用此方法设置的选项会自动传播到SparkConf和SparkSession自己的配置中。

enableHiveSupport（）

启用Hive支持，包括与持久性Hive Metastore的连接，对Hive Serdes的支持以及Hive用户定义的功能。

getOrCreate（）

获取一个现有的，SparkSession或者如果不存在，则根据此构建器中设置的选项创建一个新的。

# CreateDataFrame（data，schema = None，sampleRatio = None，verifySchema = True ）

DataFrame从RDD，列表或创建一个pandas.DataFrame。



当schema是列名称列表时，将从中推断出各列的类型data。


当schema为is时None，它将尝试从中推断出模式（列名称和类型）data，该模式应为Row或namedtuple或的RDD dict。


当schema为is pyspark.sql.types.DataType或数据类型字符串时，它必须与实际数据匹配，否则将在运行时引发异常。如果给定的模式不是 pyspark.sql.types.StructType，则将其包装为 pyspark.sql.types.StructType唯一字段，并且字段名将为“ value”，每条记录也将包装为元组，随后可将其转换为行。


如果需要模式推断，samplingRatio则用于确定用于模式推断的行的比率。如果samplingRatio为，将使用第一行None。

参量

数据 –任何类型的SQL数据表示形式（例如，行，元组，整数，布尔值等）的RDD，或list或pandas.DataFrame。

schema –一个pyspark.sql.types.DataType或数据类型字符串或列名列表，默认为None。数据类型的字符串格式等于 pyspark.sql.types.DataType.simpleString，除了顶级结构类型可以省略struct<>和原子类型typeName()用作其格式，例如，使用use byte代替tinyintfor pyspark.sql.types.ByteType。我们还可以将 int用作简称IntegerType。

sampleRatio –用于推断的行的采样率


verifySchema –根据模式验证每一行的数据类型。

In [8]:
 l = [('Alice', 1)]
 spark.createDataFrame(l).collect()

[Row(_1='Alice', _2=1)]

In [9]:
 spark.createDataFrame(l, ['name', 'age']).collect()

[Row(name='Alice', age=1)]

In [10]:
d = [{'name': 'Alice', 'age': 1}]

In [11]:
spark.createDataFrame(d).collect()



[Row(age=1, name='Alice')]

In [16]:
sc = spark.sparkContext
rdd = sc.parallelize(l)
spark.createDataFrame(rdd).collect()

[Row(_1='Alice', _2=1)]

In [19]:
df = spark.createDataFrame(rdd, ['name', 'age'])
df.collect()

[Row(name='Alice', age=1)]

In [20]:
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
df2.collect()

[Row(name='Alice', age=1)]

In [28]:
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)
df3.collect()

[Row(name='Alice', age=1)]

In [29]:
spark.createDataFrame(df.toPandas()).collect()  

[Row(name='Alice', age=1)]

newSession（）
返回一个新的SparkSession作为新会话，该会话具有单独的SQLConf，已注册的临时视图和UDF，但是共享了SparkContext和表缓存。

range（start，end = None，step = 1，numPartitions = None ）

In [30]:
spark.range(1, 7, 2).collect()

[Row(id=1), Row(id=3), Row(id=5)]

In [31]:
spark.range(3).collect()

[Row(id=0), Row(id=1), Row(id=2)]

In [34]:
df3.createOrReplaceTempView("table1")

In [36]:
df2 = spark.sql("SELECT name AS f1, age as f2 from table1")

In [37]:
df2.collect()

[Row(f1='Alice', f2=1)]

# udf

In [38]:
strlen = spark.udf.register("stringLengthString", lambda x: len(x))

In [39]:
spark.sql("SELECT stringLengthString('test')").collect()

[Row(stringLengthString(test)='4')]

In [40]:
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()

[Row(stringLengthString(text)='3')]

In [43]:
from pyspark.sql.types import IntegerType
_ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()

[Row(stringLengthInt(test)=4)]

In [44]:
from pyspark.sql.functions import udf
slen=udf(lambda x:len(x),IntegerType())
_=spark.udf.register("slen",slen)
spark.sql("SELECT slen('test')").collect()

[Row(slen(test)=4)]

In [48]:
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType())
new_random_udf = spark.udf.register("random_udf", random_udf)
spark.sql("SELECT random_udf()").collect()  

[Row(random_udf()=95)]

In [55]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
#     pandas_udf("integer", PandasUDFType.SCALAR)  
def add_one(x):
     return x + 1
_ = spark.udf.register("add_one", add_one)  
spark.sql("SELECT add_one(id) FROM range(3)").collect()

[Row(add_one(id)='1'), Row(add_one(id)='2'), Row(add_one(id)='3')]

In [57]:
df.agg({"age": "max"}).collect()

[Row(max(age)=1)]

In [58]:
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()

[Row(min(age)=1)]

In [71]:
import datetime
today=datetime.date.today()
print(today)


2019-11-21


In [74]:
from pyspark.sql.functions import *
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'left')
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()

[Row(name='Alice', name='Alice', age=1)]