In [None]:
import getpass
import os
import sys

# Point Spark's Hadoop layer at a local Hadoop install (e.g. for winutils.exe on Windows)
# and put its bin directory on PATH.
os.environ['HADOOP_HOME'] = 'C:/hadoop'
# Append only when missing so re-running this cell does not keep growing PATH, and use
# os.pathsep rather than a hard-coded ';' so the separator matches the current OS.
_hadoop_bin = 'C:/hadoop/bin'
_current_path = os.environ.get('PATH', '')
if _hadoop_bin not in _current_path.split(os.pathsep):
    os.environ['PATH'] = os.pathsep.join(p for p in (_current_path, _hadoop_bin) if p)

In [None]:
from pyspark.sql.functions import log1p
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [None]:
from pyspark.sql import SparkSession




# PostgreSQL JDBC driver jar, shared by the driver and executor classpaths so the two
# settings cannot drift apart. Set the POSTGRES_JDBC_JAR environment variable on
# machines where the jar lives somewhere else.
POSTGRES_JDBC_JAR = os.environ.get(
    "POSTGRES_JDBC_JAR",
    r"D:\Project_Smart\SmartCity_project\jars\postgresql-42.7.8.jar",
)

# Local Spark session using all cores. getOrCreate() reuses an already-running session,
# so classpath changes may only take effect after restarting the kernel.
spark = (
    SparkSession.builder
    .appName("SmartCity-Phase3")
    .master("local[*]")
    .config("spark.driver.extraClassPath", POSTGRES_JDBC_JAR)
    .config("spark.executor.extraClassPath", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

spark


In [None]:
# Display the Spark version of the running session (useful when reporting environment issues).
spark.version

In [None]:
# Sanity check that the driver JVM can load the PostgreSQL JDBC driver class, i.e. the jar
# configured on spark.driver.extraClassPath was picked up. Class.forName throws when the
# class is missing, which surfaces here as a py4j error.
# NOTE(review): spark._jvm is a private PySpark attribute; fine for a diagnostic only.
spark._jvm.java.lang.Class.forName("org.postgresql.Driver")


In [None]:
# Connection settings for the smartcity_delhi database. Credentials are read from the
# libpq-style PGUSER / PGPASSWORD environment variables instead of being hard-coded in
# the notebook; if PGPASSWORD is unset the user is prompted once (headless runs such as
# nbconvert should set PGPASSWORD, since they cannot prompt).
jdbc_url = os.environ.get(
    "SMARTCITY_JDBC_URL", "jdbc:postgresql://localhost:5432/smartcity_delhi"
)

db_properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD") or getpass.getpass("PostgreSQL password: "),
    "driver": "org.postgresql.Driver",
}

grid_df = spark.read.jdbc(
    url=jdbc_url,
    table="ncr_grid_features",
    properties=db_properties,
)

grid_df.printSchema()
grid_df.show(5)


In [None]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) for the density inputs.
(
    grid_df
    .select("poi_count", "area_sq_km", "poi_density")
    .summary()
    .show()
)



In [None]:
# Log-transformed density: log1p compresses large values and maps a density of 0 to 0.
fe_df = grid_df.withColumn("log_poi_density", log1p("poi_density"))

In [None]:
# Pack raw and log density into one vector column for the scaler / KMeans.
# NOTE(review): VectorAssembler's default handleInvalid="error" fails on null inputs.
assembler = VectorAssembler(outputCol="features_raw", inputCols=["poi_density", "log_poi_density"])
fe_df = assembler.transform(fe_df)

In [None]:


# Standardise each feature (zero mean, unit variance) so neither dominates KMeans distances.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features_raw",
    outputCol="features",
)
scaler_model = scaler.fit(fe_df)
scaled_df = scaler_model.transform(fe_df)


In [None]:
from pyspark.ml.clustering import KMeans

