
# Download Chicago's Reported Crime Data

## Download and install Spark

In [28]:
!ls

reported-crimes.csv  spark-2.3.1-bin-hadoop2.7	    spark-warehouse
sample_data	     spark-2.3.1-bin-hadoop2.7.tgz


In [29]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark


## Set up the environment

In [30]:
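import os

# Point Spark at the Java runtime and at the Spark distribution unpacked above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

# findspark adds the PySpark libraries under SPARK_HOME to sys.path
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse an existing context and session if they are already running
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark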
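
`findspark.init()` depends on `SPARK_HOME` pointing at the unpacked Spark directory, and Spark in turn needs `JAVA_HOME`. A minimal sketch of a pre-flight check follows; `missing_spark_paths` is a hypothetical helper written for this notebook, not part of findspark or PySpark.

```python
import os

def missing_spark_paths(env=None, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or do not point at an existing directory."""
    env = os.environ if env is None else env
    problems = []
    for name in names:
        path = env.get(name)
        if not path or not os.path.isdir(path):
            problems.append(name)
    return problems

# Intended to run after os.environ is set and before findspark.init();
# an empty list means both directories were found.
print(missing_spark_paths())
```

If the list is not empty, the usual cause is that the download or `tar` step did not finish, or that the version in the path does not match the archive that was unpacked.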

## Downloading and preprocessing Chicago's Reported Crime Data

In [31]:
!wget "https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD"

--2020-06-21 05:30:54--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.68.26, 52.206.140.205, 52.206.140.199
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.68.26|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [<=>                 ]   1.57G  3.39MB/s    in 8m 11s  

2020-06-21 05:39:05 (3.27 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1683075202]



In [32]:
!ls

 reported-crimes.csv		 spark-2.3.1-bin-hadoop2.7.tgz
'rows.csv?accessType=DOWNLOAD'	 spark-2.3.1-bin-hadoop2.7.tgz.1
 sample_data			 spark-warehouse
 spark-2.3.1-bin-hadoop2.7


In [33]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv

In [35]:
from pyspark.sql.functions import to_timestamp, col, lit

# Parse the 'Date' strings into timestamps, then keep rows up to the 2018-11-11 cutoff
rc = (
    spark.read.csv('/content/reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') <= lit('2018-11-11'))
)
rc.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 
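
The pattern `'MM/dd/yyyy hh:mm:ss a'` passed to `to_timestamp` describes a month/day/year date followed by a 12-hour clock time with an AM/PM marker. As a rough illustration outside Spark, the sketch below parses the same layout with Python's standard library, assuming an English-style locale for the AM/PM marker. `parse_crime_date` is a hypothetical helper, and the sample strings are made up to match the layout rather than copied from the CSV.

```python
from datetime import datetime

# strptime analogue of the Spark pattern 'MM/dd/yyyy hh:mm:ss a'
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"

def parse_crime_date(text):
    """Parse a 12-hour 'MM/dd/yyyy hh:mm:ss AM/PM' string into a datetime."""
    return datetime.strptime(text, PY_FORMAT)

# Hypothetical sample values in the same layout as the 'Date' column
print(parse_crime_date("11/10/2018 11:55:00 PM"))  # 2018-11-10 23:55:00
print(parse_crime_date("01/01/2001 12:00:00 AM"))  # 2001-01-01 00:00:00
```

The `hh` and `a` parts matter: reading a 12-hour string with a 24-hour pattern would lose the distinction between morning and evening times.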

## Importing the functions library



In [66]:
from pyspark.sql import functions

In [67]:
print(dir(functions))



In [68]:
from pyspark.sql.functions import lower, upper, substring

In [69]:
rc.select(lower(col('Primary Type')),upper(col('Primary Type')),substring(col('Primary Type'),1,4)).show()

+--------------------+--------------------+-----------------------------+
| lower(Primary Type)| upper(Primary Type)|substring(Primary Type, 1, 4)|
+--------------------+--------------------+-----------------------------+
|  deceptive practice|  DECEPTIVE PRACTICE|                         DECE|
| crim sexual assault| CRIM SEXUAL ASSAULT|                         CRIM|
|            burglary|            BURGLARY|                         BURG|
|               theft|               THEFT|                         THEF|
| crim sexual assault| CRIM SEXUAL ASSAULT|                         CRIM|
| crim sexual assault| CRIM SEXUAL ASSAULT|                         CRIM|
|offense involving...|OFFENSE INVOLVING...|                         OFFE|
|offense involving...|OFFENSE INVOLVING...|                         OFFE|
|               theft|               THEFT|                         THEF|
|  deceptive practice|  DECEPTIVE PRACTICE|                         DECE|
|     criminal damage|     CRIMINAL DA
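
In `substring(col('Primary Type'), 1, 4)` the position is 1-based and the last argument is a length, not an end index. The sketch below mimics that behaviour on plain Python strings for positive positions only; `substring_1_based` is a hypothetical helper for illustration, not a PySpark function.

```python
def substring_1_based(text, pos, length):
    """Emulate substring(col, pos, length) on a Python string for pos >= 1."""
    if pos < 1 or length < 0:
        raise ValueError("this sketch only handles pos >= 1 and length >= 0")
    start = pos - 1  # convert the 1-based position to Python's 0-based index
    return text[start:start + length]

# Values taken from the 'Primary Type' column shown above
for value in ["DECEPTIVE PRACTICE", "BURGLARY", "THEFT"]:
    print(value.lower(), "|", value.upper(), "|", substring_1_based(value, 1, 4))
```

The last column printed for each value (`DECE`, `BURG`, `THEF`) lines up with the `substring(Primary Type, 1, 4)` column in the Spark output.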

## Show the oldest and the most recent date

In [70]:
# Note: these names shadow Python's built-in min and max for the rest of the notebook
from pyspark.sql.functions import min, max

In [71]:
rc.select(min(col('Date')),max(col('Date'))).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-10 23:55:00|
+-------------------+-------------------+



# Date

## What is 3 days earlier than the oldest date and 3 days later than the most recent date?

In [73]:
from pyspark.sql.functions import date_add,date_sub

In [74]:
rc.select(date_sub(min(col('Date')),3),date_add(max(col('Date')),3)).show(1)

+----------------------+----------------------+
|date_sub(min(Date), 3)|date_add(max(Date), 3)|
+----------------------+----------------------+
|            2000-12-29|            2018-11-13|
+----------------------+----------------------+
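
As a quick cross-check of the date arithmetic outside Spark, the sketch below repeats the calculation with Python's `datetime`, using the `min(Date)` and `max(Date)` values printed earlier. It assumes, as the output format above suggests, that `date_sub` and `date_add` work on the date part only and drop the time of day.

```python
from datetime import datetime, timedelta

oldest = datetime(2001, 1, 1, 0, 0, 0)            # min(Date) from the earlier output
most_recent = datetime(2018, 11, 10, 23, 55, 0)   # max(Date) from the earlier output

# Drop the time component first, then shift by whole days
three_days_earlier = oldest.date() - timedelta(days=3)
three_days_later = most_recent.date() + timedelta(days=3)

print(three_days_earlier)  # 2000-12-29
print(three_days_later)    # 2018-11-13
```

So 3 days before the oldest record is 2000-12-29, and 3 days after the most recent record is 2018-11-13.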

