In [38]:
# 1. Start Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, to_timestamp, concat_ws, col, year, month, dayofmonth, hour, date_format, when, count

# mongo-spark-connector
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 pyspark-shell"
spark = SparkSession.builder \
    .appName("NYPD_ETL_Project") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/nyc_crime.complaints") \
    .getOrCreate()

In [40]:
# 2. Load Dataset
file_path = "NYPD_Complaint_Data_Historic_20250406.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)
df.printSchema()
df.count()

                                                                                

+----------+------------+------------+------------+------------+-----------+----------+-----+--------------+-----+--------------------+----------------+-----------+---------+-----------------+--------------------+-------------------+-----------------+--------+----------+-----------+----------+----------+--------------+---------+--------+----------------+---------+----------+--------------------+--------------------+------------+-------------+--------------+-------+
|CMPLNT_NUM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|    RPT_DT|KY_CD|     OFNS_DESC|PD_CD|             PD_DESC|CRM_ATPT_CPTD_CD| LAW_CAT_CD|  BORO_NM|LOC_OF_OCCUR_DESC|       PREM_TYP_DESC|         JURIS_DESC|JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD|Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT| Latitude| Longitude|             Lat_Lon|         PATROL_BORO|STATION_NAME|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|
+----------+------------+------------+------------+---------

                                                                                

8914838

In [42]:
# 3. Drop rows with critical nulls
df_clean = df.dropna(subset=["CMPLNT_FR_DT", "CMPLNT_FR_TM", "BORO_NM"])

In [44]:
# 4. Parse date (cast to string first to ensure correct parsing)
df_clean = df_clean.withColumn(
    "date_only",
    to_date(col("CMPLNT_FR_DT").cast("string"), "MM/dd/yyyy")
).filter(col("date_only").isNotNull())

In [46]:
# 5. Create timestamp from date + time
df_clean = df_clean.withColumn(
    "timestamp",
    to_timestamp(
        concat_ws(" ", col("CMPLNT_FR_DT").cast("string"), col("CMPLNT_FR_TM").cast("string")),
        "MM/dd/yyyy HH:mm:ss"
    )
).filter(col("timestamp").isNotNull())

In [48]:
# 6. Extract time features
df_clean = df_clean \
    .withColumn("year", year("date_only")) \
    .withColumn("month", month("date_only")) \
    .withColumn("day", dayofmonth("date_only")) \
    .withColumn("hour", hour("timestamp")) \
    .withColumn("day_of_week", date_format("timestamp", "EEEE")) \
    .withColumn("time_of_day", when(col("hour").between(6, 11), "Morning")
                                .when(col("hour").between(12, 17), "Afternoon")
                                .when(col("hour").between(18, 21), "Evening")
                                .otherwise("Night"))
df_clean.count()

                                                                                

8914136

In [49]:
# 7. Filter out corrupted years (like 1010, 1011)
df_clean = df_clean.filter(col("year") >= 2000)
df_clean.count()

                                                                                

8911652

In [53]:
# Select the columns needed

# Drop the column
df_selected = df_clean.drop("CMPLNT_TO_DT")
# Repartition to 1 file (so it doesn’t split into many CSVs)
df_selected = df_selected.coalesce(1)
# Write to CSV (as a folder with one CSV inside)
output_path = "cleaned_csv_output"
df_selected.write.csv(output_path, header=True, mode="overwrite")
#  Find the CSV file inside the output folder
for file in os.listdir("cleaned_csv_output"):
    if file.endswith(".csv"):
        full_path = os.path.join("cleaned_csv_output", file)
        break
# download
import shutil
shutil.copy(full_path, "cleaned_data.csv")
print(" Cleaned CSV is ready to download as 'cleaned_data.csv'")

                                                                                

 Cleaned CSV is ready to download as 'cleaned_data.csv'


In [64]:
df_selected.write \
    .format("mongo") \
    .mode("overwrite") \
    .save()
print("successfully load into MongoDB")

                                                                                

successfully load into MongoDB


In [66]:
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")

db = client["nyc_crime"]
collection = db["complaints"]

result = collection.find_one()
print(result)