# Four-cluster KMeans on the standardised features; a fixed seed keeps labels reproducible.
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=4, seed=42)
kmeans_model = kmeans.fit(scaled_df)
clustered_df = kmeans_model.transform(scaled_df)


In [None]:
# Mean poi_density per cluster, from the lowest to the highest mean.
(
    clustered_df
    .groupBy("cluster")
    .agg({"poi_density": "avg"})
    .orderBy("avg(poi_density)")
    .show()
)


In [None]:
# Persist grid -> cluster assignments (with density) as Parquet.
(
    clustered_df
    .select("grid_id", "poi_density", "cluster")
    .write
    .mode("overwrite")
    .parquet("phase3/outputs/ncr_grid_clusters")
)


In [None]:
# CSV export of the same columns, written to its own directory: writing to
# "phase3/outputs/ncr_grid_clusters" with mode("overwrite") would delete the Parquet
# output produced by the previous cell.
(
    clustered_df
    .select("grid_id", "poi_density", "cluster")
    .write
    .mode("overwrite")
    .csv("phase3/outputs/ncr_grid_clusters_density_csv", header=True)
)

In [None]:
# Cluster sizes: number of grid cells per cluster label, in label order.
clustered_df.groupBy("cluster").count().orderBy("cluster").show()


In [None]:
# Mean poi_density per cluster, listed in cluster-ID order.
(
    clustered_df
    .groupBy("cluster")
    .agg({"poi_density": "avg"})
    .orderBy("cluster")
    .show(truncate=False)
)

In [None]:
# Grid -> cluster labels as CSV; coalesce(1) makes Spark write a single part file.
(
    clustered_df
    .select("grid_id", "cluster")
    .coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("phase3/outputs/ncr_grid_clusters_csv")
)


In [None]:
import matplotlib.pyplot as plt

# Scatter of each grid cell's POI density against its assigned cluster label.
pdf = clustered_df.select("poi_density", "cluster").toPandas()

fig, ax = plt.subplots()
# Semi-transparent markers: all cells of a cluster share one y value, so points overlap.
ax.scatter(pdf["poi_density"], pdf["cluster"], alpha=0.4)
# Cluster labels are categorical IDs: one tick per label instead of fractional ticks.
ax.set_yticks(sorted(pdf["cluster"].unique()))
ax.set(title="POI Density by Cluster", xlabel="POI Density", ylabel="Cluster")
plt.show()


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Distribution of poi_density within each cluster.
pdf = clustered_df.select("cluster", "poi_density").toPandas()

fig, ax = plt.subplots()
sns.boxplot(data=pdf, x="cluster", y="poi_density", ax=ax)
ax.set_title("POI Density Distribution by Cluster")
plt.show()


In [None]:
import sys 
# Diagnostic: shows which Python interpreter backs this kernel.
# NOTE(review): leftover debugging aid; consider removing it from the final notebook.
print(sys.executable)

In [None]:
# Per-cluster histogram (40 bins) of poi_density with KDE overlays.
fig, ax = plt.subplots()
sns.histplot(data=pdf, x="poi_density", hue="cluster", bins=40, kde=True, ax=ax)
ax.set_title("POI Density Histogram by Cluster")
plt.show()


In [None]:
# Number of grid cells in each cluster, ordered by cluster ID.
cluster_counts = pdf["cluster"].value_counts().sort_index()

fig, ax = plt.subplots()
cluster_counts.plot.bar(ax=ax)
ax.set(title="Grid Count per Cluster", xlabel="Cluster", ylabel="Number of Grids")
plt.show()


In [None]:
# Re-read the grid feature table for the second (single-feature) clustering pass.
grid_df = spark.read.jdbc(jdbc_url, "ncr_grid_features", properties=db_properties)
grid_df.printSchema()


In [None]:
from pyspark.sql.functions import col

# Single clustering input: raw POI density, renamed to mark it as a model feature.
feature_df = grid_df.selectExpr("grid_id", "poi_density AS feature_poi_density")
feature_df.show(5)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Spark ML estimators need a vector column, even for a single feature.
assembler = VectorAssembler(outputCol="features", inputCols=["feature_poi_density"])
assembled_df = assembler.transform(feature_df)


