# Data Ingestion

The safety dataset is from [IRS SCHOOL SAFETY AND THE EDUCATIONAL CLIMATE](https://www.p12.nysed.gov/irs/school_safety/school_safety_data_reporting.html).


The datasets can be downloaded directly from the website. They are then uploaded to the data ingest website and transferred to HDFS with the following command.

hadoop distcp gs://nyu-dataproc-hdfs-ingest/safety_data /user/yz6956_nyu_edu/project

# Data Cleaning

In [3]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Explicit 22-column layout used when reading the 2018-2021 raw files
// (the files processed through clean1/combine1). Every field is nullable.
// In this layout sexual offenses, assaults and weapons possession are each
// reported as two separate sub-category columns.
val schema1 = new StructType()
  // school identification / descriptive columns
  .add("County", StringType, true)
  .add("District", StringType, true)
  .add("School_Name", StringType, true)
  .add("BEDS_Code", StringType, true)
  .add("Grade_Organization", StringType, true)
  .add("Need/Resource_Category", StringType, true)
  .add("School_Type", StringType, true)
  .add("Enrollment", IntegerType, true)
  .add("Inactive_Date", StringType, true)
  // incident count columns
  .add("Homocide", IntegerType, true)
  .add("Sexual_Offenses-Forcible_Sex", IntegerType, true)
  .add("Sexual_Offenses-Other", IntegerType, true)
  .add("Assault-Physical_Injury", IntegerType, true)
  .add("Assault-Serious_Physical_Injury", IntegerType, true)
  .add("Weapons_Possession-Routine_Security_Check", IntegerType, true)
  .add("Weapons_Possession-Other", IntegerType, true)
  .add("Dignity Act-Excluding_Cyberbullying", IntegerType, true)
  .add("Dignity Act-Cyberbullying", IntegerType, true)
  .add("Bomb_Threat", IntegerType, true)
  .add("False_Alarm", IntegerType, true)
  .add("Drugs", IntegerType, true)
  .add("Alcohol", IntegerType, true)

// Explicit 20-column layout used when reading the 2022-2023 raw files
// (the files processed through clean2/combine2). Every field is nullable.
// Here sexual offenses, assaults and weapons possession are single columns,
// and there is an additional trailing Other_Threats column.
val schema2 = new StructType()
  // school identification / descriptive columns
  .add("County", StringType, true)
  .add("District", StringType, true)
  .add("School_Name", StringType, true)
  .add("BEDS_Code", StringType, true)
  .add("Grade_Organization", StringType, true)
  .add("Need/Resource_Category", StringType, true)
  .add("School_Type", StringType, true)
  .add("Enrollment", IntegerType, true)
  .add("Inactive_Date", StringType, true)
  // incident count columns
  .add("Homocide", IntegerType, true)
  .add("Sexual_Offense", IntegerType, true)
  .add("Assault", IntegerType, true)
  .add("Weapons_Possession", IntegerType, true)
  .add("Dignity Act-Excluding_Cyberbullying", IntegerType, true)
  .add("Dignity Act-Cyberbullying", IntegerType, true)
  .add("Bomb_Threat", IntegerType, true)
  .add("False_Alarm", IntegerType, true)
  .add("Drugs", IntegerType, true)
  .add("Alcohol", IntegerType, true)
  .add("Other_Threats", IntegerType, true)

In [4]:
import org.apache.spark.sql.functions._

/**
 * Reads one raw CSV in the `schema1` layout and returns the cleaned rows.
 *
 * Steps:
 *  - parse the file with `schema1` and discard the first three parsed records
 *    (indices 0-2; presumably title/header lines -- confirm against the raw files),
 *  - keep only rows whose `Inactive_Date` is null,
 *  - add each pair of sub-category columns into one combined column
 *    (`Sexual_Offense`, `Assault`, `Weapons_Possession`),
 *  - keep only the 16 output columns listed below.
 *
 * The output column order is identical to the one produced by `clean2`, so
 * the cleaned files of every year share one layout. A combined column is null
 * whenever either of its two inputs is null.
 *
 * @param filePath path of the raw CSV file to read
 * @return DataFrame with the cleaned rows
 */
def clean1(filePath: String) = {
  // read the CSV file and skip the first three parsed records
  val dataRows = spark.read
    .option("multiLine", "true")
    .option("inferSchema", "true")
    .option("escape", "\"")
    .schema(schema1)
    .csv(filePath)
    .rdd
    .zipWithIndex()
    .filter { case (_, index) => index >= 3 }
    .map(_._1)

  // filter out the inactive schools and combine the sub-categories into one column each
  val combined = spark.createDataFrame(dataRows, schema1)
    .filter($"Inactive_Date".isNull)
    .withColumn("Sexual_Offense", col("Sexual_Offenses-Forcible_Sex") + col("Sexual_Offenses-Other"))
    .withColumn("Assault", col("Assault-Physical_Injury") + col("Assault-Serious_Physical_Injury"))
    .withColumn("Weapons_Possession", col("Weapons_Possession-Routine_Security_Check") + col("Weapons_Possession-Other"))

  // Select the output columns by name, in the same order clean2 produces.
  // Listing them explicitly (rather than slicing by index) both drops the
  // unused columns and keeps Homocide ahead of the combined columns.
  val outputColumns = Seq(
    "County",
    "District",
    "School_Name",
    "BEDS_Code",
    "School_Type",
    "Enrollment",
    "Homocide",
    "Sexual_Offense",
    "Assault",
    "Weapons_Possession",
    "Dignity Act-Excluding_Cyberbullying",
    "Dignity Act-Cyberbullying",
    "Bomb_Threat",
    "False_Alarm",
    "Drugs",
    "Alcohol"
  )

  combined.select(outputColumns.map(combined.col): _*)
}

/**
 * Reads one raw CSV in the `schema2` layout and returns the cleaned rows:
 * the first three parsed records are discarded, rows with a non-null
 * `Inactive_Date` are removed, and the columns not used downstream are dropped.
 *
 * @param filePath path of the raw CSV file to read
 * @return DataFrame with the cleaned rows
 */
def clean2(filePath: String) = {
  // configure the CSV reader with the explicit schema2 layout
  val csvReader = spark.read
    .option("multiLine", "true")
    .option("inferSchema", "true")
    .option("escape", "\"")
    .schema(schema2)

  // number the parsed records and keep everything from index 3 onwards
  val indexedRows = csvReader.csv(filePath).rdd.zipWithIndex()
  val dataRows = indexedRows
    .filter { case (_, position) => position >= 3 }
    .map { case (row, _) => row }

  // keep only schools without an inactive date
  val activeSchools = spark.createDataFrame(dataRows, schema2)
    .where(col("Inactive_Date").isNull)

  // columns that are not carried into the cleaned output
  val unusedColumns = Seq("Grade_Organization", "Need/Resource_Category", "Inactive_Date", "Other_Threats")

  activeSchools.drop(unusedColumns: _*)
}

/**
 * Cleans two `schema1`-layout files with `clean1` and stacks the results,
 * rows of `filePath1` first.
 *
 * @param filePath1 path of the first raw CSV file
 * @param filePath2 path of the second raw CSV file
 * @return union of both cleaned DataFrames
 */
def combine1(filePath1: String, filePath2: String) =
  clean1(filePath1).union(clean1(filePath2))

/**
 * Cleans two `schema2`-layout files with `clean2` and stacks the results,
 * rows of `filePath1` first.
 *
 * @param filePath1 path of the first raw CSV file
 * @param filePath2 path of the second raw CSV file
 * @return union of both cleaned DataFrames
 */
def combine2(filePath1: String, filePath2: String) =
  clean2(filePath1).union(clean2(filePath2))


In [5]:
// Preview cell: run the schema1 pipeline (combine1) on the 2018 NYC and ROS
// files and display the combined result.
val year = "2018"

// raw input paths for the selected year
val filePath1 = s"project/safety_data/${year}_SSEC_NYC.csv"
val filePath2 = s"project/safety_data/${year}_SSEC_ROS.csv"
val df2018 = combine1(filePath1, filePath2)

z.show(df2018)

In [6]:
// Preview cell: run the schema2 pipeline (combine2) on the 2022 NYC and ROS
// files and display the combined result.
val year = "2022"

// raw input paths for the selected year
val filePath1 = s"project/safety_data/${year}_SSEC_NYC.csv"
val filePath2 = s"project/safety_data/${year}_SSEC_ROS.csv"
val df2022 = combine2(filePath1, filePath2)

z.show(df2022)

In [7]:
// Sanity check that both cleaning pipelines produce the same column names in
// the same order. Comparing only the number of columns would still pass when
// the two layouts disagree on names or ordering.
df2018.columns.sameElements(df2022.columns)

In [8]:
// The raw files come in two layouts: 2018-2021 (handled by combine1) and
// 2022-2023 (handled by combine2). For every year the NYC and ROS files are
// cleaned, combined and written, with a header row, to
// project/cleaned_data/safety<year>.csv, replacing any existing output.

// 2018-2021 layout
(2018 to 2021).foreach { year =>
    val nycPath = s"project/safety_data/${year}_SSEC_NYC.csv"
    val rosPath = s"project/safety_data/${year}_SSEC_ROS.csv"
    val target = s"project/cleaned_data/safety${year}.csv"

    combine1(nycPath, rosPath)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv(target)
}

// 2022-2023 layout
(2022 to 2023).foreach { year =>
    val nycPath = s"project/safety_data/${year}_SSEC_NYC.csv"
    val rosPath = s"project/safety_data/${year}_SSEC_ROS.csv"
    val target = s"project/cleaned_data/safety${year}.csv"

    combine2(nycPath, rosPath)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv(target)
}

# Data Profiling

## Per Year Profiling

In [11]:
// Load the cleaned 2022 output back in and display it.
val cleanFilePath = "project/cleaned_data/safety2022.csv"

val safety2022 = {
    // the cleaned files carry a header row; column types are inferred on read
    val headerCsvReader = spark.read
        .option("multiLine", "true")
        .option("inferSchema", "true")
        .option("escape", "\"")
        .option("header", true)

    headerCsvReader.csv(cleanFilePath)
}

z.show(safety2022)

In [12]:
// For every cleaned year: load the file, register it as the temp view
// safety<year>, and print how many rows (schools) it contains.
(2018 to 2023).foreach { year =>
    val yearDF = spark.read
        .option("multiLine", "true")
        .option("inferSchema", "true")
        .option("escape", "\"")
        .option("header", true)
        .csv(s"project/cleaned_data/safety${year}.csv")

    // make the current year's data queryable from SQL
    yearDF.createOrReplaceTempView(s"safety$year")

    val rowCount = yearDF.count()
    println(s"Year: ${year} | Number of Schools / Rows count: ${rowCount}")
}


In [13]:
// Names of the incident-count columns that the profiling cells aggregate.
// The strings must match the column headers of the cleaned CSV files exactly
// (including the existing "Homocide" spelling and the space in "Dignity Act"),
// because they are interpolated into SQL as back-quoted identifiers.
val threatColumns = Seq(
  "Homocide",
  "Sexual_Offense",
  "Assault",
  "Weapons_Possession",
  "Dignity Act-Excluding_Cyberbullying",
  "Dignity Act-Cyberbullying",
  "Bomb_Threat",
  "False_Alarm",
  "Drugs",
  "Alcohol"
)

In [14]:
import org.apache.spark.sql.DataFrame

// For every year 2018-2023, build two single-row DataFrames from the cleaned
// file: the total of each threat column, and that total divided by the number
// of rows (schools) in the file. Each row is laid out as Year followed by the
// threat columns.
val yearlyThreatStats: Seq[(DataFrame, DataFrame)] = (2018 to 2023).map { year =>
    val cleanFilePath = s"project/cleaned_data/safety${year}.csv"
    val safetydf = spark.read
        .option("multiLine", "true")
        .option("inferSchema", "true")
        .option("escape", "\"")
        .option("header", true)
        .csv(cleanFilePath)

    // register the current year's data as a temp view so it can be queried with SQL
    safetydf.createOrReplaceTempView(s"safety$year")

    // number of rows in this year's file; used as the divisor for the averages
    val count = safetydf.count()

    // sum the number of each violations
    val rawtotal = spark.sql(s"""
        select ${threatColumns.map(c => s"SUM(`$c`) as `$c`").mkString(", ")}
        from safety${year}
    """).withColumn("Year", lit(year))

    // sum the number of each violations / number of schools
    val rawavg = spark.sql(s"""
        select ${threatColumns.map(c => s"SUM(`$c`)/$count as `$c`").mkString(", ")}
        from safety${year}
    """).withColumn("Year", lit(year))

    // Move Year to the front. The column list is taken from threatColumns
    // rather than from a slice of the DataFrame's columns, so the appended
    // Year column is not selected a second time.
    val columns = "Year" +: threatColumns
    val total = rawtotal.select(columns.map(rawtotal.col): _*)
    val avg = rawavg.select(columns.map(rawavg.col): _*)

    (total, avg)
}

// Stack the per-year rows into the overall DataFrames. Declared as var, as
// before, so that other cells can still reassign them.
var totalThreatDF: DataFrame = yearlyThreatStats.map(_._1).reduce(_ union _)
var avgThreatDF: DataFrame = yearlyThreatStats.map(_._2).reduce(_ union _)

z.show(totalThreatDF)
z.show(avgThreatDF)

In [15]:
// Year-over-year percentage change of the per-school averages.
// avgThreat is joined with itself so that each row (cur) is paired with the
// row of the preceding year (prev). Because this is an inner join, the
// earliest year has no preceding row and does not appear in the result.
// Each value is (cur - prev) / prev * 100, rounded to two decimals.
avgThreatDF.createOrReplaceTempView("avgThreat")

val avgYoYDF = spark.sql("""
  select
    cur.Year,
    round((cur.Homocide - prev.Homocide) / prev.Homocide * 100, 2) AS `Homocide_YoY (%)`,
    round((cur.Sexual_Offense - prev.Sexual_Offense) / prev.Sexual_Offense * 100, 2) AS `Sexual_Offense_YoY (%)`,
    round((cur.Assault - prev.Assault) / prev.Assault * 100, 2) AS `Assault_YoY (%)`,
    round((cur.Weapons_Possession - prev.Weapons_Possession) / prev.Weapons_Possession * 100, 2) AS `Weapons_Possession_YoY (%)`,
    round((cur.`Dignity Act-Excluding_Cyberbullying` - prev.`Dignity Act-Excluding_Cyberbullying`) / prev.`Dignity Act-Excluding_Cyberbullying` * 100, 2) AS `Dignity_Act_Excluding_Cyberbullying_YoY (%)`,
    round((cur.`Dignity Act-Cyberbullying` - prev.`Dignity Act-Cyberbullying`) / prev.`Dignity Act-Cyberbullying` * 100, 2) AS `Dignity_Act_Cyberbullying_YoY (%)`,
    round((cur.Bomb_Threat - prev.Bomb_Threat) / prev.Bomb_Threat * 100, 2) AS `Bomb_Threat_YoY (%)`,
    round((cur.False_Alarm - prev.False_Alarm) / prev.False_Alarm * 100, 2) AS `False_Alarm_YoY (%)`,
    round((cur.Drugs - prev.Drugs) / prev.Drugs * 100, 2) AS `Drugs_YoY (%)`,
    round((cur.Alcohol - prev.Alcohol) / prev.Alcohol * 100, 2) AS `Alcohol_YoY (%)`
  from
    avgThreat cur
  join
    avgThreat prev
  on
    cur.Year = prev.Year + 1
  order by
    cur.Year
""")

// the explicit ORDER BY keeps the displayed rows in chronological order;
// a join alone does not guarantee any row order
z.show(avgYoYDF)
