In [None]:
# Set up PySpark in this runtime: install a headless Java 8 JDK, download and unpack
# Spark 3.1.1 (Hadoop 3.2 build), and install findspark so the unpacked Spark can be
# located from Python.
# NOTE(review): the /content/... path below looks like a Colab-style working
# directory — confirm before running in another environment.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
# JAVA_HOME and SPARK_HOME are set before findspark.init() is called below.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# List the working directory to check that the Spark archive was downloaded and unpacked.
!ls
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Create (or reuse) a session with a local master; "local[*]" requests all local cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
# Bare last expression: displays the session object as the cell's output.
spark

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
# Load the discretised housing features and the outcome bins with pandas, combine
# them column-wise, convert the result to a Spark DataFrame, and preview one slice.
import pandas as pd
# `col` is needed by the filter below; importing it in this cell keeps the cell
# runnable on a fresh kernel instead of relying on an import made by a later cell.
from pyspark.sql.functions import col

discrete_df_pandas = pd.read_csv("housing_discrete.csv", delimiter='\t')
# The outcome file's single column is headed "0"; rename it to a meaningful name.
discrete_outcomes_pandas = pd.read_csv("outcome_bins.csv").rename(columns={"0": 'medv_bin'})

# axis=1 places the outcome column next to the features, aligning rows by index.
df_pandas = pd.concat([discrete_df_pandas, discrete_outcomes_pandas], axis=1)

spark_df = spark.createDataFrame(df_pandas)
spark_df.show()

# Example slice: rows with indus == 0 AND dis == 2.
filtered_df = spark_df.filter((col("indus") == 0) & (col("dis") == 2))
filtered_df.show()

+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+--------+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|black|lstat|medv_bin|
+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+--------+
|   0|  1|    0|   0|  2|  2|  1|  2|  0|  1|      0|    3|    0|       5|
|   0|  0|    1|   0|  1|  2|  2|  2|  0|  0|      1|    3|    1|       4|
|   0|  0|    1|   0|  1|  3|  1|  2|  0|  0|      1|    2|    0|       7|
|   0|  0|    0|   0|  1|  3|  1|  3|  0|  0|      1|    2|    0|       7|
|   0|  0|    0|   0|  1|  3|  1|  3|  0|  0|      1|    3|    0|       8|
|   0|  0|    0|   0|  1|  2|  1|  3|  0|  0|      1|    2|    0|       6|
|   0|  1|    1|   0|  1|  1|  1|  3|  2|  1|      0|    2|    2|       5|
|   0|  1|    1|   0|  1|  1|  3|  3|  2|  1|      0|    3|    3|       6|
|   0|  1|    1|   0|  1|  0|  3|  3|  2|  1|      0|    1|    3|       3|
|   0|  1|    1|   0|  1|  1|  2|  3|  2|  1|      0|    1|    2|       4|
|   0|  1|    1|   0|  1|

In [None]:
# Import the dataset with Spark (discrete features and discrete outcomes; the
# preprocessing that produced these files was done in a separate notebook).
# `col` is imported here as well because the filter below uses it and no earlier
# cell provides it on a fresh kernel.
from pyspark.sql.functions import col, monotonically_increasing_id

# No schema is inferred, so every column is read as a string.
discrete_df = spark.read.format("csv").option("header", True).option("delimiter", "\t").load("housing_discrete.csv")
# Synthetic row id used only as a join key with the outcomes frame.
# NOTE(review): monotonically_increasing_id() guarantees unique, increasing ids but
# not consecutive ones; the ids of two separately loaded frames only line up when
# both are read with the same partitioning. Verify this if the inputs grow.
discrete_df = discrete_df.withColumn("id", monotonically_increasing_id())
discrete_df.show()
filtered_df = discrete_df.filter((col("indus") == 0) & (col("dis") == 2))
filtered_df.show()


# print(discrete_df.count()) # avoid count in final code as it is an "action" in spark
outcomes_df = spark.read.format("csv").option("header", True).option("delimiter", "\t").load("outcome_bins.csv")
# Rename the single outcome column to a meaningful name.
outcomes_df = outcomes_df.toDF("medv_bin")
outcomes_df = outcomes_df.withColumn("id", monotonically_increasing_id())
# print(outcomes_df.count()) # avoid count in final code as it is an "action" in spark

