In [0]:
%spark.pyspark

#df_crime = spark.read.csv("hdfs:///user/ubuntu/crime.csv")
#df_crime.show(10)

# Load the crime incidents CSV from HDFS: the first row supplies the column
# names and Spark infers the column types from the data.
df_crime = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("hdfs:///user/ubuntu/crime.csv")
)
df_crime.show(n=10)
# Last expression of the cell: the Spark version string.
spark.version

# create DataFrame from python list. It can infer schema for you.
#df1 = spark.createDataFrame([(1, "andy", 40, "USA"), (2, "jeff", 23, "China"), (3, "james", 18, "USA")]).toDF("id", "name", "age", "country")
#df1.printSchema()
#df1.show()

# create DataFrame from pandas dataframe
#df2 = spark.createDataFrame(df1.toPandas())
#df2.printSchema()
#df2.show()

In [1]:
%spark.pyspark
# Preview the first 10 rows once the data is ordered by incident number.
df_crime.sort("incident_number").show(n=10)


In [2]:
%spark.pyspark
# Print the column names and types of the crime DataFrame (types were inferred at load time).
df_crime.printSchema()

In [3]:
Offense codes

In [4]:
%spark.pyspark
# Load the offense-code lookup CSV from HDFS: header row supplies the column
# names, column types are inferred by Spark.
df_offense = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs:///user/ubuntu/offense_codes.csv")
)

# A bare `df_offense.count()` in the middle of a cell runs a full job and then
# throws the number away (only the last expression of a cell is rendered),
# so print it explicitly.
print('total: ' + str(df_offense.count()))
df_offense.printSchema()

In [5]:
%spark.pyspark
# Show the 10 lowest offense codes with their full, untruncated values.
df_offense.sort("code").show(n=10, truncate=False)

In [6]:

%spark.pyspark
from pyspark.sql.functions import col

def get_cnt_of_nulls(colname):
    """Return "<colname>: <n>" where n is the number of rows of df_crime
    whose value in column `colname` is null.

    Reads the notebook-level `df_crime` DataFrame and triggers one Spark
    count job per call.
    """
    rows_with_null = df_crime.where(col(colname).isNull())
    return ": ".join([colname, str(rows_with_null.count())])
    
#print(get_cnt_of_nulls("District"))

# Hard-coded snapshot of the column names. Nothing in this cell reads it:
# the null report below takes its column list from df_crime.dtypes instead.
cols = [ 'INCIDENT_NUMBER',
 'OFFENSE_CODE',
 'OFFENSE_CODE_GROUP',
 'OFFENSE_DESCRIPTION',
 'DISTRICT',
 'REPORTING_AREA',
 'SHOOTING',
 'OCCURRED_ON_DATE',
 'YEAR',
 'MONTH',
 'DAY_OF_WEEK',
 'HOUR',
 'UCR_PART',
 'STREET',
 'LAT',
 'LONG',
 'LOCATION']

# Imported locally so this cell does not depend on imports made elsewhere.
from pyspark.sql import functions as F

# (name, type) pairs for every column actually present in the DataFrame.
all_cols = df_crime.dtypes

# Compute the row total and every per-column null count in ONE aggregation,
# instead of launching a separate full scan of the data for each column.
# count() skips nulls, so count(when(<is null>, 1)) counts exactly the null rows.
# Positional aliases avoid any clash with real column names.
null_report = df_crime.agg(
    F.count(F.lit(1)).alias("total"),
    *[
        F.count(F.when(F.col(name).isNull(), 1)).alias("nulls_%d" % idx)
        for idx, (name, _dtype) in enumerate(all_cols)
    ]
).first()

print('total: ' + str(null_report[0]))

# Same "<column>: <null count>" lines as get_cnt_of_nulls would produce.
for (name, _dtype), null_count in zip(all_cols, null_report[1:]):
    print(name + ": " + str(null_count))


In [7]:
%spark.pyspark

# Sample of rows where at least one of the location-related fields is null.
df_crime.where(
    col("Lat").isNull()
    | col("Long").isNull()
    | col("Street").isNull()
    | col("District").isNull()
).select(["lat", "long", "street", "district"]).show(n=10, truncate=False)

In [8]:
%spark.pyspark
# Incident numbers that occur on the most rows.
df_crime.groupBy("INCIDENT_NUMBER").count().sort("count", ascending=False).show(n=10)

# Same check on the (incident, offense code) pair.
df_crime.groupBy("INCIDENT_NUMBER", "OFFENSE_CODE").count().sort("count", ascending=False).show(n=10)

In [9]:
%spark.pyspark
# All rows recorded for one incident number, ordered by offense code.
df_crime.where(col("INCIDENT_NUMBER") == "I162030584").sort("OFFENSE_CODE").show(truncate=False)

In [10]:
%spark.pyspark
# All rows recorded for another incident number, ordered by offense code.
df_crime.where(col("INCIDENT_NUMBER") == "I152026775-00").sort("OFFENSE_CODE").show(truncate=False)


In [11]:
%spark.pyspark
# How many lookup rows exist per offense code, most repeated codes first.
# The very large row limit is there so that every code is printed.
df_offense.groupBy("CODE").count().sort("count", ascending=False).show(n=999999, truncate=False)

In [12]:
%spark.pyspark
# Lookup rows for two specific offense codes, ordered by code and then name.
df_offense.where(col("CODE").isin(2608, 3502)).sort("CODE", "NAME").show(truncate=False)

In [13]:
%spark.pyspark


In [14]:
%spark.pyspark
from pyspark.sql.functions import split

# Preview the derived CRIME_TYPE (the part of NAME before the first ' - ')
# for offense code 3502.
(
    df_offense
    .withColumn("CRIME_TYPE", split(col("NAME"), ' - ').getItem(0))
    .where(col("CODE") == 3502)
    .show(n=20, truncate=False)
)

