# Extract Initial Data from 311 Service Data (1 Months) to Apache Spark

In [21]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
#os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars /Users/denisesonia/postgresql-42.2.24.jar"

In [22]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Intro to Apache Spark") \
    .config("spark.cores.max", "4") \
    .config('spark.executor.memory', '8G') \
    .config('spark.driver.maxResultSize', '8g') \
    .config('spark.kryoserializer.buffer.max', '512m') \
    .config("spark.driver.cores", "4") \
    .getOrCreate()



In [23]:
df = spark.read.format("csv") \
               .options(header='true', inferschema='true', treatEmptyValuesAsNulls='true') \
               .load("311_Service_Requests_from_2010_to_Present.csv")
df.count()

                                                                                

3970852

In [24]:
df.columns

['Unique Key',
 'Created Date',
 'Closed Date',
 'Agency',
 'Agency Name',
 'Complaint Type',
 'Descriptor',
 'Location Type',
 'Incident Zip',
 'Incident Address',
 'Street Name',
 'Cross Street 1',
 'Cross Street 2',
 'Intersection Street 1',
 'Intersection Street 2',
 'Address Type',
 'City',
 'Landmark',
 'Facility Type',
 'Status',
 'Due Date',
 'Resolution Description',
 'Resolution Action Updated Date',
 'Community Board',
 'BBL',
 'Borough',
 'X Coordinate (State Plane)',
 'Y Coordinate (State Plane)',
 'Open Data Channel Type',
 'Park Facility Name',
 'Park Borough',
 'Vehicle Type',
 'Taxi Company Borough',
 'Taxi Pick Up Location',
 'Bridge Highway Name',
 'Bridge Highway Direction',
 'Road Ramp',
 'Bridge Highway Segment',
 'Latitude',
 'Longitude',
 'Location',
 'Zip Codes',
 'Community Districts',
 'Borough Boundaries',
 'City Council Districts',
 'Police Precincts']

In [25]:
coba = df.select("Closed Date", "Created Date")
coba.show()

+-----------+--------------------+
|Closed Date|        Created Date|
+-----------+--------------------+
|       null|04/12/2023 12:00:...|
|       null|04/12/2023 12:00:...|
|       null|04/12/2023 02:07:...|
|       null|04/12/2023 02:06:...|
|       null|04/12/2023 02:06:...|
|       null|04/12/2023 02:06:...|
|       null|04/12/2023 02:05:...|
|       null|04/12/2023 02:05:...|
|       null|04/12/2023 02:05:...|
|       null|04/12/2023 02:04:...|
|       null|04/12/2023 02:03:...|
|       null|04/12/2023 02:02:...|
|       null|04/12/2023 02:02:...|
|       null|04/12/2023 02:02:...|
|       null|04/12/2023 02:01:...|
|       null|04/12/2023 01:59:...|
|       null|04/12/2023 01:59:...|
|       null|04/12/2023 01:58:...|
|       null|04/12/2023 01:58:...|
|       null|04/12/2023 01:58:...|
+-----------+--------------------+
only showing top 20 rows



# Transform the data accordingly

In [26]:
from pyspark.sql.functions import current_timestamp, lower, col, to_date, datediff, coalesce, unix_timestamp, when, lit

# Add a new column with today's date and time
df_raw = df.withColumn('LoadTimestamp', current_timestamp())

# Convert string columns to lowercase
string_columns = ["Agency", "Complaint Type", "Descriptor", "City", "Borough", "Incident Address", "Status"]
for column in string_columns:
    df_raw = df_raw.withColumn(column, lower(col(column)))

# Format Created Date
date_format = "M/d/yyyy h:mm:ss a"
df_raw = df_raw.withColumn("Created Date", to_date(unix_timestamp(col("Created Date"), date_format).cast("timestamp")))

# Format Closed Date
df_raw = df_raw.withColumn("Closed Date", when(col("Closed Date").isNotNull(), to_date(unix_timestamp(col("Closed Date"), date_format).cast("timestamp"))))

# Calculate the difference between Closed Date and Created Date
df_raw = df_raw.withColumn("DaysToClose", when(col("Closed Date").isNull(), 0).when(col("Closed Date") == col("Created Date"), 1).otherwise(datediff(col("Closed Date"), col("Created Date"))))

# Select columns
df_raw = df_raw.select("Unique Key", "Agency", "Complaint Type", "Location", "Status", "Closed Date", "Created Date", "DaysToClose", "LoadTimestamp")

# Show DataFrame
df_raw.show()


