# A3 Section B - Working with DataFrames and SQL

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

In [2]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# setting random seed for notebook reproducability
rnd_seed=23
np.random.seed=rnd_seed
np.random.set_state=rnd_seed

In [4]:
spark_session = SparkSession.builder \
   .master("spark://192.168.2.156:7077") \
   .appName("alexander_sundquist_A3_B") \
   .config("spark.dynamicAllocation.enabled", True) \
   .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
   .config("spark.shuffle.service.enabled", False) \
   .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
   .config("spark.cores.max", 8) \
   .config("spark.driver.port",9999)\
   .config("spark.blockManager.port",10005)\
   .getOrCreate() 

spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 10:02:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/07 10:02:38 WARN Utils: Service 'sparkDriver' could not bind on port 9999. Attempting port 10000.
25/03/07 10:02:39 WARN StandaloneSchedulerBackend: Dynamic allocation enabled without spark.executor.cores explicitly set, you may get more executors allocated than expected. It's recommended to set spark.executor.cores explicitly. Please check SPARK-30299 for more details.


In [5]:
sqlContext = SQLContext(spark_session.sparkContext)
sqlContext



<pyspark.sql.context.SQLContext at 0x7f7ea1779fc0>

In [6]:
#Loading Data to dataframe, store in cache memory to increase speed
df = sqlContext.read.csv("hdfs://192.168.2.156:9000/data/los-angeles-parking-citations", header="true", inferSchema="true").cache()

                                                                                

In [7]:
df.show(10)



+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    NULL|       NULL|            CA|           200304|NULL|HOND|        PA|   GY|  

                                                                                

In [8]:
#getting the schema info from the dataframe
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [9]:
# Count total number of rows
print(f"Total Rows: {df.count()}")

# Maximum rows in excel: 1,048,576
# https://support.office.com/en-us/article/excel-specifications-and-limits-1672b34d-7043-467e-8e27-269d656771c3

# Count the total number of columns
print(f"Total Cols: {len(df.columns)}")



Total Rows: 13079582
Total Cols: 22


                                                                                

In [10]:
df.rdd.getNumPartitions()

16

In [11]:
df1 = df.drop('Vin', 'Longitude', 'Latitude')
df1.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



## B.6

In [12]:
# Convert fine amount to float
df1 = df1.withColumn('Fine amount',df1['Fine amount'].cast("float").alias('Fine amount'))
df1.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [13]:
# Find maximum fine amount
row1 = df1.agg({"Fine amount": "max"}).collect()[0]
print(row1["max(Fine amount)"])



1100.0


                                                                                

In [14]:
df2 = df1.select(col("Fine amount").alias("fine_amount"))
df2.printSchema()

root
 |-- fine_amount: float (nullable = true)



In [15]:
# Count occurence of max value (1100.0)
df2.groupBy('fine_amount').count().show() 




+-----------+-------+
|fine_amount|  count|
+-----------+-------+
|       58.0| 994710|
|      105.0|    824|
|      163.0| 153489|
|      143.0|    530|
|      255.0|     36|
|       NULL|  75329|
|      293.0|  18770|
|      345.0|     43|
|       55.0|    143|
|       10.0|    172|
|      505.0|      6|
|       53.0|    929|
|       68.0|1938725|
|      128.0|   1062|
|       40.0|    295|
|      133.0|   9684|
|      363.0|  87596|
|       35.0|    270|
|       50.0| 138041|
|       78.0|  38642|
+-----------+-------+
only showing top 20 rows



                                                                                

In [16]:
nmbr_1100_tickets = df2.filter(df2.fine_amount=='1100.0').count()
print(f'Number of tickets of value 1100.0 given out: {nmbr_1100_tickets}')



Number of tickets of value 1100.0 given out: 626


                                                                                

## B.7

In [17]:
df1.groupBy('Make').count().show()



+----+------+
|Make| count|
+----+------+
|CARS|    44|
|FRET|     5|
|WINN|  3007|
|LIBE|    12|
|ARRW|     2|
|MERK|   946|
| SBT|     2|
|MUNI|     5|
|DRLN|     6|
|VELO|     6|
|ZIEM|    20|
| APR|     1|
|CMPI|     1|
| WAB|     4|
|PREL|     1|
|DAWE|     2|
|BRAU|     2|
|FRWS|     6|
|DODG|391686|
|DUES|    84|
+----+------+
only showing top 20 rows



                                                                                

In [18]:
df1.groupby("Make").count().agg(mode("count")).show()




+-----------+
|mode(count)|
+-----------+
|          1|
+-----------+



                                                                                

In [19]:
df1.groupBy('Make').agg(count('Make').alias('make_counts')).orderBy(col('make_counts').desc()).show(20)



+----+-----------+
|Make|make_counts|
+----+-----------+
|TOYT|    2150768|
|HOND|    1479996|
|FORD|    1116235|
|NISS|     945133|
|CHEV|     892676|
| BMW|     603092|
|MERZ|     543298|
|VOLK|     432030|
|HYUN|     404917|
|DODG|     391686|
|LEXS|     368420|
| KIA|     328155|
|JEEP|     316300|
|AUDI|     255395|
|MAZD|     242344|
|OTHR|     205546|
| GMC|     184889|
|INFI|     174315|
|CHRY|     159948|
|SUBA|     154640|
+----+-----------+
only showing top 20 rows



                                                                                

In [20]:
df1.groupBy('Make').count().orderBy(col('count').desc()).show(20)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

## B.8

In [21]:
df1.groupBy('Color').count().show()



+-----+-------+
|Color|  count|
+-----+-------+
|   PU|   1049|
|   SL|1604134|
|   PI|    171|
|   RE|  41282|
|   PL|     49|
|   OR|  38121|
|   OL|     14|
|   GO| 159723|
|   BU|   3635|
|   RU|    185|
|   LI|      1|
| NULL|  37193|
|   TU|   5872|
|   BN| 234905|
|   WI|      8|
|   PK|   1998|
|   SA|     17|
|   BL|1007674|
|   YE|  50253|
|   BR|   8524|
+-----+-------+
only showing top 20 rows



                                                                                

In [22]:
COLORS = { 
'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',  
'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',  
'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',  
'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',  
'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',  
'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',  
'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White', 'WH': 'White',  
'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown' 
} 

In [36]:
def color_mapping(color):
    return COLORS.get(color, color)
    

mapColorUDF = udf(lambda x: color_mapping(x))

df1 = df1.withColumn('color_long', mapColorUDF(col('color')))

In [37]:
df1.select('color', 'color_long').show(10)

+-----+----------+
|color|color_long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
+-----+----------+
only showing top 10 rows



## B.9

In [38]:
df1.groupBy('color_long','Make').count()

DataFrame[color_long: string, Make: string, count: bigint]

In [41]:
df1.where(df1.Make=='TOYT').groupBy('color_long').count().orderBy(col('count').desc()).show(10)



+----------+------+
|color_long| count|
+----------+------+
|      Gray|489697|
|     White|434595|
|     Black|353812|
|    Silver|347894|
|      Blue|180091|
|       Red|119074|
|     Green| 74968|
|      Gold| 40646|
|    Maroon| 26242|
|       Tan| 23355|
+----------+------+
only showing top 10 rows



                                                                                