In [15]:
%spark.pyspark
# !
from pyspark.sql.functions import trim

# Cleaned offense lookup: CRIME_TYPE is the trimmed part of NAME before the
# first ' - ', and only one row is kept per CODE.
df_offense_clean = (
    df_offense
    .withColumn("CRIME_TYPE", trim(split(col("NAME"), ' - ').getItem(0)))
    .drop_duplicates(subset=["CODE"])
)

df_offense_clean.where(col("CODE") == 3502).show()

                        

In [16]:
%spark.pyspark
# !
# Attach the crime type to every crime row (left join, so rows with no matching
# code are kept), keep only the columns needed downstream, and replace nulls
# with the string "NA".
df_crime_clean_1 = (
    df_crime
    .join(df_offense_clean, on=col("OFFENSE_CODE") == col("CODE"), how="left")
    .select("INCIDENT_NUMBER", "CRIME_TYPE", "DISTRICT", "YEAR", "MONTH", "LAT", "LONG")
    .fillna("NA")
)

df_crime_clean_1.show()

In [17]:
%spark.pyspark

# Rows of the joined frame for one incident number.
# NOTE(review): OFFENSE_CODE is not among the columns selected into
# df_crime_clean_1, so this ordering relies on Spark resolving the sort key
# from the underlying plan -- confirm this is intended.
df_crime_clean_1.where(col("INCIDENT_NUMBER") == "I162030584").sort("OFFENSE_CODE").show(truncate=False)

In [18]:
%spark.pyspark
# Keep a single row per (incident, crime type) pair.
df_crime_clean_2 = df_crime_clean_1.drop_duplicates(subset=["INCIDENT_NUMBER", "CRIME_TYPE"])

# Spot-check the same incident after de-duplication.
df_crime_clean_2.where(col("INCIDENT_NUMBER") == "I162030584").sort("OFFENSE_CODE").show(truncate=False)

In [19]:
%spark.pyspark


In [20]:
%spark.pyspark
from pyspark.sql.functions import countDistinct, count, sum, mean


# Per district: total number of distinct incidents (CRIMES_TOTAL) and the mean
# of the per-(year, month) distinct-incident counts (CRIMES_MONTHLY).
# `sum` and `mean` here are the pyspark.sql.functions imported in this cell,
# not the Python built-ins.
q1_distr = (
    df_crime_clean_2
    .groupBy("DISTRICT", "YEAR", "MONTH")
    .agg(countDistinct("INCIDENT_NUMBER").alias("COUNT"))
    .groupBy("DISTRICT")
    .agg(
        sum("COUNT").alias("CRIMES_TOTAL"),
        mean("COUNT").alias("CRIMES_MONTHLY"),
    )
    .sort("DISTRICT")
)

q1_distr.show()
               


In [21]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql.functions import desc, row_number, collect_list, concat_ws

# Rank crime types inside each district by how often they occur.
# CRIME_TYPE is a secondary sort key so that ties in COUNT are broken the same
# way on every run (row_number() over tied rows is otherwise arbitrary).
windowFrequentSpec = Window.partitionBy("DISTRICT").orderBy(desc("COUNT"), col("CRIME_TYPE"))

# For every district, the three most frequent crime types joined into one
# comma-separated string (FREQUENT_CRIME_TYPES).
q2_freq_crimes = (
    df_crime_clean_2
    .groupBy("DISTRICT", "CRIME_TYPE")
    .count()
    # DataFrame.alias() names the DataFrame, not the column, so the original
    # `.count().alias("COUNT")` left the column called "count". Rename the
    # column explicitly so the window's "COUNT" reference matches exactly.
    .withColumnRenamed("count", "COUNT")
    .withColumn("ROW_NUMBER", row_number().over(windowFrequentSpec))
    .filter(col("ROW_NUMBER") <= 3)
    .groupBy("DISTRICT")
    # NOTE(review): collect_list does not formally guarantee that the elements
    # keep the window ordering -- confirm if the order inside the string matters.
    .agg(concat_ws(",", collect_list("CRIME_TYPE")).alias("FREQUENT_CRIME_TYPES"))
)

q2_freq_crimes.show(truncate = False)

In [22]:
%spark.pyspark


In [23]:
%spark.pyspark


# `avg` is not imported by any other cell in this notebook, so import it here;
# without this the cell raises NameError on a fresh interpreter.
from pyspark.sql.functions import avg, col

# Average coordinates per district, computed only over rows where both LAT and
# LONG are present; output columns are DISTRICT, lat, lng.
q3_coord = (
    df_crime_clean_2
    .filter(col("LAT").isNotNull() & col("LONG").isNotNull())
    .groupBy("DISTRICT")
    .agg(avg("LAT").alias("lat"), avg("LONG").alias("lng"))
)

# avg() already ignores nulls, so the filter only matters for rows where just
# one of the two coordinates is missing, and for districts with no coordinates
# at all (those districts are dropped here instead of getting a null average).

q3_coord.orderBy("DISTRICT").show(999)

In [24]:
%spark.pyspark


In [25]:
%spark.pyspark
#q1_distr.printSchema()
#q2_freq_crimes.printSchema()
#q3_coord.printSchema()

# Final per-district result: crime totals, most frequent crime types and
# average coordinates, left-joined on DISTRICT so every district from
# q1_distr is kept.
df_result = (
    q1_distr
    .join(q2_freq_crimes, on="DISTRICT", how="left")
    .join(q3_coord, on="DISTRICT", how="left")
    .sort("DISTRICT")
)

df_result.show()

In [26]:
%spark.pyspark