+----------+------+--------------------+--------------------+-----------+-----------+------------+-----------+--------------------+
|Unique Key|Agency|      Complaint Type|            Location|     Status|Closed Date|Created Date|DaysToClose|       LoadTimestamp|
+----------+------+--------------------+--------------------+-----------+-----------+------------+-----------+--------------------+
|  57288544|  dsny|   derelict vehicles|(40.8634631045716...|       open|       null|  2023-04-12|          0|2023-04-17 14:57:...|
|  57290182|  dsny|   derelict vehicles|(40.5942555419964...|       open|       null|  2023-04-12|          0|2023-04-17 14:57:...|
|  57282889|  nypd| noise - residential|(40.6975045385544...|in progress|       null|  2023-04-12|          0|2023-04-17 14:57:...|
|  57290428|   dot|    street condition|                null|in progress|       null|  2023-04-12|          0|2023-04-17 14:57:...|
|  57289291|  nypd| noise - residential|(40.8688924200005...|in progress|   

In [27]:
from pyspark.sql.functions import hash, abs

# Select the columns to check for uniqueness
loc = ['Location']

# Create a new DataFrame with only unique rows based on the selected columns
loc_df = df.drop_duplicates(loc)

# Select specific columns
loc_df = loc_df.withColumn('LocID', abs(hash("Location")))
loc_df = loc_df.select("LocID", "Location", "Latitude", "Longitude", "Incident Zip", "Incident Address", "Borough", "City")
loc_df.show()





+----------+--------------------+------------------+------------------+------------+--------------------+-------------+-------------+
|     LocID|            Location|          Latitude|         Longitude|Incident Zip|    Incident Address|      Borough|         City|
+----------+--------------------+------------------+------------------+------------+--------------------+-------------+-------------+
| 374147409|(40.4989488461683...|40.498948846168354| -74.2443650809073|       10307|                null|STATEN ISLAND|STATEN ISLAND|
|1114679381|(40.4989657939184...| 40.49896579391845|-74.24154617399876|       10307| 457 BRIGHTON STREET|STATEN ISLAND|STATEN ISLAND|
|2078495998|(40.4990929527163...|40.499092952716396|-74.24371839403513|       10307|   235 BILLOP AVENUE|STATEN ISLAND|STATEN ISLAND|
|   9578222|(40.4993188749562...| 40.49931887495625|-74.24071685602676|       10307|442 MANHATTAN STREET|STATEN ISLAND|STATEN ISLAND|
| 243960583|(40.4994039565189...| 40.49940395651891|-74.240720

[Stage 32:>                                                         (0 + 1) / 1]                                                                                

In [28]:
# Register DataFrames as temporary tables in Spark SQL
loc_df.createOrReplaceTempView("loc_df")
df_raw.createOrReplaceTempView("df_raw")

In [30]:
joined_df = spark.sql("""
    SELECT `Unique Key`, Agency, `Complaint Type`, Status, `Closed Date`, `Created Date`, LoadTimestamp, DaystoClose, a.LocID as location_id
    FROM df_raw d
    JOIN loc_df a ON d.Location= a.Location
""")
joined_df.show()

                                                                                

+----------+------+--------------------+-----------+-----------+------------+--------------------+-----------+-----------+
|Unique Key|Agency|      Complaint Type|     Status|Closed Date|Created Date|       LoadTimestamp|DaystoClose|location_id|
+----------+------+--------------------+-----------+-----------+------------+--------------------+-----------+-----------+
|  56587544|   dot|street light cond...|     closed| 2023-01-21|  2023-01-21|2023-04-17 14:59:...|          1|  374147409|
|  54620423|   dot|street light cond...|     closed| 2022-06-27|  2022-06-27|2023-04-17 14:59:...|          1|  374147409|
|  54619140|   dot|street light cond...|     closed| 2022-06-27|  2022-06-27|2023-04-17 14:59:...|          1|  374147409|
|  55742971|   dpr|    new tree request|     closed| 2022-11-09|  2022-10-19|2023-04-17 14:59:...|         21| 1114679381|
|  55582634|   dpr|    new tree request|     closed| 2022-11-09|  2022-10-02|2023-04-17 14:59:...|         38| 1114679381|
|  54889706|   d

# Load the data to PostgreSQL 

In [28]:
!pip3 install -U "psycopg[binary]"

Collecting psycopg-binary<=3.1.8,>=3.1.6
  Downloading psycopg_binary-3.1.8-cp310-cp310-macosx_11_0_arm64.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m0m
[?25hInstalling collected packages: psycopg-binary
Successfully installed psycopg-binary-3.1.8


In [31]:
#Connect the PostgreSQL Database called task_2
import psycopg, os

print('Connecting to the PostgreSQL database...')
conn = psycopg.connect(
    host="localhost",
    port='5432',
    dbname="311_service_data",
    user="postgres",
    password="123")

Connecting to the PostgreSQL database...


In [32]:
# create a cursor
cur = conn.cursor()

In [33]:
create_location = """ CREATE TABLE location (
              "LocID" INT PRIMARY KEY NOT NULL, 
              "Location" VARCHAR(300) NOT NULL, 
              "Latitude" DOUBLE PRECISION, 
              "Longitude" DOUBLE PRECISION,
              "Incident Zip" INT, 
              "Incident Address" VARCHAR(500),
              "Borough" VARCHAR(30),
              "City" VARCHAR(20)
                )
            """
    
cur.execute(create_location)
conn.commit()

In [34]:
conn.rollback()

In [35]:
create_main = """ CREATE TABLE main_request (
              "Unique Key" INT PRIMARY KEY NOT NULL, 
              "Agency" VARCHAR(30), 
              "Complaint Type" VARCHAR(100),
              "Descriptor" VARCHAR(400),
              "Status" VARCHAR(200),
              "Closed Date" DATE, 
              "Created Date" DATE,
              "LoadTimestamp" TIMESTAMP,
              "location_id" INT NOT NULL 
                )
            """
    
cur.execute(create_main)
conn.commit()

In [73]:
!pip3 install sqlalchemy


Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.9-cp310-cp310-macosx_11_0_arm64.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-2.0.9


In [76]:
!pip3 install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.6-cp310-cp310-macosx_11_0_arm64.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.6


In [36]:
# PostgreSQL connection properties
db_url = "jdbc:postgresql://localhost:5432/311_service_data"
table_name = "location"
properties = {
    "user": "postgres",
    "password": "123",
    "driver": "org.postgresql.Driver"
}

# Write the DataFrame to PostgreSQL
loc_df.write.jdbc(url=db_url, table=table_name, mode="overwrite", properties=properties)

                                                                                

In [37]:
# PostgreSQL connection properties
db_url = "jdbc:postgresql://localhost:5432/311_service_data"
table_name = "main_request"
properties = {
    "user": "postgres",
    "password": "123",
    "driver": "org.postgresql.Driver"
}

# Write the DataFrame to PostgreSQL
joined_df.write.jdbc(url=db_url, table=table_name, mode="overwrite", properties=properties)

                                                                                

In [85]:
conn.rollback()

In [38]:
import pandas as pd
query = "SELECT * FROM location;"
cur.execute(query)
data = cur.fetchall()

column_names = [desc[0] for desc in cur.description]
df = pd.DataFrame(data, columns=column_names)

print(df)

             LocID                                  Location   Latitude  \
0               42                                      None        NaN   
1       2134162157   (40.49900071077867, -74.24060424632607)  40.499001   
2       1624883563  (40.499378540887335, -74.23974984219767)  40.499379   
3        380950840    (40.4997508076291, -74.23892776954874)  40.499751   
4       1417674560   (40.50014121024302, -74.24384446800241)  40.500141   
...            ...                                       ...        ...   
668178  1815532003  (40.908129374877475, -73.90000546783567)  40.908129   
668179   985138399   (40.90866525712599, -73.90078248376851)  40.908665   
668180  1198277803   (40.90875661114392, -73.90169402865659)  40.908757   
668181   295844999  (40.911065649519074, -73.89939685885678)  40.911066   
668182   829719397   (40.91165661257873, -73.89725414322062)  40.911657   

        Longitude Incident Zip      Incident Address        Borough  \
0             NaN        100

In [39]:
import pandas as pd

query = "SELECT * FROM main_request LIMIT 10;"
cur.execute(query)
data = cur.fetchall()

column_names = [desc[0] for desc in cur.description]
df = pd.DataFrame(data, columns=column_names)

print(df)

   Unique Key Agency                       Complaint Type  Status Closed Date  \
0    56587544    dot               street light condition  closed  2023-01-21   
1    54620423    dot               street light condition  closed  2022-06-27   
2    54619140    dot               street light condition  closed  2022-06-27   
3    55742971    dpr                     new tree request  closed  2022-11-09   
4    55582634    dpr                     new tree request  closed  2022-11-09   
5    52978300   dsny  request large bulky item collection  closed  2022-01-07   
6    54889706    dep                                sewer  closed  2022-07-24   
7    53158875   dsny  request large bulky item collection  closed  2022-01-28   
8    52954054   dsny  request large bulky item collection  closed  2022-01-07   
9    56669574   nypd                    abandoned vehicle  closed  2023-01-31   

  Created Date           LoadTimestamp  DaystoClose  location_id  
0   2023-01-21 2023-04-17 15:05:12.984   

In [40]:

delete_duplicates = """ DELETE FROM location t1
                        USING location t2
                        WHERE t1.ctid < t2.ctid
                        AND t1."LocID" = t2."LocID"
                    """

cur.execute(delete_duplicates)
conn.commit()

# Add the primary key constraint to the 'LocID' column
alter_location_primary_key = """ ALTER TABLE location
                                 ADD PRIMARY KEY ("LocID")
                             """

cur.execute(alter_location_primary_key)
conn.commit()


In [41]:
alter_main = """ ALTER TABLE main_request
                ADD CONSTRAINT fk_location_id
                FOREIGN KEY (location_id)
                REFERENCES location("LocID")
            """

cur.execute(alter_main)
conn.commit()
