In [1]:
import pandas as pd
direccion = "./../data/raw/prueba.csv"
df = pd.read_csv(direccion, nrows = 10000)
df.head()

df.Cancelled.unique()

array([0., 1.])

In [42]:
direccion = "./../data/raw/prueba.csv"

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv(direccion, header="true", inferSchema="true").limit(30000)

In [43]:
def clean(df):
    #Pasar a minusculas los nombres de columnas
    for col in df.columns:
        df = df.withColumnRenamed(col, col.lower())

    #Seleccionar columnas no vacias 

    base = df.select(df.year,df.quarter, df.month, df.dayofmonth, df.dayofweek, df.flightdate, df.reporting_airline, df.dot_id_reporting_airline, df.iata_code_reporting_airline, df.tail_number, df.flight_number_reporting_airline, df.originairportid, df.originairportseqid, df.origincitymarketid, df.origin, df.origincityname, df.originstate, df.originstatefips, df.originstatename, df.originwac, df.destairportid, df.destairportseqid, df.destcitymarketid, df.dest, df.destcityname, df.deststate, df.deststatefips, df.deststatename, df.destwac, df.crsdeptime, df.deptime, df.depdelay, df.depdelayminutes, df.depdel15, df.departuredelaygroups, df.deptimeblk, df.taxiout, df.wheelsoff, df.wheelson, df.taxiin, df.crsarrtime, df.arrtime, df.arrdelay, df.arrdelayminutes, df.arrdel15, df.arrivaldelaygroups, df.arrtimeblk, df.cancelled, df.diverted, df.crselapsedtime, df.actualelapsedtime, df.airtime, df.flights, df.distance, df.distancegroup, df.divairportlandings )

    #agregar columna con clasificación de tiempo en horas de atraso del vuelo 0-1.5, 1.5-3.5,3.5-, cancelled

    from pyspark.sql import functions as f
    base = base.withColumn('rangoatrasohoras', f.when(f.col('cancelled') == 1, "cancelled").when(f.col('depdelayminutes') < 90, "0-1.5").when((f.col('depdelayminutes') > 90) & (f.col('depdelayminutes')<210), "1.5-3.5").otherwise("3.5-"))

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    from pyspark.sql.functions import col, lower, regexp_replace, split

    #Función limpieza
    def clean_text(c):
        c = lower(c)
        c = regexp_replace(c, " ", "_")
        c = f.split(c, '\,')[0]
        return c


     # Aplicación de la función limpieza
    base = base.withColumn("origincityname", clean_text(col("origincityname")))
    base = base.withColumn("destcityname", clean_text(col("destcityname")))
    return base

In [44]:
df = clean(df)

In [45]:

def ignore_list(df, data_types):
    from pyspark.sql.functions import countDistinct, approxCountDistinct
    counts_summary = df.agg(*[countDistinct(c).alias(c) for c in data_types["StringType"]])
    counts_summary = counts_summary.toPandas()

    import pandas as pd
    counts = pd.Series(counts_summary.values.ravel())
    counts.index = counts_summary.columns

    sorted_vars = counts.sort_values(ascending = False)
    ignore = list((sorted_vars[sorted_vars >100]).index)
    return ignore

def get_data_types(df):
    from collections import defaultdict
    data_types = defaultdict(list)
    for entry in df.schema.fields:
        data_types[str(entry.dataType)].append(entry.name)
    return data_types

