In [135]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, col
from pyspark.sql import Row
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.feature import ChiSqSelector

In [114]:
# Load the raw training CSV. The header row supplies column names; no schema
# inference is requested, so every column arrives as a string. The frame is
# cached because later cells scan it repeatedly.
train = (spark.read
         .option("header", "true")
         .csv("../../../../data/train/train.csv")
         .cache())
print(train.count())

9557


In [115]:
# Inspect the inferred schema: without schema inference every column is a string.
train.printSchema()

root
 |-- Id: string (nullable = true)
 |-- v2a1: string (nullable = true)
 |-- hacdor: string (nullable = true)
 |-- rooms: string (nullable = true)
 |-- hacapo: string (nullable = true)
 |-- v14a: string (nullable = true)
 |-- refrig: string (nullable = true)
 |-- v18q: string (nullable = true)
 |-- v18q1: string (nullable = true)
 |-- r4h1: string (nullable = true)
 |-- r4h2: string (nullable = true)
 |-- r4h3: string (nullable = true)
 |-- r4m1: string (nullable = true)
 |-- r4m2: string (nullable = true)
 |-- r4m3: string (nullable = true)
 |-- r4t1: string (nullable = true)
 |-- r4t2: string (nullable = true)
 |-- r4t3: string (nullable = true)
 |-- tamhog: string (nullable = true)
 |-- tamviv: string (nullable = true)
 |-- escolari: string (nullable = true)
 |-- rez_esc: string (nullable = true)
 |-- hhsize: string (nullable = true)
 |-- paredblolad: string (nullable = true)
 |-- paredzocalo: string (nullable = true)
 |-- paredpreb: string (nullable = true)
 |-- pareddes: string

In [116]:
## Group the source columns: each key names one logical feature, and its value
## lists the column(s) (one-hot indicators or single flags) that make it up.
groups_data = {}
groups_data["hacdor"] = ["hacdor"]
groups_data["hacapo"] = ["hacapo"]
groups_data["v14a"] = ["v14a"]
groups_data["v18q"] = ["v18q"]
groups_data["refrig"] = ["refrig"]
groups_data["pared"] = ["paredblolad", "paredzocalo", "paredpreb", "pareddes", "paredmad", "paredzinc", "paredfibras", "paredother"]
groups_data["piso"] = ["pisomoscer", "pisocemento", "pisoother","pisonatur", "pisonotiene", "pisomadera"]
groups_data["techo"] = ["techozinc", "techoentrepiso", "techocane", "techootro"]
groups_data["cielorazo"] = ["cielorazo"]
groups_data["abastagua"] = ["abastaguadentro", "abastaguafuera", "abastaguano"]
groups_data["electricity"] = ["public", "planpri", "noelec", "coopele"]
groups_data["sanitario"] = ["sanitario1", "sanitario2", "sanitario3", "sanitario5", "sanitario6"]
groups_data["energcocinar"] = ["energcocinar1", "energcocinar2", "energcocinar3", "energcocinar4"]
groups_data["elimbasu"] = ["elimbasu1", "elimbasu2", "elimbasu3", "elimbasu4", "elimbasu5", "elimbasu6"]
groups_data["epared"] = ["epared1", "epared2", "epared3"]
groups_data["etecho"] = ["etecho1", "etecho2", "etecho3"]
groups_data["eviv"] = ["eviv1", "eviv2", "eviv3"]
groups_data["dis"] = ["dis"]
groups_data["gender"] = ["male", "female"]
groups_data["estadocivil"] = ["estadocivil1", "estadocivil2", "estadocivil3", "estadocivil4", "estadocivil5", "estadocivil6", "estadocivil7"]
groups_data["parentesco"] = ["parentesco1", "parentesco2", "parentesco3", "parentesco4", "parentesco5", "parentesco6", "parentesco7", "parentesco8", "parentesco9", "parentesco10", "parentesco11", "parentesco12"]
groups_data["tipovivi"] = ["tipovivi1", "tipovivi2", "tipovivi3", "tipovivi4", "tipovivi5"]
groups_data["instlevel"] = ["instlevel1", "instlevel2", "instlevel3", "instlevel4", "instlevel5", "instlevel6", "instlevel7", "instlevel8", "instlevel9"]
groups_data["computer"] = ["computer"]
groups_data["television"] = ["television"]
groups_data["mobilephone"] = ["mobilephone"]
groups_data["lugar"] = ["lugar1", "lugar2", "lugar3", "lugar4", "lugar5", "lugar6"]
groups_data["area"] = ["area1", "area2"]

