In [2]:
import collections
import csv
import operator
import os
import sys

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import functions

sys.path.append('/Users/abuckfire/side-projects/arson')
import static_data.fire_codes.lookup_tables as codes
import elastic

In [3]:
# Column names shared by the Spark aggregations in this notebook.
COUNT = "count"
STATE = "state"

# Values that lookup_code skips when picking the most frequent code.
UNKNOWNS = ["UU", "NN", None]

# Default set of states iterated by build_max_count_dicts.
STATES = codes.STATES

# Materialised as a list so it can be iterated more than once on Python 3 as
# well (a bare map() is a one-shot iterator there); identical on Python 2.
YEARS_AVAILABLE = list(map(str, range(2009, 2015)))

# Yearly extracts are read from <ARSON_PATH>/<FILE_PREFIX><year>.csv.
FILE_PREFIX = "nfirs_arson_"
ARSON_PATH = os.path.join("..", "static_data", "nfirs_arson")

# Per doc type: the field names add_to_elastic zips with the values of each
# indexed document, in this order.
FIELDS = {
    "motives": ["state", "motive", "year"],
    "method": ["state", "method", "year"],
    "ownership": ["state", "ownership", "year"],
    "monthly_counts": ["state", "month", "count", "year"],
}


In [28]:
def lookup_code(candidate_list, column, codebook):
    """Translate the highest-count known code in ``candidate_list`` via ``codebook``.

    Rows are read both by name and by position: ``row[column]`` is checked
    against ``UNKNOWNS``, position 1 is used as the count to maximise, and
    position 0 of the winning row is the key looked up in ``codebook``.

    Args:
        candidate_list: iterable of rows supporting ``row[column]``, ``row[0]``
            and ``row[1]``.
        column: name of the field holding the code.
        codebook: mapping from code to its translated value.

    Returns:
        ``codebook[code]`` for the row with the largest count, or ``None`` when
        every row carries an unknown code (or there are no rows).
    """
    known_rows = []
    for row in candidate_list:
        if row[column] in UNKNOWNS:
            continue
        known_rows.append(row)

    if not known_rows:
        return None

    top_row = max(known_rows, key=lambda row: row[1])
    return codebook[top_row[0]]


def build_max_count_dicts(df, column, codebook, state_codes=STATES):
    """Find the most frequent known ``column`` value per state and overall.

    Args:
        df: Spark DataFrame with ``column``, ``STATE`` and ``COUNT`` columns,
            one row per (value, state) pair.
        column: name of the column holding the code being ranked.
        codebook: mapping used by ``lookup_code`` to translate the winning code.
        state_codes: states to report on; a state with no rows maps to ``None``.

    Returns:
        A dict keyed by each entry of ``state_codes`` plus ``"overall"``, whose
        values are what ``lookup_code`` returns for that group.
    """
    # Collect once and bucket in Python instead of running one filtered Spark
    # job per state.  ``column`` and ``COUNT`` are kept in positions 0 and 1
    # because lookup_code reads the rows positionally.
    rows_by_state = collections.defaultdict(list)
    for row in df.select(column, COUNT, STATE).collect():
        rows_by_state[row[STATE]].append(row)

    # A plain dict: the previous factory-less defaultdict behaved like one.
    results = {}
    for state in state_codes:
        state_rows = rows_by_state.get(state, [])
        results[state] = lookup_code(state_rows, column, codebook)

    # The alias belongs on the aggregated column; aliasing the DataFrame
    # (as before) left the column named "sum(count)".
    overall_rows = (
        df.groupBy(column)
        .agg(functions.sum(COUNT).alias(COUNT))
        .collect()
    )
    results["overall"] = lookup_code(overall_rows, column, codebook)
    return results


def get_motives(df, column="motivation"):
    """Return the most common motivation factor per state and overall.

    Counts each of the three ``mot_facts*`` columns per state, outer-joins the
    three count tables on (``column``, ``STATE``), fills missing numeric counts
    with 0, sums them into ``COUNT`` and hands the result to
    ``build_max_count_dicts`` with ``codes.SUSPECTED_MOTIVATION_FACTORS``.

    Args:
        df: Spark DataFrame with ``mot_facts1``..``mot_facts3`` and ``STATE``.
        column: name given to the unified motivation column.
    """
    factor_columns = (
        ("mot_facts1", "count_1"),
        ("mot_facts2", "count_2"),
        ("mot_facts3", "count_3"),
    )
    join_keys = [column, STATE]

    motives = None
    for factor_column, count_name in factor_columns:
        factor_counts = (
            df.groupBy([factor_column, STATE])
            .count()
            .withColumnRenamed(factor_column, column)
            .withColumnRenamed(COUNT, count_name)
        )
        if motives is None:
            motives = factor_counts
        else:
            motives = motives.join(factor_counts, join_keys, "outer")
    motives = motives.na.fill(0)

    # Built-in sum() over Column objects yields 0 + count_1 + count_2 + count_3.
    total_count = sum(motives[name] for name in ["count_1", "count_2", "count_3"])
    motives_per_state = motives.withColumn(COUNT, total_count)
    return build_max_count_dicts(
        motives_per_state, column, codes.SUSPECTED_MOTIVATION_FACTORS
    )


