In [None]:
import pyspark

#  I chose to use SparkSession instead of SparkContext because SparkSession
#  eliminates the need to create separate contexts (SparkContext, SQLContext, HiveContext),
#  which would use more memory, while keeping all the functionality of SparkContext
#  (still reachable as spark.sparkContext). SparkSession has been the unified entry point since Spark 2.0.

from pyspark.sql import SparkSession
# Reuse the already-running session if there is one; otherwise start a new one
# registered under the application name "practice".
spark = (
    SparkSession.builder
    .appName("practice")
    .getOrCreate()
)

In [None]:

# Sample dataset downloaded from Kaggle, used here for experimenting.
# A DataFrame is used instead of an RDD because it is generally more efficient.
# It also organizes the data into a table, similar to a relational (SQL) database,
# which might make it easier to parameterize data coming from SQL queries.
# header=True takes column names from the first row; inferSchema=True asks Spark
# to infer each column's type instead of reading everything as a string.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('cereal.csv')
)
df.show()

In [None]:
# Prints the DataFrame's schema as a tree: each column's name, its data type,
# and whether the column is nullable.

df.printSchema()

In [None]:
# DataFrames are immutable: drop() returns a NEW DataFrame without the "type"
# column and leaves `df` itself untouched. The result is displayed here rather
# than discarded; `df` still contains "type" for the cells that follow.
df.drop("type").show()

In [None]:
# Re-display `df`: the "type" column is still present because the drop() result
# in the previous cell was never assigned back to `df`.
df.show()

In [None]:
# .na.drop(how="any") drops every row that contains at least one null value.
# The alternative, how="all", only drops a row when all of its values are null.
rows_without_nulls = df.na.drop(how="any")
rows_without_nulls.show()

In [None]:
# threshold
# `thresh` keeps only the rows that meet the threshold: a row must have at least
# `thresh` non-null values to be kept.
# If the number of non-null values in a row meets or exceeds the threshold, it stays;
# otherwise, it is removed.
# When `thresh` is given it overrides `how`, so `how` is not passed here.

df.na.drop(thresh=15).show()

In [None]:
# Re-display `df`: the na.drop() calls above returned new DataFrames and were
# not assigned back, so `df` still holds every original row.
df.show()

In [None]:
# The `subset` parameter restricts the null check to the listed columns:
# a row is removed only if it has a null in one of those columns.
null_check_columns = ['sodium', "fiber"]
df.na.drop(subset=null_check_columns).show()

In [None]:
# this is probably the function that is most similar to what i have to do?
# lit() wraps a constant (literal) value as a Column, so the same value can be
# attached to every row of a DataFrame

from pyspark.sql.functions import lit

# .isNull() builds a condition that is true where the given column is null
# .filter() returns a new DataFrame holding only the rows that satisfy the condition
sodium_is_null = df["sodium"].isNull()
df_sodiumNull = df.filter(sodium_is_null)

# .withColumn() returns a new DataFrame with the named column added
sodium_reason = lit("Sodium Null")
errorLog = df_sodiumNull.withColumn("Reason", sodium_reason)

In [None]:
# Display the error log built so far: the rows with a null "sodium" value,
# each tagged with a "Reason" column.
errorLog.show()

In [None]:
# Append the rows whose "potass" value is null, tagged with their own reason.
# union() matches columns by position (not by name), so both sides must have
# the same column order; that holds here since both derive from `df` + "Reason".
# NOTE: errorLog is reassigned from itself, so re-running this cell appends
# the same rows a second time.
potass_null_rows = df.filter(df.potass.isNull()).withColumn("Reason", lit("Pot Null"))
errorLog = errorLog.union(potass_null_rows)
errorLog.show()

In [None]:
# Re-display `df`: the filter/withColumn/union calls above build new DataFrames,
# so the source data is unchanged.
df.show()

In [34]:
# Append the rows whose "vitamins" value is below 25, tagged with their own reason.
# NOTE: errorLog is reassigned from itself, so re-running this cell appends
# the same rows a second time.
low_vitamin_rows = df.filter(df.vitamins < 25).withColumn("Reason", lit("Vitamins Value Incorrect"))
errorLog = errorLog.union(low_vitamin_rows)
errorLog.show()

+-----------------+----+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+--------------------+
|             name| mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|              Reason|
+-----------------+----+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+--------------------+
|   Almond Delight|   R|   C|     110|      2|  2|  null|  1.0| 14.0|     8|  null|      25|    3|   1.0|0.75|34.384843|         Sodium Null|
|         Cheerios|   G|   C|     110|      6|  2|  null|  2.0| 17.0|     1|   105|      25|    1|   1.0|1.25|50.764999|         Sodium Null|
|100% Natural Bran|null|   C|     120|      3|  5|  null|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|         Sodium Null|
|      Bran Flakes|   P|   C|      90|      3|  0|  null|  5.0| 13.0|     5|   190|      25|    3|   1.0|0.67|53.313813|         Sodium Null|
|     