In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
spark = SparkSession.builder.appName('722').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/18 13:39:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Import dataset by using pandas
import pandas as pd

crime_data_pd=pd.read_excel("Crime.xlsx")
education_data_pd=pd.read_excel("Education.xlsx")

#Transfer pandas dataframe to spark dataframe
crime_data= spark.createDataFrame(crime_data_pd)
education_data= spark.createDataFrame(education_data_pd)

In [3]:
#2 Data Understanding
#2.2 Data Decription
crime_data.show()
education_data.show()


                                                                                

+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+-----------------------------------------------------+------------------------------------+
|        Region|year|Total amount of convicted juveniles|Total amount of family violence cases|Total amount of people with charges|Total amount of harmful digital communication offense|Total amount of drugs offences cases|
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+-----------------------------------------------------+------------------------------------+
|     Northland|2014|                                123|                                  692|                               4335|                                                  NaN|                                 339|
|     Northland|2015|                                138|                                  817|             

In [4]:
#2.3 Data exploration/Display value for each column

def group_and_sum(dataframe, columns):
    for column in columns:
        dataframe.groupBy("Region").sum(column).show()


crime_columns = [
    "Total amount of convicted juveniles",
    "Total amount of family violence cases",
    "Total amount of drugs offences cases",
    "Total amount of people with charges",
    "Total amount of harmful digital communication offense"
]

education_columns = [
    "Total amount of schools",
    "Total amount of Students",
    "Total amount of student attending regularly",
    "Participation in ECE(early childhood education)",
    "Mean household income"
]


group_and_sum(crime_data, crime_columns)
group_and_sum(education_data, education_columns)


+--------------------+----------------------------------------+
|              Region|sum(Total amount of convicted juveniles)|
+--------------------+----------------------------------------+
|          Wellington|                                     186|
|            Auckland|                                     765|
|             Waikato|                                    1221|
|      South Auckland|                                    1833|
|               Otago|                                     495|
|       Bay of Plenty|                                     846|
|          Canterbury|                                    1371|
|           Northland|                                     867|
|           Southland|                                     456|
|Nelson/Marlboroug...|                                     528|
|           Waitematā|                                    1113|
|  Taranaki/Whanganui|                                     738|
|            Waiariki|                  

+--------------------+------------------------------------------------+
|              Region|sum(Total amount of student attending regularly)|
+--------------------+------------------------------------------------+
|          Wellington|                              413768.33999999997|
|            Auckland|                              1286417.8450000002|
|             Waikato|                                      351773.806|
|      South Auckland|                               59098.83899999999|
|               Otago|                              165335.59900000002|
|       Bay of Plenty|                              255676.36899999998|
|          Canterbury|                              464899.91500000004|
|           Northland|                              114122.55500000001|
|           Southland|                                       85321.336|
|Nelson/Marlboroug...|                                       36133.513|
|           Waitematā|                                          

In [5]:
#2.4 Data Quality
#2.4.1 Calculate Missing Value

from pyspark.sql.functions import col, sum as spark_sum, isnan

def calculate_total_missing_values(dataframe):

    missing_counts = dataframe.select([
        (spark_sum(col(column).isNull().cast("int")) + spark_sum(isnan(col(column)).cast("int"))).alias(column)
        for column in dataframe.columns
    ])
    
    total_missing_values = missing_counts.select(
        [spark_sum(col(column)) for column in missing_counts.columns]
    ).first()

    return sum(total_missing_values)


total_missing_crime_data = calculate_total_missing_values(crime_data)
total_missing_education_data = calculate_total_missing_values(education_data)


print(f"Total missing values in crime_data: {total_missing_crime_data}")
print(f"Total missing values in education_data: {total_missing_education_data}")


Total missing values in crime_data: 16
Total missing values in education_data: 157


In [6]:
#3 Data Preparation
#3.1 Data selection
#checking basic info for dataset and each column, crime dataset first
crime_data.show()


+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+-----------------------------------------------------+------------------------------------+
|        Region|year|Total amount of convicted juveniles|Total amount of family violence cases|Total amount of people with charges|Total amount of harmful digital communication offense|Total amount of drugs offences cases|
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+-----------------------------------------------------+------------------------------------+
|     Northland|2014|                                123|                                  692|                               4335|                                                  NaN|                                 339|
|     Northland|2015|                                138|                                  817|             

