# 1.- Realizar todos los ejercicios propuestos de libro


In [2]:
# In Python, define a schema 

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import sum
from pyspark.sql.functions import col
from pyspark.sql.types import *

In [3]:
spark = (SparkSession
 .builder
 .appName("Baterias")
 .getOrCreate())


# Read JSON file into dataframe
df = spark.read.json("C:/Users/alejandro.perez/Documents/LearningSparkV2-master/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json")
df.printSchema()
df.show(truncate = False)


root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)

+-------------+---------+----+----+-------------+---------+------------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn           |device_id|device_name             |humidity|ip             |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-------------+---------+------------------------+--------+-------

# Ejercicios página 74

### Detect failing devices with battery levels below a threshold

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType


batteryFilter = df.withColumn("battery_level", F.col("battery_level").cast(IntegerType()) )\
                 .where(F.col("battery_level")<5)

batteryFilter.show(truncate= False)

+-------------+---------+----+----+-----------------+---------+-----------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn               |device_id|device_name            |humidity|ip             |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-----------------+---------+-----------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|2            |1556     |IT  |ITA |Italy            |3        |device-mac-36TWSKiT    |44      |88.36.5.1      |42.83   |red   |12.83    |Celsius|19  |1458444054120|
|4            |931      |PH  |PHL |Philippines      |5        |therm-stick-5gimpUrBB  |62      |203.82.41.9    |14.58   |green |120.97   |Celsius|25  |1458444054122|
|3            |1210     |US  |USA |United States    |6        |sensor-pad-6al7RTAobR  |51      |204.116.105.67 |35.93   |yellow|-85.46   |Celsius|27  |1458444054122|
|3  

### Identify offending countries with high levels of CO2 emissions

In [5]:
from pyspark.sql import Window
w = Window().partitionBy("temp")

highC02 = df.withColumn("c02_level", F.col("c02_level").cast(IntegerType()))\
            .withColumn("mediaC02_temp", F.mean("c02_level").over(w))\
            .where(F.col("c02_level") > F.col("mediaC02_temp"))\
            .select(F.col("C02_level"), F.col("mediaC02_temp"), F.col("cn"), F.col("temp"))

highC02.show(truncate= False)

+---------+------------------+-------------+----+
|C02_level|mediaC02_temp     |cn           |temp|
+---------+------------------+-------------+----+
|1470     |1197.9663077897403|United States|26  |
|1399     |1197.9663077897403|United States|26  |
|1553     |1197.9663077897403|Ecuador      |26  |
|1360     |1197.9663077897403|United States|26  |
|1477     |1197.9663077897403|China        |26  |
|1565     |1197.9663077897403|United States|26  |
|1556     |1197.9663077897403|Monaco       |26  |
|1372     |1197.9663077897403|United States|26  |
|1533     |1197.9663077897403|United States|26  |
|1370     |1197.9663077897403|United States|26  |
|1399     |1197.9663077897403|United States|26  |
|1505     |1197.9663077897403|Russia       |26  |
|1545     |1197.9663077897403|United States|26  |
|1525     |1197.9663077897403|Ireland      |26  |
|1261     |1197.9663077897403|Japan        |26  |
|1227     |1197.9663077897403|United States|26  |
|1316     |1197.9663077897403|United States|26  |


### Compute the min and max values for temperature, battery level, C02 and humidity

In [6]:
minMax = df.withColumn("c02_level", F.col("c02_level").cast(IntegerType()))\
            .withColumn("temp", F.col("temp").cast(IntegerType()))\
            .withColumn("battery_level", F.col("battery_level").cast(IntegerType()))\
            .withColumn("humidity", F.col("humidity").cast(IntegerType()))\
            .groupBy("cn")\
            .agg(F.min(F.col("c02_level")).alias("MinC02"), F.min(F.col("battery_level")).alias("minBat"), F.min(F.col("temp")).alias("minTemp"), F.min(F.col("humidity")).alias("minHum"), 
                 F.max(F.col("c02_level")).alias("MaxC02"), F.max(F.col("battery_level")).alias("maxBat"), F.max(F.col("temp")).alias("maxTemp"), F.max(F.col("humidity")).alias("maxHum"))

minMax.show(truncate= False)

+------------------------------+------+------+-------+------+------+------+-------+------+
|cn                            |MinC02|minBat|minTemp|minHum|MaxC02|maxBat|maxTemp|maxHum|
+------------------------------+------+------+-------+------+------+------+-------+------+
|Russia                        |800   |0     |10     |25    |1599  |9     |34     |99    |
|Paraguay                      |836   |0     |10     |27    |1580  |9     |33     |98    |
|Anguilla                      |865   |0     |23     |26    |1398  |9     |34     |83    |
|Macao                         |831   |0     |10     |25    |1592  |9     |34     |98    |
|U.S. Virgin Islands           |819   |0     |10     |25    |1591  |9     |34     |99    |
|Yemen                         |817   |0     |10     |28    |1427  |9     |32     |95    |
|British Indian Ocean Territory|852   |7     |25     |54    |1560  |7     |27     |76    |
|Senegal                       |802   |1     |11     |25    |1589  |9     |34     |94    |

# 2.- Leer el CSV del ejemplo del cap2 y obtener la estructura del schema dado por defecto.

In [7]:
mnm_file = "C:/Users/alejandro.perez/Documents/LearningSparkV2-master/chapter2/py/src/data/mnm_dataset.csv"
 # Read the file into a Spark DataFrame using the CSV
 # format by inferring the schema and specifying that the
 # file contains a header, which provides column names for comma-
 # separated fields.

mnm_df = (spark.read.format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load(mnm_file))

In [8]:
mnm_df.show()

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   TX|   Red|   20|
|   NV|  Blue|   66|
|   CO|  Blue|   79|
|   OR|  Blue|   71|
|   WA|Yellow|   93|
|   WY|  Blue|   16|
|   CA|Yellow|   53|
|   WA| Green|   60|
|   OR| Green|   71|
|   TX| Green|   68|
|   NV| Green|   59|
|   AZ| Brown|   95|
|   WA|Yellow|   20|
|   AZ|  Blue|   75|
|   OR| Brown|   72|
|   NV|   Red|   98|
|   WY|Orange|   45|
|   CO|  Blue|   52|
|   TX| Brown|   94|
|   CO|   Red|   82|
+-----+------+-----+
only showing top 20 rows



In [10]:
mnm_df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Count: integer (nullable = true)



# 3.- Cuando se define un schema al definir un campo por ejemplo StructField('Delay', FloatType(), True) ¿qué significa el último parámetro Boolean?

Especifica cuando el campo, en este caso 'Delay' puede tener valores nulos o no, en este caso y por defecto viene asignado el valor True por lo que si puede tener valores nulos.

# 4.- Dataset vs DataFrame (Scala). ¿En qué se diferencian a nivel de código?

Los Dataset permiten utilizar funciones de la API de los rdd y están optimizados con Catalyst.

# 5.- Utilizando el mismo ejemplo utilizado en el capítulo para guardar en parquet y guardar los datos en los formatos:

In [14]:
mnm_df.write.format("avro")\
      .save("mnm.avro")

In [15]:
mnm_df.write.format("json")\
      .save("mnm.json")

In [16]:
mnm_df.write.format("csv")\
      .save("mnm.csv")