This notebook covers basic operations on Spark DataFrames.

DataFrames are table-like collections with well-defined rows and columns.

Each DataFrame is an immutable, lazily evaluated plan that specifies which operations to apply to the data in order to produce some output.

When we perform an action on a DataFrame we instruct Spark to actually perform the transformations and return the result.


In [1]:
%load_ext nb_black

#### Start a SparkSession to try out basic DataFrame operations, which apply to both batch and streaming jobs.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark Basic Operations").getOrCreate()

In [3]:
spark

#### Creating a dataframe from scratch.

In [4]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [5]:
schema = StructType(
    [
        StructField(name="city", dataType=StringType(), nullable=True),
        StructField(name="country", dataType=StringType(), nullable=True),
        StructField(name="counts", dataType=LongType(), nullable=False),
    ]
)

In [6]:
rows = [
    Row("Los Angeles", "United States", 3),
    Row("New York", "United States", 1),
    Row("London", "United Kingdom", 1),
]

In [7]:
parallelizeRows = spark.sparkContext.parallelize(rows)

In [8]:
df = spark.createDataFrame(parallelizeRows, schema)

In [9]:
df.show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
+-----------+--------------+------+



#### Create a dataframe from different types of files.

In [None]:
# DataFrame from csv (named df_csv so that the df built above is not overwritten)
df_csv = spark.read.csv("YOUR_FILE.csv", inferSchema=True, header=True)

In [None]:
# DataFrame from json
df_json = spark.read.json("YOUR_FILE.json")

#### Creating a lazily evaluated "view" that we can use in Spark SQL. 

In [10]:
df.createOrReplaceTempView("df_table")

#### Printing the schema

The schema defines the column names and types of a dataframe. It is worth inspecting early so that we can refer back to it later on.

In [11]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- counts: long (nullable = false)



In [12]:
type(df)

pyspark.sql.dataframe.DataFrame

#### Manipulating columns

Columns in Spark are similar to columns in Pandas. We can select, transform and remove columns with the use of expressions.

We cannot manipulate a column outside of the context of a dataframe, therefore we need to use Spark transformations within a dataframe to modify a column.

In [13]:
import pyspark.sql.functions as F

Now that we have the dataframe, we can use *select* with column names or column expressions, and *selectExpr* with SQL expressions written as strings.

In [14]:
df.select("country").show(1)

+-------------+
|      country|
+-------------+
|United States|
+-------------+
only showing top 1 row



In [15]:
df.select(F.col("country")).show(1)

+-------------+
|      country|
+-------------+
|United States|
+-------------+
only showing top 1 row



In [16]:
df.select("country", "city").show(1)

+-------------+-----------+
|      country|       city|
+-------------+-----------+
|United States|Los Angeles|
+-------------+-----------+
only showing top 1 row



Change the column name in an expression.

In [17]:
df.select(F.expr("country as destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



Change the column name in an expression and then change it back (just to make a point that we can have as many manipulations as we want).

In [18]:
df.select(F.expr("country as destination").alias("country")).show(5)

+--------------+
|       country|
+--------------+
| United States|
| United States|
|United Kingdom|
+--------------+



As a shorthand for the above, we can use *selectExpr* to build more complex expressions and create new dataframes.

In [19]:
new_df = df.selectExpr("country as new_country", "country")

In [20]:
new_df.show()

+--------------+--------------+
|   new_country|       country|
+--------------+--------------+
| United States| United States|
| United States| United States|
|United Kingdom|United Kingdom|
+--------------+--------------+



In [21]:
new_df2 = df.selectExpr("avg(counts)", "count(distinct(country))")

In [22]:
new_df2.show()

+------------------+-----------------------+
|       avg(counts)|count(DISTINCT country)|
+------------------+-----------------------+
|1.6666666666666667|                      2|
+------------------+-----------------------+



#### Passing explicit values with literals

In [24]:
df.select(F.expr("*"), F.lit(1).alias("One")).show()

+-----------+--------------+------+---+
|       city|       country|counts|One|
+-----------+--------------+------+---+
|Los Angeles| United States|     3|  1|
|   New York| United States|     1|  1|
|     London|United Kingdom|     1|  1|
+-----------+--------------+------+---+



#### Adding a column

In [25]:
df = df.withColumn("One", F.lit(1))

In [26]:
df.show()

+-----------+--------------+------+---+
|       city|       country|counts|One|
+-----------+--------------+------+---+
|Los Angeles| United States|     3|  1|
|   New York| United States|     1|  1|
|     London|United Kingdom|     1|  1|
+-----------+--------------+------+---+



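#### Renaming a column

Note that *withColumn* only acts as a rename here because Spark resolves column names case-insensitively by default, so "one" replaces the existing "One" column rather than adding a new one.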

In [27]:
df = df.withColumn("one", F.expr("One"))

In [28]:
df.columns

['city', 'country', 'counts', 'one']

#### Renaming a column (another way)

In [29]:
df = df.withColumnRenamed("one", "ONE")

In [30]:
df.columns

['city', 'country', 'counts', 'ONE']

#### Removing columns.

In [31]:
df = df.drop("ONE")

In [32]:
df.columns

['city', 'country', 'counts']

#### DataFrame filtering

In [33]:
df.filter(F.col("counts") < 2).show(1)

+--------+-------------+------+
|    city|      country|counts|
+--------+-------------+------+
|New York|United States|     1|
+--------+-------------+------+
only showing top 1 row



In [34]:
df.where("counts < 2").show(2)

+--------+--------------+------+
|    city|       country|counts|
+--------+--------------+------+
|New York| United States|     1|
|  London|United Kingdom|     1|
+--------+--------------+------+



If we need to apply multiple filters, we can chain them rather than combine them in a single expression: because evaluation is lazy, Spark plans the chained filters together before anything runs.

In [35]:
df.where(F.col("counts") <= 1).where(F.col("country") != "United States").show()

+------+--------------+------+
|  city|       country|counts|
+------+--------------+------+
|London|United Kingdom|     1|
+------+--------------+------+



#### Get distinct rows

In [36]:
df.select("city").distinct().count()

3

#### Get random samples

In [37]:
df.sample(withReplacement=False, fraction=1.0, seed=5).count()

3

#### Random splits

In [38]:
df2 = df.randomSplit([0.67, 0.33], seed=5)

In [39]:
print(df.count())
print(df2[0].count())
print(df2[1].count())

3
2
1


#### Concatenating and appending rows

In [40]:
rows = [
    Row("Berlin", "Germany", 2),
    Row("Bucharest", "Romania", 2),
]
# Distribute the new rows and build a DataFrame with the same schema as df.
parallelizeRows = spark.sparkContext.parallelize(rows)
df2 = spark.createDataFrame(parallelizeRows, schema)

In [41]:
df3 = df.union(df2)

In [42]:
df3.createOrReplaceTempView("new_df")

In [43]:
df3.show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
|     Berlin|       Germany|     2|
|  Bucharest|       Romania|     2|
+-----------+--------------+------+



#### Sorting

In [44]:
df3.sort("counts").show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|   New York| United States|     1|
|     London|United Kingdom|     1|
|     Berlin|       Germany|     2|
|  Bucharest|       Romania|     2|
|Los Angeles| United States|     3|
+-----------+--------------+------+



In [45]:
df3.orderBy("counts").show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|     London|United Kingdom|     1|
|   New York| United States|     1|
|  Bucharest|       Romania|     2|
|     Berlin|       Germany|     2|
|Los Angeles| United States|     3|
+-----------+--------------+------+



In [48]:
df3.orderBy(F.desc("counts")).show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|     Berlin|       Germany|     2|
|  Bucharest|       Romania|     2|
|   New York| United States|     1|
|     London|United Kingdom|     1|
+-----------+--------------+------+



#### Limiting what we extract from a dataframe.

In [47]:
df.limit(3).show()

+-----------+--------------+------+
|       city|       country|counts|
+-----------+--------------+------+
|Los Angeles| United States|     3|
|   New York| United States|     1|
|     London|United Kingdom|     1|
+-----------+--------------+------+