In [7]:
#checking basic info for dataset and each column, education dataset first
education_data.show()

+--------------+----+-----------------------+------------------------+-------------------------------------------+---------------------+-----------------------------------------------+
|        Region|year|Total amount of schools|Total amount of Students|Total amount of student attending regularly|Mean household income|Participation in ECE(early childhood education)|
+--------------+----+-----------------------+------------------------+-------------------------------------------+---------------------+-----------------------------------------------+
|     Northland|2014|                  151.0|                 22169.0|                                  12946.696|              66248.0|                                         7028.0|
|     Northland|2015|                  152.0|                 20037.0|                         12162.458999999999|              68880.0|                                         6939.0|
|     Northland|2016|                  152.0|                 22336.0|     

In [8]:
#drop the unnecessary column
crime_data=crime_data.drop("Total amount of harmful digital communication offense")
crime_data.show()

+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+
|        Region|year|Total amount of convicted juveniles|Total amount of family violence cases|Total amount of people with charges|Total amount of drugs offences cases|
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+
|     Northland|2014|                                123|                                  692|                               4335|                                 339|
|     Northland|2015|                                138|                                  817|                               4528|                                 341|
|     Northland|2016|                                117|                                  772|                               4435|                        

In [9]:
#3.2 Data cleaning
#3.2.1 Missing value

def calculate_mean_values(dataframe, exclude_columns):
    dataframe = dataframe.fillna(0)
    mean_values = dataframe.agg(
        *[avg(col(column)).alias(column) for column in dataframe.columns if column not in exclude_columns]
    ).first().asDict()
    return mean_values

def replace_missing_values_with_mean(dataframe, exclude_columns):
    mean_values = calculate_mean_values(dataframe, exclude_columns)
    dataframe = dataframe.fillna(mean_values)
    return dataframe


exclude_columns=["Region","Year"]

crime_data=replace_missing_values_with_mean(crime_data, exclude_columns)
education_data=replace_missing_values_with_mean(education_data, exclude_columns)


total_missing_crime_data_update=calculate_total_missing_values(crime_data)
total_missing_education_data_update=calculate_total_missing_values(education_data)

print(f"Total missing values in crime_data after dealing: {total_missing_crime_data_update}")
print(f"Total missing values in education_data after dealing: {total_missing_education_data_update}")

Total missing values in crime_data after dealing: 0
Total missing values in education_data after dealing: 0


In [10]:
#3.2.1 Outliers and Extreme values，use quantile(0.1/0.9) to define outliners and extreme values
from pyspark.sql.functions import col, mean, when

spark = SparkSession.builder.appName("OutlierReplacement").getOrCreate()

crime_data_columns = crime_data.columns
print(crime_data_columns)

crime_data_needed_columns = [
    'Total amount of convicted juveniles', 
    'Total amount of family violence cases', 
    'Total amount of people with charges', 
    'Total amount of drugs offences cases'
]

education_data_columns = education_data.columns
print(education_data_columns)

education_data_needed_columns = [
    'Total amount of schools', 
    'Total amount of Students', 
    'Total amount of student attending regularly', 
    'Mean household income', 
    'Participation in ECE(early childhood education)'
]

def replace_outliers(dataframe, columns, lower_quantile=0.1, upper_quantile=0.9):
    for column in columns:
        Q1 = dataframe.approxQuantile(column, [lower_quantile], 0.05)[0]
        Q3 = dataframe.approxQuantile(column, [upper_quantile], 0.05)[0]
        IQR = Q3 - Q1
        
        lower_bound = Q1 - 3 * IQR
        upper_bound = Q3 + 3 * IQR
        
        mean_value = dataframe.select(mean(col(column))).collect()[0][0]
        
        dataframe = dataframe.withColumn(
            column,
            when(col(column) < lower_bound, mean_value).when(col(column) > upper_bound, mean_value).otherwise(col(column))
        )
        
    return dataframe


crime_data = replace_outliers(crime_data, crime_data_needed_columns)
education_data = replace_outliers(education_data, education_data_needed_columns)


crime_data.show()
education_data.show()


