In [23]:
!pip install pyspark




In [24]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession \
        .builder \
        .appName("Test") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

In [25]:
df=spark.read.csv("/content/Combined.csv",header=True,inferSchema=True)
print(df.dtypes)
df.printSchema()

[('Town', 'string'), ('Year', 'int'), ('Residential Type', 'string'), ('Serial Number', 'int'), ('List Year', 'int'), ('Date Recorded', 'date'), ('Address', 'string'), ('Assessed Value', 'double'), ('Sale Amount', 'double'), ('Sales Ratio', 'double'), ('Location', 'string'), ('Total Crimes Committed', 'double'), ('Average Crimes per 100,000', 'double'), ('Number of Schools', 'int'), ('Number of Healthcare facilities', 'int')]
root
 |-- Town: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Residential Type: string (nullable = true)
 |-- Serial Number: integer (nullable = true)
 |-- List Year: integer (nullable = true)
 |-- Date Recorded: date (nullable = true)
 |-- Address: string (nullable = true)
 |-- Assessed Value: double (nullable = true)
 |-- Sale Amount: double (nullable = true)
 |-- Sales Ratio: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- Total Crimes Committed: double (nullable = true)
 |-- Average Crimes per 100,000: double (nullabl

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import round, mean, when
from pyspark.sql.functions import to_date, date_format

In [27]:
# Define a list of desired property types
desired_types = ["Condo", "Single Family", "Two Family", "Three Family", "Four Family"]
# Filter the DataFrame based on the list
df = df.filter(df["Residential Type"].isin(desired_types))
print(df.count())

60992


In [28]:
print(df.distinct().count())

60842


In [29]:
for col in df.columns:
    null_count = df.filter(df[col].isNull() | (df[col] == "")).count()
    print(f"Column: {col}\t Null Count: {null_count}")
total_count = df.count()
print(f"Total Count: {total_count}")

Column: Town	 Null Count: 0
Column: Year	 Null Count: 0
Column: Residential Type	 Null Count: 0
Column: Serial Number	 Null Count: 0
Column: List Year	 Null Count: 0
Column: Date Recorded	 Null Count: 0
Column: Address	 Null Count: 0
Column: Assessed Value	 Null Count: 0
Column: Sale Amount	 Null Count: 0
Column: Sales Ratio	 Null Count: 0
Column: Location	 Null Count: 0
Column: Total Crimes Committed	 Null Count: 0
Column: Average Crimes per 100,000	 Null Count: 0
Column: Number of Schools	 Null Count: 0
Column: Number of Healthcare facilities	 Null Count: 0
Total Count: 60992


In [32]:
df.show(20)

+------------+----+----------------+-------------+---------+-------------+-------------------+--------------+-----------+-----------+--------------------+----------------------+--------------------------+-----------------+-------------------------------+
|        Town|Year|Residential Type|Serial Number|List Year|Date Recorded|            Address|Assessed Value|Sale Amount|Sales Ratio|            Location|Total Crimes Committed|Average Crimes per 100,000|Number of Schools|Number of Healthcare facilities|
+------------+----+----------------+-------------+---------+-------------+-------------------+--------------+-----------+-----------+--------------------+----------------------+--------------------------+-----------------+-------------------------------+
|        Avon|2010|           Condo|        90146|     2009|   2010-04-12|      3 WILLOW LANE|      125870.0|   168500.0|       0.75|POINT (-72.875838...|                 577.0|                    315.73|                5|             

In [33]:
# Group records by Town, List Year, and Residential Type, and calculate the average Assessed Value
avg_assessed_value = df.groupby(["Town", "List Year", "Residential Type"]).agg(round(mean(df["Assessed Value"]), 0).alias("Avg Assessed Value"))

# Show the result
avg_assessed_value.show()


+----------------+---------+----------------+------------------+
|            Town|List Year|Residential Type|Avg Assessed Value|
+----------------+---------+----------------+------------------+
|       Naugatuck|     2009|   Single Family|          165287.0|
|       Naugatuck|     2009|           Condo|          105142.0|
|   South Windsor|     2011|           Condo|          139654.0|
|          Oxford|     2012|   Single Family|          209362.0|
|          Easton|     2013|   Single Family|          472500.0|
|North Stonington|     2013|   Single Family|          189941.0|
|        Stafford|     2013|   Single Family|          132853.0|
|      Middlebury|     2013|           Condo|          125411.0|
|    East Hampton|     2014|   Single Family|          191122.0|
|         Wolcott|     2014|   Single Family|          176561.0|
|        Cheshire|     2011|   Single Family|          259582.0|
|    Old Saybrook|     2011|           Condo|          311500.0|
|   East Hartford|     20

In [34]:
# Join the original DataFrame with the average Assessed Value DataFrame
df = df.join(avg_assessed_value, on=["Town", "List Year", "Residential Type"], how="left")

# Replace 0 values in Assessed Value column with the average Assessed Value
df = df.withColumn("Assessed Value", when(df["Assessed Value"] == 0, df["Avg Assessed Value"]).otherwise(df["Assessed Value"]))

# Drop the Avg Assessed Value column
df = df.drop("Avg Assessed Value")




In [36]:
df.show()

+------------+---------+----------------+----+-------------+-------------+-------------------+--------------+-----------+-----------+--------------------+----------------------+--------------------------+-----------------+-------------------------------+
|        Town|List Year|Residential Type|Year|Serial Number|Date Recorded|            Address|Assessed Value|Sale Amount|Sales Ratio|            Location|Total Crimes Committed|Average Crimes per 100,000|Number of Schools|Number of Healthcare facilities|
+------------+---------+----------------+----+-------------+-------------+-------------------+--------------+-----------+-----------+--------------------+----------------------+--------------------------+-----------------+-------------------------------+
|        Avon|     2009|           Condo|2010|        90146|   2010-04-12|      3 WILLOW LANE|      125870.0|   168500.0|       0.75|POINT (-72.875838...|                 577.0|                    315.73|                5|             

In [37]:
# Filter rows where Sale Amount is 0
zero_sale_value = df.filter(df["Sale Amount"] == 0).count()
print("Number of rows where Sale Amount is 0:", zero_sale_value)

# Group records by Town and calculate the average Sale Amount
avg_sale_value = df.groupby(["Town", "List Year", "Residential Type"]).agg(round(mean(df["Sale Amount"]), 0).alias("Avg Sale Amount"))

# Join the original DataFrame with the average Sale Amount DataFrame
df = df.join(avg_sale_value, on=["Town", "List Year", "Residential Type"], how="left")

# Replace 0 values in Sale Amount column with the average Sale Value
df = df.withColumn("Sale Amount", when(df["Sale Amount"] == 0, df["Avg Sale Amount"]).otherwise(df["Sale Amount"]))

# Drop the Avg Sale Value column
df = df.drop("Avg Sale Amount")

df.show(truncate=False)

Number of rows where Sale Amount is 0: 6
+------------+---------+----------------+----+-------------+-------------+-------------------+--------------+-----------+-----------+---------------------------------------------+----------------------+--------------------------+-----------------+-------------------------------+
|Town        |List Year|Residential Type|Year|Serial Number|Date Recorded|Address            |Assessed Value|Sale Amount|Sales Ratio|Location                                     |Total Crimes Committed|Average Crimes per 100,000|Number of Schools|Number of Healthcare facilities|
+------------+---------+----------------+----+-------------+-------------+-------------------+--------------+-----------+-----------+---------------------------------------------+----------------------+--------------------------+-----------------+-------------------------------+
|Avon        |2009     |Condo           |2010|90146        |2010-04-12   |3 WILLOW LANE      |125870.0      |168500.0  

In [38]:
from pyspark.sql.functions import round

# Calculate Sales Ratio and round to two decimal places
df = df.withColumn("Sales Ratio", round(df["Assessed Value"] / df["Sale Amount"], 2))

# Display the first 5 rows
df.show(5, truncate=False)


+----------+---------+----------------+----+-------------+-------------+---------------+--------------+-----------+-----------+--------------------------------------------+----------------------+--------------------------+-----------------+-------------------------------+
|Town      |List Year|Residential Type|Year|Serial Number|Date Recorded|Address        |Assessed Value|Sale Amount|Sales Ratio|Location                                    |Total Crimes Committed|Average Crimes per 100,000|Number of Schools|Number of Healthcare facilities|
+----------+---------+----------------+----+-------------+-------------+---------------+--------------+-----------+-----------+--------------------------------------------+----------------------+--------------------------+-----------------+-------------------------------+
|Avon      |2009     |Condo           |2010|90146        |2010-04-12   |3 WILLOW LANE  |125870.0      |168500.0   |0.75       |POINT (-72.8758384 41.7696552)              |577.0    

In [39]:
# Delete rows where the Town field has value "***Unknown***"
df = df.filter(df["Town"] != "***Unknown***")

In [40]:
from pyspark.sql.functions import count

# Group by List Year and calculate the total houses listed
year_wise_listing = df.groupBy("List Year").agg(count("*").alias("Total Houses Listed"))

# Order the result by List Year
year_wise_listing = year_wise_listing.orderBy("List Year")

# Display the result
year_wise_listing.show()


+---------+-------------------+
|List Year|Total Houses Listed|
+---------+-------------------+
|     2009|               5708|
|     2010|               6676|
|     2011|               6080|
|     2012|               6503|
|     2013|               7102|
|     2014|               9170|
|     2015|               8866|
|     2016|               8945|
|     2017|               1942|
+---------+-------------------+



In [41]:
from pyspark.sql.functions import count

# Group by List Year and Residential Type, and calculate the total houses listed
year_wise_listing_residential_type = df.groupBy("List Year", "Residential Type") \
                                       .agg(count("*").alias("Total Houses Listed"))

# Order the result by List Year
year_wise_listing_residential_type = year_wise_listing_residential_type.orderBy("List Year")

# Display the result
year_wise_listing_residential_type.show()


+---------+----------------+-------------------+
|List Year|Residential Type|Total Houses Listed|
+---------+----------------+-------------------+
|     2009|           Condo|               1113|
|     2009|   Single Family|               4595|
|     2010|   Single Family|               5383|
|     2010|           Condo|               1293|
|     2011|           Condo|               1124|
|     2011|   Single Family|               4956|
|     2012|           Condo|               1134|
|     2012|   Single Family|               5369|
|     2013|   Single Family|               5810|
|     2013|           Condo|               1292|
|     2014|           Condo|               1625|
|     2014|   Single Family|               7545|
|     2015|   Single Family|               7224|
|     2015|           Condo|               1642|
|     2016|           Condo|               1774|
|     2016|   Single Family|               7171|
|     2017|           Condo|                318|
|     2017|   Single

In [55]:
from pyspark.sql.functions import col, count, sum, avg, when
from pyspark.sql import functions as F

house_types = ["Condo", "Single Family"]
df = df.filter(col('Residential Type').isin(house_types))
house_type_sales_town_year = df.groupBy("Town", "List Year", "Residential Type").agg(
    F.round(F.avg("Assessed Value"), 2).alias("Average Assessed Value"),
    F.round(F.avg("Sale Amount"), 2).alias("Average Sale Amount"),
    F.round(F.avg("Sales Ratio"), 2).alias("Average Sale Ratio"),
    count("*").alias("Houses Sold")
)

condo_data = house_type_sales_town_year.groupBy("Town", "List Year") \
    .agg(
        F.sum(F.when(house_type_sales_town_year["Residential Type"] == "Condo", house_type_sales_town_year["Houses Sold"])).alias("Houses Sold(Condo)"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Condo", house_type_sales_town_year["Average Assessed Value"])).alias("Condo_Average Assessed Value"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Condo", house_type_sales_town_year["Average Sale Amount"])).alias("Condo_Average Sale Amount"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Condo", house_type_sales_town_year["Average Sale Ratio"])).alias("Condo_Average Sale Ratio")

    )
single_family_data = house_type_sales_town_year.groupBy("Town", "List Year") \
    .agg(
        F.sum(F.when(house_type_sales_town_year["Residential Type"] == "Single Family", house_type_sales_town_year["Houses Sold"])).alias("Houses Sold(Single)"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Single Family", house_type_sales_town_year["Average Assessed Value"])).alias("Single Family_Average Assessed Value"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Single Family", house_type_sales_town_year["Average Sale Amount"])).alias("Single Family_Average Sale Amount"),
        F.avg(F.when(house_type_sales_town_year["Residential Type"] == "Single Family", house_type_sales_town_year["Average Sale Ratio"])).alias("Single Family_Average Sale Ratio")
    )

final_df = condo_data.join(single_family_data, ["Town", "List Year"], "inner")
final_df.show()







+------------+---------+------------------+----------------------------+-------------------------+------------------------+-------------------+------------------------------------+---------------------------------+--------------------------------+
|        Town|List Year|Houses Sold(Condo)|Condo_Average Assessed Value|Condo_Average Sale Amount|Condo_Average Sale Ratio|Houses Sold(Single)|Single Family_Average Assessed Value|Single Family_Average Sale Amount|Single Family_Average Sale Ratio|
+------------+---------+------------------+----------------------------+-------------------------+------------------------+-------------------+------------------------------------+---------------------------------+--------------------------------+
|      Hamden|     2010|                15|                    125896.0|                 167860.0|                    0.82|                 66|                           166972.41|                        203731.21|                            0.96|
|   Kill

In [56]:
# Group by "List Year" and count the number of unique towns
town_counts_by_year = final_df.groupBy("List Year").agg(count("Town").alias("Number of Towns"))

# Order the result by "List Year"
town_counts_by_year = town_counts_by_year.orderBy("List Year")

# Show the result
town_counts_by_year.show()

+---------+---------------+
|List Year|Number of Towns|
+---------+---------------+
|     2009|            155|
|     2010|            150|
|     2011|            134|
|     2012|            138|
|     2013|            127|
|     2014|            156|
|     2015|            139|
|     2016|            142|
|     2017|            133|
+---------+---------------+



In [57]:
from pyspark.sql.functions import count

# Group by "List Year", "Houses Sold(Condo)", and "Houses Sold(Single)", and count the number of unique towns
town_counts_by_year_and_houses_sold = final_df.groupBy(["List Year", "Houses Sold(Condo)", "Houses Sold(Single)"]) \
    .agg(count("Town").alias("Number of Towns"))

# Order the result by "List Year"
town_counts_by_year_and_houses_sold = town_counts_by_year_and_houses_sold.orderBy("List Year")

# Show the result
town_counts_by_year_and_houses_sold.show()

+---------+------------------+-------------------+---------------+
|List Year|Houses Sold(Condo)|Houses Sold(Single)|Number of Towns|
+---------+------------------+-------------------+---------------+
|     2009|                 5|                 19|              1|
|     2009|                 7|                 20|              1|
|     2009|                 5|                 11|              1|
|     2009|              NULL|                  4|              4|
|     2009|              NULL|                 13|              2|
|     2009|                 6|                 33|              1|
|     2009|              NULL|                 10|              3|
|     2009|                24|                 38|              1|
|     2009|                 2|                  4|              1|
|     2009|              NULL|                  7|              4|
|     2009|                13|                 60|              1|
|     2009|              NULL|                 19|            

In [58]:
from pyspark.sql.functions import col, count, sum

# Group by "Town" and aggregate the data
df_year_town = final_df.groupBy("Town").agg(
    count(col("List Year")).alias("Number of Years"),
    sum(col("Houses Sold(Condo)")).alias("Total Condo Homes"),
    sum(col("Houses Sold(Single)")).alias("Total Single Homes")
).orderBy("Town")

# Display the DataFrame
df_year_town.show()


+------------+---------------+-----------------+------------------+
|        Town|Number of Years|Total Condo Homes|Total Single Homes|
+------------+---------------+-----------------+------------------+
|     Andover|              8|             NULL|                60|
|     Ansonia|              8|                6|               200|
|     Ashford|              7|                3|               101|
|        Avon|              8|              207|               410|
| Barkhamsted|              8|             NULL|                74|
|Beacon Falls|              7|               24|                99|
|      Berlin|              8|               88|               280|
|     Bethany|              8|             NULL|               117|
|      Bethel|              7|              138|               238|
|   Bethlehem|              8|                1|                60|
|  Bloomfield|              7|               49|               341|
|      Bolton|              9|             NULL|

In [59]:
print(final_df.count())

1274


In [60]:
final_df.show()

+------------+---------+------------------+----------------------------+-------------------------+------------------------+-------------------+------------------------------------+---------------------------------+--------------------------------+
|        Town|List Year|Houses Sold(Condo)|Condo_Average Assessed Value|Condo_Average Sale Amount|Condo_Average Sale Ratio|Houses Sold(Single)|Single Family_Average Assessed Value|Single Family_Average Sale Amount|Single Family_Average Sale Ratio|
+------------+---------+------------------+----------------------------+-------------------------+------------------------+-------------------+------------------------------------+---------------------------------+--------------------------------+
|      Hamden|     2010|                15|                    125896.0|                 167860.0|                    0.82|                 66|                           166972.41|                        203731.21|                            0.96|
|   Kill

In [61]:
final_df = df.join(final_df, ["Town", "List Year", ], "inner")

final_df.show(truncate=False)

+------------+---------+----------------+----+-------------+-------------+-------------------+--------------+-----------+-----------+---------------------------------------------+----------------------+--------------------------+-----------------+-------------------------------+------------------+----------------------------+-------------------------+------------------------+-------------------+------------------------------------+---------------------------------+--------------------------------+
|Town        |List Year|Residential Type|Year|Serial Number|Date Recorded|Address            |Assessed Value|Sale Amount|Sales Ratio|Location                                     |Total Crimes Committed|Average Crimes per 100,000|Number of Schools|Number of Healthcare facilities|Houses Sold(Condo)|Condo_Average Assessed Value|Condo_Average Sale Amount|Condo_Average Sale Ratio|Houses Sold(Single)|Single Family_Average Assessed Value|Single Family_Average Sale Amount|Single Family_Average Sale R

In [62]:
print(final_df.count())

60992


In [70]:
# Group by town and year, and aggregate the required columns
aggregated_df = final_df.groupBy("Town", "List Year") \
    .agg(avg("Assessed Value").alias("Avg Assessed Value"),
         avg("Sale Amount").alias("Avg Sale Amount"),
         avg("Sales Ratio").alias("Avg Sales Ratio"),
         avg("Total Crimes Committed").alias("Avg Total Crimes"),
         avg("Average Crimes per 100,000").alias("Avg Crimes per 100,000"),
         avg("Number of Schools").alias("Avg Number of Schools"),
         avg("Number of Healthcare facilities").alias("Avg Number of Healthcare facilities"),
         avg("Houses Sold(Condo)").alias("Avg Houses Sold (Condo)"),
         avg("Condo_Average Assessed Value").alias("Avg Condo Assessed Value"),
         avg("Condo_Average Sale Amount").alias("Avg Condo Sale Amount"),
         avg("Condo_Average Sale Ratio").alias("Avg Condo Sale Ratio"),
         avg("Houses Sold(Single)").alias("Avg Houses Sold (Single)"),
         avg("Single Family_Average Assessed Value").alias("Avg Single Family Assessed Value"),
         avg("Single Family_Average Sale Amount").alias("Avg Single Family Sale Amount"),
         avg("Single Family_Average Sale Ratio").alias("Avg Single Family Sale Ratio"))

# Show the aggregated DataFrame
aggregated_df.show()

+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-----------------------------------+-----------------------+------------------------+---------------------+--------------------+------------------------+--------------------------------+-----------------------------+----------------------------+
|        Town|List Year|Avg Assessed Value|   Avg Sale Amount|   Avg Sales Ratio|  Avg Total Crimes|Avg Crimes per 100,000|Avg Number of Schools|Avg Number of Healthcare facilities|Avg Houses Sold (Condo)|Avg Condo Assessed Value|Avg Condo Sale Amount|Avg Condo Sale Ratio|Avg Houses Sold (Single)|Avg Single Family Assessed Value|Avg Single Family Sale Amount|Avg Single Family Sale Ratio|
+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-----------------------------------+-----------------------+-------------

In [71]:
aggregated_df.count()

1274

In [72]:
for col in aggregated_df.columns:
    null_count = aggregated_df.filter(aggregated_df[col].isNull() | (aggregated_df[col] == "")).count()
    print(f"Column: {col}\t Null Count: {null_count}")

total_count = aggregated_df.count()
print(f"Total Count: {total_count}")


Column: Town	 Null Count: 0
Column: List Year	 Null Count: 0
Column: Avg Assessed Value	 Null Count: 0
Column: Avg Sale Amount	 Null Count: 0
Column: Avg Sales Ratio	 Null Count: 0
Column: Avg Total Crimes	 Null Count: 0
Column: Avg Crimes per 100,000	 Null Count: 0
Column: Avg Number of Schools	 Null Count: 0
Column: Avg Number of Healthcare facilities	 Null Count: 0
Column: Avg Houses Sold (Condo)	 Null Count: 403
Column: Avg Condo Assessed Value	 Null Count: 403
Column: Avg Condo Sale Amount	 Null Count: 403
Column: Avg Condo Sale Ratio	 Null Count: 403
Column: Avg Houses Sold (Single)	 Null Count: 1
Column: Avg Single Family Assessed Value	 Null Count: 1
Column: Avg Single Family Sale Amount	 Null Count: 1
Column: Avg Single Family Sale Ratio	 Null Count: 1
Total Count: 1274


In [73]:
# Replace null values with zero in the aggregated DataFrame
aggregated_df_filled = aggregated_df.fillna(0)

# Check if there are any null values after filling
for col in aggregated_df_filled.columns:
    null_count = aggregated_df_filled.filter(aggregated_df_filled[col].isNull() | (aggregated_df_filled[col] == "")).count()
    print(f"Column: {col}\t Null Count: {null_count}")

total_count = aggregated_df_filled.count()
print(f"Total Count: {total_count}")


Column: Town	 Null Count: 0
Column: List Year	 Null Count: 0
Column: Avg Assessed Value	 Null Count: 0
Column: Avg Sale Amount	 Null Count: 0
Column: Avg Sales Ratio	 Null Count: 0
Column: Avg Total Crimes	 Null Count: 0
Column: Avg Crimes per 100,000	 Null Count: 0
Column: Avg Number of Schools	 Null Count: 0
Column: Avg Number of Healthcare facilities	 Null Count: 0
Column: Avg Houses Sold (Condo)	 Null Count: 0
Column: Avg Condo Assessed Value	 Null Count: 0
Column: Avg Condo Sale Amount	 Null Count: 0
Column: Avg Condo Sale Ratio	 Null Count: 0
Column: Avg Houses Sold (Single)	 Null Count: 0
Column: Avg Single Family Assessed Value	 Null Count: 0
Column: Avg Single Family Sale Amount	 Null Count: 0
Column: Avg Single Family Sale Ratio	 Null Count: 0
Total Count: 1274


In [76]:
# Read the GeoJSON file as a regular JSON file into a PySpark DataFrame
geojson_df = spark.read.format("json").load("/content/Town_Location.geojson")

# Show the schema of the DataFrame
geojson_df.printSchema()

# Show the first few rows of the DataFrame
geojson_df.show()

root
 |-- _corrupt_record: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- Town: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- display_name: string (nullable = true)
 |    |-- latlong: string (nullable = true)
 |    |-- osm_id: string (nullable = true)
 |    |-- result_num: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)

+--------------------+--------------------+--------------------+-------+
|     _corrupt_record|            geometry|          properties|   type|
+--------------------+--------------------+--------------------+-------+
|                   {|                NULL|                NULL|   NULL|
|"type": "FeatureC...|                NULL| 

In [77]:
# Join the aggregated DataFrame with the GeoJSON DataFrame
joined_df = aggregated_df.join(geojson_df, aggregated_df["Town"] == geojson_df["properties.Town"], how="left")

# Drop the '_corrupt_record' column if it exists (only if reading GeoJSON caused this column to appear)
joined_df = joined_df.drop("_corrupt_record")

# Show the joined DataFrame
joined_df.show()

+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-----------------------------------+-----------------------+------------------------+---------------------+--------------------+------------------------+--------------------------------+-----------------------------+----------------------------+--------------------+--------------------+-------+
|        Town|List Year|Avg Assessed Value|   Avg Sale Amount|   Avg Sales Ratio|  Avg Total Crimes|Avg Crimes per 100,000|Avg Number of Schools|Avg Number of Healthcare facilities|Avg Houses Sold (Condo)|Avg Condo Assessed Value|Avg Condo Sale Amount|Avg Condo Sale Ratio|Avg Houses Sold (Single)|Avg Single Family Assessed Value|Avg Single Family Sale Amount|Avg Single Family Sale Ratio|            geometry|          properties|   type|
+------------+---------+------------------+------------------+------------------+------------------+------------------

In [79]:
joined_df = joined_df.drop("properties","type")

joined_df.show()

+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-----------------------------------+-----------------------+------------------------+---------------------+--------------------+------------------------+--------------------------------+-----------------------------+----------------------------+--------------------+
|        Town|List Year|Avg Assessed Value|   Avg Sale Amount|   Avg Sales Ratio|  Avg Total Crimes|Avg Crimes per 100,000|Avg Number of Schools|Avg Number of Healthcare facilities|Avg Houses Sold (Condo)|Avg Condo Assessed Value|Avg Condo Sale Amount|Avg Condo Sale Ratio|Avg Houses Sold (Single)|Avg Single Family Assessed Value|Avg Single Family Sale Amount|Avg Single Family Sale Ratio|            geometry|
+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-------------------------------

In [82]:
# Check if there are any null values after filling
for col in joined_df.columns:
    null_count = joined_df.filter(joined_df[col].isNull()).count()
    print(f"Column: {col}\t Null Count: {null_count}")

total_count = joined_df.count()
print(f"Total Count: {total_count}")

Column: Town	 Null Count: 0
Column: List Year	 Null Count: 0
Column: Avg Assessed Value	 Null Count: 0
Column: Avg Sale Amount	 Null Count: 0
Column: Avg Sales Ratio	 Null Count: 0
Column: Avg Total Crimes	 Null Count: 0
Column: Avg Crimes per 100,000	 Null Count: 0
Column: Avg Number of Schools	 Null Count: 0
Column: Avg Number of Healthcare facilities	 Null Count: 0
Column: Avg Houses Sold (Condo)	 Null Count: 403
Column: Avg Condo Assessed Value	 Null Count: 403
Column: Avg Condo Sale Amount	 Null Count: 403
Column: Avg Condo Sale Ratio	 Null Count: 403
Column: Avg Houses Sold (Single)	 Null Count: 1
Column: Avg Single Family Assessed Value	 Null Count: 1
Column: Avg Single Family Sale Amount	 Null Count: 1
Column: Avg Single Family Sale Ratio	 Null Count: 1
Column: geometry	 Null Count: 0
Total Count: 1274


In [83]:
from pyspark.sql.functions import when

# Replace null values with zero in the joined DataFrame
filled_df = joined_df.fillna(0)

# Check if there are any null values after filling
for col in filled_df.columns:
    null_count = filled_df.filter(filled_df[col].isNull()).count()
    print(f"Column: {col}\t Null Count: {null_count}")

total_count = filled_df.count()
print(f"Total Count: {total_count}")


Column: Town	 Null Count: 0
Column: List Year	 Null Count: 0
Column: Avg Assessed Value	 Null Count: 0
Column: Avg Sale Amount	 Null Count: 0
Column: Avg Sales Ratio	 Null Count: 0
Column: Avg Total Crimes	 Null Count: 0
Column: Avg Crimes per 100,000	 Null Count: 0
Column: Avg Number of Schools	 Null Count: 0
Column: Avg Number of Healthcare facilities	 Null Count: 0
Column: Avg Houses Sold (Condo)	 Null Count: 0
Column: Avg Condo Assessed Value	 Null Count: 0
Column: Avg Condo Sale Amount	 Null Count: 0
Column: Avg Condo Sale Ratio	 Null Count: 0
Column: Avg Houses Sold (Single)	 Null Count: 0
Column: Avg Single Family Assessed Value	 Null Count: 0
Column: Avg Single Family Sale Amount	 Null Count: 0
Column: Avg Single Family Sale Ratio	 Null Count: 0
Column: geometry	 Null Count: 0
Total Count: 1274


In [84]:
filled_df.show()
filled_df.count()

+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-----------------------------------+-----------------------+------------------------+---------------------+--------------------+------------------------+--------------------------------+-----------------------------+----------------------------+--------------------+
|        Town|List Year|Avg Assessed Value|   Avg Sale Amount|   Avg Sales Ratio|  Avg Total Crimes|Avg Crimes per 100,000|Avg Number of Schools|Avg Number of Healthcare facilities|Avg Houses Sold (Condo)|Avg Condo Assessed Value|Avg Condo Sale Amount|Avg Condo Sale Ratio|Avg Houses Sold (Single)|Avg Single Family Assessed Value|Avg Single Family Sale Amount|Avg Single Family Sale Ratio|            geometry|
+------------+---------+------------------+------------------+------------------+------------------+----------------------+---------------------+-------------------------------

1274

In [86]:
from pyspark.sql.functions import round

# Assuming you have a DataFrame named 'filled_df' containing your data

# Round all decimal values to two decimal places
final_df = filled_df.select(
    filled_df['Town'],
    filled_df['List Year'],
    round(filled_df['Avg Assessed Value'], 2).alias('Avg Assessed Value'),
    round(filled_df['Avg Sale Amount'], 2).alias('Avg Sale Amount'),
    round(filled_df['Avg Sales Ratio'], 2).alias('Avg Sales Ratio'),
    round(filled_df['Avg Total Crimes'], 2).alias('Avg Total Crimes'),
    round(filled_df['Avg Crimes per 100,000'], 2).alias('Avg Crimes per 100,000'),
    round(filled_df['Avg Number of Schools'], 2).alias('Avg Number of Schools'),
    round(filled_df['Avg Number of Healthcare facilities'], 2).alias('Avg Number of Healthcare facilities'),
    round(filled_df['Avg Houses Sold (Condo)'], 2).alias('Avg Houses Sold (Condo)'),
    round(filled_df['Avg Condo Assessed Value'], 2).alias('Avg Condo Assessed Value'),
    round(filled_df['Avg Condo Sale Amount'], 2).alias('Avg Condo Sale Amount'),
    round(filled_df['Avg Condo Sale Ratio'], 2).alias('Avg Condo Sale Ratio'),
    round(filled_df['Avg Houses Sold (Single)'], 2).alias('Avg Houses Sold (Single)'),
    round(filled_df['Avg Single Family Assessed Value'], 2).alias('Avg Single Family Assessed Value'),
    round(filled_df['Avg Single Family Sale Amount'], 2).alias('Avg Single Family Sale Amount'),
    round(filled_df['Avg Single Family Sale Ratio'], 2).alias('Avg Single Family Sale Ratio'),
    filled_df['geometry']
)

# Show the rounded DataFrame
final_df.show()


+------------+---------+------------------+---------------+---------------+----------------+----------------------+---------------------+-----------------------------------+-----------------------+------------------------+---------------------+--------------------+------------------------+--------------------------------+-----------------------------+----------------------------+--------------------+
|        Town|List Year|Avg Assessed Value|Avg Sale Amount|Avg Sales Ratio|Avg Total Crimes|Avg Crimes per 100,000|Avg Number of Schools|Avg Number of Healthcare facilities|Avg Houses Sold (Condo)|Avg Condo Assessed Value|Avg Condo Sale Amount|Avg Condo Sale Ratio|Avg Houses Sold (Single)|Avg Single Family Assessed Value|Avg Single Family Sale Amount|Avg Single Family Sale Ratio|            geometry|
+------------+---------+------------------+---------------+---------------+----------------+----------------------+---------------------+-----------------------------------+-------------------

In [92]:
final_df.printSchema()

root
 |-- Town: string (nullable = true)
 |-- List Year: integer (nullable = true)
 |-- Avg Assessed Value: double (nullable = true)
 |-- Avg Sale Amount: double (nullable = true)
 |-- Avg Sales Ratio: double (nullable = true)
 |-- Avg Total Crimes: double (nullable = true)
 |-- Avg Crimes per 100,000: double (nullable = true)
 |-- Avg Number of Schools: double (nullable = true)
 |-- Avg Number of Healthcare facilities: double (nullable = true)
 |-- Avg Houses Sold (Condo): double (nullable = true)
 |-- Avg Condo Assessed Value: double (nullable = true)
 |-- Avg Condo Sale Amount: double (nullable = true)
 |-- Avg Condo Sale Ratio: double (nullable = true)
 |-- Avg Houses Sold (Single): double (nullable = true)
 |-- Avg Single Family Assessed Value: double (nullable = true)
 |-- Avg Single Family Sale Amount: double (nullable = true)
 |-- Avg Single Family Sale Ratio: double (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |

In [93]:
from pyspark.sql.functions import concat, format_string

# Assuming the `coordinates` array has longitude and latitude values
final_df = final_df.withColumn(
    'geometry',
    concat(
        format_string('POINT(%s %s)', final_df['geometry.coordinates'][0], final_df['geometry.coordinates'][1])
    )
)


In [None]:
# Write the DataFrame to a CSV file
final_df.write.csv("Complete.csv", header=True, mode="overwrite")

# Download the CSV file to the system
from google.colab import files
files.download("Complete.csv")