# SparkSession
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.
It is the entry point to programming Spark with the Dataset and DataFrame API.

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the notebook's SparkSession: two local
# worker threads, an application name, and one custom config entry that the
# next cell reads back.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [3]:
# Each comparison evaluates to True/False. A notebook cell only displays its
# last expression, so print every check instead of silently discarding them.
print(spark.conf.get("spark.some.config.option")
      == spark.sparkContext.getConf().get("spark.some.config.option")
      == "some-value")
spark2 = SparkSession.builder.config("k2", "v2").getOrCreate()

# there is a valid global default SparkSession, so getOrCreate() hands back
# that session with "k2" applied; both checks below are expected to be True
print(spark.conf.get("spark.some.config.option") == spark2.conf.get("spark.some.config.option"))
print(spark.conf.get("k2") == spark2.conf.get("k2"))

# DataFrame
A DataFrame is a distributed collection of data grouped into named columns.

## From list of tuples

In [6]:
# Schema inferred from a list of tuples. No column names are passed, so Spark
# chooses default ones (compare the next cell).
l = [("Alice", 1)]
spark.createDataFrame(l).collect()

In [7]:
spark.createDataFrame(l, ["name", "age"]).collect()

In [8]:
# Rows given as dicts: the dict keys supply the column names.
d = [{"name": "Alice", "age": 1}]
spark.createDataFrame(d).collect()

## From RDDs

In [10]:
# `sc` is used from here on but never created in this notebook. Bind it from
# the session so the notebook also runs outside the pyspark shell; if `sc`
# already exists, spark.sparkContext is the session's existing SparkContext.
sc = spark.sparkContext
rdd = sc.parallelize(l)
spark.createDataFrame(rdd).collect()

In [11]:
# with list of column names; this `df` (columns name, age) is reused by later cells
df = spark.createDataFrame(rdd, ["name", "age"])
df.collect()

In [12]:
# with a Row definition: Row("name", "age") builds a record factory, and each
# (name, age) tuple in the RDD is unpacked into it.
from pyspark.sql import Row

Person = Row("name", "age")
person = rdd.map(lambda pair: Person(*pair))
df2 = spark.createDataFrame(person)
df2.collect()

In [13]:
# with an explicit schema: StructField(name, data type, nullable).
# Import the type names explicitly rather than via `import *`; StringType is
# also used by a later cast example.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df3 = spark.createDataFrame(rdd, schema)
df3.collect()

In [14]:
# with a DDL-style schema string (new in version 2.0)
rdd = sc.parallelize(l)
print(spark.createDataFrame(rdd, "a: string, b: int").collect())
# An RDD of plain ints can be described by the atomic type string "int".
# Note: this rebinds the global `rdd` to the int-valued RDD.
rdd = rdd.map(lambda row: row[1])
print(spark.createDataFrame(rdd, "int").collect())

## From pandas

In [16]:
import pandas

# Round trip: Spark DataFrame -> pandas DataFrame -> Spark DataFrame.
print(spark.createDataFrame(df.toPandas()).collect())
# A pandas DataFrame built without column names carries pandas' default labels.
print(spark.createDataFrame(pandas.DataFrame([["Alice", 2]])).collect())

# SQLContext

In [18]:
# New in version 2.0: expose `df` to SQL as the temp view "table1"
# (replacing any existing view of that name) and query it.
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT name as N, age as A from table1")
df2.collect()

## Create DataFrame

## From list of tuples

In [21]:
# The same constructors through the legacy SQLContext entry point.
# NOTE(review): `sqlContext` is not created in this notebook; presumably it is
# predefined by the pyspark shell — confirm.
l = [("Alice", 1)]
print(sqlContext.createDataFrame(l).collect())
print(sqlContext.createDataFrame(l, ["name", "age"]).collect())

In [22]:
# Rows given as dicts, via the legacy SQLContext entry point.
# NOTE(review): SQLContext.createDataFrame delegates to
# SparkSession.createDataFrame, so the dict-inference deprecation warning
# presumably applies here too. It may just not be shown again, because Python
# displays a given warning only once per location by default — confirm.
d = [{"name": "Alice", "age": 1}]
sqlContext.createDataFrame(d).collect()

## From RDDs