['Region', 'year', 'Total amount of convicted juveniles', 'Total amount of family violence cases', 'Total amount of people with charges', 'Total amount of drugs offences cases']
['Region', 'year', 'Total amount of schools', 'Total amount of Students', 'Total amount of student attending regularly', 'Mean household income', 'Participation in ECE(early childhood education)']
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+
|        Region|year|Total amount of convicted juveniles|Total amount of family violence cases|Total amount of people with charges|Total amount of drugs offences cases|
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+
|     Northland|2014|                              123.0|                                692.0|                       

In [11]:
#Check if outliers still exist

def check_outliers(dataframe, columns, lower_quantile=0.1, upper_quantile=0.9):
    outlier_counts = {}
    for column in columns:
        Q1 = dataframe.approxQuantile(column, [lower_quantile], 0.05)[0]
        Q3 = dataframe.approxQuantile(column, [upper_quantile], 0.05)[0]
        IQR = Q3 - Q1
        
        lower_bound = Q1 - 3 * IQR
        upper_bound = Q3 + 3 * IQR
        
        outliers = dataframe.filter((col(column) < lower_bound) | (col(column) > upper_bound))
        count_of_outliers = outliers.count()
        
        outlier_counts[column] = count_of_outliers
    
    return outlier_counts

print(check_outliers(crime_data,crime_data_needed_columns))
print(check_outliers(education_data,education_data_needed_columns))


{'Total amount of convicted juveniles': 0, 'Total amount of family violence cases': 0, 'Total amount of people with charges': 0, 'Total amount of drugs offences cases': 0}
{'Total amount of schools': 0, 'Total amount of Students': 0, 'Total amount of student attending regularly': 0, 'Mean household income': 0, 'Participation in ECE(early childhood education)': 0}


In [12]:
#3.3 Constructing new features
from pyspark.sql.functions import round

education_data = education_data.withColumn(
    "Student Attendance",
    (col("Total amount of student attending regularly") / col("Total amount of Students") * 100).cast("double")
)

education_data = education_data.withColumn("Student Attendance", round(col("Student Attendance"), 1))

education_data = education_data.drop("Total amount of student attending regularly", "Total amount of Students")

education_data.show(20)

+--------------+----+-----------------------+---------------------+-----------------------------------------------+------------------+
|        Region|year|Total amount of schools|Mean household income|Participation in ECE(early childhood education)|Student Attendance|
+--------------+----+-----------------------+---------------------+-----------------------------------------------+------------------+
|     Northland|2014|                  151.0|              66248.0|                                         7028.0|              58.4|
|     Northland|2015|                  152.0|              68880.0|                                         6939.0|              60.7|
|     Northland|2016|                  152.0|              73469.0|                                         7200.0|              55.9|
|     Northland|2017|                  151.0|              75326.0|                                         7422.0|              50.9|
|     Northland|2018|                  151.0|          

In [13]:
#3.4 Data Integrating, using join to combine two datasets (sql)

Integrated_data = crime_data.join(education_data, on=["Region", "Year"], how="left")
Integrated_data.show()

+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+-----------------------+---------------------+-----------------------------------------------+------------------+
|        Region|year|Total amount of convicted juveniles|Total amount of family violence cases|Total amount of people with charges|Total amount of drugs offences cases|Total amount of schools|Mean household income|Participation in ECE(early childhood education)|Student Attendance|
+--------------+----+-----------------------------------+-------------------------------------+-----------------------------------+------------------------------------+-----------------------+---------------------+-----------------------------------------------+------------------+
|South Auckland|2015|                              297.0|                               1484.0|                            10484.0|                       

In [16]:
#4.1.1 Feature Selection, choose four of most significant fields for target variables
from pyspark.ml.feature import VectorAssembler, ChiSqSelector
from pyspark.ml.linalg import Vectors


Copy_data=Integrated_data


columns_list = Copy_data.columns
useless_columns = ['Region', 'year', 'Total amount of convicted juveniles']
needed_columns = []

for column in columns_list:
    if column not in useless_columns:
        needed_columns.append(column)

print(needed_columns)


features_combination = VectorAssembler(
    inputCols=needed_columns,
    outputCol='useful_features_list'
)

data_with_features = features_combination.transform(Copy_data)

