In [0]:
%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, to_date}

// Explicit (user-defined) schema for the Chicago "Crimes - 2001 to Present" CSV export, so that
// column types are fixed up front instead of being inferred from the data.
// "Date" and "Updated On" are kept as raw strings; later cells extract/parse parts of them.
val schema = StructType(Array(
    StructField("ID", IntegerType, true),
    StructField("Case Number", StringType, true),
    StructField("Date", StringType, true),
    StructField("Block", StringType, true),
    StructField("IUCR", StringType, true),
    StructField("Primary Type", StringType, true),
    StructField("Description", StringType, true),
    StructField("Location Description", StringType, true),
    StructField("Arrest", BooleanType, true),
    StructField("Domestic", BooleanType, true),
    StructField("Beat", IntegerType, true),
    StructField("District", IntegerType, true),
    StructField("Ward", IntegerType, true),
    StructField("Community Area", IntegerType, true),
    StructField("FBI Code", StringType, true),
    StructField("X Coordinate", IntegerType, true),
    StructField("Y Coordinate", IntegerType, true),
    StructField("Year", IntegerType, true),
    StructField("Updated On", StringType, true),
    StructField("Latitude", DoubleType, true),
    StructField("Longitude", DoubleType, true),
    StructField("Location", StringType, true)
  ))

// CSV is a built-in Spark data source. `.csv(path)` itself selects the "csv" format, so a
// preceding `.format("com.databricks.spark.csv")` (the legacy external package) would be
// overridden and has been dropped. The header row is skipped; column names/types come from `schema`.
val crimeDF = spark.read
  .option("header", true)
  .schema(schema)
  .csv("/FileStore/tables/Crimes___2001_to_Present.csv")
crimeDF.printSchema()
crimeDF.show()

In [0]:
%scala

// Expose the crime DataFrame to Spark SQL: every query in the cells below reads from the
// temporary view named `crime_table` (replacing any existing view with that name).
crimeDF.createOrReplaceTempView(viewName = "crime_table")

In [0]:
%scala
// Problem 1: How has the number of each type of crime changed over time in Chicago?
// One row per (Primary Type, Year) with the number of crimes in that group. Rows are sorted
// by year ascending and, within a year, by crime type in descending alphabetical order.
display(
  spark.table("crime_table")
    .groupBy(col("Year"), col("Primary Type"))
    .count()
    .withColumnRenamed("count", "Crime Count")
    .select(col("Primary Type"), col("Year"), col("Crime Count"))
    .orderBy(col("Year").asc, col("Primary Type").desc)
)

In [0]:
%scala
// Problem 2: How has the number of arrests for reported crimes changed over time in Chicago?
// `Arrest` is declared as BooleanType in the read schema, so filter on the boolean value directly
// rather than comparing it with the string 'true' (which relies on implicit string->boolean
// coercion). The count is aliased so the table/chart gets a readable column name.
display(spark.sql("SELECT Year, COUNT(*) AS `Arrest Count` FROM crime_table WHERE Arrest = true GROUP BY Year ORDER BY Year"))


In [0]:
%scala

// Problem 3: Which contiguous months show the largest variation in crime?
// Months are month-of-year: each month's count is summed over all years present in the data.

// Step 1: Extract the month from Date and count crimes per month.
// NOTE(review): assumes Date strings start with a two-digit month (MM/dd/yyyy ...) — confirm.
// Rows with a null Date are skipped; otherwise they would form a null month group that sorts
// first and becomes the LAG predecessor of month "01", producing a spurious variation.
val month_count=spark.sql("select substring(Date,1,2) as Month_val,count(*) as noofcrimes from crime_table where Date is not null group by Month_val order by Month_val")

// Step 2: Pair each month with the preceding month (LAG) and compute the absolute change.
// Only consecutive months in calendar order are compared; December -> January is not considered.
month_count.createOrReplaceTempView("month_counts")
val finalresult7=spark.sql("""
select *, abs(previouscount - noofcrimes) as variation_value
from (
  select *,
         LAG(noofcrimes, 1) over (order by Month_val) as previouscount,
         LAG(Month_val, 1) over (order by Month_val) as previous_month
  from month_counts)
where previouscount is not null""")