def create_pipeline(df, ignore):
    # Esto lo ponemos aqui para poder modificar las 
    #variables de los estimadores/transformadores
    data_types = get_data_types(df)    
    #--------------------------------------
    
    # -------------- STRING --------------
    strings_used = [var for var in data_types["StringType"] if var not in ignore]

    # -------------- DOUBLE --------------
    numericals_double = [var for var in data_types["DoubleType"] if var not in ignore]
    numericals_double_imputed = [var + "_imputed" for var in numericals_double]

    # -------------- INTEGERS --------------
    from pyspark.sql.types import IntegerType, DoubleType
    numericals_int = [var for var in data_types["IntegerType"] if var not in ignore]
    
    for c in numericals_int:
        df = df.withColumn(c, df[c].cast(DoubleType()))
        df = df.withColumn(c, df[c].cast("double"))
        
    numericals_int_imputed = [var + "_imputed" for var in numericals_int]
    # =======================================

    ## %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    ##            P I P E L I N E
    ## %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

    # ============= ONE HOT ENCODING ================
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded") for c in strings_used]
    stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+ "_one_hot") for c in strings_used]

    # =============== IMPUTADORES ====================
    from pyspark.ml.feature import Imputer
    stage_imputer_double = Imputer(inputCols = numericals_double, 
                                   outputCols = numericals_double_imputed) 
    stage_imputer_int = Imputer(inputCols = numericals_int, 
                                outputCols = numericals_int_imputed) 

    # ============= VECTOR ASESEMBLER ================
    from pyspark.ml.feature import VectorAssembler

    features =  numericals_double_imputed \
              + [var + "_one_hot" for var in strings_used]
    stage_assembler = VectorAssembler(inputCols = features, outputCol= "assem_features")

    # ==================== SCALER =======================
    from pyspark.ml.feature import StandardScaler
    stage_scaler = StandardScaler(inputCol= stage_assembler.getOutputCol(), 
                                  outputCol="scaled_features", withStd=True, withMean=True)

    # ================== PIPELINE ===================
    from pyspark.ml import Pipeline
    pipeline = Pipeline(stages= stage_string + stage_one_hot +          # Categorical Data
                              [stage_imputer_double,
                               stage_imputer_int,                       # Data Imputation
                               stage_assembler,                         # Assembling data
                               stage_scaler,                            # Standardize data
                          ])
                          
    ## Tenemos que regesar el df porque las variables int las combierte en double
    return  pipeline , df


def imputa_categoricos(df, ignore):
    strings_used = [var for var in data_types["StringType"] if var not in ignore]
    
    missing_data_fill = {}
    for var in strings_used:
        missing_data_fill[var] = "missing"

    df = df.fillna(missing_data_fill)
    return df




def evaluate(predictionAndLabels):
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics
    
    log = {}

    # Show Validation Score (AUROC)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
    log['AUROC'] = "%f" % evaluator.evaluate(predictionAndLabels)    
    print("Area under ROC = {}".format(log['AUROC']))

    # Show Validation Score (AUPR)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderPR')
    log['AUPR'] = "%f" % evaluator.evaluate(predictionAndLabels)
    print("Area under PR = {}".format(log['AUPR']))

    # Metrics
    predictionRDD = predictionAndLabels.select(['label', 'prediction']) \
                            .rdd.map(lambda line: (line[1], line[0]))
    metrics = MulticlassMetrics(predictionRDD)

    # Confusion Matrix
    print(metrics.confusionMatrix().toArray())

    # Overall statistics
    log['precision'] = "%s" % metrics.precision()
    log['recall'] = "%s" % metrics.recall()
    log['F1 Measure'] = "%s" % metrics.fMeasure()
    print("[Overall]\tprecision = %s | recall = %s | F1 Measure = %s" % \
            (log['precision'], log['recall'], log['F1 Measure']))

    # Statistics by class
    labels = [0.0, 1.0]
    for label in sorted(labels):
        log[label] = {}
        log[label]['precision'] = "%s" % metrics.precision(label)
        log[label]['recall'] = "%s" % metrics.recall(label)
        log[label]['F1 Measure'] = "%s" % metrics.fMeasure(label, 
                                                           beta=0.5)
        print("[Class %s]\tprecision = %s | recall = %s | F1 Measure = %s" \
                  % (label, log[label]['precision'], 
                    log[label]['recall'], log[label]['F1 Measure']))

    return log


def cv_stats(cvModel):
    bestModel =  cvModel.bestModel
    print("iter", bestModel.stages[-1]._java_obj.getMaxIter())


In [49]:
def add_ids(X_train, X_test, y_train, y_test):
    from pyspark.sql.functions import monotonically_increasing_id
    
    X_train = X_train.withColumn("id", monotonically_increasing_id())
    X_test = X_test.withColumn("id", monotonically_increasing_id())
    y_train = y_train.withColumn("id", monotonically_increasing_id())
    y_test = y_test.withColumn("id", monotonically_increasing_id())
        
    return X_train, X_test, y_train, y_test


In [55]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()

lr_paramGrid = ParamGridBuilder() \
.addGrid(stage_pca.k, [1]) \
.addGrid(lr.maxIter, [1]) \
.build()

from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier()

dt_paramGrid = ParamGridBuilder() \
.addGrid(stage_pca.k, [1]) \
.addGrid(dt.maxDepth, [2]) \
.build()

paramGrid_list = [lr_paramGrid, dt_paramGrid]
model_list = [lr,dt]
    


## Crea uno contra todos manual

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

