# Christmas Assignment

### MDM - Rafael Caballero

Due to a computer virus, the extensions of several files have been mixed up.
Fortunately, we had collected data on some of the files in advance, in particular:

     |-- length: integer (nullable = true)  length of the name of the file without extension
     |-- kbytes: integer (nullable = true)  Size of the file in Kbytes 
     |-- curlybrackets: integer (nullable = true) Number of characters {,} in the file
     |-- roundbrackets: double (nullable = true) Number of characters (,) in the file
     |-- colon: double (nullable = true) Number of :
     |-- semicolon: double (nullable = true) Number of ;
     |-- comma: double (nullable = true) Number of ,
     |-- other: double (nullable = true) Other characters
     |-- lines: double (nullable = true) Number of lines
     |-- words: double (nullable = true) Number of words
     |-- lower: double (nullable = true) Number of lowercase characters
     |-- upper: double (nullable = true) Number of uppercase characters
     |-- digit: double (nullable = true) Number of digits
     |-- relat: double (nullable = true) Number of relational operators <,>,=...
     |-- space: double (nullable = true) Number of blank characters
     |-- ext: string (nullable = true)   File extension, can be either "ipynb", or "java"
 
Our goal is to use machine learning in Spark to predict the extension (`ext`) from the remaining columns. You are free to write the code as you wish, provided it follows these rules:

1.- Use any Spark model, transformer, etc. You can also try models or transformers that we haven't seen in class. Explore new possibilities!
 
2.- The code must include a variable `model` with the final model. Use a *pipeline* (see the [Pipeline Notebook](http://gpd.sip.ucm.es/rafa/docencia/mtda/mdm/code/pipelines_en.ipynb)) to combine all the preprocessing steps in the model.

3.- The data does not include null values. Although you can (and should) run many experiments in search of the best model, please include only the code of the final, definitive `model`.
 
4.- The model generation process cannot take more than 10 seconds in the virtual machine. If you use any hyperparameter of integer type, the maximum value it can take is 100.

5.- The assignment will be evaluated using the "Test" section. I will use different data for the evaluation, but this is not important.

6.- The score will depend on the kappa metric returned by the function `evaluate_predictions`. To pass, a minimum kappa of 0.58 is required.
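
For reference, the kappa in rule 6 is Cohen's kappa: the observed accuracy corrected by the agreement expected by chance. The following is a minimal pure-Python sketch of that formula, not the `evaluate_predictions` function itself. The name `cohen_kappa` and the two confusion matrices are made up for illustration, with rows as actual classes and columns as predicted classes.

```python
def cohen_kappa(cm):
    """Cohen's kappa from a square confusion matrix (rows: actual, columns: predicted)."""
    n = len(cm)
    total = sum(sum(row) for row in cm)
    row_totals = [sum(row) for row in cm]
    col_totals = [sum(cm[i][j] for i in range(n)) for j in range(n)]
    # observed agreement: fraction of samples on the diagonal
    observed = sum(cm[i][i] for i in range(n)) / total
    # agreement expected by chance, from the row and column totals
    expected = sum(r * c for r, c in zip(row_totals, col_totals)) / (total * total)
    return (observed - expected) / (1 - expected)

# Hypothetical counts, for illustration only
balanced = [[40, 10], [5, 45]]       # 85% accuracy on two balanced classes
majority_only = [[90, 0], [10, 0]]   # always predicts the majority class

print(round(cohen_kappa(balanced), 3))
print(round(cohen_kappa(majority_only), 3))
```

The first matrix gives a kappa of 0.7. The second reaches 90% accuracy but a kappa of 0, because always predicting the majority class does no better than chance; this is why the score uses kappa rather than plain accuracy.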


### Setup

In [None]:
import importlib.util
import os
import sys
from subprocess import check_call

modules = ["findspark","pyspark"]

def check_modules(modules, upgrade=False):
    print("Checking required modules")
    for m in modules:
        spec = importlib.util.find_spec(m)
        if spec is not None and not upgrade:
            print(m," found")
        else:
            if upgrade:
                print("upgrading ",m)
            else:
                print(m," not found, installing")
            if 'google.colab' in sys.modules:
                if upgrade:
                    check_call(["pip", "install", "--upgrade", m])
                else:
                    check_call(["pip", "install", "-q", m])
            else:
                if upgrade:
                    check_call([sys.executable, "-m", "pip", "install", "--user", "--upgrade", m])
                else:
                    check_call([sys.executable, "-m", "pip", "install", "--user", m])
    # make freshly installed packages visible to the import system
    importlib.invalidate_caches()

check_modules(modules,upgrade=False)

# import Spark only after the check above, so that missing packages are installed first
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.sql('''select 'spark' as hi ''')
df.show()

The next cell defines a few useful functions, in particular `evaluate_predictions`, which we will use to obtain the kappa metric. It reads the `prediction` and `label` columns of the DataFrame it receives.

In [None]:
#file = "./twords.csv"
url = "https://raw.githubusercontent.com/RafaelCaballero/tdm/master/datos/filetrain.csv"
file = "./filetrain.csv"


import matplotlib.pyplot as plt
%matplotlib inline
from pyspark_dist_explore import hist
import pyspark.sql.functions as func 


def load_file(file):
    # read a CSV file with a header row, letting Spark infer the column types
    df = spark.read.csv(file, header=True, inferSchema=True)
    return df

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import numpy as np

def evaluate_predictions(predictions,verb=True):
    
    preds_and_labels = predictions.select(['prediction','label'])
    pl = preds_and_labels.rdd.map(tuple)
    metrics = MulticlassMetrics(pl)
    cm = metrics.confusionMatrix()

    # class labels, ordered from most to least frequent
    class_temp = predictions.select("label").groupBy("label")\
                            .count().sort('count', ascending=False).toPandas()
    class_names = class_temp["label"].values.tolist()
    
    
    print(class_names)
    cm = cm.toArray()
    if verb:
        print(cm)
    # add by rows to compute the recall
    sumaf = []
    if verb:
        print("Recall ")
    for i in range(len(class_names)):
        suma = sum(cm[i])
        if verb:
            print(i,':',round(cm[i][i]/suma,2))
        sumaf.append(suma)
              
    # add by columns to compute the precision
    sumac = []
    sumad = []
    if verb:
        print("Precision ")
    for i in range(len(class_names)):
        suma = 0
        for j in range(len(class_names)):
            suma += cm[j][i]
        sumac.append(suma)
        if verb:
            print(i,':',round(cm[i][i]/suma,2))
        sumad.append(cm[i][i])
    oa = np.sum(sumad)/sum(sumac)
    ac = 0
    for i in range(len(sumac)):
        ac += sumac[i]*sumaf[i]
    ac /= (sum(sumac)*sum(sumac))
    #print(oa,ac)
    kappa = (oa-ac)/(1-ac)
    if verb:
        # Instantiate metrics object
        #precision = metrics.precision()
        #recall = metrics.recall()
        #f1Score = metrics.fMeasure()
        print("***Global Statistics***")
        #print("Precision = %s" % precision)
        #print("Recall = %s" % recall)
        #print("F1 Score = %s" % f1Score)
        #print("Area under ROC = %s" % metrics.areaUnderROC)        
        print("kappa ",round(kappa,3))
        acc = pl.filter(lambda x: x[0] == x[1]).count() / float(pl.count())
        print("Model accuracy: %.3f%%" % (acc * 100))
    return cm,kappa

def histogram(df,col,bins=20,color="red"):
    fig,ax = plt.subplots()
    hist(ax, df.select([col]), bins = bins, color=[color])
    plt.xlabel(col)
    plt.show()

### Loading the file

In [None]:
import urllib.request

f = urllib.request.urlretrieve(url,file ) # in case of error, download the file manually and comment out this line
# load the dataframe
df = load_file(file)
print(f"{df.count()} rows")
df.printSchema()

Write your code here. Remember that the final model must be stored in a variable named `model`.

In [None]:
### solution




The next cell should print `Ok!`

In [None]:
try:
    model
    print("Ok!")
except NameError:
    print("Error, the variable 'model' must exist!")
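
Before moving on to the test, the 10-second limit of rule 4 can be checked with a small timing helper. This is only a sketch: the helper name `timed` is hypothetical, and the workload below is a stand-in. In the notebook the call would be something like `timed(pipeline.fit, df)`, where `pipeline` is whatever pipeline object you have built.

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Stand-in workload; replace with your own model-fitting call
result, seconds = timed(sum, range(1000))
print(f"{seconds:.3f} s (limit: 10 s)")
```

Timings vary between machines, so it is safer to leave some margin below the limit than to aim for exactly 10 seconds.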


# Test

The final test. You may run this part of the code, but do not change it.

In [None]:
import pandas as pd
url = "https://raw.githubusercontent.com/RafaelCaballero/tdm/master/datos/filetrain.csv"
import urllib.request
filetest = "./test.csv"
f = urllib.request.urlretrieve(url,filetest ) # in case of error, download the file manually and comment out this line
# load the test file
df_mytest = load_file(filetest)

predictions = model.transform(df_mytest)
cm,kappa = evaluate_predictions(predictions,verb=False)
print(kappa)