// Step 3: Report the month pair(s) with the highest variation (both months plus the size of the change).
finalresult7.createOrReplaceTempView("result")
spark.sql("select previous_month as Previous_month, Month_val as Month_with_highest_variation, variation_value from result where variation_value=(select max(variation_value) from result)").show()


In [0]:

%scala

// Problem 4: Which quarter was the most peaceful compared to the previous quarter?
// "Most peaceful compared to the previous quarter" = the largest drop in crime count relative to
// the quarter before it. Quarters are quarter-of-year: counts are summed over all years in the data.


import org.apache.spark.sql.functions.{col, to_date}
// Step 1: Take the date part of Date (first 10 characters, MM/dd/yyyy) and count crimes per date.
val datedf=spark.sql("SELECT substring(Date,1,10) as Date_val,count(ID) as crimeCountOfDate from crime_table group by Date_val")

// Step 2: Convert the date strings to DateType. Rows whose date did not parse (null finalDate)
// are dropped, so they cannot form a null quarter group that would sort first and feed LAG.
val newDate=datedf
  .select(col("Date_val"), to_date(col("Date_val"), "MM/dd/yyyy").as("finalDate"), col("crimeCountOfDate"))
  .where(col("finalDate").isNotNull)

// Step 3: Sum the crime counts per quarter.
newDate.createOrReplaceTempView("newDate")
val resultdf=spark.sql("select quarter(finalDate) as quarters,sum(crimeCountOfDate) as totalCrimeCountPerQuarter from newDate group by quarters order by quarters")

// Step 4: Compare each quarter with the previous one (Q1 has no predecessor and is dropped).
// count_varition_per_quarter (name kept for existing users of the view) is the absolute change;
// crime_reduction is the signed drop: positive = fewer crimes than in the previous quarter.
resultdf.createOrReplaceTempView("resultdf")
val result=spark.sql("""
select *,
       abs(previous_count - totalCrimeCountPerQuarter) as count_varition_per_quarter,
       previous_count - totalCrimeCountPerQuarter as crime_reduction
from (
  select *, LAG(totalCrimeCountPerQuarter) OVER (ORDER BY quarters) as previous_count from resultdf) as a
where previous_count is not null""")

// Step 5: The most peaceful quarter is the one with the largest reduction versus its predecessor.
// Picking the minimum absolute variation instead would select the quarter with the smallest
// change, which may well be an increase. If every quarter increased, this returns the smallest rise.
result.createOrReplaceTempView("crime_result")
spark.sql("select quarters, crime_reduction from crime_result where crime_reduction=(select max(crime_reduction) from crime_result)").show


In [0]:
%scala

// Problem 5: Are there any trends in the crimes being committed?

// Step 1: Total number of crimes per year. Rows with a null Year are excluded so they cannot
// form a null group that sorts first and becomes the LAG predecessor of the earliest year.
val displayYear=spark.sql("select Year,count(*) as countOfCrimesInYear from crime_table where Year is not null group by Year order by Year")

// Step 2: Year-over-year change via LAG. variation_value is the absolute change; change_value keeps
// the sign (positive = more crimes than the previous year) so the direction of the trend is visible.
// NOTE(review): LAG compares with the previous year present in the data; a missing year would make
// this a multi-year change — confirm the data has no gaps.
displayYear.createOrReplaceTempView("year_counts")
val trendresult=spark.sql("""
select Year,
       abs(previouscount - countOfCrimesInYear) as variation_value,
       countOfCrimesInYear - previouscount as change_value
from (
  select Year, countOfCrimesInYear, LAG(countOfCrimesInYear, 1) over (order by Year) as previouscount
  from year_counts)
where previouscount is not null""")

