# Apache PySpark by Example

**[June 2023 update]**

I've consolidated all the notebooks for this course into a single notebook. (The course videos will still show individual notebooks)


## Introduction to Google Colab

### Jupyter notebook basics

#### Code cells

In [None]:
2*5

import pandas as pd

#### Text cells

### Access to the shell

In [None]:
pwd

'c:\\Users\\I513656\\OneDrive - SAP SE\\BDC Learning\\PySpark\\apache-pyspark-by-example-802868'

In [None]:
ls

 Volume in drive C has no label.
 Volume Serial Number is B060-39BF

 Directory of c:\Users\I513656\OneDrive - SAP SE\BDC Learning\PySpark\apache-pyspark-by-example-802868

15-11-2024  19:24    <DIR>          .
15-11-2024  11:03    <DIR>          ..
15-11-2024  11:03    <DIR>          .github
15-11-2024  11:03                46 .gitignore
15-11-2024  18:29           138,228 Apache_PySpark_by_Example.ipynb
15-11-2024  11:03               642 CONTRIBUTING.md
19-10-2024  13:49            22,145 expense report.jsonl
15-11-2024  11:03             6,753 LICENSE
15-11-2024  11:03               629 NOTICE
15-11-2024  11:03             1,234 README.md
15-11-2024  12:27     1,936,858,135 reported-crimes.csv
               8 File(s)  1,937,027,812 bytes
               3 Dir(s)  309,066,240,000 bytes free


## Install Spark

- Google colab recently made some changes which breaks the Spark installation.
- Please use the code below where we install from the pyspark package instead

In [None]:
!pip install pyspark==3.4.0

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ---------------------------------------- 0.0/310.8 MB ? eta -:--:--
     ---------------------------------------- 0.3/310.8 MB ? eta -:--:--
     ---------------------------------------- 2.1/310.8 MB 8.4 MB/s eta 0:00:37
      -------------------------------------- 5.0/310.8 MB 10.1 MB/s eta 0:00:31
      -------------------------------------- 6.3/310.8 MB 11.4 MB/s eta 0:00:27
     - -------------------------------------- 8.9/310.8 MB 9.9 MB/s eta 0:00:31
     - ------------------------------------ 10.5/310.8 MB 10.2 MB/s eta 0:00:30
     - ------------------------------------ 10.5/310.8 MB 10.2 MB/s eta 0:00:30
     - ------------------------------------ 10.5/310.8 MB 10.2 MB/s eta 0:00:30
     - ------------------------------------- 12.1/310.8 MB 6.8 MB/s eta 0:00:44
     -- ------------------------------------ 16.3/310.8 MB 8.2 M

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

## (02-02) Download Chicago's Reported Crime Data

### Downloading and preprocessing Chicago's Reported Crime Data

In [None]:
!curl -L "https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD" -o chicago_crime_data.csv


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100 31920    0 31920    0     0  12163      0 --:--:--  0:00:02 --:--:-- 12192
100  447k    0  447k    0     0   121k      0 --:--:--  0:00:03 --:--:--  121k
100 2702k    0 2702k    0     0   607k      0 --:--:--  0:00:04 --:--:--  608k
100 8973k    0 8973k    0     0  1648k      0 --:--:--  0:00:05 --:--:-- 1842k
100 11.6M    0 11.6M    0     0  1843k      0 --:--:--  0:00:06 --:--:-- 2491k
100 14.5M    0 14.5M    0     0  2000k      0 --:--:--  0:00:07 --:--:-- 3083k
100 17.4M    0 17.4M    0     0  2107k      0 --:--:--  0:00:08 --:--:-- 3631k
100 20.1M    0 20.1M    0     0  2189k      0 --:--

In [None]:
!rename "chicago_crime_data.csv" "reported-crimes.csv"

