In [8]:
# PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=lab pyspark --packages com.databricks:spark-avro_2.11:4.0.0
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.nash", "66") \
    .getOrCreate()

In [9]:
spark

In [10]:
# spark is an existing SparkSession
df = spark.read.json("resources/people.json")

In [11]:
df

DataFrame[age: bigint, name: string]

## DataFrame.show(), DataFrame.printSchema()

In [15]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [16]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [12]:
df.schema

StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))

In [13]:
df.dtypes

[('age', 'bigint'), ('name', 'string')]

## DataFrame.Column(jc) & DataFrame.Row()

In [9]:
people = df
ageCol = people.age

In [18]:
ageCol

Column<b'age'>

In [19]:
df.age+1

Column<b'(age + 1)'>

In [26]:
df.columns

['age', 'name']

## DataFrame.select(*cols)

In [9]:
df.select("name").show()

NameError: name 'df' is not defined

In [19]:
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [15]:
 df.select('*').collect()

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [16]:
df.select('name', 'age').collect()

[Row(name='Michael', age=None),
 Row(name='Andy', age=30),
 Row(name='Justin', age=19)]

In [17]:
df.select(df.name, (df.age + 10).alias('age')).collect()

[Row(name='Michael', age=None),
 Row(name='Andy', age=40),
 Row(name='Justin', age=29)]

## DataFrame.filter(condition)

