## Curso Big Data #4 - Transformaciones en columnas en PySpark

# Librerías

In [1]:
import findspark

# Make the local Spark installation importable before `pyspark` is used below.
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# 1. Crear la SparkSession

In [3]:
# Reuse an existing session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('columnsTransform')
    .getOrCreate()
)

# 2. Lectura de archivos Parquet

In [4]:
# Relative path to the Parquet dataset used throughout this notebook.
parquet_path = 'parquet_example'
df = spark.read.parquet(parquet_path)

In [5]:
df.show(n=2)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted| 

In [6]:
# Every column, including Data_value, is read as a string here.
df.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: string (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)



# 3. Operaciones simples sobre columnas
## 3.1 Selección de columnas

In [7]:
# Series_title_1 ... Series_title_5
sel_cols = [f'Series_title_{i}' for i in range(1, 6)]
df_sel = df.select(*sel_cols)

In [8]:
df_sel.show(n=5)

+--------------------+--------------------+--------------+--------------+--------------+
|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+--------------------+--------------------+--------------+--------------+--------------+
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
+--------------------+--------------------+--------------+--------------+--------------+
only showing top 5 rows



In [9]:
# Indexing a DataFrame with a list of names is equivalent to df.select(sel_cols).
df[sel_cols].show(n=5)

+--------------------+--------------------+--------------+--------------+--------------+
|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+--------------------+--------------------+--------------+--------------+--------------+
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
+--------------------+--------------------+--------------+--------------+--------------+
only showing top 5 rows



## 3.2 Renombrar una columna

In [10]:
# Returns a new DataFrame; df_sel itself keeps the original column name.
df_sel.withColumnRenamed(existing='Series_title_1', new='ST_1').show(n=5)

+--------------------+--------------------+--------------+--------------+--------------+
|                ST_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+--------------------+--------------------+--------------+--------------+--------------+
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
+--------------------+--------------------+--------------+--------------+--------------+
only showing top 5 rows



## 3.3 Ordenar el DataFrame

In [11]:
# Ascending order. Data_value is still a string at this point, so the order is
# lexicographic (e.g. "1001.875" sorts before "101.353"); see section 3.4.
df.orderBy('Data_value').select('Data_value').show(10)

+----------+
|Data_value|
+----------+
|  -398.194|
|     -5.26|
|   100.652|
|  1001.875|
|  1005.093|
|  1006.126|
|  1007.319|
|  1007.684|
|   101.353|
|   101.504|
+----------+
only showing top 10 rows



In [12]:
# en forma descendente

from pyspark.sql import functions as F

In [13]:
# Descending order; still lexicographic because Data_value is a string here.
df.orderBy(F.col('Data_value').desc()).select('Data_value').show(10)

+----------+
|Data_value|
+----------+
|   998.124|
|  9979.557|
|   997.972|
|   997.046|
|   991.864|
|   991.021|
|    99.474|
|   989.697|
|    988.82|
|   987.553|
+----------+
only showing top 10 rows



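## 3.4 Cambiar el tipo de la columna

`Data_value` se leyó como `string` (ver el esquema), por lo que el ordenamiento anterior es lexicográfico (p. ej. `1001.875` aparece antes que `101.353`). Al convertirla a `double`, el ordenamiento pasa a ser numérico.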

In [15]:
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [16]:
# Replace the string Data_value column with a double version of itself.
# NOTE(review): values that cannot be parsed presumably become null (non-ANSI mode) — verify.
df = df.withColumn('Data_value', df['Data_value'].cast(DoubleType()))

In [17]:
df.show(n=5)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted| 

In [18]:
# Data_value should now be reported as double.
df.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: double (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)



In [21]:
# Same ascending sort as before, now numeric because Data_value is a double.
df.orderBy('Data_value').select('Data_value').show(10)

+----------+
|Data_value|
+----------+
|  -398.194|
|     -5.26|
|     8.274|
|     8.413|
|     9.411|
|     21.14|
|    22.619|
|    23.376|
|    24.071|
|    28.332|
+----------+
only showing top 10 rows



## 3.5 Aplicar filtros

In [23]:
# Keep rows whose numeric Data_value exceeds 100.
df_filter = df.where(F.col('Data_value') > 100)

In [24]:
df_filter.show(n=5)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted| 

In [25]:
# (rows before filtering, rows after filtering)
(df.count(), df_filter.count())

(1936, 1838)

In [27]:
# Conditions on several columns: combine with & and wrap each one in parentheses.
df_filter = df.where((F.col('Data_value') > 100) & (F.col('STATUS') == 'R'))

In [28]:
df_filter.show(n=5)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1FF1CA|2019.09| 28200.644|      null|     R|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|     Wholesale Trade|Current prices|    Unadjusted|          null|
|   BDCQ.SF1FF1CA|2020.09| 29794.725|      null|     R|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|     Wholesale Trade|Current prices|    Unadjusted| 

In [29]:
# (rows before filtering, rows matching both conditions)
(df.count(), df_filter.count())

(1936, 20)

## 3.6 Separación de una columna (split)

In [30]:
# Split Period values such as "2016.06" on the literal dot into Year and Month.
# F.split takes a regex pattern, so the dot must be escaped; the raw string keeps
# Python from treating "\." as an invalid string escape sequence (a warning on
# recent Python versions).
period_parts = F.split('Period', r'\.')
df_year = (
    df
    .withColumn('Year', period_parts[0].cast(IntegerType()))
    .withColumn('Month', period_parts[1].cast(IntegerType()))
)

In [31]:
df_year.show(n=5)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+----+-----+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|Year|Month|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+----+-----+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|2016|    6|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry

In [32]:
# Year and Month should appear as integer columns at the end of the schema.
df_year.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: double (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)