groups_data_broadcast = spark.sparkContext.broadcast(groups_data)
# Materialize the group order once as a list. The same list builds the
# DataFrame schema and drives define_row, so column names and values stay
# aligned. On Python 3, dict.keys() is a view that cannot be pickled for
# broadcasting and cannot be concatenated with a list.
new_features_broadcast = spark.sparkContext.broadcast(list(groups_data.keys()))
# Columns copied through unchanged into every reshaped row.
permanent_features_broadcast = spark.sparkContext.broadcast(["Id", "Target"])

In [117]:
# Sanity-check the multi-column groups. If every row sets exactly one column
# and every column occurs, the number of distinct value combinations equals
# the number of columns. Report each group where that does not hold.
for group_name, group_columns in groups_data.items():
    distinct_combinations = train.select(group_columns).dropDuplicates().count()
    if len(group_columns) >= 2 and distinct_combinations != len(group_columns):
        print("{0} {1} {2}".format(group_name, len(group_columns), distinct_combinations))

electricity 4 5
techo 4 5
instlevel 9 10
elimbasu 6 5


In [118]:
# Total number of source columns referenced across all groups.
sum([len(value) for value in groups_data.values()])

102

In [119]:
# Names of the groups backed by exactly one source column.
# The original iterated (key, value) tuples as `key` and read an undefined
# `value`; unpack both explicitly instead.
# NOTE(review): this list is not used below; the pipeline broadcasts every
# group key instead.
new_features = [key for key, value in groups_data.items() if len(value) == 1]

In [120]:
def define_row(row, permanent_features, new_features, groups_data):
    """Reshape one input row into a flat list of grouped features.

    Args:
        row: Mapping-like row indexable by column name.
        permanent_features: Column names copied through unchanged, in order.
        new_features: Group names; each yields one output value.
        groups_data: Maps a group name to the list of source columns in it.

    Returns:
        A list: the permanent values, followed by one entry per group. Each
        entry is the ``str()`` of the list of that group's source values, so
        each distinct combination becomes a distinct category string.
    """
    kept = [row[name] for name in permanent_features]
    encoded = [str([row[column] for column in groups_data[group]])
               for group in new_features]
    return kept + encoded

In [121]:
# Reshape every training row into [Id, Target, <one string per group>].
# Broadcast values are read inside the lambda so each executor uses its local copy.
rdd = train.rdd.map(
    lambda row: define_row(
        row,
        permanent_features=permanent_features_broadcast.value,
        new_features=new_features_broadcast.value,
        groups_data=groups_data_broadcast.value,
    )
)

In [122]:
# Schema = the permanent columns followed by one string column per group, in
# the same order define_row emits them. Wrapping both sides in list() keeps
# the concatenation valid on Python 3, where the group keys may be a
# dict_keys view.
new_data = spark.createDataFrame(
    rdd,
    list(permanent_features_broadcast.value) + list(new_features_broadcast.value),
)

#### Index the grouped categorical features

In [123]:
# Replace each stringified group column with a numeric category index
# (<group>_indexed), dropping the string original.
for group in new_features_broadcast.value:
    print(group)
    indexer_model = StringIndexer(inputCol=group,
                                  outputCol="{0}_indexed".format(group)).fit(new_data)
    new_data = indexer_model.transform(new_data).drop(group)

