---
title: 'PySpark Basics | Hw 1 Post'
subtitle: 'This post covers the basics of PySpark'
author: 'Daniel Noone'
date: 2025-02-14
categories: ['PySpark', 'Homework']
image: "pyspark.png"

toc: true
---

# What is Apache Spark?

- A distributed processing system used mainly for very large data sets
- Provides multiple libraries on top of its parallel processing engine:
  - Spark SQL and DataFrames
  - Machine learning (MLlib)
  - etc.
- **Spark Structure on a Cluster of Computers**
  - The `Driver` communicates with the `cluster manager` to obtain worker nodes
  - The `cluster manager` allocates the necessary resources (worker nodes), and the `Driver` splits the job into tasks and assigns them to those nodes
  - `Worker nodes` perform their assigned tasks (pieces of the larger job) and return the results to the `Driver node`
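To make this division of labor concrete, here is a plain-Python sketch of the same pattern (it is not Spark code, and the function names are hypothetical): a "driver" function splits the data into partitions, a thread pool stands in for the worker nodes, and the driver combines the partial results. Threads on one machine only mimic what a real cluster does across machines.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: cut the data into roughly equal chunks (partitions)."""
    if not data:
        raise ValueError("data must not be empty")
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def worker_task(partition):
    """Worker side: compute a partial result (sum and count) for one partition."""
    return sum(partition), len(partition)


def driver_mean(data, num_workers=4):
    partitions = split_into_partitions(data, num_workers)
    # The thread pool stands in for the worker nodes a cluster manager would provide
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(worker_task, partitions))
    # Driver side: combine the partial results into the final answer
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


print(driver_mean(list(range(1, 101))))  # 50.5
```

Having each worker return a (sum, count) pair rather than its own mean is what lets the driver combine the pieces into an exact overall mean.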

# Importing the libraries needed to start a `PySpark` session

```python
from pyspark.sql import SparkSession
import pandas as pd

# Create (or reuse) a Spark session running locally on all available cores;
# this must run before any Spark DataFrame can be created
spark = SparkSession.builder.master("local[*]").getOrCreate()

df_pd = pd.read_csv('https://bcdanl.github.io/data/nba.csv')  # load the CSV with pandas' read_csv()
df = spark.createDataFrame(df_pd)  # convert the pandas DataFrame into a Spark DataFrame
```

# General PySpark Methods

```python
df.show(5)  # Shows the first 5 rows; with no argument, .show() displays 20

# Optional parameters: n, truncate and vertical
# n: number of rows to show (default 20)
# truncate: True, False, or an int; whether long strings are cut off (default True, i.e. 20 characters)
# vertical: if True, displays each row vertically (useful for wide tables)
```

```
+--------------+------------------+--------+--------+-------+
|          Name|              Team|Position|Birthday| Salary|
+--------------+------------------+--------+--------+-------+
|  Shake Milton|Philadelphia 76ers|      SG| 9/26/96|1445697|
|Christian Wood|   Detroit Pistons|      PF| 9/27/95|1645357|
| PJ Washington| Charlotte Hornets|      PF| 8/23/98|3831840|
|  Derrick Rose|   Detroit Pistons|      PG| 10/4/88|7317074|
| Marial Shayok|Philadelphia 76ers|       G| 7/26/95|  79568|
+--------------+------------------+--------+--------+-------+
only showing top 5 rows
```



```python
df.printSchema()  # prints the schema in tree format; nullable = true means the column can contain nulls

df.columns  # returns a list of the column names
```

```
root
 |-- Name: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Birthday: string (nullable = true)
 |-- Salary: long (nullable = true)
```

```
['Name', 'Team', 'Position', 'Birthday', 'Salary']
```

```python
df.dtypes  # returns a list of (column name, data type) tuples; 'bigint' is the same type printSchema() calls 'long'
```

```
[('Name', 'string'),
 ('Team', 'string'),
 ('Position', 'string'),
 ('Birthday', 'string'),
 ('Salary', 'bigint')]
```

```python
df.describe().show()  # .describe() computes summary statistics, .show() displays them
```

```
+-------+--------------+------------------+--------+--------+-----------------+
|summary|          Name|              Team|Position|Birthday|           Salary|
+-------+--------------+------------------+--------+--------+-----------------+
|  count|           450|               450|     450|     450|              450|
|   mean|          NULL|              NULL|    NULL|    NULL|7653583.764444444|
| stddev|          NULL|              NULL|    NULL|    NULL|9288810.298497688|
|    min|  Aaron Gordon|     Atlanta Hawks|       C| 1/10/98|            79568|
|    max|Zylan Cheatham|Washington Wizards|      SG|  9/9/93|         40231758|
+-------+--------------+------------------+--------+--------+-----------------+
```



```python
# .select() picks columns from df; .show() displays them
df.select('Name').show(5)

df.select('Name', 'Team', 'Salary').show(5)
```

```
+--------------+
|          Name|
+--------------+
|  Shake Milton|
|Christian Wood|
| PJ Washington|
|  Derrick Rose|
| Marial Shayok|
+--------------+
only showing top 5 rows
```

```
+--------------+------------------+-------+
|          Name|              Team| Salary|
+--------------+------------------+-------+
|  Shake Milton|Philadelphia 76ers|1445697|
|Christian Wood|   Detroit Pistons|1645357|
| PJ Washington| Charlotte Hornets|3831840|
|  Derrick Rose|   Detroit Pistons|7317074|
| Marial Shayok|Philadelphia 76ers|  79568|
+--------------+------------------+-------+
only showing top 5 rows
```



# Data Transformation with PySpark

## Counting operations

```python
from pyspark.sql.functions import countDistinct

# .collect() brings the one-row result back to the driver; [0][0] extracts the value
num_teams = df.select(countDistinct('Team')).collect()[0][0]
num_teams  # number of distinct teams
```

```
30
```

```python
# Group df by team, count the rows (i.e. players) in each team, and display the first 5 groups
df.groupBy("Team").count().show(5)
```

```
+--------------------+-----+
|                Team|count|
+--------------------+-----+
|        Phoenix Suns|   15|
|      Boston Celtics|   16|
|    Dallas Mavericks|   13|
|New Orleans Pelicans|   16|
|       Brooklyn Nets|   17|
+--------------------+-----+
only showing top 5 rows
```



## Ordering operations

```python
df.orderBy('Name').show(5)  # sorts by a single column, ascending by default
```

```
+-----------------+--------------------+--------+--------+--------+
|             Name|                Team|Position|Birthday|  Salary|
+-----------------+--------------------+--------+--------+--------+
|     Aaron Gordon|       Orlando Magic|      PF| 9/16/95|19863636|
|    Aaron Holiday|      Indiana Pacers|      PG| 9/30/96| 2239200|
|      Abdel Nader|Oklahoma City Thu...|      SF| 9/25/93| 1618520|
|      Adam Mokoka|       Chicago Bulls|       G| 7/18/98|   79568|
|Admiral Schofield|  Washington Wizards|      SF| 3/30/97| 1000000|
+-----------------+--------------------+--------+--------+--------+
only showing top 5 rows
```



```python
from pyspark.sql.functions import desc

df.orderBy(desc('Salary')).show(5)  # sort by Salary, descending
```

```
+-----------------+--------------------+--------+--------+--------+
|             Name|                Team|Position|Birthday|  Salary|
+-----------------+--------------------+--------+--------+--------+
|    Stephen Curry|Golden State Warr...|      PG| 3/14/88|40231758|
|Russell Westbrook|     Houston Rockets|      PG|11/12/88|38506482|
|       Chris Paul|Oklahoma City Thu...|      PG|  5/6/85|38506482|
|        John Wall|  Washington Wizards|      PG|  9/6/90|38199000|
|     James Harden|     Houston Rockets|      PG| 8/26/89|38199000|
+-----------------+--------------------+--------+--------+--------+
only showing top 5 rows
```



```python
# Sort by multiple columns: Team ascending, then Salary descending within each team
df.orderBy(['Team', desc('Salary')]).show(5)
```

```
+----------------+-------------+--------+--------+--------+
|            Name|         Team|Position|Birthday|  Salary|
+----------------+-------------+--------+--------+--------+
|Chandler Parsons|Atlanta Hawks|      SF|10/25/88|25102512|
|     Evan Turner|Atlanta Hawks|      PG|10/27/88|18606556|
|    Allen Crabbe|Atlanta Hawks|      SG|  4/9/92|18500000|
| De'Andre Hunter|Atlanta Hawks|      SF| 12/2/97| 7068360|
|   Jabari Parker|Atlanta Hawks|      PF| 3/15/95| 6500000|
+----------------+-------------+--------+--------+--------+
only showing top 5 rows
```
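Outside of Spark, this two-level ordering is simply a sort on a composite key. The plain-Python sketch below (an illustration of the result, not of how Spark sorts) uses four rows copied from the outputs above; negating the salary flips that part of the key to descending.

```python
# A few (Name, Team, Salary) rows taken from the outputs above
rows = [
    ("Shake Milton", "Philadelphia 76ers", 1445697),
    ("Christian Wood", "Detroit Pistons", 1645357),
    ("Derrick Rose", "Detroit Pistons", 7317074),
    ("Marial Shayok", "Philadelphia 76ers", 79568),
]

# Team ascending, then Salary descending: negating the numeric key reverses its order
ordered = sorted(rows, key=lambda r: (r[1], -r[2]))

for name, team, salary in ordered:
    print(f"{team:<20} {name:<15} {salary:>9}")
```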



## Adding columns

```python
# Adding a column with withColumn()
from pyspark.sql.functions import col

df_add = df.withColumn('salary_k', col('Salary') / 1000)  # salary in thousands
df_add.show(5)  # .show() returns None, so it is called separately from the assignment
```

```
+--------------+------------------+--------+--------+-------+--------+
|          Name|              Team|Position|Birthday| Salary|salary_k|
+--------------+------------------+--------+--------+-------+--------+
|  Shake Milton|Philadelphia 76ers|      SG| 9/26/96|1445697|1445.697|
|Christian Wood|   Detroit Pistons|      PF| 9/27/95|1645357|1645.357|
| PJ Washington| Charlotte Hornets|      PF| 8/23/98|3831840| 3831.84|
|  Derrick Rose|   Detroit Pistons|      PG| 10/4/88|7317074|7317.074|
| Marial Shayok|Philadelphia 76ers|       G| 7/26/95|  79568|  79.568|
+--------------+------------------+--------+--------+-------+--------+
only showing top 5 rows
```



## Removing columns

```python
# Removing a column with drop()
df_drop = df.drop('Salary')
df_drop.show(5)
```

```
+--------------+------------------+--------+--------+
|          Name|              Team|Position|Birthday|
+--------------+------------------+--------+--------+
|  Shake Milton|Philadelphia 76ers|      SG| 9/26/96|
|Christian Wood|   Detroit Pistons|      PF| 9/27/95|
| PJ Washington| Charlotte Hornets|      PF| 8/23/98|
|  Derrick Rose|   Detroit Pistons|      PG| 10/4/88|
| Marial Shayok|Philadelphia 76ers|       G| 7/26/95|
+--------------+------------------+--------+--------+
only showing top 5 rows
```



## Renaming columns

```python
# Renaming a column with withColumnRenamed(existing_name, new_name)
df_ren = df.withColumnRenamed('Team', 'Team Name')
df_ren.show(5)
```

```
+--------------+------------------+--------+--------+-------+
|          Name|         Team Name|Position|Birthday| Salary|
+--------------+------------------+--------+--------+-------+
|  Shake Milton|Philadelphia 76ers|      SG| 9/26/96|1445697|
|Christian Wood|   Detroit Pistons|      PF| 9/27/95|1645357|
| PJ Washington| Charlotte Hornets|      PF| 8/23/98|3831840|
|  Derrick Rose|   Detroit Pistons|      PG| 10/4/88|7317074|
| Marial Shayok|Philadelphia 76ers|       G| 7/26/95|  79568|
+--------------+------------------+--------+--------+-------+
only showing top 5 rows
```



## Aggregations and Summary Statistics

```python
# Aggregations and summary statistics with SQL expressions
df.selectExpr(
    'mean(Salary) as mean_sal',
    'min(Salary) as min_sal',
    'max(Salary) as max_sal',
    # Population std dev; describe() reports the sample std dev, so its value is slightly larger
    'stddev_pop(Salary) as std_salary'
).show()
```

```
+-----------------+-------+--------+-----------------+
|         mean_sal|min_sal| max_sal|       std_salary|
+-----------------+-------+--------+-----------------+
|7653583.764444444|  79568|40231758|9278483.657952718|
+-----------------+-------+--------+-----------------+
```
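The `std_salary` value here differs from the `stddev` row of `describe()` because the two divide by different counts: the population version by n, the sample version by n - 1. A plain-Python sketch with the standard `statistics` module (the small data list is made up for illustration) shows that the two are related by a factor of sqrt(n / (n - 1)), and applies that factor to the value above with n = 450 players.

```python
import math
import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]  # small made-up sample

pop_std = statistics.pstdev(data)   # divides by n, like stddev_pop
samp_std = statistics.stdev(data)   # divides by n - 1, like the stddev in describe()
n = len(data)

print(pop_std)   # 2.0
print(samp_std)
print(math.isclose(samp_std, pop_std * math.sqrt(n / (n - 1))))  # True

# The same conversion applied to the std_salary value above (n = 450 players)
spark_pop_std = 9278483.657952718
print(spark_pop_std * math.sqrt(450 / 449))  # about 9288810.3, the stddev reported by describe()
```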



## Converting Data Types

```python
# Using cast() to add an integer version of the Salary column (the original Salary stays bigint)
df_int = df.withColumn("Salary_int", col("Salary").cast("int"))
df_int.dtypes
```

```
[('Name', 'string'),
 ('Team', 'string'),
 ('Position', 'string'),
 ('Birthday', 'string'),
 ('Salary', 'bigint'),
 ('Salary_int', 'int')]
```

```python
# Using to_date() to parse the "Birthday" strings into a new date column
from pyspark.sql.functions import to_date

# Birthday uses two-digit years ("yy"). The newer (non-legacy) datetime parser maps these to
# 2000-2099, so "88" would become 2088; the LEGACY policy resolves them to a nearby century (1988)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
df_date = df.withColumn("DOB", to_date("Birthday", "M/d/yy"))
df_date.dtypes
```

```
[('Name', 'string'),
 ('Team', 'string'),
 ('Position', 'string'),
 ('Birthday', 'string'),
 ('Salary', 'bigint'),
 ('DOB', 'date')]
```
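Why the parser policy matters for two-digit years can be sketched in plain Python. Both functions below are hypothetical helpers, not Spark APIs: the first mirrors the newer parser's base-2000 rule, and the second is a simplified, year-level approximation of the legacy (Java `SimpleDateFormat`-style) sliding window, which starts 80 years before a reference point. Using 2025 as the reference year is an assumption for illustration; the real legacy rule works from the exact time the formatter is created.

```python
def new_parser_year(yy):
    """Newer parser rule: a two-digit year is read relative to base 2000 (2000-2099)."""
    return 2000 + yy


def legacy_window_year(yy, reference_year):
    """Approximate legacy rule: choose the year ending in yy that falls inside the
    100-year window starting 80 years before reference_year."""
    start = reference_year - 80
    candidate = (start // 100) * 100 + yy
    if candidate < start:
        candidate += 100
    return candidate


# Two-digit years that appear in the Birthday column
for yy in (77, 88, 98):
    print(yy, new_parser_year(yy), legacy_window_year(yy, reference_year=2025))
```

For NBA birth years the legacy window lands in the 1900s, which is why the post switches to `LEGACY` before calling `to_date()`.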

## Filtering

```python
# Filtering by a condition: players earning more than $30M, highest salary first
df_cond = (
    df
    .filter(col("Salary") > 30000000)
    .orderBy(desc("Salary"))
)
df_cond.show(5)
```

```
+-----------------+--------------------+--------+--------+--------+
|             Name|                Team|Position|Birthday|  Salary|
+-----------------+--------------------+--------+--------+--------+
|    Stephen Curry|Golden State Warr...|      PG| 3/14/88|40231758|
|       Chris Paul|Oklahoma City Thu...|      PG|  5/6/85|38506482|
|Russell Westbrook|     Houston Rockets|      PG|11/12/88|38506482|
|        John Wall|  Washington Wizards|      PG|  9/6/90|38199000|
|     James Harden|     Houston Rockets|      PG| 8/26/89|38199000|
+-----------------+--------------------+--------+--------+--------+
only showing top 5 rows
```



```python
# Filtering with isin(): keep rows whose Team is one of the listed values
df_isin = (
    df
    .filter(col('Team').isin('Houston Rockets', 'Washington Wizards', 'Miami Heat'))
)
df_isin.show(5)
```

```
+---------------+------------------+--------+--------+--------+
|           Name|              Team|Position|Birthday|  Salary|
+---------------+------------------+--------+--------+--------+
|  Kendrick Nunn|        Miami Heat|      SG|  8/3/95| 1416852|
|  Rui Hachimura|Washington Wizards|      PF|  2/8/98| 4469160|
|Michael Frazier|   Houston Rockets|       G|  3/8/94|   79568|
|   Bradley Beal|Washington Wizards|      SG| 6/28/93|27093018|
|  Thomas Bryant|Washington Wizards|       C| 7/31/97| 8000000|
+---------------+------------------+--------+--------+--------+
only showing top 5 rows
```



```python
# Filtering with between(): both bounds are inclusive, i.e. 2000000 <= Salary <= 2050000
df_btwn = (
    df
    .filter(col("Salary").between(2000000, 2050000))
)
df_btwn.show()
```

```
+--------------------+--------------------+--------+--------+-------+
|                Name|                Team|Position|Birthday| Salary|
+--------------------+--------------------+--------+--------+-------+
|        Torrey Craig|      Denver Nuggets|      SF|12/19/90|2000000|
|          Trey Burke|  Philadelphia 76ers|      PG|11/12/92|2028594|
|         Noah Vonleh|Minnesota Timberw...|       C| 8/24/95|2000000|
|        Ben McLemore|     Houston Rockets|      SG| 2/11/93|2028594|
|        Troy Daniels|  Los Angeles Lakers|      SG| 7/15/91|2028594|
|        Mike Muscala|Oklahoma City Thu...|       C|  7/1/91|2028594|
|      Caleb Swanigan|    Sacramento Kings|      PF| 4/18/97|2033160|
|       Dylan Windler| Cleveland Cavaliers|      GF| 9/22/96|2035800|
|       Edmond Sumner|      Indiana Pacers|      PG|12/31/95|2000000|
|       Iman Shumpert|       Brooklyn Nets|      PG| 6/26/90|2031676|
|Michael Carter-Wi...|       Orlando Magic|      PG|10/10/91|2028594|
+--------------------+--------------------+--------+--------+-------+
```

## Dealing with Missing Values

```python
# Checking for missing values with isNull() and isNotNull()
n_null = (
    df
    .filter(col("Salary").isNull())
    .count()
)
n_null  # number of rows with a null Salary
```

```
0
```

```python
n_not_null = (
    df
    .filter(col("Salary").isNotNull())
    .count()
)
n_not_null  # number of rows with a non-null Salary
```

```
450
```

```python
# Dropping rows with null values using .na.drop()

# Default (how='any'): drop a row that has a null in ANY column
df_any = df.na.drop()

# how='all': drop only rows in which ALL columns are null
df_all = df.na.drop(how='all')

# subset: only the listed columns are checked; with the default how='any',
# a row is dropped if Name OR Team is null
df_sub = df.na.drop(subset=['Name', 'Team'])
```
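The way `how` and `subset` interact can be sketched in plain Python. `na_drop` below is a hypothetical helper that imitates the rules described above on a list of dict rows, with `None` standing in for null; it is not Spark's implementation and ignores Spark's extra `thresh` option. The rows are made-up toy data, since the NBA data has no null salaries.

```python
def na_drop(rows, how="any", subset=None):
    """Plain-Python imitation of df.na.drop(how=..., subset=...); None plays the role of null."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        if how == "any" and any(nulls):
            continue  # at least one checked column is null: drop the row
        if how == "all" and all(nulls):
            continue  # every checked column is null: drop the row
        kept.append(row)
    return kept


# Hypothetical toy rows with some nulls
rows = [
    {"Name": "A", "Team": "X", "Salary": 100},
    {"Name": None, "Team": "X", "Salary": 200},
    {"Name": None, "Team": None, "Salary": None},
    {"Name": "B", "Team": "Y", "Salary": None},
]

print(len(na_drop(rows)))                           # 1
print(len(na_drop(rows, how="all")))                # 3
print(len(na_drop(rows, subset=["Name", "Team"])))  # 2
```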


```python
# Replacing null values using na.fill()

# Fill the nulls of a specific column with a single value
df_fill = df.na.fill(value=-999, subset=['Salary'])

# Fill several columns at once with a dict of {column: value}
df_fill_mult = df.na.fill({'Salary': -999, 'Team': 'N/A'})
```

## Duplicate operations

```python
# distinct() returns a new DataFrame with duplicate rows removed, leaving only unique rows
df_dist = (
    df
    .select('Team', 'Name')
    .distinct()
)
```

```python
# Using dropDuplicates() with and without a subset

df_dedup = df.dropDuplicates()  # removes rows that are exact duplicates across all columns

# Keeps only one row per distinct Team value (which row survives is not guaranteed);
# not a sensible thing to do with this data, but it shows what the subset does
df_dedup_team = df.dropDuplicates(['Team'])
```
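The difference between deduplicating on whole rows and on a subset of columns comes down to what counts as the "key". In this plain-Python sketch, `drop_duplicates` is a hypothetical helper and the rows are toy data (the exact duplicate is made up). It keeps the first occurrence of each key, whereas Spark makes no promise about which duplicate survives.

```python
def drop_duplicates(rows, subset=None):
    """Keep one row per key; the key is the whole row, or only the subset columns."""
    seen = set()
    kept = []
    for row in rows:
        cols = subset if subset is not None else sorted(row)
        key = tuple(row[c] for c in cols)
        if key not in seen:  # this sketch keeps the first occurrence of each key
            seen.add(key)
            kept.append(row)
    return kept


rows = [
    {"Name": "Derrick Rose", "Team": "Detroit Pistons"},
    {"Name": "Derrick Rose", "Team": "Detroit Pistons"},  # made-up exact duplicate
    {"Name": "Christian Wood", "Team": "Detroit Pistons"},
    {"Name": "Shake Milton", "Team": "Philadelphia 76ers"},
]

print(len(drop_duplicates(rows)))                   # 3
print(len(drop_duplicates(rows, subset=["Team"])))  # 2
```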

## Group operations (& Window)

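- The `groupBy()` method is used to work with data at the group level
- It returns a `GroupedData` object; calling an aggregation on it (e.g. `count()`, `avg()`, `sum()` or `agg()`) returns a new DataFrame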

```python
df_groups = df.groupBy('Position')
df_groups  # displaying it shows that df_groups is a GroupedData object, not a DataFrame
```

```
GroupedData[grouping expressions: [Position], value: [Name: string, Team: string ... 3 more fields], type: GroupBy]
```

```python
# Number of rows (players) in each group
df_groups.count().show()
```

```
+--------+-----+
|Position|count|
+--------+-----+
|      FC|    1|
|      PF|   96|
|       F|    5|
|      PG|   98|
|      SF|   76|
|       C|   88|
|      SG|   74|
|       G|   10|
|      GF|    2|
+--------+-----+
```



```python
# Applying aggregate functions to each group
df_groups.avg("Salary").show()
df_groups.sum("Salary").show()
```

```
+--------+-----------------+
|Position|      avg(Salary)|
+--------+-----------------+
|      FC|          79568.0|
|      PF|        7223613.5|
|       F|        2322338.4|
|      PG|        9781712.0|
|      SF|7466574.565789473|
|       C|9686050.784090908|
|      SG|4781786.243243244|
|       G|         372833.4|
|      GF|        1467055.0|
+--------+-----------------+
```

```
+--------+-----------+
|Position|sum(Salary)|
+--------+-----------+
|      FC|      79568|
|      PF|  693466896|
|       F|   11611692|
|      PG|  958607776|
|      SF|  567459667|
|       C|  852372469|
|      SG|  353852182|
|       G|    3728334|
|      GF|    2934110|
+--------+-----------+
```
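Conceptually, a grouped aggregation is "bucket the rows by key, then reduce each bucket to one row". The plain-Python sketch below (an illustration, not Spark's execution model) does that for `count`, `sum` and `avg` on five rows copied from the first `df.show(5)` output.

```python
from collections import defaultdict

rows = [  # (Name, Position, Salary) rows taken from the first df.show(5) output
    ("Shake Milton", "SG", 1445697),
    ("Christian Wood", "PF", 1645357),
    ("PJ Washington", "PF", 3831840),
    ("Derrick Rose", "PG", 7317074),
    ("Marial Shayok", "G", 79568),
]

# "groupBy": collect the salaries belonging to each position
groups = defaultdict(list)
for name, position, salary in rows:
    groups[position].append(salary)

# Reduce each group to a single row, like count(), sum() and avg()
summary = {
    position: {"count": len(s), "sum": sum(s), "avg": sum(s) / len(s)}
    for position, s in groups.items()
}

for position, stats in summary.items():
    print(position, stats)
```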



```python
# Group aggregation with several statistics at once
# Importing the module as F avoids shadowing Python's built-in min() and max()
from pyspark.sql import functions as F

team_group = (
    df.groupBy('Team').agg(
        F.min("Salary").alias('min_sal'),
        F.max('Salary').alias('max_sal'),
        F.mean("Salary").alias("mean_sal")
    )
)
team_group.show(5)
```

```
+--------------------+-------+--------+-----------------+
|                Team|min_sal| max_sal|         mean_sal|
+--------------------+-------+--------+-----------------+
|        Phoenix Suns|  79568|27285000|6791594.066666666|
|      Boston Celtics|  79568|32742000|       7238863.25|
|    Dallas Mavericks|  79568|27285000|7432050.846153846|
|New Orleans Pelicans|  79568|26131111|      6512430.125|
|       Brooklyn Nets|  79568|37199000|7215064.764705882|
+--------------------+-------+--------+-----------------+
only showing top 5 rows
```
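A grouped aggregation like the one above returns one row per team. A window aggregate, used in the next cell, computes the same kind of per-team statistic but attaches it to every original row instead of collapsing them. The plain-Python sketch below illustrates that result (not how Spark executes it), using four rows copied from earlier outputs.

```python
from collections import defaultdict

rows = [  # (Name, Team, Salary) rows taken from the outputs above
    ("Shake Milton", "Philadelphia 76ers", 1445697),
    ("Christian Wood", "Detroit Pistons", 1645357),
    ("Derrick Rose", "Detroit Pistons", 7317074),
    ("Marial Shayok", "Philadelphia 76ers", 79568),
]

# Pass 1: total salary and row count per team (the "partition")
totals = defaultdict(lambda: [0, 0])
for _, team, salary in rows:
    totals[team][0] += salary
    totals[team][1] += 1

# Pass 2: attach the team mean to every row, so no rows are collapsed
with_mean = [
    (name, team, salary, totals[team][0] / totals[team][1])
    for name, team, salary in rows
]

for row in with_mean:
    print(row)
```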



```python
# Using a window function to add a group-level statistic to every row of the original DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

w = Window.partitionBy("Team")  # one window (partition) per team

df_w_mean = df.withColumn(
    'mean_salary_by_team',
    avg(col("Salary")).over(w)  # each row gets the average Salary of its team
)
df_w_mean.show()
```


+----------------+--------------+--------+--------+--------+-------------------+
|            Name|          Team|Position|Birthday|  Salary|mean_salary_by_team|
+----------------+--------------+--------+--------+--------+-------------------+
|   Kevin Huerter| Atlanta Hawks|      SG| 8/27/98| 2636280|  6503699.866666666|
|     Evan Turner| Atlanta Hawks|      PG|10/27/88|18606556|  6503699.866666666|
|    John Collins| Atlanta Hawks|      PF| 9/23/97| 2686560|  6503699.866666666|
|    Vince Carter| Atlanta Hawks|      PF| 1/26/77| 2564753|  6503699.866666666|
|Chandler Parsons| Atlanta Hawks|      SF|10/25/88|25102512|  6503699.866666666|
|    Damian Jones| Atlanta Hawks|       C| 6/30/95| 2305057|  6503699.866666666|
|    Allen Crabbe| Atlanta Hawks|      SG|  4/9/92|18500000|  6503699.866666666|
|     Cam Reddish| Atlanta Hawks|      SF|  9/1/99| 4245720|  6503699.866666666|
|   Charlie Brown| Atlanta Hawks|      SG|  2/2/97|   79568|  6503699.866666666|
| De'Andre Hunter| Atlanta H