In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse, if one is already running in this kernel) the SparkSession used by
# every later cell. The placeholder `.config("spark.some.config.option", "some-value")`
# from the Spark quick-start docs is not a real Spark setting and has been dropped.
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [None]:
# Load the raw Connecticut crime data: first line is the header, and Spark infers
# column types from the values.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/AIT664-Group001-Dataset05-Connecticut-Crime-Data.csv")
)
print(df.dtypes)
df.printSchema()

[('Town', 'string'), ('FIPS', 'int'), ('Year', 'int'), ('Crime Type', 'string'), ('Measure Type', 'string'), ('Variable', 'string'), ('Value', 'double')]
root
 |-- Town: string (nullable = true)
 |-- FIPS: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Crime Type: string (nullable = true)
 |-- Measure Type: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Value: double (nullable = true)



In [None]:
# Preview the first 20 rows (Spark's default preview size).
df.show(20)

+-------+---------+----+--------------------+------------------+-----------+------+
|   Town|     FIPS|Year|          Crime Type|      Measure Type|   Variable| Value|
+-------+---------+----+--------------------+------------------+-----------+------+
|Andover|901301080|2010|         Total Crime|            Number|Crime Index|  35.0|
|Andover|901301080|2010|         Total Crime|Rate (per 100,000)|Crime Index|1098.6|
|Andover|901301080|2010| Total Violent Crime|            Number|Crime Index|   1.0|
|Andover|901301080|2010| Total Violent Crime|Rate (per 100,000)|Crime Index|  31.4|
|Andover|901301080|2010|Total Property Crime|            Number|Crime Index|  34.0|
|Andover|901301080|2010|Total Property Crime|Rate (per 100,000)|Crime Index|1067.2|
|Andover|901301080|2010|  Aggravated Assault|            Number|Crime Index|   0.0|
|Andover|901301080|2010|  Aggravated Assault|Rate (per 100,000)|Crime Index|   0.0|
|Andover|901301080|2010|              Murder|            Number|Crime Index|

In [None]:
from pyspark.sql.functions import when

# Split the long-format "Value" column into two sparse columns, one per measure type:
# each row carries its value in the column matching its "Measure Type" and NULL in the other.
base_df = df
measure_columns = {
    "Number": "Number_Column",
    "Rate (per 100,000)": "Rate_Column",
}
for measure_label, target_column in measure_columns.items():
    df = df.withColumn(
        target_column,
        when(base_df["Measure Type"] == measure_label, base_df["Value"]).otherwise(None),
    )

df.show()

+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+
|   Town|     FIPS|Year|          Crime Type|      Measure Type|   Variable| Value|Number_Column|Rate_Column|
+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+
|Andover|901301080|2010|         Total Crime|            Number|Crime Index|  35.0|         35.0|       NULL|
|Andover|901301080|2010|         Total Crime|Rate (per 100,000)|Crime Index|1098.6|         NULL|     1098.6|
|Andover|901301080|2010| Total Violent Crime|            Number|Crime Index|   1.0|          1.0|       NULL|
|Andover|901301080|2010| Total Violent Crime|Rate (per 100,000)|Crime Index|  31.4|         NULL|       31.4|
|Andover|901301080|2010|Total Property Crime|            Number|Crime Index|  34.0|         34.0|       NULL|
|Andover|901301080|2010|Total Property Crime|Rate (per 100,000)|Crime Index|1067.2|         NULL|     1067.2|
|Andover|9

In [None]:
from pyspark.sql.functions import lead, col, monotonically_increasing_id

# Tag every row with an "ID" that increases with row position as read.
# NOTE: Spark guarantees these IDs are unique and increasing, but NOT consecutive —
# values jump between partitions, so do not rely on ID arithmetic (e.g. parity).
df = df.select("*", monotonically_increasing_id().alias("ID"))
df.show()

