# Working with rows

Filter rows by condition
df.filter(col('column') > 1)

Get unique rows in a dataframe
df.select('column').distinct().show()

Sort rows
df.orderBy(col('column'))

Append rows to a dataframe (dataframes are immutable, so union the original dataframe with a new one, i.e. concatenate the two; both must have the same number of columns and the same schema)
df.union(df2)


## Download and install Spark

In [1]:
!ls

sample_data


In [None]:
#!apt-get update
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
#!tar xf spark-2.3.1-bin-hadoop2.7.tgz
#!pip install -q findspark

## Setup environment

[Jan 2023 update]
- Google Colab recently upgraded to Python 3.8, which breaks Spark 2.3.1.
- Use the code below instead, which installs Spark from the pyspark package.

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=c6418837417ce8bfa85a124437e29ef262b048e5140303c8962460b052ec794f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

#import findspark
#findspark.init()
#from pyspark import SparkContext
#sc = SparkContext.getOrCreate()

#import pyspark
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.getOrCreate() 
#spark

## Downloading and preprocessing Chicago's Reported Crime Data

In [4]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

--2023-03-28 16:52:26--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.140.205, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [        <=>         ]   1.71G  2.83MB/s    in 9m 27s  

2023-03-28 17:01:54 (3.08 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1832414791]

total 1789476
-rw-r--r-- 1 root root 1832414791 Mar 28 11:12 'rows.csv?accessType=DOWNLOAD'
drwxr-xr-x 1 root root       4096 Mar 27 13:41  sample_data


In [5]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv
!ls -l

total 1789476
-rw-r--r-- 1 root root 1832414791 Mar 28 11:12 reported-crimes.csv
drwxr-xr-x 1 root root       4096 Mar 27 13:41 sample_data


In [6]:
from pyspark.sql.functions import to_timestamp, col, lit
rc = (
    spark.read.csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') <= lit('2018-11-11'))
)
rc.show(5)

+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|10224738|   HY411648|2015-09-05 13:30:00|     043XX S WOOD ST|0486|     BATTERY|DOMESTIC BATTERY ...|           RESIDENCE| false|    true|0924|     00

## Working with rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

In [16]:
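# Recall the filter used when loading rc: <= lit('2018-11-11')
# To load the next day, change <= to == and the date to 2018-11-12.
# Note: Date is a timestamp column, so == against a date string matches
# only rows stamped exactly 2018-11-12 00:00:00.
one_day = (
    spark.read.csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') == lit('2018-11-12'))
)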

In [12]:
# Number of reported crimes matched for 12 November 2018
one_day.count()

3

In [17]:
# 3 rows matched for 2018-11-12
# Append these rows to the dataframe and show the most recent first
rc.union(one_day).orderBy('Date', ascending = False).show(5)

+--------+-----------+-------------------+-------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|              Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+-------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11505149|   JB513151|2018-11-12 00:00:00| 003XX S WHIPPLE ST|0810|             THEFT|           OVER $500|              STREET| false| 

**What are the top 10 primary types by number of reported crimes, in descending order of occurrence?**

In [18]:
rc.groupBy("Primary Type").count().show(10)

+--------------------+------+
|        Primary Type| count|
+--------------------+------+
|OFFENSE INVOLVING...| 46642|
|CRIMINAL SEXUAL A...|  1230|
|            STALKING|  3388|
|PUBLIC PEACE VIOL...| 47785|
|           OBSCENITY|   587|
|NON-CRIMINAL (SUB...|     9|
|               ARSON| 11157|
|            GAMBLING| 14422|
|   CRIMINAL TRESPASS|193371|
|             ASSAULT|418521|
+--------------------+------+
only showing top 10 rows



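A groupBy must be followed by an aggregation, here count(). Without an orderBy, the 10 rows shown above come back in arbitrary order rather than as the top 10.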

In [22]:
rc.groupBy("Primary Type").count().orderBy("count", ascending=False).show(10)

+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1418507|
|            BATTERY|1232278|
|    CRIMINAL DAMAGE| 771516|
|          NARCOTICS| 711766|
|      OTHER OFFENSE| 418903|
|            ASSAULT| 418521|
|           BURGLARY| 388040|
|MOTOR VEHICLE THEFT| 314133|
| DECEPTIVE PRACTICE| 267025|
|            ROBBERY| 255601|
+-------------------+-------+
only showing top 10 rows

