In [3]:
# Query 1 RDD

import csv,time
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql import Row

sc = SparkSession \
    .builder \
    .appName("Query 1 RDD") \
    .config("spark.executor.instances", "4") \
    .getOrCreate() \
    .sparkContext

def parse_csv_line(line):
    f = StringIO(line)
    reader = csv.reader(f)
    return next(reader) 

def help1(data):
    try:
        age=int(data)
        if age<18 and age>0:
            return "child"
        if age<25:
            return "young adult"
        if age<65 :
            return "adult"
        if age>64:
            return "old"
        else:
            return "no individual victim"
    except:
        return "error"
    

start_time = time.time()
    
rdd1  = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")\
.map(parse_csv_line)

rdd2= sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")\
.map(parse_csv_line)

header1 = rdd1.first()
header2 = rdd2.first()

# Filter out the header
rdd1_data = rdd1.filter(lambda line: line != header1)
rdd2_data = rdd2.filter(lambda line: line != header2)

crime_data = rdd1_data.union(rdd2_data) \
.filter(lambda pair: pair[9].find("AGGRAVATED") != -1 ) \
.map(lambda data: (help1(data[11]), 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy( lambda pair : pair[1], ascending=False )

print(crime_data.collect())

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('adult', 121093), ('young adult', 38703), ('child', 10830), ('old', 5985)]
Time taken: 22.19 seconds

In [2]:
####query 1 dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col
from pyspark.sql import functions as F
import time


spark = SparkSession \
    .builder \
    .appName("Query 1 Dataframe") \
    .config("spark.executor.instances", "4") \
    .getOrCreate() 


def age_group(age_str):
    try:
        age=int(age_str)
        if age<18 and age>0:
            return "child"
        if age<25:
            return "young adult"
        if age<65 :
            return "adult"
        if age>64:
            return "old"
        else:
            return "no individual victim"
    except:
        return "error"


start_time=time.time()
dataframe1= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
dataframe2= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",header=True)

age_udf=udf(age_group,StringType())
dataframe=dataframe1.union(dataframe2)\
.filter(col("Crm Cd Desc").contains("AGGRAVATED"))\
.withColumn("age_group",age_udf(col("Vict Age")))\
.groupBy("age_group").agg(F.count("*").alias("count"))\
.orderBy("count",ascending=False)


dataframe.show()


end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+
|  age_group| count|
+-----------+------+
|      adult|121093|
|young adult| 38703|
|      child| 10830|
|        old|  5985|
+-----------+------+

Time taken: 11.91 seconds

In [11]:
####query2a dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col,count,when
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import time


spark = SparkSession \
    .builder \
    .appName("Query 2 Dataframe") \
    .getOrCreate() 


window_spec = Window.partitionBy("Year").orderBy(F.desc("closed_case_rate"))


start_time=time.time()


dataframe1= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
dataframe2= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",header=True)



dataframe=dataframe1.union(dataframe2)\
.withColumn("year",col("Date Rptd").substr(7,4))\
.select("year","AREA NAME","Status")\
.groupBy("year","AREA NAME").agg((count(when(col("Status") != "IC", 1)) / count("*") ).alias("closed_case_rate"))\
.withColumn("#", F.row_number().over(window_spec) )\
.filter(col("#") <= 3)\
.withColumnRenamed("AREA NAME", "precinct")

dataframe.show(24)

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")


    




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-------------------+---+
|year|   precinct|   closed_case_rate|  #|
+----+-----------+-------------------+---+
|2010|    Rampart|0.32947355855318133|  1|
|2010|    Olympic|0.31962706191728424|  2|
|2010|     Harbor| 0.2963203463203463|  3|
|2011|    Olympic|0.35212167689161555|  1|
|2011|    Rampart|0.32511779630300836|  2|
|2011|     Harbor| 0.2865220520201501|  3|
|2012|    Olympic| 0.3441481831052383|  1|
|2012|    Rampart|  0.329464181029429|  2|
|2012|     Harbor| 0.2981513327601032|  3|
|2013|    Olympic| 0.3352812271731191|  1|
|2013|    Rampart| 0.3208287360549221|  2|
|2013|     Harbor| 0.2916422459266206|  3|
|2014|   Van Nuys| 0.3180567315834039|  1|
|2014|West Valley| 0.3131198995605775|  2|
|2014|    Mission| 0.3116279069767442|  3|
|2015|   Van Nuys| 0.3264134698172773|  1|
|2015|West Valley| 0.3027597402597403|  2|
|2015|    Mission|0.30179460678380154|  3|
|2016|   Van Nuys|0.31880755720117726|  1|
|2016|West Valley| 0.3154798761609907|  2|
|2016|   Fo

In [12]:
###query2a spark sql api


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col,count,when
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import time


spark = SparkSession \
    .builder \
    .appName("Query 2 SQL API") \
    .getOrCreate()


start_time=time.time()

dataframe1= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
dataframe2= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",header=True)

dataframe=dataframe1.union(dataframe2)

dataframe.createOrReplaceTempView("Dataset")
query= """
    WITH extracted_data AS (
        SELECT 
            substr(`Date Rptd`, 7, 4) AS year,
            `AREA NAME` AS precinct,
            Status
        FROM Dataset
    ),
    aggregated_data AS (
        SELECT
            year,
            precinct,
            COUNT(CASE WHEN Status != 'IC' THEN 1 END)  / COUNT(*) AS closed_case_rate
        FROM extracted_data
        GROUP BY year, precinct
    ),
    ranked_data AS (
        SELECT
            year,
            precinct,
            closed_case_rate,
            ROW_NUMBER() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) AS `#`
        FROM aggregated_data
    )
    SELECT 
        year,
        precinct,
        closed_case_rate,
        `#`
    FROM ranked_data
    WHERE `#` <= 3
"""
res=spark.sql(query)
res.show(24)

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-------------------+---+
|year|   precinct|   closed_case_rate|  #|
+----+-----------+-------------------+---+
|2010|    Rampart|0.32947355855318133|  1|
|2010|    Olympic|0.31962706191728424|  2|
|2010|     Harbor| 0.2963203463203463|  3|
|2011|    Olympic|0.35212167689161555|  1|
|2011|    Rampart|0.32511779630300836|  2|
|2011|     Harbor| 0.2865220520201501|  3|
|2012|    Olympic| 0.3441481831052383|  1|
|2012|    Rampart|  0.329464181029429|  2|
|2012|     Harbor| 0.2981513327601032|  3|
|2013|    Olympic| 0.3352812271731191|  1|
|2013|    Rampart| 0.3208287360549221|  2|
|2013|     Harbor| 0.2916422459266206|  3|
|2014|   Van Nuys| 0.3180567315834039|  1|
|2014|West Valley| 0.3131198995605775|  2|
|2014|    Mission| 0.3116279069767442|  3|
|2015|   Van Nuys| 0.3264134698172773|  1|
|2015|West Valley| 0.3027597402597403|  2|
|2015|    Mission|0.30179460678380154|  3|
|2016|   Van Nuys|0.31880755720117726|  1|
|2016|West Valley| 0.3154798761609907|  2|
|2016|   Fo

In [None]:
####2b
####make parquet dataset


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col,count,when
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import time


spark = SparkSession \
    .builder \
    .appName("Query 2b write parquet") \
    .getOrCreate()

dataframe1= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
dataframe2= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",header=True)
dataframe=dataframe1.union(dataframe2)

dataframe.coalesce(1).write.mode("overwrite").parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet") ##coalesce gia 1 file






In [20]:
####2b test parquet file on 2a query dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col,count,when
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import time


spark = SparkSession \
    .builder \
    .appName("Query 2b test parquet for 2b Dataframe") \
    .getOrCreate()

start_time=time.time()

dataframe = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet")

dataframe=dataframe.withColumn("year",col("Date Rptd").substr(7,4))\
.select("year","AREA NAME","Status")\
.groupBy("year","AREA NAME").agg((count(when(col("Status") != "IC", 1)) / count("*") ).alias("closed_case_rate"))\
.withColumn("#", F.row_number().over(window_spec) )\
.filter(col("#") <= 3)\
.withColumnRenamed("AREA NAME", "precinct")

dataframe.show(24)

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-------------------+---+
|year|   precinct|   closed_case_rate|  #|
+----+-----------+-------------------+---+
|2010|    Rampart|0.32947355855318133|  1|
|2010|    Olympic|0.31962706191728424|  2|
|2010|     Harbor| 0.2963203463203463|  3|
|2011|    Olympic|0.35212167689161555|  1|
|2011|    Rampart|0.32511779630300836|  2|
|2011|     Harbor| 0.2865220520201501|  3|
|2012|    Olympic| 0.3441481831052383|  1|
|2012|    Rampart|  0.329464181029429|  2|
|2012|     Harbor| 0.2981513327601032|  3|
|2013|    Olympic| 0.3352812271731191|  1|
|2013|    Rampart| 0.3208287360549221|  2|
|2013|     Harbor| 0.2916422459266206|  3|
|2014|   Van Nuys| 0.3180567315834039|  1|
|2014|West Valley| 0.3131198995605775|  2|
|2014|    Mission| 0.3116279069767442|  3|
|2015|   Van Nuys| 0.3264134698172773|  1|
|2015|West Valley| 0.3027597402597403|  2|
|2015|    Mission|0.30179460678380154|  3|
|2016|   Van Nuys|0.31880755720117726|  1|
|2016|West Valley| 0.3154798761609907|  2|
|2016|   Fo

In [33]:
####query 3

from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

# Create spark Session
spark = SparkSession.builder \
    .appName("GeoJSON read") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

start_time=time.time()
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
# Formatting magic
blocks_census = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .dropna(subset=["COMM","ZCTA10"])  \


census = blocks_census \
    .select("COMM", "POP_2010", "ZCTA10", "HOUSING10") \
    .na.fill({"POP_2010": 0, "HOUSING10": 0}) \
    .groupBy("COMM", "ZCTA10") \
    .agg(
        F.sum("POP_2010").alias("TOTAL_POP_2010"),
        F.sum("HOUSING10").alias("TOTAL_HOUSING10")
    )

income= spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True)