In [None]:
# Preview the assembled single-element feature vectors (DataFrame.show defaults to 20 rows).
assembled_df.show()

In [None]:
from pyspark.ml.feature import StandardScaler

# Standardise the density vector into `scaled_features`.
# NOTE(review): with one feature this is only an affine rescale, so it should barely change
# KMeans assignments; it matters only if more features are added.
scaler = StandardScaler(
    outputCol="scaled_features",
    inputCol="features",
    withStd=True,
    withMean=True,
)
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)
scaled_df.show(5)

In [None]:
from pyspark.ml.clustering import KMeans

# Three-cluster KMeans on scaled density with a fixed seed. Labels are 0..k-1 (0, 1, 2)
# and are not guaranteed to be ordered by density.
kmeans = KMeans(k=3, seed=42, featuresCol="scaled_features", predictionCol="cluster")
kmeans_model = kmeans.fit(scaled_df)
clustered_df = kmeans_model.transform(scaled_df)


In [None]:

# Spot-check cluster labels from the k=3 model.
clustered_df.select("grid_id", "cluster").show(5)


In [None]:
# Attach each grid's cluster label back onto the full feature table.
final_df = grid_df.join(clustered_df.select("grid_id", "cluster"), "grid_id", "inner")
final_df.select("grid_id", "poi_density", "cluster").show(5)

In [None]:
from pyspark.sql import functions as F

# ATM candidates: grid cells in cluster 0, ranked by raw poi_density.
# NOTE(review): KMeans cluster IDs are arbitrary; confirm cluster 0 is the intended segment.
atm_candidates = (
    final_df
    .where(F.col("cluster") == 0)
    .withColumn("atm_score", F.col("poi_density"))
    .orderBy(F.col("atm_score").desc())
)

atm_candidates.select("grid_id", "poi_density", "atm_score").show(10)


In [None]:
# Mall candidates: grid cells in the highest-demand cluster, ranked by poi_density.
# KMeans labels only span 0..k-1 (k=3 above) and carry no density order, so the cluster
# is chosen by its mean poi_density rather than by a fixed ID (an ID of 3 never matches).
# NOTE(review): assumes the densest cluster is the intended segment for malls - confirm.
densest_cluster_row = (
    final_df
    .groupBy("cluster")
    .agg(F.avg("poi_density").alias("mean_poi_density"))
    .orderBy(F.desc("mean_poi_density"))
    .first()
)
if densest_cluster_row is None:
    raise ValueError("final_df has no rows; cannot pick a cluster for mall candidates")
mall_cluster = densest_cluster_row["cluster"]

mall_candidates = (
    final_df
    .filter(F.col("cluster") == mall_cluster)
    .withColumn("mall_score", F.col("poi_density"))
    .orderBy(F.desc("mall_score"))
)

mall_candidates.select("grid_id", "poi_density", "mall_score").show()


In [None]:
# Inspect final_df columns and types before building candidate scores.
final_df.printSchema()


In [None]:
# Hospital candidates from clusters 0 and 1. Cluster-0 density is up-weighted (x1.2) and
# cluster-1 density down-weighted (x0.8) before ranking.
# NOTE(review): cluster IDs are arbitrary KMeans labels; confirm 0/1 are the intended segments.
hospital_candidates = (
    final_df
    .where(F.col("cluster").isin(0, 1))
    .withColumn(
        "hospital_score",
        F.when(F.col("cluster") == 0, F.col("poi_density") * 1.2)
        .otherwise(F.col("poi_density") * 0.8),
    )
    .orderBy(F.col("hospital_score").desc())
)

hospital_candidates.select("grid_id", "cluster", "poi_density", "hospital_score").show(10)