In [24]:
# Tuples distributed as an RDD, converted through SQLContext (no column names given).
rdd = sc.parallelize(l)
sqlContext.createDataFrame(rdd).collect()

In [25]:
# with column names; this `df` (columns name, age) is used by the cells below
df = sqlContext.createDataFrame(rdd, ["name", "age"])
df.collect()

In [26]:
# with a Row definition: Row("name", "age") builds a record factory, and each
# (name, age) tuple in the RDD is unpacked into it.
from pyspark.sql import Row

Person = Row("name", "age")
person = rdd.map(lambda pair: Person(*pair))
df2 = sqlContext.createDataFrame(person)
df2.collect()

In [27]:
# with an explicit schema: StructField(name, data type, nullable).
# Explicit imports instead of `import *`; StringType is also used by a later cast example.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.collect()

In [28]:
# Import pandas here as well, so this cell does not depend on an earlier cell having run.
import pandas

# Round trip through pandas, then a pandas DataFrame built without column names.
print(sqlContext.createDataFrame(df.toPandas()).collect())
print(sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect())

In [29]:
# DDL schema string over the tuple RDD, then the atomic "int" schema over an
# RDD of ints. Rebuild the tuple RDD first so the cell does not depend on
# whatever an earlier cell left bound to `rdd`.
rdd = sc.parallelize(l)
print(sqlContext.createDataFrame(rdd, "a: string, b: int").collect())
rdd = rdd.map(lambda row: row[1])
print(sqlContext.createDataFrame(rdd, "int").collect())

In [30]:
df.toDF("f1", "f2", "f3").collect()

In [31]:
df.toJSON().first()

In [32]:
list(df.toLocalIterator())

In [33]:
df.toPandas()

## Temp table

In [35]:
# Register two DataFrames as temp tables, inspect the catalog, then clean up.
sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
print(sqlContext.tableNames())
df3 = sqlContext.tables()
print(df3)
print(df3.filter("tableName = 'table1'").first())
sqlContext.dropTempTable("table1")
sqlContext.dropTempTable("table2")

## UDF: User Defined Function

In [37]:
# Register a Python lambda as a SQL function. No return type is passed, so the
# default applies (a string type per the PySpark docs, hence the name).
sqlContext.registerFunction("stringLengthString", lambda x: len(x))
sqlContext.sql("SELECT stringLengthString('test')").collect()

In [38]:
# Same function with an explicit IntegerType return type.
from pyspark.sql.types import IntegerType
sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()

In [39]:
# Equivalent registration through the `udf` property, under the same name.
sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()

# Working with DataFrame

In [41]:
# A three-column DataFrame used by the following sections.
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")
# Mid-cell expressions are not displayed, so print the rows explicitly.
print(df.collect())

# createTempView fails if a view named "people" already exists (e.g. when this
# cell is re-run); createOrReplaceTempView keeps the cell re-runnable.
df.createOrReplaceTempView("people")
df2 = spark.sql("select * from people")

In [42]:
df.repartition(10).rdd.getNumPartitions()

In [43]:
# Union df with itself, then partition the rows by the value of age
# (the number of partitions is left to Spark).
data = df.union(df).repartition("age")
data.show()

In [44]:
# Partition by age into 7 partitions.
data = data.repartition(7, "age")
data.show()

In [45]:
data.rdd.getNumPartitions()

In [46]:
# Partition by the combination of name and age.
data = data.repartition("name", "age")
data.show()

In [47]:
# withColumn(colName, col)
# Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
# Here a new column age2 = age + 2 is added; df itself is not modified.
df.withColumn("age2", df.age + 2).collect()

In [48]:
df.withColumnRenamed("age", "age2").collect()

In [49]:
# Cast with a type-name string or with a DataType instance; both produce string values.
print(df.select(df.age.cast("string").alias("ages")).collect())
print(df.select(df.age.cast(StringType()).alias("ages")).collect())

## Aggregate
Aggregate over the entire DataFrame without groups (shorthand for `df.groupBy().agg()`).

In [51]:
df.agg({"age": "max"}).collect()

In [52]:
# Column-expression form using pyspark.sql.functions.
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()

In [53]:
# Row count per name; sorted() orders the collected Rows on the driver.
gdf = df.groupBy(df.name)
sorted(gdf.agg({"*": "count"}).collect())

