# PySpark DataFrames
### What we will cover
- PySpark DataFrames
- Reading a dataset
- Checking the column data types (schema)
- Selecting columns and indexing them
- Summarizing the data with `describe`, similar to Pandas
- Adding columns
- Dropping columns
- Renaming columns

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [3]:
spark

READING THE DATASET

In [4]:
df_pyspark = spark.read.option('header', 'true').csv('data/test1.csv')
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



CHECKING THE COLUMN DATA TYPES

In [5]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [18]:
# Equivalent to spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/test1.csv')
df_pyspark = spark.read.csv('data/test1.csv', header=True, inferSchema=True)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [21]:
from pyspark.sql.types import StructField, TimestampType, StringType, StructType, IntegerType, FloatType

In [27]:
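# We can also pass the schema manually, which is very useful for formats such as timestamps.
# Note: age is declared as TimestampType here, so its integer values cannot be parsed
# and appear as null in the output; IntegerType would match this data.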


data_schema = StructType([StructField("name", StringType(), True),
               StructField("age", TimestampType(), True),
               StructField("Experience", IntegerType(), True),
               StructField("Salary", FloatType(), True),
               ])

df_pyspark = spark.read.format("csv").option("header", True).schema(data_schema).load('data/test1.csv')
df_pyspark.show()

+---------+----+----------+-------+
|     name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|null|        10|30000.0|
|Sudhanshu|null|         8|25000.0|
|    Sunny|null|         4|20000.0|
|     Paul|null|         3|20000.0|
|   Harsha|null|         1|15000.0|
|  Shubham|null|         2|18000.0|
+---------+----+----------+-------+



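CHECKING THE SCHEMA AGAIN

Note: the output recorded under this cell (columns `time` and `action`) comes from an earlier run against a different dataset and does not correspond to `data_schema` defined above.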

In [15]:
df_pyspark.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)



In [None]:
df_pyspark.dtypes

In [None]:
df_pyspark.describe().show()

SHOWING THE FIRST ROWS

In [None]:
df_pyspark.head(3)

#### SELECTING COLUMNS (select)

SELECTING COLUMNS BY NAME

In [28]:
df_pyspark.select(['Name', 'Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [44]:
df_pyspark["Name"]

Column<'Name'>

SELECTING COLUMNS BY INDEX

There is no dedicated command for this, but we can work around it as follows:

In [45]:
# The columns attribute returns the list of column names, which we can slice by index

df_pyspark.columns[:3]

['name', 'age', 'Experience']

In [47]:
# This way we can select columns by index indirectly

df_pyspark.select(df_pyspark.columns[:3]).show(3)

+---------+----+----------+
|     name| age|Experience|
+---------+----+----------+
|    Krish|null|        10|
|Sudhanshu|null|         8|
|    Sunny|null|         4|
+---------+----+----------+
only showing top 3 rows



#### ADDING, DROPPING AND RENAMING COLUMNS

ADDING COLUMNS

In [50]:
# withColumn takes the new column's name and the expression that produces its values
df_pyspark_new_column = df_pyspark.withColumn('Experience after 2 years', df_pyspark['Experience'] + 2)

In [51]:
df_pyspark_new_column.show()

+---------+----+----------+-------+------------------------+
|     name| age|Experience| Salary|Experience after 2 years|
+---------+----+----------+-------+------------------------+
|    Krish|null|        10|30000.0|                      12|
|Sudhanshu|null|         8|25000.0|                      10|
|    Sunny|null|         4|20000.0|                       6|
|     Paul|null|         3|20000.0|                       5|
|   Harsha|null|         1|15000.0|                       3|
|  Shubham|null|         2|18000.0|                       4|
+---------+----+----------+-------+------------------------+



DROPPING COLUMNS

In [53]:
# Drop from the DataFrame that actually contains the added column
df_pyspark_drop_column = df_pyspark_new_column.drop('Experience after 2 years')


In [54]:
df_pyspark_drop_column.show()

+---------+----+----------+-------+
|     name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|null|        10|30000.0|
|Sudhanshu|null|         8|25000.0|
|    Sunny|null|         4|20000.0|
|     Paul|null|         3|20000.0|
|   Harsha|null|         1|15000.0|
|  Shubham|null|         2|18000.0|
+---------+----+----------+-------+



RENAMING COLUMNS

In [55]:
df_pyspark.withColumnRenamed('Name', 'New_name').show()

+---------+----+----------+-------+
| New_name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|null|        10|30000.0|
|Sudhanshu|null|         8|25000.0|
|    Sunny|null|         4|20000.0|
|     Paul|null|         3|20000.0|
|   Harsha|null|         1|15000.0|
|  Shubham|null|         2|18000.0|
+---------+----+----------+-------+