data_types = get_data_types(df)
ignore =   ignore_list(df, data_types) 
illegal = [s for s in df.columns if "del" in s]
extra_illegal = ['cancelled', 'rangoatrasohoras']
legal = [var for var in df.columns if (var not in ignore and var not in illegal and var not in extra_illegal)]
lista_objetivos = df.select('rangoatrasohoras').distinct().rdd.map(lambda r: r[0]).collect()

df = imputa_categoricos(df, ignore)
X = df[legal]
y = df[['rangoatrasohoras']]

pipeline, X = create_pipeline(X, ignore)

X_train, X_test = X.randomSplit([0.8,0.2], 123)
y_train, y_test = y.randomSplit([0.8,0.2], 123)

model = pipeline.fit(X_train)

X_train = model.transform(X_train)
X_test = model.transform(X_test)

X_train, X_test, y_train, y_test = add_ids(X_train, X_test, y_train, y_test)

In [None]:

for objetivo in lista_objetivos:
    print("objetivo: ", objetivo)
    
    y_test = y_test.withColumn("label",  when(y_test.rangoatrasohoras == objetivo, 1.0).otherwise(0.0))
    y_train = y_train.withColumn("label",  when(y_train.rangoatrasohoras == objetivo, 1.0).otherwise(0.0))

    df_train = X_train.join(y_train, "id", "outer").drop("id")
    df_test = X_test.join(y_test, "id", "outer").drop("id")
    
    from pyspark.ml.feature import PCA
    stage_pca = PCA(k = 15,inputCol = "scaled_features", 
                            outputCol = "features")
    
    for clr_model, params in zip(model_list, paramGrid_list):
        print("Modelo evaluado: ", clr_model)
        pipeline = Pipeline(stages= [stage_pca, clr_model])

        from pyspark.ml.evaluation import MulticlassClassificationEvaluator
        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=params,
                                  evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                                  numFolds=2)  # use 3+ folds in practice

        cvModel  = crossval.fit(df_train)
        cv_stats(cvModel)
        prediction = cvModel.transform(df_test)
        evaluate(prediction)




objetivo:  0-1.5
Modelo evaluado:  LogisticRegression_ec4b5c3d5156


## INDIVIDUAL

In [32]:
# Separamos en train y test
# Este dataframe ya debe filtrar

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

data_types = get_data_types(df)
ignore =   ignore_list(df, data_types) 

df2 = df.withColumnRenamed("cancelled","label")
df2 = imputa_categoricos(df2, ignore)

# Tenemos que regesar el pipeline porque las variables int las combierte en double
init_stages ,df2 = create_pipeline(df2, ignore)

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression
clr = LogisticRegression(maxIter=10, regParam=0.01,
                         fitIntercept=True) 
from pyspark.ml.feature import PCA
stage_pca = PCA(k = 15,inputCol = stage_scaler.getOutputCol(), 
                outputCol = "features")


pipeline = Pipeline(stages= init_stages + [stage_pca, clr])

paramGrid = ParamGridBuilder() \
.addGrid(stage_pca.k, [2,3]) \
.addGrid(clr.maxIter, [2,3]) \
.build()


crossval = CrossValidator(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=BinaryClassificationEvaluator(),
                      numFolds=2)  # use 3+ folds in practice

df_train, df_test = df2.randomSplit([0.8,0.2], 123)

cvModel  = crossval.fit(df_train)

cv_stats(cvModel)

prediction = cvModel.transform(df_test)

evaluate(prediction)



NameError: name 'stage_scaler' is not defined

## Useful Resources

http://people.stat.sc.edu/haigang/sparkCaseStudy.html
https://chih-ling-hsu.github.io/2018/09/17/spark-mllib

In [6]:
def printMetrics(metrics, df):
    labels = df.rdd.map(lambda lp: lp.label).distinct().collect()
    for label in sorted(labels):
        print("Class %s precision = %s" % (label, metrics.precision(label)))
        print("Class %s recall = %s" % (label, metrics.recall(label)))
        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
        print ("")

    # Weighted stats
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("Accuracy = %s" % metrics.accuracy)

import collections  

TestResult = collections.namedtuple("TestResult", ["params", "metrics"])

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.sql.functions import rand

class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        folds = []
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()
        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            folds.append([])
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                prediction = model.transform(validation, paramMap)
                metric = eva.evaluate(prediction)
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

                predictionLabels = prediction.select("prediction", "label")
                allMetrics = MulticlassMetrics(predictionLabels.rdd)
                folds[i].append(TestResult(paramMap.items(), allMetrics))


        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics)), folds