In [None]:
# Cluster sizes for the k=3 model; labels are 0..2, so no cluster 3 can appear here.
final_df.groupBy("cluster").count().orderBy("cluster").show()


In [None]:
# Inspect the 20 densest grid cells of the highest-demand cluster. KMeans labels only
# span 0..k-1 (k=3 above) and carry no density order, so the cluster is selected by its
# mean poi_density rather than by a fixed ID (an ID of 3 would never match).
densest_cluster_row = (
    final_df
    .groupBy("cluster")
    .agg(F.avg("poi_density").alias("mean_poi_density"))
    .orderBy(F.desc("mean_poi_density"))
    .first()
)
if densest_cluster_row is None:
    raise ValueError("final_df has no rows; cannot pick a cluster to inspect")

(
    final_df
    .filter(F.col("cluster") == densest_cluster_row["cluster"])
    .select("grid_id", "poi_density")
    .orderBy(F.desc("poi_density"))
    .show(20)
)


In [None]:
# Demand band for mall siting: approximate 60th and 90th percentiles of poi_density
# (relative error 0.01). approxQuantile ignores nulls and returns an empty list when the
# column has no non-null values, so fail with a clear message instead of an unpacking error.
quantiles = final_df.approxQuantile("poi_density", [0.6, 0.9], 0.01)
if len(quantiles) != 2:
    raise ValueError(
        "approxQuantile returned no values for poi_density; "
        "final_df may be empty or the column entirely null"
    )

low, high = quantiles


In [None]:
# Mall candidates: grid cells strictly between the 60th and 90th density percentiles,
# presumably to skip both sparse cells and the densest (likely saturated) ones.
mall_candidates = (
    final_df
    .where((F.col("poi_density") > low) & (F.col("poi_density") < high))
    .withColumn("mall_score", F.col("poi_density"))
    .orderBy(F.col("mall_score").desc())
)


In [None]:
# Top 10 mall candidates from the 60th-90th percentile demand band.
mall_candidates.select(
    "grid_id", "poi_density", "mall_score"
).show(10)

In [None]:
# Fresh read of the grid features (including per-facility densities) for score-based ranking.
grid_df = spark.read.jdbc(jdbc_url, "ncr_grid_features", properties=db_properties)


In [None]:
from pyspark.sql import functions as F

# Mall score: reward demand, penalise existing malls in the cell and in neighbouring cells.
mall_candidates = grid_df.withColumn(
    "mall_score",
    F.col("poi_density") * 1.2
    - F.col("mall_density") * 2.0
    - F.col("neighbor_mall_density") * 1.5,
).orderBy(F.col("mall_score").desc())

mall_candidates.select(
    "grid_id", "poi_density", "mall_density", "neighbor_mall_density", "mall_score"
).show(10)


In [None]:
# ATM score: reward demand, penalise ATMs already present in the cell.
atm_candidates = grid_df.withColumn(
    "atm_score",
    F.col("poi_density") * 0.8 - F.col("atm_density") * 1.5,
).orderBy(F.col("atm_score").desc())

atm_candidates.select("grid_id", "poi_density", "atm_density", "atm_score").show(10)


In [None]:
# Hospital score: reward demand, penalise hospitals already present in the cell.
hospital_candidates = grid_df.withColumn(
    "hospital_score",
    F.col("poi_density") * 0.6 - F.col("hospital_density") * 2.0,
).orderBy(F.col("hospital_score").desc())

hospital_candidates.select(
    "grid_id", "poi_density", "hospital_density", "hospital_score"
).show(10)


In [87]:
# Raw OSM points of interest for the NCR region. Reuses jdbc_url / db_properties from the
# connection cell instead of repeating the URL and hard-coding the database password.
osm_pois_df = spark.read.jdbc(
    url=jdbc_url,
    table="osm_pois_ncr",
    properties=db_properties,
)


In [105]:
# NOTE(review): recommend_location is defined in a later cell, so on Restart & Run All this
# raises NameError. The output below (grid_id, poi_density, score only) came from an older
# definition like the commented-out version, not the live one.
recommend_location(final_df, "atm", top_n=10).show()


