# Limpieza PySpark

### Configuraciones de PySpark

In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .appName("Spark Alex") \
            .getOrCreate()

### Creamos dataframe

In [4]:
from pyspark.sql import Row

data=(
  Row(1,'Muhammad',22),
  Row(2,'Abdullah',24),
  Row(3,'Ahmed',44),
  Row(4,'John',55)
)
data=spark.createDataFrame(data)

In [5]:
data.show()

+---+--------+---+
| _1|      _2| _3|
+---+--------+---+
|  1|Muhammad| 22|
|  2|Abdullah| 24|
|  3|   Ahmed| 44|
|  4|    John| 55|
+---+--------+---+



In [7]:
data.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



### Leemos el csv

In [9]:
store_df = spark.read.csv('C:/Users/Usuario/Desktop/SparkDataSet/store.csv')

In [10]:
store_df.show()

+-----+---------+----------+-------------------+--------------------+--------------------+------+---------------+---------------+----------------+
|  _c0|      _c1|       _c2|                _c3|                 _c4|                 _c5|   _c6|            _c7|            _c8|             _c9|
+-----+---------+----------+-------------------+--------------------+--------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSi...|CompetitionOpenSi...|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
|    1|        c|         a|               1270|                   9|                2008|     0|           NULL|           NULL|            NULL|
|    2|        a|         a|                570|                  11|                2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|              14130|                  12|                2006|     1|             14|     

Como vemos, no tiene definido un esquema, lo cual es bastante peligroso cuando trabajamos en producción. Definiremos un esquema previamente y leeremos el csv indicando que tenemos cabeceras.


### Definimos el esquema

In [14]:
from pyspark.sql.types import *
Schema=StructType([
  StructField('Store',StringType(),nullable=True),
  StructField('StoreType',StringType(),nullable=True),
  StructField('Assortment',StringType(),nullable=True),
  StructField('CompetitionDistance',FloatType(),nullable=True),
  StructField('CompetitionOpenSinceMonth',IntegerType(),nullable=True),  
  StructField('CompetitionOpenSinceYear',IntegerType(),nullable=True),  
  StructField('Promo2',IntegerType(),nullable=True),
  StructField('Promo2SinceWeek',IntegerType(),nullable=True),
  StructField('Promo2SinceYear',IntegerType(),nullable=True),
  StructField('PromoInterval',StringType(),nullable=True)
])

In [15]:
df = spark.read.option('header',True).schema(Schema).csv('C:/Users/Usuario/Desktop/SparkDataSet/store.csv')
df.show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

### Data types

In [16]:
df.dtypes

[('Store', 'string'),
 ('StoreType', 'string'),
 ('Assortment', 'string'),
 ('CompetitionDistance', 'float'),
 ('CompetitionOpenSinceMonth', 'int'),
 ('CompetitionOpenSinceYear', 'int'),
 ('Promo2', 'int'),
 ('Promo2SinceWeek', 'int'),
 ('Promo2SinceYear', 'int'),
 ('PromoInterval', 'string')]

### Count of rows

In [17]:
df.count()

1115

### DROPMALFORMED

We can drop invalid rows while reading the dataset by setting the read mode as “DROPMALFORMED”

In [18]:
df_1=spark.read.option('header',True).option('mode','DROPMALFORMED').csv('C:/Users/Usuario/Desktop/SparkDataSet/store.csv')

In [20]:
df_1.show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|               1270|                        9|                    2008|     0|           NULL|           NULL|            NULL|
|    2|        a|         a|                570|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|              14130|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|                620|                

### FILLNA

fillna() is used to replace null value with any other value

In [21]:
df.fillna(value=0).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|              0|              0|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

Here, we are replacing the null value with -99 of specific subset

In [22]:
df.fillna(value=-99,subset=['Promo2SinceWeek','Promo2SinceYear']).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|            -99|            -99|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

### SELECT

In [23]:
df_1.select('Store','StoreType').show(2)

+-----+---------+
|Store|StoreType|
+-----+---------+
|    1|        c|
|    2|        a|
+-----+---------+
only showing top 2 rows



### WHEN

In Spark we can conditionally replace values as we do in normal coding if-else. For that we can use when and otherwise methods on dataframe.

Here we are creating a new column, with value equal to 1 if Promo2SinceYear > 2000 otherwise 0

In [38]:
from pyspark.sql.functions import when  # Importar la función 'when'

df.withColumn('greater_than_2000', when(df['Promo2SinceYear'] > 2000, 1).otherwise(0)).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|greater_than_2000|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|                0|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|                1|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|        

In [39]:
df.select('Promo2SinceYear', 'greater_than_2000').show()

+---------------+-----------------+
|Promo2SinceYear|greater_than_2000|
+---------------+-----------------+
|           NULL|                0|
|           2010|                1|
|           2011|                1|
|           NULL|                0|
|           NULL|                0|
|           NULL|                0|
|           NULL|                0|
|           NULL|                0|
|           NULL|                0|
|           NULL|                0|
|           2012|                1|
|           2010|                1|
|           2009|                1|
|           2011|                1|
|           2011|                1|
|           NULL|                0|
|           2010|                1|
|           2012|                1|
|           2011|                1|
|           2014|                1|
+---------------+-----------------+
only showing top 20 rows



### FILTER

In the Data cleaning process, filtering plays an essential role, and good news is that it’s super simple to use filter method. Filter method is an alias of where method, so we can use where method as well instead of filter.

In [40]:
df.filter(df.CompetitionDistance==2000).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|  PromoInterval|greater_than_2000|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------------+
|  128|        d|         c|             2000.0|                     NULL|                    NULL|     1|              1|           2013|Jan,Apr,Jul,Oct|                1|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------------+



### GROUPBY

Similar to the SQL GROUP BY clause, PySpark groupBy() function is used to collect the identical data into groups on DataFrame and perform aggregate functions on the grouped data.

In [43]:
df_1.select('StoreType','Promo2SinceWeek').groupby('Promo2SinceWeek').sum().show()

+---------------+
|Promo2SinceWeek|
+---------------+
|             22|
|             28|
|             35|
|              5|
|             31|
|             18|
|             27|
|             26|
|              6|
|             23|
|             40|
|             44|
|             48|
|              9|
|              1|
|             36|
|             10|
|             37|
|             49|
|             39|
+---------------+
only showing top 20 rows



In [44]:
df.groupBy('storeType').mean('CompetitionDistance').show()

+---------+------------------------+
|storeType|avg(CompetitionDistance)|
+---------+------------------------+
|        d|       6913.063583815029|
|        c|      3522.5675675675675|
|        b|      1060.5882352941176|
|        a|         5123.0615640599|
+---------+------------------------+



### DROP

Drop columns which are not necessary

In [45]:
df=df.drop('Assortment')
df.show()

+-----+---------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|Store|StoreType|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|greater_than_2000|
+-----+---------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|    1|        c|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|                0|
|    2|        a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|                1|
|    3|        a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|                1|
|    4|        c|     