In [54]:
# Minimum age per name, using the grouped data from the previous cell.
from pyspark.sql import functions as F
sorted(gdf.agg(F.min(df.age)).collect())

## Alias

In [56]:
# Alias each side of a self-join so columns can be addressed as
# "<alias>.<column>". Import only `col`: a star import from
# pyspark.sql.functions would shadow Python builtins such as min, max, sum
# and abs with Spark column functions for the rest of the notebook.
from pyspark.sql.functions import col

df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), "inner")
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()

## Stats

In [58]:
df.printSchema()

In [59]:
df.schema

In [60]:
df.storageLevel

In [61]:
df.count()

In [62]:
# Sums over the whole DataFrame (empty groupBy): one column, then several.
print(df.groupBy().sum("age").collect())
print(df.groupBy().sum("age", "height").collect())

In [63]:
df.groupBy().avg("age").collect()

In [64]:
df.groupBy().avg("age", "height").collect()

In [65]:
df.columns

In [66]:
# Column references: attribute access and item access both return a Column,
# and arithmetic builds a new Column expression. Printing one shows the
# expression, not data.
print(df.name)
print(df["name"])
print(df.age + 1)

In [67]:
# cube(*col): Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them.
# orderBy makes the displayed row order deterministic.
df.cube("name", df.age).count().orderBy("name", "age").show()

In [68]:
df.describe(["age"]).show()

In [69]:
df.describe().show()

In [70]:
df.distinct().count()

In [71]:
df.dtypes

In [72]:
df.explain()

In [73]:
df.explain(True)

In [74]:
df.groupBy().avg().collect()

In [75]:
df.groupBy("name").agg({"age": "mean"}).collect()

In [76]:
df.groupBy(df.name).avg().collect()

In [77]:
df.groupBy(["name", df.age]).count().collect()

In [78]:
# Maximum over the whole DataFrame: one column, then several.
print(df.groupBy().max("age").collect())
print(df.groupBy().max("age", "height").collect())

In [79]:
# mean() is an alias of avg(): one column, then several.
print(df.groupBy().mean("age").collect())
print(df.groupBy().mean("age", "height").collect())

## Join

In [81]:
print(df.select("age", "name").collect())
print(df2.select("name", "height").collect())
# Cartesian product of df with df2's height column.
# NOTE(review): df2 is read back from the "people" view of df, so df2.height
# may resolve to the same underlying column as df.height in this join —
# confirm that the selected height comes from the intended side.
df.crossJoin(df2.select("height")).select("age", "name", df2.height).collect()

In [82]:
df.drop("age").collect()

In [83]:
df.drop(df.age).collect()

In [84]:
df.join(df2, df.name == df2.name, "inner").drop(df.name).drop(df.age).collect()

In [85]:
df.join(df2, "name", "inner").drop("age", "height").collect()

In [86]:
# Three Alice rows, two of them exact duplicates, to demonstrate dropDuplicates.
# This rebinds `df` for the cells that follow.
from pyspark.sql import Row

alice_rows = [
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=10, height=80),
]
df = sc.parallelize(alice_rows).toDF()
df.dropDuplicates().show()

In [87]:
df.dropDuplicates(["name", "height"]).show()

In [88]:
df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()

In [89]:
df.join(df2, 'name', 'outer').select('name', df.height).collect()

In [90]:
# A list of Column conditions is combined with AND.
cond = [df.name == df2.name, df.age == df2.age]
df.join(df2, cond, 'outer').select(df.name, df2.age).collect()

In [91]:
df.join(df2, 'name').select(df.name, df2.height).collect()

In [92]:
df.join(df2, ['name', 'age']).select(df.name, df.age).collect()

## Filter