df = discrete_df.join(outcomes_df, "id")
# DataFrame.drop returns a new DataFrame; the result must be assigned back,
# otherwise the helper "id" column stays in `df`.
df = df.drop("id")

df.show()

+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+---+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|black|lstat| id|
+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+---+
|   0|  1|    0|   0|  2|  2|  1|  2|  0|  1|      0|    3|    0|  0|
|   0|  0|    1|   0|  1|  2|  2|  2|  0|  0|      1|    3|    1|  1|
|   0|  0|    1|   0|  1|  3|  1|  2|  0|  0|      1|    2|    0|  2|
|   0|  0|    0|   0|  1|  3|  1|  3|  0|  0|      1|    2|    0|  3|
|   0|  0|    0|   0|  1|  3|  1|  3|  0|  0|      1|    3|    0|  4|
|   0|  0|    0|   0|  1|  2|  1|  3|  0|  0|      1|    2|    0|  5|
|   0|  1|    1|   0|  1|  1|  1|  3|  2|  1|      0|    2|    2|  6|
|   0|  1|    1|   0|  1|  1|  3|  3|  2|  1|      0|    3|    3|  7|
|   0|  1|    1|   0|  1|  0|  3|  3|  2|  1|      0|    1|    3|  8|
|   0|  1|    1|   0|  1|  1|  2|  3|  2|  1|      0|    1|    2|  9|
|   0|  1|    1|   0|  1|  2|  3|  3|  2|  1|      0|    2|    3| 10|
|   0|  1|    1|   0

In [None]:
# Calculate entropy, mean and stddev of the outcome over all rows. These values are
# the same whatever slice is picked later, so they are computed once here.

# NOTE: `sum` from pyspark.sql.functions shadows Python's built-in `sum` from this
# point on; later cells rely on the Spark version being in scope.
from pyspark.sql.functions import col, count, sum, log2, mean, stddev

outcomes = df.select("medv_bin")
outcomes.show()

# All three statistics are derived from `outcomes` (the joined frame that slices are
# cut from), so entropy, mean and stddev always describe the same set of rows.
grouped_data = outcomes.groupBy("medv_bin").count()
grouped_data = grouped_data.withColumn("count", col("count").cast("int"))
grouped_data.show()
total_count = grouped_data.agg(sum("count")).collect()[0][0]

# Per-bin Shannon entropy term: -p * log2(p), with p = count / total_count.
probabilities = grouped_data.withColumn("probability", (-1 * (col("count") / total_count) * log2(col("count") / total_count)))
# probabilities.show()

outcome_entropy = probabilities.agg(sum("probability")).collect()[0][0]
outcome_mean = outcomes.select(mean("medv_bin")).collect()[0][0]
outcome_stddev = outcomes.select(stddev("medv_bin")).collect()[0][0]

print ("outcome_entropy: ", outcome_entropy)
print ("outcome_mean: ", outcome_mean)
print ("outcome_stddev: ", outcome_stddev)

+--------+
|medv_bin|
+--------+
|       5|
|       4|
|       7|
|       7|
|       8|
|       6|
|       5|
|       6|
|       3|
|       4|
|       3|
|       4|
|       4|
|       4|
|       4|
|       4|
|       5|
|       3|
|       4|
|       4|
+--------+
only showing top 20 rows

+--------+-----+
|medv_bin|count|
+--------+-----+
|       7|   30|
|      11|    1|
|       3|   81|
|       8|   13|
|       5|   96|
|       6|   39|
|       9|    8|
|       1|   21|
|      10|    6|
|       4|  147|
|       2|   48|
+--------+-----+

outcome_entropy:  2.8033607009396215
outcome_mean:  4.3244897959183675
outcome_stddev:  1.811915865931041


In [None]:
# Build a DataFrame for one slice of the original df.
from pyspark.sql.functions import when

# Example slice: indus == 0 AND dis == 2.
# The slice is materialised by filtering. An alternative is to keep every row and add
# a boolean "slice" column with when(...).otherwise(False), which is why `when` is
# imported above.
example_slice_df = df.filter((col("indus") == 0) & (col("dis") == 2))

print(example_slice_df.count(), "examples in slice")
# Show the slice once; each show() is a separate Spark action over the same data.
example_slice_df.show()

