In [125]:
import pyspark
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')
import warnings; warnings.simplefilter('ignore')
import pandas as pd
from pyspark.sql.functions import *
from google.cloud import storage
from pyspark.sql import types

In [126]:
spark = SparkSession.builder \
        .master("local") \
        .appName("311 Service Request") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

In [127]:


schema = types.StructType([
 types.StructField('sr_number', types.StringType(),True),
 types.StructField('sr_type', types.StringType(),True),
 types.StructField('sr_short_code', types.StringType(),True),
 types.StructField('owner_department', types.StringType(),True),
 types.StructField('status', types.StringType(),True),
 types.StructField('created_date', types.TimestampType(),True),
 types.StructField('last_modified_date', types.TimestampType(),True),
 types.StructField('closed_date', types.TimestampType(),True),
 types.StructField('street_address', types.StringType(),True),
 types.StructField('city', types.StringType(),True),
 types.StructField('state', types.StringType(),True),
 types.StructField('zip_code', types.LongType(), True),
 types.StructField('street_number', types.LongType(), True),
 types.StructField('street_direction', types.StringType(), True),
 types.StructField('street_name', types.StringType(), True),
 types.StructField('street_type', types.StringType(), True),
 types.StructField('duplicate', types.BooleanType(), True),
 types.StructField('legacy_record', types.BooleanType(), True),
 types.StructField('legacy_sr_number', types.IntegerType(),True),
 types.StructField('parent_sr_number', types.BinaryType(),True),
 types.StructField('community_area', types.LongType(),True),
 types.StructField('ward', types.LongType(),True),
 types.StructField('electrical_district', types.FloatType(),True),
 types.StructField('electricity_grid', types.StringType(),True),
 types.StructField('police_sector', types.LongType(), True),
 types.StructField('police_district', types.FloatType(), True),
 types.StructField('police_beat', types.FloatType(), True),
 types.StructField('precinct', types.LongType(), True),
 types.StructField('sanitation_division_days', types.IntegerType(), True),
 types.StructField('created_hour', types.LongType(), True),
 types.StructField('created_day_of_week', types.LongType(), True),
 types.StructField('created_month', types.LongType(), True),
 types.StructField('x_coordinate', types.DoubleType(), True),
 types.StructField('y_coordinate', types.DoubleType(), True),
 types.StructField('latitude', types.DoubleType(), True),
 types.StructField('longitude', types.DoubleType(), True),
 types.StructField('location', types.StringType(),True)
 ])

In [132]:
df = spark.read \
    .option("header",True) \
    .schema(schema) \
    .csv("gs://datalake-311-bronze/311request_2022_03_21-03_42_29_AM.csv")


In [134]:
df_sel =df.select("sr_number", "sr_type", "sr_short_code", "owner_department","status", "created_date", \
                  "last_modified_date", "closed_date", "street_address", "city", "state", \
                  "zip_code", "street_number", "street_type", "duplicate")


In [131]:
df_sel.count()

                                                                                

2726

In [135]:
df1 =df_sel.withColumn("hash_sr_number", sha2(col("sr_number"),256))

In [136]:
df1.printSchema


<bound method DataFrame.printSchema of DataFrame[sr_number: string, sr_type: string, sr_short_code: string, owner_department: string, status: string, created_date: timestamp, last_modified_date: timestamp, closed_date: timestamp, street_address: string, city: string, state: string, zip_code: bigint, street_number: bigint, street_type: string, duplicate: boolean, hash_sr_number: string]>

In [164]:
df1.groupBy("duplicate").count().show()
df1.filter(df1.duplicate == ' ').count()

[Stage 116:>                                                        (0 + 1) / 1]

+---------+-----+
|duplicate|count|
+---------+-----+
|     null| 1726|
|     true|   47|
|    false|  953|
+---------+-----+



                                                                                

0

In [190]:
df2 =df1.na.fill(False)   #.filter(df1.duplicate != 'true').show(10)

In [194]:
df2.select("duplicate").filter(df2.duplicate == 'true').count()
df2.select("duplicate").filter(df2.duplicate != 'true').count()

                                                                                

2679

In [196]:
df2.select("sr_number","sr_type", "created_date","last_modified_date", "closed_date" ).show(100)

+-------------------+--------------------+-------------------+-------------------+-------------------+
|          sr_number|             sr_type|       created_date| last_modified_date|        closed_date|
+-------------------+--------------------+-------------------+-------------------+-------------------+
|      SR22-00454513|Street Light Out ...|2022-03-20 21:15:03|2022-03-20 21:15:05|               null|
|               null|                    |               null|               null|               null|
| (41.99184798194465| -87.665447242925...|               null|               null|               null|
|      SR22-00454512|311 INFORMATION O...|2022-03-20 21:14:46|2022-03-20 21:14:47|2022-03-20 21:14:47|
|      SR22-00454511|Aircraft Noise Co...|2022-03-20 21:14:46|2022-03-20 21:14:47|2022-03-20 21:14:47|
|      SR22-00454510|Aircraft Noise Co...|2022-03-20 21:14:43|2022-03-20 21:14:43|2022-03-20 21:14:43|
|      SR22-00454509|Aircraft Noise Co...|2022-03-20 21:14:34|2022-03-20 

[Stage 166:>                                                        (0 + 1) / 1]                                                                                

In [None]:
sr_number: string, sr_type: string, sr_short_code: string, owner_department: string, status: string, created_date: timestamp, last_modified_date: timestamp, closed_date