// Step 3: Years with the smallest and the largest year-over-year change.
// NOTE(review): the source file is "2001 to Present"; if the latest year is incomplete, its drop
// will likely dominate Maximum_Crime_Trend — confirm before reading it as a trend.
trendresult.createOrReplaceTempView("trendresult")
spark.sql("select Year,variation_value as Minimum_Crime_Trend,change_value from trendresult where variation_value=(select min(variation_value) from trendresult)").show
spark.sql("select Year,variation_value as Maximum_Crime_Trend,change_value from trendresult where variation_value=(select max(variation_value) from trendresult)").show



In [0]:
%scala

// Problem 6: In which locations are crimes committed most frequently?
// Count crimes per location description (rows without one are ignored) and list the busiest
// locations first; show() prints the first 20 rows.
spark.table("crime_table")
  .where(col("Location Description").isNotNull)
  .groupBy(col("Location Description"))
  .count()
  .select(col("count").as("Crime_count"), col("Location Description"))
  .orderBy(col("Crime_count").desc)
  .show()

In [0]:
%scala

// Problem 7: Are there particular high-crime locations for particular crime types?

// Step 1: Intermediate DataFrame with one row per (Primary Type, Location Description) pair and
// crime_count = number of crimes of that type at that location. Rows without a location
// description are excluded (consistent with the other location queries in this notebook), so an
// unknown location cannot be reported as the top location for a crime type.
val tempCrimeDF=spark.sql("SELECT `Primary Type`, `Location Description`, COUNT(*) AS crime_count FROM crime_table WHERE `Location Description` IS NOT NULL GROUP BY `Primary Type`,`Location Description` ORDER BY `Primary Type`, crime_count DESC")

tempCrimeDF.createOrReplaceTempView("temp_table1")

// Step 2: For each crime type keep the location(s) with the highest count; ties are all kept
// (RANK gives equal counts the same rank). Rows with a null crime type are left out.
val topLocationPerType=spark.sql("""
SELECT `Primary Type`, `Location Description`, crime_count
FROM (
  SELECT *, RANK() OVER (PARTITION BY `Primary Type` ORDER BY crime_count DESC) AS location_rank
  FROM temp_table1)
WHERE location_rank = 1 AND `Primary Type` IS NOT NULL
ORDER BY crime_count DESC""")

// show() prints only 20 rows by default, but the result has at least one row per crime type and
// there may be more than 20 types; print every row, untruncated, so no type is silently cut off.
topLocationPerType.show(topLocationPerType.count().toInt, false)


In [0]:
%scala

// Problem 8: In which locations are the most frequent crimes being committed?
// "Frequent crimes" = crime types whose total count is one of the five largest per-type counts
// (a type tied with one of those counts is included as well). For each such type, list its
// top locations. Rows without a location description are ignored.

// Number of locations to report per frequent crime type (ties at the cut-off are all kept).
val topLocationsPerCrime = 5

val frequentCrimeLocations=spark.sql(s"""
WITH frequent_types AS (
  SELECT `Primary Type` FROM crime_table GROUP BY `Primary Type`
  HAVING count(*) IN (SELECT count(*) AS count FROM crime_table GROUP BY `Primary Type` ORDER BY count DESC LIMIT 5)
),
location_counts AS (
  SELECT `Primary Type`, `Location Description`, count(*) AS Crime_count
  FROM crime_table
  WHERE `Location Description` IS NOT NULL
    AND `Primary Type` IN (SELECT `Primary Type` FROM frequent_types)
  GROUP BY `Primary Type`, `Location Description`
),
ranked AS (
  SELECT *, RANK() OVER (PARTITION BY `Primary Type` ORDER BY Crime_count DESC) AS location_rank
  FROM location_counts
)
SELECT `Primary Type`, `Location Description`, Crime_count
FROM ranked
WHERE location_rank <= $topLocationsPerCrime
ORDER BY `Primary Type`, Crime_count DESC""")

// Because rows are ordered by crime type, the default 20-row show() would list almost only the
// first type's locations. Print every row of the (small) ranked result, untruncated.
frequentCrimeLocations.show(frequentCrimeLocations.count().toInt, false)