# An Analytical Detective (PySpark)

This material is part of the exercises for **The Analytics Edge (edX/MITx)** and has been adapted to **Apache Spark (PySpark)**.

**Notebook type:** PySpark implementation

## Dataset
Place `mvtWeek1.csv` next to this notebook (or update `DATA_PATH`).
The exercises analyze motor vehicle thefts in Chicago.

In [None]:
# PySpark setup
from pyspark.sql import SparkSession, functions as F

# Obtain a Spark session for this notebook under a descriptive application name.
spark = (
    SparkSession.builder
    .appName("AnalyticsEdge_AnAnalyticalDetective")
    .getOrCreate()
)

# Path to the motor-vehicle-theft CSV; adjust if the file lives elsewhere.
DATA_PATH = "mvtWeek1.csv"

# Read the CSV with the first line as column names and let Spark infer column types.
mvt = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Standardize types: cast each raw column to an explicit type, in place.
column_types = {
    "ID": "int",
    "Beat": "int",
    "Year": "int",
    "Arrest": "boolean",
    "Domestic": "boolean",
    "District": "int",
    "CommunityArea": "int",
    "Latitude": "double",
    "Longitude": "double",
}
for column_name, spark_type in column_types.items():
    mvt = mvt.withColumn(column_name, F.col(column_name).cast(spark_type))

# Derive a timestamp from the `Date` column, then the month name and
# weekday name from that timestamp (appended in this order).
mvt = mvt.withColumn("DateTS", F.to_timestamp(F.col("Date"), "M/d/yy H:mm"))
mvt = mvt.withColumn("Month", F.date_format(F.col("DateTS"), "MMMM"))
mvt = mvt.withColumn("Weekday", F.date_format(F.col("DateTS"), "EEEE"))

# Mark the typed DataFrame for caching so later cells can reuse it.
mvt = mvt.cache()

# Report the size, the resulting schema, and the first few rows.
row_total = mvt.count()
print("Row count:", row_total)
mvt.printSchema()
mvt.show(n=5, truncate=False)


## Problem 1.1 - Loading the Data

Load the dataset `mvtWeek1.csv` into a Spark DataFrame called `mvt`.

- Inspect the schema and basic summary statistics.
- You will compute the exact number of rows in **Problem 1.2**.

Note: Place `mvtWeek1.csv` in the same folder as this notebook (or update the path).

In [None]:
# Print the column names/types, then summary statistics for the numeric columns of interest.
mvt.printSchema()
summary_columns = ["ID", "Beat", "Year", "Latitude", "Longitude"]
mvt.describe(*summary_columns).show()


## Problem 1.2 - Loading the Data

How many rows of data (observations) are in this dataset?

In [None]:
# Number of observations (rows) in the dataset.
n_observations = mvt.count()
n_observations


## Problem 1.3 - Loading the Data

Using an aggregate function, what is the maximum value of the variable `ID`?

In [None]:
# Largest value of `ID`, shown as a one-row table.
max_id_expr = F.max("ID").alias("max_ID")
mvt.select(max_id_expr).show()


## Problem 1.4 - Loading the Data

What is the minimum value of the variable `Beat`?

In [None]:
# Smallest value of `Beat`, shown as a one-row table.
min_beat_expr = F.min("Beat").alias("min_Beat")
mvt.select(min_beat_expr).show()


## Problem 1.5 - Loading the Data

How many observations have value `True` in the `Arrest` variable?

In [None]:
# Count the rows where the boolean `Arrest` flag is true
# (rows with a null flag do not pass the filter).
mvt.where(F.col("Arrest")).count()


## Problem 1.6 - Loading the Data

How many observations have a `LocationDescription` value of `ALLEY`?

In [None]:
# Count the thefts whose location description is exactly "ALLEY".
alley_thefts = mvt.where(mvt["LocationDescription"] == "ALLEY")
alley_thefts.count()


## Problem 2.1 - Understanding Dates

Look at a sample value of the `Date` column. In what format are the entries?

(Hint: they look like month/day/year hour:minute.)

In [None]:
# Show a handful of non-null raw `Date` strings to reveal their format.
non_null_dates = mvt.filter(F.col("Date").isNotNull()).select("Date")
non_null_dates.limit(5).show(truncate=False)


## Problem 2.2 - Understanding Dates

Convert the `Date` column from string to timestamp. Then compute the **median** date.

What is the month and year of the median date in this dataset?

In [None]:
# Median date (approx): take the approximate 50th percentile of the timestamps
# expressed as unix seconds, then convert that number back to a timestamp and
# to a "Month Year" label.
unix_seconds = F.col("DateTS").cast("long")
median_unix = (
    mvt.select(F.percentile_approx(unix_seconds, 0.5).alias("median_unix"))
    .first()[0]
)

# One single-row helper query produces both representations of the median.
median_as_string = F.from_unixtime(F.lit(median_unix))
median_row = (
    spark.range(1)
    .select(
        median_as_string.cast("timestamp").alias("median_ts"),
        F.date_format(median_as_string, "MMMM yyyy").alias("median_month_year"),
    )
    .first()
)
median_ts = median_row[0]
median_month_year = median_row[1]
median_ts, median_month_year