+---+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+--------+
| id|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|black|lstat|medv_bin|
+---+----+---+-----+----+---+---+---+---+---+---+-------+-----+-----+--------+
|  0|   0|  1|    0|   0|  2|  2|  1|  2|  0|  1|      0|    3|    0|       5|
|  1|   0|  0|    0|   0|  1|  2|  1|  2|  0|  0|      1|    3|    1|       4|
|  2|   0|  0|    0|   0|  1|  3|  1|  2|  0|  0|      1|    2|    0|       7|
|  3|   0|  0|    0|   0|  1|  1|  1|  2|  0|  0|      1|    2|    2|       7|
|  4|   0|  0|    0|   0|  1|  1|  1|  2|  0|  0|      1|    2|    1|       8|
|  5|   0|  0|    0|   0|  1|  3|  2|  2|  0|  0|      1|    3|    0|       6|
|  6|   0|  0|    0|   0|  1|  3|  1|  2|  0|  0|      1|    2|    0|       5|
|  7|   0|  0|    0|   0|  0|  3|  1|  2|  0|  0|      1|    0|    0|       6|
|  8|   0|  0|    0|   0|  0|  1|  1|  2|  0|  0|      1|    2|    1|       3|
|  9|   0|  0|    0|   0|  0|  3|  1|  2|  0|  0|   

In [None]:
# Outcome statistics restricted to the example slice: per-bin counts, entropy and mean.
slice_outcomes = example_slice_df.select("medv_bin")
print(slice_outcomes.collect())

# Count rows per outcome bin, with the count column cast to int.
slice_grouped_data = (
    slice_outcomes
    .groupBy("medv_bin")
    .count()
    .withColumn("count", col("count").cast("int"))
)
slice_grouped_data.show()

# Slice size, obtained by summing the per-bin counts.
slice_count = slice_grouped_data.agg(sum("count")).collect()[0][0]

# Per-bin entropy term: -p * log2(p), where p is the bin's share of the slice.
bin_share = col("count") / slice_count
slice_probabilities = slice_grouped_data.withColumn(
    "probability",
    -1 * bin_share * log2(bin_share),
)
# probabilities.show()

slice_outcome_entropy = slice_probabilities.agg(sum("probability")).collect()[0][0]
slice_outcome_mean = slice_outcomes.select(mean("medv_bin")).collect()[0][0]

print ("slice_outcome_entropy: ", slice_outcome_entropy)
print ("slice_outcome_mean: ", slice_outcome_mean)

[Row(medv_bin='5'), Row(medv_bin='4'), Row(medv_bin='7'), Row(medv_bin='7'), Row(medv_bin='8'), Row(medv_bin='6'), Row(medv_bin='5'), Row(medv_bin='6'), Row(medv_bin='3'), Row(medv_bin='4'), Row(medv_bin='3'), Row(medv_bin='4'), Row(medv_bin='4'), Row(medv_bin='4'), Row(medv_bin='4'), Row(medv_bin='4'), Row(medv_bin='5'), Row(medv_bin='3'), Row(medv_bin='4'), Row(medv_bin='4'), Row(medv_bin='2'), Row(medv_bin='4'), Row(medv_bin='3'), Row(medv_bin='3'), Row(medv_bin='3'), Row(medv_bin='3'), Row(medv_bin='3')]
+--------+-----+
|medv_bin|count|
+--------+-----+
|       7|    2|
|       3|    8|
|       8|    1|
|       5|    3|
|       6|    2|
|       4|   10|
|       2|    1|
+--------+-----+

slice_outcome_entropy:  2.3113997817546874
slice_outcome_mean:  4.2592592592592595


In [None]:
# Score the example slice against the overall outcome distribution.

# Entropy score: overall outcome entropy divided by the slice's outcome entropy.
# A slice whose rows all fall in a single outcome bin has entropy 0 (it can come out
# as -0.0), and dividing by it raises ZeroDivisionError. That case is reported as
# infinity instead of crashing the cell.
if slice_outcome_entropy == 0:
    entropy_score = float("inf")
else:
    entropy_score = outcome_entropy / slice_outcome_entropy

# Difference score: slice mean minus overall mean, in units of the overall stddev.
# The value is signed (negative when the slice mean is below the overall mean).
mean_difference_l1 = (slice_outcome_mean - outcome_mean) / outcome_stddev
# group_size = log2(1 + slice_count / count)

print("Entropy score: ", entropy_score)
print("Difference score: ", mean_difference_l1)
# print("Group size score:", group_size(df, slice_grouped_data))

Entropy score:  1.212841120375751
Difference score:  -0.036000863994636784