res1= income.withColumn( "Estimated Median Income", F.regexp_replace(col("Estimated Median Income"), "[$,.]", "").cast("float"))\
.join(census,census["ZCTA10"]==income["Zip Code"])\
.withColumn("total_income",col("Estimated Median Income")*col("TOTAL_HOUSING10") )\
.groupBy("COMM").agg(
                    F.sum("TOTAL_POP_2010").alias("total_population"),
                    F.sum("total_income").alias("comm_total_income"))\
.withColumn("Average Income", col("comm_total_income")/col("total_population"))


crime_dataset = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet")\
.withColumn("point",ST_Point("LON", "LAT"))

res2 = crime_dataset \
    .join(blocks_census, ST_Within(crime_dataset.point, blocks_census.geometry))\
.select("COMM","POP_2010")\
.groupBy("COMM")\
.agg(F.sum("POP_2010").alias("TotalPopulation"),F.count("*").alias("NumberOfCrimes"))\
.withColumn("Crimes per Person",col("NumberOfCrimes")/col("TotalPopulation"))

res=res1.join(res2,res1["COMM"]==res2["COMM"]).select(res1["COMM"].alias("COMM"),"Average Income","Crimes per Person").orderBy(col("Average Income").desc())

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")



res.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time taken: 5.05 seconds
+--------------------+------------------+--------------------+
|                COMM|    Average Income|   Crimes per Person|
+--------------------+------------------+--------------------+
|      Marina del Rey| 76428.84908639747|0.030683159228987282|
|   Pacific Palisades| 70656.11180545464|0.004497808363497533|
|              Malibu|  67135.0118623962|0.003460207612456...|
| Palisades Highlands| 66867.44038612054|0.003936594845458679|
|    Marina Peninsula|65235.692875259396| 0.01562518384299514|
|             Bel Air| 63041.33942621959|0.001519214928910...|
|Palos Verdes Estates| 61905.61214466438|0.008547008547008548|
|     Manhattan Beach|60985.189241497086|0.030103480714957668|
|       Beverly Crest| 60947.48978754819|0.002445942879372386|
|           Brentwood| 60840.62462032012|0.003654077071904...|
|       Hermosa Beach| 57924.85594176151|0.010483401281304601|
|   Mandeville Canyon| 55572.11011444479|0.004121446588871122|
|La Cañada Flintridge| 54900.6

