In [None]:
import scala.util.Try
import java.net.{ URL => url }
import java.net.URLDecoder.{ decode => dec }
import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.functions.broadcast

In [None]:
// Spark UDF: percent-decodes a raw URL string as UTF-8 and returns its host
// (java.net.URL#getHost). Any failure while decoding or parsing the URL yields
// None, which Spark stores as SQL NULL.
// The parameter is deliberately not called `url`, so it does not shadow the
// `url` import alias for java.net.URL used in `new url(...)`.
val decode_url = udf { (rawUrl: String) =>
  // Explicit charset: the one-argument URLDecoder.decode is deprecated and
  // depends on the JVM's platform default encoding.
  Try(new url(dec(rawUrl, "UTF-8")).getHost).toOption
}

In [None]:
// Auto-user IDs: the "autousers" array is exploded into one row per ID (column UID2).
val autousers = spark.read
  .json("/labs/laba02/autousers.json")
  .select(explode(col("autousers")).as("UID2"))

// The logs are tab-separated; column types are inferred by Spark.
val csvOptions = Map(
  "delimiter"   -> "\t",
  "inferSchema" -> "true"
)

val logsRaw = spark.read.options(csvOptions).csv("/labs/laba02/logs")
// For a quick run on a single part file:
//val logsRaw = spark.read.options(csvOptions).csv("/labs/laba02/logs/part-00000")

// Keep rows whose _c2 (presumably the raw URL) is present and contains "http".
// Reduce it to its host with a leading "www." stripped (URLClean), and pair it
// with _c0 as UID.
val logs = logsRaw
  .where(col("_c2").isNotNull && col("_c2").contains("http"))
  .select(
    regexp_replace(decode_url(col("_c2")), "^www\\.", "").as("URLClean"),
    col("_c0").as("UID")
  )

//logs.show(5)

In [None]:
// Narrow both sides of the upcoming join to just the columns it needs.
val left  = logs.select(col("URLClean"), col("UID"))
val right = autousers.select(col("UID2"))

In [None]:
import org.apache.spark.sql.functions.when

// Each log row keeps its cleaned URL (as URL).
// Auto_flag is 1 when the row's UID matched an auto-user ID in the left join,
// and 0 otherwise. NOTE(review): rows multiply if UID2 contains duplicates.
val URLActual =
  left
    .join(right, left("UID") === right("UID2"), "left")
    .select(
      col("URLClean").as("URL"),
      when(col("UID2").isNotNull, 1).otherwise(0).as("Auto_flag")
    )

//URLActual.show(10)

In [None]:
import org.apache.spark.sql.expressions.Window

// Relevance of each URL (domain) with respect to auto users:
//   relevance = (cnt_flags/N)^2 / ((cnt_visits/N) * (cnt_flags_all/N)),  N = cnt_visits_all
// Per URL: cnt_visits is the number of visits and cnt_flags the number of
// auto-user visits. cnt_visits_all and cnt_flags_all are the same counts
// over the whole log.
//
// Why not window functions: an unpartitioned Window (Window.partitionBy())
// moves every row into a single partition. The per-URL windows also produced
// one output row per visit, so the result had to be deduplicated later.
// Instead, per-URL counts come from a grouped aggregation. The totals come
// from one extra one-row aggregation that is cross-joined back.
// The formula, the column types (long counts) and the rounding are unchanged.
// The result now has one row per domain, with columns domain and relevance.
val visits = URLActual.localCheckpoint // materialized once, scanned twice below

val perUrl = visits
  .groupBy("URL")
  .agg(count("*").as("cnt_visits"), sum("Auto_flag").as("cnt_flags"))

val totals = visits
  .agg(sum("Auto_flag").as("cnt_flags_all"), count("*").as("cnt_visits_all"))

val result = perUrl
  .crossJoin(totals) // totals is a single row, so this only appends columns
  .withColumn(
    "relevance",
    ((col("cnt_flags") / col("cnt_visits_all")) * (col("cnt_flags") / col("cnt_visits_all"))) /
      ((col("cnt_visits") / col("cnt_visits_all")) * (col("cnt_flags_all") / col("cnt_visits_all")))
  )
  .withColumn("relevance", round(col("relevance"), 15))
  .select(col("URL").as("domain"), col("relevance"))

//result.show(10)

In [None]:
// One row per domain with its relevance. Rows are ranked by relevance (highest
// first), with ties broken alphabetically by domain.
// Rows whose URL could not be parsed have a NULL domain, because decode_url
// returns None (SQL NULL) on failure. They are dropped so a "null" domain can
// never be ranked and written to the output file.
val res = result
  .filter(col("domain").isNotNull)
  .groupBy("domain")
  .agg(max("relevance").as("relevance"))
  .orderBy(col("relevance").desc, col("domain"))

//.show(200)

In [None]:
// Bring the top 200 ranked domains to the driver (take(n) == limit(n).collect()).
val itog = res.take(200)

import scala.util.Try
import java.io._

/** Opens `f` for writing, passes the writer to `op`, and always closes it.
  *
  * The file is written as UTF-8. Otherwise the one-argument PrintWriter
  * constructor would use the JVM's default charset, and the decoded domain
  * names written here may contain non-ASCII characters.
  *
  * @throws java.io.IOException if any write performed by `op` failed
  *         (PrintWriter itself swallows I/O errors and only records them).
  */
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
  val p = new java.io.PrintWriter(f, "UTF-8")
  try {
    op(p)
    // checkError() flushes and reports whether any earlier write failed.
    if (p.checkError()) throw new java.io.IOException(s"Error while writing $f")
  } finally p.close()
}

// One tab-separated line per domain: "<domain>\t<relevance with 20 decimals>".
// The number is formatted with Locale.US so the decimal separator is always '.'.
// StringOps.format uses the JVM default locale, which would print ',' under,
// e.g., a Russian or German locale.
val rowsToFile = itog.map(r => s"${r(0)}\t${"%.20f".formatLocal(java.util.Locale.US, r(1))}")

printToFile(new File("laba02_domains.txt")) { p =>
  rowsToFile.foreach(p.println)
}