+-------+------------------+------------------+
|grid_id|       poi_density|             score|
+-------+------------------+------------------+
|    590| 444.2245601402223|355.37964811217785|
|    670| 411.0459119564082| 328.8367295651266|
|    799|382.94019142066236| 306.3521531365299|
|    256| 373.6559588400056| 298.9247670720045|
|    801| 354.3901763650551| 283.5121410920441|
|    610| 334.2999980147654| 267.4399984118123|
|    642| 314.8817372092187|  251.905389767375|
|    770|312.65071681462115|250.12057345169694|
|    591|303.57687901784055|242.86150321427246|
|    643|271.98367998211467|217.58694398569173|
+-------+------------------+------------------+



In [111]:
# NOTE(review): called before recommend_location is defined further down; fails on a fresh
# kernel. The output below reflects an older definition of the function.
recommend_location(final_df, "hospital", top_n=10).show()


+-------+------------------+------------------+
|grid_id|       poi_density|             score|
+-------+------------------+------------------+
|    590| 444.2245601402223| 266.5347360841334|
|    670| 411.0459119564082| 246.6275471738449|
|    799|382.94019142066236| 229.7641148523974|
|    256| 373.6559588400056|224.19357530400336|
|    801| 354.3901763650551|212.63410581903304|
|    610| 334.2999980147654|200.57999880885922|
|    642| 314.8817372092187|188.92904232553124|
|    770|312.65071681462115|187.59043008877268|
|    591|303.57687901784055|182.14612741070434|
|    643|271.98367998211467| 163.1902079892688|
+-------+------------------+------------------+



In [109]:
# Display the DataFrame repr (column names and types).
final_df

DataFrame[grid_id: bigint, geom: string, poi_count: bigint, area_sq_km: double, poi_density: double, atm_poi_count: int, hospital_poi_count: int, mall_poi_count: int, atm_density: double, hospital_density: double, mall_density: double, neighbor_mall_density: double, atm_count: bigint, mall_count: bigint, hospital_count: bigint]

In [None]:
from pyspark.sql import functions as F

# Re-read the grid feature table before joining facility counts. Reuses jdbc_url /
# db_properties from the connection cell instead of hard-coding the database password.
grid_df = spark.read.jdbc(
    url=jdbc_url,
    table="ncr_grid_features",
    properties=db_properties,
)


In [None]:
# OSM points of interest for the NCR region. Reuses jdbc_url / db_properties from the
# connection cell instead of hard-coding the database password.
osm_pois_df = spark.read.jdbc(
    url=jdbc_url,
    table="osm_pois_ncr",
    properties=db_properties,
)


In [88]:
# POIs tagged amenity=atm.
atm_pois = osm_pois_df.where(F.col("amenity") == "atm")


In [89]:
# POIs tagged either amenity=mall or shop=mall.
mall_pois = osm_pois_df.where((F.col("amenity") == "mall") | (F.col("shop") == "mall"))


In [90]:
# Healthcare POIs: amenity is hospital or clinic.
hospital_pois = osm_pois_df.where(F.col("amenity").isin("hospital", "clinic"))


In [95]:
# Per-grid ATM counts (grid_id, atm_count).
atm_df = spark.read.jdbc(jdbc_url, "grid_atm_counts", properties=db_properties)

In [96]:
# Per-grid mall counts (grid_id, mall_count).
mall_df = spark.read.jdbc(jdbc_url, "grid_mall_counts", properties=db_properties)


In [98]:
# Per-grid hospital counts (grid_id, hospital_count).
hospital_df = spark.read.jdbc(jdbc_url, "grid_hospital_counts", properties=db_properties)