+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+---+
|   Town|     FIPS|Year|          Crime Type|      Measure Type|   Variable| Value|Number_Column|Rate_Column| ID|
+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+---+
|Andover|901301080|2010|         Total Crime|            Number|Crime Index|  35.0|         35.0|       NULL|  0|
|Andover|901301080|2010|         Total Crime|Rate (per 100,000)|Crime Index|1098.6|         NULL|     1098.6|  1|
|Andover|901301080|2010| Total Violent Crime|            Number|Crime Index|   1.0|          1.0|       NULL|  2|
|Andover|901301080|2010| Total Violent Crime|Rate (per 100,000)|Crime Index|  31.4|         NULL|       31.4|  3|
|Andover|901301080|2010|Total Property Crime|            Number|Crime Index|  34.0|         34.0|       NULL|  4|
|Andover|901301080|2010|Total Property Crime|Rate (per 100,000)|Crime Index|1067.2|     

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lead, when

# Rows of one town, ordered by the row ID assigned in the previous step.
windowSpec = Window.partitionBy(df["Town"]).orderBy(df["ID"])

# In the raw data each observation appears as a "Number" row followed by its
# "Rate (per 100,000)" row. Copy the rate onto a row whose Rate_Column is NULL only when
# the next row really is its rate partner (same Year, Crime Type and Variable). A bare
# `lead` would silently attach another crime's rate if a rate row were missing or the
# rows were not strictly interleaved; such rows now keep a NULL rate instead.
next_row_is_rate_partner = (
    (lead("Measure Type").over(windowSpec) == "Rate (per 100,000)")
    & (lead("Year").over(windowSpec) == col("Year"))
    & (lead("Crime Type").over(windowSpec) == col("Crime Type"))
    & (lead("Variable").over(windowSpec) == col("Variable"))
)

df = df.withColumn(
    "Rate_Column",
    when(
        col("Rate_Column").isNull() & next_row_is_rate_partner,
        lead("Rate_Column").over(windowSpec),
    ).otherwise(col("Rate_Column")),
)

# Show the filled dataframe
df.show()



+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+---+
|   Town|     FIPS|Year|          Crime Type|      Measure Type|   Variable| Value|Number_Column|Rate_Column| ID|
+-------+---------+----+--------------------+------------------+-----------+------+-------------+-----------+---+
|Andover|901301080|2010|         Total Crime|            Number|Crime Index|  35.0|         35.0|     1098.6|  0|
|Andover|901301080|2010|         Total Crime|Rate (per 100,000)|Crime Index|1098.6|         NULL|     1098.6|  1|
|Andover|901301080|2010| Total Violent Crime|            Number|Crime Index|   1.0|          1.0|       31.4|  2|
|Andover|901301080|2010| Total Violent Crime|Rate (per 100,000)|Crime Index|  31.4|         NULL|       31.4|  3|
|Andover|901301080|2010|Total Property Crime|            Number|Crime Index|  34.0|         34.0|     1067.2|  4|
|Andover|901301080|2010|Total Property Crime|Rate (per 100,000)|Crime Index|1067.2|     

In [None]:
# Keep one row per observation: the "Number" row, which the previous step filled with
# its matching rate. Filtering on the measure type instead of `ID % 2 == 0` does not
# depend on the row IDs being consecutive, which monotonically_increasing_id does not
# guarantee (IDs jump between partitions).
df = df.filter(col("Measure Type") == "Number")
df.show()

+-------+---------+----+--------------------+------------+-----------+-----+-------------+-----------+---+
|   Town|     FIPS|Year|          Crime Type|Measure Type|   Variable|Value|Number_Column|Rate_Column| ID|
+-------+---------+----+--------------------+------------+-----------+-----+-------------+-----------+---+
|Andover|901301080|2010|         Total Crime|      Number|Crime Index| 35.0|         35.0|     1098.6|  0|
|Andover|901301080|2010| Total Violent Crime|      Number|Crime Index|  1.0|          1.0|       31.4|  2|
|Andover|901301080|2010|Total Property Crime|      Number|Crime Index| 34.0|         34.0|     1067.2|  4|
|Andover|901301080|2010|  Aggravated Assault|      Number|Crime Index|  0.0|          0.0|        0.0|  6|
|Andover|901301080|2010|              Murder|      Number|Crime Index|  0.0|          0.0|        0.0|  8|
|Andover|901301080|2010|                Rape|      Number|Crime Index|  0.0|          0.0|        0.0| 10|
|Andover|901301080|2010|             

In [None]:
# Remove the columns that are no longer needed once counts and rates sit side by side.
unneeded_columns = ["FIPS", "Measure Type", "Variable", "Value", "ID"]
df = df.drop(*unneeded_columns)
df.printSchema()
df.show()

