# PySpark DataFrames
### What we will cover
- PySpark DataFrame
- Reading a dataset
- Checking the data types of the columns (schema)
- Selecting columns and indexing them
- Using describe(), similar to pandas
- Adding columns
- Dropping columns
- Renaming columns

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [3]:
spark

READING THE DATASET

In [17]:
from sys import getsizeof

In [23]:
import pandas as pd
df = pd.read_csv("data/adult_data.csv")
df

Unnamed: 0,x,age,workclass,fnlwgt,education,educational-num,marital-status,occupation,relationship,race,gender,capital-gain,capital-loss,hours-per-week,native-country,income
0,1,25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K
1,2,38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K
2,3,28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K
3,4,44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K
4,5,18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
48837,48838,27,Private,257302,Assoc-acdm,12,Married-civ-spouse,Tech-support,Wife,White,Female,0,0,38,United-States,<=50K
48838,48839,40,Private,154374,HS-grad,9,Married-civ-spouse,Machine-op-inspct,Husband,White,Male,0,0,40,United-States,>50K
48839,48840,58,Private,151910,HS-grad,9,Widowed,Adm-clerical,Unmarried,White,Female,0,0,40,United-States,<=50K
48840,48841,22,Private,201490,HS-grad,9,Never-married,Adm-clerical,Own-child,White,Male,0,0,20,United-States,<=50K


In [24]:
getsizeof(df)

31663665

In [25]:
df_pyspark = spark.read.option('header', 'true').csv('data/adult_data.csv')


In [26]:
getsizeof(df_pyspark)

48
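The 48 bytes reported for the Spark DataFrame do not mean the data is tiny. `sys.getsizeof` only measures the Python object it is given and does not follow references. A pandas DataFrame reports the memory used by its data, while the PySpark DataFrame is a thin Python handle to data that is loaded lazily on the Spark side. The sketch below illustrates this shallow behaviour with a hypothetical `Handle` class (not part of PySpark) standing in for such a wrapper.

```python
from sys import getsizeof


class Handle:
    """Hypothetical thin wrapper that only holds a reference to its payload."""

    def __init__(self, payload):
        self.payload = payload


small = Handle([0])
large = Handle(list(range(100_000)))

# The wrapper's reported size is the same no matter how big the payload is,
# because getsizeof does not follow the reference stored in the attribute.
same_wrapper_size = getsizeof(small) == getsizeof(large)
print(same_wrapper_size)

# The payloads themselves do differ in size.
larger_payload = getsizeof(large.payload) > getsizeof(small.payload)
print(larger_payload)
```

To compare real memory footprints, the data would have to be measured where it actually lives, not through the handle.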

In [27]:
df_pyspark = spark.read.option('header', 'true').csv('data/test1.csv')
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



CHECKING THE DATA TYPES

In [28]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [29]:
df_pyspark = spark.read.csv('data/test1.csv', header=True, inferSchema=True) # equivalent to spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/test1.csv')
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [30]:
from pyspark.sql.types import StructField, TimestampType, StringType, StructType, IntegerType, FloatType

In [31]:
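# we could also pass the schema manually, which is very useful for timestamp formats, for example
# note: age is declared as TimestampType here, so values such as 31 cannot be parsed and are read as NULL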


data_schema = StructType([StructField("name", StringType(), True),
               StructField("age", TimestampType(), True),
               StructField("Experience", IntegerType(), True),
               StructField("Salary", FloatType(), True),
               ])

df_pyspark = spark.read.format("csv").option("header", True).schema(data_schema).load('data/test1.csv')
df_pyspark.show()

+---------+----+----------+-------+
|     name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|NULL|        10|30000.0|
|Sudhanshu|NULL|         8|25000.0|
|    Sunny|NULL|         4|20000.0|
|     Paul|NULL|         3|20000.0|
|   Harsha|NULL|         1|15000.0|
|  Shubham|NULL|         2|18000.0|
+---------+----+----------+-------+
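In the output above, age is NULL in every row because the schema declares it as a timestamp and values like 31 are not valid timestamps. A minimal sketch of a schema that matches the file follows, assuming the columns are Name, age, Experience and Salary as shown in the earlier output. It builds a DDL-formatted string, which `schema()` accepts as an alternative to a `StructType`. The helper name `read_with_schema` is hypothetical, and it expects an existing SparkSession.

```python
# Column name -> Spark SQL type, in the order the columns appear in the CSV.
column_types = {
    "Name": "STRING",
    "age": "INT",
    "Experience": "INT",
    "Salary": "FLOAT",
}

# DataFrameReader.schema() accepts a DDL-formatted string like this one.
schema_ddl = ", ".join(f"{name} {sql_type}" for name, sql_type in column_types.items())
print(schema_ddl)  # Name STRING, age INT, Experience INT, Salary FLOAT


def read_with_schema(spark, path, ddl=schema_ddl):
    """Read a CSV with a header row using an explicit schema.

    `spark` is an existing SparkSession; nothing is read until this is called.
    """
    return spark.read.format("csv").option("header", True).schema(ddl).load(path)
```

Calling `read_with_schema(spark, 'data/test1.csv')` should then give integer ages instead of NULL, without the extra pass over the data that `inferSchema` needs.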



CHECKING THE DATA TYPES AGAIN

In [15]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: timestamp (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: float (nullable = true)



In [None]:
df_pyspark.dtypes

In [None]:
df_pyspark.describe().show()

SHOWING THE FIRST ROWS

In [32]:
df_pyspark.head(3)

[Row(name='Krish', age=None, Experience=10, Salary=30000.0),
 Row(name='Sudhanshu', age=None, Experience=8, Salary=25000.0),
 Row(name='Sunny', age=None, Experience=4, Salary=20000.0)]

#### SELECTING COLUMNS (select)

SELECTING COLUMNS BY NAME

In [34]:
df_pyspark.select(['Name', 'Experience', "Name"]).show()

+---------+----------+---------+
|     Name|Experience|     Name|
+---------+----------+---------+
|    Krish|        10|    Krish|
|Sudhanshu|         8|Sudhanshu|
|    Sunny|         4|    Sunny|
|     Paul|         3|     Paul|
|   Harsha|         1|   Harsha|
|  Shubham|         2|  Shubham|
+---------+----------+---------+



In [44]:
df_pyspark["Name"]

Column<'Name'>

SELECTING COLUMNS BY INDEX

There is no dedicated command for this, but we can work around it as follows:

In [45]:
# The columns attribute returns the column names as a list, which we can slice by index

df_pyspark.columns[:3]

['name', 'age', 'Experience']

In [47]:
# This way we can indirectly select columns by index

df_pyspark.select(df_pyspark.columns[:3]).show(3)

+---------+----+----------+
|     name| age|Experience|
+---------+----+----------+
|    Krish|null|        10|
|Sudhanshu|null|         8|
|    Sunny|null|         4|
+---------+----+----------+
only showing top 3 rows
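Slicing `columns` only covers contiguous ranges. For arbitrary positions, the same workaround can be wrapped in a small helper. This is a sketch: `columns_by_position` is a hypothetical name, and it works on any list of column names, so it is shown here with a plain list.

```python
def columns_by_position(columns, positions):
    """Return the column names found at the given integer positions."""
    return [columns[i] for i in positions]


# Column names as returned by df_pyspark.columns in the cells above.
cols = ['name', 'age', 'Experience', 'Salary']

picked = columns_by_position(cols, [0, 2])
print(picked)  # ['name', 'Experience']

# Negative positions count from the end, as with any Python list.
last = columns_by_position(cols, [-1])
print(last)  # ['Salary']
```

With a Spark DataFrame, the result can be passed straight to `select`, for example `df_pyspark.select(columns_by_position(df_pyspark.columns, [0, 2])).show()`.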



#### ADDING, DROPPING AND RENAMING COLUMNS

ADDING COLUMNS

In [36]:
df_pyspark_new_column = df_pyspark.withColumn('Experience after 2 years', df_pyspark['Experience']+2) # arguments: the new column's name, then the expression for its values

In [37]:
df_pyspark_new_column.show()

+---------+----+----------+-------+------------------------+
|     name| age|Experience| Salary|Experience after 2 years|
+---------+----+----------+-------+------------------------+
|    Krish|NULL|        10|30000.0|                      12|
|Sudhanshu|NULL|         8|25000.0|                      10|
|    Sunny|NULL|         4|20000.0|                       6|
|     Paul|NULL|         3|20000.0|                       5|
|   Harsha|NULL|         1|15000.0|                       3|
|  Shubham|NULL|         2|18000.0|                       4|
+---------+----+----------+-------+------------------------+



DROPPING COLUMNS

In [38]:
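df_pyspark_drop_column = df_pyspark_new_column.drop('Experience after 2 years') # drop from the DataFrame that actually has the column; drop() silently ignores names that do not exist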


In [39]:
df_pyspark_drop_column.show()

+---------+----+----------+-------+
|     name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|NULL|        10|30000.0|
|Sudhanshu|NULL|         8|25000.0|
|    Sunny|NULL|         4|20000.0|
|     Paul|NULL|         3|20000.0|
|   Harsha|NULL|         1|15000.0|
|  Shubham|NULL|         2|18000.0|
+---------+----+----------+-------+



RENAMING COLUMNS

In [40]:
df_pyspark.withColumnRenamed('Name', 'New_name').show()

+---------+----+----------+-------+
| New_name| age|Experience| Salary|
+---------+----+----------+-------+
|    Krish|NULL|        10|30000.0|
|Sudhanshu|NULL|         8|25000.0|
|    Sunny|NULL|         4|20000.0|
|     Paul|NULL|         3|20000.0|
|   Harsha|NULL|         1|15000.0|
|  Shubham|NULL|         2|18000.0|
+---------+----+----------+-------+

