In [1]:
# Import findspark and initialize. 
import pandas as pd
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType



In [3]:
# Import our input dataset
df = pd.read_csv('https://profilingbucket.s3.us-east-2.amazonaws.com/combined_cleaned_data.csv')

profiling_arrest_analysis = pd.DataFrame(df)

spark = SparkSession.builder.appName("pandasToSpark").getOrCreate()

# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(profiling_arrest_analysis)

# Show the Spark DataFrame
spark_df.show(5)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+-----------+--------------------+---+------------------+--------+--------------------+--------------------+------------+--------------------+------------------+-----------------+
|arrest_date|         arrest_time|sex|              race|searched|      reason_stopped|     search_based_on|search_found|          race_known|               lng|              lat|
+-----------+--------------------+---+------------------+--------+--------------------+--------------------+------------+--------------------+------------------+-----------------+
| 2015-01-01|1970-01-01 00:00:...|  M|HISPANIC OR LATINO| YES = 1|    CALL FOR SERVICE|INCIDENTAL TO ARREST|     NOTHING|NO - RACE OR ETHN...|7.1814650738840085|85.77304446297813|
| 2015-01-01|1970-01-01 00:00:...|  M|             WHITE| YES = 1|VIOLATION OF TRAN...|INCIDENTAL TO ARREST|       DRUGS|NO - RACE OR ETHN...| 6.341422548848474| 85.7819417813572|
| 2015-01-01|1970-01-01 00:00:...|  F|             WHITE| YES = 1|VIOLATION OF TRAN...|INCIDENTAL TO

In [4]:
spark_df.dtypes

[('arrest_date', 'string'),
 ('arrest_time', 'string'),
 ('sex', 'string'),
 ('race', 'string'),
 ('searched', 'string'),
 ('reason_stopped', 'string'),
 ('search_based_on', 'string'),
 ('search_found', 'string'),
 ('race_known', 'string'),
 ('lng', 'double'),
 ('lat', 'double')]

In [5]:
from pyspark.sql.functions import count, col, when

# Check for Null Values/'
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()


+-----------+-----------+---+----+--------+--------------+---------------+------------+----------+---+---+
|arrest_date|arrest_time|sex|race|searched|reason_stopped|search_based_on|search_found|race_known|lng|lat|
+-----------+-----------+---+----+--------+--------------+---------------+------------+----------+---+---+
|          0|          0|  0|   0|       0|             0|              0|           0|         0|  0|  0|
+-----------+-----------+---+----+--------+--------------+---------------+------------+----------+---+---+



In [6]:
from pyspark.sql.functions import col, to_timestamp

# Convert 'arrest_date' and 'arrest_time' to timestamp
spark_df = spark_df.withColumn('arrest_date', to_timestamp(col('arrest_date')))
spark_df = spark_df.withColumn('arrest_time', to_timestamp(col('arrest_time')))

# Convert 'lng' and 'lat' to double
spark_df = spark_df.withColumn('lng',spark_df['lng'].cast('double'))
spark_df = spark_df.withColumn('lat',spark_df['lat'].cast('double'))

In [7]:
 # Get the data types of the columns. 
spark_df.printSchema()

root
 |-- arrest_date: timestamp (nullable = true)
 |-- arrest_time: timestamp (nullable = true)
 |-- sex: string (nullable = true)
 |-- race: string (nullable = true)
 |-- searched: string (nullable = true)
 |-- reason_stopped: string (nullable = true)
 |-- search_based_on: string (nullable = true)
 |-- search_found: string (nullable = true)
 |-- race_known: string (nullable = true)
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)



In [8]:
pandas_df = spark_df.toPandas()
pandas_df
# Save Pandas DataFrame as JSON file
pandas_df.to_json("cleaned_data/austinArrests.json", orient="records")

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


### Analysis:
#### 1. Aggregations by Month or Day:

In [9]:
# Register the DataFrame as a temporary SQL table
spark_df.createOrReplaceTempView("arrest_data")

In [10]:
from pyspark.sql.functions import month, dayofmonth

result_by_month = spark.sql("""
    SELECT
        sex,
        race,
        month(arrest_date) AS arrest_month,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race, arrest_month
    ORDER BY
        arrest_month, arrest_count DESC
""")

result_by_month.show(truncate=False)

+---+------------------------------+------------+------------+
|sex|race                          |arrest_month|arrest_count|
+---+------------------------------+------------+------------+
|M  |HISPANIC OR LATINO            |1           |1473        |
|M  |WHITE                         |1           |1033        |
|M  |BLACK                         |1           |823         |
|F  |WHITE                         |1           |439         |
|F  |HISPANIC OR LATINO            |1           |375         |
|F  |BLACK                         |1           |243         |
|M  |ASIAN                         |1           |43          |
|M  |MIDDLE EASTERN                |1           |15          |
|F  |ASIAN                         |1           |7           |
|M  |UNKNOWN                       |1           |6           |
|M  |HAWAIIAN/PACIFIC ISLANDER     |1           |6           |
|F  |UNKNOWN                       |1           |5           |
|F  |MIDDLE EASTERN                |1           |1     

