# Spark DataFrame example

In [0]:
# Start by creating a SparkSession, the entry point to the DataFrame and SQL APIs:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()

Create a Spark DataFrame

Start by creating a DataFrame with first_name and age columns and four rows of data:

In [0]:
df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

Use the show() method to view the contents of the DataFrame:

In [0]:
df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



Add a column to a Spark DataFrame

Let’s add a life_stage column to the DataFrame that returns “child” if the age is 12 or under, “teenager” if the age is between 13 and 19, and “adult” if the age is 20 or older.

In [0]:
from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)
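The when/otherwise chain above is evaluated top to bottom and the first matching condition wins, much like if/elif/else. Here is a plain-Python sketch of the same rule, for illustration only: the function name `life_stage` is hypothetical and not part of PySpark. It makes the boundary ages easy to check, and it relies on between() being inclusive on both ends.

```python
def life_stage(age: int) -> str:
    """Plain-Python mirror of the when/otherwise chain (illustration, not Spark code)."""
    if age < 13:  # when(col("age") < 13, "child")
        return "child"
    if 13 <= age <= 19:  # .when(col("age").between(13, 19), "teenager"); inclusive bounds
        return "teenager"
    return "adult"  # .otherwise("adult")


# Check the ages around each boundary
for age in (3, 12, 13, 19, 20, 75):
    print(age, life_stage(age))
```

Because the first branch already catches every age under 13, the second condition could be written as just `col("age") <= 19` with the same result.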

withColumn() returns a new DataFrame with the extra column, which we assigned to df1. Let’s view its contents:

In [0]:
df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



Notice how the original DataFrame is unchanged:

In [0]:
df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



Spark DataFrames are immutable: transformations such as withColumn() return a new DataFrame rather than modifying the original. Assign the result to a variable to use the changes in subsequent operations.
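As a loose analogy in plain Python (not Spark code; `with_column` is a hypothetical helper), a transformation that builds new immutable tuples leaves its input untouched, so the caller has to keep the returned value. In PySpark the usual habit is the same: bind the result to a name, for example `df = df.withColumn(...)`.

```python
from typing import Callable, Tuple

Row = Tuple[object, ...]  # each row is an immutable tuple of values


def with_column(rows: Tuple[Row, ...], fn: Callable[[Row], object]) -> Tuple[Row, ...]:
    """Return a NEW tuple of rows with fn(row) appended; the input is never modified."""
    return tuple(row + (fn(row),) for row in rows)


people = (("sue", 32), ("li", 3), ("bob", 75), ("heo", 13))

# Keep the returned value; `people` itself is not changed by the call
people_with_flag = with_column(people, lambda row: row[1] >= 20)

print(people[0])            # original row, still two values
print(people_with_flag[0])  # new row with the extra value
```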

Filter a Spark DataFrame

Now, filter the DataFrame so it only includes teenagers and adults.

In [0]:
df1.where(col("life_stage").isin(["teenager", "adult"])).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+
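Because life_stage is derived from age, keeping teenagers and adults selects the same rows as keeping ages of 13 or more. The plain-Python check below (illustration only, using the four sample rows) compares the two predicates; in PySpark the second one would be `df1.where(col("age") >= 13)`.

```python
rows = [("sue", 32, "adult"), ("li", 3, "child"), ("bob", 75, "adult"), ("heo", 13, "teenager")]

# Same idea as where(col("life_stage").isin(["teenager", "adult"]))
by_stage = [r for r in rows if r[2] in {"teenager", "adult"}]

# The equivalent predicate expressed on the age column
by_age = [r for r in rows if r[1] >= 13]

print([r[0] for r in by_stage])
print(by_stage == by_age)
```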



Group by aggregation on a Spark DataFrame

Now, let’s compute the average age for everyone in the dataset:

In [0]:
from pyspark.sql.functions import avg

df1.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+
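As a quick sanity check, the result is the sum of the four ages divided by the number of rows:

```latex
\text{avg(age)} = \frac{32 + 3 + 75 + 13}{4} = \frac{123}{4} = 30.75
```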



You can also compute the average age for each life_stage:

In [0]:
df1.groupBy("life_stage").avg("age").show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+
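A per-group average only needs a running sum and count for each key, which is also what lets a distributed engine merge partial results from different machines. This plain-Python sketch (illustration only, not how Spark is implemented internally) reproduces the numbers above. Note that Spark does not guarantee the row order of a groupBy result; add `.orderBy("life_stage")` if you need a stable order.

```python
from collections import defaultdict

rows = [("sue", 32, "adult"), ("li", 3, "child"), ("bob", 75, "adult"), ("heo", 13, "teenager")]

# Accumulate [sum, count] per life_stage
totals = defaultdict(lambda: [0, 0])
for _, age, stage in rows:
    totals[stage][0] += age
    totals[stage][1] += 1

# Divide each group's sum by its count
averages = {stage: s / n for stage, (s, n) in totals.items()}
print(averages)
```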



Spark lets you run queries on DataFrames with SQL if you don’t want to use the programmatic APIs.

Query the DataFrame with SQL

Here’s how you can compute the average age for everyone with SQL. The {df1} placeholder in the query refers to the DataFrame passed as the df1 keyword argument:

In [0]:
spark.sql("select avg(age) from {df1}", df1=df1).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



And here’s how to compute the average age by life_stage with SQL:

In [0]:
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



# Spark SQL Example

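Let’s persist the DataFrame as a named table that is easily accessible via the SQL API. saveAsTable() writes the table in the session’s default data source format (Parquet in open source Spark, Delta Lake on Databricks).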

In [0]:
df1.write.saveAsTable("some_people")

In [0]:
spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



Now, let’s use SQL to insert another row of data into the table:

In [0]:
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

Inspect the table contents to confirm the row was inserted:

In [0]:
spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
|     frank|  4|     child|
+----------+---+----------+



Run a query that returns the teenagers:

In [0]:
spark.sql("select * from some_people where life_stage='teenager'").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+



Spark makes it easy to register tables and query them with pure SQL.

# Spark RDD Example
The Spark RDD APIs are suitable for unstructured data.

The Spark DataFrame API is easier to use and more performant for structured data, because Spark’s Catalyst optimizer can optimize DataFrame queries before running them.

Suppose you have a text file called some_words.txt with the following three lines of data:

    these are words
    these are more words
    words in english

In [0]:
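# Read the file as an RDD with one element per line
text_file = spark.sparkContext.textFile("some_words.txt")

# Split each line into words, pair every word with 1, then sum the 1s per word
counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)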

In [0]:
counts.collect()

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

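Spark can execute this query efficiently on large files because it splits the input into partitions and processes them in parallel; reduceByKey also combines counts within each partition before shuffling data between executors. Many single-machine query engines can’t distribute a computation across a cluster this way.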
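To see why the word count above parallelizes, here is a plain-Python sketch of the same pipeline (not Spark code). The split of the input into two partitions is an assumption for illustration: each partition is counted independently, the way flatMap, map and a local reduce would run on one executor, and the partial counts are then merged per key, roughly what reduceByKey does after its map-side combine and shuffle.

```python
from collections import Counter
from functools import reduce

lines = ["these are words", "these are more words", "words in english"]

# Hypothetical split of the input into two partitions
partitions = [lines[:2], lines[2:]]


def count_partition(partition_lines):
    """flatMap + map + a local (per-partition) combine of the (word, 1) pairs."""
    local = Counter()
    for line in partition_lines:
        for word in line.split(" "):  # flatMap(lambda line: line.split(" "))
            local[word] += 1          # map(lambda word: (word, 1)), summed locally
    return local


# Each partition could be processed independently, e.g. on different executors
partials = [count_partition(p) for p in partitions]

# Merge the partial counts per word, like the final stage of reduceByKey
counts = reduce(lambda a, b: a + b, partials, Counter())
print(sorted(counts.items()))
```

Because addition is associative and commutative, the merged result does not depend on how the lines were partitioned, which is why reduceByKey requires a function with those properties.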