In [None]:
!pip install pyspark

#### Optional: install pyspark with the SQL, pandas-on-Spark (plus plotly), or Spark Connect extras

In [None]:
!pip install pyspark[sql]
!pip install pyspark[pandas_on_spark] plotly
!pip install pyspark[connect]

In [65]:
from pyspark.sql import SparkSession, Column, Row
from datetime import datetime, date
import pandas as pd
import numpy as np
import time
from pyspark import SparkConf
from pyspark.sql.functions import pandas_udf, expr, upper, floor, round, col, avg, lit
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType, FloatType, DateType, LongType
# Set PYARROW_IGNORE_TIMEZONE to "1" before importing pyspark.pandas
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps

# Build Spark Session

In [84]:
# Create a Spark session, or reuse the current one if it already exists.
# With no explicit name, the application name defaults to "pyspark-shell".
spark = SparkSession.builder.getOrCreate()

In [None]:
# Same as above, but with an explicit application name ("spark1").
# NOTE(review): getOrCreate() hands back an already-running session when there is one,
# so the name may not be picked up unless this runs before any other session exists — confirm.
spark = SparkSession.builder.appName("spark1").getOrCreate()

# Timing Pandas read_csv

In [70]:
# Load the 2020-to-present crime CSV into a pandas DataFrame.
df = pd.read_csv('Crime_Data_from_2020_to_Present.csv')

In [71]:
# Time how long pandas takes to read the larger 2001-to-present CSV.
s1 = time.time_ns()
df_large = pd.read_csv('Crimes_2001_to_Present.csv')
e1 = time.time_ns()
# time_ns() is in nanoseconds, so divide by 10**9 to report seconds.
print("Time taken", ((e1-s1)/(10**9)), "seconds")

Time taken 92.688306705 seconds


In [72]:
# (rows, columns) of the pandas DataFrame.
df.shape

(986873, 28)

In [73]:
# Data type pandas assigned to each column.
df.dtypes

DR_NO               int64
Date Rptd          object
DATE OCC           object
TIME OCC            int64
AREA                int64
AREA NAME          object
Rpt Dist No         int64
Part 1-2            int64
Crm Cd              int64
Crm Cd Desc        object
Mocodes            object
Vict Age            int64
Vict Sex           object
Vict Descent       object
Premis Cd         float64
Premis Desc        object
Weapon Used Cd    float64
Weapon Desc        object
Status             object
Status Desc        object
Crm Cd 1          float64
Crm Cd 2          float64
Crm Cd 3          float64
Crm Cd 4          float64
LOCATION           object
Cross Street       object
LAT               float64
LON               float64
dtype: object

# Timing PySpark sql read.csv

`spark.read.csv` creates a SQL-like Spark DataFrame.

In [85]:
# Read the same CSV with Spark: header=True takes the column names from the first line,
# and inferSchema=True asks Spark to infer each column's type from the data.
df_s = spark.read.csv('Crime_Data_from_2020_to_Present.csv', header=True, inferSchema=True)

                                                                                

In [86]:
# Time the Spark read of the larger CSV (no inferSchema here, so types are not inferred).
# NOTE(review): Spark evaluates lazily (see the "Lazy Evaluation" section), so this likely
# measures only setting up the DataFrame, not loading every row as pandas does —
# the two timings are not a like-for-like comparison.
s2 = time.time_ns()
s_large = spark.read.csv('Crimes_2001_to_Present.csv', header=True)
e2 = time.time_ns()
# time_ns() is in nanoseconds, so divide by 10**9 to report seconds.
print("Time taken", ((e2-s2)/(10**9)), "seconds")

Time taken 0.096119106 seconds


### Display top n rows 
#### (similar to head)

In [76]:
# Show only the first row; vertical=True prints one "column | value" line per field.
df_s.show(1, vertical=True)

