## **Install and start PySpark Session**

In [1]:
!pip install pyspark 

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 70kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=29095f381e05cc5ec5b7455682f58ee9e61d46be8d6cf50ca3a8db9e0b7e4363
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, lit
spark = SparkSession.builder.appName('Practise_1').getOrCreate()

# **Fiter data after transforming the Date columns into timestamps** 

In [29]:
rc_data = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/PySpark/Reported_Crimes.csv', header = True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).withColumn('Updated On',to_timestamp(col('Updated On'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-13'))
rc_data.show(5)


+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     009| 

In [None]:
rc_data.orderBy('Date', ascending = False).show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+--------+---------+--------+
|11719076|   JC296323|2018-11-12 21:00:00|     029XX W 25TH ST|2825|     OTHER OFFENSE|HARASSMENT BY TEL...|           RESIDENCE| false|   false|1033|     010|  12|            30|      26|        

**Check the outcome**

In [None]:
rc_data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



## **Schemas**

In [None]:
## import specific modules for the Schemas
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType,BooleanType, DoubleType, IntegerType

In [None]:
## One way to do
StructType([
            StructField('ID',StringType,True),
            StructField('Case Number',StringType,True),
            StructField('Date',TimestampType,True),
            StructField('ID',StringType,True),
            StructField('ID',StringType,True),
            .....
            ......
])

In [None]:
##Another more simpler way
'''
labels = [
  ('ID',StringType()),
  ('Case Number',StringType()),
  ('Date',TimestampType()),
  ('Block',StringType()),
  ('IUCR',StringType()),
  ('Primary Type',StringType()),
  ('Description',StringType()),
  ('Location Description',StringType()),
  ('Arrest',StringType()),
  ('Domestic',BooleanType()),
  ('Beat',StringType()),
  ('District',StringType()),
  ('Ward',StringType()),
  ('Community Area',StringType()),
  ('FBI Code',StringType()),
  ('X Coordinate',StringType()),
  ('Y Coordinate',StringType()),
  ('Year',StringType()),
  ('Updated On',TimestampType()),
  ('Latitude',DoubleType()),
  ('Longitude',DoubleType()),
  ('Location',StringType())
]
'''

In [None]:
'''
schema =  StructType([StructField(x[0],x[1],True) for x in labels])
print(schema)
'''

StructType(List(StructField(ID,StringType,true),StructField(Case Number,StringType,true),StructField(Date,TimestampType,true),StructField(Block,StringType,true),StructField(IUCR,StringType,true),StructField(Primary Type,StringType,true),StructField(Description,StringType,true),StructField(Location Description,StringType,true),StructField(Arrest,StringType,true),StructField(Domestic,BooleanType,true),StructField(Beat,StringType,true),StructField(District,StringType,true),StructField(Ward,StringType,true),StructField(Community Area,StringType,true),StructField(FBI Code,StringType,true),StructField(X Coordinate,StringType,true),StructField(Y Coordinate,StringType,true),StructField(Year,StringType,true),StructField(Updated On,TimestampType,true),StructField(Latitude,DoubleType,true),StructField(Longitude,DoubleType,true),StructField(Location,StringType,true)))


In [None]:
'''
rc_data = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/PySpark/Reported_Crimes.csv', header = True,schema = schema).filter(col('Date') <= lit('2018-11-11'))
rc_data.printSchema()
'''

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



**PySpark is very fussy about this data types**

In [None]:
'''
rc_data.show() ### All of them become Null showing that some of the datatypes are not what we expected.
'''

+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+



# **Working with columns**

In [None]:
## Show first 5 for a particular column
rc_data.select('ID','Case Number', 'Date').show(5)

+--------+-----------+-------------------+
|      ID|Case Number|               Date|
+--------+-----------+-------------------+
|10224738|   HY411648|2015-09-05 13:30:00|
|10224739|   HY411615|2015-09-04 11:30:00|
|11646166|   JC213529|2018-09-01 00:01:00|
|10224740|   HY411595|2015-09-05 12:45:00|
|10224741|   HY411610|2015-09-05 13:00:00|
+--------+-----------+-------------------+
only showing top 5 rows



