In [18]:
from IPython.display import display, HTML

display(HTML(data="""
<style>
    div#notebook-container    { width: 95%; }
    div#menubar-container     { width: 65%; }
    div#maintoolbar-container { width: 99%; }
</style>
"""))

## Spark SQL - Dataframes
##### Programa de Especialización en Ciencia de Datos - Hadoop/Spark
Anthony Manosalva López

Para revisar la librería en la cual nos encontramos: **pwd**

In [8]:
pwd

'/home/hduser/anthony_manosalva'

### Paso 1: Definimos la importación de variables

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.Builder().appName("Prueba1").getOrCreate()

### Generación de Dataframes

#### Importando desde un archivo JSON

In [5]:
df_json = spark.read.json("2015-summary.json")

In [7]:
df_json.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



Verificando el tipo de dato de nuestra variable creada, es un DataFrame

In [9]:
type(df_json)

pyspark.sql.dataframe.DataFrame

#### Importando desde un archivo CSV

In [10]:
df_csv = spark.read.csv("2015-summary.csv", header = True)

In [11]:
df_csv.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [12]:
type(df_csv)

pyspark.sql.dataframe.DataFrame

#### Importando desde una archivo de TEXTO

In [13]:
df_text = spark.read.text("salario (1).txt")

In [14]:
df_text.show()

+--------------------+
|               value|
+--------------------+
|rank,discipline,y...|
|Prof,B,19,18,Male...|
|Prof,B,20,16,Male...|
|AsstProf,B,4,3,Ma...|
|Prof,B,45,39,Male...|
|Prof,B,40,41,Male...|
|AssocProf,B,6,6,M...|
|Prof,B,30,23,Male...|
|Prof,B,45,45,Male...|
|Prof,B,21,20,Male...|
|Prof,B,18,18,Fema...|
|AssocProf,B,12,8,...|
|AsstProf,B,7,2,Ma...|
|AsstProf,B,1,1,Ma...|
|AsstProf,B,2,0,Ma...|
|Prof,B,20,18,Male...|
|Prof,B,12,3,Male ...|
|Prof,B,19,20,Male...|
|Prof,A,38,34,Male...|
|Prof,A,37,23,Male...|
+--------------------+
only showing top 20 rows



#### Importando desde un fichero PARKET

In [15]:
df_parquet = spark.read.parquet("part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet")

In [16]:
df_parquet.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



#### Creación desde el área de trabajo

In [19]:
df_range = spark.range(100).toDF("number")

In [20]:
df_range.show(5)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



#### Usando Pandas y Numpy

In [22]:
import pandas as pd
import numpy as np

train = pd.read_csv("Morosidad_Train.csv")
train.head(5)

Unnamed: 0,ID,edad,meses,max_ant,ingreso,score,tipo_vivienda,zona,nivel,riesgo,Unnamed: 10,Unnamed: 11,Unnamed: 12,Unnamed: 13
0,2208,33.0,128.0,6.0,3230.0,192.0,2,3.0,2,1,,,,
1,2211,32.0,185.0,7.0,2930.0,217.0,2,1.0,5,1,,,,
2,2212,28.0,,7.0,1883.7,202.0,2,1.0,2,1,,,,
3,2216,56.0,200.0,0.0,850.0,194.0,2,1.0,5,1,,,,
4,2218,32.0,131.0,65.0,3200.0,200.0,2,1.0,2,1,,,,


In [23]:
type(train)

pandas.core.frame.DataFrame

Convirtiendo el DataFrame de Pandas a DataFrame de Spark

In [24]:
df_pandas = spark.createDataFrame(train)

In [27]:
type(df_pandas)

pyspark.sql.dataframe.DataFrame

### Inspeccionando la data

Información detallada de los tipos de dato de las columnas.

In [35]:
df_csv.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'string')]

**Casteo:** La columna "count" debe ser convertida a entero, así que lo casteamos

In [36]:
df_csv = df_csv.withColumn("count", df_csv["count"].cast("int"))

In [37]:
df_csv.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'int')]

Otra forma de ver el tipo de dato de las columnas

In [49]:
df_csv.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,IntegerType,true)))

In [41]:
df_csv.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [42]:
df_csv.take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)]

In [43]:
df_csv.describe()

DataFrame[summary: string, DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: string]

In [44]:
df_csv.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [45]:
df_csv.count()

256

In [46]:
df_csv.distinct().count()

256

In [47]:
df_csv.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



Spark Dataframe guarda las instrucciones


In [50]:
df_range = spark.range(100).toDF("number")
df_range_v2 = df_range.sort("number")
df_range_v3 = df_range_v2.where("number < 50")
df_range

