# PySpark Evaluation

Date:  06_29_18

Candidate: John A

In [239]:
# Location of the training data set (Parquet format) on S3.
parqFileName = 's3://caserta-bucket1/train.pqt'

# Load the Parquet file into a Spark DataFrame through the generic reader API.
df = spark.read.format('parquet').load(parqFileName)

### Get the total count of crimes that happened on a Tuesday

In [240]:
# Keep only the rows whose DayOfWeek is Tuesday, then count them.
tuesday_crimes = df.filter(df['DayOfWeek'] == 'Tuesday')
total_number_of_crimes_on_Tuesday = tuesday_crimes.count()

# A single parenthesised argument prints exactly what the print statement did.
message = 'Total number of crimes commited on a Tuesday: {0}'.format(total_number_of_crimes_on_Tuesday)
print(message)

Total number of crimes commited on a Tuesday: 124965


### Which Police District has the most crime?

In [241]:
# Spark SQL column functions, referred to as F from here on.
import pyspark.sql.functions as F

# One row per police district. Total_Number_of_Crimes holds the number of
# non-null PdDistrict values in each group.
crime_count = F.count('PdDistrict').alias('Total_Number_of_Crimes')
total_crime_by_district_df = df.groupBy('PdDistrict').agg(crime_count)

# Order the districts from most to fewest crimes and fetch only the top row.
# take(1) returns a one-element list of Row objects; the district name is the
# first field of that Row.
ranked_districts = total_crime_by_district_df.orderBy('Total_Number_of_Crimes', ascending=False)
top_row = ranked_districts.take(1)[0]
district_with_the_most_crime = top_row[0]

print('District with the most crime is: {0}'.format(district_with_the_most_crime))


District with the most crime is: SOUTHERN


### Find the maximum length of each string column

In [242]:
# Names of every column whose Spark type is string.
string_columns = [name for name, dtype in df.dtypes if dtype == 'string']

# IntegerType is no longer needed by this cell; the import is kept so that
# cells relying on the name already being in scope keep working.
from pyspark.sql.types import IntegerType

# Replace every string column with the length of its value. The built-in
# F.length is used instead of a Python UDF wrapped around len(): it yields
# null for a null input (len(None) raises TypeError inside a UDF, and these
# columns are nullable) and it avoids sending every row through a Python worker.
df_of_string_lengths = df.select([F.length(name).alias(name) for name in string_columns])

# One max() aggregation per column; max() ignores null lengths.
aggregation_expressions = [F.max(name) for name in string_columns]

# Evaluate all aggregations together and display the single result row.
df_of_string_lengths.agg(*aggregation_expressions).show()



+-------------+-------------+--------------+---------------+---------------+------------+
|max(Category)|max(Descript)|max(DayOfWeek)|max(PdDistrict)|max(Resolution)|max(Address)|
+-------------+-------------+--------------+---------------+---------------+------------+
|           27|           73|             9|             10|             38|          44|
+-------------+-------------+--------------+---------------+---------------+------------+



### Find the min, max, and average of each column that is a number of any type

In [243]:
# Get the list of columns with a numeric type. DataFrame.dtypes reports Spark
# SQL type names: 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'
# and 'decimal(p,s)'. The names 'long', 'short' and a bare 'decimal' never
# appear there, so matching on them would silently skip those columns.
integral_and_float_types = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
numeric_columns = [name for name, dtype in df.dtypes
                   if dtype in integral_and_float_types or dtype.startswith('decimal')]

# Create the aggregation expressions. The functions are the second factor of
# the cartesian product, which guarantees that the min/max/avg results of each
# column sit next to each other in the output.
import itertools
aggregation_expressions = [transform(name) for name, transform in itertools.product(numeric_columns, [F.min, F.max, F.avg])]

# Create a dataframe with just the numeric columns.
df_numeric = df.select(numeric_columns)

# Compute the aggregations.
df_numeric.agg(*aggregation_expressions).show()

# Another approach would be to generate the standard descriptive statistics as below.
df_numeric.describe().show()


+----------+------+-------------------+--------+------+-----------------+
|    min(X)|max(X)|             avg(X)|  min(Y)|max(Y)|           avg(Y)|
+----------+------+-------------------+--------+------+-----------------+
|-122.51364|-120.5|-122.42261639461971|37.70788|  90.0|37.77102031342325|
+----------+------+-------------------+--------+------+-----------------+

+-------+--------------------+-------------------+
|summary|                   X|                  Y|
+-------+--------------------+-------------------+
|  count|              878049|             878049|
|   mean| -122.42261639461971|  37.77102031342325|
| stddev|0.030353645185028408|0.45689310592877364|
|    min|          -122.51364|           37.70788|
|    max|              -120.5|               90.0|
+-------+--------------------+-------------------+



### Create a new dataframe with a new integer column called "addr_words" to have the number of words contained in the Address column.  Display the first 5 rows of Address and addr_words.

