In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, DateType, FloatType

In [2]:
# Location of the Dallas City Council voting-record CSV.
# NOTE(review): absolute, machine-specific path - the notebook only runs where this exact path exists.
DATAPATH = r'/home/daniel/Desktop/programming/pythondatascience/datascience/dataengineering/datasets/Dallas_City_Council_Voting_Record.csv'
# Start (or reuse) the Spark session used throughout the notebook
spark = SparkSession.builder.appName('data_preprocessing').getOrCreate()
# Dataframe with inferSchema; header = True takes the column names from the first CSV line.
# NOTE(review): later cells show rows whose DATE holds free text and whose trailing columns
# are all null - presumably quoted multi-line fields being split into several rows.
# Reading with multiLine=True may avoid that - TODO confirm against the raw file.
voter_df = spark.read.csv(DATAPATH, inferSchema = True, header = True)

22/05/14 17:20:02 WARN Utils: Your hostname, daniel-X555LJ resolves to a loopback address: 127.0.1.1; using 192.168.0.182 instead (on interface wlp3s0f0)
22/05/14 17:20:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/14 17:20:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Swap the spaces in the multi-word column names for underscores so the columns
# can be reached with attribute access and used inside SQL expression strings.
for spaced_name in ("VOTER NAME", "VOTE CAST", "FINAL ACTION TAKEN", "AGENDA ITEM DESCRIPTION"):
    voter_df = voter_df.withColumnRenamed(spaced_name, spaced_name.replace(" ", "_"))

In [4]:
# List every column together with the data type Spark assigned to it
voter_df.dtypes

[('DATE', 'string'),
 ('AGENDA_ITEM_NUMBER', 'string'),
 ('ITEM_TYPE', 'string'),
 ('DISTRICT', 'string'),
 ('TITLE', 'string'),
 ('VOTER_NAME', 'string'),
 ('VOTE_CAST', 'string'),
 ('FINAL_ACTION_TAKEN', 'string'),
 ('AGENDA_ITEM_DESCRIPTION', 'string'),
 ('AGENDA_ID', 'string'),
 ('VOTE_ID', 'string')]

In [5]:
# Report how many columns the DataFrame has
column_total = len(voter_df.columns)
print(f"{column_total} Columns is present in the dataset.")

11 Columns is present in the dataset.


In [6]:
# Pandas-style "shape": number of rows, then number of columns
row_total, column_total = voter_df.count(), len(voter_df.columns)
df_shape = f"{row_total},{column_total}"
print(df_shape)

115005,11


# Subset Columns and View a Glimpse of the Data

In [7]:
# Columns to keep for a first look at the data
select_cols = ['DATE', 'TITLE', 'VOTER_NAME', 'VOTE_CAST']

# Project the DataFrame down to those columns and print the first rows
subset = voter_df.select(select_cols)
subset.show()

+----------+--------------------+-------------------+---------+
|      DATE|               TITLE|         VOTER_NAME|VOTE_CAST|
+----------+--------------------+-------------------+---------+
|09/16/2020|       Councilmember|          Chad West|      YES|
|09/16/2020|Deputy Mayor Pro Tem|    B. Adam McGough|      YES|
|06/24/2020|       Councilmember|Carolyn King Arnold|      YES|
|06/24/2020|       Councilmember|     Jaime Resendez|      YES|
|06/24/2020|       Councilmember|       Omar Narvaez|      YES|
|06/24/2020|       Councilmember|      Adam Bazaldua|      YES|
|06/24/2020|       Councilmember|     Tennell Atkins|      YES|
|06/24/2020|       Councilmember|     Paula Blackmon|      YES|
|06/24/2020|       Councilmember|          Chad West|      YES|
|06/24/2020|Deputy Mayor Pro Tem|    B. Adam McGough|      YES|
|06/24/2020|       Councilmember|    Lee M. Kleinman|      YES|
|01/11/2017|       Councilmember|  Jennifer S. Gates|      YES|
|04/25/2018|       Councilmember|     Sa

In [8]:
# Print the first 4 rows, one field per line, without truncating long values
voter_df.show(vertical=True, truncate=False, n=4)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

# Check for Missing Value

In [9]:
# Number of rows in which TITLE is null
title_is_null = F.col("TITLE").isNull()
voter_df.where(title_is_null).count()

                                                                                

1380

In [10]:
# Number of rows in which TITLE is an empty string
title_is_empty = F.col("TITLE") == ''
voter_df.where(title_is_empty).count()

0

In [11]:
# Number of rows in which TITLE is NaN (not a number)
title_is_nan = F.isnan("TITLE")
voter_df.where(title_is_nan).count()

                                                                                

0

In [12]:
# Count the rows whose TITLE is missing in any form: null, empty string or NaN
title = F.col("TITLE")
title_is_missing = title.isNull() | (title == '') | F.isnan(title)
voter_df.where(title_is_missing).count()



1380

### Calculate all the missing values in the DataFrame

