In [0]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

 // Obtain (or reuse) the SparkSession used by every cell in this notebook.
 val spark = SparkSession
   .builder
   .appName("Example-3_7")
   .getOrCreate()

 // Location of the blogs JSON file.
 val jsonFile = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/blogs.json"

 // Define the schema programmatically instead of letting Spark infer it.
 // Each StructField is (column name, data type, nullable flag).
 val schema = StructType(Array(
   StructField("Id", IntegerType, false),
   StructField("First", StringType, false),
   StructField("Last", StringType, false),
   StructField("Url", StringType, false),
   StructField("Published", StringType, false),
   StructField("Hits", IntegerType, false),
   StructField("Campaigns", ArrayType(StringType), false)
 ))

 // Create a DataFrame by reading the JSON file with the predefined schema.
 val blogsDF = spark.read.schema(schema).json(jsonFile)

 // Show the DataFrame rows without truncating long column values.
 blogsDF.show(false)

 // printSchema() writes the schema tree to stdout itself and returns Unit,
 // so wrapping it in println would only append a stray "()" line.
 blogsDF.printSchema()
 // The StructType's own string form, for comparison with the tree above.
 println(blogsDF.schema)



In [1]:
// Collect at most 20 rows to the driver and print each Row on its own line.
blogsDF.take(20).foreach(row => println(row))

In [2]:
// Two equivalent ways of computing "Hits * 2"; each shows the first 2 rows.
// 1) Parse the computation from a SQL expression string.
blogsDF.select(expr("Hits * 2")).show(numRows = 2)
// 2) Build the same computation with the Column API.
blogsDF.select(col("Hits").multiply(2)).show(numRows = 2)

In [3]:
// Append a boolean column "Big Hitters" that is true when Hits exceeds 10000,
// then display the result.
blogsDF
  .withColumn("Big Hitters", expr("Hits > 10000"))
  .show()


In [4]:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

 // Location of the San Francisco fire-calls CSV file.
 val csvFile = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls.csv"

 // Read the CSV using its first line as the header and letting Spark infer the
 // column types; samplingRatio sets the fraction of rows used for inference.
 val df_fire_calls = spark.read
   .option("header", "true")
   .option("inferSchema", "true")
   .option("samplingRatio", "0.001")
   .csv(csvFile)

 // Print one StructField (name, type, nullability) per line.
 df_fire_calls.schema.fields.foreach(field => println(field))


In [5]:
// - What were all the different types of fire calls in 2018?
// Cache the DataFrame: the queries in this cell all re-read it.
df_fire_calls.cache()
// Keep only calls whose CallDate (MM/dd/yyyy text) falls in 2018 before taking
// the distinct call types; without this filter every year would be included.
val diff_types_of_calls = df_fire_calls
  .where(year(to_date(col("CallDate"), "MM/dd/yyyy")) === 2018)
  .select("CallType")
  .distinct()
//diff_types_of_calls.show(truncate=false)

// - What months within the year 2018 saw the highest number of fire calls?
// First build a DataFrame where CallDate is parsed into a real date column
// that SQL date functions can work with.
val only_dates = df_fire_calls.select(to_date(col("CallDate"), "MM/dd/yyyy").as("date"))
only_dates.createOrReplaceTempView("firecalls")

// SQL version. YEAR(date) is an integer, so compare it with '=' rather than the
// string pattern operator LIKE, and group by the expression itself instead of
// relying on the select-list alias being resolved in GROUP BY.
val months_with_most_calls = spark.sql("SELECT MONTH(date) AS Month, COUNT(*) AS count FROM firecalls WHERE YEAR(date) = 2018 GROUP BY MONTH(date) ORDER BY COUNT(*) DESC")

// DataFrame API version. count() already names its output column "count";
// a trailing .as("count") would only alias the Dataset, so it is omitted.
val months_with_most_calls2 = only_dates
  .filter(year($"date") === 2018)
  .groupBy(month($"date").as("Month"))
  .count()
  .orderBy(desc("count"))
//months_with_most_calls2.show(truncate=false)
//months_with_most_calls.show(truncate=false)

// - Which neighborhood in San Francisco generated the most fire calls in 2018?

// Shared projection: parsed call date, neighborhood and response delay.
// It is also registered as the temp view "total_firecalls" for the SQL variants.
val callDate_neighborhoods_delay = df_fire_calls.select(
  to_date(col("CallDate"), "MM/dd/yyyy").as("date"),
  $"Neighborhood",
  $"Delay"
)

callDate_neighborhoods_delay.createOrReplaceTempView("total_firecalls")

// DataFrame API version. Filter on "date" while the column is still present
// (no select that drops it first), then count calls per neighborhood.
// count() already names its output column "count".
val most_fire_neighborhoods = callDate_neighborhoods_delay
  .filter(year($"date") === 2018)
  .groupBy($"Neighborhood")
  .count()
  .orderBy(desc("count"))
//most_fire_neighborhoods.show(3, truncate=false)

