In [None]:
!pip install findspark
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from google.colab import drive  # Colab helper used to attach Google Drive
import os  # NOTE(review): not used in the cells visible here
# Mount Google Drive at /content/drive; the CSV read later in this notebook
# lives under that mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("ReadCSV")
    .getOrCreate()
)

# Path of the NYC 311 extract on the mounted Drive.
file_path = '/content/drive/MyDrive/Colab Notebooks/NYC/nyc_311_data.csv'

# Load the CSV: the first row holds the column names and the column types are
# inferred from the data.
df_spark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Inspect the inferred schema and a handful of rows.
df_spark.printSchema()
df_spark.show(5)


root
 |-- created_date: timestamp (nullable = true)
 |-- closed_date: timestamp (nullable = true)
 |-- agency: string (nullable = true)
 |-- complaint_type: string (nullable = true)
 |-- descriptor: string (nullable = true)
 |-- location_type: string (nullable = true)
 |-- incident_zip: double (nullable = true)
 |-- city: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- open_data_channel_type: string (nullable = true)
 |-- community_board: string (nullable = true)
 |-- facility_type: string (nullable = true)

+-------------------+-------------------+------+--------------------+--------------------+--------------------+------------+----------------+---------+------------------+------------------+----------------------+---------------+-------------+
|       created_date|        closed_date|agency|      complaint_type|          descriptor|       location_type|incident_zip|            city|

In [None]:
from pyspark.sql.functions import col, count, to_date, when


def _report_conversion(df, column_name, converted, target):
    """Print whether every non-null value of `column_name` survives a conversion.

    `converted` is the column expression holding the converted value. With
    Spark's default (non-ANSI) settings a failed cast yields NULL instead of
    raising, so failures are detected by counting rows whose source value is
    present but whose converted value is NULL. Under ANSI mode the failure is
    raised while the count runs and is reported through the except branch.
    """
    try:
        failures = df.filter(col(column_name).isNotNull() & converted.isNull()).count()
    except Exception as e:
        print(f"{column_name} cannot be casted to {target}, error: {e}")
        return
    if failures == 0:
        print(f"{column_name} is castable to {target}")
    else:
        print(f"{column_name} cannot be casted to {target}: {failures} non-null value(s) do not convert")


# Check for missing values: one NULL count per column, computed in a single pass.
print("Checking for missing values:")
df_spark.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_spark.columns]
).show()

# Check for inconsistent values (empty strings, single spaces).
# Only string columns can hold such values, and all counts are gathered in one
# aggregation instead of two full scans of the data per column.
print("\nChecking for inconsistent values (empty strings and spaces):")
string_cols = [name for name, dtype in df_spark.dtypes if dtype == "string"]
if string_cols:
    blank_exprs = []
    for name in string_cols:
        blank_exprs.append(count(when(col(name) == "", 1)).alias(f"{name}__empty"))
        blank_exprs.append(count(when(col(name) == " ", 1)).alias(f"{name}__space"))
    blank_counts = df_spark.agg(*blank_exprs).first().asDict()

    for col_name in string_cols:
        count_empty_strings = blank_counts[f"{col_name}__empty"]
        count_spaces = blank_counts[f"{col_name}__space"]
        if count_empty_strings > 0 or count_spaces > 0:
            print(f"Column '{col_name}':")
            if count_empty_strings > 0:
                print(f"  - Empty strings: {count_empty_strings}")
            if count_spaces > 0:
                print(f"  - Spaces: {count_spaces}")

# Check for data type inconsistencies across the whole column, not just the
# first row.
print("\nChecking for data type inconsistencies (if any):")

# incident_zip should hold whole numbers: a value passes only if it equals its
# own integer cast.
if 'incident_zip' in df_spark.columns:
    zip_as_int = col("incident_zip").cast("integer")
    zip_if_integral = when(
        zip_as_int.cast("double") == col("incident_zip").cast("double"), zip_as_int
    )
    _report_conversion(df_spark, "incident_zip", zip_if_integral, "integer")

# The two timestamp columns should convert to dates.
if 'created_date' in df_spark.columns:
    _report_conversion(df_spark, "created_date", to_date(col("created_date")), "date")

if 'closed_date' in df_spark.columns:
    _report_conversion(df_spark, "closed_date", to_date(col("closed_date")), "date")

# List the distinct complaint types so unexpected categories can be spotted by eye.
if 'complaint_type' in df_spark.columns:
    allowed_complaint_types = [
        row[0] for row in df_spark.select('complaint_type').distinct().collect()
    ]
    print(f"\nUnique complaint types: {allowed_complaint_types}")

Checking for missing values:
+------------+-----------+------+--------------+----------+-------------+------------+-------+-------+--------+---------+----------------------+---------------+-------------+
|created_date|closed_date|agency|complaint_type|descriptor|location_type|incident_zip|   city|borough|latitude|longitude|open_data_channel_type|community_board|facility_type|
+------------+-----------+------+--------------+----------+-------------+------------+-------+-------+--------+---------+----------------------+---------------+-------------+
|           0|     955643|     0|             0|    141135|      8047543|     1564582|2150236|  28886| 2615636|  2615636|                     0|          28886|     14390533|
+------------+-----------+------+--------------+----------+-------------+------------+-------+-------+--------+---------+----------------------+---------------+-------------+