In [None]:
# To add or transform a column
rc_data.withColumn('ID_new', 1+rc_data.ID).show(5)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+-----------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|     ID_new|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+-----------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESID

In [None]:
# To rename a column
rc_data.withColumnRenamed('ID','ID_New').show(5)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|  ID_New|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     009| 

# **Working with Rows**

In [None]:
# To filter rows
rc_data.filter(col('ID')=='10224738').show()

+--------+-----------+-------------------+---------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|          Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+---------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     009|  12|            61| 

In [None]:
# Get Unique columns
rc_data.select('Primary Type').distinct().show()

+--------------------+
|        Primary Type|
+--------------------+
|OFFENSE INVOLVING...|
|CRIMINAL SEXUAL A...|
|            STALKING|
|PUBLIC PEACE VIOL...|
|           OBSCENITY|
|NON-CRIMINAL (SUB...|
|               ARSON|
|            GAMBLING|
|   CRIMINAL TRESPASS|
|             ASSAULT|
|      NON - CRIMINAL|
|LIQUOR LAW VIOLATION|
| MOTOR VEHICLE THEFT|
|               THEFT|
|             BATTERY|
|             ROBBERY|
|            HOMICIDE|
|    PUBLIC INDECENCY|
| CRIM SEXUAL ASSAULT|
|   HUMAN TRAFFICKING|
+--------------------+
only showing top 20 rows



**Top 10 primary type of reported  crimes in descending order**

In [None]:
rc_data.groupby('Primary Type').count().orderBy('count', ascending = False).show(10)

+-------------------+------+
|       Primary Type| count|
+-------------------+------+
|              THEFT|188267|
|            BATTERY|151097|
|    CRIMINAL DAMAGE| 91131|
|            ASSAULT| 57003|
| DECEPTIVE PRACTICE| 55950|
|      OTHER OFFENSE| 53140|
|          NARCOTICS| 42861|
|           BURGLARY| 40917|
|            ROBBERY| 34360|
|MOTOR VEHICLE THEFT| 33530|
+-------------------+------+
only showing top 10 rows



**Challenge 1: What are the Top 3 locations of reported crimes?**

In [None]:
rc_data.groupby('Location Description').count().orderBy('count', ascending = False).show(3)

+--------------------+------+
|Location Description| count|
+--------------------+------+
|              STREET|180511|
|           RESIDENCE|137151|
|           APARTMENT|104203|
+--------------------+------+
only showing top 3 rows



**Challange 2: Percentage of reported crimes that resulted in arrest**

In [None]:
rc_data.select('Arrest').distinct().show()

+------+
|Arrest|
+------+
| false|
|  true|
+------+



In [None]:
(rc_data.filter(col('Arrest')=='true').count()/rc_data.select('Arrest').count())*100

20.673770174870427

# **Built In Functions**

In [None]:
from pyspark.sql import functions
print(dir(functions))



**String Functions**

In [4]:
from pyspark.sql.functions import lower, upper, substring

In [5]:
help(substring)

Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]



In [6]:
rc_data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [12]:
rc_data.select(lower(col("Primary Type")), upper(col("Primary Type")),substring(col("Primary Type"),1,4)).show(5)

+-------------------+-------------------+-----------------------------+
|lower(Primary Type)|upper(Primary Type)|substring(Primary Type, 1, 4)|
+-------------------+-------------------+-----------------------------+
|            battery|            BATTERY|                         BATT|
|              theft|              THEFT|                         THEF|
|              theft|              THEFT|                         THEF|
|          narcotics|          NARCOTICS|                         NARC|
|            assault|            ASSAULT|                         ASSA|
+-------------------+-------------------+-----------------------------+
only showing top 5 rows



**Adding the newly prepared columns**

In [30]:
rc_data  = rc_data.withColumn("Primary Type(First Four Letters)",substring(col("Primary Type"),1,4))