root
 |-- Town: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Crime Type: string (nullable = true)
 |-- Number_Column: double (nullable = true)
 |-- Rate_Column: double (nullable = true)

+-------+----+--------------------+-------------+-----------+
|   Town|Year|          Crime Type|Number_Column|Rate_Column|
+-------+----+--------------------+-------------+-----------+
|Andover|2010|         Total Crime|         35.0|     1098.6|
|Andover|2010| Total Violent Crime|          1.0|       31.4|
|Andover|2010|Total Property Crime|         34.0|     1067.2|
|Andover|2010|  Aggravated Assault|          0.0|        0.0|
|Andover|2010|              Murder|          0.0|        0.0|
|Andover|2010|                Rape|          0.0|        0.0|
|Andover|2010|             Robbery|          1.0|       31.4|
|Andover|2010|            Burglary|         16.0|      502.2|
|Andover|2010|             Larceny|         17.0|      533.6|
|Andover|2010| Motor Vehicle Theft|          1.

In [None]:
# Give the two measure columns descriptive, human-readable names.
for old_name, new_name in [
    ("Number_Column", "Number of Crimes Committed"),
    ("Rate_Column", "Number of Crimes Committed per 100,000"),
]:
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+-------+----+--------------------+--------------------------+--------------------------------------+
|   Town|Year|          Crime Type|Number of Crimes Committed|Number of Crimes Committed per 100,000|
+-------+----+--------------------+--------------------------+--------------------------------------+
|Andover|2010|         Total Crime|                      35.0|                                1098.6|
|Andover|2010| Total Violent Crime|                       1.0|                                  31.4|
|Andover|2010|Total Property Crime|                      34.0|                                1067.2|
|Andover|2010|  Aggravated Assault|                       0.0|                                   0.0|
|Andover|2010|              Murder|                       0.0|                                   0.0|
|Andover|2010|                Rape|                       0.0|                                   0.0|
|Andover|2010|             Robbery|                       1.0|                    

In [None]:
# Give each remaining row a unique "Crime ID" (increasing, but not necessarily consecutive).
df = df.select("*", monotonically_increasing_id().alias("Crime ID"))
df.show()

+-------+----+--------------------+--------------------------+--------------------------------------+--------+
|   Town|Year|          Crime Type|Number of Crimes Committed|Number of Crimes Committed per 100,000|Crime ID|
+-------+----+--------------------+--------------------------+--------------------------------------+--------+
|Andover|2010|         Total Crime|                      35.0|                                1098.6|       0|
|Andover|2010| Total Violent Crime|                       1.0|                                  31.4|       1|
|Andover|2010|Total Property Crime|                      34.0|                                1067.2|       2|
|Andover|2010|  Aggravated Assault|                       0.0|                                   0.0|       3|
|Andover|2010|              Murder|                       0.0|                                   0.0|       4|
|Andover|2010|                Rape|                       0.0|                                   0.0|       5|
|

In [None]:
import geopandas as gpd
import shapely
import pandas as pd
import numpy as np
from geopandas import GeoDataFrame


# Town locations (point geometries) used to geocode the crime rows by "Town".
geojson_df = gpd.read_file(
    "/content/Town_Location.geojson"
)

# Non-null value count per column, as a quick completeness check.
geojson_df.count()

Town            170
State           170
result_num      170
osm_id          170
display_name    170
category        170
type            170
latlong         170
geometry        170
dtype: int64

In [None]:
# Compare both schemas ahead of the join on "Town".
df.printSchema()
geojson_df.info()

# NOTE: `df_geo` is an alias of `geojson_df`, not a copy — in-place changes to one
# are visible through the other.
df_geo = geojson_df
geojson_df.head()



root
 |-- Town: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Crime Type: string (nullable = true)
 |-- Number of Crimes Committed: double (nullable = true)
 |-- Number of Crimes Committed per 100,000: double (nullable = true)
 |-- Crime ID: long (nullable = false)

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 170 entries, 0 to 169
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype   
---  ------        --------------  -----   
 0   Town          170 non-null    object  
 1   State         170 non-null    object  
 2   result_num    170 non-null    object  
 3   osm_id        170 non-null    object  
 4   display_name  170 non-null    object  
 5   category      170 non-null    object  
 6   type          170 non-null    object  
 7   latlong       170 non-null    object  
 8   geometry      170 non-null    geometry