## Problem 2.3 - Understanding Dates

Create two new columns:
- `Month` (month name)
- `Weekday` (day-of-week name)

In which month did the fewest motor vehicle thefts occur?

In [None]:
# Thefts per month, smallest count first, so the top row is the month with the fewest thefts.
thefts_per_month = mvt.groupBy("Month").agg(F.count("*").alias("count"))
thefts_per_month.sort(F.col("count").asc()).show(20, truncate=False)


## Problem 2.4 - Understanding Dates

On which weekday did the most motor vehicle thefts occur?

In [None]:
# Thefts per weekday, largest count first, so the top row is the busiest weekday.
thefts_per_weekday = mvt.groupBy("Weekday").agg(F.count("*").alias("count"))
thefts_per_weekday.sort(F.col("count").desc()).show(truncate=False)


## Problem 2.5 - Understanding Dates

Which month has the largest number of motor vehicle thefts for which an arrest was made?

In [None]:
# Restrict to thefts with an arrest, count them per month, and list the
# months from most to fewest arrests.
arrested = mvt.where(F.col("Arrest"))
arrests_per_month = arrested.groupBy("Month").agg(F.count("*").alias("count"))
arrests_per_month.sort(F.col("count").desc()).show(20, truncate=False)


## Problem 3.1 - Visualizing Crime Trends

Create a histogram (or a time-series proxy such as counts by month-year) of the thefts over time.

Answer the following based on the plot:
- In general, does crime increase or decrease from 2002 to 2012?
- In general, does crime increase or decrease from 2005 to 2008?
- In general, does crime increase or decrease from 2009 to 2011?

In [None]:
# Time-series proxy: number of thefts in each year, in chronological order.
counts_by_year = (
    mvt.groupBy("Year")
    .count()
    .sort("Year")
)
counts_by_year.show()

# Optional: line plot of the yearly counts. The aggregate is tiny (one row per
# year), so it is pulled into pandas for plotting.
import matplotlib.pyplot as plt
pdf = counts_by_year.toPandas()
fig, ax = plt.subplots()
ax.plot(pdf["Year"], pdf["count"])
ax.set_xlabel("Year")
ax.set_ylabel("Number of thefts")
ax.set_title("Motor Vehicle Thefts by Year")
plt.show()


## Problem 3.2 - Visualizing Crime Trends

Create a boxplot comparing the distribution of dates for crimes with `Arrest = True` versus `Arrest = False`.

Does it look like there were more crimes for which arrests were made in the first half of the time period (2001 to 2006) or the second half (2007 to 2012)?

In [None]:
# Compare arrest timing across the time period
# A simple proxy: number of arrests by year
arrests_by_year = (mvt
    .filter(F.col("Arrest") == True)
    .groupBy("Year")
    .count()
    .orderBy("Year")
)
arrests_by_year.show()

# Optional boxplot (requires pandas; may be slow on limited machines)
import pandas as pd
import matplotlib.pyplot as plt
pdf = mvt.select("Year", "Arrest").where(F.col("Year").isNotNull()).toPandas()
# Boxplot of Year by Arrest flag (as a lightweight substitute for Date boxplot).
# Create the figure/axes once and hand the axes to pandas: when `by=` is used
# without `ax=`, pandas opens its own figure, so a preceding bare
# `plt.figure()` would be left behind as a stray empty figure.
fig, ax = plt.subplots()
pdf.boxplot(column="Year", by="Arrest", ax=ax)
ax.set_title("Year by Arrest")
# Clear the automatic "Boxplot grouped by ..." super-title that pandas adds.
fig.suptitle("")
plt.show()


## Problem 3.3 - Visualizing Crime Trends

For what proportion of motor vehicle thefts in 2001 was an arrest made?

In [None]:
# Arrest proportion by year: averaging the flag as 1.0/0.0 gives the share of
# thefts in each year for which an arrest was made.
arrest_indicator = F.col("Arrest").cast("double")
arrest_rate_by_year = (
    mvt.groupBy("Year")
    .agg(F.avg(arrest_indicator).alias("arrest_rate"))
    .sort("Year")
)
arrest_rate_by_year.show()

# Pull out the single value for 2001.
rate_2001_row = (
    arrest_rate_by_year
    .filter(F.col("Year") == 2001)
    .select("arrest_rate")
    .first()
)
rate_2001_row[0]


## Problem 3.4 - Visualizing Crime Trends

For what proportion of motor vehicle thefts in 2007 was an arrest made?

In [None]:
# Share of thefts per year with an arrest (mean of the flag cast to 1.0/0.0),
# recomputed here so this cell does not depend on the previous problem's cell.
arrest_indicator = F.col("Arrest").cast("double")
arrest_rate_by_year = (
    mvt.groupBy("Year")
    .agg(F.avg(arrest_indicator).alias("arrest_rate"))
    .sort("Year")
)