selector = ChiSqSelector(
    featuresCol='useful_features_list',
    outputCol='selected_features',
    labelCol='Total amount of convicted juveniles',
    numTopFeatures=4,
)


model = selector.fit(data_with_features)


selected_indices = model.selectedFeatures
print(f"Selected feature indices: {selected_indices}")


selected_feature_names = [needed_columns[i] for i in selected_indices]
print(f"Selected feature names: {selected_feature_names}")



['Total amount of family violence cases', 'Total amount of people with charges', 'Total amount of drugs offences cases', 'Total amount of schools', 'Mean household income', 'Participation in ECE(early childhood education)', 'Student Attendance']
Selected feature indices: [2, 3, 5, 6]
Selected feature names: ['Total amount of drugs offences cases', 'Total amount of schools', 'Participation in ECE(early childhood education)', 'Student Attendance']


In [18]:
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.feature_selection import SelectKBest, f_regression


pandas_df = Copy_data.toPandas()
X = pandas_df.drop(['Total amount of convicted juveniles', 'Region', 'year'], axis=1)
y = pandas_df['Total amount of convicted juveniles']


selector = SelectKBest(score_func=f_regression, k=4)
X_new = selector.fit_transform(X, y)


selected_features = X.columns[selector.get_support()]
print("Selected features:", selected_features)


selected_pandas_df = pd.DataFrame(X_new, columns=selected_features)
selected_pandas_df['Total amount of convicted juveniles'] = y.values


selected_spark_df = spark.createDataFrame(selected_pandas_df)


selected_spark_df.show()


Selected features: Index(['Total amount of family violence cases',
       'Total amount of people with charges',
       'Total amount of drugs offences cases', 'Mean household income'],
      dtype='object')
+-------------------------------------+-----------------------------------+------------------------------------+---------------------+-----------------------------------+
|Total amount of family violence cases|Total amount of people with charges|Total amount of drugs offences cases|Mean household income|Total amount of convicted juveniles|
+-------------------------------------+-----------------------------------+------------------------------------+---------------------+-----------------------------------+
|                               1104.0|                             5758.0|                               456.0|             103179.0|                              162.0|
|                               1142.0|                             7797.0|                               48

In [19]:
#6/7 Linear Regression, find out coefficient for each field

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

Copy_data=Integrated_data

columns_list = Copy_data.columns
useless_columns = ['Region', 'year', 'Total amount of convicted juveniles']
needed_columns = []

for column in columns_list:
    if column not in useless_columns:
        needed_columns.append(column)

print(needed_columns)


features_combination = VectorAssembler(
    inputCols=needed_columns,
    outputCol='features'
)


data_with_features = features_combination.transform(Copy_data)


correlation_matrix = Correlation.corr(data_with_features, 'features').head()[0]
correlation_matrix = correlation_matrix.toArray()


target_col = 'Total amount of convicted juveniles'
target_values = Copy_data.select(target_col).rdd.flatMap(lambda x: x).collect()
feature_correlations = {}

for idx, feature in enumerate(needed_columns):
    feature_values = Copy_data.select(feature).rdd.flatMap(lambda x: x).collect()
    feature_vector = spark.createDataFrame(zip(feature_values, target_values), schema=["feature", "target"])
    corr = feature_vector.stat.corr("feature", "target")
    feature_correlations[feature] = corr


for feature, corr in feature_correlations.items():
    print(f"Feature: {feature}, Correlation with target: {corr}")


['Total amount of family violence cases', 'Total amount of people with charges', 'Total amount of drugs offences cases', 'Total amount of schools', 'Mean household income', 'Participation in ECE(early childhood education)', 'Student Attendance']




Feature: Total amount of family violence cases, Correlation with target: 0.8017268920358637
Feature: Total amount of people with charges, Correlation with target: 0.8390244661615818
Feature: Total amount of drugs offences cases, Correlation with target: 0.6442361268576722
Feature: Total amount of schools, Correlation with target: -0.04314677324316987
Feature: Mean household income, Correlation with target: -0.403738440574519
Feature: Participation in ECE(early childhood education), Correlation with target: 0.09295223847649478
Feature: Student Attendance, Correlation with target: -0.18060006885635693


Selected feature names: ['Total amount of family violence cases', 'Total amount of people with charges', 'Total amount of drugs offences cases', 'Total amount of schools']
