In [0]:
def nulls(df, df_columns):
    """Count missing-looking values per column and return the counts as a DataFrame.

    A row is counted for a column when its value contains the substring
    'None' or 'NULL', equals the empty string, is SQL NULL, or is NaN.

    Args:
        df: DataFrame to inspect.
        df_columns: Iterable of column names of ``df`` to check.

    Returns:
        A DataFrame produced by a single aggregate ``select``: one column per
        inspected name, holding the number of matching rows.
    """
    # NOTE(review): isnan() is applied to every listed column regardless of
    # its type -- confirm callers only pass columns Spark can evaluate it on.

    def _missing_count(name):
        # Build the "looks missing" predicate for one column.
        column = col(name)
        looks_missing = (
            column.contains('None')
            | column.contains('NULL')
            | (column == '')
            | column.isNull()
            | isnan(name)
        )
        # when() without otherwise() yields NULL for non-matching rows, and
        # count() skips NULLs, so only matching rows are counted.
        return count(when(looks_missing, name)).alias(name)

    return df.select([_missing_count(name) for name in df_columns])

In [0]:
def replace_to_none(df):
    """Return a new DataFrame with known placeholder values turned into None.

    Empty strings in 'title' and the text 'User Info Error' in
    'follower_count' are replaced with None using DataFrame.replace().
    """
    # (column, placeholder) pairs, applied in this order.
    placeholders = (
        ('title', ''),
        ('follower_count', 'User Info Error'),
    )
    cleaned = df
    for column_name, placeholder in placeholders:
        cleaned = cleaned.replace({placeholder: None}, subset=[column_name])
    return cleaned

In [0]:
from pyspark.sql.functions import regexp_replace

def k_M_to_zeros(df, column):
    """Expand the 'k' and 'M' abbreviations in ``column`` into literal zeros.

    Every 'k' in the column's text becomes '000' and every 'M' becomes
    '000000'. The column keeps its name and the updated DataFrame is returned.
    """
    # (letter, replacement) pairs, applied in this order.
    expansions = (
        ("k", "000"),
        ("M", "000000"),
    )
    for letter, zeros in expansions:
        df = df.withColumn(column, regexp_replace(column, letter, zeros))
    return df

In [0]:
def cast_to_int(df, column):
    """Return ``df`` with ``column`` replaced by itself cast to "integer"."""
    as_integer = df[column].cast("integer")
    return df.withColumn(column, as_integer)

In [0]:
def clean_pin(df):
    """Clean the pin DataFrame and return it with a fixed column order.

    Steps, in order:
      1. Replace placeholder values ('' in 'title', 'User Info Error' in
         'follower_count') with None.
      2. Expand the 'k' / 'M' abbreviations in 'follower_count' into zeros.
      3. Rename 'index' to 'ind'.
      4. Cast 'follower_count' and 'ind' to integer.
      5. Strip the 'Local save in' prefix from 'save_location'.
      6. Select the columns in their final order.

    Args:
        df: Raw pin DataFrame.

    Returns:
        The cleaned DataFrame.
    """
    # Placeholders must become None before the numeric conversion below.
    df = replace_to_none(df)

    # 'follower_count' uses k / M abbreviations; expand them so the value can
    # be cast to an integer.
    df = k_M_to_zeros(df, "follower_count")

    df = df.withColumnRenamed("index", "ind")

    # Cast each column containing numeric data into a numeric data type.
    df = cast_to_int(df, "follower_count")
    df = cast_to_int(df, "ind")

    # Keep only the path in 'save_location'. The whitespace following the
    # prefix is removed as well, so the path does not start with a space.
    df = df.withColumn(
        "save_location",
        regexp_replace("save_location", r"Local save in\s*", ""),
    )

    # Rearrange the column order.
    return df.select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )

In [0]:
from pyspark.sql.functions import concat, lit, to_timestamp

def clean_geo(df):
    """Clean the geolocation DataFrame and return it.

    Steps, in order:
      1. Add 'coordinates' by concatenating 'latitude', the separator ', '
         and 'longitude' with concat().
      2. Drop the 'latitude' and 'longitude' columns.
      3. Rename 'index' to 'ind' so the key column matches the pin data.
      4. Select 'ind', 'country', 'coordinates', 'timestamp' in that order.
      5. Convert 'timestamp' with to_timestamp().
    """
    # NOTE(review): concat() joins the values into "<latitude>, <longitude>";
    # it does not build an array column -- confirm that is what is wanted.
    return (
        df.withColumn("coordinates", concat("latitude", lit(", "), "longitude"))
        .drop("latitude", "longitude")
        .withColumnRenamed("index", "ind")
        .select("ind", "country", "coordinates", "timestamp")
        .withColumn("timestamp", to_timestamp("timestamp"))
    )

In [0]:
def clean_user(df):
    """Clean the user DataFrame and return it.

    Steps, in order:
      1. Add 'user_name' by concatenating 'first_name', a single space and
         'last_name', then drop the two source columns.
      2. Convert 'date_joined' with to_timestamp().
      3. Rename 'index' to 'ind' so the key column matches the other tables.
      4. Select 'ind', 'user_name', 'age', 'date_joined' in that order.
    """
    full_name = concat("first_name", lit(" "), "last_name")
    joined_at = to_timestamp("date_joined")

    with_name = df.withColumn("user_name", full_name).drop("first_name", "last_name")
    with_dates = with_name.withColumn("date_joined", joined_at)
    renamed = with_dates.withColumnRenamed("index", "ind")
    return renamed.select("ind", "user_name", "age", "date_joined")