# Pull out the single value for 2007.
rate_2007_row = (
    arrest_rate_by_year
    .filter(F.col("Year") == 2007)
    .select("arrest_rate")
    .first()
)
rate_2007_row[0]


## Problem 3.5 - Visualizing Crime Trends

For what proportion of motor vehicle thefts in 2012 was an arrest made?

In [None]:
# Share of thefts per year with an arrest (mean of the flag cast to 1.0/0.0),
# recomputed here so this cell does not depend on the previous problem's cell.
arrest_indicator = F.col("Arrest").cast("double")
arrest_rate_by_year = (
    mvt.groupBy("Year")
    .agg(F.avg(arrest_indicator).alias("arrest_rate"))
    .sort("Year")
)

# Pull out the single value for 2012.
rate_2012_row = (
    arrest_rate_by_year
    .filter(F.col("Year") == 2012)
    .select("arrest_rate")
    .first()
)
rate_2012_row[0]


## Problem 4.1 - Popular Locations

Which locations are the top five locations for motor vehicle thefts, excluding the `OTHER` category?

In [None]:
# Theft counts per location, most frequent first.
loc_counts = (
    mvt.groupBy("LocationDescription")
    .count()
    .sort(F.col("count").desc())
)
loc_counts.show(10, truncate=False)

# Drop the catch-all OTHER category and keep the names of the five most
# frequent remaining locations.
named_locations = loc_counts.where(F.col("LocationDescription") != "OTHER")
top5_locations = [row["LocationDescription"] for row in named_locations.take(5)]
top5_locations


## Problem 4.2 - Popular Locations

Create a subset called `Top5` that contains only thefts from the five locations in Problem 4.1. How many observations are in `Top5`?

In [None]:
# Build Top5 subset: find the five most frequent locations (excluding OTHER),
# then keep only the thefts that happened at one of them.
loc_counts = (
    mvt.groupBy("LocationDescription")
    .count()
    .sort(F.col("count").desc())
)
named_locations = loc_counts.where(F.col("LocationDescription") != "OTHER")
top5_locations = [row["LocationDescription"] for row in named_locations.take(5)]

in_top5 = F.col("LocationDescription").isin(*top5_locations)
Top5 = mvt.where(in_top5)
Top5.count()


## Problem 4.3 - Popular Locations

Using `Top5`, which of the five locations has the highest arrest rate?

In [None]:
# Highest arrest rate within Top5. The subset is rebuilt here so the cell can
# run without the previous problem's cell.
loc_counts = (
    mvt.groupBy("LocationDescription")
    .count()
    .sort(F.col("count").desc())
)
named_locations = loc_counts.where(F.col("LocationDescription") != "OTHER")
top5_locations = [row["LocationDescription"] for row in named_locations.take(5)]
Top5 = mvt.where(F.col("LocationDescription").isin(*top5_locations))

# Mean of the arrest flag (as 1.0/0.0) per location, highest rate first.
arrest_indicator = F.col("Arrest").cast("double")
rate_per_location = (
    Top5.groupBy("LocationDescription")
    .agg(F.avg(arrest_indicator).alias("arrest_rate"))
)
rate_per_location.sort(F.col("arrest_rate").desc()).show(truncate=False)


## Problem 4.4 - Popular Locations

On which day of the week do the most motor vehicle thefts at gas stations happen?

In [None]:
# Most common weekday for GAS STATION thefts (within Top5). The subset is
# rebuilt here so the cell can run without the earlier Top5 cells.
loc_counts = (
    mvt.groupBy("LocationDescription")
    .count()
    .sort(F.col("count").desc())
)
named_locations = loc_counts.where(F.col("LocationDescription") != "OTHER")
top5_locations = [row["LocationDescription"] for row in named_locations.take(5)]
Top5 = mvt.where(F.col("LocationDescription").isin(*top5_locations))

# Thefts at gas stations per weekday, busiest weekday first.
gas_station_thefts = Top5.where(F.col("LocationDescription") == "GAS STATION")
gas_station_by_weekday = gas_station_thefts.groupBy("Weekday").count()
gas_station_by_weekday.sort(F.col("count").desc()).show(truncate=False)


## Problem 4.5 - Popular Locations

On which day of the week do the fewest motor vehicle thefts in residential driveways happen?

In [None]:
# Least common weekday for DRIVEWAY - RESIDENTIAL thefts (within Top5). The
# subset is rebuilt here so the cell can run without the earlier Top5 cells.
loc_counts = (
    mvt.groupBy("LocationDescription")
    .count()
    .sort(F.col("count").desc())
)
named_locations = loc_counts.where(F.col("LocationDescription") != "OTHER")
top5_locations = [row["LocationDescription"] for row in named_locations.take(5)]
Top5 = mvt.where(F.col("LocationDescription").isin(*top5_locations))

# Thefts in residential driveways per weekday, quietest weekday first.
driveway_thefts = Top5.where(
    F.col("LocationDescription") == "DRIVEWAY - RESIDENTIAL"
)
driveway_by_weekday = driveway_thefts.groupBy("Weekday").count()
driveway_by_weekday.sort(F.col("count").asc()).show(truncate=False)
