In [0]:
%spark

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")

val data = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", ",")
.csv("/home/carlos/Descargas/PhiUSIIL_Phishing_URL_Dataset.csv")

In [1]:
%spark
data.printSchema()

In [2]:
%spark

def getSecurityCategory(
    IsHTTPS: Int, HasTitle: Int, Robots: Int, HasCopyrightInfo: Int, HasObfuscation: Int, NoOfURLRedirect: Int,
    NoOfPopup: Int, NoOfiFrame: Int, HasExternalFormSubmit: Int, HasPasswordField: Int, Bank: Int, Pay: Int, Crypto: Int,
    URLCharProb: Double, URLLength: Int, NoOfSubDomain: Int
): String = {

    var score = 0 

    // Características que suman puntos a favor
    if (IsHTTPS == 1) score += 3
    if (HasTitle == 1) score += 2
    if (Robots == 1) score += 1
    if (HasCopyrightInfo == 1) score += 3

    // Características que restan puntos a favor
    if (HasObfuscation == 1) score -= 2
    if (NoOfURLRedirect > 0) score -= 2
    if (NoOfPopup > 0) score -= 3
    if (NoOfiFrame > 0) score -= 1
    if (HasExternalFormSubmit == 1) score -= 2
    if (HasPasswordField == 0 && (Bank == 1 || Pay == 1 || Crypto == 1)) score -= 4 // Si no necesita password, pero si banco, pago o cripto, restar varios puntos
    if (URLCharProb < 0.5) score -= 2

    // COnsideraciones mínimas que restan puntos a favor.
    if (URLLength > 100) score -= 1
    if (NoOfSubDomain > 3) score -= 1

    // Definir los rangos y retornar la categoría
    score match {
        case s if s >= 5 => "HIGH"       
        case s if s >= 0 && s < 5 => "MEDIUM"   
        case s if s >= -5 && s < 0 => "LOW"     
        case s if s < -5 => "VERY LOW"   
        case _ => "UNDEFINED"              
    }
}

// Crear función UDF para que pueda ser usada en cada iteración: 
val getSecurityCategoryUdf = udf(getSecurityCategory(
    _: Int, _: Int, _: Int, _: Int, _: Int, _: Int, _: Int, _: Int,
    _: Int, _: Int, _: Int, _: Int, _: Int, _: Double, _: Int, _: Int
), StringType)

In [3]:
%spark

def getUrlComplexity(NoOfSubDomain : Int, URLLength : Int, NoOfOtherSpecialCharsInURL : Int, SpacialCharRatioInURL : Double) : String = {
    val complexityScore = (NoOfSubDomain * 1.5) + (URLLength * 0.1) + (NoOfOtherSpecialCharsInURL * 1.0) + (SpacialCharRatioInURL * 5.0)
     complexityScore match {
        case score if score < 10 => "SIMPLE"
        case score if score < 30 => "MEDIUM"
        case score if score < 60 => "COMPLEX"
        case score if score >= 60 => "VERY COMPLEX"
        case _ => "UNDEFINED" 
    }
    
}

//Función UDF para que pueda ser usada en cada iteración

val getUrlComplexityUdf = udf(getUrlComplexity(
    _: Int, _: Int, _: Int, _: Double), StringType)




In [4]:
%spark


def getPresenceRatio(NoOfImage : Int, NoOfCSS : Int, NoOfJS : Int, LineOfCode : Int) : String = {
    
val totalContentElements = NoOfImage + NoOfCSS + NoOfJS
val contentRatio = if (LineOfCode > 0) {
    totalContentElements.toDouble / LineOfCode.toDouble 
}else{
    0.0
} 

  contentRatio match {
        case ratio if ratio < 0.05 => "VERY LOW"
        case ratio if ratio < 0.2 => "LOW"
        case ratio if ratio < 0.6 => "MEDIUM"
        case ratio if ratio >= 0.6 => "HIGH"
        case _ => "UNDEFINED" 
    }

}

//Función UDF para que pueda ser usada en cada iteración

val getPresenceRatioUdf = udf(getPresenceRatio(
    _: Int, _: Int, _: Int, _: Int), StringType)