DataFrame[number: bigint]

Veamos la secuencia de instrucciones que guarda Spark

In [52]:
df_range_v3.explain()

== Physical Plan ==
*Sort [number#372L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(number#372L ASC NULLS FIRST, 200)
   +- *Project [id#369L AS number#372L]
      +- *Filter (id#369L < 50)
         +- *Range (0, 100, step=1, splits=Some(1))


Si castear no quieres, **inferSchema** usar debes

In [56]:
df_csv = spark.read.csv("2015-summary.csv", header = True, sep = ',', inferSchema='true')
df_csv.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'int')]

### Cláusula SELECT

In [68]:
df_csv.select("DEST_COUNTRY_NAME", "COUNT").show(3)

+-----------------+-----+
|DEST_COUNTRY_NAME|COUNT|
+-----------------+-----+
|    United States|   15|
|    United States|    1|
|    United States|  344|
+-----------------+-----+
only showing top 3 rows



Acostumbrarse a esta forma

In [79]:
df_csv.select("ORIGIN_COUNTRY_NAME", df_csv["count"] + 1).show(3)

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|(count + 1)|
+-------------------+-----------+
|            Romania|         16|
|            Croatia|          2|
|            Ireland|        345|
+-------------------+-----------+
only showing top 3 rows



In [76]:
df_csv.select("DEST_COUNTRY_NAME", (df_csv["count"]*5).alias("cuenta*5")).show(3)

+-----------------+--------+
|DEST_COUNTRY_NAME|cuenta*5|
+-----------------+--------+
|    United States|      75|
|    United States|       5|
|    United States|    1720|
+-----------------+--------+
only showing top 3 rows



In [81]:
df_csv.select("ORIGIN_COUNTRY_NAME", (df_csv["count"] > 50).alias("cuenta")).show(5)

+-------------------+------+
|ORIGIN_COUNTRY_NAME|cuenta|
+-------------------+------+
|            Romania| false|
|            Croatia| false|
|            Ireland|  true|
|      United States| false|
|              India|  true|
+-------------------+------+
only showing top 5 rows



### Cláusula Filter - Where

In [84]:
df_csv.filter(df_csv["count"] > 50).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Ireland|  344|
|    United States|              India|   62|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|    United States|       Sint Maarten|  325|
+-----------------+-------------------+-----+
only showing top 5 rows



In [85]:
df_csv.filter(df_csv["count"] == 62).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              India|   62|
|    United States|            Grenada|   62|
|          Austria|      United States|   62|
+-----------------+-------------------+-----+



In [86]:
df_csv.filter("count > 50 and count < 100").show(5)

+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|               India|   62|
|       United States|             Grenada|   62|
|              Guyana|       United States|   64|
|       United States|Federated States ...|   69|
|Federated States ...|       United States|   69|
+--------------------+--------------------+-----+
only showing top 5 rows



In [87]:
df_csv.where(df_csv["count"] == 62).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              India|   62|
|    United States|            Grenada|   62|
|          Austria|      United States|   62|
+-----------------+-------------------+-----+



In [88]:
df_csv.where(df_csv["count"] > 50).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Ireland|  344|
|    United States|              India|   62|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|    United States|       Sint Maarten|  325|
+-----------------+-------------------+-----+
only showing top 5 rows



In [89]:
df_csv.where("count > 50 and count < 100").show(5)

+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|               India|   62|
|       United States|             Grenada|   62|
|              Guyana|       United States|   64|
|       United States|Federated States ...|   69|
|Federated States ...|       United States|   69|
+--------------------+--------------------+-----+
only showing top 5 rows



### Group By

In [92]:
df_csv.groupBy("DEST_COUNTRY_NAME").sum("count").show(3)

+-----------------+----------+
|DEST_COUNTRY_NAME|sum(count)|
+-----------------+----------+
|         Anguilla|        41|
|           Russia|       176|
|         Paraguay|        60|
+-----------------+----------+
only showing top 3 rows



In [93]:
df_csv.groupBy("ORIGIN_COUNTRY_NAME").avg("count").show(3)

+-------------------+----------+
|ORIGIN_COUNTRY_NAME|avg(count)|
+-------------------+----------+
|           Paraguay|       6.0|
|             Russia|     161.0|
|           Anguilla|      38.0|
+-------------------+----------+
only showing top 3 rows



In [94]:
df_csv.groupBy("ORIGIN_COUNTRY_NAME").min("count").show(3)

+-------------------+----------+
|ORIGIN_COUNTRY_NAME|min(count)|
+-------------------+----------+
|           Paraguay|         6|
|             Russia|       161|
|           Anguilla|        38|
+-------------------+----------+
only showing top 3 rows



In [95]:
df_csv.groupBy("ORIGIN_COUNTRY_NAME").max("count").show(3)

+-------------------+----------+
|ORIGIN_COUNTRY_NAME|max(count)|
+-------------------+----------+
|           Paraguay|         6|
|             Russia|       161|
|           Anguilla|        38|
+-------------------+----------+
only showing top 3 rows



In [96]:
df_csv_2 = df_csv.where("ORIGIN_COUNTRY_NAME in ('Philippines','Malaysia')")
df_csv_2.groupBy("DEST_COUNTRY_NAME").count().show()

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|    2|
+-----------------+-----+



In [97]:
df_csv_2 = df_csv.where("ORIGIN_COUNTRY_NAME in ('Philippines','Malaysia')").show()
df_csv_2 = df_csv.where("ORIGIN_COUNTRY_NAME in ('Philippines','Malaysia')")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|           Malaysia|    3|
|    United States|        Philippines|  126|
+-----------------+-------------------+-----+



### Clausula Sort

In [99]:
df_csv.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [100]:
df_csv.sort(df_csv["DEST_COUNTRY_NAME"]).show(5)

+-------------------+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|            Algeria|      United States|    4|
|             Angola|      United States|   15|
|           Anguilla|      United States|   41|
|Antigua and Barbuda|      United States|  126|
|          Argentina|      United States|  180|
+-------------------+-------------------+-----+
only showing top 5 rows



In [101]:
df_csv.sort(df_csv["DEST_COUNTRY_NAME"].desc()).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
|          Uruguay|      United States|   43|
|    United States|            Grenada|   62|
|    United States|            Romania|   15|
+-----------------+-------------------+-----+
only showing top 5 rows



In [102]:
df_csv.sort(df_csv["DEST_COUNTRY_NAME"].desc(), df_csv["count"]).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
|          Uruguay|      United States|   43|
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [104]:
df_csv.sort("count", ascending = False).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [105]:
df_csv.sort(["DEST_COUNTRY_NAME","count"], ascending = [0,1]).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
|          Uruguay|      United States|   43|
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### Funciones Extras

In [106]:
from pyspark.sql import functions as F

#### Clausula When

In [107]:
df_csv.select(df_csv["DEST_COUNTRY_NAME"], F.when(df_csv["count"] < 25, "Menores a 25").otherwise("Mayores a 100")).show()

+--------------------+---------------------------------------------------------------+
|   DEST_COUNTRY_NAME|CASE WHEN (count < 25) THEN Menores a 25 ELSE Mayores a 100 END|
+--------------------+---------------------------------------------------------------+
|       United States|                                                   Menores a 25|
|       United States|                                                   Menores a 25|
|       United States|                                                  Mayores a 100|
|               Egypt|                                                   Menores a 25|
|       United States|                                                  Mayores a 100|
|       United States|                                                   Menores a 25|
|       United States|                                                  Mayores a 100|
|          Costa Rica|                                                  Mayores a 100|
|             Senegal|                     

Poniendo alias al resultado de la columna de los when

In [110]:
df_csv.select(df_csv["DEST_COUNTRY_NAME"], (F.when(df_csv["count"] < 25, "Menores a 25").otherwise("Mayores a 100")).alias("Cuenta")).show()

+--------------------+-------------+
|   DEST_COUNTRY_NAME|       Cuenta|
+--------------------+-------------+
|       United States| Menores a 25|
|       United States| Menores a 25|
|       United States|Mayores a 100|
|               Egypt| Menores a 25|
|       United States|Mayores a 100|
|       United States| Menores a 25|
|       United States|Mayores a 100|
|          Costa Rica|Mayores a 100|
|             Senegal|Mayores a 100|
|             Moldova| Menores a 25|
|       United States|Mayores a 100|
|       United States|Mayores a 100|
|              Guyana|Mayores a 100|
|               Malta| Menores a 25|
|            Anguilla|Mayores a 100|
|             Bolivia|Mayores a 100|
|       United States| Menores a 25|
|             Algeria| Menores a 25|
|Turks and Caicos ...|Mayores a 100|
|       United States| Menores a 25|
+--------------------+-------------+
only showing top 20 rows



#### Cláusula Like

In [112]:
df_csv.select("DEST_COUNTRY_NAME", df_csv["DEST_COUNTRY_NAME"].like("Egypt")).show(5)

+-----------------+----------------------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME LIKE Egypt|
+-----------------+----------------------------+
|    United States|                       false|
|    United States|                       false|
|    United States|                       false|
|            Egypt|                        true|
|    United States|                       false|
+-----------------+----------------------------+
only showing top 5 rows



#### Cláusula Startswith - Endswith

In [115]:
df_csv.select("DEST_COUNTRY_NAME", df_csv["DEST_COUNTRY_NAME"].like("%ted%")).show(5)

+-----------------+----------------------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME LIKE %ted%|
+-----------------+----------------------------+
|    United States|                        true|
|    United States|                        true|
|    United States|                        true|
|            Egypt|                       false|
|    United States|                        true|
+-----------------+----------------------------+
only showing top 5 rows



In [116]:
df_csv.select("DEST_COUNTRY_NAME", df_csv["DEST_COUNTRY_NAME"].endswith("a")).show()

+--------------------+------------------------------+
|   DEST_COUNTRY_NAME|endswith(DEST_COUNTRY_NAME, a)|
+--------------------+------------------------------+
|       United States|                         false|
|       United States|                         false|
|       United States|                         false|
|               Egypt|                         false|
|       United States|                         false|
|       United States|                         false|
|       United States|                         false|
|          Costa Rica|                          true|
|             Senegal|                         false|
|             Moldova|                          true|
|       United States|                         false|
|       United States|                         false|
|              Guyana|                          true|
|               Malta|                          true|
|            Anguilla|                          true|
|             Bolivia|      

#### Cláusula Substring

In [117]:
df_csv.select("DEST_COUNTRY_NAME", (df_csv["DEST_COUNTRY_NAME"].substr(1,3)).alias("Nombre_Corto")).show()

+--------------------+------------+
|   DEST_COUNTRY_NAME|Nombre_Corto|
+--------------------+------------+
|       United States|         Uni|
|       United States|         Uni|
|       United States|         Uni|
|               Egypt|         Egy|
|       United States|         Uni|
|       United States|         Uni|
|       United States|         Uni|
|          Costa Rica|         Cos|
|             Senegal|         Sen|
|             Moldova|         Mol|
|       United States|         Uni|
|       United States|         Uni|
|              Guyana|         Guy|
|               Malta|         Mal|
|            Anguilla|         Ang|
|             Bolivia|         Bol|
|       United States|         Uni|
|             Algeria|         Alg|
|Turks and Caicos ...|         Tur|
|       United States|         Uni|
+--------------------+------------+
only showing top 20 rows



#### Cláusula Between

In [118]:
df_csv.select("DEST_COUNTRY_NAME", "count", df_csv["count"].between(25,75)).show()

+--------------------+-----+---------------------------------+
|   DEST_COUNTRY_NAME|count|((count >= 25) AND (count <= 75))|
+--------------------+-----+---------------------------------+
|       United States|   15|                            false|
|       United States|    1|                            false|
|       United States|  344|                            false|
|               Egypt|   15|                            false|
|       United States|   62|                             true|
|       United States|    1|                            false|
|       United States|   62|                             true|
|          Costa Rica|  588|                            false|
|             Senegal|   40|                             true|
|             Moldova|    1|                            false|
|       United States|  325|                            false|
|       United States|   39|                             true|
|              Guyana|   64|                           

#### Modificando nombres de columnas

In [119]:
df_csv = df_csv.withColumnRenamed('count', 'cuenta')
df_csv.show()

+--------------------+-------------------+------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|cuenta|
+--------------------+-------------------+------+
|       United States|            Romania|    15|
|       United States|            Croatia|     1|
|       United States|            Ireland|   344|
|               Egypt|      United States|    15|
|       United States|              India|    62|
|       United States|          Singapore|     1|
|       United States|            Grenada|    62|
|          Costa Rica|      United States|   588|
|             Senegal|      United States|    40|
|             Moldova|      United States|     1|
|       United States|       Sint Maarten|   325|
|       United States|   Marshall Islands|    39|
|              Guyana|      United States|    64|
|               Malta|      United States|     1|
|            Anguilla|      United States|    41|
|             Bolivia|      United States|    30|
|       United States|           Paraguay|     6|


In [120]:
df_csv = df_csv.drop("ORIGIN_COUNTRY_NAME", "count")
df_csv.show()

+--------------------+------+
|   DEST_COUNTRY_NAME|cuenta|
+--------------------+------+
|       United States|    15|
|       United States|     1|
|       United States|   344|
|               Egypt|    15|
|       United States|    62|
|       United States|     1|
|       United States|    62|
|          Costa Rica|   588|
|             Senegal|    40|
|             Moldova|     1|
|       United States|   325|
|       United States|    39|
|              Guyana|    64|
|               Malta|     1|
|            Anguilla|    41|
|             Bolivia|    30|
|       United States|     6|
|             Algeria|     4|
|Turks and Caicos ...|   230|
|       United States|     1|
+--------------------+------+
only showing top 20 rows

