# Working with joins

## Download and install Spark

In [None]:
!ls -l

In [None]:
#!apt-get update
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
#!tar xf spark-2.3.1-bin-hadoop2.7.tgz
#!pip install -q findspark

## Setup environment

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

## Downloading and preprocessing Chicago's Reported Crime Data

In [None]:
#!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
#!ls -l

In [None]:
#!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv
#!ls -l

In [None]:
from pyspark.sql.functions import to_timestamp,col,lit
rc = spark.read.csv('reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
rc.show(5)

## Joins

**Download police station data**

In [None]:
!wget -O police-station.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

In [None]:
ps = spark.read.csv('police-station.csv', header=True)
ps.show(5)

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

In [None]:
rc.cache()
rc.count()

In [None]:
ps.select(col('DISTRICT')).distrinct().show(30)

In [None]:
rc.select('Disctrict').dstrinct().show(30)

In [None]:
from pyspark.sql.functions import lpad

In [None]:
help(lpad)      # lpad(col, len, pad) left-pad the string column to width len with pad

In [None]:
ps.select(lpad(col('DISTRICT'),3,'0')).show()

In [None]:
ps = ps.withColumn('Format_district',lpad(col('DISTRICT'),3,'0'))
ps.show(5)

In [None]:
rc.join(ps, rc.District == ps.Format_distrit, 'left_outer').show()

In [None]:
ps.columns      # show all columns of police station dataframe

In [None]:
rc.join(ps, rc.District == ps.Format_distrit, 'left_outer').drop(   # remove columns from ps dataframe
    'ADDRESS',
    'CITY',
    'STATE',
    'ZIP',
    'WEBSITE'
).show()