#### 2. Temporal Trends:

In [11]:
result_temporal_trends = spark.sql("""
    SELECT
        sex,
        race,
        arrest_date,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race, arrest_date
    ORDER BY
        arrest_date, arrest_count DESC
""")
print('This query analyzes temporal trends by providing the count for each combination of sex, race, and arrest date.')
result_temporal_trends.show(truncate=False)

This query analyzes temporal trends by providing the count for each combination of sex, race, and arrest date.
+---+------------------+-------------------+------------+
|sex|race              |arrest_date        |arrest_count|
+---+------------------+-------------------+------------+
|M  |HISPANIC OR LATINO|2015-01-01 00:00:00|11          |
|M  |BLACK             |2015-01-01 00:00:00|6           |
|M  |WHITE             |2015-01-01 00:00:00|4           |
|F  |WHITE             |2015-01-01 00:00:00|3           |
|M  |ASIAN             |2015-01-01 00:00:00|1           |
|M  |BLACK             |2015-01-02 00:00:00|5           |
|M  |HISPANIC OR LATINO|2015-01-02 00:00:00|3           |
|F  |WHITE             |2015-01-02 00:00:00|3           |
|M  |WHITE             |2015-01-02 00:00:00|3           |
|F  |BLACK             |2015-01-02 00:00:00|1           |
|M  |ASIAN             |2015-01-02 00:00:00|1           |
|M  |HISPANIC OR LATINO|2015-01-03 00:00:00|13          |
|M  |WHITE         

#### 3. Reasons for Arrest:

In [12]:
result_reasons_for_arrest = spark.sql("""
    SELECT
        sex,
        race,
        reason_stopped,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race, reason_stopped
    ORDER BY
        arrest_count DESC
""")
print('This query explores the reasons for arrest, providing the count for each combination of sex, race, and reason_stopped.')
result_reasons_for_arrest.show(truncate=False)


This query explores the reasons for arrest, providing the count for each combination of sex, race, and reason_stopped.
+---+------------------+---------------------------------------------+------------+
|sex|race              |reason_stopped                               |arrest_count|
+---+------------------+---------------------------------------------+------------+
|M  |HISPANIC OR LATINO|Moving Traffic Violation                     |5231        |
|M  |HISPANIC OR LATINO|VIOLATION OF TRANSPORTATION CODE/VEHICLE LAWS|4367        |
|M  |BLACK             |Moving Traffic Violation                     |3217        |
|M  |WHITE             |VIOLATION OF TRANSPORTATION CODE/VEHICLE LAWS|3216        |
|M  |WHITE             |Moving Traffic Violation                     |3005        |
|M  |BLACK             |VIOLATION OF TRANSPORTATION CODE/VEHICLE LAWS|2584        |
|M  |HISPANIC OR LATINO|Violation of law other than traffic          |2417        |
|M  |WHITE             |Violation of law 

#### 4. Search Outcomes:

In [13]:
result_search_outcomes = spark.sql("""
    SELECT
        sex,
        race,
        search_found,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race, search_found
    ORDER BY
        arrest_count DESC
""")
print('This query investigates outcomes of searches by providing the count for each combination of sex, race, and search_found.')
result_search_outcomes.show(truncate=False)

This query investigates outcomes of searches by providing the count for each combination of sex, race, and search_found.
+---+------------------+------------+------------+
|sex|race              |search_found|arrest_count|
+---+------------------+------------+------------+
|M  |HISPANIC OR LATINO|NOTHING     |7561        |
|M  |WHITE             |NOTHING     |5027        |
|M  |BLACK             |NOTHING     |3976        |
|F  |WHITE             |NOTHING     |2560        |
|F  |HISPANIC OR LATINO|NOTHING     |2226        |
|M  |HISPANIC OR LATINO|not_listed  |2019        |
|M  |WHITE             |not_listed  |1804        |
|M  |HISPANIC OR LATINO|DRUGS       |1415        |
|M  |HISPANIC OR LATINO|OTHER       |1289        |
|F  |BLACK             |NOTHING     |1254        |
|M  |BLACK             |not_listed  |1224        |
|M  |BLACK             |DRUGS       |1115        |
|M  |WHITE             |OTHER       |1016        |
|M  |HISPANIC OR LATINO|CASH        |838         |
|M  |WHITE  

#### 5. Geospatial Analysis:

In [14]:
result_geospatial_analysis = spark.sql("""
    SELECT
        sex,
        race,
        AVG(lng) AS avg_lng,
        AVG(lat) AS avg_lat,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race
    ORDER BY
        arrest_count DESC
""")
print('This query performs geospatial analysis by providing the average longitude, latitude, and count for each combination of sex and race.')
result_geospatial_analysis.show(truncate=False)

This query performs geospatial analysis by providing the average longitude, latitude, and count for each combination of sex and race.
+----------+------------------------------+------------------+-----------------+------------+
|sex       |race                          |avg_lng           |avg_lat          |arrest_count|
+----------+------------------------------+------------------+-----------------+------------+
|M         |HISPANIC OR LATINO            |7.40132349233462  |85.77564294734236|13613       |
|M         |WHITE                         |7.407710483696598 |85.79754172823796|9408        |
|M         |BLACK                         |7.7663369672558105|85.77610387547293|7713        |
|F         |WHITE                         |7.396047135349547 |85.79728711701722|4176        |
|F         |HISPANIC OR LATINO            |7.369510084157036 |85.77706442631212|3599        |
|F         |BLACK                         |7.850413210906305 |85.77782897332624|2192        |
|M         |ASIAN   

#### 6. Searches Based On:

In [15]:
result_searches_based_on = spark.sql("""
    SELECT
        sex,
        race,
        search_based_on,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race, search_based_on
    ORDER BY
        arrest_count DESC
""")
print('This query analyzes the distribution of different reasons for conducting searches, providing the count for each combination of sex, race, and search_based_on.')
result_searches_based_on.show(truncate=False)

This query analyzes the distribution of different reasons for conducting searches, providing the count for each combination of sex, race, and search_based_on.
+---+------------------+--------------------+------------+
|sex|race              |search_based_on     |arrest_count|
+---+------------------+--------------------+------------+
|M  |HISPANIC OR LATINO|INCIDENTAL TO ARREST|8197        |
|M  |WHITE             |INCIDENTAL TO ARREST|5754        |
|M  |BLACK             |INCIDENTAL TO ARREST|3797        |
|M  |HISPANIC OR LATINO|not_listed          |2747        |
|F  |WHITE             |INCIDENTAL TO ARREST|2741        |
|F  |HISPANIC OR LATINO|INCIDENTAL TO ARREST|2224        |
|M  |WHITE             |not_listed          |2209        |
|M  |HISPANIC OR LATINO|PROBABLE CAUSE      |2088        |
|M  |BLACK             |PROBABLE CAUSE      |1951        |
|M  |BLACK             |not_listed          |1608        |
|F  |BLACK             |INCIDENTAL TO ARREST|1162        |
|M  |WHITE     

#### 7. Demographic Analysis:

In [16]:
result_demographic_analysis = spark.sql("""
    SELECT
        sex,
        race,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race
    ORDER BY
        arrest_count DESC
""")
print('This query performs demographic analysis by providing the count for each combination of sex and race.')
result_demographic_analysis.show(truncate=False)


This query performs demographic analysis by providing the count for each combination of sex and race.
+----------+------------------------------+------------+
|sex       |race                          |arrest_count|
+----------+------------------------------+------------+
|M         |HISPANIC OR LATINO            |13613       |
|M         |WHITE                         |9408        |
|M         |BLACK                         |7713        |
|F         |WHITE                         |4176        |
|F         |HISPANIC OR LATINO            |3599        |
|F         |BLACK                         |2192        |
|M         |ASIAN                         |321         |
|M         |MIDDLE EASTERN                |122         |
|F         |ASIAN                         |106         |
|M         |UNKNOWN                       |75          |
|F         |UNKNOWN                       |36          |
|M         |HAWAIIAN/PACIFIC ISLANDER     |19          |
|F         |MIDDLE EASTERN                |

In [17]:

# Run SQL queries on the DataFrame
result = spark.sql("""
    SELECT
        AVG(lng) AS avg_lng,
        AVG(lat) AS avg_lat,
        MAX(arrest_date) AS max_arrest_date,
        MIN(arrest_date) AS min_arrest_date,
        COUNT(*) AS total_records
    FROM
        arrest_data
    GROUP BY
        sex, race
""")

# Show the result
result.show()