lugar
v14a
hacdor
cielorazo
computer
tipovivi
etecho
piso
area
electricity
estadocivil
pared
v18q
energcocinar
techo
instlevel
parentesco
epared
eviv
elimbasu
hacapo
mobilephone
television
sanitario
abastagua
gender
refrig
dis


In [124]:
# Each group is now a single double-typed `<group>_indexed` column next to Id and Target.
new_data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Target: string (nullable = true)
 |-- lugar_indexed: double (nullable = false)
 |-- v14a_indexed: double (nullable = false)
 |-- hacdor_indexed: double (nullable = false)
 |-- cielorazo_indexed: double (nullable = false)
 |-- computer_indexed: double (nullable = false)
 |-- tipovivi_indexed: double (nullable = false)
 |-- etecho_indexed: double (nullable = false)
 |-- piso_indexed: double (nullable = false)
 |-- area_indexed: double (nullable = false)
 |-- electricity_indexed: double (nullable = false)
 |-- estadocivil_indexed: double (nullable = false)
 |-- pared_indexed: double (nullable = false)
 |-- v18q_indexed: double (nullable = false)
 |-- energcocinar_indexed: double (nullable = false)
 |-- techo_indexed: double (nullable = false)
 |-- instlevel_indexed: double (nullable = false)
 |-- parentesco_indexed: double (nullable = false)
 |-- epared_indexed: double (nullable = false)
 |-- eviv_indexed: double (nullable = false)
 |-- elimbasu

In [149]:
# Contingency table of Target against the indexed electricity group.
(new_data.stat
    .crosstab("Target", "electricity_indexed")
    .sort("Target_electricity_indexed")
    .show())

+--------------------------+----+---+---+---+---+
|Target_electricity_indexed| 0.0|1.0|2.0|3.0|4.0|
+--------------------------+----+---+---+---+---+
|                         1| 669| 78|  4|  4|  0|
|                         2|1399|179|  9|  9|  1|
|                         3|1069|140|  0|  0|  0|
|                         4|5322|662|  8|  2|  2|
+--------------------------+----+---+---+---+---+



In [125]:
def _dense_from_list(values):
    """Wrap a list of numbers in a Spark ML DenseVector."""
    return Vectors.dense(values)


def _dense_index_of_one(values):
    """Return a 1-element DenseVector holding the position of the first 1 in `values`."""
    return Vectors.dense([values.index(1)])


udf_create_dense_vector = udf(_dense_from_list, VectorUDT())
udf_get_target = udf(_dense_index_of_one, VectorUDT())

In [154]:
# Candidate feature columns: every indexed group, i.e. all columns except the
# identifier and the raw label.
columns = [name for name in new_data.columns if name not in ("Id", "Target")]
print(columns)
# Columns actually packed into the feature vector for the chi-square cells
# below. This is currently narrowed to a single feature for inspection; pass
# `columns` instead to test every group.
features_to_test = ["electricity_indexed"]
columns_broadcast = spark.sparkContext.broadcast(features_to_test)

['lugar_indexed', 'v14a_indexed', 'hacdor_indexed', 'cielorazo_indexed', 'computer_indexed', 'tipovivi_indexed', 'etecho_indexed', 'piso_indexed', 'area_indexed', 'electricity_indexed', 'estadocivil_indexed', 'pared_indexed', 'v18q_indexed', 'energcocinar_indexed', 'techo_indexed', 'instlevel_indexed', 'parentesco_indexed', 'epared_indexed', 'eviv_indexed', 'elimbasu_indexed', 'hacapo_indexed', 'mobilephone_indexed', 'television_indexed', 'sanitario_indexed', 'abastagua_indexed', 'gender_indexed', 'refrig_indexed', 'dis_indexed']


In [155]:
Person = Row("Id", "Target", "values")


def _to_person(row):
    """Project a row onto (Id, Target, [values of the broadcast feature columns])."""
    selected = columns_broadcast.value
    return Person(row["Id"], row["Target"], [row[name] for name in selected])


rdd = new_data.rdd.map(_to_person)