-RECORD 0------------------------------
 DR_NO          | 190326475            
 Date Rptd      | 03/01/2020 12:00:... 
 DATE OCC       | 03/01/2020 12:00:... 
 TIME OCC       | 2130                 
 AREA           | 7                    
 AREA NAME      | Wilshire             
 Rpt Dist No    | 784                  
 Part 1-2       | 1                    
 Crm Cd         | 510                  
 Crm Cd Desc    | VEHICLE - STOLEN     
 Mocodes        | NULL                 
 Vict Age       | 0                    
 Vict Sex       | M                    
 Vict Descent   | O                    
 Premis Cd      | 101                  
 Premis Desc    | STREET               
 Weapon Used Cd | NULL                 
 Weapon Desc    | NULL                 
 Status         | AA                   
 Status Desc    | Adult Arrest         
 Crm Cd 1       | 510                  
 Crm Cd 2       | 998                  
 Crm Cd 3       | NULL                 
 Crm Cd 4       | NULL                 


### Print schema (sql)

#### similar to pandas dtypes

In [87]:
# Print each column's name, type and nullability as a tree.
df_s.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- L

### Print column names

In [88]:
# List of column names in the Spark DataFrame.
df_s.columns

['DR_NO',
 'Date Rptd',
 'DATE OCC',
 'TIME OCC',
 'AREA',
 'AREA NAME',
 'Rpt Dist No',
 'Part 1-2',
 'Crm Cd',
 'Crm Cd Desc',
 'Mocodes',
 'Vict Age',
 'Vict Sex',
 'Vict Descent',
 'Premis Cd',
 'Premis Desc',
 'Weapon Used Cd',
 'Weapon Desc',
 'Status',
 'Status Desc',
 'Crm Cd 1',
 'Crm Cd 2',
 'Crm Cd 3',
 'Crm Cd 4',
 'LOCATION',
 'Cross Street',
 'LAT',
 'LON']

In [89]:
# Column names of the larger dataset; it has a different set of columns from df_s.
s_large.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

#### Can select multiple columns

In [90]:
# Select columns by name and show the first 5 rows.
df_s.select('AREA', 'DATE OCC').show(5)

+----+--------------------+
|AREA|            DATE OCC|
+----+--------------------+
|   7|03/01/2020 12:00:...|
|   1|02/08/2020 12:00:...|
|   3|11/04/2020 12:00:...|
|   9|03/10/2020 12:00:...|
|   4|09/09/2020 12:00:...|
+----+--------------------+
only showing top 5 rows



In [91]:
# The same selection, this time passing Column objects built with col().
df_s.select(
  col("AREA"),
  col("DATE OCC")
).show(5)

+----+--------------------+
|AREA|            DATE OCC|
+----+--------------------+
|   7|03/01/2020 12:00:...|
|   1|02/08/2020 12:00:...|
|   3|11/04/2020 12:00:...|
|   9|03/10/2020 12:00:...|
|   4|09/09/2020 12:00:...|
+----+--------------------+
only showing top 5 rows



We can also write SQL-like expressions with `selectExpr`.

In [92]:
# selectExpr takes SQL expression strings, so columns can be renamed with "as";
# the backticks quote the column name that contains a space.
df_s.selectExpr(
  "AREA as area",
  "`DATE OCC` as occ"
).show(5)

+----+--------------------+
|area|                 occ|
+----+--------------------+
|   7|03/01/2020 12:00:...|
|   1|02/08/2020 12:00:...|
|   3|11/04/2020 12:00:...|
|   9|03/10/2020 12:00:...|
|   4|09/09/2020 12:00:...|
+----+--------------------+
only showing top 5 rows



#### Return n rows as list

In [93]:
# take(n) returns the first n rows as a Python list of Row objects.
df_s.take(1)

[Row(DR_NO=190326475, Date Rptd='03/01/2020 12:00:00 AM', DATE OCC='03/01/2020 12:00:00 AM', TIME OCC=2130, AREA=7, AREA NAME='Wilshire', Rpt Dist No=784, Part 1-2=1, Crm Cd=510, Crm Cd Desc='VEHICLE - STOLEN', Mocodes=None, Vict Age=0, Vict Sex='M', Vict Descent='O', Premis Cd=101, Premis Desc='STREET', Weapon Used Cd=None, Weapon Desc=None, Status='AA', Status Desc='Adult Arrest', Crm Cd 1=510, Crm Cd 2=998, Crm Cd 3=None, Crm Cd 4=None, LOCATION='1900 S  LONGWOOD                     AV', Cross Street=None, LAT=34.0375, LON=-118.3506)]

