In [0]:
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable
import scala.sys.process._
import scala.util.Try
import org.apache.spark.sql.SQLContext
import scala.xml.XML
import com.google.gson.Gson
import scala.io.Source
import scala.language.postfixOps
import org.json4s._
import org.apache.hadoop.fs._
//import spark.implicits._
import scala.collection.mutable.ListBuffer
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.JsonMethods.parse
import org.json4s.DefaultFormats


val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
    .builder()
    .appName("Punto_3")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()
    
val path = "hdfs://cluster-c9a8-m/preprocesados/"
val r = Seq("hdfs", "dfs", "-ls", path).!!
val s = r.split("\n")
val lista_archivos = s.filter(line => !line.equals(s.head)).map(line => line.split(" +").last)
val separador = ","

/*Primera parte del punto 3, que se encarga de leer el archivo .csv ubicado en HDFS*/
val df_datos = spark.read
        .option("header", "true")
        .option("delimiter", separador)
        .csv(lista_archivos: _*)
        
/*Creacion de DataFrame con la estrcutrua del archivo .csv*/        
val datos_df = df_datos.select(
                col("_c0").as("id"),
                col("venta_id"),
                col("first_name"),
                col("last_name"),
                col("book_id"),
                col("cuantos_libros").as("cnt_libros"),
                col("ip_address"),
                col("ccn3(Codigo_pais)").as("ccn3"))

/*Creacion de una tabla en Hive para la insercion de los datos del archivo datos_compras tipo .csv*/
sql("create table if not exists meli.datos_compras (id STRING, venta_id STRING, first_name STRING, last_name STRING, book_id STRING, cnt_libros STRING, ip_address STRING, ccn3 STRING) USING hive")

/*Insercion de los datos hacia la tabla en Hive*/
datos_df.write
          .format("hive")
          .mode("Ignore")
          .saveAsTable("meli.datos_compras")




/*Segunda parte del proceso para extraer los datos de la rest Api*/

implicit val formats = DefaultFormats


/*Se guarda el dataframe consultado dentro de una lista*/        

val df_myList = sql("select cast(ccn3 as int) ccn3 from meli.datos_compras where ccn3 is not null group by cast(ccn3 as int)")

var myList = df_myList.select("ccn3").collect().map(_(0)).toList

        
/*Se crea la tabla en HIVE para guardar los datos consultados en la API de rescountries*/        
sql("create table if not exists meli.countries (ccn3 STRING, common STRING) USING hive")        
      

/*Recorre un ciclo para extraer solo la parte json de los datos que requrimos por codigo de pais*/      
       for ( x <- myList ) yield {
          val urll = "https://restcountries.com/v3.1/alpha/"+s"$x"+"?fields=name,ccn3"
          val apii = scala.io.Source.fromURL(urll).mkString
          val jsonn = ""+apii+""
          val DFF = spark.read.json(spark.createDataset(jsonn :: Nil))
          val diss = DFF.select("ccn3","name.common")
                        diss.show(false)
                        diss.write
                        .format("hive")
                        .mode("Ignore")
                        .saveAsTable("meli.countries")
      } 



/*Tercera parte del punto 3, se realiza una consulta agrupada del campo book_id para que filtre dentro del archivo XML leido*/          
val df_id_book = sql("select book_id from meli.datos_compras group by book_id")

/*Se guarda el dataframe consultado dentro de una lista*/
val id_book_arr = df_id_book.select("book_id").collect().map(_(0)).toList

val conf = new Configuration()

/*Se lee el archivo bookCatalog dentro del HDFS*/

val hdfsPath: Path = new Path("hdfs://cluster-c9a8-m/preprocesados/bookCatalog.xml")
val fs: FileSystem = hdfsPath.getFileSystem(conf)
val inputStream: FSDataInputStream = fs.open(hdfsPath)

try {
/*Creacion de una tabla en Hive para la insercion de los datos del archivo bookCatalog tipo .xml*/
    sql("create table if not exists meli.books (id STRING, publish_date STRING, price STRING, genre STRING) USING hive")

val xml_load = XML.load(inputStream)
val books_id= ((xml_load \\ "catalog" \\ "book"))
  
 for ( y <- id_book_arr ) yield { 
  val cataLib = for {
      item <- books_id
      if (item \ "@id").text == s"$y"
    } yield ((item \\ "genre").text
      ,(item \\ "price").text
      ,(item \\ "publish_date").text
      ,(item \\ "@id").text)

val list = sc.parallelize(List((cataLib(0)._4,cataLib(0)._3,cataLib(0)._2,cataLib(0)._1))).toDF("id","publish_date","price","genre")
    
    list.write
        .format("hive")
        .mode("Ignore")
        .saveAsTable("meli.books")
        }


/*Se crea una tabla tipo fact llamada tbl_fct_datos con los datos de tipo transaccion para ser validados con calculos de tipo agregacion*/

sql("create table if not exists meli.tbl_fct_datos as SELECT venta_id, cnt_libros, publish_date, price, genre, common from meli.datos_compras d join meli.books b on d.book_id = b.id join meli.countries c on cast(d.ccn3 as int)=c.ccn3")

val df_fact = sql("select count(venta_id) venta_id, sum(cnt_libros) cnt_libs, publish_date, sum(cast(price as int)) price, genre, common as pais from meli.tbl_fct_datos group by publish_date, genre, common")

df_fact.show()

    
}finally {
/*cierra recursos*/ 
  inputStream.close()
  fs.close()
}