In [0]:
# Se define una estructura (schema) para cada instancia de ellos. El esquema lo podemos definir (explícito, lo más aconsejado) o inferir desde los datos

# Spark “compila” las tablas a la API de bajo nivel que ya conocemos: RDDs

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType   # Importamos todos los tipos que usaremos (estos, FloatType, DateType...)

datos = [(None,'Smith   ','36636','M',3500),
         ('Michael','   Rose','40288','M',4750),
         ('Robert','Williams','42114','M',None),
         ('Maria','    Jones    ','39192','F',4000)]

esquema = StructType([
    StructField('firstname', StringType(), True),   #Nombre columna, Tipo dato, T (se permiten nulos)/F (aquí no, daría error si lo hay) 
    StructField('lastname', StringType(), False),
    StructField('id', StringType(), False),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True) ])

In [0]:
df = spark.createDataFrame(datos, esquema)

df.printSchema()    # muestra esquema del Dataframe (columnas)

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = false)
 |-- id: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [0]:
df.show()     # ACCIÓN, muestra el contenido
              # (nº a mostar, truncate=False)   #False: celda completa / True: usa '...')

+---------+-------------+-----+------+------+
|firstname|     lastname|   id|gender|salary|
+---------+-------------+-----+------+------+
|     null|     Smith   |36636|     M|  3500|
|  Michael|         Rose|40288|     M|  4750|
|   Robert|     Williams|42114|     M|  null|
|    Maria|    Jones    |39192|     F|  4000|
+---------+-------------+-----+------+------+



In [0]:
# Función en PySpark que crea una nueva fila (row) a partir de los valores proporcionados (similar a una tupla).
# Se pueden utilizar para crear nuevos DataFrames utilizando la función createDataFrame().

from pyspark.sql import Row

empleado_1 = Row("James","Smith","36636","M",3500)

print('Este empleado se llama: ', empleado_1[0])

datos = [Row("James","Smith","36636","M",3500),
         Row("Michael","Rose","40288","M",4750),
         Row("Robert","Williams","42114","M",4200),
         Row("Maria","Jones","39192","F",4000)]

print('Este empleado se apellida: ', datos[1][1])

Este empleado se llama:  James
Este empleado se apellida:  Rose


In [0]:
# Si omites "inferSchema", el esquema se infiere de manera automática como datos string (problema). Si los datos tienen un formato inconsistente, puede ser necesario especificar el esquema de manera explícita.

file = 'dbfs:/FileStore/shared_uploads/danielmm97@gmail.com/sales.csv'

sales_df = (spark.read
            .format("csv")
            .option("header", "true")      # Aparezca en 1ª fila: nombre columna
            .option("inferSchema", True)   # Infiere el esquema de datos del .csv
            .load(file))

sales_df.printSchema()

sales_df.show()

root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: date (nullable = true)

+---------+----------+---------------+----------+----------+--------------------+--------------------+----------+
| Order_ID|Order_Date|      Item_Type|Units_Sold|Unit_Price|              Region|             Country| Ship_Date|
+---------+----------+---------------+----------+----------+--------------------+--------------------+----------+
|535113847|2014-10-08|         Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|2014-10-23|
|874708545|2015-02-22|      Cosmetics|      4551|     437.2|Central America a...|              Panama|2015-02-27|
|854349935|2015-12-09|         Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome and Prin...

In [0]:
from pyspark.sql.types import IntegerType, StringType, FloatType, ArrayType, DateType, BooleanType

esquema = StructType([
    StructField('id', IntegerType(), True),
    StructField('first_name', StringType(), True),
    StructField('last_name', StringType(), True),
    StructField('fav_movies', ArrayType(StringType()), True),
    StructField('salary', FloatType(), True),
    StructField('image_url', StringType(), True),
    StructField('date_of_birth', DateType(), True),
    StructField('active', BooleanType(), True) ])
    
file = 'dbfs:/FileStore/shared_uploads/danielmm97@gmail.com/persons.json'

persons_df = (spark.read
             .format('json')
             .option('multiline', True)   # garantizar que cada registro se interprete correctamente como una entidad separada.
             .schema(esquema)
             .load(file))

persons_df.show()

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  2|   Emelyne|      Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  4|    Ilario|       Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|     Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|


###Transformaciones Dataframes

In [0]:
# Devuelve dataframe con la(s) columna(s) y/o expresiones especificadas.
# Necesitamos ACCIÓN “show” (evaluación perezosa)

sales_df.select('Order_ID','Item_Type','Units_Sold','Unit_Price','Country')   #False: celda completa / True: usa '...')

sales_df.show(10,truncate=False)     # #False: celda completa / True: usa '...')

+---------+----------+-------------+----------+----------+---------------------------------+---------------------+----------+
|Order_ID |Order_Date|Item_Type    |Units_Sold|Unit_Price|Region                           |Country              |Ship_Date |
+---------+----------+-------------+----------+----------+---------------------------------+---------------------+----------+
|535113847|2014-10-08|Snacks       |934       |152.58    |Middle East and North Africa     |Azerbaijan           |2014-10-23|
|874708545|2015-02-22|Cosmetics    |4551      |437.2     |Central America and the Caribbean|Panama               |2015-02-27|
|854349935|2015-12-09|Fruits       |9986      |9.33      |Sub-Saharan Africa               |Sao Tome and Principe|2016-01-18|
|892836844|2014-09-17|Personal Care|9118      |81.73     |Sub-Saharan Africa               |Sao Tome and Principe|2014-10-12|
|129280602|2010-02-04|Household    |5858      |668.27    |Central America and the Caribbean|Belize               |2010

