## Application settings

In [1]:
from pyspark.sql import SparkSession


spark_session = SparkSession.builder\
    .master("spark://192.168.2.70:7077") \
    .appName("emiresenov")\
    .config("spark.dynamicAllocation.enabled", True)\
    .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
    .config("spark.shuffle.service.enabled", True)\
    .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
    .config("spark.cores.max", 4)\
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/17 12:26:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Section B

In [2]:
# B.1 - Load the CSV file and call show()
df = spark_session.read.option("header",True).csv('hdfs://192.168.2.70:9000/parking-citations.csv')

df.show(truncate=False, vertical=True) # Display as vertical for nicer formatting

                                                                                

-RECORD 0-----------------------------------------
 Ticket number          | 1103341116              
 Issue Date             | 2015-12-21T00:00:00.000 
 Issue time             | 1251                    
 Meter Id               | null                    
 Marked Time            | null                    
 RP State Plate         | CA                      
 Plate Expiry Date      | 200304                  
 VIN                    | null                    
 Make                   | HOND                    
 Body Style             | PA                      
 Color                  | GY                      
 Location               | 13147 WELBY WAY         
 Route                  | 01521                   
 Agency                 | 1                       
 Violation code         | 4000A1                  
 Violation Description  | NO EVIDENCE OF REG      
 Fine amount            | 50                      
 Latitude               | 99999                   
 Longitude              | 99999

In [3]:
# B.2 - Print the schema for the DataFrame
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [4]:
# B.2 - Count rows in the CSV file
df.count()

                                                                                

13077724

In [5]:
# B.4 - Count the number of partitions in the underlying RDD
df_rdd = df.rdd
df_rdd.getNumPartitions()

16

In [6]:
# B.5 - Drop the columns VIN, Latitute, Longitute
df2 = df.drop("VIN", "Latitude","Longitude")

df2.printSchema() # Print schema
df2.show(truncate=False, vertical=True) # Show to confirm changes

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)

-RECORD 0-----------------------------------------
 Ticket number          | 1103341116              
 Issue Date             | 2015-12-21T00:00:00.000 
 Issue ti

In [7]:
# B.6 - Find maximum fine amount. How many fines have this amount?

from pyspark.sql.functions import col
from pyspark.sql.functions import max
from pyspark.sql.functions import count

# Extract fine amount column
fineCol = df2.select(col("Fine amount").cast('float'))

# Find max fine
fineCol.select(max(col("Fine amount"))).show()

# Count max fine entries
fineCol.select('Fine amount').where(col("Fine amount") == 1100.0).count()

                                                                                

+----------------+
|max(Fine amount)|
+----------------+
|          1100.0|
+----------------+



                                                                                

626

In [8]:
# B.7 - Show the top 20 most frequent vehicle makes, and their frequencies
countMake = df2.groupBy('Make').count().sort(col("count").desc()).show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

In [9]:
# B.8 - Create new column 'Color long', mapping the colors to their corresponding values

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue',
'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark',
'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite',
'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

# Define color mapping function
def mapColor(col):
    try:
        return COLORS[col]
    except:
        return COLORS['UN']

# Make UDF
mapColorUDF = udf(lambda x: mapColor(x),StringType())

# Add column with UDF and display
df3 = df2.withColumn("Color long", mapColorUDF(col("Color"))) 
df3.show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------
 Ticket number          | 1103341116              
 Issue Date             | 2015-12-21T00:00:00.000 
 Issue time             | 1251                    
 Meter Id               | null                    
 Marked Time            | null                    
 RP State Plate         | CA                      
 Plate Expiry Date      | 200304                  
 Make                   | HOND                    
 Body Style             | PA                      
 Color                  | GY                      
 Location               | 13147 WELBY WAY         
 Route                  | 01521                   
 Agency                 | 1                       
 Violation code         | 4000A1                  
 Violation Description  | NO EVIDENCE OF REG      
 Fine amount            | 50                      
 Agency Description     | null                    
 Color Description      | null                    
 Body Style Description | null 

                                                                                

In [10]:
# B.9 - Find the most frequent color value for Toyotas
df3.groupBy('Color long').count().sort(col("count").desc()).show()



+----------+-------+
|Color long|  count|
+----------+-------+
|     White|2922676|
|     Black|2846765|
|      Gray|2567571|
|    Silver|1682040|
|      Blue|1007674|
|       Red| 680418|
|     Green| 336771|
|     Brown| 243429|
|   Unknown| 218526|
|      Gold| 159727|
|    Maroon| 148768|
|       Tan|  93064|
|     Beige|  79378|
|    Yellow|  50299|
|    Orange|  38121|
|      Pink|   1998|
|  Charcoal|    326|
|    Bronze|    152|
|     Olive|     14|
|     Smoke|      2|
+----------+-------+
only showing top 20 rows



                                                                                

## Answer

The most frequent color value for Toyotas is white.