In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, IntegerType, LongType)
from pyspark.sql.functions import *
from pyspark import SparkConf
from pyspark import SparkContext
import multiprocessing
from pyspark.ml.feature import StringIndexer

In [2]:
cores = multiprocessing.cpu_count()
p = 10
conf = SparkConf()
conf.set("spark.driver.cores", cores)
conf.set("spark.driver.memory", "10g")
conf.set("spark.sql.shuffle.partitions", p * cores)
conf.set("spark.default.parallelism", p * cores)
sc = SparkContext(conf=conf)

In [3]:
spark = SparkSession.builder.appName('a').getOrCreate()

In [4]:
df_num = spark.read.csv('data/df_num/*.csv', header=True, inferSchema=True)

In [5]:
df_num.persist()
df_num.count()

16774736

In [10]:
init_cols = df_num.columns

In [57]:
# proporcion de 1's en el dataset (~26.6% un poco desbalanceado)
n_rows = df_num.count()
n_1 = df_num.filter(col('HasDetections') == 1).count()
print('Proporcion de 1s en el DF: {} %'.format(n_1/n_rows*100))

Proporcion de 1s en el DF: 26.58099656531107 %


In [7]:
df_num.show(10, False)

+------+----------------+----------------+-------------------------+-------------------------+-------------------+-----------------+------+-----------------+--------------+----------------------+-----------------+---------------------------+-------+-------+-----------+---------------+-----+---------------+--------+------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------+-----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+-------------------------------------+--------------------+----------------------+----------------------------------+---------------------------+--------------------------------+--------------------------+------------------------+---------------------+-------

In [9]:
df_num.describe().show()

+-------+--------------------+------------------+-------------------+-------------------------+-------------------------+-------------------+------------------+-------------------+------------------+------------------+----------------------+------------------+---------------------------+------------------+------------------+-------------------+--------------------+--------------------+------------------+-------------------+------------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------+-----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+-------------------------------------+--------------------+----------------------+----------------------------------+-------------------------

In [9]:
n_nulls = df_num.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_num.columns]).collect()

In [12]:
n_nulls