+------------------+-----------------+-------------------+-------------------+-------------+
|           avg_lng|          avg_lat|    max_arrest_date|    min_arrest_date|total_records|
+------------------+-----------------+-------------------+-------------------+-------------+
|  7.40132349233462|85.77564294734236|2020-12-31 00:00:00|2015-01-01 00:00:00|        13613|
| 7.620816835428394|85.78816393881644|2019-10-04 00:00:00|2015-01-26 00:00:00|           36|
| 7.850413210906305|85.77782897332624|2020-12-30 00:00:00|2015-01-02 00:00:00|         2192|
| 7.337627601151965|85.78428277933055|2019-03-03 00:00:00|2015-08-09 00:00:00|            5|
| 7.064944785724222|85.76244460146759|2020-07-10 00:00:00|2015-05-15 00:00:00|           16|
| 7.542754208162593|85.79898914586826|2020-10-13 00:00:00|2015-01-04 00:00:00|          122|
|  7.73240840208576|85.80456896240166|2020-12-23 00:00:00|2015-01-01 00:00:00|          321|
|7.4072394663941585|85.80325925316868|2020-11-01 00:00:00|2015-11-10 0

In [18]:
result_location = spark.sql("""
    SELECT
        sex,
        race,
        AVG(lng) AS avg_lng,
        AVG(lat) AS avg_lat,
        COUNT(*) AS arrest_count
    FROM
        arrest_data
    GROUP BY
        sex, race
    ORDER BY
        arrest_count DESC
""")

result_location.show()

+----------+--------------------+------------------+-----------------+------------+
|       sex|                race|           avg_lng|          avg_lat|arrest_count|
+----------+--------------------+------------------+-----------------+------------+
|         M|  HISPANIC OR LATINO|  7.40132349233462|85.77564294734236|       13613|
|         M|               WHITE| 7.407710483696598|85.79754172823796|        9408|
|         M|               BLACK|7.7663369672558105|85.77610387547293|        7713|
|         F|               WHITE| 7.396047135349547|85.79728711701722|        4176|
|         F|  HISPANIC OR LATINO| 7.369510084157036|85.77706442631212|        3599|
|         F|               BLACK| 7.850413210906305|85.77782897332624|        2192|
|         M|               ASIAN|  7.73240840208576|85.80456896240166|         321|
|         M|      MIDDLE EASTERN| 7.542754208162593|85.79898914586826|         122|
|         F|               ASIAN| 7.604823325384605|85.80329658911349|      

In [19]:
result_with_percentages = spark.sql("""
    SELECT
        sex,
        race,
        COUNT(*) AS arrest_count,
        (COUNT(*) / SUM(COUNT(*)) OVER ()) * 100 AS arrest_percentage
    FROM
        arrest_data
    GROUP BY
        sex, race
    ORDER BY
        arrest_count DESC
""")

result_with_percentages.show()

+----------+--------------------+------------+--------------------+
|       sex|                race|arrest_count|   arrest_percentage|
+----------+--------------------+------------+--------------------+
|         M|  HISPANIC OR LATINO|       13613|     32.863385075924|
|         M|               WHITE|        9408|   22.71201989233035|
|         M|               BLACK|        7713|   18.62009028800425|
|         F|               WHITE|        4176|  10.081355768534388|
|         F|  HISPANIC OR LATINO|        3599|   8.688409820631051|
|         F|               BLACK|        2192|   5.291746131376288|
|         M|               ASIAN|         321|  0.7749318011732612|
|         M|      MIDDLE EASTERN|         122|  0.2945223668010526|
|         F|               ASIAN|         106| 0.25589648263042275|
|         M|             UNKNOWN|          75|  0.1810588320498274|
|         F|             UNKNOWN|          36| 0.08690823938391715|
|         M|HAWAIIAN/PACIFIC ...|          19| 0

In [20]:
result_by_year = spark.sql("""
    SELECT
        sex,
        race,
        YEAR(arrest_date) AS arrest_year,
        COUNT(*) AS arrest_count,
        (COUNT(*) / SUM(COUNT(*)) OVER (PARTITION BY YEAR(arrest_date))) * 100 AS arrest_percentage
    FROM
        arrest_data
    GROUP BY
        sex, race, arrest_year
    ORDER BY
        arrest_year, arrest_count DESC
""")

result_by_year.show(truncate=False)

+---+------------------------------+-----------+------------+-------------------+
|sex|race                          |arrest_year|arrest_count|arrest_percentage  |
+---+------------------------------+-----------+------------+-------------------+
|M  |HISPANIC OR LATINO            |2015       |3011        |32.69627538277772  |
|M  |WHITE                         |2015       |2257        |24.50863285915952  |
|M  |BLACK                         |2015       |1705        |18.514496688022586 |
|F  |WHITE                         |2015       |890         |9.6644586817244    |
|F  |HISPANIC OR LATINO            |2015       |724         |7.861874253447715  |
|F  |BLACK                         |2015       |459         |4.984254533608427  |
|M  |ASIAN                         |2015       |75          |0.8144206754262134 |
|M  |MIDDLE EASTERN                |2015       |26          |0.2823325008144207 |
|F  |ASIAN                         |2015       |24          |0.2606146161363883 |
|M  |UNKNOWN    