In [1]:
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql import types
from pyspark.ml.feature import Bucketizer

In [2]:
# Create the Spark session used by every later cell, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('local')
    .getOrCreate()
)

In [3]:
# Display the installed PySpark version as the cell output.
pyspark.__version__

'3.1.2'

In [4]:
# Load the reviews extract: the first line supplies the column names and
# Spark infers each column's type from the data.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
pyspark_df = spark.read.csv(
    'D:\\pyspark\\reviews_1.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Column selection: DataFrame.select accepts several ways of referring to a column.

# 1) A Column object built from the column name with f.col
pyspark_df.select(
    f.col('Clothing ID')
).show(2)

# 2) Attribute access on the DataFrame (only possible when the name is a valid identifier)
pyspark_df.select(
    pyspark_df.Rating
).show(2)

# 3) Several f.col objects in one call
pyspark_df.select(
    f.col('Clothing ID'),
    f.col('Age'),
).show(2)

# 4) Both reference styles mixed in one call
pyspark_df.select(
    f.col('Clothing ID'),
    pyspark_df.Age,
).show(2)

# 5) Many columns at once: take the list of column names and slice it as needed.
#    Positions 2..5 are 'Rating' through 'Division Name'.
cols_lst = pyspark_df.columns
pyspark_df.select(cols_lst[2:6]).show()

+-----------+
|Clothing ID|
+-----------+
|        767|
|       1080|
+-----------+
only showing top 2 rows

+------+
|Rating|
+------+
|     4|
|     5|
+------+
only showing top 2 rows

+-----------+---+
|Clothing ID|Age|
+-----------+---+
|        767| 33|
|       1080| 34|
+-----------+---+
only showing top 2 rows

+-----------+---+
|Clothing ID|Age|
+-----------+---+
|        767| 33|
|       1080| 34|
+-----------+---+
only showing top 2 rows

+------+---------------+-----------------------+--------------+
|Rating|Recommended IND|Positive Feedback Count| Division Name|
+------+---------------+-----------------------+--------------+
|     4|              1|                      0|     Initmates|
|     5|              1|                      4|       General|
|     3|              0|                      0|       General|
|     5|              1|                      0|General Petite|
|     5|              1|                      6|       General|
|     2|              0|          

In [95]:
# Row filtering with DataFrame.filter

# Range test; between() includes both bounds (20 <= Age <= 30)
pyspark_df.filter(pyspark_df.Age.between(20, 30)).show(2)

# Equality on a single value
pyspark_df.filter(f.col('Clothing ID') == 1080).show(2)

# Membership in a list of values
pyspark_df.filter(f.col('Clothing ID').isin([1080, 829])).show(5)

# Combined conditions: each comparison needs its own parentheses around `&`
pyspark_df.filter(
    (f.col('Rating') >= 3) & (f.col('Age') <= 30)
).show(2)

# f.col(...) and pyspark_df[...] references can be mixed in one condition
pyspark_df.filter(
    (f.col('Rating') >= 2) & (pyspark_df['Age'] >= 30)
).show(2)

# To exclude records, put "~" before the condition.
# Here rows whose Age is 20, 22 or 18 are removed; the remaining distinct ages are listed in order.
(
    pyspark_df
    .filter(~f.col('Age').isin([20, 22, 18]))
    .select(f.col('Age'))
    .distinct()
    .sort(f.col('Age'))
    .show()
)

# Null handling demo. Rating has no nulls of its own, so a copy of it is created
# in which every 5 is replaced by null.
rating_with_nulls = pyspark_df.withColumn(
    'New_Rating',
    f.when(pyspark_df.Rating == 5, None).otherwise(pyspark_df.Rating),
)
rating_with_nulls.show(5)

# dropna() with no arguments removes rows that have a null in ANY column,
# not only in New_Rating.
rating_with_nulls.dropna().show(5)

+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+
|       1077| 24|     5|              1|                      0|      General|        Dresses|   Dresses|
|          4| 28|     5|              1|                      0|      General|           Tops|  Sweaters|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+
only showing top 2 rows

+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+
|       1080| 34|    

In [48]:
# Renaming columns

# One column
pyspark_df.withColumnRenamed('Department Name', 'Dept_Name').show(2)

# Two columns, by chaining withColumnRenamed
(
    pyspark_df
    .withColumnRenamed('Department Name', 'Dept_Name')
    .withColumnRenamed('Class Name', 'Cls_Nm')
    .show(2)
)

# Rename while selecting, using alias
pyspark_df.select(f.col('Clothing ID').alias('cloth_id')).show(2)

# Rename every column at once: toDF takes the complete list of new names,
# here the existing names converted to lower case.
lower_cols = [col_name.lower() for col_name in pyspark_df.columns]
pyspark_df.toDF(*lower_cols).show(2)

+-----------+---+------+---------------+-----------------------+-------------+---------+----------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Dept_Name|Class Name|
+-----------+---+------+---------------+-----------------------+-------------+---------+----------+
|        767| 33|     4|              1|                      0|    Initmates| Intimate| Intimates|
|       1080| 34|     5|              1|                      4|      General|  Dresses|   Dresses|
+-----------+---+------+---------------+-----------------------+-------------+---------+----------+
only showing top 2 rows

+-----------+---+------+---------------+-----------------------+-------------+---------+---------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Dept_Name|   Cls_Nm|
+-----------+---+------+---------------+-----------------------+-------------+---------+---------+
|        767| 33|     4|              1|                      0|    Initmates|

In [59]:
# Dropping columns

# By Column object
pyspark_df.drop(pyspark_df.Age).show(2)

# By column name
pyspark_df.drop('Clothing ID').show(2)

# Several columns: drop takes the names as separate arguments,
# so a list of names is unpacked with *.
cols_to_drop = ['Age', 'Rating']
pyspark_df.drop(*cols_to_drop).show(2)


+-----------+------+---------------+-----------------------+-------------+---------------+----------+
|Clothing ID|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-----------+------+---------------+-----------------------+-------------+---------------+----------+
|        767|     4|              1|                      0|    Initmates|       Intimate| Intimates|
|       1080|     5|              1|                      4|      General|        Dresses|   Dresses|
+-----------+------+---------------+-----------------------+-------------+---------------+----------+
only showing top 2 rows

+---+------+---------------+-----------------------+-------------+---------------+----------+
|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+---+------+---------------+-----------------------+-------------+---------------+----------+
| 33|     4|              1|                      0|    Initmates|       Intimate

In [66]:
# Adding derived columns with withColumn

# One new column computed from an existing one
pyspark_df.withColumn('New_Age', pyspark_df.Age + 5).show(3)

# Two new columns, by chaining withColumn
(
    pyspark_df
    .withColumn('New_Age', pyspark_df.Age + 5)
    .withColumn('New_Rating', f.col('Rating') + 1)
    .show(3)
)


+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+-------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|New_Age|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+-------+
|        767| 33|     4|              1|                      0|    Initmates|       Intimate| Intimates|     38|
|       1080| 34|     5|              1|                      4|      General|        Dresses|   Dresses|     39|
|       1077| 60|     3|              0|                      0|      General|        Dresses|   Dresses|     65|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+-------+
only showing top 3 rows

+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+-------+----------+
|Clothing ID|Age|Rating|Recommended IND|Positive Fee

In [70]:
# Unique values of a column (null is listed as a value of its own)
(
    pyspark_df
    .select('Department Name')
    .distinct()
    .show()
)

+---------------+
|Department Name|
+---------------+
|        Dresses|
|           Tops|
|        Jackets|
|           null|
|          Trend|
|       Intimate|
|        Bottoms|
+---------------+



In [72]:
# Shape of the DataFrame: rows and columns are counted separately.

# Row count (printed explicitly; only the last expression of a cell is displayed automatically)
row_count = pyspark_df.select('Department Name').count()
print(row_count)

# Column count
len(pyspark_df.columns)

23486


8

In [103]:
# Number of observations per group, unordered and ordered.
# orderBy and sort are used side by side to show that either can order the result.

# Rows per division, in whatever order Spark returns them
pyspark_df.groupBy('Division Name').count().show()

# Five most frequent ages (orderBy, descending count)
(
    pyspark_df
    .groupBy('Age')
    .count()
    .orderBy(f.col('count').desc())
    .show(5)
)

# Ratings from most to least frequent (sort, descending count)
(
    pyspark_df
    .groupBy('Rating')
    .count()
    .sort(f.col('count').desc())
    .show(5)
)

# Ratings from least to most frequent (sort, ascending is the default)
(
    pyspark_df
    .groupBy('Rating')
    .count()
    .sort(f.col('count'))
    .show(5)
)

+--------------+-----+
| Division Name|count|
+--------------+-----+
|     Initmates| 1502|
|          null|   14|
|       General|13850|
|General Petite| 8120|
+--------------+-----+

+---+-----+
|Age|count|
+---+-----+
| 39| 1269|
| 35|  909|
| 36|  842|
| 34|  804|
| 38|  780|
+---+-----+
only showing top 5 rows

+------+-----+
|Rating|count|
+------+-----+
|     5|13131|
|     4| 5077|
|     3| 2871|
|     2| 1565|
|     1|  842|
+------+-----+

+------+-----+
|Rating|count|
+------+-----+
|     1|  842|
|     2| 1565|
|     3| 2871|
|     4| 5077|
|     5|13131|
+------+-----+



In [128]:
# Aggregations: min, max and average over the whole frame and per group

# Dict form: {column name: aggregate function name}
pyspark_df.agg({'Age':'max'}).show()

# collect() returns the result as a list of Row objects instead of printing a table
print(pyspark_df.agg({'Age':'min'}).collect(), end='\n\n')

# Function form: pass a Column expression built with pyspark.sql.functions
pyspark_df.agg(f.max(f.col('Clothing ID'))).show()

pyspark_df.agg({'Rating':'Avg'}).show()

# The same aggregates computed per division
pyspark_df.groupBy('Division Name').agg({'Rating':'Avg'}).show()

pyspark_df.groupBy('Division Name').agg({'Rating':'max'}).show()

pyspark_df.groupBy('Division Name').agg({'Rating':'min'}).show()

# Group-by on a string column: collect every Department Name seen for a Clothing ID
# into a list, then join the list into one '| '-separated string.
pyspark_df.groupBy('Clothing ID').agg(f.concat_ws('| ',f.collect_list(f.col('Department Name')))).show(10)

# Group-by null count example: split the rows by whether Clothing ID is null and
# count the rows in each group.
# count() counts every non-null value it is given, so a when(...).otherwise(0)
# expression inside it filters nothing; counting a literal states the intent directly.
# The group key gets its own name so it cannot be confused with the source column.
(
    pyspark_df
    .groupBy(f.col('Clothing ID').isNull().alias('clothing_id_is_null'))
    .agg(f.count(f.lit(1)).alias('row_count'))
    .show()
)

+--------+
|max(Age)|
+--------+
|      99|
+--------+

[Row(min(Age)=18)]

+----------------+
|max(Clothing ID)|
+----------------+
|            1205|
+----------------+

+-----------------+
|      avg(Rating)|
+-----------------+
|4.196031678446734|
+-----------------+

+--------------+-----------------+
| Division Name|      avg(Rating)|
+--------------+-----------------+
|     Initmates|4.286284953395473|
|          null|              5.0|
|       General|4.176606498194946|
|General Petite|4.211083743842365|
+--------------+-----------------+

+--------------+-----------+
| Division Name|max(Rating)|
+--------------+-----------+
|     Initmates|          5|
|          null|          5|
|       General|          5|
|General Petite|          5|
+--------------+-----------+

+--------------+-----------+
| Division Name|min(Rating)|
+--------------+-----------+
|     Initmates|          1|
|          null|          5|
|       General|          1|
|General Petite|          1|
+---------

In [131]:
# describe() example: summary statistics (count, mean, stddev, min, max)

# For a chosen subset of columns
(
    pyspark_df
    .select('Clothing ID', 'Age', 'Rating')
    .describe()
    .show()
)

# For every column of the DataFrame
pyspark_df.describe().show()

+-------+-----------------+------------------+-----------------+
|summary|      Clothing ID|               Age|           Rating|
+-------+-----------------+------------------+-----------------+
|  count|            23486|             23486|            23486|
|   mean|918.1187090181385|43.198543813335604|4.196031678446734|
| stddev| 203.298979722048|12.279543615591514|1.110030719824388|
|    min|                0|                18|                1|
|    max|             1205|                99|                5|
+-------+-----------------+------------------+-----------------+

+-------+-----------------+------------------+-----------------+-------------------+-----------------------+-------------+---------------+----------+
|summary|      Clothing ID|               Age|           Rating|    Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|
+-------+-----------------+------------------+-----------------+-------------------+-----------------------+------

In [12]:
# Creating buckets, i.e. assigning each row to a bin based on a value range. Three variants follow.

# 1) Bucketizer. `splits` lists the bin edges; each bin is [lower, upper):
#    0-19 -> 0.0, 20-39 -> 1.0, 40-59 -> 2.0, 60-79 -> 3.0.
#    The last bin also includes its upper edge, so 80-100 -> 4.0.
#    The output column holds the bin index as a double.
bucketizer = Bucketizer(splits=[0,20,40,60,80,100], inputCol='Age', outputCol='Bins')
# 'skip' filters out rows whose Age is invalid instead of raising an error
bucketizer.setHandleInvalid('skip').transform(pyspark_df).show()


# 2) Numeric bins with a when-chain. The first matching branch wins; between() includes
#    both bounds. Anything not matched by a branch (80 and above, or a null Age) falls
#    into otherwise(4).
pyspark_df.select(
    f.col('*'),
    f.when(f.col('Age') < 20, 0)
     .when(f.col('Age').between(20, 39), 1)
     .when(f.col('Age').between(40, 59), 2)
     .when(f.col('Age').between(60, 79), 3)
     .otherwise(4)
     .alias('Age_Bin'),
).show()

# 3) Categorical bins with a when-chain. There is no otherwise(), so a row that matches
#    no branch gets null.
#    The final branch must test 80-99; repeating the 60-79 test there would never match,
#    because the earlier 60-79 branch already takes those rows.
pyspark_df.select(
    f.col('*'),
    f.when(f.col('Age') < 20, '0-19')
     .when(f.col('Age').between(20, 39), '20-39')
     .when(f.col('Age').between(40, 59), '40-59')
     .when(f.col('Age').between(60, 79), '60-79')
     .when(f.col('Age').between(80, 99), '80-99')
     .alias('Age_Bin'),
).show()

+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+----+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count| Division Name|Department Name|Class Name|Bins|
+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+----+
|        767| 33|     4|              1|                      0|     Initmates|       Intimate| Intimates| 1.0|
|       1080| 34|     5|              1|                      4|       General|        Dresses|   Dresses| 1.0|
|       1077| 60|     3|              0|                      0|       General|        Dresses|   Dresses| 3.0|
|       1049| 50|     5|              1|                      0|General Petite|        Bottoms|     Pants| 2.0|
|        847| 47|     5|              1|                      6|       General|           Tops|   Blouses| 2.0|
|       1080| 49|     2|              0|                      4|       General|        Dresses|   Dresse

In [177]:
# Casting a column to another data type: Age (integer) is copied into Age_2 as a float.
age_as_float = f.col('Age').cast(types.FloatType())
pyspark_df.withColumn('Age_2', age_as_float).show()

+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+-----+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count| Division Name|Department Name|Class Name|Age_2|
+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+-----+
|        767| 33|     4|              1|                      0|     Initmates|       Intimate| Intimates| 33.0|
|       1080| 34|     5|              1|                      4|       General|        Dresses|   Dresses| 34.0|
|       1077| 60|     3|              0|                      0|       General|        Dresses|   Dresses| 60.0|
|       1049| 50|     5|              1|                      0|General Petite|        Bottoms|     Pants| 50.0|
|        847| 47|     5|              1|                      6|       General|           Tops|   Blouses| 47.0|
|       1080| 49|     2|              0|                      4|       General|        Dresses| 

In [11]:
# Inspecting column data types

# Tree view: name, type and nullability of each column
pyspark_df.printSchema()

# The same information as a list of (column name, type name) tuples
column_types = pyspark_df.dtypes
print(column_types, end='\n\n')

# dtypes also works on a projection, e.g. a single column:
# pyspark_df.select(f.col('Age')).dtypes

root
 |-- Clothing ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Recommended IND: integer (nullable = true)
 |-- Positive Feedback Count: integer (nullable = true)
 |-- Division Name: string (nullable = true)
 |-- Department Name: string (nullable = true)
 |-- Class Name: string (nullable = true)

[('Clothing ID', 'int'), ('Age', 'int'), ('Rating', 'int'), ('Recommended IND', 'int'), ('Positive Feedback Count', 'int'), ('Division Name', 'string'), ('Department Name', 'string'), ('Class Name', 'string')]



In [21]:
# Window functions, evaluated separately inside each 'Clothing ID' partition.
# NOTE(review): the window is ordered by the partition column itself, so all rows of a
# partition tie on the sort key and the row order seen by row_number / lag / lead is not
# guaranteed to be the same on every run. Order by a column that distinguishes the rows
# within a partition if reproducible results are needed.
windowSpec = (
    Window
    .partitionBy('Clothing ID')
    .orderBy('Clothing ID')
)

# Position of each row inside its partition, starting at 1
pyspark_df.withColumn(
    'rownumber',
    f.row_number().over(windowSpec),
).show()

# Rating of the previous row in the partition; null for the partition's first row
pyspark_df.withColumn(
    'old_rating',
    f.lag('Rating', offset=1).over(windowSpec),
).show()

# Rating of the next row in the partition; the default 0 is used for the partition's last row
pyspark_df.withColumn(
    'next_rating',
    f.lead('Rating', offset=1, default=0).over(windowSpec),
).show()

+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+---------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count|Division Name|Department Name|Class Name|rownumber|
+-----------+---+------+---------------+-----------------------+-------------+---------------+----------+---------+
|        148| 48|     5|              1|                      0|    Initmates|       Intimate|     Sleep|        1|
|        148| 49|     5|              1|                      1|    Initmates|       Intimate|     Sleep|        2|
|        148| 54|     5|              1|                     20|    Initmates|       Intimate|     Sleep|        3|
|        463| 62|     5|              1|                      0|    Initmates|       Intimate|  Layering|        1|
|        463| 67|     5|              1|                      2|    Initmates|       Intimate|  Layering|        2|
|        463| 32|     5|              1|                      0|    Init

In [6]:
# Explicit schema for the reviews file. The trailing 'malformed_data' string column is
# not a data column: it is the column named in columnNameOfCorruptRecord below, where
# Spark stores the raw text of records it could not parse.
schema = (
    types.StructType()
    .add('Clothing ID', types.IntegerType())
    .add('Age', types.IntegerType())
    .add('Rating', types.IntegerType())
    .add('Recommended IND', types.IntegerType())
    .add('Positive Feedback Count', types.IntegerType())
    .add('Division Name', types.StringType())
    .add('Department Name', types.StringType())
    .add('Class Name', types.StringType())
    .add('Title', types.StringType())
    .add('Review Text', types.StringType())
    .add('malformed_data', types.StringType())
)

# Read the file with the schema above instead of inferring types.
# - mode='PERMISSIVE' keeps malformed records rather than dropping them or failing.
# - multiLine=True lets a single record span several lines of the file.
# NOTE(review): this rebinds pyspark_df, which an earlier cell loaded from reviews_1.csv;
# cells run after this one see the new columns.
pyspark_df = spark.read.csv(
    r'D:\pyspark\reviews.csv',
    header=True,
    schema=schema,
    encoding='utf8',
    mode='PERMISSIVE',
    columnNameOfCorruptRecord='malformed_data',
    multiLine=True,
)

pyspark_df.show()

+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+--------------------+--------------------+--------------------+
|Clothing ID|Age|Rating|Recommended IND|Positive Feedback Count| Division Name|Department Name|Class Name|               Title|         Review Text|      malformed_data|
+-----------+---+------+---------------+-----------------------+--------------+---------------+----------+--------------------+--------------------+--------------------+
|        767| 33|     4|              1|                      0|     Initmates|       Intimate| Intimates|                null|Absolutely wonder...|                null|
|       1080| 34|     5|              1|                      4|       General|        Dresses|   Dresses|                null|"Love this dress!...|                null|
|       1077| 60|     3|              0|                      0|       General|        Dresses|   Dresses|Some major design...|I had such high h...|  