Checking for inconsistent values (empty strings and spaces):

Checking for data type inconsist

In [None]:
# The closing timestamp is essential for this analysis, so rows without a
# 'closed_date' are discarded.
df_cleaned = df_spark.where(~col("closed_date").isNull())

# Compare row counts before and after the filter to confirm the removal.
print(f"Original DataFrame count: {df_spark.count()}")
print(f"Cleaned DataFrame count: {df_cleaned.count()}")


Original DataFrame count: 39218423
Cleaned DataFrame count: 38262780


In [None]:
from pyspark.sql.functions import col, when

# Categorical columns (everything that is neither a timestamp nor numeric).
categorical_cols = [
    "agency", "complaint_type", "descriptor",
    "location_type", "city", "borough",
    "open_data_channel_type", "community_board", "facility_type",
]


def _fill_unknown(column_name):
    """Build the expression that maps NULL or "N/A" to "Unknown" and keeps any other value."""
    value = col(column_name)
    is_missing = value.isNull() | (value == "N/A")
    return when(is_missing, "Unknown").otherwise(value)


# Overwrite each categorical column with its cleaned version.
for col_name in categorical_cols:
    df_cleaned = df_cleaned.withColumn(col_name, _fill_unknown(col_name))


In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderServiceError

def get_zipcode(latitude, longitude):
    """Reverse-geocode a coordinate pair into a 5-digit ZIP code.

    Args:
        latitude: Latitude in decimal degrees, or None.
        longitude: Longitude in decimal degrees, or None.

    Returns:
        The first five characters of the postcode reported by the geocoder when
        they are five ASCII digits (so "10027-1234" becomes "10027"), otherwise
        None. None is also returned when a coordinate is missing, when the
        geocoder reports no postcode, or when the lookup raises.
    """
    if latitude is None or longitude is None:
        return None  # Nothing to look up without both coordinates

    try:
        # The client is built inside the function so that the function carries
        # no geocoder state when it is shipped to Spark workers as a UDF.
        geolocator = Nominatim(user_agent="zipcode_replacement")
        location = geolocator.reverse((latitude, longitude), exactly_one=True, language='en')
        if not location:
            return None

        address = location.raw.get('address') or {}
        postcode = address.get('postcode')
        if not postcode:
            return None  # The geocoder answered but gave no postcode

        # Callers convert the result to a number, so only a plain 5-digit ZIP is
        # useful: strip ZIP+4 suffixes and reject anything that is not numeric.
        zip5 = str(postcode).strip()[:5]
        if len(zip5) == 5 and zip5.isascii() and zip5.isdigit():
            return zip5
        return None
    except (GeocoderTimedOut, GeocoderServiceError) as e:
        print(f"Geocoder error: {e}")
        return None
    except Exception as e:
        # Best-effort lookup: any other failure is reported and treated as "no ZIP".
        print(f"Unexpected error: {e}")
        return None

# Register the lookup as a Spark UDF. It calls an external web service, so it is
# flagged as non-deterministic to keep the optimizer from duplicating or
# re-evaluating the call.
get_zipcode_udf = udf(get_zipcode, StringType()).asNondeterministic()

# Geocode every row's coordinates into a temporary column.
# NOTE(review): this issues one HTTP request per row with non-null coordinates;
# on the full dataset that is tens of millions of calls against a rate-limited
# public service. Consider geocoding distinct coordinates, or only rows whose
# incident_zip is missing.
df_with_new_zip = df_cleaned.withColumn(
    "new_zipcode", get_zipcode_udf(col("latitude"), col("longitude"))
)

# Use the geocoded ZIP only when it converts to a number; otherwise keep the
# original value, so a postcode that fails the cast can never replace a valid
# incident_zip with NULL.
df_replaced_zip = df_with_new_zip.withColumn(
    "incident_zip",
    when(
        col("new_zipcode").cast("double").isNotNull(),
        col("new_zipcode").cast("double"),
    ).otherwise(col("incident_zip")),
).drop("new_zipcode")



In [None]:
# Discard rows that lack a latitude ...
df_temp = df_replaced_zip.where(~col("latitude").isNull())
# ... and, from what remains, rows that lack a longitude.
df_final = df_temp.where(~col("longitude").isNull())

# Preview the result.
df_final.show(5)