In [156]:
# Pack the per-row value list into an ML feature vector and keep only the
# identifier, label and vector.
df = spark.createDataFrame(rdd)
df = df.withColumn("features", udf_create_dense_vector(col("values")))
df = df.select("Id", "Target", "features")

In [157]:
df.select(["Id", "Target", "features"]).show()

+------------+------+--------+
|          Id|Target|features|
+------------+------+--------+
|ID_279628684|     4|   [0.0]|
|ID_f29eb3ddd|     4|   [0.0]|
|ID_68de51c94|     4|   [0.0]|
|ID_d671db89c|     4|   [0.0]|
|ID_d56d6f5f5|     4|   [0.0]|
|ID_ec05b1a7b|     4|   [0.0]|
|ID_e9e0c1100|     4|   [0.0]|
|ID_3e04e571e|     4|   [0.0]|
|ID_1284f8aad|     4|   [0.0]|
|ID_51f52fdd2|     4|   [0.0]|
|ID_db44f5c59|     4|   [0.0]|
|ID_de822510c|     4|   [0.0]|
|ID_d94071d7c|     4|   [0.0]|
|ID_064b57869|     4|   [0.0]|
|ID_5c837d8a4|     4|   [0.0]|
|ID_0a39e419e|     4|   [0.0]|
|ID_4ff51f90c|     4|   [0.0]|
|ID_336c51386|     4|   [0.0]|
|ID_c51938edf|     4|   [0.0]|
|ID_35b66f7c6|     4|   [0.0]|
+------------+------+--------+
only showing top 20 rows



In [158]:
# Encode the string Target as a numeric `label`. The indexer is fitted on
# new_data and applied to df; both carry the same Target column, since df's
# rows were built from new_data. Only label and features are kept.
stringIndexer = StringIndexer(inputCol="Target", outputCol="label")
model = stringIndexer.fit(new_data)
df = (model.transform(df)
      .drop("Target")
      .select("label", "features"))

In [159]:
df.show(n=3)

+-----+--------+
|label|features|
+-----+--------+
|  0.0|   [0.0]|
|  0.0|   [0.0]|
|  0.0|   [0.0]|
+-----+--------+
only showing top 3 rows



In [160]:
# Number of candidate feature columns (all indexed groups).
len(columns)

28

In [161]:
# Pearson independence test of each feature-vector slot against the label.
chiSqResult = ChiSquareTest.test(dataset=df, featuresCol="features", labelCol="label")

In [162]:
# Display each result column on its own, untruncated.
for result_column in ("pValues", "degreesOfFreedom", "statistics"):
    chiSqResult.select(result_column).show(truncate=False)

+-----------------------+
|pValues                |
+-----------------------+
|[1.4984640577253572E-6]|
+-----------------------+

+----------------+
|degreesOfFreedom|
+----------------+
|[12]            |
+----------------+

+-------------------+
|statistics         |
+-------------------+
|[49.82678802304894]|
+-------------------+



In [150]:
# False-positive-rate selection: keep the features whose chi-square p-value
# against `label` is below alpha.
alpha = 0.01

chiSelector = (ChiSqSelector()
               .setSelectorType("fpr")
               .setFpr(alpha)
               .setLabelCol("label")
               .setFeaturesCol("features")
               # A plain column name: Python does not interpolate "$name"
               # templates, so "$featureColumn-Selected" was taken literally.
               .setOutputCol("selectedFeatures"))

chiSelector.fit(df).transform(df).show(truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                         |$featureColumn-Selected                                                                                          |
+-----+-----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+
|0.0  |[0.0,0.0,0.0,0.0,0.0,1.0,2.0,0.0,0.0,0.0,6.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0]|[0.0,0.0,0.0,0.0,0.0,1.0,2.0,0.0,0.0,0.0,6.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0]|
|0.0  |[0.0,0.0,0.0,0.0,0.0,1.0,1.0,2.0,0.0,0.0,6.0,2.0,

In [153]:
# Position of electricity_indexed within the full candidate column list, i.e.
# its slot in a feature vector built from every column.
columns.index("electricity_indexed")

9