In [None]:
from pyspark.sql.functions import to_timestamp,col,lit
rc = spark.read.csv('reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
rc.show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11037294|   JA371270|2015-03-18 12:00:00|   0000X W WACKER DR|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                BANK| false|   false|0111|     001|  42|            32|      11|     

## (03-03) Schemas

In [None]:
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
rc.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

In [None]:
from pyspark.sql.types import StructField,StringType,DoubleType,TimestampType,StructType,IntegerType,BooleanType

In [None]:
labels = [('ID',StringType()),
 ('Case Number',StringType()),
 ('Date',TimestampType()),
 ('Block',StringType()),
 ('IUCR',StringType()),
 ('Primary Type',StringType()),
 ('Description',StringType()),
 ('Location Description',StringType()),
 ('Arrest',StringType()),
 ('Domestic',StringType()),
 ('Beat',StringType()),
 ('District',StringType()),
 ('Ward',StringType()),
 ('Community Area',StringType()),
 ('FBI Code',StringType()),
 ('X Coordinate',StringType()),
 ('Y Coordinate',StringType()),
 ('Year',IntegerType()),
 ('Updated On',TimestampType()),
 ('Latitude',DoubleType()),
 ('Longitude',DoubleType()),
 ('Location',StringType())]

In [None]:
schema = StructType([StructField (x[0],x[1],True) for x in labels])
schema

StructType([StructField('ID', StringType(), True), StructField('Case Number', StringType(), True), StructField('Date', TimestampType(), True), StructField('Block', StringType(), True), StructField('IUCR', StringType(), True), StructField('Primary Type', StringType(), True), StructField('Description', StringType(), True), StructField('Location Description', StringType(), True), StructField('Arrest', StringType(), True), StructField('Domestic', StringType(), True), StructField('Beat', StringType(), True), StructField('District', StringType(), True), StructField('Ward', StringType(), True), StructField('Community Area', StringType(), True), StructField('FBI Code', StringType(), True), StructField('X Coordinate', StringType(), True), StructField('Y Coordinate', StringType(), True), StructField('Year', IntegerType(), True), StructField('Updated On', TimestampType(), True), StructField('Latitude', DoubleType(), True), StructField('Longitude', DoubleType(), True), StructField('Location', Stri

In [None]:
rc = spark.read.csv('reported-crimes.csv',schema=schema,header=True)
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
rc.show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11037294|   JA371270|2015-03-18 12:00:00|   0000X W WACKER DR|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                BANK| false|   false|0111|     001|  42|            32|      11|     

## (03-04) Working with columns

**Display only the first 5 rows of the column name IUCR**

In [None]:
rc.select(col('id')).show(5)

+--------+
|      id|
+--------+
|11037294|
|11645836|
|11645601|
|11646166|
|11645648|
+--------+
only showing top 5 rows



In [None]:
rc.select('id').show(5)

+--------+
|      id|
+--------+
|11037294|
|11646293|
|11645836|
|11645959|
|11645601|
+--------+
only showing top 5 rows



  **Display only the first 4 rows of the column names Case Number, Date and Arrest**

In [None]:
rc.select('id','date','iucr','block').show(5)

+--------+----+----+--------------------+
|      id|date|iucr|               block|
+--------+----+----+--------------------+
|11037294|null|1153|   0000X W WACKER DR|
|11646293|null|1154|023XX N LOCKWOOD AVE|
|11645836|null|1153| 055XX S ROCKWELL ST|
|11645959|null|2820|  045XX N ALBANY AVE|
|11645601|null|1153| 087XX S SANGAMON ST|
+--------+----+----+--------------------+
only showing top 5 rows



In [None]:
dc = rc.withColumnRenamed('Block','blocker')

In [None]:
dc.show(5)

+--------+-----------+----+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|      ID|Case Number|Date|             blocker|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+--------+-----------+----+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|11037294|   JA371270|null|   0000X W WACKER DR|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                BANK| false|   false|0111|     001|  42|            32|      11|        null|        null|2015|      null|    null|     null|    null|
|11646293|   JC213749|nu

**Add a column with name One, with entries all 1s**

In [None]:
from pyspark.sql.functions import lit

In [None]:
rc.withColumn('One',lit(1)).show(5)

+--------+-----------+----+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+---+
|      ID|Case Number|Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|One|
+--------+-----------+----+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+---+
|11037294|   JA371270|null|   0000X W WACKER DR|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                BANK| false|   false|0111|     001|  42|            32|      11|        null|        null|2015|      null|    null|     null|    null|  1|
|1164629

**Remove the column IUCR**

## (03-05) Working with rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

In [None]:
one_day = spark.read.csv('reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') == lit('2018-11-12'))
one_day.count()

5

In [None]:
rc.union(one_day).orderBy('Date',ascending = False).show(7)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|13358766|   JH140578|2018-11-12 00:00:00|     008XX E 63RD ST|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           APARTMENT| fal

In [None]:
one_day.show(10)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|13358766|   JH140578|2018-11-12 00:00:00|     008XX E 63RD ST|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           APARTMENT| fal

**What are the top 10 number of reported crimes by Primary type, in descending order of occurence?**

In [None]:
rc.groupBy('Primary Type').count().orderBy('count',ascending =False).show(5)

+---------------+-------+
|   Primary Type|  count|
+---------------+-------+
|          THEFT|1418536|
|        BATTERY|1232300|
|CRIMINAL DAMAGE| 771525|
|      NARCOTICS| 711780|
|  OTHER OFFENSE| 419052|
+---------------+-------+
only showing top 5 rows



## (03-06) Challenge

**What percentage of reported crimes resulted in an arrest?**

In [None]:
total_crimes = rc.count()
total_crimes

2088284

In [None]:
arrest_crimes = rc.filter(col('Arrest') == ('true')).count()
arrest_crimes

In [None]:
percentage_arrested = round(arrest_crimes/total_crimes * 100,)

print(f"{percentage_arrested}%")

25%


  **What are the top 3 locations for reported crimes?**

In [None]:
rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(3)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|2143080|
|           RESIDENCE|1355531|
|           APARTMENT| 956366|
+--------------------+-------+
only showing top 3 rows



## (04-01) Built-in functions

In [27]:
from pyspark.sql import functions

In [28]:
print(dir(functions))



### String functions

**Display the Primary Type column in lower and upper characters, and the first 4 characters of the column**

In [29]:
from pyspark.sql.functions import lower,upper,substring

In [30]:
help(substring)

Help on function substring in module pyspark.sql.functions:

substring(str: 'ColumnOrName', pos: int, len: int) -> pyspark.sql.column.Column
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Parameters
    ----------
    str : :class:`~pyspark.sql.Column` or str
        target column to work on.
    pos : int
        starting position in str.
    len : int
        length of chars.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        substring of given value.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]



In [31]:
rc.select(
    lower(col('Primary Type')).alias('LOWER'),
    upper(col('Primary Type')).alias('UPPER'),
    substring(col('Primary Type'), 1, 4).alias('SUBSTRING')
).show(5)

+------------------+------------------+---------+
|             LOWER|             UPPER|SUBSTRING|
+------------------+------------------+---------+
|deceptive practice|DECEPTIVE PRACTICE|     DECE|
|deceptive practice|DECEPTIVE PRACTICE|     DECE|
|deceptive practice|DECEPTIVE PRACTICE|     DECE|
|             theft|             THEFT|     THEF|
|deceptive practice|DECEPTIVE PRACTICE|     DECE|
+------------------+------------------+---------+
only showing top 5 rows



### Numeric functions


**Show the oldest date and the most recent date**

In [34]:
from pyspark.sql.functions import min,max

In [35]:
one_day.select(min(col('Date')),max(col('Date'))).show(1)

NameError: name 'one_day' is not defined

### Date

**What is 3 days earlier that the oldest date and 3 days later than the most recent date?**

In [None]:
from pyspark.sql.functions import date_add,date_sub

In [None]:
help(date_add)

In [None]:
one_day.select((date_sub(min(col('Date')),3)).alias('Oldest'),(date_add(max(col('Date')),3)).alias('Newest')).show(1)

## (04-02) Working with dates

In [26]:
from pyspark.sql import functions

  **2019-12-25 13:30:00**

In [36]:

df = spark.createDataFrame([('2019-12-25',)],['Christmas']).show()

+----------+
| Christmas|
+----------+
|2019-12-25|
+----------+



In [37]:
from pyspark.sql.functions import to_date, to_timestamp

In [40]:
# Apply transformations: to_date and to_timestamp
df_transformed = df.select(
    to_date(col('Christmas'), 'yyyy-MM-dd HH:mm:ss').alias('Christmas_date'),
    to_timestamp(col('Christmas'), 'yyyy-MM-dd HH:mm:ss').alias('Christmas_timestamp')
)

# Show the result
df_transformed.show(truncate=False)


+--------------+-------------------+
|Christmas_date|Christmas_timestamp|
+--------------+-------------------+
|2024-12-25    |2024-12-25 15:30:00|
|2024-12-31    |2024-12-31 12:00:00|
+--------------+-------------------+



In [39]:
from pyspark.sql.functions import to_date, to_timestamp, col

# Sample data (list of tuples)
data = [("2024-12-25 15:30:00",), ("2024-12-31 12:00:00",)]
columns = ["Christmas"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show(truncate=False)


+-------------------+
|Christmas          |
+-------------------+
|2024-12-25 15:30:00|
|2024-12-31 12:00:00|
+-------------------+



In [None]:
import sys
print(sys.version)


3.13.0 (tags/v3.13.0:60403a5, Oct  7 2024, 09:38:07) [MSC v.1941 64 bit (AMD64)]


**25/Dec/2019 13:30:00**

**12/25/2019 01:30:00 PM**

## (04-03) Joins

**Download police station data**

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

## (04-05) Challenge questions

**What is the most frequently reported non-criminal activity?**

**Using a bar chart, plot which day of the week has the most number of reported crime.**

## (05-01) RDDs setup

**How many police stations are there?**

**Display the District ID, District name, Address and Zip for the police station with District ID 7**



**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**