### <u>**COMP-548DL**</u>
### <u>**Big Data Management and Processing**</u>
### <u>**Semester Project**</u>

###

#### <u>**Part 2:**</u>
#### For the analysis of our data we will use Spark RDDs and Spark Dataframes, which are themselves built on top of RDDs. We are also going to use Python's time library to compare how long RDDs and Dataframes take to return exactly the same information from our data.
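A minimal sketch of the timing pattern used throughout this notebook, written in plain Python so that it runs without Spark. The helper name `timed` and the toy workload are assumptions made for illustration. In the notebook the timed block would contain a Spark action such as `take()` or `show()`, because transformations are lazy and only actions trigger the computation.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Measure the wall-clock time of the enclosed block and store it in `results`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed("toy_work", timings):
    # Stand-in workload (hypothetical); a Spark action would go here
    squares = [n * n for n in range(100_000)]

print(f"toy_work took {timings['toy_work']:.4f} seconds")
```

`time.perf_counter()` is used here instead of `time.time()` because it is a monotonic clock intended for measuring short durations; either works for timings in the range of seconds.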

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql.types import *
# Note: this star import shadows Python built-ins such as sum() with their Spark column equivalents
from pyspark.sql.functions import *
import time

In [3]:
conf = SparkConf().setAppName('Read-files').setMaster('local[2]') 
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

In [5]:
#Preparing the structure for our data
crime_schema = StructType([
    StructField('lsoa_code', StringType(), False),
    StructField('borough', StringType(), False),
    StructField('major_category', StringType(), False),
    StructField('minor_category', StringType(), False),
    StructField('value', IntegerType(), False),
    StructField('year', IntegerType(), False),
    StructField('month', IntegerType(), False)
])

In [1]:
path = 'london_crime_json.json'

In [13]:
crime_df=spark.read.json(path,schema=crime_schema) #reading the json file

In [15]:
crime_df.show() #showing some of the rows in our data

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|

###

<h3>Spark RDD</h3>

<h4>Along with the RDD, we are going to use some basic map and reduce functions to get the data we want: the top 10 boroughs by number of crimes, the top 5 crime categories across all boroughs throughout the years, and the top 3 years with the most crimes. All three MapReduce operations share the same structure and extract similar results from different columns.</h4>
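The map, reduce-by-key, sort and take pattern described above can be sketched in plain Python, without Spark. The function name `top_n_by_key` and the sample records are hypothetical and invented for illustration only; they are not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical (borough, value) records, invented for illustration only
records = [
    ("Camden", 2), ("Ealing", 1), ("Camden", 3),
    ("Brent", 0), ("Ealing", 2), ("Brent", 1),
]

def top_n_by_key(pairs, n):
    """Plain-Python analogue of reduceByKey(add) -> sortBy(descending) -> take(n)."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value  # merge the values that share a key
    # Order the (key, total) pairs from the largest total to the smallest
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:n]

print(top_n_by_key(records, 2))  # [('Camden', 5), ('Ealing', 3)]
```

In Spark the same steps run in parallel across partitions, which is why `reduceByKey` needs a merge function (here addition) that can be applied in any order.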

In [17]:
crime_rdd=crime_df.rdd #creating the rdd

In [19]:
crime_rdd.getNumPartitions()
#crime_rdd.persist()

16

<h4>Total crimes in each borough (interested in the top10)</h4>

In [44]:
# Start the timer 
start_time = time.time()

crimes_inborou = crime_rdd.map(lambda doc: (doc.borough,doc.value)).reduceByKey(lambda a, b: a + b).sortBy(lambda t4tuple:t4tuple[1],ascending=False)
crimes_inborou.persist()
print(crimes_inborou.take(10))
#crimes_inborou.unpersist()

# End the timer
end_time = time.time()
# Calculate elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")


[('Westminster', 455028), ('Lambeth', 292178), ('Southwark', 278809), ('Camden', 275147), ('Newham', 262024), ('Croydon', 260294), ('Ealing', 251562), ('Islington', 230286), ('Tower Hamlets', 228613), ('Brent', 227551)]
Elapsed time: 207.5611755847931 seconds


We see that some of the boroughs with a bad reputation, like Westminster and Camden, are indeed in the top 10, while other areas like Hackney, which had a bad reputation for years, are not in the top 10 boroughs in terms of crimes committed.

<h4>Top crimes in all boroughs throughout the years (interested in the top5)</h4>

In [42]:
# Start the timer 
start_time = time.time()

top_crimes_major = crime_rdd.map(lambda doc: (doc.major_category,doc.value)).reduceByKey(lambda a, b: a + b).sortBy(lambda t:t[1],ascending=False)
top_crimes_major.persist()
print(top_crimes_major.take(5))

# End the timer and report the elapsed time
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")


[('Theft and Handling', 2661861), ('Violence Against the Person', 1558081), ('Burglary', 754293), ('Criminal Damage', 630938), ('Drugs', 470765)]
Elapsed time: 236.7077271938324 seconds


We can see that 'Theft and Handling' is the top crime category in London. Since London is a tourist hotspot, it makes sense for theft (for example pickpocketing) to be amongst the top crimes. With Westminster and Camden being amongst the areas most visited by tourists and also amongst the boroughs with the most crimes, a natural next step would be to check whether 'Theft and Handling' is amongst the top crimes in those specific boroughs.

<h4>Total crimes in each year (interested in the top3)</h4>

In [38]:
# Start the timer 
start_time = time.time()

crimes_peryear = crime_rdd.map(lambda doc: (doc.year,doc.value)).reduceByKey(lambda a, b: a + b).sortBy(lambda t:t[1],ascending=False)
crimes_peryear.persist()
print(crimes_peryear.take(3))

# End the timer and report the elapsed time
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")


[(2008, 738641), (2012, 737329), (2016, 736121)]
Elapsed time: 239.12691974639893 seconds


As we can see from the top 3 years, there is no clear pattern as to whether crimes increase or decrease over time, since the top three years (2008, 2012 and 2016) are each four years apart.

###

<h3> Spark Dataframe </h3>

We are going to use Spark's Dataframes to run the same 'queries' and retrieve the data we want.

In [47]:
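crime_df.printSchema() #we take a look at the Dataframe schema which was enforced earlier
# (columns read from file-based sources are reported as nullable, even though the schema declared them non-nullable)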

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



<h4>Total crimes in each borough</h4>

In [56]:
crime_df.select(countDistinct('borough')).show()

+-----------------------+
|count(DISTINCT borough)|
+-----------------------+
|                     33|
+-----------------------+



In [51]:
# Start the timer 
start_time = time.time()

crimes_per_borough=crime_df.select(['borough','value'])\
                .groupBy(['borough'])\
                .agg(sum("value").alias("total_crimes_per_borough")) \
                .sort(desc('total_crimes_per_borough')) \
                .limit(10)
crimes_per_borough.show()  # show() prints the table itself and returns None, so no print() is needed

# End the timer and report the elapsed time
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")

+-------------+------------------------+
|      borough|total_crimes_per_borough|
+-------------+------------------------+
|  Westminster|                  455028|
|      Lambeth|                  292178|
|    Southwark|                  278809|
|       Camden|                  275147|
|       Newham|                  262024|
|      Croydon|                  260294|
|       Ealing|                  251562|
|    Islington|                  230286|
|Tower Hamlets|                  228613|
|        Brent|                  227551|
+-------------+------------------------+

Elapsed time: 20.160347938537598 seconds


We see that the results we get from the Spark Dataframe are exactly the same as those we got using Spark's RDD map and reduce functions.

<h4>Top crimes in all boroughs throughout the years</h4>

In [58]:
#crime_df.select(countDistinct('major_category')).show()

In [60]:
# Get distinct elements of the 'major_category' column 
#distinct_major = crime_df.select("major_category").distinct()
#distinct_major.show()

In [62]:
# Start the timer 
start_time = time.time()

crimes_per_major=crime_df.select(['major_category','value'])\
                .groupBy(['major_category'])\
                .agg(sum("value").alias("total_crimes_per_category")) \
                .sort(desc('total_crimes_per_category')) \
                .limit(5)
crimes_per_major.show()  # show() prints the table itself and returns None, so no print() is needed

# End the timer and report the elapsed time
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")

+--------------------+-------------------------+
|      major_category|total_crimes_per_category|
+--------------------+-------------------------+
|  Theft and Handling|                  2661861|
|Violence Against ...|                  1558081|
|            Burglary|                   754293|
|     Criminal Damage|                   630938|
|               Drugs|                   470765|
+--------------------+-------------------------+

Elapsed time: 23.886985540390015 seconds


<h4>Total crimes in each year</h4>

In [66]:
# Start the timer 
start_time = time.time()

crimes_per_year=crime_df.select(['year','value'])\
                .groupBy(['year'])\
                .agg(sum("value").alias("total_crimes_per_year")) \
                .sort(desc('total_crimes_per_year')) \
                .limit(3)
crimes_per_year.show()  # show() prints the table itself and returns None, so no print() is needed

# End the timer and report the elapsed time
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")

+----+---------------------+
|year|total_crimes_per_year|
+----+---------------------+
|2008|               738641|
|2012|               737329|
|2016|               736121|
+----+---------------------+

Elapsed time: 17.48635506629944 seconds


#### <u>**Time comparison:**</u>
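We can see that in all three queries the Dataframes take far less time than the RDDs (roughly 17 to 24 seconds versus 207 to 239 seconds) while returning the same data. Each figure comes from a single recorded run, so the numbers are indicative rather than a rigorous benchmark. Dataframes are built on top of RDDs, but they leverage optimizations such as the Catalyst query optimizer, the Tungsten execution engine, and efficient memory management, which make them faster than RDDs for many workloads. In addition, the RDD lambdas run in Python worker processes, so every row has to be passed between the JVM and Python, whereas the Dataframe aggregations run inside the JVM. Dataframes also apply certain optimizations by themselves, while with RDDs it is up to the developer to implement them.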
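As a quick check of the comparison, the speedup for each query can be computed from the elapsed times printed in the cells above. This is only a sketch: the dictionary keys are labels chosen here for illustration, and the times are rounded to two decimals.

```python
# Elapsed times in seconds, copied from the cell outputs above (rounded)
rdd_seconds = {"borough": 207.56, "major_category": 236.71, "year": 239.13}
df_seconds = {"borough": 20.16, "major_category": 23.89, "year": 17.49}

# Ratio of RDD time to Dataframe time for each query
speedups = {query: rdd_seconds[query] / df_seconds[query] for query in rdd_seconds}

for query, ratio in speedups.items():
    print(f"{query}: Dataframe was about {ratio:.1f}x faster")
```

For these single runs the ratios come out at about 10.3, 9.9 and 13.7 respectively.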

#### <u>**Conclusion:**</u>
Besides performing much better, Dataframes also feel more natural and easier to work with than RDDs.

#

### Hypothesis Testing Examples
For the last part of this notebook we will perform a couple of statistical tests as examples of the interesting information we can obtain from this dataset.
The first one is a Chi-Square test of independence, which tests whether the major crime categories depend on the specific borough by comparing the observed and expected frequencies of the categorical variables.

The underlying assumptions of the Chi-Square test of independence are met here; however, as we also note for the next test, the validity of the test itself is not the main focus.

Both tests were initially run locally in this notebook, using a reduced version of the dataset, just to make sure that the code works and returns valid results.

However, since the main approach involves running these tests as separate .py executables on Dataproc job sessions, the final results from the whole dataset are stored on Dataproc as the respective outputs of the two submitted jobs.

The preliminary results of the initial tests are not shown here, as they are not relevant to the final analysis.
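The observed-versus-expected comparison behind the Chi-Square test of independence can be sketched in plain Python. The contingency table below is hypothetical (the counts are invented for illustration, not taken from the dataset), and the function name `chi_square_independence` is an assumption. The sketch computes only the statistic and the degrees of freedom; turning them into a p-value requires the chi-square distribution, which a statistics library would normally provide.

```python
import math

# Hypothetical 2x3 contingency table (rows = boroughs, columns = crime categories).
# The counts are invented for illustration only.
observed = [
    [30, 20, 10],
    [20, 20, 20],
]

def chi_square_independence(table):
    """Return (statistic, degrees_of_freedom) for Pearson's test of independence."""
    row_totals = [math.fsum(row) for row in table]
    col_totals = [math.fsum(col) for col in zip(*table)]
    grand_total = math.fsum(row_totals)
    statistic = 0.0
    for i, row in enumerate(table):
        for j, obs in enumerate(row):
            # Expected count if rows and columns were independent
            expected = row_totals[i] * col_totals[j] / grand_total
            statistic += (obs - expected) ** 2 / expected
    dof = (len(table) - 1) * (len(table[0]) - 1)
    return statistic, dof

statistic, dof = chi_square_independence(observed)
print(f"chi-square = {statistic:.3f}, degrees of freedom = {dof}")  # chi-square = 5.333, degrees of freedom = 2
```

A larger statistic means the observed counts deviate more from what independence between borough and category would predict.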

### Chi-Square Test of Independence (Major Crime Categories vs Boroughs)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from google.api_core.retry import Retry

#import findspark
#findspark.init()
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors


In [None]:
# Initialize Spark Session for the Chi-Square Test of Independence
spark = SparkSession.builder \
    .appName("Chi-Square Test") \
    .master("local[*]") \
    .getOrCreate()

# Load Dataset (The complete json file for this test)
file_path = "london_crime_json.json"  # Dataset path
crime_df = spark.read.json(file_path)

In [None]:
# Create a Contingency Table
# Each record stores its number of crimes in 'value', so we sum 'value' (rather than counting records)
contingency_df = crime_df.groupBy("borough") \
                         .pivot("major_category") \
                         .sum("value") \
                         .fillna(0)

# Convert all feature columns to DoubleType
contingency_df = contingency_df.select(
    [col(c).cast(DoubleType()).alias(c) if c != "borough" else col(c) for c in contingency_df.columns]
)

# Create a dictionary mapping boroughs to numeric values 
boroughs = contingency_df.select("borough").distinct().rdd.flatMap(lambda x: x).collect() 
borough_mapping = {borough: idx for idx, borough in enumerate(boroughs)}

# Convert a row (given as a dict) to a (label, features) pair
def row_to_labeled_point(row, borough_mapping):
    label = borough_mapping[row["borough"]]

    # All columns other than 'borough' are features
    features = [value for name, value in row.items() if name != "borough"]

    return (label, features)



In [None]:
# Checking out the mapping for the boroughs
borough_mapping

In [None]:
# Checking out the contingency table
contingency_df.limit(20).show()

In [None]:
# Convert the contingency DataFrame to an RDD of (label, features) pairs
labeled_rdd = contingency_df.rdd.map(lambda row: row_to_labeled_point(row.asDict(),borough_mapping))

In [None]:
# Convert the data to RDD of LabeledPoint 
labeled_rdd2 = labeled_rdd.map(lambda x: LabeledPoint(x[0], Vectors.dense(x[1])))

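# Perform the Chi-Squared Test
# Note: given an RDD of LabeledPoint, chiSqTest runs one independence test per feature
# against the label and treats every distinct feature value as a category.
# chiSqTest also accepts a Matrix of observed counts, which tests the contingency table directly.
chi_sq_result = Statistics.chiSqTest(labeled_rdd2)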

# Print the results
for i, result in enumerate(chi_sq_result):
    print(f"Feature {i + 1}:")
    print(f"Chi-squared statistic: {result.statistic}")
    print(f"p-value: {result.pValue}")
    print(f"Degrees of freedom: {result.degreesOfFreedom}")
    print(f"Method: {result.method}\n")

# Stop SparkContext
#sc.stop()


###

### T-test between the top 2 Boroughs (for the total crime means)
This test compares the mean crime counts of the top 2 boroughs, so we can see whether their difference is statistically significant.
Specifically, it is Welch's T-test, which allows for heteroscedasticity (equal_var = False), because we do not know the true variances of the underlying populations.

The assumptions of the T-test (such as Normality) were not tested here, although they should normally be checked before testing (for example with Levene's test for homoscedasticity or the Anderson-Darling test for Normality).
The focus here is not the validity of the tests themselves, but the general big data methodology.
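To make the unequal-variance test concrete, here is a minimal plain-Python sketch of Welch's t statistic and the Welch-Satterthwaite degrees of freedom. The function name `welch_t` and the two samples are hypothetical, invented for illustration only. The p-value is not computed here because it requires the t distribution, which `scipy.stats.ttest_ind` handles in the cells that follow.

```python
import math
import statistics

def welch_t(sample_a, sample_b):
    """Return Welch's t statistic and the Welch-Satterthwaite degrees of freedom."""
    n_a, n_b = len(sample_a), len(sample_b)
    # Sample variances (n - 1 denominator) divided by the sample sizes
    se_a = statistics.variance(sample_a) / n_a
    se_b = statistics.variance(sample_b) / n_b
    t_stat = (statistics.mean(sample_a) - statistics.mean(sample_b)) / math.sqrt(se_a + se_b)
    dof = (se_a + se_b) ** 2 / (se_a ** 2 / (n_a - 1) + se_b ** 2 / (n_b - 1))
    return t_stat, dof

# Hypothetical crime counts, invented for illustration only
borough_a = [1, 2, 3, 4, 5]
borough_b = [2, 4, 6, 8, 10]

t_stat, dof = welch_t(borough_a, borough_b)
print(f"t = {t_stat:.3f}, degrees of freedom = {dof:.2f}")  # t = -1.897, degrees of freedom = 5.88
```

Unlike the pooled-variance T-test, the degrees of freedom here depend on both sample variances, which is why they are generally not a whole number.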

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
from scipy.stats import ttest_ind

# Initialize Spark Session for the T-Test
spark = SparkSession.builder \
    .appName("TTestCrimeMeans") \
    .master("local[*]") \
    .getOrCreate()

# Load Dataset (The complete json file for this test)
file_path = "london_crime_json.json"  # Dataset path
crime_df = spark.read.json(file_path)


In [None]:
# Calculate Total Crimes per Borough
# We need to do this in order to identify the top 2 boroughs in total crime incidences
total_crimes = crime_df.groupBy("borough").agg(_sum("value").alias("total_crimes"))

# Identify the Top 2 Boroughs
top_boroughs = total_crimes.orderBy(col("total_crimes").desc()).limit(2).collect()
top_borough_1 = top_boroughs[0]["borough"]
top_borough_2 = top_boroughs[1]["borough"]


In [None]:
# Filter Crime Values for the Top Boroughs
borough_1_crimes = crime_df.filter(col("borough") == top_borough_1).select("value").rdd.flatMap(lambda x: x).collect()
borough_2_crimes = crime_df.filter(col("borough") == top_borough_2).select("value").rdd.flatMap(lambda x: x).collect()

# Perform the T-Test
t_stat, p_value = ttest_ind(borough_1_crimes, borough_2_crimes, equal_var=False)

# Print Test Results
print("T-Test Results:")
print(f"T-Statistic: {t_stat}")
print(f"P-Value: {p_value}")
print(f"Top Borough 1: {top_borough_1}")
print(f"Top Borough 2: {top_borough_2}")

# Interpret the Results
if p_value < 0.05:
    print("The difference in mean total crimes between the two boroughs is statistically significant.")
else:
    print("The difference in mean total crimes between the two boroughs is not statistically significant.")