In [20]:
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [21]:
df.filter(df.age > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [22]:
df.filter('age > 21').show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## DataFrame.groupBy(), DataFrame.count()

In [24]:
df.count()

3

In [25]:
df.groupBy('age').count()

DataFrame[age: bigint, count: bigint]

In [21]:
df.first()

Row(age=None, name='Michael')

In [22]:
df.explain()

== Physical Plan ==
*FileScan json [age#20L,name#21] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/liazhang/code/handson-ml/learn_spark/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>


In [23]:
df.isLocal()

False

In [30]:
df.limit(1).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [31]:
df.rdd

MapPartitionsRDD[49] at javaToPython at NativeMethodAccessorImpl.java:0

In [24]:
df.storageLevel?

[0;31mType:[0m        property
[0;31mString form:[0m <property object at 0x10868f368>
[0;31mDocstring:[0m  
Get the :class:`DataFrame`'s current storage level.

>>> df.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df.cache().storageLevel
StorageLevel(True, True, False, True, 1)
>>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel
StorageLevel(True, False, False, False, 2)

.. versionadded:: 2.1


In [37]:
df.cache().storageLevel

StorageLevel(True, True, False, True, 1)

In [40]:
df.toJSON()

MapPartitionsRDD[55] at toJavaRDD at NativeMethodAccessorImpl.java:0

In [43]:
df2 = df.toPandas()
type(df2)

pandas.core.frame.DataFrame

## pyspark.sql.functions

In [3]:
from pyspark.sql import Row
from pyspark.sql.functions import *
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])

In [4]:
eDF.schema

StructType(List(StructField(a,LongType,true),StructField(intlist,ArrayType(LongType,true),true),StructField(mapfield,MapType(StringType,StringType,true),true)))

In [12]:
eDF.select(explode(eDF.intlist).alias("anInt")).collect()

[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

In [8]:
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()

+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+



In [20]:
df = spark.createDataFrame([(5,),(6,)], ['n'])

In [21]:
df.schema

StructType(List(StructField(n,LongType,true)))

In [22]:
df.select(factorial(df.n).alias('f')).show()

+---+
|  f|
+---+
|120|
|720|
+---+



In [23]:
from pyspark.sql.types import *
data = [(1, '''{"a": 1}''')]
schema = StructType([StructField("a", IntegerType())])
df = spark.createDataFrame(data, ("key", "value"))

In [24]:
df.show()

+---+--------+
|key|   value|
+---+--------+
|  1|{"a": 1}|
+---+--------+



In [25]:
df.select(from_json(df.value, schema).alias("json")).show()

+----+
|json|
+----+
| [1]|
+----+



In [28]:
# df.select(from_json(df.value, "a INT").alias("json")).show()

In [27]:
data = [(1, '''[{"a": 1}]''')]
schema = ArrayType(StructType([StructField("a", IntegerType())]))
df = spark.createDataFrame(data, ("key", "value"))

In [30]:
df.show()

+---+----------+
|key|     value|
+---+----------+
|  1|[{"a": 1}]|
+---+----------+



In [32]:
df.select(from_json(df.value, schema).alias('json')).show()

+-----+
| json|
+-----+
|[[1]]|
+-----+



## UDF

In [33]:
strlen = spark.udf.register("stringLengthString", lambda x: len(x))

In [34]:
spark.sql("SELECT stringLengthString('test')").show()

+------------------------+
|stringLengthString(test)|
+------------------------+
|                       4|
+------------------------+



In [39]:
spark.sql("SELECT 'foo' AS text").select(strlen("text"))

TypeError: 'NoneType' object is not callable

In [41]:
from pyspark.sql.types import IntegerType
_ = spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").show()

+---------------------+
|stringLengthInt(test)|
+---------------------+
|                    4|
+---------------------+



In [42]:
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())

In [52]:
range_a = spark.udf.register("range_a", lambda x: list(range(x)), ArrayType(IntegerType()))

In [54]:
spark.sql("SELECT range_a(4)").show()

+------------+
|  range_a(4)|
+------------+
|[0, 1, 2, 3]|
+------------+



In [4]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())

## SQL

In [14]:
df = spark.read.json("resources/people.json")
df.createOrReplaceTempView('people')

In [16]:
sqlDF = spark.sql("SELECT * FROM people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
df.createGlobalTempView('people')

In [18]:
spark.sql('select * from global_temp.people').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
spark.newSession().sql('select * from global_temp.people').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Interoperating with RDDs

In [23]:
# Inferring the Schema Using Reflection
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

In [24]:
lines.collect()

['Michael, 29', 'Andy, 30', 'Justin, 19']

In [25]:
parts.collect()

[['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]

In [26]:
people.collect()

[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [27]:
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

In [28]:
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [29]:
schemaPeople.schema

StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))

In [31]:
from pyspark.sql.types import *
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

In [32]:
schemaPeople.schema

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

In [35]:
# from pyspark.sql.types import IntegerType
# spark.udf.registerJavaFunction("javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType())

## Data Sources

In [4]:
df = spark.read.load("resources/users.parquet")

In [6]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [8]:
df.schema

StructType(List(StructField(name,StringType,true),StructField(favorite_color,StringType,true),StructField(favorite_numbers,ArrayType(IntegerType,true),true)))

In [7]:
df.select("name", "favorite_color").write.mode('overwrite').save("resources/namesAndFavColors.parquet")

In [6]:
df.write.mode('overwrite').orc('resources/users.orc')

In [7]:
df.rdd.saveAsSequenceFile('resources/users.sequence')

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
	at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:269)
	at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:497)
	at org.apache.spark.api.python.PythonRDD$.saveAsSequenceFile(PythonRDD.scala:472)
	at org.apache.spark.api.python.PythonRDD.saveAsSequenceFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [3]:
df2 = spark.read.format("com.databricks.spark.avro").load('resources/users.avro')

In [4]:
df2.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [5]:
df2.schema

StructType(List(StructField(name,StringType,true),StructField(favorite_color,StringType,true),StructField(favorite_numbers,ArrayType(IntegerType,true),true)))

In [11]:
df_json = spark.read.load("resources/people.json", format="json")

In [12]:
df_json.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [25]:
df_csv = spark.read.load("resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")

In [26]:
df_csv.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [27]:
df_csv.schema

StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(job,StringType,true)))

In [28]:
# Run SQL on files directly
df = spark.sql("SELECT * FROM parquet.`resources/users.parquet`")

In [29]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

