# Working with Data Frame API
+ <a href="#schema">1.Schema</a>
+ <a href="#columns">2.Working with Columns</a>
+ <a href="#rows">3.Working with Rows</a>
+ <a href="#challenges">3.Challenges</a>
----

# Set up

In [None]:
import os

import findspark
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

# Preprocess Chicago's reported Crimes Data

In [None]:
rc = (
    spark.read.csv("../data/reported-crimes.csv", header=True)
    .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
    .filter(col("Date") <= lit("2018-11-11"))
)
rc.show(5)

In [None]:
rc.show(1)

--------

<p id="schema"></p>

# 1) Schemas

### print current schema

In [None]:
# check current schema
rc.printSchema()

## Create customized schema

- column name, variable type, nullable 

In [None]:
# get the columns
rc.columns

In [None]:
len(rc.columns)

### Option 1) manully creating StructType

In [None]:
# column name, variable type, nullable
schema = StructType(
    [
        StructField("ID", StringType(), True),
        StructField("Case Number", StringType(), True),
        StructField("Date", TimestampType(), True),
        StructField("Block", StringType(), True),
        StructField("IUCR", StringType(), True),
        StructField("Primary Type", StringType(), True),
        StructField("Description", StringType(), True),
        StructField("Location Description", StringType(), True),
        StructField("Arrest", StringType(), True),
        StructField("Domestic", BooleanType(), True),
        StructField("Beat", StringType(), True),
        StructField("District", StringType(), True),
        StructField("Ward", StringType(), True),
        StructField("Community Area", StringType(), True),
        StructField("FBI Code", StringType(), True),
        StructField("X Coordinate", StringType(), True),
        StructField("Y Coordinate", StringType(), True),
        StructField("Year", IntegerType(), True),
        StructField("Updated On", StringType(), True),
        StructField("Latitude", DoubleType(), True),
        StructField("Longitude", DoubleType(), True),
        StructField("Location", StringType(), True),
    ]
)

schema

### Option 2) creating Struct Field using Python tuples

-----

### Open CSV and read the content using the format of customized Schema

In [None]:
# rc = spark.read.csv("../data/reported-crimes.csv", schema=schema, header=True)

In [None]:
# rc.printSchema()

In [None]:
# rc.show(2)

----------

<p id="columns"></p>

# 2) Working with Columns

In [None]:
# get the file and preprocess it.
# rc = (
#     spark.read.csv("../data/chicago_crimes.csv", header=True)
#     .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
#     .filter(col("Date") <= lit("2018-11-11"))
# )

### Display only the first 5 rows of the column name IUCR

In [None]:
rc.select("IUCR").show(5)

In [None]:
rc.select(rc.IUCR).show(5)

In [None]:
rc.select(col("IUCR")).show(5)

### Display only the first 4 rows of column names Case Number, Date and Arrest

In [None]:
rc.select("Case Number", "Date", "Arrest").show(4)

### Add a column with name One, with entries of 1s
- constant value column as a last column, to a table

In [None]:
# new column will be displayed as the last column
rc.withColumn("One", lit(1)).show(5)

In [None]:
rc.withColumn("Two", lit(2)).show(5)

### Remove the Column IUCR

In [None]:
temp = rc.drop("IUCR")
temp.show(2)

-------

<p id="rows"></p>

# 3) Working with rows

In [None]:
# get the file and preprocess it.
# rc = (
#     spark.read.csv("../data/chicago_crimes.csv", header=True)
#     .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
#     .filter(col("Date") <= lit("2018-11-11"))
# )

In [None]:
rc.show(1)

### Add the reported crimes for an additional day, 12-Nov-2018 to our dataset.

#### first, filter data only for 12 Nov, 2018

In [None]:
# filter data only for 12 Nov, 2018
day_12_data = (
    spark.read.csv("../data/reported-crimes.csv", header=True)
    .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
    .filter(col("Date") == lit("2018-11-12"))
)

In [None]:
day_12_data.count()

In [None]:
day_12_data.show()

#### union day 12 data to original df, check the last 5 rows

In [None]:
rc.union(day_12_data).orderBy("Date", ascending=False).show(5)

### What are the top 10 number of reported crimes by Primary Type, in descending order occurence?

In [None]:
rc.groupBy("Primary Type").count().show()

In [None]:
rc.groupBy("Primary Type").count().orderBy("count", ascending=False).show(10)

---------

<p id="challenges"></p>

# 4) Challenges

- What percentage of reported crimes resulted in an arrest?
- What are the top 3 locations for reported crimes?

In [None]:
# get the file and preprocess it.
# rc = (
#     spark.read.csv("../data/chicago_crimes.csv", header=True)
#     .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
#     .filter(col("Date") <= lit("2018-11-12"))
# )

In [None]:
rc.show(2)

### What percentage of reported crimes resulted in an arrest?

In [None]:
# find the total crimes with Arrest = True  /  Total Crime Cases

In [None]:
df1 = rc.groupby("Arrest").count()
df1.show()

In [None]:
total_arrested_crimes = df1.filter(df1["Arrest"] == True).select("count").collect()
total_arrested_crimes = total_arrested_crimes[0]["count"]

In [None]:
# rc.printSchema()

In [None]:
total_crimes = rc.count()

In [None]:
total_crimes

In [None]:
# # crimes resulted in Arrest (arrest=True)
# total_arrested_cirmes = rc.filter(col("Arrest") == "true").count()
# total_arrested_cirmes

In [None]:
# find the percentages
print(
    "Percentage of reported crimes resulted in an arrest: {} %".format(
        100 * total_arrested_crimes / total_crimes
    )
)

------

### What are the top 3 locations for reported crimes?

In [None]:
rc.groupBy("Location Description").count().orderBy("count", ascending=False).show(3)

In [None]:
rc.select("Arrest").distinct().show()