dtypes: geometry(1), object(8)
memory usage: 12.1+ KB


Unnamed: 0,Town,State,result_num,osm_id,display_name,category,type,latlong,geometry
0,Andover,CT,0,11059524,"Andover, Capitol Planning Region, Connecticut,...",boundary,administrative,"41.7373212,-72.37036",POINT (-72.37036 41.73732)
1,Ansonia,CT,0,797803532,"Ansonia, Naugatuck Valley Planning Region, Con...",place,city,"41.3423505,-73.043713",POINT (-73.04371 41.34235)
2,Ashford,CT,0,11052658,"Ashford, Northeastern Connecticut Planning Reg...",boundary,administrative,"41.8731532,-72.1214653",POINT (-72.12147 41.87315)
3,Avon,CT,0,5946925,"Avon, Capitol Planning Region, Connecticut, 06...",boundary,administrative,"41.8098209,-72.8306541",POINT (-72.83065 41.80982)
4,Barkhamsted,CT,0,11052518,"Barkhamsted, Northwest Hills Planning Region, ...",boundary,administrative,"41.9292629,-72.9139904",POINT (-72.91399 41.92926)


In [None]:
# Both frames must expose the "Town" join key (Spark frame first, then GeoPandas frame).
for frame_columns in (df.columns, geojson_df.columns):
    print('Town' in frame_columns)

True
True


In [None]:
# Normalize the join key to string values before the frame is handed to Spark.
town_as_text = geojson_df['Town'].astype(str)
geojson_df['Town'] = town_as_text

In [None]:
# Re-check column dtypes after normalizing "Town" (full per-column listing).
geojson_df.info(verbose=True)

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 170 entries, 0 to 169
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype   
---  ------        --------------  -----   
 0   Town          170 non-null    object  
 1   State         170 non-null    object  
 2   result_num    170 non-null    object  
 3   osm_id        170 non-null    object  
 4   display_name  170 non-null    object  
 5   category      170 non-null    object  
 6   type          170 non-null    object  
 7   latlong       170 non-null    object  
 8   geometry      170 non-null    geometry
dtypes: geometry(1), object(8)
memory usage: 12.1+ KB


In [None]:
# Serialize each Shapely geometry to WKT text so that spark.createDataFrame (used later)
# receives plain strings rather than geometry objects. Missing geometries (None) stay
# missing instead of raising AttributeError on `None.wkt`.
geojson_df['geometry'] = geojson_df['geometry'].apply(
    lambda geom: None if geom is None else geom.wkt
)



In [None]:
# Confirm "geometry" is now a plain object (string) column rather than a geometry dtype.
geojson_df.info(verbose=True)

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 170 entries, 0 to 169
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Town          170 non-null    object
 1   State         170 non-null    object
 2   result_num    170 non-null    object
 3   osm_id        170 non-null    object
 4   display_name  170 non-null    object
 5   category      170 non-null    object
 6   type          170 non-null    object
 7   latlong       170 non-null    object
 8   geometry      170 non-null    object
dtypes: object(9)
memory usage: 12.1+ KB


In [None]:
# Move the town locations into Spark so they can be joined with the crime data.
spark_geo_df = spark.createDataFrame(data=geojson_df)

In [None]:
# Attach each town's location columns to its crime rows; unmatched towns keep NULLs.
# NOTE(review): if "Town" is not unique in spark_geo_df (170 rows), this left join
# duplicates crime rows — verify uniqueness.
merged_df_spark = df.join(spark_geo_df, "Town", "left")

In [None]:
# Preview the joined result (first 20 rows).
merged_df_spark.show(n=20)