In [5]:
%spark
/*
Poblar la tabla con SecurityScore
*/
val dataWithSecurity = data.withColumn(
    "SecurityCategory",
    getSecurityCategoryUdf(
        col("IsHTTPS"),
        col("HasTitle"),
        col("Robots"),
        col("HasCopyrightInfo"),
        col("HasObfuscation"),
        col("NoOfURLRedirect"),
        col("NoOfPopup"),
        col("NoOfiFrame"),
        col("HasExternalFormSubmit"),
        col("HasPasswordField"),
        col("Bank"),
        col("Pay"),
        col("Crypto"),
        col("URLCharProb"),
        col("URLLength"),
        col("NoOfSubDomain")
    )
)

dataWithSecurity.select($"URL", $"SecurityCategory").show(false)

In [6]:
%spark
/*
Poblar la tabla con URL complexity:
*/

val dataWithComplexityUrl = data.withColumn(
    "UrlComplexity",
    getUrlComplexityUdf(
        col("NoOfSubDomain"),
        col("URLLength"),
        col("NoOfOtherSpecialCharsInURL"),
        col("SpacialCharRatioInURL"),
    )
)

dataWithComplexityUrl.select($"URL", $"UrlComplexity").where($"UrlComplexity" === "VERY COMPLEX").show(false)

In [7]:
%spark
/*
Poblar la tabla con presencia de contenido
*/
val dataWithPresenceRatio = data.withColumn(
    "PresenceRatio",
    getPresenceRatioUdf(
        col("NoOfImage"),
        col("NoOfCSS"),
        col("NoOfJS"),
        col("LineOfCode"),
    )
)


dataWithPresenceRatio.select($"URL", $"PresenceRatio").show(false)


In [8]:
%spark

data.select($"Domain") // Selecciona solo la columna 'Domain'
  .where(col("Domain").endsWith(".ec")) // Filtra donde la columna 'Domain' contenga la subcadena "ec"
  .show(2000,false) 

In [9]:
%spark
// Gráfica para saber cuantas páginas son de phishing y cuales no lo son
val phishingPages = data.select($"label")
.groupBy($"label".as("Legítima"))
.agg(count("*").as("Total"))


z.show(phishingPages)

# Consultas en base a las preguntas
Las siguientes consultas tienen como objetivo responder a las preguntas planteadas previamente.

In [11]:
%spark
// ¿Cuáles son los cinco dominios más recurrentes en el dataset?
val fiveDomains = data.select($"domain").groupBy($"domain")
.agg(count($"domain").as("Total dominios"))
.orderBy($"Total dominios".desc)
.limit(5)

z.show(fiveDomains)

In [12]:
%spark
// ¿Cuáles son los factores más comunes en las páginas consideradas como phishing?
val dangerousPages = data.select($"URL", $"label", $"NoOfURLRedirect", $"Crypto", $"HasCopyrightInfo", $"IsHTTPS")
.where($"NoOfURLRedirect" > 0)
.where($"Crypto" === 1)
.where($"HasCopyrightInfo" === 0)
.where($"IsHTTPS" === 0)


z.show(dangerousPages)







In [13]:
%spark
//¿Cuántas páginas de cada tipo de complejidad de URL existen dentro del dataset?
val dataUrlComplexity = dataWithComplexityUrl.select($"URL", $"UrlComplexity")
.groupBy($"UrlComplexity")
.agg(count($"UrlComplexity").as("Total de páginas"))
.orderBy($"Total de páginas".desc)

z.show(dataUrlComplexity)


In [14]:
%spark
//¿Cuáles son los cinco dominios con más seguridad basados en la columna derivada? 
val highSecurityDomains = dataWithSecurity.select($"SecurityCategory", $"Domain")
.where($"SecurityCategory" === "HIGH")
.groupBy($"SecurityCategory", $"Domain")
.agg(count("*").as("Dominios totales"))
.where($"Dominios totales" > 10)
.orderBy($"Dominios totales".desc)

z.show(highSecurityDomains)

In [15]:
%spark
//¿Cuántas páginas con dominio ‘ec’ existen dentro del dataset y que porcentaje de estas con consideradas como ‘phishing’? 

val ecuadorDomains = data.select($"label".as("legitima")) 
  .where(col("Domain").endsWith(".ec")) 
  .groupBy($"legitima")
  .agg(count("*").as("Total dominios .ec"))
  
  z.show(ecuadorDomains)

In [16]:
%spark
data.count()

In [17]:
%spark