#### convert to pandas

In [94]:
# Convert the Spark DataFrame to a pandas DataFrame.
# NOTE(review): this brings every row (986,873 here) to the driver; for a quick preview
# consider df_s.limit(n).toPandas() instead.
df_s.toPandas()

                                                                                

Unnamed: 0,DR_NO,Date Rptd,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Part 1-2,Crm Cd,Crm Cd Desc,...,Status,Status Desc,Crm Cd 1,Crm Cd 2,Crm Cd 3,Crm Cd 4,LOCATION,Cross Street,LAT,LON
0,190326475,03/01/2020 12:00:00 AM,03/01/2020 12:00:00 AM,2130,7,Wilshire,784,1,510,VEHICLE - STOLEN,...,AA,Adult Arrest,510.0,998.0,,,1900 S LONGWOOD AV,,34.0375,-118.3506
1,200106753,02/09/2020 12:00:00 AM,02/08/2020 12:00:00 AM,1800,1,Central,182,1,330,BURGLARY FROM VEHICLE,...,IC,Invest Cont,330.0,998.0,,,1000 S FLOWER ST,,34.0444,-118.2628
2,200320258,11/11/2020 12:00:00 AM,11/04/2020 12:00:00 AM,1700,3,Southwest,356,1,480,BIKE - STOLEN,...,IC,Invest Cont,480.0,,,,1400 W 37TH ST,,34.0210,-118.3002
3,200907217,05/10/2023 12:00:00 AM,03/10/2020 12:00:00 AM,2037,9,Van Nuys,964,1,343,SHOPLIFTING-GRAND THEFT ($950.01 & OVER),...,IC,Invest Cont,343.0,,,,14000 RIVERSIDE DR,,34.1576,-118.4387
4,200412582,09/09/2020 12:00:00 AM,09/09/2020 12:00:00 AM,630,4,Hollenbeck,413,1,510,VEHICLE - STOLEN,...,IC,Invest Cont,510.0,,,,200 E AVENUE 28,,34.0820,-118.2130
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
986868,240710284,07/24/2024 12:00:00 AM,07/23/2024 12:00:00 AM,1400,7,Wilshire,788,1,510,VEHICLE - STOLEN,...,IC,Invest Cont,510.0,,,,4000 W 23RD ST,,34.0362,-118.3284
986869,240104953,01/15/2024 12:00:00 AM,01/15/2024 12:00:00 AM,100,1,Central,101,2,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),...,IC,Invest Cont,745.0,,,,1300 W SUNSET BL,,34.0685,-118.2460
986870,240410786,10/14/2024 12:00:00 AM,10/11/2024 12:00:00 AM,2330,4,Hollenbeck,421,1,341,"THEFT-GRAND ($950.01 & OVER)EXCPT,GUNS,FOWL,LI...",...,IC,Invest Cont,341.0,,,,1700 ALBION ST,,34.0675,-118.2240
986871,240309674,04/24/2024 12:00:00 AM,04/24/2024 12:00:00 AM,1500,3,Southwest,358,1,230,"ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",...,IC,Invest Cont,230.0,,,,FLOWER ST,JEFFERSON BL,34.0215,-118.2868


# Creating a Dataframe

In [95]:
# Build a DataFrame from Row objects: the column names come from the Row fields
# and no schema is given, so Spark works out the column types itself.
example = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# Explicitly: pass the rows as plain tuples together with a StructType that gives
# each column's name, type and nullability (False = the column may not hold nulls).
example_expl = spark.createDataFrame(
  data = [(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), 
          (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), 
          (4, 5., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))],
  schema = StructType([
    StructField('a', IntegerType(), False),
    StructField('b', FloatType(), False),
    StructField('c', StringType(), False),
    StructField('d', DateType(), False),
    StructField('e', TimestampType(), False),

  ])
)

In [96]:
# Display the DataFrame that was built from Row objects.
example.show()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [97]:
# Display the DataFrame that was built with the explicit schema.
example_expl.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



# Lazy Evaluation

Although the query is defined, it is not executed until we apply an action such as `show`, `take`, or `collect` (or `reduce` on an RDD).

