In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [6]:
# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

In [7]:
# Read CSV file into DataFrame
store_df = spark.read.csv('store.csv')


In [8]:
# Show DataFrame
store_df.show()


+-----+---------+----------+-------------------+--------------------+--------------------+------+---------------+---------------+----------------+
|  _c0|      _c1|       _c2|                _c3|                 _c4|                 _c5|   _c6|            _c7|            _c8|             _c9|
+-----+---------+----------+-------------------+--------------------+--------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSi...|CompetitionOpenSi...|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
|    1|        c|         a|               1270|                   9|                2008|     0|           NULL|           NULL|            NULL|
|    2|        a|         a|                570|                  11|                2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|              14130|                  12|                2006|     1|             14|     

In [9]:
# Define the schema
Schema = StructType([
    StructField('Store', StringType(), nullable=True),
    StructField('StoreType', StringType(), nullable=True),
    StructField('Assortment', StringType(), nullable=True),
    StructField('CompetitionDistance', FloatType(), nullable=True),
    StructField('CompetitionOpenSinceMonth', IntegerType(), nullable=True),
    StructField('CompetitionOpenSinceYear', IntegerType(), nullable=True),
    StructField('Promo2', IntegerType(), nullable=True),
    StructField('Promo2SinceWeek', IntegerType(), nullable=True),
    StructField('Promo2SinceYear', IntegerType(), nullable=True),
    StructField('PromoInterval', StringType(), nullable=True)
])


In [10]:
# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

In [11]:
# Read CSV file into DataFrame with schema
df = spark.read.option("header", True).schema(Schema).csv('store.csv')


In [12]:
# Show DataFrame
df.show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

In [13]:
df.dtypes

[('Store', 'string'),
 ('StoreType', 'string'),
 ('Assortment', 'string'),
 ('CompetitionDistance', 'float'),
 ('CompetitionOpenSinceMonth', 'int'),
 ('CompetitionOpenSinceYear', 'int'),
 ('Promo2', 'int'),
 ('Promo2SinceWeek', 'int'),
 ('Promo2SinceYear', 'int'),
 ('PromoInterval', 'string')]

In [14]:
df.count()

1115

DROPMALFORMED:
We can drop invalid rows while reading the dataset by setting the read mode as “DROPMALFORMED”

In [16]:
df_1 = spark.read.option("header", True).option("mode", 'DROPMALFORMED').csv('store.csv')


In [17]:
df.fillna(value=0).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|              0|              0|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

In [19]:
df.fillna(value=-99,subset=["Promo2SinceWeek","Promo2SinceYear"]).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|            -99|            -99|            NULL|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|         c|              620.0|                

SELECT:
We can select a specific column for analysis purpose, by passing argument count in the show we can select limited record

In [21]:
df_1.select('Store','StoreType').show(2)

+-----+---------+
|Store|StoreType|
+-----+---------+
|    1|        c|
|    2|        a|
+-----+---------+
only showing top 2 rows



WHEN:
In Spark we can conditionally replace values as we do in normal coding if-else. For that we can use when and otherwise methods on dataframe.

Here we are creating a new column, with value equal to 1 if Promo2SinceYear > 2000 otherwise 0

In [25]:
from pyspark.sql.functions import when

# Assuming `df` is your DataFrame
df.withColumn("greater_than_2000", when(df.CompetitionDistance == 2000, 1).otherwise(0)).show()


+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|greater_than_2000|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+-----------------+
|    1|        c|         a|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|                0|
|    2|        a|         a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|                0|
|    3|        a|         a|            14130.0|                       12|                    2006|     1|             14|        

FILTER:
In the Data cleaning process, filtering plays an essential role, and good news is that it’s super simple to use filter method. Filter method is an alias of where method, so we can use where method as well instead of filter.



In [26]:
df.filter(df.CompetitionDistance==2000).show()

+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|  PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|  128|        d|         c|             2000.0|                     NULL|                    NULL|     1|              1|           2013|Jan,Apr,Jul,Oct|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+



GROUP BY:
Similar to the SQL GROUP BY clause, PySpark groupBy() function is used to collect the identical data into groups on DataFrame and perform aggregate functions on the grouped data.

In [27]:
df_1.select('StoreType','Promo2SinceWeek').groupby('Promo2SinceWeek').sum().show()

+---------------+
|Promo2SinceWeek|
+---------------+
|             22|
|             28|
|             35|
|              5|
|             31|
|             18|
|             27|
|             26|
|              6|
|             23|
|             40|
|             44|
|             48|
|              9|
|              1|
|             36|
|             10|
|             37|
|             49|
|             39|
+---------------+
only showing top 20 rows



In [28]:
df.groupBy("storeType").mean("CompetitionDistance").show()

+---------+------------------------+
|storeType|avg(CompetitionDistance)|
+---------+------------------------+
|        d|       6913.063583815029|
|        c|      3522.5675675675675|
|        b|      1060.5882352941176|
|        a|         5123.0615640599|
+---------+------------------------+



DROP:
An undesirable column can be a drop from dataframe as well

In [29]:
df=df.drop('Assortment')
df.show()

+-----+---------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|Store|StoreType|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|   PromoInterval|
+-----+---------+-------------------+-------------------------+------------------------+------+---------------+---------------+----------------+
|    1|        c|             1270.0|                        9|                    2008|     0|           NULL|           NULL|            NULL|
|    2|        a|              570.0|                       11|                    2007|     1|             13|           2010| Jan,Apr,Jul,Oct|
|    3|        a|            14130.0|                       12|                    2006|     1|             14|           2011| Jan,Apr,Jul,Oct|
|    4|        c|              620.0|                        9|                    2009|     0|           NULL|           NULL|   