[Row(IsBeta=0, RtpStateBitfield=64540, IsSxsPassiveMode=0, DefaultBrowsersIdentifier=16034179, AVProductStatesIdentifier=59988, AVProductsInstalled=59988, AVProductsEnabled=59988, HasTpm=0, CountryIdentifier=0, CityIdentifier=517371, OrganizationIdentifier=5233647, GeoNameIdentifier=360, LocaleEnglishNameIdentifier=0, OsBuild=0, OsSuite=0, IsProtected=59693, AutoSampleOptIn=0, SMode=6369031, IeVerIdentifier=108690, Firewall=149822, UacLuaenable=18703, Census_OEMNameIdentifier=185024, Census_OEMModelIdentifier=198168, Census_ProcessorCoreCount=102583, Census_ProcessorManufacturerIdentifier=102594, Census_ProcessorModelIdentifier=102657, Census_PrimaryDiskTotalCapacity=127717, Census_SystemVolumeTotalCapacity=127692, Census_HasOpticalDiskDrive=0, Census_TotalPhysicalRAM=175584, Census_InternalPrimaryDiagonalDisplaySizeInInches=89744, Census_InternalPrimaryDisplayResolutionHorizontal=89497, Census_InternalPrimaryDisplayResolutionVertical=89497, Census_InternalBatteryNumberOfCharges=507988

In [16]:
n_nulls[0]['IsBeta']

0

In [18]:
dict_nulls = dict()
for col in init_cols:
    if n_nulls[0][col] != 0:
        dict_nulls[col] = n_nulls[0][col]

In [20]:
'''
Se observa que hay muchas columnas con un alto numero de nulls
Otras tienen un numero de nulls bajos
Tambien que hay agrupaciones, por ejemplo las columnas "AVP" tienen el mismo numero de nulls
por tanto nos da una pista de que ese NULL significa algo.

La estrategia sera imputar estos valores para añadir mas informacion al DF.
'''

dict_nulls

{'RtpStateBitfield': 64540,
 'DefaultBrowsersIdentifier': 16034179,
 'AVProductStatesIdentifier': 59988,
 'AVProductsInstalled': 59988,
 'AVProductsEnabled': 59988,
 'CityIdentifier': 517371,
 'OrganizationIdentifier': 5233647,
 'GeoNameIdentifier': 360,
 'IsProtected': 59693,
 'SMode': 6369031,
 'IeVerIdentifier': 108690,
 'Firewall': 149822,
 'UacLuaenable': 18703,
 'Census_OEMNameIdentifier': 185024,
 'Census_OEMModelIdentifier': 198168,
 'Census_ProcessorCoreCount': 102583,
 'Census_ProcessorManufacturerIdentifier': 102594,
 'Census_ProcessorModelIdentifier': 102657,
 'Census_PrimaryDiskTotalCapacity': 127717,
 'Census_SystemVolumeTotalCapacity': 127692,
 'Census_TotalPhysicalRAM': 175584,
 'Census_InternalPrimaryDiagonalDisplaySizeInInches': 89744,
 'Census_InternalPrimaryDisplayResolutionHorizontal': 89497,
 'Census_InternalPrimaryDisplayResolutionVertical': 89497,
 'Census_InternalBatteryNumberOfCharges': 507988,
 'Census_OSInstallLanguageIdentifier': 118827,
 'Census_IsFlightin

In [23]:
mediana_GeoNameIdentifier = df_num.approxQuantile("GeoNameIdentifier", [0.5], 0.25)[0]

In [24]:
mediana_GeoNameIdentifier

295.0

In [36]:
media_RtpStateBitfield = df_num.agg(avg("RtpStateBitfield")).collect()[0][0]
media_RtpStateBitfield

6.848561082108193

In [87]:
columna = 'Wdft_RegionIdentifier'
df_num.agg(min(columna),
          max(columna)).show()

+--------------------------+--------------------------+
|min(Wdft_RegionIdentifier)|max(Wdft_RegionIdentifier)|
+--------------------------+--------------------------+
|                         1|                        15|
+--------------------------+--------------------------+



In [88]:
imputaciones = \
{'RtpStateBitfield': media_RtpStateBitfield, # tiene STDDEV = 1 por lo que la media puede resultar
 'DefaultBrowsersIdentifier': 0, # el 0 no existe, lo metemos en esta categoria
 'AVProductStatesIdentifier': 1, # el minimo es 2, metemos los null en el 1
 'AVProductsInstalled': 8, # el maximo es 8
 'AVProductsEnabled': 6, # el maximo es 5
 'CityIdentifier': -99,
 'OrganizationIdentifier': -99,
 'GeoNameIdentifier': mediana_GeoNameIdentifier, # Al ser pocos casos, le meteremos la mediana
 'IsProtected': 2, # es 1 o 0, lo metemos en 2 (unknown)
 'SMode': 2, # igual
 'IeVerIdentifier': -99,
 'Firewall': 2,
 'UacLuaenable': -99,
 'Census_OEMNameIdentifier': 0,
 'Census_OEMModelIdentifier': 0,
 'Census_ProcessorCoreCount': -99,
 'Census_ProcessorManufacturerIdentifier': 0,
 'Census_ProcessorModelIdentifier': 0,
 'Census_PrimaryDiskTotalCapacity': -99,
 'Census_SystemVolumeTotalCapacity': -99,
 'Census_TotalPhysicalRAM': -99,
 'Census_InternalPrimaryDiagonalDisplaySizeInInches': -99,
 'Census_InternalPrimaryDisplayResolutionHorizontal': -99,
 'Census_InternalPrimaryDisplayResolutionVertical': -99,
 'Census_InternalBatteryNumberOfCharges': -99,
 'Census_OSInstallLanguageIdentifier': 0,
 'Census_IsFlightingInternal': 2,
 'Census_IsFlightsDisabled': 2,
 'Census_ThresholdOptIn': 2,
 'Census_FirmwareManufacturerIdentifier': 0,
 'Census_FirmwareVersionIdentifier': 0,
 'Census_IsWIMBootEnabled': 2,
 'Census_IsVirtualDevice': 2,
 'Census_IsAlwaysOnAlwaysConnectedCapable': 2,
 'Wdft_IsGamer': 2,
 'Wdft_RegionIdentifier': 0}

In [90]:
# FILL NA
# df.fillna(0, subset=['a', 'b'])
df_imputado = df_num.fillna(imputaciones)

In [91]:
df_imputado.show(4)

+------+----------------+----------------+-------------------------+-------------------------+-------------------+-----------------+------+-----------------+--------------+----------------------+-----------------+---------------------------+-------+-------+-----------+---------------+-----+---------------+--------+------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------+-----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+-------------------------------------+--------------------+----------------------+----------------------------------+---------------------------+--------------------------------+--------------------------+------------------------+---------------------+-------

In [92]:
# Contemos los numeros de nulls ahora
n_nulls_2 = df_imputado.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_imputado.columns]).collect()

In [94]:
# Solo queda el target
for c in init_cols:
    if n_nulls_2[0][c] != 0:
        print('La columna {} aun tiene nulls'.format(c))

La columna HasDetections aun tiene nulls
