# <center/> PySpark by Example 

## 1 - Initialize the Spark Engine

### 1.1 - Load all essential libraries, functions and initiate SparkSession

In [1]:
# Initialise findspark before the pyspark imports in the next cell.
# NOTE(review): this is expected to make the local Spark install importable -
# confirm it is still required in your environment.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit, max, min , to_timestamp, to_date,count

In [3]:
# Build (or reuse) the session through which every later cell reads its data.
spark = (
    SparkSession.builder
    .appName('PySpark by Example')
    .getOrCreate()
)

### 1.2 - Load and explore Chicago's Reported Crime Data (Basic EDA)

In [None]:
# Read the raw CSV (its first line is the header) and preview three rows
# without truncating long cell values.
df_rc = spark.read.csv(path='reported_crimes.csv', header=True)
df_rc.show(n=3, truncate=False)

In [None]:
# Determine the schema structure, especially the column types. The CSV was read
# with header=True only (no schema supplied, no inference requested), so check
# here which columns will need casting later (e.g. 'Date').
df_rc.printSchema()

In [None]:
# Get brief stats to determine a subset for analysis purposes; it is obviously
# a large dataset.
# The pattern ends with the AM/PM marker 'a', so the hour has to be parsed with
# 'hh' (12-hour clock). 'HH' is the 24-hour field and does not belong next to 'a'.
parsed_date = to_date(col('Date'), 'MM/dd/yyyy hh:mm:ss a')
df_rc.select(
    max(parsed_date).alias('max_date'),
    min(parsed_date).alias('min_date'),
    count(col('ID')).alias('rec_count')
).show()

In [None]:
# Keep only the reports dated before 2020-01-01 for the analysis, and convert
# 'Date' from text to a real timestamp.
# 'hh' (12-hour clock) is the hour field that pairs with the AM/PM marker 'a';
# 'HH' is the 24-hour field and must not be combined with 'a'.
df_rc = (
    df_rc
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') < lit('2020-01-01'))
)
df_rc.show(3)

## 2 - Working with columns

### 2.1 - Display only the first 5 rows of the column name IUCR

In [None]:
# Project the single IUCR column and print its first five values.
df_rc.select(col('IUCR')).show(n=5)

  ### 2.2 - Display only the first 4 rows of the column names Case Number, Date and Arrest

In [None]:
# First four rows of the three requested columns.
df_rc.select(
    ['Case Number', 'Date', 'Arrest']
).show(n=4)

### 2.3 - Add a column with name One, with entries all 1s

In [None]:
# Use a numeric literal so 'One' is an integer column of 1s rather than a
# column holding the string '1'. The result is only displayed, not assigned.
df_rc.withColumn('One', lit(1)).show(3)

### 2.4 - Remove the column IUCR

In [None]:
# Preview the frame without IUCR; the result is displayed but not assigned,
# so df_rc keeps the column.
(
    df_rc
    .drop('IUCR')
    .show(n=3)
)

## 3 - Working with rows

### 3.1 - Add the reported crimes for another 4 yrs. (2020-01-01 <= X < 2024-01-01)

In [12]:
from pyspark.sql.functions import year, dayofweek, months