In [94]:
# Rebuild the three-column `df` (the previous section replaced it).
l = [("Alice", 2, 12), ("Bob", 5, 25)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int")

# filter() and where() accept either a Column condition or a SQL expression string.
print(df.filter(df.age > 3).collect())
print(df.filter("age > 3").collect())
print(df.where("age=2").collect())

In [95]:
df.first()

In [96]:
def print_name(person):
    """Print one row's ``name`` field.

    Used with DataFrame.foreach, which calls it once per Row. The output goes
    to the stdout of whichever process runs the task, which may not be this
    notebook.
    """
    print(person.name)


df.foreach(print_name)

In [97]:
def print_name(people):
    """Print the ``name`` field of every Row in one partition.

    Used with DataFrame.foreachPartition, which passes an iterable over a
    partition's rows. As with foreach, output goes to the stdout of whichever
    process runs the task, which may not be this notebook.
    """
    for person in people:
        print(person.name)


df.foreachPartition(print_name)

In [98]:
df.head()

In [99]:
# limit(n) returns a new DataFrame with at most n rows; limit(0) yields no rows.
print(df.limit(1).collect())
print(df.limit(0).collect())

In [100]:
# Sorting: orderBy() is an alias of sort(); several equivalent spellings follow.
print(df.sort(df.age.desc()).collect())
print(df.sort("age", ascending=False).collect())
print(df.orderBy(df.age.desc()).collect())

# Import only what is used; a star import would shadow builtins such as min/max/sum.
from pyspark.sql.functions import asc, desc

print(df.sort(asc("age")).collect())
print(df.sort(desc("age"), "name").collect())
# ascending takes one flag per sort column: age descending, then name ascending.
print(df.orderBy(["age", "name"], ascending=[False, True]).collect())

In [101]:
print(df.filter(df.name.endswith("ice")).collect())
# endswith() matches a literal suffix, not a regular expression, so "ice$"
# matches none of these names and the result is an empty list.
df.filter(df.name.endswith("ice$")).collect()

In [102]:
# getField(): extract a field by name from a struct column (here the nested Row `r`).
from pyspark.sql import Row
df1 = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df1.select(df1.r.getField("b")).show()
df1.select(df1.r.getField("a")).show()

In [103]:
# A DataFrame with an array column `l` and a map column `d`. getItem() and the
# [] operator index them by position and by key respectively; both spellings
# below select the same values.
df1 = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
df1.select(df1.l.getItem(0), df1.d.getItem("key")).show()
df1.select(df1.l[0], df1.d["key"]).show()

In [104]:
# Filter on non-null / null values; Alice's height is None.
from pyspark.sql import Row
df1 = sc.parallelize([Row(name=u"Tom", height=80), Row(name=u"Alice", height=None)]).toDF()
print(df1.filter(df1.height.isNotNull()).collect())
print(df1.filter(df1.height.isNull()).collect())

In [105]:
# isin(): membership test against literal values. df[condition] is shorthand for df.filter(condition).
print(df[df.name.isin("Bob", "Mike")].collect())
print(df[df.age.isin(1, 2, 3)].collect())

In [106]:
df.filter(df.name.like("Al%")).collect()

In [107]:
# CASE WHEN equivalent: 1 where age > 3, otherwise 0.
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()

# Sampling and Other Operations

In [109]:
df.na.replace(["Alice", "Bob"], ["A", "B"], "name").show()

In [110]:
df.rollup("name", df.age).count().orderBy("name", "age").show()

In [111]:
# sample(withReplacement, fraction, seed=None)
# Roughly 50% of rows without replacement; the fixed seed makes the result
# reproducible. The fraction is not a guaranteed exact row count.
df.sample(False, 0.5, 42).count()

In [112]:
# sampleBy(col, fractions, seed=None)
# Stratified sample over key = id % 3. Key 2 is absent from `fractions`, so its
# fraction is treated as zero and it does not appear in the sample.
dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
sampled.groupBy("key").count().orderBy("key").show()

In [113]:
df.selectExpr("age * 2", "abs(age)").collect()

In [114]:
# show(n=20, truncate=True)
# truncate: if True, strings longer than 20 characters are truncated. If set to
# a number greater than one, long strings are truncated to that length and
# cells are right-aligned. Here every string is cut to 3 characters.
df.show(truncate=3)

# Working with Row

In [116]:
# Row fields can be read by key or by attribute; `in` checks field names.
row = Row(name="Alice", age=11)
print(row)
print(row["name"], row["age"])
print(row.name, row.age)
print("name" in row)
print("wrong_key" in row)

In [117]:
# Row("name", "age") creates a Row-like class; calling that class creates Row objects.
Person = Row("name", "age")
print(Person)
print(Person("Alice", 11))

In [118]:
# asDict(recursive=False): with recursive=True, nested Rows are converted to dicts as well.
print(Row(name="Alice", age=11).asDict())
row = Row(key=1, value=Row(name="a", age=2))
print(row.asDict())
print(row.asDict(True))