<a href="https://www.kaggle.com/code/irenashen1/my-goto-pyspark-basic-functions?scriptVersionId=115937935" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

PySpark is my primary programming language at work, and I spend a lot of time googling it every day. As I've become more fluent in PySpark (with the help of a wall of snippets and notes in front of my desk), I want to summarize the functions I find most useful and give an example of each. I hope you can use this notebook as your "cheatsheet" if you are learning PySpark like me!

## **<span style="color:#007BA7">My Top 9 basic PySpark functions for data manipulation: </span>**

* [1. Check the null value rate of each column](#check_null)
`.columns` and `for loop`
* [2. Select or remove columns](#select_remove)
`.select()` and `.drop()`
* [3. Filter with condition(s)](#filter)
`.filter()`
* [4. Create a new column, or rename a column](#create)
`.withColumn()` and `.withColumnRenamed()`
* [5. Fill in a custom value](#fill)
`F.lit()` and `F.concat()`
* [6. Check unique values counts in a column](#check)
`.select().distinct().count()`
* [7. Group by a column and transform it into a new dataframe](#group)
`.groupBy()` and `.agg()`
* [8. Sort](#change)
`.orderBy()`
* [9. Make a sample dataframe](#make)
`.limit()`

In this notebook, I will use the **US legislator datasets** as the demo data.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=0ddfe59d9766ed6a9e1ae5216da96a0ac743cbd8802fc63a4cc4333e6bae35e0
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.3.1

In [2]:
import os
import pandas as pd

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F  # every Spark function below is called as F.<name>, so no wildcard import is needed
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder.master("local[2]").appName("Legislators_terms").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/09 20:57:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc = spark.sparkContext

In [5]:
# Paths to the two CSV files:
legis_path = '../input/uslegislatorsdataset/legislators.csv'
terms_path = '../input/uslegislatorsdataset/legislators_terms.csv'

In [6]:
terms_sdf = spark.read.csv(terms_path,inferSchema=True,header=True)
terms_sdf.printSchema()

id_window = Window.partitionBy('id_bioguide').orderBy(F.col('term_start').desc())

# Build a dataframe with one row per id_bioguide, keeping each legislator's latest term (most recent "term_start")
id_profile_sdf = terms_sdf \
.withColumn('term_order', F.row_number().over(id_window)) \
.filter('term_order == 1') \
.drop('term_order', 'term_number')

id_profile_sdf.count()

                                                                                

root
 |-- id_bioguide: string (nullable = true)
 |-- term_number: integer (nullable = true)
 |-- term_id: string (nullable = true)
 |-- term_type: string (nullable = true)
 |-- term_start: timestamp (nullable = true)
 |-- term_end: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- district: integer (nullable = true)
 |-- class: integer (nullable = true)
 |-- party: string (nullable = true)
 |-- how: string (nullable = true)
 |-- url: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)
 |-- contact_form: string (nullable = true)
 |-- office: string (nullable = true)
 |-- state_rank: string (nullable = true)
 |-- rss_url: string (nullable = true)
 |-- caucus: string (nullable = true)



                                                                                

12518

In [7]:
legis_sdf = spark.read.csv(legis_path,inferSchema=True,header=True)

# Check whether id_bioguide is unique per row
row_count = legis_sdf.count()
id_count = legis_sdf.select('id_bioguide').distinct().count()

print("total row count = " + str(row_count))
print("total legislator id count = " + str(id_count))
#Yes, they are unique per row.

total row count = 12518
total legislator id count = 12518


## 1. Check the null value rate of each column:  <a class="anchor" id="check_null"></a>

This is a great first step for learning what the raw data looks like: it shows all the column names and each column's **data completeness**. I use this snippet in almost every notebook! I have it stuck on my wall because I come back to it over and over. Although it requires a slightly more advanced technique (a for loop), it's definitely worth learning and memorizing.

Here we use **legis_sdf** as the demo. You can copy and paste this snippet into your notebook and change the dataframe name in lines 1, 2, and 7.

In [8]:
cols = legis_sdf.columns 
total_count = legis_sdf.count()
print('total_count = ' + str(total_count))

for i in range(len(cols)):
    col = cols[i]
    ct = legis_sdf.select(col).dropna().count()
    null_ct= total_count - ct
    p_null = 100*null_ct/total_count
    print(col + " null_count = " + str(null_ct) + " (" + str(p_null) + "%)")

total_count = 12518
full_name null_count = 0 (0.0%)
first_name null_count = 0 (0.0%)
last_name null_count = 0 (0.0%)
middle_name null_count = 3959 (31.6264579006231%)
nickname null_count = 12262 (97.9549448793737%)
suffix null_count = 12078 (96.48506151142355%)
other_names_end null_count = 12513 (99.96005751717527%)
other_names_middle null_count = 12518 (100.0%)
other_names_last null_count = 12513 (99.96005751717527%)
birthday null_count = 552 (4.409650103850455%)
gender null_count = 0 (0.0%)
id_bioguide null_count = 0 (0.0%)
id_bioguide_previous_0 null_count = 12514 (99.96804601374022%)
id_govtrack null_count = 1 (0.007988496564946478%)
id_icpsr null_count = 220 (1.757469244288225%)
id_wikipedia null_count = 2 (0.015976993129892956%)
id_wikidata null_count = 2 (0.015976993129892956%)
id_google_entity_id null_count = 69 (0.5512062629813069%)
id_house_history null_count = 1492 (11.918836874900144%)
id_house_history_alternate null_count = 12516 (99.9840230068701%)
id_thomas null_count = 

In [9]:
cols = terms_sdf.columns 
total_count = terms_sdf.count()
print('total_count = ' + str(total_count))

for i in range(len(cols)):
    col = cols[i]
    ct = terms_sdf.select(col).dropna().count()
    null_ct= total_count - ct
    p_null = 100*null_ct/total_count 
    print(col + " null_count = " + str(null_ct) + " (" + str(p_null) + "%)")

total_count = 44063
id_bioguide null_count = 0 (0.0%)
term_number null_count = 0 (0.0%)
term_id null_count = 0 (0.0%)
term_type null_count = 0 (0.0%)
term_start null_count = 0 (0.0%)
term_end null_count = 0 (0.0%)
state null_count = 0 (0.0%)
district null_count = 3873 (8.789687492907882%)
class null_count = 40190 (91.21031250709211%)
party null_count = 452 (1.025803962508227%)
how null_count = 43862 (99.54383496357488%)
url null_count = 39677 (90.04607039920114%)
address null_count = 41564 (94.32857499489367%)
phone null_count = 41569 (94.33992238385947%)
fax null_count = 42251 (95.88770623879445%)
contact_form null_count = 42633 (96.7546467557815%)
office null_count = 41571 (94.3444613394458%)
state_rank null_count = 43846 (99.50752331888432%)
rss_url null_count = 42574 (96.62074756598507%)
caucus null_count = 44049 (99.96822731089576%)


## 2. Select or remove columns: <a class="anchor" id="select_remove"></a>
Use `.select()` to keep specific columns or `.drop()` to remove them; both accept **one or more** columns in a single call. When I have a wide dataframe to transform, I select only the relevant columns.

In [10]:
joined_sdf = legis_sdf \
    .join(id_profile_sdf.select('id_bioguide', 'party', 'state'), on='id_bioguide', how='left_outer') \
    .select('id_bioguide', 'id_wikipedia', 'full_name', 'last_name', 'first_name', 'birthday', 'gender', 'party', 'state')

joined_sdf.show()

+-----------+------------------+--------------------+----------+----------+-------------------+------+-----------+-----+
|id_bioguide|      id_wikipedia|           full_name| last_name|first_name|           birthday|gender|      party|state|
+-----------+------------------+--------------------+----------+----------+-------------------+------+-----------+-----+
|    B000944|     Sherrod Brown|       Sherrod Brown|     Brown|   Sherrod|1952-11-09 00:00:00|     M|   Democrat|   OH|
|    C000127|    Maria Cantwell|      Maria Cantwell|  Cantwell|     Maria|1958-10-13 00:00:00|     F|   Democrat|   WA|
|    C000141|        Ben Cardin|  Benjamin L. Cardin|    Cardin|  Benjamin|1943-10-05 00:00:00|     M|   Democrat|   MD|
|    C000174|        Tom Carper|    Thomas R. Carper|    Carper|    Thomas|1947-01-23 00:00:00|     M|   Democrat|   DE|
|    C001070|     Bob Casey Jr.|Robert P. Casey, Jr.|     Casey|    Robert|1960-04-13 00:00:00|     M|   Democrat|   PA|
|    F000062|  Dianne Feinstein|

After viewing these columns, I decide to remove **id_wikipedia** and **full_name**, so I will `.drop` them.

In [11]:
sample_2_sdf = joined_sdf \
    .drop('id_wikipedia', 'full_name')

sample_2_sdf.show()

+-----------+----------+----------+-------------------+------+-----------+-----+
|id_bioguide| last_name|first_name|           birthday|gender|      party|state|
+-----------+----------+----------+-------------------+------+-----------+-----+
|    B000944|     Brown|   Sherrod|1952-11-09 00:00:00|     M|   Democrat|   OH|
|    C000127|  Cantwell|     Maria|1958-10-13 00:00:00|     F|   Democrat|   WA|
|    C000141|    Cardin|  Benjamin|1943-10-05 00:00:00|     M|   Democrat|   MD|
|    C000174|    Carper|    Thomas|1947-01-23 00:00:00|     M|   Democrat|   DE|
|    C001070|     Casey|    Robert|1960-04-13 00:00:00|     M|   Democrat|   PA|
|    F000062| Feinstein|    Dianne|1933-06-22 00:00:00|     F|   Democrat|   CA|
|    F000469|   Fulcher|      Russ|1973-07-19 00:00:00|     M| Republican|   ID|
|    K000367| Klobuchar|       Amy|1960-05-25 00:00:00|     F|   Democrat|   MN|
|    M000639|  Menendez|    Robert|1954-01-01 00:00:00|     M|   Democrat|   NJ|
|    S000033|   Sanders|   B

## 3. Filter a specific column with condition(s): <a class="anchor" id="filter"></a>

Next, I want to apply a filter: say, I want "only the female legislators who were born before 1950". Pay attention to where I put the parentheses around each condition and the quotation marks around the values. Memorizing these little details will save you so much time when coding!

I also added a `print()` call at the end of the cell below, which is very useful too, especially when presenting a notebook to other people. A little footnote makes your code so much easier to follow.

In [12]:
sample_3_sdf = sample_2_sdf \
.withColumn('birthday', F.to_date(F.col('birthday'))) \
.filter((F.col('gender')=='F') & (F.col('birthday')<= '1949-12-31')) 

sample_3_sdf.show()
id_count = sample_3_sdf.select('id_bioguide').distinct().count()
print("There are " + str(id_count) + ' female legislators who were born before 1950.')

+-----------+-------------+----------+----------+------+----------+-----+
|id_bioguide|    last_name|first_name|  birthday|gender|     party|state|
+-----------+-------------+----------+----------+------+----------+-----+
|    F000062|    Feinstein|    Dianne|1933-06-22|     F|  Democrat|   CA|
|    S001181|      Shaheen|    Jeanne|1947-01-28|     F|  Democrat|   NH|
|    D000598|        Davis|     Susan|1944-04-13|     F|  Democrat|   CA|
|    D000216|      DeLauro|      Rosa|1943-03-02|     F|  Democrat|   CT|
|    H001042|       Hirono|     Mazie|1947-11-03|     F|  Democrat|   HI|
|    E000215|        Eshoo|      Anna|1942-12-13|     F|  Democrat|   CA|
|    F000450|         Foxx|  Virginia|1943-06-29|     F|Republican|   NC|
|    G000377|      Granger|       Kay|1943-01-18|     F|Republican|   TX|
|    J000126|      Johnson|     Eddie|1935-12-03|     F|  Democrat|   TX|
|    K000009|       Kaptur|     Marcy|1946-06-17|     F|  Democrat|   OH|
|    L000551|          Lee|   Barbara|

## 4. Create a new column, or rename a column: <a class="anchor" id="create"></a>

I already used `.withColumn()` once in the last section. However, I didn't really "create" a new column; I just changed the datatype of **birthday** from timestamp to a yyyy-mm-dd date. Here I use that date to show how to create the new columns **birth_year** and **birth_month**, and how to rename **birthday** to **DOB**.

(Side note: handling dates will be another notebook!)

In [13]:
sample_4_sdf = sample_3_sdf \
    .withColumn('birth_year', F.year('birthday')) \
    .withColumn('birth_month', F.month('birthday')) \
    .withColumnRenamed('birthday', "DOB")

sample_4_sdf.show()

+-----------+-------------+----------+----------+------+----------+-----+----------+-----------+
|id_bioguide|    last_name|first_name|       DOB|gender|     party|state|birth_year|birth_month|
+-----------+-------------+----------+----------+------+----------+-----+----------+-----------+
|    F000062|    Feinstein|    Dianne|1933-06-22|     F|  Democrat|   CA|      1933|          6|
|    S001181|      Shaheen|    Jeanne|1947-01-28|     F|  Democrat|   NH|      1947|          1|
|    D000598|        Davis|     Susan|1944-04-13|     F|  Democrat|   CA|      1944|          4|
|    D000216|      DeLauro|      Rosa|1943-03-02|     F|  Democrat|   CT|      1943|          3|
|    H001042|       Hirono|     Mazie|1947-11-03|     F|  Democrat|   HI|      1947|         11|
|    E000215|        Eshoo|      Anna|1942-12-13|     F|  Democrat|   CA|      1942|         12|
|    F000450|         Foxx|  Virginia|1943-06-29|     F|Republican|   NC|      1943|          6|
|    G000377|      Granger|   

## 5. Fill in a custom value, and combine it with other columns into a new column: <a class="anchor" id="fill"></a>

When you want to add a custom value somewhere in the dataframe, `F.lit()` and `F.concat()` can be handy. I use these functions quite often when dealing with dates. Let's combine birth year and month into a single year-month string. Note that the month is not zero-padded here, so June 1933 becomes `1933-6` rather than `1933-06`. This is a useful technique for cohort analysis (e.g., if you want to group legislators by the **year_month** they were born).

In [14]:
sample_5_sdf = sample_4_sdf \
.withColumn('birth_year_month', F.concat('birth_year', F.lit('-'), 'birth_month'))

sample_5_sdf.show()

+-----------+-------------+----------+----------+------+----------+-----+----------+-----------+----------------+
|id_bioguide|    last_name|first_name|       DOB|gender|     party|state|birth_year|birth_month|birth_year_month|
+-----------+-------------+----------+----------+------+----------+-----+----------+-----------+----------------+
|    F000062|    Feinstein|    Dianne|1933-06-22|     F|  Democrat|   CA|      1933|          6|          1933-6|
|    S001181|      Shaheen|    Jeanne|1947-01-28|     F|  Democrat|   NH|      1947|          1|          1947-1|
|    D000598|        Davis|     Susan|1944-04-13|     F|  Democrat|   CA|      1944|          4|          1944-4|
|    D000216|      DeLauro|      Rosa|1943-03-02|     F|  Democrat|   CT|      1943|          3|          1943-3|
|    H001042|       Hirono|     Mazie|1947-11-03|     F|  Democrat|   HI|      1947|         11|         1947-11|
|    E000215|        Eshoo|      Anna|1942-12-13|     F|  Democrat|   CA|      1942|    

## 6. Check unique values count in a column: <a class="anchor" id="check"></a>

I use this function in almost every notebook I work on! Checking whether a column is **unique per row** is the key to understanding the [granularity](https://www.talon.one/glossary/granularity) of the dataset. I used `.select(col_name).distinct().count()` in the first part of this notebook to check how many legislators there are in total, and whether there is one legislator per row.

It is also useful for verifying the [cardinality](https://wisdomschema.com/granularity-and-cardinality/#:~:text=One%20key%20functionality%20of%20cardinality,with%20respect%20to%20an%20entity.) of a column. For example, we would <i>assume</i> there are only 2 values in the **gender** column: F or M. But is that really the case? This function can validate that:

In [15]:
joined_sdf \
    .select('gender').distinct().show()

+----------+
|    gender|
+----------+
|         F|
|         M|
|1954-10-02|
+----------+



Wait, there's a date in **gender**? Let's find out what the whole row looks like for this date: 
(Pay attention to where the parentheses and single quotes are. Also, I use *!=* to mean "does not equal".)

In [16]:
joined_sdf\
    .filter((F.col('gender')!= 'F') & (F.col('gender')!='M')).show()

+-----------+------------+--------------------+---------+----------+--------+----------+-----+-----+
|id_bioguide|id_wikipedia|           full_name|last_name|first_name|birthday|    gender|party|state|
+-----------+------------+--------------------+---------+----------+--------+----------+-----+-----+
|          M|       20712|"Henry C. ""Hank"...|    Henry|      Jr."|    null|1954-10-02| null| null|
+-----------+------------+--------------------+---------+----------+--------+----------+-----+-----+



Seems like it will take more advanced techniques to clean this up.

## 7. Group by a column and transform it into a new dataframe: <a class="anchor" id="group"></a>
Aggregation is an essential concept in data transformation. Here I would like to use **gender** as an example, to see the number of female and male legislators by birth year:

In [17]:
gender_by_birth_year_sdf = joined_sdf \
    .withColumn('birthday', F.to_date('birthday')) \
    .withColumn('birth_year', F.year('birthday')) \
    .groupBy('birth_year') \
    .agg(
        F.count(F.when(F.col('gender')=="F", True)).alias('female_legislator_count'),
        F.count(F.when(F.col('gender')=="M", True)).alias('male_legislator_count')
    )

gender_by_birth_year_sdf.show()

# To count the totals across all of history, simply remove .groupBy():

gender_count_sdf = joined_sdf \
    .agg(
        F.count(F.when(F.col('gender')=="F", True)).alias('female_legislator_count'),
        F.count(F.when(F.col('gender')=="M", True)).alias('male_legislator_count')
    )

gender_count_sdf.show()

+----------+-----------------------+---------------------+
|birth_year|female_legislator_count|male_legislator_count|
+----------+-----------------------+---------------------+
|      1959|                      8|                   31|
|      1829|                      0|                   74|
|      1721|                      0|                    1|
|      1896|                      1|                   60|
|      1903|                      2|                   55|
|      1975|                      4|                    5|
|      1884|                      1|                   61|
|      1977|                      0|                    6|
|      1766|                      0|                   29|
|      1787|                      0|                   46|
|      1888|                      2|                   48|
|      1808|                      0|                   80|
|      1743|                      0|                   11|
|      1863|                      0|                   8

Female legislators are hugely outnumbered by male!

## 8. Sort row orders: <a class="anchor" id="change"></a>
To sort, use `.orderBy(F.col(col_name).desc())` or `.orderBy(F.col(col_name).asc())`. If you don't specify **desc** or **asc**, you can simply write `.orderBy(col_name)`, which sorts in ascending order by default.

In [18]:
gender_by_birth_year_sdf\
.orderBy(F.col('birth_year').desc()) \
.show()

+----------+-----------------------+---------------------+
|birth_year|female_legislator_count|male_legislator_count|
+----------+-----------------------+---------------------+
|      1989|                      1|                    0|
|      1988|                      1|                    0|
|      1987|                      1|                    0|
|      1986|                      1|                    2|
|      1984|                      2|                    6|
|      1983|                      1|                    4|
|      1982|                      0|                    5|
|      1981|                      2|                    3|
|      1980|                      1|                    9|
|      1979|                      1|                    6|
|      1978|                      2|                    7|
|      1977|                      0|                    6|
|      1976|                      7|                   11|
|      1975|                      4|                    

## 9. Make a sample dataframe: <a class="anchor" id="make"></a>
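When dealing with a bigger or more complicated dataset (e.g., over 5M rows), it's good practice to run your code on a smaller dataset first to save time and computing resources. Our dataset in this notebook is small, but we can still size it down with `.limit(n)` (the first n rows) or `.sample(fraction=f)` (a random sample of roughly that fraction of rows; the size is approximate, which is why the 20% sample below returns 8690 rows rather than exactly 20% of 44063).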

In [19]:
limit_sdf = terms_sdf.limit(1000)             # first 1000 rows
print(limit_sdf.count())
random_sdf = terms_sdf.sample(fraction=0.20)  # random sample of about 20% of the rows
print(random_sdf.count())

1000
8690


### <span style="color:#007BA7"> I hope you find this notebook about basic PySpark functions helpful. I plan to work on another notebook about advanced functions. Stay tuned! </span>