In [0]:
#expr() para hacer cálculos por columnas (si usas expr(), es recomendable usar col(), aunque prescindible)   
#col() es una función de PySpark que se utiliza para referirse a columnas de un dataframe, pero puedes referirte a ellas con una string, entre ''

from pyspark.sql.functions import col, expr

sales_df.select(col('Order_ID'), col('Item_Type'), expr("Units_Sold * Unit_Price as TOTAL_PRICE"))
print(sales_df.show(10))


# Alternativa a expr() -> .alias() (es como 'as' en SQL, aquí sí que hay que usar col() en la parte del .alias())

sales_df.select(col('Order_ID'), col('Item_Type'), (col('Units_Sold') * col('Unit_Price')).alias('TOTAL_PRICE'))
print(sales_df.show(10))

+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+
| Order_ID|Order_Date|    Item_Type|Units_Sold|Unit_Price|              Region|             Country| Ship_Date|
+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+
|535113847|2014-10-08|       Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|2014-10-23|
|874708545|2015-02-22|    Cosmetics|      4551|     437.2|Central America a...|              Panama|2015-02-27|
|854349935|2015-12-09|       Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome and Prin...|2016-01-18|
|892836844|2014-09-17|Personal Care|      9118|     81.73|  Sub-Saharan Africa|Sao Tome and Prin...|2014-10-12|
|129280602|2010-02-04|    Household|      5858|    668.27|Central America a...|              Belize|2010-03-05|
|473105037|2013-02-20|      Clothes|      1149|    109.28|              Europe|             Denmark|2013

In [0]:
# Devuelve df con los registros que cumplan la condición expresada. Se obtiene el mismo resultado con ambas (como en SQL)

# “'&' como 'AND' - '|'  como 'OR' - '~' como 'NOT'
#podemos encadenar mediante punto, equivale a '&' (df.where().where())

spain = sales_df.select(col("Order_ID"), col("Country"), col("Item_Type"), expr("Units_Sold * Unit_Price as TOTAL_PRICE")).where((col("Region") == "Europe") & (col("Country") == "Spain"))

spain.show(5)

+---------+-------+-------------+------------------+
| Order_ID|Country|    Item_Type|       TOTAL_PRICE|
+---------+-------+-------------+------------------+
|860891091|  Spain|Personal Care|462591.80000000005|
|413236844|  Spain|    Household|         3221061.4|
|621470248|  Spain|      Clothes|         624207.36|
|337587821|  Spain|   Vegetables|        1323837.58|
|420354354|  Spain|       Snacks|         160056.42|
+---------+-------+-------------+------------------+
only showing top 5 rows



In [0]:
# Devuelve df con los valores ordenados por la(s) columna(s) especificadas. Podemos usar ‘asc()’ (por defecto) y ‘desc()’ para especificar orden
sales_df.select(col('Order_ID'), col('Country'), col('Item_Type'), col('Units_Sold')).orderBy(col('Units_Sold').desc())
sales_df.show(5)

sales_df.select(col('Order_ID'),col('Country'),col('Item_Type'),col('Units_Sold')).orderBy(col('Units_Sold').desc(),col('Country').asc())
sales_df.show(5,truncate=False)


# Para tratar con valores nulos existen las opciones ‘asc_nulls_first()’, ‘desc_nulls_first()’, ‘asc_nulls_last()’, ‘desc_nulls_last()’
sales_df.select('Region','Country').orderBy(col('Region').asc_nulls_first())
sales_df.show(5)

+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+
| Order_ID|Order_Date|    Item_Type|Units_Sold|Unit_Price|              Region|             Country| Ship_Date|
+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+
|535113847|2014-10-08|       Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|2014-10-23|
|874708545|2015-02-22|    Cosmetics|      4551|     437.2|Central America a...|              Panama|2015-02-27|
|854349935|2015-12-09|       Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome and Prin...|2016-01-18|
|892836844|2014-09-17|Personal Care|      9118|     81.73|  Sub-Saharan Africa|Sao Tome and Prin...|2014-10-12|
|129280602|2010-02-04|    Household|      5858|    668.27|Central America a...|              Belize|2010-03-05|
+---------+----------+-------------+----------+----------+--------------------+--------------------+----

In [0]:
# Devuelve los valores únicos (distintos) del dataframe (por ejemplo para encontrar valores únicos de una columna, contarlos)

print(sales_df.select('Region').distinct().count())    # Cuenta en base a esos criterios

print(sales_df.select('Region').distinct().show(truncate=False))   # Muestra el df

7
+---------------------------------+
|Region                           |
+---------------------------------+
|Middle East and North Africa     |
|Australia and Oceania            |
|Europe                           |
|Sub-Saharan Africa               |
|Central America and the Caribbean|
|North America                    |
|Asia                             |
+---------------------------------+

None


In [0]:
# Restringe el número de registros del dataframe a devolver al especificado entre paréntesis

lim = sales_df.select(col("Order_ID"),col("Country"),col("Item_Type"),col("Units_Sold")).orderBy(col("Units_Sold").desc(),col("Country").asc()) .limit(5)

lim.show()

+---------+--------------------+-------------+----------+
| Order_ID|             Country|    Item_Type|Units_Sold|
+---------+--------------------+-------------+----------+
|257909476|          Cape Verde|    Household|     10000|
|261322534|             Comoros|    Cosmetics|     10000|
|122941577|Federated States ...|   Vegetables|     10000|
|225874030|               Ghana|       Cereal|     10000|
|143555104|             Iceland|Personal Care|     10000|
+---------+--------------------+-------------+----------+