In [102]:
# Attach per-grid facility counts from the count tables. Grids missing from a count table
# get a count of 0. fillna is limited to the three count columns so that missing values in
# the grid features themselves (poi_density, area_sq_km, ...) stay null instead of being
# silently replaced by 0.
final_df = (
    grid_df
    .join(atm_df, "grid_id", "left")
    .join(mall_df, "grid_id", "left")
    .join(hospital_df, "grid_id", "left")
    .fillna(0, subset=["atm_count", "mall_count", "hospital_count"])
)


In [103]:
def _per_area(count_col):
    """Return `count_col / area_sq_km`, or null when area_sq_km is not positive.

    Guards the division explicitly: with spark.sql.ansi.enabled=true, dividing by zero
    raises instead of yielding null.
    """
    area = F.col("area_sq_km")
    return F.when(area > 0, F.col(count_col) / area)


# Facility densities per unit of grid area; these replace the density columns of the same
# name that came from ncr_grid_features.
final_df = (
    final_df
    .withColumn("atm_density", _per_area("atm_count"))
    .withColumn("mall_density", _per_area("mall_count"))
    .withColumn("hospital_density", _per_area("hospital_count"))
)


In [99]:
# Inspect the three count tables (only atm_df gets a data preview).
atm_df.printSchema()
atm_df.show(5)

for count_table in (mall_df, hospital_df):
    count_table.printSchema()


root
 |-- grid_id: long (nullable = true)
 |-- atm_count: long (nullable = true)

+-------+---------+
|grid_id|atm_count|
+-------+---------+
|      1|        0|
|      2|        0|
|      3|        0|
|      4|        0|
|      5|        0|
+-------+---------+
only showing top 5 rows
root
 |-- grid_id: long (nullable = true)
 |-- mall_count: long (nullable = true)

root
 |-- grid_id: long (nullable = true)
 |-- hospital_count: long (nullable = true)



In [None]:
# NOTE(review): duplicates the atm_df inspection cell above. A later cell rebinds atm_df to a
# scored copy of final_df, so re-running this after that point shows a different schema.
atm_df.printSchema()
atm_df.show(5)

In [None]:
# Spatial join of mall POIs onto grid cells, counting malls per grid.
# The DataFrames are aliased so the SQL expression can qualify the two `geom` columns;
# Python variable names are not visible to Spark SQL.
# NOTE(review): ST_Within is not a built-in Spark SQL function. It needs Apache Sedona (or
# similar) registered on this session, and both geom columns must be geometry-typed
# (ncr_grid_features.geom is read as a string; osm_pois_ncr is assumed to have a geom
# column). Per-grid mall counts also appear to be available from grid_mall_counts.
mall_density_df = (
    mall_pois.alias("mall_pois")
    .join(grid_df.alias("grid_df"), F.expr("ST_Within(mall_pois.geom, grid_df.geom)"))
    .groupBy(F.col("grid_df.grid_id").alias("grid_id"))
    .count()
    .withColumnRenamed("count", "mall_count")
)


In [None]:
# NOTE(review): superseded, commented-out version of recommend_location. It scores on
# per-facility densities (atm_density / mall_density / neighbor_mall_density /
# hospital_density) and returns only grid_id, poi_density, score. The live definition later
# in the notebook scores on neighbor_poi_density instead. The hospital weight here (2.5)
# also differs from the standalone hospital scoring cell (2.0). Consider deleting this cell,
# or moving the variant into a module so there is a single source of truth.
#
#
#from pyspark.sql import functions as F
#
#def recommend_location(
#    df,
#    facility_type,
#    top_n=10
#):
#    """
#    Recommend best grid locations for a facility.
#
#    Parameters:
#    - df: Spark DataFrame (final_df)
#    - facility_type: 'atm' | 'mall' | 'hospital'
#    - top_n: number of recommendations
#
#    Returns:
#    - Spark DataFrame with ranked recommendations
#    """
#
#    facility_type = facility_type.lower()
#
#    if facility_type == "atm":
#        scored_df = (
#            df.withColumn(
#                "score",
#                (F.col("poi_density") * 0.8)
#                - (F.col("atm_density") * 1.5)
#            )
#        )
#
#    elif facility_type == "mall":
#        scored_df = (
#            df.withColumn(
#                "score",
#                (F.col("poi_density") * 1.2)
#                - (F.col("mall_density") * 2.0)
#                - (F.col("neighbor_mall_density") * 1.5)
#            )
#        )
#
#    elif facility_type == "hospital":
#        scored_df = (
#            df.withColumn(
#                "score",
#                (F.col("poi_density") * 0.6)
#                - (F.col("hospital_density") * 2.5)
#            )
#        )
#
#    else:
#        raise ValueError(
#            "Invalid facility_type. Choose from 'atm', 'mall', 'hospital'"
#        )
#
#    return (
#        scored_df
#        .orderBy(F.desc("score"))
#        .select(
#            "grid_id",
#            "poi_density",
#            "score"
#        )
#        .limit(top_n)
#    )
#