In [98]:
# filter() is a transformation: the cell output is only the DataFrame's schema, no rows.
df_s.filter(df_s.LOCATION.contains("1900 S  LONGWOOD"))

DataFrame[DR_NO: int, Date Rptd: string, DATE OCC: string, TIME OCC: int, AREA: int, AREA NAME: string, Rpt Dist No: int, Part 1-2: int, Crm Cd: int, Crm Cd Desc: string, Mocodes: string, Vict Age: int, Vict Sex: string, Vict Descent: string, Premis Cd: int, Premis Desc: string, Weapon Used Cd: int, Weapon Desc: string, Status: string, Status Desc: string, Crm Cd 1: int, Crm Cd 2: int, Crm Cd 3: int, Crm Cd 4: int, LOCATION: string, Cross Street: string, LAT: double, LON: double]

Assigning the result to a new variable does not trigger execution either.

In [99]:
# Keeping the filtered DataFrame in a variable still does not run the query;
# displaying the variable again shows only the schema.
test_wil = df_s.filter(df_s.LOCATION.contains("1900 S  LONGWOOD"))
test_wil

DataFrame[DR_NO: int, Date Rptd: string, DATE OCC: string, TIME OCC: int, AREA: int, AREA NAME: string, Rpt Dist No: int, Part 1-2: int, Crm Cd: int, Crm Cd Desc: string, Mocodes: string, Vict Age: int, Vict Sex: string, Vict Descent: string, Premis Cd: int, Premis Desc: string, Weapon Used Cd: int, Weapon Desc: string, Status: string, Status Desc: string, Crm Cd 1: int, Crm Cd 2: int, Crm Cd 3: int, Crm Cd 4: int, LOCATION: string, Cross Street: string, LAT: double, LON: double]

In [100]:
# show() is an action, so the filter is actually executed here.
df_s.filter(df_s.LOCATION.contains("1900 S  LONGWOOD")).show(1)

+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+----------------+-------+--------+--------+------------+---------+-----------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|     Crm Cd Desc|Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc|Weapon Used Cd|Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+----------------+-------+--------+--------+------------+---------+-----------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|190326475|03/01/2020 12:00:...|03/01/2020 

#### We can define a pandas UDF to apply to a DataFrame column

In [101]:
@pandas_udf('long')
def pandas_multiply(series: pd.Series) -> pd.Series:
    """Return every element of ``series`` raised to the power of two.

    Despite the name, this squares the values rather than multiplying by a
    separate factor. It is wrapped as a pandas UDF declared with the 'long'
    result type.
    """
    squared = series ** 2
    return squared

**withColumn() creates a new column**

In [102]:
# withColumn returns a new DataFrame with the extra column; keep it in new_df
# because later cells use it.
new_df = df_s.withColumn('Area Squared', pandas_multiply(df_s.AREA))
# Reuse new_df instead of building the same column a second time, and refer to
# the new column by the exact name it was created with.
new_df.select('AREA', 'Area Squared').show(3)

+----+------------+
|AREA|Area squared|
+----+------------+
|   7|          49|
|   1|           1|
|   3|           9|
+----+------------+
only showing top 3 rows



                                                                                

DataFrames are immutable, so you must assign the result to a new variable to keep the changes.

In [103]:
# The original DataFrame did not gain the new column.
'Area Squared' in df_s.columns

False

In [104]:
# The DataFrame returned by withColumn does have it.
'Area Squared' in new_df.columns

True

**We can drop columns**

In [105]:
# drop() returns a new DataFrame without the given column.
tmp_df = df_s.drop("Mocodes")
# Confirm the column is gone from the result.
'Mocodes' in tmp_df.columns

False

We can drop rows with missing values; with `"all"`, a row is removed only when every column listed in `subset` is null.

In [106]:
# "all" removes a row only when every column listed in subset is null
# (here: both Mocodes and LOCATION).
df_na = df_s.na.drop("all", subset=["Mocodes", "LOCATION"])
df_na.select('LOCATION').show(2)

+--------------------+
|            LOCATION|
+--------------------+
|1900 S  LONGWOOD ...|
|1000 S  FLOWER   ...|
+--------------------+
only showing top 2 rows



We can group data, apply function, then show results