In [13]:
# For every column, count the entries that are empty, null or NaN.
# F.when() without otherwise() gives null for rows that do not match and
# F.count() ignores nulls, so each aggregate counts only the flagged rows.
missing_counts = [
    F.count(F.when((F.col(name) == '') | (F.col(name).isNull()) | (F.isnan(name)), name)).alias(name)
    for name in voter_df.columns
]
voter_df.select(missing_counts).show(vertical=True)



-RECORD 0-----------------------
 DATE                    | 0    
 AGENDA_ITEM_NUMBER      | 75   
 ITEM_TYPE               | 300  
 DISTRICT                | 1170 
 TITLE                   | 1380 
 VOTER_NAME              | 1885 
 VOTE_CAST               | 1500 
 FINAL_ACTION_TAKEN      | 1560 
 AGENDA_ITEM_DESCRIPTION | 1590 
 AGENDA_ID               | 2850 
 VOTE_ID                 | 2850 



                                                                                

There are missing values in every column except DATE.

In [14]:
# Show the first 3 rows whose TITLE is null, one field per line
voter_df.filter(voter_df.TITLE.isNull()).show(vertical=True, n = 3)

-RECORD 0---------------------------------------
 DATE                    | [ATALIA ANNA GARC... 
 AGENDA_ITEM_NUMBER      | 091620_AG_2          
 ITEM_TYPE               | 091620_AG_2_1        
 DISTRICT                | null                 
 TITLE                   | null                 
 VOTER_NAME              | null                 
 VOTE_CAST               | null                 
 FINAL_ACTION_TAKEN      | null                 
 AGENDA_ITEM_DESCRIPTION | null                 
 AGENDA_ID               | null                 
 VOTE_ID                 | null                 
-RECORD 1---------------------------------------
 DATE                    | [ATALIA ANNA GARC... 
 AGENDA_ITEM_NUMBER      | 091620_AG_2          
 ITEM_TYPE               | 091620_AG_2_10       
 DISTRICT                | null                 
 TITLE                   | null                 
 VOTER_NAME              | null                 
 VOTE_CAST               | null                 
 FINAL_ACTION_TAKEN 

In [15]:
# Show the first 3 rows whose TITLE is NOT null (only TITLE is checked here;
# other columns of these rows may still hold missing values)
voter_df.filter(~ voter_df.TITLE.isNull()).show(vertical=True, n = 3)

-RECORD 0---------------------------------------
 DATE                    | 09/16/2020           
 AGENDA_ITEM_NUMBER      | 1                    
 ITEM_TYPE               | AGENDA               
 DISTRICT                | 1                    
 TITLE                   | Councilmember        
 VOTER_NAME              | Chad West            
 VOTE_CAST               | YES                  
 FINAL_ACTION_TAKEN      | APPROVED             
 AGENDA_ITEM_DESCRIPTION | Approval of Minut... 
 AGENDA_ID               | 091620_AG_1          
 VOTE_ID                 | 091620_AG_1_1        
-RECORD 1---------------------------------------
 DATE                    | 09/16/2020           
 AGENDA_ITEM_NUMBER      | 1                    
 ITEM_TYPE               | AGENDA               
 DISTRICT                | 10                   
 TITLE                   | Deputy Mayor Pro Tem 
 VOTER_NAME              | B. Adam McGough      
 VOTE_CAST               | YES                  
 FINAL_ACTION_TAKEN 

In [16]:
# Count the rows per VOTER_NAME to check whether the same name occurs under several spellings
rows_per_name = voter_df.groupBy("VOTER_NAME").count()
rows_per_name.show()



+-------------------+-----+
|         VOTER_NAME|count|
+-------------------+-----+
|     Tennell Atkins| 6405|
|       Scott Griggs| 1157|
|      Scott  Griggs| 2638|
|      Sandy Greyson| 1157|
|Michael S. Rawlings| 3795|
|     Paula Blackmon| 3764|
|       Kevin Felder| 2641|
|    Cara Mendelsohn| 3764|
|       Adam Medrano| 3795|
|    Casey Thomas II| 3764|
|      Jaynie Shultz| 1082|
|               null| 1885|
|      David Blewett| 2682|
|   Casey Thomas, II| 2690|
|      Mark  Clayton| 2638|
|  Casey  Thomas, II| 1105|
|     Sandy  Greyson| 2638|
| Gay Donnell Willis| 1082|
|      Adam  Medrano| 2682|
|       Adam McGough| 1082|
+-------------------+-----+
only showing top 20 rows



                                                                                

In [17]:
# Same per-name row counts, most frequent names first
rows_per_name = voter_df.groupBy("VOTER_NAME").count()
rows_per_name.orderBy(F.col("count").desc()).show()

+-------------------+-----+
|         VOTER_NAME|count|
+-------------------+-----+
|     Tennell Atkins| 6405|
|       Omar Narvaez| 6405|
|Carolyn King Arnold| 5612|
| Jennifer S.  Gates| 5320|
|    Lee M. Kleinman| 5320|
|Michael S. Rawlings| 3795|
|       Adam Medrano| 3795|
|   B. Adam  McGough| 3795|
|          Chad West| 3764|
|      Adam Bazaldua| 3764|
|     Jaime Resendez| 3764|
|    Cara Mendelsohn| 3764|
|       Eric Johnson| 3764|
|     Paula Blackmon| 3764|
|    Casey Thomas II| 3764|
|   Casey Thomas, II| 2690|
|    B. Adam McGough| 2682|
|      David Blewett| 2682|
|      Adam  Medrano| 2682|
|       Kevin Felder| 2641|
+-------------------+-----+
only showing top 20 rows



In [18]:
# Temporary DataFrame holding only the rows with a usable TITLE
# (not an empty string, not null and not NaN)
title = F.col("TITLE")
df_temp = voter_df.where((title != '') & title.isNotNull() & ~F.isnan(title))
df_temp.show(vertical=True)

-RECORD 0---------------------------------------
 DATE                    | 09/16/2020           
 AGENDA_ITEM_NUMBER      | 1                    
 ITEM_TYPE               | AGENDA               
 DISTRICT                | 1                    
 TITLE                   | Councilmember        
 VOTER_NAME              | Chad West            
 VOTE_CAST               | YES                  
 FINAL_ACTION_TAKEN      | APPROVED             
 AGENDA_ITEM_DESCRIPTION | Approval of Minut... 
 AGENDA_ID               | 091620_AG_1          
 VOTE_ID                 | 091620_AG_1_1        
-RECORD 1---------------------------------------
 DATE                    | 09/16/2020           
 AGENDA_ITEM_NUMBER      | 1                    
 ITEM_TYPE               | AGENDA               
 DISTRICT                | 10                   
 TITLE                   | Deputy Mayor Pro Tem 
 VOTER_NAME              | B. Adam McGough      
 VOTE_CAST               | YES                  
 FINAL_ACTION_TAKEN 

In [19]:
# Voter names that appear at least 3000 times, most frequent first
name_counts = df_temp.groupBy("VOTER_NAME").count()
name_counts.where(F.col("count") >= 3000).orderBy(F.desc("count")).show()

[Stage 31:>                                                         (0 + 4) / 4]

+-------------------+-----+
|         VOTER_NAME|count|
+-------------------+-----+
|       Omar Narvaez| 6405|
|     Tennell Atkins| 6405|
|Carolyn King Arnold| 5612|
|    Lee M. Kleinman| 5320|
| Jennifer S.  Gates| 5320|
|   B. Adam  McGough| 3795|
|Michael S. Rawlings| 3795|
|       Adam Medrano| 3795|
|     Paula Blackmon| 3764|
|    Cara Mendelsohn| 3764|
|          Chad West| 3764|
|    Casey Thomas II| 3764|
|      Adam Bazaldua| 3764|
|     Jaime Resendez| 3764|
|       Eric Johnson| 3764|
+-------------------+-----+



                                                                                

In [20]:
# Remove the name df_temp from the notebook namespace now that the
# temporary DataFrame is no longer needed
del df_temp

In [21]:
# del voter_df

In [22]:
# Get the Spark session (getOrCreate() returns the already-running session if there is one)
spark = SparkSession.builder.appName('data_preprocessing').getOrCreate()
# Read the same CSV again, this time without inferSchema and without renaming the columns
vote = spark.read.csv(DATAPATH, header = True)

In [23]:
# Print the column names, types and nullability of the freshly read DataFrame
vote.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- AGENDA_ITEM_NUMBER: string (nullable = true)
 |-- ITEM_TYPE: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER NAME: string (nullable = true)
 |-- VOTE CAST: string (nullable = true)
 |-- FINAL ACTION TAKEN: string (nullable = true)
 |-- AGENDA ITEM DESCRIPTION: string (nullable = true)
 |-- AGENDA_ID: string (nullable = true)
 |-- VOTE_ID: string (nullable = true)



In [24]:
# Same information as a list of (column name, type) tuples
vote.dtypes

[('DATE', 'string'),
 ('AGENDA_ITEM_NUMBER', 'string'),
 ('ITEM_TYPE', 'string'),
 ('DISTRICT', 'string'),
 ('TITLE', 'string'),
 ('VOTER NAME', 'string'),
 ('VOTE CAST', 'string'),
 ('FINAL ACTION TAKEN', 'string'),
 ('AGENDA ITEM DESCRIPTION', 'string'),
 ('AGENDA_ID', 'string'),
 ('VOTE_ID', 'string')]

In [25]:
# Preview the dtypes after casting VOTE_ID to integer.
# The result is not assigned back, so `vote` itself is left unchanged by this cell.
vote.withColumn("Vote_ID", vote['VOTE_ID'].cast('int')).dtypes

[('DATE', 'string'),
 ('AGENDA_ITEM_NUMBER', 'string'),
 ('ITEM_TYPE', 'string'),
 ('DISTRICT', 'string'),
 ('TITLE', 'string'),
 ('VOTER NAME', 'string'),
 ('VOTE CAST', 'string'),
 ('FINAL ACTION TAKEN', 'string'),
 ('AGENDA ITEM DESCRIPTION', 'string'),
 ('AGENDA_ID', 'string'),
 ('Vote_ID', 'int')]

In [26]:
date_vars = ['DATE']
int_vars = ['AGENDA_ID', 'VOTE_ID']
float_vars = ['AGENDA_ITEM_NUMBER', 'DISTRICT']

# Converting variables to the right datatype

# The DATE strings look like '09/16/2020'. A plain cast to DateType does not
# understand that layout and turns every value into null, so the strings are
# parsed with an explicit month/day/year pattern instead.
for column in date_vars:
    vote = vote.withColumn(column, F.to_date(vote[column], 'MM/dd/yyyy'))

# NOTE(review): AGENDA_ID / VOTE_ID hold identifiers such as '091620_AG_1',
# which cannot be read as integers; the cast leaves almost every row null
# (see the counts in the describe() output below). Consider keeping them as strings.
for column in int_vars:
    vote = vote.withColumn(column, vote[column].cast(IntegerType()))

# Values that are not numeric (e.g. 'Z9' in AGENDA_ITEM_NUMBER) become null here.
for column in float_vars:
    vote = vote.withColumn(column, vote[column].cast(FloatType()))

In [27]:
# Confirm the column types after the conversions above
vote.dtypes

[('DATE', 'date'),
 ('AGENDA_ITEM_NUMBER', 'float'),
 ('ITEM_TYPE', 'string'),
 ('DISTRICT', 'float'),
 ('TITLE', 'string'),
 ('VOTER NAME', 'string'),
 ('VOTE CAST', 'string'),
 ('FINAL ACTION TAKEN', 'string'),
 ('AGENDA ITEM DESCRIPTION', 'string'),
 ('AGENDA_ID', 'int'),
 ('VOTE_ID', 'int')]

In [28]:
# Summary statistics of the converted DataFrame, one field per line
vote.describe().show(5, vertical=True)



-RECORD 0---------------------------------------
 summary                 | count                
 AGENDA_ITEM_NUMBER      | 103215               
 ITEM_TYPE               | 114705               
 DISTRICT                | 113415               
 TITLE                   | 113625               
 VOTER NAME              | 113120               
 VOTE CAST               | 113505               
 FINAL ACTION TAKEN      | 113445               
 AGENDA ITEM DESCRIPTION | 113415               
 AGENDA_ID               | 15                   
 VOTE_ID                 | 45                   
-RECORD 1---------------------------------------
 summary                 | mean                 
 AGENDA_ITEM_NUMBER      | 33.66309402703096    
 ITEM_TYPE               | 87.0                 
 DISTRICT                | 8.228012167702685    
 TITLE                   | null                 
 VOTER NAME              | null                 
 VOTE CAST               | null                 
 FINAL ACTION TAKEN 

                                                                                

In [29]:
# Remove the name `vote` from the notebook namespace; the remaining cells work on voter_df
del vote

In [30]:
# Number of rows in voter_df
voter_df.count()

115005

In [31]:
# Summary statistics for the ID, agenda item number and district columns
described_cols = ['AGENDA_ID', 'VOTE_ID', 'AGENDA_ITEM_NUMBER', 'DISTRICT']
voter_df.select(described_cols).describe().show(5)



+-------+--------------------+--------------------+--------------------+--------------------+
|summary|           AGENDA_ID|             VOTE_ID|  AGENDA_ITEM_NUMBER|            DISTRICT|
+-------+--------------------+--------------------+--------------------+--------------------+
|  count|              112155|              112155|              114930|              113835|
|   mean|                 0.0|  234.73333333333332|   33.66309402703096|   8.228012167702685|
| stddev|                 0.0|   96.34739464326711|  34.918232003975675|  14.817555893667526|
|    min| 2016 through Nov...|  to  re-establis...|   subject to con...|  2025 -  Estimat...|
|    max|999 from the Texa...|999 from the Texa...|                  Z9|              999.00|
+-------+--------------------+--------------------+--------------------+--------------------+



                                                                                

In [32]:
# Return the first two rows as a list of Row objects
voter_df.head(2)

[Row(DATE='09/16/2020', AGENDA_ITEM_NUMBER='1', ITEM_TYPE='AGENDA', DISTRICT='1', TITLE='Councilmember', VOTER_NAME='Chad West', VOTE_CAST='YES', FINAL_ACTION_TAKEN='APPROVED', AGENDA_ITEM_DESCRIPTION='Approval of Minutes of the September 2, 2020 City Council Meeting', AGENDA_ID='091620_AG_1', VOTE_ID='091620_AG_1_1'),
 Row(DATE='09/16/2020', AGENDA_ITEM_NUMBER='1', ITEM_TYPE='AGENDA', DISTRICT='10', TITLE='Deputy Mayor Pro Tem', VOTER_NAME='B. Adam McGough', VOTE_CAST='YES', FINAL_ACTION_TAKEN='APPROVED', AGENDA_ITEM_DESCRIPTION='Approval of Minutes of the September 2, 2020 City Council Meeting', AGENDA_ID='091620_AG_1', VOTE_ID='091620_AG_1_10')]

In [33]:
# Correlation between AGENDA_ITEM_NUMBER and DISTRICT
# NOTE(review): both columns are strings in voter_df; presumably Spark casts them to
# numbers for corr() and non-numeric entries are left out - verify before relying on the value.
voter_df.select(F.corr('AGENDA_ITEM_NUMBER', 'DISTRICT')).show()

+----------------------------------+
|corr(AGENDA_ITEM_NUMBER, DISTRICT)|
+----------------------------------+
|               0.18464586874451538|
+----------------------------------+



AGENDA_ITEM_NUMBER is only weakly correlated with DISTRICT (correlation ≈ 0.18).

# Data Manipulation/Cleaning

Task:
- Show the distinct VOTER_NAME entries.
- Filter voter_df where the VOTER_NAME is 1-20 characters in length.
- Filter out the rows of voter_df where the VOTER_NAME contains an underscore (_).
- Show the distinct VOTER_NAME entries again.

**Show the distinct VOTER_NAME entries.**

In [34]:
# Up to 40 distinct VOTER_NAME values, printed in full
distinct_names = voter_df.select("VOTER_NAME").distinct()
distinct_names.show(40, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Paula Blackmon     |
|Kevin Felder       |
|Cara Mendelsohn    |
|Adam Medrano       |
|Casey Thomas II    |
|Jaynie Shultz      |
|null               |
|David Blewett      |
|Casey Thomas, II   |
|Mark  Clayton      |
|Casey  Thomas, II  |
|Sandy  Greyson     |
|Gay Donnell Willis |
|Adam  Medrano      |
|Adam McGough       |
|Mark Clayton       |
|Jennifer S.  Gates |
|Chad West          |
|Tiffinni A. Young  |
|B. Adam  McGough   |
|Paul Ridley        |
|Omar Narvaez       |
|Philip T. Kingston |
|Rickey D. Callahan |
|Dwaine R. Caraway  |
|Philip T.  Kingston|
|Jennifer S. Gates  |
|Lee M. Kleinman    |
|B. Adam McGough    |
|Jesse Moreno       |
|Monica R. Alonzo   |
|Adam Bazaldua      |
|Jaime Resendez     |
|Rickey D.  Callahan|
|Eric Johnson       |
+-------------------+
only showing top 40 rows



**Filter voter_df where the VOTER_NAME is 1-20 characters in length.**

In [35]:
# Keep only the rows whose VOTER_NAME is 1-20 characters long, both bounds
# inclusive as the task states (the previous `< 20` dropped 20-character names).
# Rows with a null VOTER_NAME are removed as well: length(null) is null, and a
# null predicate does not pass the filter.
voter_df = voter_df.filter("length(VOTER_NAME) >= 1 and length(VOTER_NAME) <= 20")
voter_df.show(5, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

**Filter out voter_df where the VOTER_NAME contains an _(underscore)**

In [36]:
# Drop every row whose VOTER_NAME contains an underscore
name_has_underscore = F.col("VOTER_NAME").contains("_")
voter_df = voter_df.where(~name_has_underscore)

**Show the distinct VOTER_NAME entries again.**

In [37]:
# Show up to 40 distinct (DATE, VOTER_NAME) pairs that remain after the two filters above.
# Because DATE is selected too, the same name can appear once per meeting date.
voter_df.select("DATE", "VOTER_NAME").distinct().show(40)

+----------+-------------------+
|      DATE|         VOTER_NAME|
+----------+-------------------+
|08/29/2018|   B. Adam  McGough|
|09/06/2017| Jennifer S.  Gates|
|02/08/2017|   Casey Thomas, II|
|04/12/2017| Philip T. Kingston|
|04/25/2018|       Adam Medrano|
|06/13/2018|   Casey Thomas, II|
|05/23/2018|       Adam Medrano|
|11/06/2018|Philip T.  Kingston|
|06/06/2018|     Tennell Atkins|
|03/21/2018|     Sandy  Greyson|
|10/26/2016|Carolyn King Arnold|
|06/28/2017|    Lee M. Kleinman|
|04/26/2017| Philip T. Kingston|
|04/26/2017|   B. Adam  McGough|
|06/06/2018|    Lee M. Kleinman|
|09/05/2018|Philip T.  Kingston|
|10/17/2018|    Lee M. Kleinman|
|02/27/2019|       Omar Narvaez|
|04/12/2017| Rickey D. Callahan|
|10/11/2016|   B. Adam  McGough|
|08/09/2017|  Dwaine R. Caraway|
|09/06/2017|Rickey D.  Callahan|
|02/22/2017|      Sandy Greyson|
|09/07/2016|   Monica R. Alonzo|
|09/12/2018|Michael S. Rawlings|
|09/12/2018|       Omar Narvaez|
|12/12/2018|      Scott  Griggs|
|08/29/201

Task:
- Add a new column called splits holding the list of possible names.
- Use the getItem() method and create a new column called first_name.
- Get the last entry of the splits list and create a column called last_name.
- Drop the splits column and show the new voter_df.

**Add a new column called splits holding the list of possible names**

In [38]:
# Split VOTER_NAME on runs of whitespace so that double spaces do not produce empty parts.
# The pattern is a raw string: "\s" in a normal string literal is an invalid escape sequence.
voter_df = voter_df.withColumn("Splits", F.split("VOTER_NAME", r"\s+"))

In [39]:
# Compare each VOTER_NAME with the list of name parts it was split into
voter_df.select("DATE", "VOTER_NAME", "Splits").show(40, truncate = False)

+----------+-------------------+-----------------------+
|DATE      |VOTER_NAME         |Splits                 |
+----------+-------------------+-----------------------+
|09/16/2020|Chad West          |[Chad, West]           |
|09/16/2020|B. Adam McGough    |[B., Adam, McGough]    |
|06/24/2020|Carolyn King Arnold|[Carolyn, King, Arnold]|
|06/24/2020|Jaime Resendez     |[Jaime, Resendez]      |
|06/24/2020|Omar Narvaez       |[Omar, Narvaez]        |
|06/24/2020|Adam Bazaldua      |[Adam, Bazaldua]       |
|06/24/2020|Tennell Atkins     |[Tennell, Atkins]      |
|06/24/2020|Paula Blackmon     |[Paula, Blackmon]      |
|06/24/2020|Chad West          |[Chad, West]           |
|06/24/2020|B. Adam McGough    |[B., Adam, McGough]    |
|06/24/2020|Lee M. Kleinman    |[Lee, M., Kleinman]    |
|01/11/2017|Jennifer S. Gates  |[Jennifer, S., Gates]  |
|04/25/2018|Sandy  Greyson     |[Sandy, Greyson]       |
|04/25/2018|Jennifer S.  Gates |[Jennifer, S., Gates]  |
|04/25/2018|Philip T.  Kingston

**Use the getItem() method and create a new column called first_name based on the first item in splits**

In [40]:
# First_name is the first element of the Splits array
first_part = F.col("Splits").getItem(0)
voter_df = voter_df.withColumn("First_name", first_part)

preview_cols = ["DATE", "VOTER_NAME", "Splits", "First_name"]
voter_df.select(preview_cols).show(30, truncate=False)

+----------+-------------------+-----------------------+----------+
|DATE      |VOTER_NAME         |Splits                 |First_name|
+----------+-------------------+-----------------------+----------+
|09/16/2020|Chad West          |[Chad, West]           |Chad      |
|09/16/2020|B. Adam McGough    |[B., Adam, McGough]    |B.        |
|06/24/2020|Carolyn King Arnold|[Carolyn, King, Arnold]|Carolyn   |
|06/24/2020|Jaime Resendez     |[Jaime, Resendez]      |Jaime     |
|06/24/2020|Omar Narvaez       |[Omar, Narvaez]        |Omar      |
|06/24/2020|Adam Bazaldua      |[Adam, Bazaldua]       |Adam      |
|06/24/2020|Tennell Atkins     |[Tennell, Atkins]      |Tennell   |
|06/24/2020|Paula Blackmon     |[Paula, Blackmon]      |Paula     |
|06/24/2020|Chad West          |[Chad, West]           |Chad      |
|06/24/2020|B. Adam McGough    |[B., Adam, McGough]    |B.        |
|06/24/2020|Lee M. Kleinman    |[Lee, M., Kleinman]    |Lee       |
|01/11/2017|Jennifer S. Gates  |[Jennifer, S., G

**Get the last entry of the splits list and create a column called last_name.**

In [41]:
# Last_name is the final element of Splits: its index is (number of elements) - 1
last_index = F.size("Splits") - 1
voter_df = voter_df.withColumn("Last_name", F.col("Splits")[last_index])

In [42]:
# Check the extracted first and last names against the original name and its parts
voter_df.select("DATE", "VOTER_NAME", "Splits", "First_name", "Last_name").show(20, truncate=False)

+----------+-------------------+-----------------------+----------+----------+
|DATE      |VOTER_NAME         |Splits                 |First_name|Last_name |
+----------+-------------------+-----------------------+----------+----------+
|09/16/2020|Chad West          |[Chad, West]           |Chad      |West      |
|09/16/2020|B. Adam McGough    |[B., Adam, McGough]    |B.        |McGough   |
|06/24/2020|Carolyn King Arnold|[Carolyn, King, Arnold]|Carolyn   |Arnold    |
|06/24/2020|Jaime Resendez     |[Jaime, Resendez]      |Jaime     |Resendez  |
|06/24/2020|Omar Narvaez       |[Omar, Narvaez]        |Omar      |Narvaez   |
|06/24/2020|Adam Bazaldua      |[Adam, Bazaldua]       |Adam      |Bazaldua  |
|06/24/2020|Tennell Atkins     |[Tennell, Atkins]      |Tennell   |Atkins    |
|06/24/2020|Paula Blackmon     |[Paula, Blackmon]      |Paula     |Blackmon  |
|06/24/2020|Chad West          |[Chad, West]           |Chad      |West      |
|06/24/2020|B. Adam McGough    |[B., Adam, McGough] 

**Drop the splits column and show the new voter_df**

In [43]:
# Show the DataFrame with the new name columns.
# The Splits column is not dropped in this cell: the UDF cell further down still reads it,
# and the drop happens right after that.
voter_df.show(10, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

Task:
- Add a column to voter_df named random_val with the results of the F.rand() method for any voter with the title Councilmember.
- Show some of the DataFrame rows, noting whether the .when() clause worked.

**Add a column to voter_df named random_val with the results of the F.rand() method for any voter with the title Councilmember**

In [44]:
# Assign random numbers to persons with the title "Councilmember".
# There is no otherwise() branch, so rows with any other TITLE get null.
# F.rand() is called without a seed, so the values differ from run to run.
voter_df = voter_df.withColumn("random_val", F.when(voter_df.TITLE == "Councilmember", F.rand()))

**Show some of the DataFrame rows, noting whether the .when() clause worked**

In [45]:
# Inspect the first 5 rows to see which of them received a random_val
voter_df.show(5, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

Task:
- Add a column to voter_df named random_val with the results of the F.rand() method for any voter with the title `Councilmember`. Set random_val to 2 for the `Mayor`. Set any other title to the value 0.
- Show some of the Data Frame rows, noting whether the clauses worked.
- Use the .filter clause to find 0 in random_val.

**Add a column to voter_df named random_val with the results of the F.rand() method for any voter with the title `Councilmember`. Set random_val to 2 for the `Mayor`. Set any other title to the value 0**

In [46]:
# Chained when() calls read like if / elif / else:
#   Councilmember -> random number, Mayor -> 2, every other title -> 0
title = F.col("TITLE")
random_val_expr = (
    F.when(title == "Councilmember", F.rand())
    .when(title == "Mayor", 2)
    .otherwise(0)
)
voter_df = voter_df.withColumn("random_val", random_val_expr)

**Show some of the Data Frame rows, noting whether the clauses worked**

In [47]:
# Inspect the first 30 rows to check the three random_val branches
voter_df.show(30, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

**Use the .filter clause to find 0 in random_val**

In [48]:
# Rows that fell through to the otherwise() branch (random_val equal to 0)
zero_rows = voter_df.where(F.col("random_val") == 0)
zero_rows.select("DATE", "TITLE", "random_val", "VOTER_NAME").show(30, truncate=False)

+----------+--------------------+----------+-----------------+
|DATE      |TITLE               |random_val|VOTER_NAME       |
+----------+--------------------+----------+-----------------+
|09/16/2020|Deputy Mayor Pro Tem|0.0       |B. Adam McGough  |
|06/24/2020|Deputy Mayor Pro Tem|0.0       |B. Adam McGough  |
|04/25/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|04/25/2018|Mayor Pro Tem       |0.0       |Dwaine R. Caraway|
|06/20/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|06/20/2018|Mayor Pro Tem       |0.0       |Dwaine R. Caraway|
|09/16/2020|Mayor Pro Tem       |0.0       |Adam  Medrano    |
|06/20/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|06/20/2018|Mayor Pro Tem       |0.0       |Dwaine R. Caraway|
|08/15/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|08/15/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|09/18/2018|Deputy Mayor Pro Tem|0.0       |Adam Medrano     |
|09/18/2018|Mayor Pro Tem       |0.0       |Casey  Thom

Task:
- Define the function getFirstAndMiddle() as a user-defined function. It should return a string type: a space-separated string of all the names in the names list except the last entry.
- Create a new column on voter_df called first_and_middle_name using your UDF.
- Show the Data Frame.

**Define the function as a user-defined function. It should return a string type**

In [49]:
def getFirstandMiddle(names):
    """Return every entry of ``names`` except the last, joined by single spaces.

    Args:
        names: List of name parts (e.g. a full name split into words), or
            None when the source value is null.

    Returns:
        The space-separated leading entries; an empty string when ``names``
        has zero or one entry; None when ``names`` is None.
    """
    # Spark hands SQL NULLs to Python UDFs as None; slicing None would raise
    # TypeError and fail the whole job, so propagate the null instead.
    if names is None:
        return None
    return " ".join(names[:-1])

In [50]:
# Wrap the plain Python function as a Spark UDF that produces a string column.
udfname = F.udf(f=getFirstandMiddle, returnType=StringType())

**Create a new column on voter_df called first_and_middle_name using your UDF**

In [51]:
# Derive first_and_middle_name by applying the UDF to the Splits column.
voter_df = voter_df.withColumn(
    "first_and_middle_name",
    udfname(voter_df["Splits"]),
)

In [52]:
# Drop the Splits column from voter_df (this cell does not display the result).
voter_df = voter_df.drop("Splits")

**Show the Data Frame**

In [53]:
# Print the first 5 rows vertically (one field per line) without truncating values.
voter_df.show(n=5, truncate=False, vertical=True)

[Stage 62:>                                                         (0 + 1) / 1]                                                                                

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 DATE                    | 09/16/2020                                                                                                                                                                                                                                                                                                                                                                                     
 AGENDA_ITEM_NUMBER      | 1                                                                                                                                                      

Task:
- Select the unique entries from the column VOTER_NAME and create a new DataFrame called voter_df.
- Count the rows in the voter_df DataFrame.
- Add a ROW_ID column using the appropriate Spark function.
- Show the rows with the 10 highest ROW_IDs.

**Select the unique entries from the column VOTER_NAME and create a new DataFrame called voter_df**

In [54]:
# Display the distinct VOTER_NAME values (show() prints at most 20 rows by default).
# Note: the distinct result is only displayed; voter_df itself is not reassigned.
distinct_voter_names = voter_df.select("VOTER_NAME").distinct()
distinct_voter_names.show()

+-------------------+
|         VOTER_NAME|
+-------------------+
|     Tennell Atkins|
|       Scott Griggs|
|      Scott  Griggs|
|      Sandy Greyson|
|Michael S. Rawlings|
|     Paula Blackmon|
|       Kevin Felder|
|    Cara Mendelsohn|
|       Adam Medrano|
|    Casey Thomas II|
|      Jaynie Shultz|
|      David Blewett|
|   Casey Thomas, II|
|      Mark  Clayton|
|  Casey  Thomas, II|
|     Sandy  Greyson|
| Gay Donnell Willis|
|      Adam  Medrano|
|       Adam McGough|
|       Mark Clayton|
+-------------------+
only showing top 20 rows



**Count the rows in the voter_df DataFrame**

In [55]:
# Count the rows of voter_df, then report the total.
row_count = voter_df.count()
print(f"rows: {row_count}")

rows: 113000


**Add a ROW_ID column using the appropriate Spark function**

In [56]:
# monotonically_increasing_id() yields unique, increasing 64-bit IDs; they are
# not guaranteed to be consecutive across partitions.
row_id_column = F.monotonically_increasing_id()
voter_df = voter_df.withColumn("ROW_ID", row_id_column)

In [57]:
# Five lowest ROW_IDs: sort ascending and show the first 5 rows.
(
    voter_df
    .sort(voter_df["ROW_ID"].asc())
    .select("ROW_ID", "VOTER_NAME")
    .show(5)
)

+------+-------------------+
|ROW_ID|         VOTER_NAME|
+------+-------------------+
|     0|          Chad West|
|     1|    B. Adam McGough|
|     2|Carolyn King Arnold|
|     3|     Jaime Resendez|
|     4|       Omar Narvaez|
+------+-------------------+
only showing top 5 rows



**Show the rows with the `10` highest ROW_IDs**

In [58]:
# Ten highest ROW_IDs: sort descending and show the first 10 rows.
(
    voter_df
    .sort(voter_df["ROW_ID"].desc())
    .select("ROW_ID", "VOTER_NAME")
    .show(10)
)

+-----------+-------------------+
|     ROW_ID|         VOTER_NAME|
+-----------+-------------------+
|25769825960|     Paula Blackmon|
|25769825959|     Tennell Atkins|
|25769825958|      Adam Bazaldua|
|25769825957|       Omar Narvaez|
|25769825956|     Jaime Resendez|
|25769825955|Carolyn King Arnold|
|25769825954|    Casey Thomas II|
|25769825953|       Jesse Moreno|
|25769825952|          Chad West|
|25769825951|     Paula Blackmon|
+-----------+-------------------+
only showing top 10 rows



You have been given the task to make sure that the IDs output from a monthly Spark task start at the highest value from the previous month.

Task:
- Determine the highest ROW_ID in voter_df_march and save it in the variable previous_max_ID. The statement .rdd.max()[0] will get the maximum ID.
- Add a ROW_ID column to voter_df_april starting at the value of previous_max_ID.
- Show the ROW_IDs from both DataFrames and compare.

In [59]:
# voter_df_march = voter_df.orderBy(voter_df.ROW_ID).select("ROW_ID", "VOTER_NAME")
# voter_df_april = voter_df.select("VOTER_NAME").collect()[0:36]