In [108]:
# NOTE(review): these calls precede the live recommend_location definition below (NameError
# on a fresh kernel). The output shown came from an older definition.
recommend_location(final_df, "mall", top_n=10).show()
recommend_location(final_df, "hospital", top_n=10).show()       

+-------+------------------+------------------+
|grid_id|       poi_density|             score|
+-------+------------------+------------------+
|    590| 444.2245601402223| 533.0694721682668|
|    670| 411.0459119564082| 493.2550943476898|
|    799|382.94019142066236| 459.5282297047948|
|    256| 373.6559588400056| 448.3871506080067|
|    801| 354.3901763650551| 425.2682116380661|
|    610| 334.2999980147654|401.15999761771843|
|    642| 314.8817372092187| 377.8580846510625|
|    770|312.65071681462115|375.18086017754536|
|    591|303.57687901784055| 364.2922548214087|
|    643|271.98367998211467| 326.3804159785376|
+-------+------------------+------------------+

+-------+------------------+------------------+
|grid_id|       poi_density|             score|
+-------+------------------+------------------+
|    590| 444.2245601402223| 266.5347360841334|
|    670| 411.0459119564082| 246.6275471738449|
|    799|382.94019142066236| 229.7641148523974|
|    256| 373.6559588400056|224.1935753

In [112]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define neighborhood window
# "Neighbourhood" demand: mean poi_density over the 5 rows centred on each grid_id
# (2 before, the row itself, 2 after) in grid_id order.
# NOTE(review): adjacency here is by grid_id order, not geometry. It only matches spatial
# neighbours if consecutive IDs touch; confirm against the grid layout. The empty
# partitionBy() also moves every row into a single partition.
neighbor_window = (
    Window
    .partitionBy()
    .orderBy(F.col("grid_id"))
    .rowsBetween(-2, 2)
)
neighbor_avg = F.avg(F.col("poi_density")).over(neighbor_window)
final_df = final_df.withColumn("neighbor_poi_density", neighbor_avg)


In [113]:
# ATM score: reward local demand, penalise demand in the surrounding window.
# NOTE(review): this rebinds atm_df, which previously held the grid_atm_counts table;
# re-running the count-join cell after this point would join this scored frame instead.
atm_df = final_df.withColumn(
    "score",
    F.col("poi_density") * 0.8 - F.col("neighbor_poi_density") * 1.2,
)


In [114]:
# Hospital score: milder weights on both local and neighbourhood demand.
# NOTE(review): rebinds hospital_df, which previously held the grid_hospital_counts table.
hospital_df = final_df.withColumn(
    "score",
    F.col("poi_density") * 0.5 - F.col("neighbor_poi_density") * 0.6,
)


In [120]:
# Mall score: strongest penalty on neighbourhood demand.
# NOTE(review): rebinds mall_df, which previously held the grid_mall_counts table.
mall_df = final_df.withColumn(
    "score",
    F.col("poi_density") * 0.7 - F.col("neighbor_poi_density") * 1.8,
)