In [107]:
# Average victim age per LOCATION; the aggregated column is named "avg(Vict Age)".
new_df.groupby('LOCATION').agg(avg('Vict Age')).select('LOCATION', 'avg(Vict Age)').show(3)

[Stage 17:====>                                                   (1 + 11) / 12]

+--------------------+------------------+
|            LOCATION|     avg(Vict Age)|
+--------------------+------------------+
|10TH             ...|28.044444444444444|
|900 W  OLYMPIC   ...|  30.4235807860262|
|1500    4TH      ...| 30.91176470588235|
+--------------------+------------------+
only showing top 3 rows



                                                                                

# Parallelizing

#### Comparing the time to fully combine two large datasets in pandas vs. PySpark

In [43]:
# Time how long pandas takes to concatenate the two DataFrames loaded earlier.
s3 = time.time_ns()
merg_df = pd.concat([df, df_large])
e3 = time.time_ns()
# time_ns() is in nanoseconds, so divide by 10**9 to report seconds.
print("Time", ((e3-s3)/(10**9)), "seconds")

Time 50.367986954 seconds


In [108]:
# lit(20) builds a constant column; the result is not assigned, so only its schema is displayed.
df_s.withColumn("new", lit(20))

DataFrame[DR_NO: int, Date Rptd: string, DATE OCC: string, TIME OCC: int, AREA: int, AREA NAME: string, Rpt Dist No: int, Part 1-2: int, Crm Cd: int, Crm Cd Desc: string, Mocodes: string, Vict Age: int, Vict Sex: string, Vict Descent: string, Premis Cd: int, Premis Desc: string, Weapon Used Cd: int, Weapon Desc: string, Status: string, Status Desc: string, Crm Cd 1: int, Crm Cd 2: int, Crm Cd 3: int, Crm Cd 4: int, LOCATION: string, Cross Street: string, LAT: double, LON: double, new: int]

In [116]:
# Pad s_large (22 columns) with a copy of Year and five constant columns so that it
# has the same number of columns as df_s (28) before the two are combined with union().
ex = s_large.withColumn("age", col("Year"))
ex = ex.withColumn("v1", lit(220)).withColumn("v2", lit("state")).withColumn("state", lit("No"))
ex = ex.withColumn("v3", lit(91)).withColumn("v4", lit(93))
# Display the resulting column count; the cell's recorded output (28) needs this
# expression, since assignments alone display nothing.
len(ex.columns)

28

In [118]:
# Time the Spark union of the padded large dataset with df_s.
# NOTE(review): union() is a lazy transformation that pairs columns by position, so this
# likely times only building the plan, and the combined rows mix unrelated columns from
# the two datasets — confirm this is the intended comparison with pd.concat.
s4 = time.time_ns()
df3 = ex.union(df_s)
e4 = time.time_ns()
# time_ns() is in nanoseconds, so divide by 10**9 to report seconds.
print("Time", ((e4-s4)/(10**9)), "seconds")

Time 0.039191869 seconds


# Working with SQL

#### Create temporary view of dataframe

In [29]:
# Register df_s as a temporary view named "tableA" so it can be queried with SQL.
df_s.createOrReplaceTempView("tableA")
# Count its rows through a SQL query.
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|  986873|
+--------+



#### We can use our own functions in SQL queries

In [31]:
@pandas_udf("integer")
def reduce(s: pd.Series) -> pd.Series:
    """Return every element of ``s`` divided by two.

    The division itself yields fractional values while the UDF is declared
    with the "integer" result type; in the recorded output the inputs 7, 1, 3
    come back as 3, 0, 1.
    """
    halved = s / 2
    return halved

# Make the UDF callable from SQL under the name "reduce", then use it in a query.
spark.udf.register("reduce", reduce)
spark.sql("SELECT reduce(AREA) FROM tableA").show(3)

24/12/11 12:08:17 WARN SimpleFunctionRegistry: The function reduce replaced a previously registered function.


+------------+
|reduce(AREA)|
+------------+
|           3|
|           0|
|           1|
+------------+
only showing top 3 rows



In [32]:
# Remember to stop the session when finished; stopping it also stops the
# underlying SparkContext. The separate sc.stop() call is removed because no
# `sc` variable is defined anywhere in this notebook, so it raised a NameError.
spark.stop()