In [244]:
# Start from a frame that holds only the Address column.
new_df = df.select('Address')

# Version 1:
# A word is any non-empty series of characters delimited by spaces.
def word_count(sentence):
    """Return the number of space-separated words in ``sentence``.

    Empty pieces produced by repeated, leading or trailing spaces are not
    counted, so an empty string has 0 words.

    :param sentence: the text to examine, or None.
    :return: the word count as an int, or None when ``sentence`` is None, so
        that a null input stays null instead of raising.
    """
    if sentence is None:
        return None
    return sum(1 for token in sentence.split(' ') if token)
# IntegerType is the UDF's return type, so it must be imported in this cell.
# Importing only StringType left the cell dependent on an import made by an
# earlier cell. StringType is still imported so the name remains available.
from pyspark.sql.types import IntegerType, StringType

# Wrap word_count as a Spark UDF that produces an integer column.
word_count_udf = F.udf(word_count, IntegerType())

# Append the per-row word count and preview the first five rows.
new_df = new_df.withColumn('addr_words', word_count_udf(new_df.Address))
new_df.show(5)

+--------------------+----------+
|             Address|addr_words|
+--------------------+----------+
|  OAK ST / LAGUNA ST|         5|
|  OAK ST / LAGUNA ST|         5|
|VANNESS AV / GREE...|         5|
|1500 Block of LOM...|         5|
|100 Block of BROD...|         5|
+--------------------+----------+
only showing top 5 rows



### What is the maximum and minimum number of words in the Address column?

In [245]:
# Smallest and largest addr_words values, computed in a single aggregation.
word_count_bounds = new_df.agg(
    F.min('addr_words').alias('min_words'),
    F.max('addr_words').alias('max_words')
)
word_count_bounds.show()

+---------+---------+
|min_words|max_words|
+---------+---------+
|        3|       10|
+---------+---------+



### Rename the column "PdDistrict" to "PoliceDistrict" and print the schema

In [246]:
# Rebind df to a frame whose PdDistrict column is named PoliceDistrict, then
# display the resulting schema.
# NOTE(review): df is overwritten here, so cells that refer to df's PdDistrict
# column will not work if they are re-run after this one.
df = df.withColumnRenamed(existing='PdDistrict', new='PoliceDistrict')
df.printSchema()

root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PoliceDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)



### Consider the following code snippet ...

In [247]:
# Tab-separated play-by-play file on S3.
gameFileLoc = "s3://caserta-bucket1/Seahawks/game.csv"

# Read the file with a header row and an inferred schema; FAILFAST makes the
# read raise on a malformed record instead of silently nulling it.
# The location is passed only once, to load(). Supplying it as a "path" option
# as well is redundant, and Spark 3.1+ rejects a read that specifies both.
gameDF = spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") \
            .option("sep","\t") \
            .option("header","true") \
            .option("inferSchema","true") \
            .option("mode", "FAILFAST") \
            .load(gameFileLoc)


### There is a problem with a column in the resulting dataframe.  Determine what the problem is and resolve the issue.

A preliminary inspection of the summary statistics shows that Down has a value of 0. This isn't reasonable. I would not suggest changing this without discussing it further with an SME.

In [248]:
# Summarise the columns one at a time: each iteration prints the describe()
# table (count, mean, stddev, min, max) for a single column of gameDF.
for column_name in gameDF.columns:
    single_column_df = gameDF.select(column_name)
    single_column_df.describe().show()
    

+-------+------------------+
|summary|           GameKey|
+-------+------------------+
|  count|              2689|
|   mean| 55805.21680922276|
| stddev|184.99592840621645|
|    min|             55530|
|    max|             56157|
+-------+------------------+

+-------+------------------+
|summary|            PlayID|
+-------+------------------+
|  count|              2689|
|   mean|1997.0476013387877|
| stddev|1164.1755589722163|
|    min|                35|
|    max|              4447|
+-------+------------------+

+-------+------------+
|summary|HomeClubCode|
+-------+------------+
|  count|        2689|
|   mean|        null|
| stddev|        null|
|    min|         SEA|
|    max|         SEA|
+-------+------------+

+-------+---------------+
|summary|VisitorClubCode|
+-------+---------------+
|  count|           2689|
|   mean|           null|
| stddev|           null|
|    min|            ARZ|
|    max|            TEN|
+-------+---------------+

+-------+----------+
|summary|  G

I would say that there are more than a few issues with the columns, all of which will be remedied:

1. IsScoringPlay should be boolean (assuming these are for record keeping and not one-hot encoded for analytics)
2. The following columns should be of short type; the extra storage could impact analytics downstream:
    * Sequence
    * Quarter
    * HomeScore
    * VisitorScore
    * SpecialTeamsPlay
    * AbsoluteYardLine
    * YardsToGo
    * Down
    * PlayID
    