+-------+----+--------------------+--------------------------+--------------------------------------+--------+-----+----------+--------+--------------------+--------+--------------+--------------------+--------------------+
|   Town|Year|          Crime Type|Number of Crimes Committed|Number of Crimes Committed per 100,000|Crime ID|State|result_num|  osm_id|        display_name|category|          type|             latlong|            geometry|
+-------+----+--------------------+--------------------------+--------------------------------------+--------+-----+----------+--------+--------------------+--------+--------------+--------------------+--------------------+
|Andover|2010|         Total Crime|                      35.0|                                1098.6|       0|   CT|         0|11059524|Andover, Capitol ...|boundary|administrative|41.7373212,-72.37036|POINT (-72.37036 ...|
|Andover|2010| Total Violent Crime|                       1.0|                                  31.4|   

In [None]:
# Geocoder metadata that is not needed once each row carries its town's State and geometry.
columns_to_drop = [
    'result_num', 'osm_id', 'display_name',
    'category', 'type', 'latlong',
]
merged_df_spark = merged_df_spark.drop(*columns_to_drop)

merged_df_spark.show()

+-------+----+--------------------+--------------------------+--------------------------------------+--------+-----+--------------------+
|   Town|Year|          Crime Type|Number of Crimes Committed|Number of Crimes Committed per 100,000|Crime ID|State|            geometry|
+-------+----+--------------------+--------------------------+--------------------------------------+--------+-----+--------------------+
|Andover|2010|         Total Crime|                      35.0|                                1098.6|       0|   CT|POINT (-72.37036 ...|
|Andover|2010| Total Violent Crime|                       1.0|                                  31.4|       1|   CT|POINT (-72.37036 ...|
|Andover|2010|Total Property Crime|                      34.0|                                1067.2|       2|   CT|POINT (-72.37036 ...|
|Andover|2010|  Aggravated Assault|                       0.0|                                   0.0|       3|   CT|POINT (-72.37036 ...|
|Andover|2010|              Murder

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType


def _is_missing(field):
    """Return a boolean Column that is true when the value of `field` counts as missing.

    NULL always counts as missing. The empty string counts only for string columns:
    comparing a numeric column to "" forces a string-to-number cast, which can fail
    when Spark's ANSI SQL mode is enabled.
    """
    value = F.col(field.name)
    if isinstance(field.dataType, StringType):
        return value.isNull() | (value == "")
    return value.isNull()


# Count missing values for every column plus the total row count in ONE pass over the
# data, instead of launching a separate Spark job per column.
missing_counts = merged_df_spark.select(
    *[
        F.count(F.when(_is_missing(field), 1)).alias(field.name)
        for field in merged_df_spark.schema.fields
    ],
    F.count(F.lit(1)).alias("__total_rows__"),
).first()

# zip stops at the last data column, so the trailing total is not printed here.
for col_name, null_count in zip(merged_df_spark.columns, missing_counts):
    print(f"Column: {col_name}\t Null Count: {null_count}")

# Count total rows in the DataFrame
total_count = missing_counts[-1]
print(f"Total Count: {total_count}")

Column: Town	 Null Count: 0
Column: Year	 Null Count: 0
Column: Crime Type	 Null Count: 0
Column: Number of Crimes Committed	 Null Count: 0
Column: Number of Crimes Committed per 100,000	 Null Count: 0
Column: Crime ID	 Null Count: 0
Column: State	 Null Count: 0
Column: geometry	 Null Count: 0
Total Count: 14960


In [None]:
import shutil
from pathlib import Path

from google.colab import files

# Spark's CSV writer always produces a *directory* of part files (plus _SUCCESS), so a
# path like "Crime.csv" would be a folder that files.download cannot serve. Write to a
# staging directory, then copy the single part file out to a real Crime.csv.
CSV_STAGING_DIR = Path("Crime_csv_parts")
CSV_OUTPUT_FILE = Path("Crime.csv")

# One partition -> one part file. coalesce(1) avoids the full shuffle of repartition(1).
merged_df_spark = merged_df_spark.coalesce(1)

# mode("overwrite") lets this cell be re-run; the default mode fails if the path exists.
merged_df_spark.write.mode("overwrite").csv(str(CSV_STAGING_DIR), header=True)

part_files = sorted(CSV_STAGING_DIR.glob("part-*.csv"))
if len(part_files) != 1:
    raise RuntimeError(
        f"Expected exactly one CSV part file in {CSV_STAGING_DIR}, found {len(part_files)}"
    )

# A previous run of the old version of this cell left a *directory* named Crime.csv.
if CSV_OUTPUT_FILE.is_dir():
    shutil.rmtree(CSV_OUTPUT_FILE)
shutil.copyfile(part_files[0], CSV_OUTPUT_FILE)

# Download the CSV file to the system
files.download(str(CSV_OUTPUT_FILE))