In [31]:
rc_data.show(5)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+--------------------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|Primary Type(First Four Letters)|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+--------------------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S 

In [32]:
rc_data = rc_data.drop("Primary Type(First Four Letters)")
rc_data.show(5)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|         Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     009| 

**Show the Oldest and the Newest Date**

In [17]:
from pyspark.sql.functions import min, max

In [35]:
rc_data.select(min(col("Date")), max(col("Date"))).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-12 21:00:00|
+-------------------+-------------------+



# **Date Functions**

In [36]:
from pyspark.sql.functions import date_add, date_sub, datediff

In [62]:
min_date = rc_data.select(min(col("Date"))).collect()[0]["min(Date)"]
max_date = rc_data.select(max(col("Date"))).collect()[0]["max(Date)"]

In [63]:
print(min_date)
print(max_date)
###### Why cannot I use this as a input in date_add function

2001-01-01 00:00:00
2018-11-12 21:00:00


In [37]:
help(date_add)

Help on function date_add in module pyspark.sql.functions:

date_add(start, days)
    Returns the date that is `days` days after `start`
    
    .. versionadded:: 1.5.0
    
    Examples
    --------
    >>> df = spark.createDataFrame([('2015-04-08',)], ['dt'])
    >>> df.select(date_add(df.dt, 1).alias('next_date')).collect()
    [Row(next_date=datetime.date(2015, 4, 9))]



**3 date after the oldest date**

In [46]:
rc_data.select(date_add(min(col("Date")),3)).show()

+----------------------+
|date_add(min(Date), 3)|
+----------------------+
|            2001-01-04|
+----------------------+



**3 days before than the most recent date**

In [66]:
rc_data.select(date_sub(max(col("Date")),3)).show()

+----------------------+
|date_sub(max(Date), 3)|
+----------------------+
|            2018-11-09|
+----------------------+



# **Working With Dates (in Details)**

**Different Date Formats**

In [74]:
from pyspark.sql.functions import to_date, to_timestamp, lit

In [76]:
df = spark.createDataFrame([('2021-03-14 10:34:00',)],['Birthday'])
df.show()
df.select(to_date(col("Birthday"),"yyyy-MM-dd HH:mm:ss"),to_timestamp(col("Birthday"),"yyyy-MM-dd HH:mm:ss")).show()

+-------------------+
|           Birthday|
+-------------------+
|2021-03-14 10:34:00|
+-------------------+

+--------------------------------------+-------------------------------------------+
|to_date(Birthday, yyyy-MM-dd HH:mm:ss)|to_timestamp(Birthday, yyyy-MM-dd HH:mm:ss)|
+--------------------------------------+-------------------------------------------+
|                            2021-03-14|                        2021-03-14 10:34:00|
+--------------------------------------+-------------------------------------------+



In [78]:
df = spark.createDataFrame([('14/Mar/2021 10:34:00',)],['Birthday'])
df.show()
df.select(to_date(col("Birthday"),"dd/MMM/yyyy HH:mm:ss"),to_timestamp(col("Birthday"),"dd/MMM/yyyy HH:mm:ss")).show() ## As string MMM

+--------------------+
|            Birthday|
+--------------------+
|14/Mar/2021 10:34:00|
+--------------------+

+---------------------------------------+--------------------------------------------+
|to_date(Birthday, dd/MMM/yyyy HH:mm:ss)|to_timestamp(Birthday, dd/MMM/yyyy HH:mm:ss)|
+---------------------------------------+--------------------------------------------+
|                             2021-03-14|                         2021-03-14 10:34:00|
+---------------------------------------+--------------------------------------------+



In [80]:
rc_data.show(5, truncate=False)

+--------+-----------+-------------------+---------------------+----+------------+-----------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                |IUCR|Primary Type|Description            |Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On         |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+---------------------+----+------------+-----------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+-----------------------------+
|10224738|HY411648   |2015-09-05 13:30:00|043XX S WOOD ST      |0486|BATTERY     |DOMESTIC BATTERY SIMPLE|RESIDENCE

# **User Defined Functions**