def parse_date_by_month(date_string):
    """Return the English month name for a month-first date string.

    Handled shapes:
      * undelimited 8-digit ``MMDDYYYY`` (``"10012013"``, ``"01012013"``);
      * undelimited 7-digit ``MDDYYYY`` where the month's leading zero was
        dropped (``"1012013"`` is January 1st, not October);
      * anything else, e.g. delimited ``"1/5/2013"``: the first two leading
        digits are used when they form a valid month, otherwise the first one.

    NOTE(review): the 7/8-digit branch assumes the last six digits are always
    DDYYYY (two-digit day) -- confirm against the source extracts.

    Args:
        date_string: the raw date text, or ``None``.

    Returns:
        The month name, or ``None`` when ``date_string`` is ``None`` or blank.

    Raises:
        KeyError: if no valid month number can be extracted.
    """
    month_lookup = {
        "1": "January",
        "2": "February",
        "3": "March",
        "4": "April",
        "5": "May",
        "6": "June",
        "7": "July",
        "8": "August",
        "9": "September",
        "10": "October",
        "11": "November",
        "12": "December"
    }

    if date_string is None:
        return None
    text = date_string.strip()
    if not text:
        return None

    # Leading run of digits: the whole value for undelimited dates, just the
    # month for delimited ones.
    digits = ""
    for char in text:
        if not char.isdigit():
            break
        digits += char

    if len(digits) in (7, 8):
        # Day and year occupy the last six digits, so the month width follows
        # from the length; peeking at the first two characters is ambiguous
        # ("1012013" starts with "10" but is January).
        month_digits = digits[:-6]
    elif digits[:2].lstrip("0") in month_lookup:
        month_digits = digits[:2]
    else:
        month_digits = digits[:1]

    # Tolerate zero padding ("01" -> "1"); an empty/invalid key raises KeyError.
    return month_lookup[month_digits.lstrip("0")]


def get_arson_per_month(df, column="inc_date"):
    """Count incidents per month, overall and per state.

    ``column`` is overwritten with the value ``parse_date_by_month`` returns
    for it, then rows are counted per (month, state) and per month alone.  The
    per-month totals are tagged with the state ``"overall"`` and placed first.

    Returns:
        The collected rows with fields ``column``, ``STATE`` and ``COUNT``.
    """
    to_month_name = functions.udf(parse_date_by_month, "string")
    monthly = df.withColumn(column, to_month_name(df[column]))

    state_counts = monthly.groupBy([column, STATE]).count()
    overall_counts = (
        monthly.groupBy(column)
        .count()
        .withColumn(STATE, functions.lit("overall"))
        .select(column, STATE, COUNT)
    )
    return overall_counts.union(state_counts).collect()


def get_fast_fact_one_column(df, column, codebook):
    """Return the most common value of ``column`` per state and overall.

    Counts rows per (``column``, ``STATE``) pair and passes the counts to
    ``build_max_count_dicts`` together with ``codebook``.
    """
    counts_by_state = df.groupBy([column, STATE]).count()
    return build_max_count_dicts(counts_by_state, column, codebook)


def add_to_elastic(aggs, key, year, index="arson"):
    """Index one document per aggregate through ``elastic.es.index``.

    Args:
        aggs: for ``key == "monthly_counts"`` an iterable of rows exposing
            ``STATE``, ``"inc_date"`` and ``COUNT``; otherwise a mapping of
            state to value.
        key: doc type to index under; also selects the field names in
            ``FIELDS``.
        year: value stored in each document's last field.
        index: name of the target index.
    """
    if key == "monthly_counts":
        value_rows = (
            [row[STATE], row["inc_date"], row[COUNT], year] for row in aggs
        )
    else:
        value_rows = ([state, value, year] for state, value in aggs.items())

    # Pair the doc type's field names with each row of values, one call per row.
    for values in value_rows:
        document = dict(zip(FIELDS[key], values))
        elastic.es.index(index=index, doc_type=key, body=document)


def calculate_stats_with_spark(df, year):
    """Compute the arson statistics for one year of data and index them.

    Only question 1 (motives) is active; questions 2-4 are kept below as
    disabled code.

    Args:
        df: Spark DataFrame holding the year's incidents.
        year: year label stored with every indexed document.
    """
    # Question 1: What is the most common motivation for starting a fire?
    add_to_elastic(get_motives(df), "motives", year)

    # Question 2: What are the most common materials used for starting a fire?
    #method = get_fast_fact_one_column(df, "devi_ignit", codes.IGNITION_DELAY_DEVICE)
    #add_to_elastic(method, "method", year)

    # Question 3: What is the most common type of property burned?
    # NOTE(review): the disabled call below passes doc type "method" although
    # FIELDS defines an "ownership" entry -- confirm before re-enabling.
    #ownership = get_fast_fact_one_column(df, "prop_owner", codes.PROPERTY_OWNERSHIP)
    #add_to_elastic(ownership, "method", year)

    # Question 4: Number of Arsons per Month
    #monthly_arsons = get_arson_per_month(df)
    #add_to_elastic(monthly_arsons, "monthly_counts", year)
    

def ingest_facts_into_es(years=("2013", "2014")):
    """Read each year's arson CSV with Spark and index its statistics.

    Args:
        years: year strings to ingest; each maps to the file
            ``<ARSON_PATH>/<FILE_PREFIX><year>.csv``.  Defaults to the two
            years that were previously hard-coded; pass ``YEARS_AVAILABLE`` to
            ingest every year.
    """
    # SparkSession is the only handle used here.  The former SQLContext(sc)
    # was never read and required a pre-existing global ``sc``.
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    for year in years:
        # Parenthesised so the progress line works on Python 2 and 3 alike.
        print(year)
        csv_path = os.path.join(ARSON_PATH, FILE_PREFIX + year + ".csv")
        df = spark.read.csv(csv_path, header=True)
        calculate_stats_with_spark(df, year)


In [12]:
#elastic.create_index()

In [29]:
# Ingest the yearly facts into Elasticsearch; each year is printed as it is processed.
ingest_facts_into_es()

2013
2014


In [26]:
# Sanity check: number of "method" documents in the "arson" index matching year:2012.
elastic.es.count(index="arson", doc_type="method", q="year:2012")["count"] # need to ingest 2013 and 2014


204