In [22]:
res.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Project [COMM#24143 AS COMM#24609, 'Average Income, 'Crimes per Person]
+- Join Inner, (COMM#24143 = COMM#24574)
   :- Project [COMM#24143, total_population#24339L, comm_total_income#24341, (comm_total_income#24341 / cast(total_population#24339L as double)) AS Average Income#24345]
   :  +- Aggregate [COMM#24143], [COMM#24143, sum(POP_2010#24278L) AS total_population#24339L, sum(total_income#24323) AS comm_total_income#24341]
   :     +- Project [Zip Code#24300, Community#24301, Estimated Median Income#24306, COMM#24143, ZCTA10#24160, POP_2010#24278L, (Estimated Median Income#24306 * cast(POP_2010#24278L as float)) AS total_income#24323]
   :        +- Join Inner, (ZCTA10#24160 = Zip Code#24300)
   :           :- Project [Zip Code#24300, Community#24301, cast(regexp_replace(Estimated Median Income#24302, [$,], , 1) as float) AS Estimated Median Income#24306]
   :           :  +- Relation [Zip Code#24300,Community#24301,Estimated Median Income#24302] csv
   : 

In [7]:
# query 4 conf 1
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from sedona.spark import *

import time

spark = SparkSession \
    .builder \
    .config("spark.executor.instances", "2")\
    .config("spark.executor.cores", "1")\
    .config("spark.executor.memory", "2g")\
    .appName("Query 4") \
    .getOrCreate()

sedona = SedonaContext.create(spark)
    
start_time=time.time()

#geojson handling
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

#the communities we want
target_comm_values = ["Santa Clarita","Lancaster","Rancho Palos Verdes"]

blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flattened_df = blocks_df.select( \
                [F.col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")\
            .filter(F.col("COMM").isin(target_comm_values))

#Crime data handling in the areas of interest
dataframe = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet") \
    .select("Vict Descent","LAT","LON")
    
#Creation of geometry type column
dataframe = dataframe\
    .withColumn("geom", ST_Point("LON","LAT"))

#keep data related to the specific areas
dataframe = dataframe\
    .join(flattened_df, ST_Within(dataframe.geom, flattened_df.geometry), "inner")\
    .select("Vict Descent")

#Ethnic-Race dataframe handling
ethnic_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)\
    .withColumnRenamed("Vict Descent", "Vict Descent right")

#Conversion of ethnic codes to full Name of Race and count for each group the number of victs
result_df = dataframe \
    .join(ethnic_df, dataframe["Vict Descent"] == ethnic_df["Vict Descent right"], how="left")\
    .withColumn("Vict Descent", F.col("Vict Descent Full"))\
    .select("Vict Descent")\
    .withColumnRenamed("Vict Descent", "Victim Descent")\
    .groupBy("Victim Descent").agg(F.count("*").alias("#"))\
    .orderBy("#",ascending=False)
    
result_df.show()

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White| 90|
|Hispanic/Latin/Me...| 76|
|             Unknown| 51|
|               Other| 40|
|               Black| 33|
|                NULL| 28|
|         Other Asian|  6|
|             Chinese|  1|
|            Filipino|  1|
|          Vietnamese|  1|
|            Japanese|  1|
+--------------------+---+

Time taken: 41.66 seconds

In [None]:
# query 4 conf 2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from sedona.spark import *

import time

spark = SparkSession \
    .builder \
    .config("spark.executor.instances", "2")\
    .config("spark.executor.cores", "2")\
    .config("spark.executor.memory", "4g")\
    .appName("Query 4") \
    .getOrCreate()

sedona = SedonaContext.create(spark)
    
start_time=time.time()

#geojson handling
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

#the communities we want
target_comm_values = ["Santa Clarita","Lancaster","Rancho Palos Verdes"]

blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flattened_df = blocks_df.select( \
                [F.col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")\
            .filter(F.col("COMM").isin(target_comm_values))

#Crime data handling in the areas of interest
dataframe = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet") \
    .select("Vict Descent","LAT","LON")
    
#Creation of geometry type column
dataframe = dataframe\
    .withColumn("geom", ST_Point("LON","LAT"))

#keep data related to the specific areas
dataframe = dataframe\
    .join(flattened_df, ST_Within(dataframe.geom, flattened_df.geometry), "inner")\
    .select("Vict Descent")

#Ethnic-Race dataframe handling
ethnic_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)\
    .withColumnRenamed("Vict Descent", "Vict Descent right")

#Conversion of ethnic codes to full Name of Race and count for each group the number of victs
result_df = dataframe \
    .join(ethnic_df, dataframe["Vict Descent"] == ethnic_df["Vict Descent right"], how="left")\
    .withColumn("Vict Descent", F.col("Vict Descent Full"))\
    .select("Vict Descent")\
    .withColumnRenamed("Vict Descent", "Victim Descent")\
    .groupBy("Victim Descent").agg(F.count("*").alias("#"))\
    .orderBy("#",ascending=False)
    
result_df.show()

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")

In [None]:
# query 4 conf 2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from sedona.spark import *

import time

spark = SparkSession \
    .builder \
    .config("spark.executor.instances", "2")\
    .config("spark.executor.cores", "4")\
    .config("spark.executor.memory", "8g")\
    .appName("Query 4") \
    .getOrCreate()

sedona = SedonaContext.create(spark)
    
start_time=time.time()

#geojson handling
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

#the communities we want
target_comm_values = ["Santa Clarita","Lancaster","Rancho Palos Verdes"]

blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flattened_df = blocks_df.select( \
                [F.col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")\
            .filter(F.col("COMM").isin(target_comm_values))

#Crime data handling in the areas of interest
dataframe = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet") \
    .select("Vict Descent","LAT","LON")
    
#Creation of geometry type column
dataframe = dataframe\
    .withColumn("geom", ST_Point("LON","LAT"))

#keep data related to the specific areas
dataframe = dataframe\
    .join(flattened_df, ST_Within(dataframe.geom, flattened_df.geometry), "inner")\
    .select("Vict Descent")

#Ethnic-Race dataframe handling
ethnic_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)\
    .withColumnRenamed("Vict Descent", "Vict Descent right")

#Conversion of ethnic codes to full Name of Race and count for each group the number of victs
result_df = dataframe \
    .join(ethnic_df, dataframe["Vict Descent"] == ethnic_df["Vict Descent right"], how="left")\
    .withColumn("Vict Descent", F.col("Vict Descent Full"))\
    .select("Vict Descent")\
    .withColumnRenamed("Vict Descent", "Victim Descent")\
    .groupBy("Victim Descent").agg(F.count("*").alias("#"))\
    .orderBy("#",ascending=False)
    
result_df.show()

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")

In [13]:
# query 5 conf 1

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from sedona.spark import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import time

spark = SparkSession \
    .builder \
    .config("spark.executor.instances", "2")\
    .config("spark.executor.cores", "4")\
    .config("spark.executor.memory", "8g")\
    .appName("Query 5") \
    .getOrCreate()
        
sedona = SedonaContext.create(spark)

start_time=time.time()

#Precint data handling
precincts_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True)\
    .withColumnRenamed("DIVISION","Division")\
    .withColumn("geom", ST_Point(F.col("Y"),F.col("X")))\
    .select("Division","geom")


#Crime data handling
crime_df = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group35/main_dataset_parquet") \
    .withColumn("crime_geom", ST_Point(F.col("LAT"),F.col("LON")))\
    .select("crime_geom")

#Calculating the distance of each case with each precinct
joined_df = precincts_df.crossJoin(crime_df) \
    .withColumn("distance", ST_DistanceSphere(col("geom"), F.col("crime_geom")))

#Finding the closest precinct to each crime
window_spec = Window.partitionBy("crime_geom").orderBy(F.col("distance"))

closest_division_df = joined_df.withColumn("rank", row_number().over(window_spec))\
    .filter(F.col("rank") == 1) \
    .select("Division", "crime_geom", "distance") 
    
result_df = closest_division_df.groupBy("Division") \
    .agg(
        F.avg("distance").alias("avg_distance"),
        F.count("crime_geom").alias("#"),
    )\
    .orderBy("#",ascending=False)    
 
result_df.show()

end_time = time.time()

print(f"Time taken: {end_time-start_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------------------+----+
|        Division|      avg_distance|   #|
+----------------+------------------+----+
|         TOPANGA| 3097.041046676431|7199|
| NORTH HOLLYWOOD|2789.6757914709897|7174|
|WEST LOS ANGELES|2988.6934728031933|7101|
|      HOLLENBECK| 5051.412468047077|7014|
|         PACIFIC| 2644.958011210727|6455|
|        WILSHIRE|2280.9372680604174|6421|
|        VAN NUYS| 2496.780848087917|6378|
|     WEST VALLEY|3009.1054804543173|5865|
|          HARBOR|2801.4499340320704|5583|
|         MISSION|2408.4567916898773|5328|
|        FOOTHILL|   2302.8236985538|5265|
|      DEVONSHIRE| 2532.616141824519|4594|
|       SOUTHWEST| 1832.859973967432|4578|
|       HOLLYWOOD|2735.1101302923275|3858|
|         OLYMPIC| 1653.605771356625|3815|
|       SOUTHEAST|2353.4705692131356|3691|
|       NORTHEAST| 3697.289880077716|3590|
|         RAMPART|1320.5912012440954|3058|
|     77TH STREET|1332.6590954181906|2927|
|          NEWTON|1547.3572308802582|2868|
+----------