In [None]:
# Load the later reports: 2020-01-01 <= Date < 2024-01-01.
# - '>=' keeps records stamped exactly 2020-01-01 00:00:00. The initial load
#   keeps only Date < 2020-01-01, so a strict '>' here would leave those rows
#   out of both frames.
# - 'hh' (12-hour clock) is the hour field that pairs with the AM/PM marker 'a';
#   'HH' is the 24-hour field and must not be combined with 'a'.
df_more_data = (
    spark.read.csv('reported_crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter((col('Date') >= lit('2020-01-01')) & (col('Date') < lit('2024-01-01')))
    .distinct()
)

print(f'The newly added data has : {df_more_data.count()} records!')

In [None]:
# Stack the two date ranges into one frame, then report the row count of each
# piece and of the combined result as a one-row summary table.
df = df_rc.union(df_more_data)
data = [
    (
        df_rc.count(),
        df_more_data.count(),
        df.count(),
    )
]
spark.createDataFrame(
    data,
    schema=['Initial Data Loaded', 'Additional Data Loaded', 'Total Data on DF'],
).show()

In [None]:
# Number of reports per calendar year, most recent year first.
(
    df.groupby(year(col('Date')))
    .count()
    .toDF('Year', 'Report Crimes Count')
    .orderBy('Year', ascending=False)
    .show(50, truncate=False)
)

### 3.2 - What are the top 10 number of reported crimes by Primary type, in DESC order?

In [None]:
# Rank the primary types by number of reports and show the ten largest.
(
    df.groupBy('Primary Type')
    .count()
    .toDF('Primary Type', 'Count of Crimes Reported')
    .orderBy('Count of Crimes Reported', ascending=False)
    .show(10, truncate=False)
)

### 3.3 - Challenge

#### 3.3(A) - What percentage of reported crimes resulted in an arrest?

In [None]:
from pyspark.sql.functions import lower, round,to_str
# Mark df for caching: the cells below run several separate actions on it
# (counts and group-bys).
df.cache()

In [None]:
# Share of all reports whose 'Arrest' value reads 'true' (compared in lower case).
tot_cases = df.count()
arrests = df.where(lower(col('Arrest')) == 'true').count()
perc = (arrests / tot_cases) * 100

# A list holding one tuple becomes a single-row DataFrame.
data = [(tot_cases, arrests, perc)]
spark.createDataFrame(
    data,
    schema=['Cases', 'Arrest', 'Percentage Arrest %'],
).show()

#### 3.3(B) - What are the top 3 locations for reported crimes?

In [None]:
# Three location descriptions with the most reports.
(
    df.groupBy('Location Description')
    .count()
    .orderBy(col('count').desc())
    .show(3)
)

##  4 - Built-in functions

In [20]:
from pyspark.sql.functions import lower, upper, max, min,substring, date_add

**This cell just experiments with joins and aggregates. It runs, but the result has no analytical meaning, so feel free to skip it.**

In [None]:
# Throwaway experiment: aggregate 'Date' per year twice (a count and a mean),
# then join the two results back together on the year.
by_year = df.groupby(year(col('Date')))
d1 = by_year.agg({'Date': 'count'}).toDF('Date', 'Dates_Count')
d2 = by_year.agg({'Date': 'mean'}).toDF('Date2', 'Dates_Mean')

d3 = d1.join(d2, on=d1['Date'] == d2['Date2'], how='inner')
d3.select('Date', 'Date2', 'Dates_Count', 'Dates_Mean').show()



### 4.1 - String functions

**Display the Primary Type column in lower and upper characters, and the first 4 characters of the column**

In [None]:
# Three views of 'Primary Type': lower case, upper case and its first four characters.
primary_type = col('Primary Type')
df.select(
    lower(primary_type).alias('Lowercase Name'),
    upper(primary_type).alias('Uppercase Name'),
    substring(primary_type, 1, 4).alias('Substring Name'),
).show(5, truncate=False)


### 4.2 - Numeric functions


**Show the oldest date and the most recent date**

In [None]:
# Earliest and latest calendar day present in 'Date'.
report_day = to_date(col('Date'))
df.select(
    min(report_day).alias('Oldest_Date'),
    max(report_day).alias('Most_Recent_Date'),
).show()

### 4.3 - Date

**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

In [None]:
# Oldest and most recent report day, each shown next to the day shifted by
# three days (backwards for the oldest, forwards for the most recent).
report_day = to_date(col('Date'))
df.select(
    min(date_add(report_day, -3)).alias('3 Days before (OD)'),
    min(report_day).alias('Oldest_Date (OD)'),
    max(report_day).alias('Maximum_Recent_Date (MRD)'),
    max(date_add(report_day, 3)).alias('3 Days after (MRD)'),
).show()

## 5 - Joins

### 5.1 - Download police station data

In [25]:
from pyspark.sql.functions import lpad, date_format

In [None]:
# Load the police stations and add 'Format_District': the district value
# left-padded with '0' to a width of three characters.
df_police = spark.read.csv(path='police_station.csv', header=True)
df_police = df_police.withColumn(
    'Format_District',
    lpad(col('District'), 3, '0'),
)
df_police.show(5)
df_police.select('Format_District').distinct().show()

### 5.2 - The crimes data has only the district no. Add district name by joining with the police station data

In [None]:
# Attach the station details to each report by matching the report's district
# against the zero-padded station district.
df_join = df.join(df_police, df.District == df_police.Format_District, 'inner')

# Show the padded district under the label 'District' (the previous alias was
# mis-capitalised as 'DIstrict') next to the station name and report details.
df_join.select(
    'ID',
    'Case Number',
    'Date',
    col('Format_District').alias('District'),
    'District Name',
    'Primary Type',
    'Description',
    'ADDRESS',
    'Ward',
    'Community Area',
).show(3)

## 6 - Challenge questions

### 6.1 - What is the most frequently reported non-criminal activity (Top 5)? 

In [None]:
# Restrict to the non-criminal reports before ranking; without this filter the
# cell simply repeats the overall 'Primary Type' ranking.
# NOTE(review): the category is believed to be spelled several ways in
# 'Primary Type' (e.g. 'NON-CRIMINAL', 'NON - CRIMINAL'); confirm with
# df_rc.select('Primary Type').distinct() and adjust the pattern if needed.
non_criminal = df_rc.filter(upper(col('Primary Type')).like('NON%CRIMINAL%'))

# Rank the individual activities ('Description') within that subset.
non_criminal.groupBy('Description').count() \
    .orderBy('count', ascending=False) \
    .show(5, truncate=False)

### 6.2 - Using a bar chart, plot which day of the week has the highest number of reported crimes.

In [None]:
# Count reports per weekday, where the weekday is 'Date' formatted with the
# pattern 'E', and list the busiest day first.
week_day = date_format(col('Date'), 'E').alias('Week Day')
df_crime_days = (
    df_rc.groupBy(week_day)
    .count()
    .orderBy(col('count').desc())
)
df_crime_days.show()

In [None]:
import matplotlib.pyplot as plt

# Bring the per-weekday aggregate to the driver as a pandas DataFrame for plotting.
# It is named crime_days_pdf rather than 'pd' so it cannot be confused with, or
# clobber, the conventional pandas module alias.
crime_days_pdf = df_crime_days.toPandas()

# Plot the data
plt.bar(crime_days_pdf['Week Day'], crime_days_pdf['count'])
plt.xlabel("WK Day")
plt.ylabel("Count")
plt.title("Crimes committed by days of the week")
plt.show()


## (05-01) RDDs setup

**How many police stations are there?**

**Display the District ID, District name, Address and Zip for the police station with District ID 7**



**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**