+-------------------+-------------------+------+--------------------+--------------------+--------------------+------------+-------------+---------+------------------+------------------+----------------------+---------------+-------------+
|       created_date|        closed_date|agency|      complaint_type|          descriptor|       location_type|incident_zip|         city|  borough|          latitude|         longitude|open_data_channel_type|community_board|facility_type|
+-------------------+-------------------+------+--------------------+--------------------+--------------------+------------+-------------+---------+------------------+------------------+----------------------+---------------+-------------+
|2014-06-22 02:04:48|2014-06-22 03:19:21|  NYPD| Noise - Residential|    Loud Music/Party|Residential Build...|     10027.0|     NEW YORK|MANHATTAN|40.811948970584716|-73.94364832426544|                ONLINE|   10 MANHATTAN|     Precinct|
|2014-06-22 00:00:00|2014-06-24 00:00:00

In [None]:
# Report how many rows remain in df_final; count() is an action, so this runs
# the transformations defined on df_final so far.
print(f"The Final DataFrame count: {df_final.count()}")

The Final DataFrame count: 35698902


In [None]:
from pyspark.sql.functions import col, datediff, unix_timestamp, hour, dayofweek, month, year, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
from pyspark.sql.functions import unix_timestamp, col, hour, dayofweek, month, year, when

df_final = (
    df_final
    # Target: seconds elapsed between creation and closure of the request.
    .withColumn(
        "response_time",
        unix_timestamp(col("closed_date")) - unix_timestamp(col("created_date")),
    )
    # Calendar features taken from the creation timestamp.
    .withColumn("hour", hour(col("created_date")))
    .withColumn("day_of_week", dayofweek(col("created_date")))
    .withColumn("month", month(col("created_date")))
    .withColumn("year", year(col("created_date")))
    # Bucket the hour: 6-11 morning, 12-17 afternoon, 18-21 evening, else night.
    .withColumn(
        "time_of_day",
        when((col("hour") >= 6) & (col("hour") < 12), "morning")
        .when((col("hour") >= 12) & (col("hour") < 18), "afternoon")
        .when((col("hour") >= 18) & (col("hour") < 22), "evening")
        .otherwise("night"),
    )
    # dayofweek() numbers Sunday as 1 and Saturday as 7.
    .withColumn("is_weekend", when(col("day_of_week").isin(1, 7), 1).otherwise(0))
    # Keep the engineered frame in memory for the modelling cells that follow.
    .cache()
)



In [None]:
# Show the schema of df_final, including the engineered columns added to it.
df_final.printSchema()

root
 |-- created_date: timestamp (nullable = true)
 |-- closed_date: timestamp (nullable = true)
 |-- agency: string (nullable = true)
 |-- complaint_type: string (nullable = true)
 |-- descriptor: string (nullable = true)
 |-- location_type: string (nullable = true)
 |-- incident_zip: double (nullable = true)
 |-- city: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- open_data_channel_type: string (nullable = true)
 |-- community_board: string (nullable = true)
 |-- facility_type: string (nullable = true)
 |-- response_time: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- time_of_day: string (nullable = false)
 |-- is_weekend: integer (nullable = false)



In [None]:


# Index each categorical column, then one-hot encode the index. Labels that were
# not seen during fitting are kept in an extra bucket instead of raising.
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep") for col_name in categorical_cols]
encoders = [OneHotEncoder(inputCol=col_name + "_index", outputCol=col_name + "_encoded") for col_name in categorical_cols]


# Assemble numeric and encoded columns into a single feature vector.
# incident_zip still contains NULLs, and VectorAssembler raises on NULL input by
# default; "skip" drops such rows when fitting and when transforming instead of
# aborting the job.
numerical_cols = ["incident_zip", "latitude", "longitude", "hour", "day_of_week", "month", "year","is_weekend"]
encoded_cols = [col_name + "_encoded" for col_name in categorical_cols]
assembler = VectorAssembler(
    inputCols=numerical_cols + encoded_cols,
    outputCol="features",
    handleInvalid="skip",
)

# Linear regression on the response time in seconds.
lr = LinearRegression(featuresCol="features", labelCol="response_time")

# Full pipeline: indexers -> encoders -> assembler -> regression.
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])


In [None]:
# 80/20 random split with a fixed seed, followed by a peek at the held-out part.
train_df, test_df = df_final.randomSplit(weights=[0.8, 0.2], seed=42)
test_df.show(n=5)

+-------------------+-------------------+------+--------------------+--------------------+------------+--------+---------+----------------------+--------------------+-------------+-------------+-----------+------------+-------------+----+-----------+-----+----+-----------+----------+
|       created_date|        closed_date|agency|      complaint_type|          descriptor|incident_zip|    city|  borough|open_data_channel_type|     community_board|facility_type|location_type|   latitude|   longitude|response_time|hour|day_of_week|month|year|time_of_day|is_weekend|
+-------------------+-------------------+------+--------------------+--------------------+------------+--------+---------+----------------------+--------------------+-------------+-------------+-----------+------------+-------------+----+-----------+-----+----+-----------+----------+
|2022-02-27 19:45:00|2022-02-27 20:23:48|  NYPD|          Encampment|             Unknown|        NULL| Unknown|MANHATTAN|                MOBILE|

In [None]:
# Train-test split
train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="response_time", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")