{'_id': ObjectId('681801e8ea4e953e3b0d23aa'), 'CMPLNT_NUM': '270364459', 'CMPLNT_FR_DT': '06/24/2023', 'CMPLNT_FR_TM': '13:10:00', 'CMPLNT_TO_TM': '13:51:00', 'ADDR_PCT_CD': 75, 'RPT_DT': '06/25/2023', 'KY_CD': 341, 'OFNS_DESC': 'PETIT LARCENY', 'PD_CD': 333, 'PD_DESC': 'LARCENY,PETIT FROM STORE-SHOPL', 'CRM_ATPT_CPTD_CD': 'COMPLETED', 'LAW_CAT_CD': 'MISDEMEANOR', 'BORO_NM': 'BROOKLYN', 'LOC_OF_OCCUR_DESC': '(null)', 'PREM_TYP_DESC': 'OTHER', 'JURIS_DESC': 'N.Y. POLICE DEPT', 'JURISDICTION_CODE': 0, 'PARKS_NM': '(null)', 'HADEVELOPT': '(null)', 'HOUSING_PSA': '(null)', 'X_COORD_CD': '1,020,327', 'Y_COORD_CD': '176,285', 'SUSP_AGE_GROUP': '(null)', 'SUSP_RACE': '(null)', 'SUSP_SEX': '(null)', 'Latitude': 40.650466, 'Longitude': -73.869986, 'Lat_Lon': '(40.650466, -73.869986)', 'PATROL_BORO': 'PATROL BORO BKLYN NORTH', 'STATION_NAME': '(null)', 'VIC_AGE_GROUP': '25-44', 'VIC_RACE': 'BLACK', 'VIC_SEX': 'D', 'date_only': datetime.datetime(2023, 6, 24, 4, 0), 'timestamp': datetime.datetime(

In [68]:
import os
import shutil

# Update path as needed
neo4j_import_path = "/Users/pallavigudipati/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-b8654556-9669-4d7f-ace5-dc88298c2713/import"
source_csv = "cleaned_data.csv"
destination_csv = os.path.join(neo4j_import_path, "cleaned_data.csv")

# Ensure the Neo4j import directory exists
os.makedirs(neo4j_import_path, exist_ok=True)

# Copy the file
shutil.copy(source_csv, destination_csv)
print(f"The file is already copied to Neo4j import folder: {destination_csv}")


The file is already copied to Neo4j import folder: /Users/pallavigudipati/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-b8654556-9669-4d7f-ace5-dc88298c2713/import/cleaned_data.csv


In [70]:
from neo4j import GraphDatabase
import pandas as pd

## Connect to Neo4j
database_name = "Crime Data"
username = "neo4j"
password = "pallavidata"
uri = "bolt://localhost:7687/apan5400" + database_name
driver = GraphDatabase.driver(uri, auth=(username, password))
session = driver.session()
print("Successfully connected to Neo4j!")

Successfully connected to Neo4j!


In [72]:
# # Cypher query to delete everything
# delete_all_query = "MATCH (n) DETACH DELETE n"

# # Execute the query

# with driver.session() as session:
#     session.run(delete_all_query)

# print("All graphs (nodes and relationships) deleted.")

In [74]:
import os
os.makedirs("static", exist_ok=True)

In [76]:
!pip install plotly networkx
!pip install py2neo



In [77]:
!pip install psycopg2-binary



In [None]:
import os
from flask import Flask, request, render_template_string
from pymongo import MongoClient
from collections import Counter
import folium
from folium.plugins import MarkerCluster
import psycopg2
import pandas as pd
import plotly.graph_objs as go
import plotly.io as pio
from neo4j import GraphDatabase
from pyvis.network import Network

# CONFIG

os.makedirs("static", exist_ok=True)

# MongoDB
mongo_client     = MongoClient("mongodb://localhost:27017/")
mongo_collection = mongo_client["nyc_crime"]["complaints"]

# PostgreSQL helper
def get_pg_conn():
    return psycopg2.connect(
        dbname="Data", user="postgres", password="123",
        host="localhost", port="5432"
    )

# Neo4j driver
NEO4J_URI  = "bolt://localhost:7687"
NEO4J_AUTH = ("neo4j", "pallavidata")
neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=NEO4J_AUTH)

# Flask
app = Flask(__name__, static_folder="static")


# 1) Graph1: Time–Crime–Location → static/graph1.html

def generate_graph1():
    with neo4j_driver.session() as session:
        session.run("""
        LOAD CSV WITH HEADERS
          FROM 'file:///cleaned_data.csv' AS line
        WITH line
        LIMIT 100

        MERGE (crime:Crime { complaint_id: line.CMPLNT_NUM })
        MERGE (time:Time { date: line.CMPLNT_FR_DT, time: line.CRM_ATPT_CPTD_CD })
        MERGE (location:Location {
          latitude: line.Latitude,
          longitude: line.Longitude,
          premise_type: coalesce(line.PREM_TYP_DESC,'UNKNOWN')
        })
        WITH crime, time, location,
          CASE 
            WHEN toInteger(substring(time.time,0,2)) >= 5  AND toInteger(substring(time.time,0,2)) < 12 THEN 'Morning'
            WHEN toInteger(substring(time.time,0,2)) >= 12 AND toInteger(substring(time.time,0,2)) < 17 THEN 'Afternoon'
            WHEN toInteger(substring(time.time,0,2)) >= 17 AND toInteger(substring(time.time,0,2)) < 21 THEN 'Evening'
            ELSE 'Night'
          END AS TimePeriodLabel,
          location.premise_type AS LocationTypeLabel
        MERGE (tp:TimePeriod { period: TimePeriodLabel })
        MERGE (lt:LocationType { type: LocationTypeLabel })
        MERGE (crime)-[:OCCURRED_IN_PERIOD]->(tp)
        MERGE (crime)-[:HAPPENED_AT_TYPE]->(lt);
        """)

        result = session.run("""
        MATCH (c:Crime)-[:OCCURRED_IN_PERIOD]->(tp:TimePeriod),
              (c)-[:HAPPENED_AT_TYPE]->(lt:LocationType)
        RETURN c, tp, lt
        LIMIT 100;
        """)

        net1 = Network(height="500px", width="100%", directed=True)
        for rec in result:
            c, tp, lt = rec["c"], rec["tp"], rec["lt"]
            net1.add_node(c.element_id,  label=c["complaint_id"], color="lightblue")
            net1.add_node(tp.element_id, label=tp["period"],         color="orange")
            net1.add_node(lt.element_id, label=lt["type"],           color="green")
            net1.add_edge(c.element_id, tp.element_id, label="Time")
            net1.add_edge(c.element_id, lt.element_id, label="Location")

        net1.write_html("static/graph1.html")

# 2) Graph2: Offense→Victim → static/graph2.html (yellow & bright blue)

def generate_graph2(limit=500):
    with neo4j_driver.session() as session:
        # import + count-aggregation (note the doubled {{ }} for literal braces)
        session.run(f"""
        LOAD CSV WITH HEADERS
          FROM 'file:///cleaned_data.csv' AS line
        WITH line
        LIMIT {limit}

        MERGE (off:Offense {{ description: coalesce(line.OFNS_DESC,'UNKNOWN') }})
        MERGE (vic:Victim {{
          age_group: coalesce(line.VIC_AGE_GROUP,'UNKNOWN'),
          race:      coalesce(line.VIC_RACE,'UNKNOWN'),
          sex:       coalesce(line.VIC_SEX,'UNKNOWN')
        }})
        MERGE (off)-[r:COMMITTED_AGAINST]->(vic)
          ON CREATE SET r.count = 1
          ON MATCH  SET r.count = r.count + 1;
        """)

        # fetch top‐N
        result = session.run(f"""
        MATCH (o:Offense)-[r:COMMITTED_AGAINST]->(v:Victim)
        RETURN o, r.count AS cnt, v
        ORDER BY cnt DESC
        LIMIT {limit};
        """)

        net2 = Network(height="500px", width="100%", directed=True)
        for rec in result:
            o, v, cnt = rec["o"], rec["v"], rec["cnt"]
            # golden yellow for offense
            net2.add_node(o.element_id,
                          label=f"Offense: {o['description']}",
                          color="#FFD700")
            # bright sky-blue for victim
            net2.add_node(v.element_id,
                          label=f"Victim: {v['age_group']}, {v['race']}, {v['sex']}",
                          color="#00BFFF")
            net2.add_edge(o.element_id, v.element_id, label=f"Count: {cnt}")

        net2.write_html("static/graph2.html")


# 3) PostgreSQL setup & charting

def setup_postgre():
    conn = get_pg_conn(); cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS crime_table2;")
    cur.execute("""
      CREATE TABLE crime_table2 (
        cmplnt_num TEXT,
        ofns_desc   TEXT,
        boro_nm     TEXT,
        law_cat_cd  TEXT,
        cmplnt_fr_dt DATE
      );
    """)
    sample = []
    for boro in ["BRONX","BROOKLYN","MANHATTAN","QUEENS","STATEN ISLAND"]:
        docs = list(mongo_collection.find({
            "CMPLNT_NUM":{"$ne":None},
            "OFNS_DESC":{"$ne":None},
            "BORO_NM":boro,
            "LAW_CAT_CD":{"$ne":None},
            "CMPLNT_FR_DT":{"$ne":None}
        }, {
            "CMPLNT_NUM":1,"OFNS_DESC":1,"BORO_NM":1,
            "LAW_CAT_CD":1,"CMPLNT_FR_DT":1
        }).limit(2000))
        sample.extend(docs)

    df = pd.DataFrame(sample)
    for r in df.itertuples(index=False):
        cur.execute("""
          INSERT INTO crime_table2
            (cmplnt_num, ofns_desc, boro_nm, law_cat_cd, cmplnt_fr_dt)
          VALUES (%s,%s,%s,%s,%s)
        """, (str(r.CMPLNT_NUM), r.OFNS_DESC, r.BORO_NM, r.LAW_CAT_CD, r.CMPLNT_FR_DT))

    conn.commit(); cur.close(); conn.close()


def generate_pg_charts(selected):
    conn = get_pg_conn()
    queries = {
      "borough":        "SELECT boro_nm, COUNT(*) FROM crime_table2 GROUP BY boro_nm ORDER BY 2 DESC",
      "monthly_trends":"SELECT TO_CHAR(cmplnt_fr_dt,'Mon') AS m, COUNT(*) FROM crime_table2 GROUP BY m ORDER BY MIN(EXTRACT(MONTH FROM cmplnt_fr_dt))",
      "top_crimes":     "SELECT ofns_desc, COUNT(*) FROM crime_table2 GROUP BY ofns_desc ORDER BY 2 DESC LIMIT 5",
      "by_date":        "SELECT cmplnt_fr_dt, COUNT(*) FROM crime_table2 GROUP BY cmplnt_fr_dt ORDER BY 1"
    }
    titles = {
      "borough":        "Crime Count by Borough",
      "monthly_trends":"Monthly Crime Trends",
      "top_crimes":     "Top 5 Crime Types",
      "by_date":        "Daily Crime Pattern"
    }
    df = pd.read_sql(queries[selected], conn)
    conn.close()
    if selected in ["monthly_trends","by_date"]:
        fig = go.Figure([go.Scatter(x=df.iloc[:,0], y=df.iloc[:,1], mode="lines+markers")])
    elif selected == "borough":
        fig = go.Figure([go.Bar(x=df.iloc[:,0], y=df.iloc[:,1])])
    else:
        fig = go.Figure([go.Pie(labels=df.iloc[:,0], values=df.iloc[:,1])])
    fig.update_layout(title=titles[selected], height=400, template="plotly_white")
    return pio.to_html(fig, full_html=False)


# 4) Flask dashboard

@app.route("/")
def dashboard():
    borough        = request.args.get("borough", "")
    offense        = request.args.get("offense", "")
    page           = int(request.args.get("page", "1"))
    selected_chart = request.args.get("chart", "borough")
    per_page       = 10

    qry = {"Latitude": {"$ne": None}, "Longitude": {"$ne": None}}
    if borough:
        qry["BORO_NM"] = borough
    if offense:
        qry["OFNS_DESC"] = {"$regex": offense, "$options": "i"}
    docs = list(mongo_collection.find(qry, {
      "Latitude": 1, "Longitude": 1, "OFNS_DESC": 1,
      "BORO_NM": 1, "CMPLNT_NUM": 1
    }).limit(1000))
    table_results = docs[(page-1)*per_page : page*per_page]

    # Folium map
    m  = folium.Map(location=[40.7128, -74.0060], zoom_start=11)
    mc = MarkerCluster().add_to(m)
    for d in docs:
        try:
            folium.Marker(
              [float(d["Latitude"]), float(d["Longitude"])],
              popup=f"{d['OFNS_DESC']} ({d['BORO_NM']})"
            ).add_to(mc)
        except:
            pass

    top5   = Counter(d.get("OFNS_DESC","Unknown") for d in docs).most_common(5)
    labels = [x for x,_ in top5]
    values = [v for _,v in top5]

    chart_html = generate_pg_charts(selected_chart)
    map_html   = m._repr_html_()

    return render_template_string("""
<!DOCTYPE html>
<html><head>
  <title>NYC Crime Dashboard</title>
  <meta charset="utf-8">
  <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
  <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head><body class="bg-light">
<div class="container mt-4"><h1 class="mb-4">NYC Crime Dashboard</h1><div class="row">
  <div class="col-md-4">
    <form method="get" onchange="this.submit()">
      <div class="card mb-3 p-3"><h5>Filters</h5>
        <select name="borough" class="form-select mb-2">
          <option value="">-- All --</option>
          {% for b in ['MANHATTAN','BROOKLYN','QUEENS','BRONX','STATEN ISLAND'] %}
            <option value="{{b}}" {% if borough==b %}selected{%endif%}>{{b}}</option>
          {% endfor %}
        </select>
        <input name="offense" value="{{offense}}" class="form-control mb-2" placeholder="Crime keyword…">
      </div>
    </form>
    <div class="card mb-3 p-3"><h5>Top 5 Crimes</h5><canvas id="pieChart"></canvas></div>
    <div class="card mb-3 p-3"><h5>Time→Crime→Location</h5>
      <iframe src="/static/graph1.html" width="100%" height="300" style="border:1px solid #ddd;"></iframe>
    </div>
    <div class="card mb-3 p-3"><h5>Offense→Victim</h5>
      <iframe src="/static/graph2.html" width="100%" height="300" style="border:1px solid #ddd;"></iframe>
    </div>
  </div>
  <div class="col-md-8">
    <div class="card mb-3"><div class="card-body">{{ map_html|safe }}</div></div>
    <div class="card mb-3 p-3"><h5>Analytics Charts</h5>
      <select class="form-select mb-2" onchange="location.search='chart='+this.value">
        <option value="borough"        {% if selected_chart=='borough'%}selected{%endif%}>Crime Count by Borough</option>
        <option value="monthly_trends" {% if selected_chart=='monthly_trends'%}selected{%endif%}>Monthly Crime Trends</option>
        <option value="top_crimes"     {% if selected_chart=='top_crimes'%}selected{%endif%}>Top 5 Crime Types</option>
        <option value="by_date"        {% if selected_chart=='by_date'%}selected{%endif%}>Daily Crime Pattern</option>
      </select>
      {{ chart_html|safe }}
    </div>
    <div class="card p-3"><h5>Complaints</h5>
      <input id="searchBox" class="form-control mb-2" placeholder="Search…">
      <div style="max-height:300px; overflow-y:auto;">
        <table class="table table-sm"><thead><tr><th>ID</th><th>Boro</th><th>Offense</th></tr></thead><tbody id="tableBody">
          {% for r in table_results %}
            <tr><td>{{r['CMPLNT_NUM']}}</td><td>{{r['BORO_NM']}}</td><td>{{r['OFNS_DESC']}}</td></tr>
          {% endfor %}
        </tbody></table>
      </div>
    </div>
  </div>
</div></div>
<script src="https://kit.fontawesome.com/a076d05399.js"></script>
<script>
  new Chart(document.getElementById("pieChart"), {
    type:"pie", data:{ labels:{{labels|safe}}, datasets:[{ data:{{values|safe}} }] }
  });
  document.getElementById("searchBox").addEventListener("keyup",function(){
    const q=this.value.toLowerCase();
    document.querySelectorAll("#tableBody tr")
      .forEach(r=>r.style.display=r.textContent.toLowerCase().includes(q)?"":"none");
  });
</script>
</body></html>
""",
      borough=borough,
      offense=offense,
      selected_chart=selected_chart,
      map_html=map_html,
      chart_html=chart_html,
      table_results=table_results,
      labels=labels,
      values=values
    )

if __name__ == "__main__":
    generate_graph1()
    generate_graph2(limit=100)
    setup_postgre()
    app.run(debug=True, port=5036, use_reloader=False)


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5036
INFO:werkzeug:[33mPress CTRL+C to quit[0m

pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.

INFO:werkzeug:127.0.0.1 - - [04/May/2025 20:23:37] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [04/May/2025 20:23:37] "GET /static/graph1.html HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [04/May/2025 20:23:37] "GET /static/graph2.html HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [04/May/2025 20:23:37] "[33mGET /static/lib/bindings/utils.js HTTP/1.1[0m" 404 -
INFO:werkzeug:127.0.0.1 - - [04/May/2025 20:23:37] "[33mGET /static/lib/bindings/utils.js HTTP/1.1[0m" 404 -
