## Spark SQL DataFrame

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark= SparkSession.builder.appName('DataFrame').getOrCreate()

In [3]:
df= spark.read.json('personas.json')

In [4]:
df.show()

+----+------+
|edad|nombre|
+----+------+
|null|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [6]:
df.printSchema()  #Estructura del dataframe

root
 |-- edad: long (nullable = true)
 |-- nombre: string (nullable = true)



In [7]:
df.columns   # Columnas

['edad', 'nombre']

In [9]:
df.describe().show()

+-------+-----------------+
|summary|             edad|
+-------+-----------------+
|  count|                2|
|   mean|             22.0|
| stddev|4.242640687119285|
|    min|               19|
|    max|               25|
+-------+-----------------+



## Schema

La forma de editar el dataframe, es decir cambiar el datatype

In [16]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [17]:
schema= [StructField('edad', IntegerType(),True), StructField('nombre', StringType(),True)]

In [18]:
schema_final= StructType(fields=schema)

In [19]:
df= spark.read.json('personas.json', schema=schema_final)

In [21]:
df.printSchema()

root
 |-- edad: integer (nullable = true)
 |-- nombre: string (nullable = true)



In [22]:
df.show()

+----+------+
|edad|nombre|
+----+------+
|null|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



## Select

In [24]:
type(df['edad'])

pyspark.sql.column.Column

In [25]:
df.select('edad').show()

+----+
|edad|
+----+
|null|
|  25|
|  19|
+----+



Lo que se busca es trabajar con el dataframe no con la columna

In [26]:
type(df.select('edad'))

pyspark.sql.dataframe.DataFrame

In [28]:
df.head(2)[0]

Row(edad=None, nombre='Miguel')

In [29]:
# seleccionar columnas
df.select(['edad', 'nombre']).show()

+----+------+
|edad|nombre|
+----+------+
|null|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [30]:
## Crear una columna nueva con WithColumn()

In [31]:
df.withColumn('edad_nueva', df['edad']).show()

+----+------+----------+
|edad|nombre|edad_nueva|
+----+------+----------+
|null|Miguel|      null|
|  25|Carlos|        25|
|  19|  Juan|        19|
+----+------+----------+



In [32]:
df.withColumn('edad_nueva', df['edad']*2).show()

+----+------+----------+
|edad|nombre|edad_nueva|
+----+------+----------+
|null|Miguel|      null|
|  25|Carlos|        50|
|  19|  Juan|        38|
+----+------+----------+



## Cambiar el nombre de la columna con withColumnRenamed()

In [33]:
df.withColumnRenamed('edad', 'edad_nueva').show()

+----------+------+
|edad_nueva|nombre|
+----------+------+
|      null|Miguel|
|        25|Carlos|
|        19|  Juan|
+----------+------+



# Como realizar una consulta en SQL

In [35]:
df.createOrReplaceTempView('personas')

In [37]:
query= spark.sql("SELECT * FROM PERSONAS")

In [38]:
query.show()

+----+------+
|edad|nombre|
+----+------+
|null|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [41]:
mayor_veinte= spark.sql("SELECT * from personas where edad> 20")

In [43]:
mayor_veinte.show()

+----+------+
|edad|nombre|
+----+------+
|  25|Carlos|
+----+------+