In [121]:
def add_reason(df, facility):
    """Return `df` with a human-readable `reason` column for each recommendation.

    The text embeds poi_density rounded to one decimal place. F.concat yields null when
    any input is null, so rows with a null poi_density get a null reason.

    NOTE(review): "nearby competition is low" is emitted for every row; it is not derived
    from any competition column. Confirm the wording is intended.
    """
    prefix = F.lit(f"Recommended for {facility.upper()} because demand is ")
    rounded_demand = F.round("poi_density", 1)
    suffix = F.lit(" and nearby competition is low")
    return df.withColumn("reason", F.concat(prefix, rounded_demand, suffix))


In [122]:

# Attach explanation text to each facility's scored DataFrame.
atm_final, hospital_final, mall_final = (
    add_reason(atm_df, "atm"),
    add_reason(hospital_df, "hospital"),
    add_reason(mall_df, "mall"),
)


In [123]:
def recommend_location(df, facility_type, top_n=10):
    """Rank grid cells for placing a new facility of the given type.

    Each row is scored as

        score = demand_weight * poi_density - neighbor_weight * neighbor_poi_density

    using the same per-facility weights as the standalone atm / hospital / mall scoring cells.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain `grid_id`, `poi_density` and `neighbor_poi_density` columns
        (e.g. final_df after the neighbour-window cell).
    facility_type : str
        "atm", "hospital" or "mall" (case-insensitive).
    top_n : int, default 10
        Maximum number of rows to return; must be >= 0.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns grid_id, poi_density, neighbor_poi_density and score, highest score first.
        Ties are broken by ascending grid_id so the result is deterministic; null scores
        sort last.

    Raises
    ------
    ValueError
        If facility_type is not one of the supported types, or top_n is negative.
    """
    # (demand_weight, neighbor_weight) per facility type.
    weights_by_facility = {
        "atm": (0.8, 1.2),
        "hospital": (0.5, 0.6),
        "mall": (0.7, 1.8),
    }

    key = facility_type.lower()
    if key not in weights_by_facility:
        raise ValueError(
            f"Unknown facility type {facility_type!r}; "
            f"expected one of {sorted(weights_by_facility)}"
        )
    if top_n < 0:
        raise ValueError(f"top_n must be >= 0, got {top_n}")

    demand_weight, neighbor_weight = weights_by_facility[key]
    scored = df.withColumn(
        "score",
        (F.col("poi_density") * demand_weight)
        - (F.col("neighbor_poi_density") * neighbor_weight),
    )

    return (
        scored
        .orderBy(F.desc("score"), F.asc("grid_id"))
        .limit(top_n)
        .select("grid_id", "poi_density", "neighbor_poi_density", "score")
    )


In [125]:
# Compare the top-10 mall and hospital recommendations under neighbour-density scoring.
recommend_location(final_df, "mall", top_n=10).show()
recommend_location(final_df, "hospital", top_n=10).show()       

+-------+------------------+--------------------+------------------+
|grid_id|       poi_density|neighbor_poi_density|             score|
+-------+------------------+--------------------+------------------+
|    770|312.65071681462115|   74.77547543404418| 84.25964598895524|
|    256| 373.6559588400056|   98.94428377635674| 83.45946039056179|
|    405|270.76270794634394|   66.12626571230453| 70.50661728029257|
|    610| 334.2999980147654|   96.26198800980387| 60.73842019268878|
|    829|259.20101392078845|   76.32525239062485|44.055255441427164|
|    286|145.81695954750032|    33.5906360251631| 41.60872683795664|
|    675|109.34647820937192|   27.33677448192384|27.336340679097425|
|    316| 84.62591402326048|  19.268824459586924|24.554255789025866|
|    375|111.94996578550759|   30.19894868559839|  24.0068684157782|
|    740| 74.25454524347252|  17.456719169025792|20.556087166184337|
+-------+------------------+--------------------+------------------+

+-------+------------------+-----