<a href="https://colab.research.google.com/github/fahmida185/Apache-Spark-BigData-Projects/blob/master/Working_with_columns_rows.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Working with columns

## Download and install Spark

In [24]:
!ls

reported-crimes.csv  spark-2.3.1-bin-hadoop2.7	    spark-warehouse
sample_data	     spark-2.3.1-bin-hadoop2.7.tgz


In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark


## Setup environment

In [3]:
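import os
# Point PySpark at the Java 8 JDK and the Spark distribution unpacked above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

# findspark makes the pyspark package under SPARK_HOME importable
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

from pyspark.sql import SparkSession
# getOrCreate() reuses the running SparkContext
spark = SparkSession.builder.getOrCreate()
spark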

## Downloading and preprocessing Chicago's Reported Crime Data

In [4]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

--2020-06-03 23:11:23--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [              <=>   ]   1.56G  2.97MB/s    in 8m 42s  

2020-06-03 23:20:05 (3.07 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1680463552]

total 1861684
-rw-r--r--  1 root root 1680463552 Jun  3 11:01 'rows.csv?accessType=DOWNLOAD'
drwxr-xr-x  1 root root       4096 May 29 18:19  sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018  spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018  spark-2.3.1-bin-hadoop2.7.tgz


In [5]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv
!ls -l

total 1861684
-rw-r--r--  1 root root 1680463552 Jun  3 11:01 reported-crimes.csv
drwxr-xr-x  1 root root       4096 May 29 18:19 sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018 spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018 spark-2.3.1-bin-hadoop2.7.tgz


In [16]:
from pyspark.sql.functions import to_timestamp, col, lit
# Every column is read as a string; Date is parsed with a 12-hour-clock pattern
rc = (spark.read.csv('reported-crimes.csv', header=True)
      .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
      .filter(col('Date') <= lit('2018-11-11')))
rc.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 
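In Spark 2.3, `to_timestamp` reads its pattern with Java `SimpleDateFormat` letters:

- `MM` is the month, `dd` the day and `yyyy` the year.
- `hh` is the hour on a 12-hour clock (01-12).
- `mm` is the minutes and `ss` the seconds.
- `a` is the AM/PM marker.

As a rough plain-Python analogue (not Spark), the same layout maps to the `strptime` format `'%m/%d/%Y %I:%M:%S %p'`. The raw strings below are hypothetical values in that layout, and `parse_crime_date` is a hypothetical helper.

```python
from datetime import datetime

# Plain-Python analogue of Spark's 'MM/dd/yyyy hh:mm:ss a' pattern:
#   MM -> %m, dd -> %d, yyyy -> %Y, hh (12-hour clock) -> %I,
#   mm -> %M, ss -> %S, a (AM/PM marker) -> %p
PY_FORMAT = '%m/%d/%Y %I:%M:%S %p'

def parse_crime_date(raw):
    """Hypothetical helper: parse a Date string in the assumed CSV layout."""
    return datetime.strptime(raw, PY_FORMAT)

# Hypothetical raw values; the AM/PM marker decides the 24-hour result
morning = parse_crime_date('01/01/2001 11:00:00 AM')
evening = parse_crime_date('03/28/2017 02:00:00 PM')
print(morning)  # 2001-01-01 11:00:00
print(evening)  # 2017-03-28 14:00:00
```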

## Working with columns

**Display only the first 5 rows of the IUCR column**

In [7]:
rc.select('IUCR').show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



**Display only the first 4 rows of the IUCR, Case Number, Date and Arrest columns**

In [8]:
rc.select('IUCR','Case Number', 'Date', 'Arrest').show(4)

+----+-----------+-------------------+------+
|IUCR|Case Number|               Date|Arrest|
+----+-----------+-------------------+------+
|1153|   JA366925|2001-01-01 11:00:00| false|
|0281|   JB147188|2017-10-08 03:00:00| false|
|0620|   JB147595|2017-03-28 14:00:00| false|
|0810|   JB147230|2017-09-09 20:17:00| false|
+----+-----------+-------------------+------+
only showing top 4 rows



**Add a column named One, with every entry set to 1**

In [10]:
from pyspark.sql.functions import lit
rc.withColumn('One',lit(1)).show(4)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|One|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            4

**Remove the IUCR column**

In [0]:
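# drop() returns a new DataFrame. rc itself is left unchanged here so that the
# union with one_day further down still lines up column for column.
rc.drop('IUCR').show(5)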

Add the reported crimes for one more day, 12 November 2018, to the dataset.

The original DataFrame `rc` was filtered with `col('Date') <= lit('2018-11-11')`. That keeps crimes from 2001 up to 11 November 2018; the most recent date retained, shown further below, is 2018-11-10 23:55:00.

To add the next day, capture it in a new DataFrame, `one_day`. Repeat the previous read and change only the filter so that it selects 12 November. `one_day.count()` then gives the number of reported crimes in that DataFrame.

Only 3 rows come back. The `==` comparison matches only records timestamped exactly 2018-11-12 00:00:00, not every crime on that day.

Finally, append these rows to `rc` with `rc.union(one_day)`. `union` does not sort, so the new rows end up after the existing ones. Ordering by `Date` in descending order brings them to the top.

In [17]:
# Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.
from pyspark.sql.functions import to_timestamp, col, lit
# Same read as before; == matches only Date values of exactly 2018-11-12 00:00:00
one_day = (spark.read.csv('reported-crimes.csv', header=True)
           .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
           .filter(col('Date') == lit('2018-11-12')))
one_day.count()

3
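To see why the count is so small, compare an equality test against midnight with a whole-day test. This is a plain-Python analogue (not Spark) over made-up timestamps. In PySpark, a whole-day filter would typically compare `to_date(col('Date'))` with the date, which this notebook does not itself do.

```python
from datetime import date, datetime

# Made-up timestamps standing in for the Date column
timestamps = [
    datetime(2018, 11, 11, 23, 30),
    datetime(2018, 11, 12, 0, 0),    # exactly midnight
    datetime(2018, 11, 12, 9, 15),
    datetime(2018, 11, 12, 18, 40),
    datetime(2018, 11, 13, 0, 5),
]

target = date(2018, 11, 12)
midnight = datetime(target.year, target.month, target.day)

# Analogue of col('Date') == lit('2018-11-12'): only the midnight value matches
exact = [t for t in timestamps if t == midnight]

# Whole-day filter: compare the date part only (analogue of to_date(col('Date')))
whole_day = [t for t in timestamps if t.date() == target]

print(len(exact), len(whole_day))  # 1 3
```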

In [18]:
rc.union(one_day).show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 

In [19]:
rc.union(one_day).orderBy('Date',ascending=False).show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11505149|   JB513151|2018-11-12 00:00:00|  003XX S WHIPPLE ST|0810|              THEFT|           OVER $500|              STREET|

**What are the top 10 Primary Types by number of reported crimes, in descending order of occurrence?** (The cells below display 5 rows; pass 10 to `show()` to list all 10.)

In [22]:
rc.groupBy('Primary Type').count().show(5)

+--------------------+-----+
|        Primary Type|count|
+--------------------+-----+
|OFFENSE INVOLVING...|45734|
|CRIMINAL SEXUAL A...|  391|
|            STALKING| 3384|
|PUBLIC PEACE VIOL...|47785|
|           OBSCENITY|  582|
+--------------------+-----+
only showing top 5 rows



In [23]:
rc.groupBy('Primary Type').count().orderBy('count', ascending=False).show(5)

+---------------+-------+
|   Primary Type|  count|
+---------------+-------+
|          THEFT|1418426|
|        BATTERY|1232222|
|CRIMINAL DAMAGE| 771499|
|      NARCOTICS| 711648|
|  OTHER OFFENSE| 418837|
+---------------+-------+
only showing top 5 rows
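`groupBy('Primary Type').count()` followed by `orderBy('count', ascending=False)` is a count-then-sort. The same idea in plain Python (not Spark), using `collections.Counter` on a small made-up list:

```python
from collections import Counter

# Made-up sample of Primary Type values
primary_types = ['THEFT', 'BATTERY', 'THEFT', 'NARCOTICS',
                 'BATTERY', 'THEFT', 'CRIMINAL DAMAGE']

# Counter tallies each value (like groupBy(...).count());
# most_common(n) sorts by count, highest first (like orderBy('count', ascending=False))
top = Counter(primary_types).most_common(2)
print(top)  # [('THEFT', 3), ('BATTERY', 2)]
```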



## Challenge questions
**What percentage of reported crimes resulted in an arrest?**

In [26]:
rc.select('Arrest').distinct().show()
rc.printSchema()

+------+
|Arrest|
+------+
| false|
|  true|
+------+

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [27]:
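# Arrest was read as a string (see the schema above), so compare with 'true'
rc.filter(col('Arrest') == 'true').count() / rc.count()

0.2775387099578267

That is, roughly 27.75% of the reported crimes in `rc` resulted in an arrest.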
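`printSchema()` above shows that Arrest was read as a string, like every column except Date. That is why the filter compares against `'true'` rather than a boolean.

Below is a plain-Python sketch (not Spark) of the same ratio on a made-up sample, using a hypothetical helper `arrest_rate`. It also shows how to present the notebook's result as a percentage.

```python
def arrest_rate(values):
    """Hypothetical helper: fraction of rows whose Arrest string is 'true'."""
    if not values:
        return 0.0
    return sum(v == 'true' for v in values) / len(values)

# Made-up sample of the string-typed Arrest column
arrests = ['false', 'true', 'false', 'false', 'true', 'false', 'false', 'false']

rate = arrest_rate(arrests)
print(f'{rate:.2%}')  # 25.00%

# The notebook's ratio, formatted the same way
print(f'{0.2775387099578267:.2%}')  # 27.75%
```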

**What are the top 3 locations for reported crimes?**


In [29]:
rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(3)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1770577|
|           RESIDENCE|1144683|
|           APARTMENT| 698191|
+--------------------+-------+
only showing top 3 rows



## Built-in functions

In [30]:
from pyspark.sql import functions
print(dir(functions))



In [31]:
from pyspark.sql.functions import lower, upper, substring
help(substring)

Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. note:: The position is not zero based, but 1 based index.
    
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]
    
    .. versionadded:: 1.5



**Display the Primary Type column in lower case and upper case, together with its first 4 characters**

In [32]:
rc.select(lower(col('Primary Type')), upper(col('Primary Type')), substring(col('Primary Type'), 1, 4)).show(5)

+-------------------+-------------------+-----------------------------+
|lower(Primary Type)|upper(Primary Type)|substring(Primary Type, 1, 4)|
+-------------------+-------------------+-----------------------------+
| deceptive practice| DECEPTIVE PRACTICE|                         DECE|
|crim sexual assault|CRIM SEXUAL ASSAULT|                         CRIM|
|           burglary|           BURGLARY|                         BURG|
|              theft|              THEFT|                         THEF|
|crim sexual assault|CRIM SEXUAL ASSAULT|                         CRIM|
+-------------------+-------------------+-----------------------------+
only showing top 5 rows
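The help text notes that `substring` counts positions from 1, not 0. In plain Python (not Spark), a hypothetical helper `spark_like_substring` shows the off-by-one shift, for positive positions only. It reproduces the `ab` and `DECE` results seen above.

```python
def spark_like_substring(s, pos, length):
    """Hypothetical helper mimicking substring(str, pos, len) for positive pos.

    Spark positions start at 1; Python slices start at 0, hence pos - 1.
    """
    start = pos - 1
    return s[start:start + length]

print(spark_like_substring('abcd', 1, 2))                # ab
print(spark_like_substring('DECEPTIVE PRACTICE', 1, 4))  # DECE
print('DECEPTIVE PRACTICE'.lower())                      # deceptive practice
```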



**Show the oldest date and the most recent date**

In [34]:
# Note: this import shadows Python's built-in min and max in the notebook
from pyspark.sql.functions import min, max
rc.select(min(col('Date')), max(col('Date'))).show(1)

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-10 23:55:00|
+-------------------+-------------------+



**What is 3 days earlier than the oldest date, and 3 days later than the most recent date?**

In [35]:
from pyspark.sql.functions import date_add, date_sub
help(date_add)

Help on function date_add in module pyspark.sql.functions:

date_add(start, days)
    Returns the date that is `days` days after `start`
    
    >>> df = spark.createDataFrame([('2015-04-08',)], ['dt'])
    >>> df.select(date_add(df.dt, 1).alias('next_date')).collect()
    [Row(next_date=datetime.date(2015, 4, 9))]
    
    .. versionadded:: 1.5



In [36]:
rc.select(date_sub(min(col('Date')), 3),date_add(max(col('Date')), 3)).show()

+----------------------+----------------------+
|date_sub(min(Date), 3)|date_add(max(Date), 3)|
+----------------------+----------------------+
|            2000-12-29|            2018-11-13|
+----------------------+----------------------+
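`date_sub` and `date_add` return dates, so the results carry no time of day. For example, 2018-11-10 23:55:00 plus 3 days becomes 2018-11-13. Below is a plain-Python cross-check of the arithmetic (not Spark), using the min and max values from the previous cell.

```python
from datetime import datetime, timedelta

# Oldest and most recent Date values from the min/max cell above
oldest = datetime(2001, 1, 1, 0, 0, 0)
newest = datetime(2018, 11, 10, 23, 55, 0)

# Like date_sub/date_add, keep only the date part of the result
three_days_before = (oldest - timedelta(days=3)).date()
three_days_after = (newest + timedelta(days=3)).date()

print(three_days_before, three_days_after)  # 2000-12-29 2018-11-13
```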



In [40]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df=spark.createDataFrame([('2020-12-15 13:00:00',)],['Christmas'])
df.show(1)

+-------------------+
|          Christmas|
+-------------------+
|2020-12-15 13:00:00|
+-------------------+



In [42]:
df.select(to_date(col('Christmas'), 'yyyy-MM-dd HH:mm:ss' ),to_timestamp(col('Christmas'), 'yyyy-MM-dd HH:mm:ss' ) ).show(1)

+-------------------------------------------+------------------------------------------------+
|to_date(`Christmas`, 'yyyy-MM-dd HH:mm:ss')|to_timestamp(`Christmas`, 'yyyy-MM-dd HH:mm:ss')|
+-------------------------------------------+------------------------------------------------+
|                                 2020-12-15|                             2020-12-15 13:00:00|
+-------------------------------------------+------------------------------------------------+
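`to_date` keeps only the calendar date, while `to_timestamp` keeps the time of day. The pattern here also uses `HH` (24-hour clock) rather than the `hh` (12-hour clock) used for the crime data. Below is a plain-Python analogue (not Spark) with `strptime`, whose `%H` and `%I` directives play those two roles.

```python
from datetime import datetime

raw = '2020-12-15 13:00:00'

# %H is the 24-hour hour directive, the counterpart of the HH pattern letter
ts = datetime.strptime(raw, '%Y-%m-%d %H:%M:%S')  # analogue of to_timestamp
d = ts.date()                                     # analogue of to_date: time dropped

print(ts)  # 2020-12-15 13:00:00
print(d)   # 2020-12-15

# Python's 12-hour directive %I only accepts hours 01-12, so hour 13 fails to parse
try:
    datetime.strptime(raw, '%Y-%m-%d %I:%M:%S')
    parsed_as_12_hour = True
except ValueError:
    parsed_as_12_hour = False

print(parsed_as_12_hour)  # False
```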