// SQL version; YEAR(date) is numeric, so it is compared with '=' instead of LIKE.
val most_fire_neighborhoods_sql = spark.sql("SELECT Neighborhood, COUNT(*) FROM total_firecalls WHERE YEAR(date) = 2018 GROUP BY Neighborhood ORDER BY COUNT(*) DESC")
//most_fire_neighborhoods_sql.show(3, truncate=false)


// - Which neighborhoods had the worst response times to fire calls in 2018?

// DataFrame API version: average Delay per neighborhood for 2018 calls,
// highest average ("promedio") first.
val most_delay_neighborhoods = callDate_neighborhoods_delay
  .where(year($"date") === 2018)
  .groupBy($"Neighborhood")
  .agg(avg($"Delay").as("promedio"))
  .orderBy(desc("promedio"))

//most_delay_neighborhoods.show(3)

// SQL version over the "total_firecalls" temp view; YEAR(date) is numeric,
// so it is compared with '=' instead of the string pattern operator LIKE.
val most_delay_neighborhoods_sql = spark.sql("SELECT Neighborhood, AVG(Delay) AS promedio FROM total_firecalls WHERE YEAR(date) = 2018 GROUP BY Neighborhood ORDER BY AVG(Delay) DESC")
//most_delay_neighborhoods_sql.show(3, truncate=false)

// - Which week in the year in 2018 had the most fire calls?
// DataFrame API version. Filter on "date" before projecting it away into the
// week number ("semana"), then count calls per week, busiest week first.
// count() already names its output column "count".
val most_fires_a_week = callDate_neighborhoods_delay
  .where(year($"date") === 2018)
  .select(weekofyear($"date").as("semana"))
  .groupBy("semana")
  .count()
  .orderBy(desc("count"))
//most_fires_a_week.show(3, truncate = false)

// SQL version over the "total_firecalls" temp view; YEAR(date) is numeric,
// so it is compared with '=' instead of LIKE.
val most_fires_a_week_sql = spark.sql("SELECT WEEKOFYEAR(date) as Semana, COUNT(*) as Count FROM total_firecalls WHERE YEAR(date) = 2018 GROUP BY Semana ORDER BY Count DESC")
//most_fires_a_week_sql.show(3, truncate = false)

// - Is there a correlation between neighborhood, zip code, and number of fire calls?

// Project the two columns of interest and expose them as a temp view.
val firecalls_for_corr = df_fire_calls.select(col("Neighborhood"), col("ZipCode"))
firecalls_for_corr.createOrReplaceTempView("firecalls_for_corr")

// Tabulate the number of calls for each (Neighborhood, ZipCode) pair, ordered by
// zip code descending. This only lists counts; no correlation coefficient is computed.
val dataFromCorr_sql = spark.sql(
  "SELECT Neighborhood, ZipCode, COUNT(*) as Count FROM firecalls_for_corr GROUP BY Neighborhood, ZipCode ORDER BY ZipCode DESC"
)
dataFromCorr_sql.show(20, false)

// - How can we use Parquet files or SQL tables to store this data and read it back?





In [6]:
//elementNullable = true significa que ese elemento puede tomar un valor nulo.

In [7]:
//Dataset: es una colección de datos tipados (Dataset[T]),
//mientras que un DataFrame es un Dataset sin tipar (Dataset[Row]).
//Para definir un Dataset, al ser un conjunto de datos enorme, inferir el schema puede llegar a ser costoso;
//por ello se utiliza una "case class" para definir una clase con los tipos de cada columna del Dataset.

In [8]:
import org.apache.avro
//JSON
// NOTE(review): DataFrameWriter output paths are directories of part files, so
// "sf-fire-calls.json" is presumably created as a directory as well; jsonFolder
// is declared but unused here. Confirm which destination is intended.
val jsonFolder = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls_json"
val jsonFile = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls.json"
// Print how many partitions the DataFrame currently has.
println(df_fire_calls.rdd.getNumPartitions)
// coalesce(1) reduces the data to one partition before writing.
df_fire_calls.coalesce(1).write.mode("overwrite").json(jsonFile)

//CSV (with a different number; presumably of partitions/files, to be confirmed)
val csvFolder = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls-copy_csv"
val csvFile = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls-copy.csv"

// Use the built-in CSV writer rather than the legacy "com.databricks.spark.csv"
// format name of the external spark-csv package.
df_fire_calls.coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv(csvFolder)

//AVRO (disabled; left commented out)
//val avroFolder = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls_avro"
//val avroFile = "gs://ejercicios-bigdata1/notebooks/zeppelin/repo/sf-fire-calls.avro"
// mode(...) must be chained on the same writer that calls save(); configuring a
// separate writer and discarding it has no effect.
//df_fire_calls.write.format("avro").mode("overwrite").save(avroFolder)

In [9]:
// I. Se debe a que spark particiona el contenido.
// II. Mediante df.rdd.partitions.(size o length)
// III. Existe el método coalesce(n) o repartition(n)
// IV. Ver ejercicio anterior.