In [1]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [2]:
import findspark

findspark.init()

<IPython.core.display.Javascript object>

In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001DD8CEDEF10>


<IPython.core.display.Javascript object>

### Read DallasCouncilVotes.csv file

This file contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter.

In [75]:
voter_full_df = spark.read.csv(
    "datasets/DallasCouncilVotes_full.csv", header=True, inferSchema=True
)
voter_full_df.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- AGENDA_ITEM_NUMBER: string (nullable = true)
 |-- ITEM_TYPE: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER NAME: string (nullable = true)
 |-- VOTE CAST: string (nullable = true)
 |-- FINAL ACTION TAKEN: string (nullable = true)
 |-- AGENDA ITEM DESCRIPTION: string (nullable = true)
 |-- AGENDA_ID: string (nullable = true)
 |-- VOTE_ID: string (nullable = true)



<IPython.core.display.Javascript object>

### Filtering column content with Python
for "VOTER NAME" column, filter out any rows that didn't conform to something generally resembling a name.

In [44]:
# Rename "VOTER NAME" column
voter_full_df = voter_full_df.withColumnRenamed("VOTER NAME", "VOTER_NAME")

# Show the distinct VOTER_NAME entries
voter_full_df.select(voter_full_df["VOTER_NAME"]).distinct().show(10, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_full_df = voter_full_df.filter(
    "length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20"
)

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_full_df = voter_full_df.filter(~F.col("VOTER_NAME").contains("_"))

# Show the distinct VOTER_NAME entries again
voter_full_df.select("VOTER_NAME").distinct().show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

<IPython.core.display.Javascript object>

### Now we will work with a simpler version of the dataset

In [66]:
voter_df = spark.read.csv(
    "datasets/DallasCouncilVoters_simple.csv", header=True, inferSchema=True
)
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

<IPython.core.display.Javascript object>

#### create two new columns - first_name and last_name from "VOTER_NAME" column

In [67]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn("splits", split(voter_df.VOTER_NAME, "\s+"))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn("first_name", voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn("last_name", voter_df.splits.getItem(size("splits") - 1))

# Show the voter_df DataFrame
voter_df.show()

+----------+-------------+-------------------+--------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|
+----------+-------------+-------------------+--------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough| [B., Adam, McGough]|        B.|  McGough|
|02/08/2017|Councilme

<IPython.core.display.Javascript object>

In [68]:
# Drop splits column
voter_df = voter_df.drop("splits")
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
+----------+-------------+-------------------+----------+---------+
only showing top 5 rows



<IPython.core.display.Javascript object>

### Conditional DataFrame column operations
The when() clause lets you conditionally modify a Data Frame based on its content.

When() / Otherwise()
This requirement is similar to the last, but now you want to add multiple values based on the voter's position. 


In [49]:
# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn(
    "random_val", F.when(voter_df.TITLE == "Councilmember", F.rand())
)

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.select("TITLE", "random_val").show(5)

+-------------+------------------+
|        TITLE|        random_val|
+-------------+------------------+
|Councilmember| 0.512111501243107|
|Councilmember|0.9433473757078776|
|        Mayor|              null|
|Councilmember|0.8354883183823011|
|Councilmember|0.8686131594327928|
+-------------+------------------+
only showing top 5 rows



<IPython.core.display.Javascript object>

In [50]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn(
    "random_val",
    when(voter_df.TITLE == "Councilmember", F.rand())
    .when(voter_df.TITLE == "Mayor", 2)
    .otherwise(0),
)

# Show some of the DataFrame rows
voter_df.select("TITLE", "random_val").show(5)

# Use the .filter() clause with random_val
voter_df.select("TITLE", "random_val").filter(voter_df.random_val == 0).show(5)

+-------------+-------------------+
|        TITLE|         random_val|
+-------------+-------------------+
|Councilmember| 0.5004460306250009|
|Councilmember|0.42735663133609403|
|        Mayor|                2.0|
|Councilmember| 0.5362454562903441|
|Councilmember| 0.8183482521450013|
+-------------+-------------------+
only showing top 5 rows

+--------------------+----------+
|               TITLE|random_val|
+--------------------+----------+
|Deputy Mayor Pro Tem|       0.0|
|       Mayor Pro Tem|       0.0|
|Deputy Mayor Pro Tem|       0.0|
|       Mayor Pro Tem|       0.0|
|Deputy Mayor Pro Tem|       0.0|
+--------------------+----------+
only showing top 5 rows



<IPython.core.display.Javascript object>

In [71]:
voter_df.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



<IPython.core.display.Javascript object>

### Adding an ID Field

returning to the complete dataset


In [77]:
# Select all the unique council voters
voter_names = voter_full_df.select(voter_full_df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_names.count())

# Add a ROW_ID
voter_names = voter_df.withColumn("ROW_ID", monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_names.orderBy(voter_names.ROW_ID.desc()).show(10)


There are 36 rows in the voter_df DataFrame.

+----------+--------------------+-------------------+----------+---------+------+
|      DATE|               TITLE|         VOTER_NAME|first_name|last_name|ROW_ID|
+----------+--------------------+-------------------+----------+---------+------+
|11/20/2018|       Councilmember|      Mark  Clayton|      Mark|  Clayton| 44624|
|11/20/2018|       Councilmember|     Tennell Atkins|   Tennell|   Atkins| 44623|
|11/20/2018|       Councilmember|       Kevin Felder|     Kevin|   Felder| 44622|
|11/20/2018|       Councilmember|       Omar Narvaez|      Omar|  Narvaez| 44621|
|11/20/2018|       Councilmember|Rickey D.  Callahan|    Rickey| Callahan| 44620|
|11/20/2018|              Vacant|               null|      null|     null| 44619|
|11/20/2018|       Mayor Pro Tem|      Casey  Thomas|     Casey|   Thomas| 44618|
|11/20/2018|Deputy Mayor Pro Tem|       Adam Medrano|      Adam|  Medrano| 44617|
|11/20/2018|               Mayor|Michael S. Rawling

<IPython.core.display.Javascript object>

### IDs with different partitions
You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

To check the number of partitions, use the method .rdd.getNumPartitions() on a DataFrame.

In [81]:
# Print the number of partitions in each DataFrame
# In this case there is only one partiotion because we are workink on the local machine
print(
    "\nThere are %d partitions in the voter_df DataFrame.\n"
    % voter_df.rdd.getNumPartitions()
)

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn("ROW_ID", F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)



There are 1 partitions in the voter_df DataFrame.

+----------+--------------------+-------------------+----------+---------+------+
|      DATE|               TITLE|         VOTER_NAME|first_name|last_name|ROW_ID|
+----------+--------------------+-------------------+----------+---------+------+
|11/20/2018|       Councilmember|      Mark  Clayton|      Mark|  Clayton| 44624|
|11/20/2018|       Councilmember|     Tennell Atkins|   Tennell|   Atkins| 44623|
|11/20/2018|       Councilmember|       Kevin Felder|     Kevin|   Felder| 44622|
|11/20/2018|       Councilmember|       Omar Narvaez|      Omar|  Narvaez| 44621|
|11/20/2018|       Councilmember|Rickey D.  Callahan|    Rickey| Callahan| 44620|
|11/20/2018|              Vacant|               null|      null|     null| 44619|
|11/20/2018|       Mayor Pro Tem|      Casey  Thomas|     Casey|   Thomas| 44618|
|11/20/2018|Deputy Mayor Pro Tem|       Adam Medrano|      Adam|  Medrano| 44617|
|11/20/2018|               Mayor|Michael S. Ra

<IPython.core.display.Javascript object>

### More ID tricks
Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. You have been given the task to make sure that the IDs output from a monthly Spark task start at the highest value from the previous month.

In [None]:
# # Determine the highest ROW_ID and save it in previous_max_ID
# previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# # Add a ROW_ID column to voter_df_april starting at the desired value
# voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# # Show the ROW_ID from both DataFrames and compare
# voter_df_march.select('ROW_ID').show()
# voter_df_april.select('ROW_ID').show()