3. It feels very unnatural to separate the date from the clock. If you want to order the entries by when they occurred, then these need to be combined in some way, either as a DateTime (using DateType) or as an epoch (timestamp). My personal preference, due to the simplicity of an epoch, is to convert their concatenation to a TimestampType.


In [249]:
# datetime converter
import datetime
def convert_date_and_time_to_timestamp(date,time):
    """Combine a date value and a clock value into one ``datetime``.

    Both arguments are converted with ``str()``, joined with a single space
    and parsed with the format ``'%m/%d/%Y %H:%M:%S'``.

    :param date: the calendar date, e.g. ``'08/11/2012'``.
    :param time: the clock time, e.g. ``'02:05:00'``.
    :return: the parsed ``datetime.datetime``, or None when the combined text
        does not match the expected format.
    """
    try:
        return datetime.datetime.strptime('{0} {1}'.format(str(date), str(time)), '%m/%d/%Y %H:%M:%S')
    except (ValueError, TypeError):
        # Only parsing failures map to None. A bare except would also swallow
        # KeyboardInterrupt and SystemExit and hide unrelated errors.
        return None

from pyspark.sql.types import TimestampType
# Spark UDF that merges the date and clock values into one timestamp.
datetimeUDF = F.udf(convert_date_and_time_to_timestamp, TimestampType())

from pyspark.sql.types import DateType,BooleanType
# Add the combined GameTime column, then discard the two columns it was built from.
game_time = datetimeUDF(gameDF['GameDate'], gameDF['UniversalTimeClock'])
gameDF = (gameDF.withColumn('GameTime', game_time)
          .drop('GameDate')
          .drop('UniversalTimeClock'))

# Short conversions. The expression list covers every column (the unlisted
# ones pass through unchanged), so it is built after the timestamp conversion
# in order to pick up GameTime and leave out the two dropped columns.
columns_to_convert_to_short = ['Sequence','Quarter','HomeScore','VisitorScore','SpecialTeamsPlay','AbsoluteYardLine','YardsToGo','Down','PlayID']

def _short_or_unchanged(name):
    """Return the column cast to short when listed, otherwise as it is; the name is kept either way."""
    column = F.col(name)
    if name in columns_to_convert_to_short:
        column = column.cast('short')
    return column.alias(name)

short_conversion_expressions = [_short_or_unchanged(name) for name in gameDF.columns]

# Cast IsScoringPlay to boolean first, then apply the per-column expressions.
is_scoring_play = gameDF.IsScoringPlay.cast(BooleanType())
gameDF = gameDF.withColumn('IsScoringPlay', is_scoring_play)
gameDF = gameDF.select(*short_conversion_expressions)


Now the schema looks like this:

In [250]:
# Print the column names and types of gameDF in its current state.
gameDF.printSchema()

root
 |-- GameKey: integer (nullable = true)
 |-- PlayID: short (nullable = true)
 |-- HomeClubCode: string (nullable = true)
 |-- VisitorClubCode: string (nullable = true)
 |-- ClockTime: string (nullable = true)
 |-- Down: short (nullable = true)
 |-- YardsToGo: short (nullable = true)
 |-- YardLine: string (nullable = true)
 |-- AbsoluteYardLine: short (nullable = true)
 |-- SpecialTeamsPlay: short (nullable = true)
 |-- Quarter: short (nullable = true)
 |-- Sequence: short (nullable = true)
 |-- IsScoringPlay: boolean (nullable = true)
 |-- PossessionTeam: string (nullable = true)
 |-- HomeScore: short (nullable = true)
 |-- VisitorScore: short (nullable = true)
 |-- PlayDescription: string (nullable = true)
 |-- GameTime: timestamp (nullable = true)



And the resulting DataFrame is:

In [251]:
# Preview gameDF. Called without arguments, show() prints only the leading
# rows (20 by default in PySpark) and truncates long cell values.
gameDF.show()

+-------+------+------------+---------------+---------+----+---------+--------+----------------+----------------+-------+--------+-------------+--------------+---------+------------+--------------------+--------------------+
|GameKey|PlayID|HomeClubCode|VisitorClubCode|ClockTime|Down|YardsToGo|YardLine|AbsoluteYardLine|SpecialTeamsPlay|Quarter|Sequence|IsScoringPlay|PossessionTeam|HomeScore|VisitorScore|     PlayDescription|            GameTime|
+-------+------+------------+---------------+---------+----+---------+--------+----------------+----------------+-------+--------+-------------+--------------+---------+------------+--------------------+--------------------+
|  55774|    53|         SEA|            TEN|    15:00|   0|        0|  SEA 35|              65|               1|      1|      53|        false|           SEA|        0|           0|C.Wiggs kicks 65 ...|2012-08-11 02:05:...|
|  55774|    71|         SEA|            TEN|    15:00|   1|       10|  TEN 20|              80|    