In [2]:
import pyspark
from pyspark.sql import SQLContext, SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.read.csv('bestsellers\ with\ categories.csv', inferSchema=True, header=True)
df.show(5)

+--------------------+--------------------+-----------+-------+-----+----+-----------+
|                Name|              Author|User Rating|Reviews|Price|Year|      Genre|
+--------------------+--------------------+-----------+-------+-----+----+-----------+
|10-Day Green Smoo...|            JJ Smith|        4.7|  17350|    8|2016|Non Fiction|
|   11/22/63: A Novel|        Stephen King|        4.6|   2052|   22|2011|    Fiction|
|12 Rules for Life...|  Jordan B. Peterson|        4.7|  18979|   15|2018|Non Fiction|
|1984 (Signet Clas...|       George Orwell|        4.7|  21424|    6|2017|    Fiction|
|5,000 Awesome Fac...|National Geograph...|        4.8|   7665|   12|2019|Non Fiction|
+--------------------+--------------------+-----------+-------+-----+----+-----------+
only showing top 5 rows



In [5]:
#shows us the information about all the columns which are there in the dataset.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- User Rating: double (nullable = true)
 |-- Reviews: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Genre: string (nullable = true)



In [6]:
#Count the no of records. The code implemented for this shown below
df.count()

550

In [7]:
#4. Display record in different ways
df.take(1)

[Row(Name='10-Day Green Smoothie Cleanse', Author='JJ Smith', User Rating=4.7, Reviews=17350, Price=8, Year=2016, Genre='Non Fiction')]

In [11]:
#used to display the statistical properties of all the columns in the dataset.
df.describe().show()

+-------+--------------------+----------------+-------------------+------------------+------------------+------------------+-----------+
|summary|                Name|          Author|        User Rating|           Reviews|             Price|              Year|      Genre|
+-------+--------------------+----------------+-------------------+------------------+------------------+------------------+-----------+
|  count|                 550|             550|                550|               550|               550|               550|        550|
|   mean|                null|            null|  4.618363636363641|11953.281818181818|              13.1|            2014.0|       null|
| stddev|                null|            null|0.22698036502519656|11731.132017431892|10.842261978422364|3.1651563841692782|       null|
|    min|"The Plant Parado...|Abraham Verghese|                3.3|                37|                 0|              2009|    Fiction|
|    max|You Are a Badass:...|    Zhi Gan

In [12]:
#use describe function column-wise also.
df.describe('Price').show()

+-------+------------------+
|summary|             Price|
+-------+------------------+
|  count|               550|
|   mean|              13.1|
| stddev|10.842261978422364|
|    min|                 0|
|    max|               105|
+-------+------------------+



In [13]:
#removal of duplication
df =df.dropDuplicates()

In [14]:
df.count()

550

In [33]:
#Checking missing values
total =df.count()
for col in df.columns:
    df_filter =(df.filter(df[col]=="0")).count()
    percen_filter = df_filter/total
    print(col, "\t","with '0' values: ",percen_filter)

Name 	 with '0' values:  0.0
Author 	 with '0' values:  0.0
User Rating 	 with '0' values:  0.0
Reviews 	 with '0' values:  0.0
Price 	 with '0' values:  0.02181818181818182
Year 	 with '0' values:  0.0
Genre 	 with '0' values:  0.0


In [54]:
#printing missing coloumns
missing_col =[]
for col in df.columns:
    df_filter =(df.filter(df[col]=="0")).count()
    if(df_filter > 0):
        missing_col.append(col)
print(missing_col)

['Price']


In [55]:
#Imputing mean values for numerical coloumns
#for null values with 0, it will be filled for the missing numerical coloumns.
df =df.fillna(0)

In [69]:
#finding the avg of all numeric columns
from pyspark.sql.functions import avg

def find_columns_fill_mean(df,num_col, verbose=False):
    col_with_mean=[]
    for col in num_col:
        mean_value =df.select(avg(df[col]))
        avg_col =mean_value.columns[0]
        result = mean_value.rdd.map(lambda row : row[avg_col]).collect()
        
        if (verbose==True): print(mean_value.columns[0], "\t", result[0])
        col_with_mean.append([col, result[0]])
    return col_with_mean

In [70]:
print(col, "\t", "mean values: ", find_columns_fill_mean(df,missing_col))

Genre 	 mean values:  [['Price', 13.1]]


In [71]:
#filling missing values for mean
from pyspark.sql.functions import when, lit

def fill_missing_value_with_mean(df, numeric_cols):
    col_mean = mean_of_pyspark_columns(df, numeric_cols)
    
    for col, mean in col_mean:
        df = df.withColumn(col, when(df[col]==0, lit(mean)).otherwise(df[col]))
        
    return df

In [None]:
df_mean_filter1 = fill_missing_value_with_mean(df, missing_col)

In [None]:
df_mean_filter1 =df_mean_filter1.repartition(1)
df_mean_filter1.write.csv('bestsellers with categories.csv', header=True)