## Functions for Cleaning Pinterest DataFrames with Error Handling
I created a function to clean the "df_pin" DataFrame so that all the cleaning steps are encapsulated in a single function.

I also added error handling to the function: a try-except block catches any error raised during the cleaning process, prints a message and returns None. The function with error handling follows the task description.

## Task 1: Clean the DataFrame that contains information about Pinterest posts

#### To clean the df_pin DataFrame you should perform the following transformations:

- Replace empty entries and entries with no relevant data in each column with None
- Perform the necessary transformations on the follower_count to ensure every entry is a number. Make sure the data type of this column is an int.
- Ensure that each column containing numeric data has a numeric data type
- Clean the data in the save_location column to include only the save location path
- Rename the index column to ind.
- Reorder the DataFrame columns to have the following column order:
  - ind
  - unique_id
  - title
  - description
  - follower_count
  - poster_name
  - tag_list
  - is_image_or_video
  - image_src
  - save_location
  - category
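
The follower_count rule can be prototyped in plain Python before it is expressed with Spark column functions. The sketch below is a minimal illustration, not the notebook's implementation: `parse_follower_count` is a hypothetical helper, and the `M` suffix and decimal values such as `1.5k` are assumptions about the raw data that the task description does not confirm. It also shows why a plain text substitution of `k` with `000` is fragile: `1.5k` would become `1.5000` rather than `1500`.

```python
from decimal import Decimal, InvalidOperation
from typing import Optional

# Hypothetical suffix table: only "k" appears in the notebook, "M" is an assumption.
_MULTIPLIERS = {"k": 1_000, "M": 1_000_000}


def parse_follower_count(raw: Optional[str]) -> Optional[int]:
    """Convert strings such as '12k' or '3M' to an int, or None if unparseable."""
    if raw is None:
        return None
    text = raw.strip()
    if not text:
        return None
    multiplier = 1
    if text[-1] in _MULTIPLIERS:
        multiplier = _MULTIPLIERS[text[-1]]
        text = text[:-1]
    try:
        # Decimal avoids binary floating-point rounding on values like "2.3"
        return int(Decimal(text) * multiplier)
    except (InvalidOperation, ValueError, OverflowError):
        return None


for value in ["12k", "1.5k", "874", "", "not a number"]:
    print(repr(value), "->", parse_follower_count(value))
```

A function like this could be wrapped with `pyspark.sql.functions.udf`, although built-in column functions usually perform better than Python UDFs.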

In [None]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType, LongType

def clean_pin_dataframe(df_pin):
    """
    Clean the DataFrame that contains information about Pinterest posts.

    Parameters:
    df_pin (DataFrame): DataFrame containing Pinterest posts data.

    Returns:
    df_pin_cleaned (DataFrame): Cleaned DataFrame.
    """
    try:
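        # Replace empty-string entries with null (None). fillna("None") would instead
        # write the literal string "None" into null cells. Dataset-specific placeholder
        # strings for "no relevant data" are not handled here.
        df_pin_cleaned = df_pin.replace("", None)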

        # Replace "k" with "000" and cast to integer for follower_count column
        df_pin_cleaned = df_pin_cleaned.withColumn("follower_count", 
                                                   regexp_replace(col("follower_count"), "k", "000").cast(IntegerType()))

        # Cast downloaded column to IntegerType
        df_pin_cleaned = df_pin_cleaned.withColumn("downloaded", df_pin_cleaned["downloaded"].cast(IntegerType()))

        # Cast index column to LongType
        df_pin_cleaned = df_pin_cleaned.withColumn("index", df_pin_cleaned["index"].cast(LongType()))

        # Rename index column to ind
        df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

        # Reorder the DataFrame columns
        df_pin_cleaned = df_pin_cleaned.select("ind", "unique_id", "title", "description", "follower_count", 
                                               "poster_name", "tag_list", "is_image_or_video", "image_src", 
                                               "save_location", "category")

        return df_pin_cleaned

    except Exception as e:
        print(f"An error occurred during data cleaning: {str(e)}")
        return None
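
The task list also asks for save_location to be reduced to the path, a step the function above does not perform. A minimal sketch of the rule in plain Python follows. It assumes, without confirmation from the notebook, that each raw value is some descriptive text followed by an absolute path beginning with `/`; `extract_save_path` and the sample string are hypothetical.

```python
import re
from typing import Optional

# Assumption: the path is the first run of non-space characters starting with "/".
_PATH_PATTERN = re.compile(r"/\S*")


def extract_save_path(raw: Optional[str]) -> Optional[str]:
    """Return the path portion of a save_location value, or None if none is found."""
    if raw is None:
        return None
    match = _PATH_PATTERN.search(raw)
    return match.group(0) if match else None


# Made-up sample value, for illustration only
print(extract_save_path("Local save in /data/travel"))  # /data/travel
```

In Spark the same idea could be expressed with `regexp_extract(col("save_location"), r"(/\S*)", 1)`, keeping in mind that `regexp_extract` returns an empty string rather than null when nothing matches.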


I can then call this function passing the "df_pin" DataFrame as an argument to clean it:

In [None]:
# Define file locations
pin_file_location = "dbfs:/mnt/mount_name/topics/topics/1209b9ad90a5.pin/partition=0/*.json"

# Read JSON files into DataFrames
df_pin = spark.read.json(pin_file_location)

# Clean the Pinterest posts DataFrame
cleaned_df_pin = clean_pin_dataframe(df_pin)

# Show the cleaned DataFrame
cleaned_df_pin.show()
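
Because `clean_pin_dataframe` returns None when an exception is caught, the `cleaned_df_pin.show()` call above would then fail with an `AttributeError` that hides the original cause. One possible guard is sketched below. `run_cleaner` is a hypothetical helper, not part of the notebook, and the stand-in cleaner functions exist only to make the example runnable without Spark.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_cleaner(cleaner: Callable[[T], Optional[T]], df: T, name: str) -> T:
    """Run a cleaning function and fail loudly if it signalled an error with None."""
    cleaned = cleaner(df)
    if cleaned is None:
        raise RuntimeError(f"Cleaning of {name} failed; see the message printed above.")
    return cleaned


# Stand-ins for real cleaners: one succeeds, one mimics the error path.
def ok_cleaner(rows):
    return [row for row in rows if row]


def failing_cleaner(rows):
    return None


print(run_cleaner(ok_cleaner, ["a", "", "b"], "df_pin"))  # ['a', 'b']

try:
    run_cleaner(failing_cleaner, ["a"], "df_pin")
except RuntimeError as err:
    print(err)
```

With the notebook's function this would be called as `run_cleaner(clean_pin_dataframe, df_pin, "df_pin")`, so a failed clean stops the cell before `.show()` is reached.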

## Function for Cleaning the Geolocation DataFrame with Error Handling
I created a function to clean the "df_geo" DataFrame so that all the cleaning steps are encapsulated in a single function.

I also added the same error handling to this function: a try-except block catches any error raised during the cleaning process, prints a message and returns None. The function with error handling follows the task description.

## Task 2: Clean the DataFrame that contains information about geolocation

#### To clean the df_geo DataFrame you should perform the following transformations:

- Create a new column coordinates that contains an array based on the latitude and longitude columns
- Drop the latitude and longitude columns from the DataFrame
- Convert the timestamp column from a string to a timestamp data type
- Reorder the DataFrame columns to have the following column order:
  - ind
  - country
  - coordinates
  - timestamp
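
To make the target shape concrete, the four steps above can be applied to a single record in plain Python. This is only an illustrative sketch: `clean_geo_row` is a hypothetical helper, the record is made up, and the timestamp layout `YYYY-MM-DD HH:MM:SS` is an assumption that the notebook does not state.

```python
from datetime import datetime

# Assumed layout of the raw timestamp strings; the notebook does not state it.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
COLUMN_ORDER = ["ind", "country", "coordinates", "timestamp"]


def clean_geo_row(row: dict) -> dict:
    """Apply the four geolocation transformations to a single record."""
    cleaned = dict(row)  # copy so the caller's record is left untouched
    # Combine latitude and longitude into one array, dropping the originals
    cleaned["coordinates"] = [cleaned.pop("latitude"), cleaned.pop("longitude")]
    # Parse the timestamp string into a datetime object
    cleaned["timestamp"] = datetime.strptime(cleaned["timestamp"], TIMESTAMP_FORMAT)
    # Emit the keys in the required column order
    return {name: cleaned[name] for name in COLUMN_ORDER}


# Made-up record, for illustration only
sample = {"country": "Albania", "ind": 1, "latitude": 41.3, "longitude": 19.8,
          "timestamp": "2022-01-15 08:30:00"}
print(clean_geo_row(sample))
```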

In [None]:
from pyspark.sql.functions import array, col, to_timestamp

def clean_geo_dataframe(df_geo):
    """
    Clean the DataFrame that contains information about Pinterest geolocation data.

    Parameters:
    df_geo (DataFrame): DataFrame containing Pinterest geolocation data.

    Returns:
    df_geo_cleaned (DataFrame): Cleaned DataFrame.
    """
    try:
        # Create a new column coordinates containing an array of latitude and longitude
        df_geo_cleaned = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))

        # Drop the latitude and longitude columns
        df_geo_cleaned = df_geo_cleaned.drop("latitude", "longitude")

        # Convert the timestamp column to a timestamp data type
        df_geo_cleaned = df_geo_cleaned.withColumn("timestamp", to_timestamp("timestamp"))

        # Reorder the DataFrame columns
        df_geo_cleaned = df_geo_cleaned.select("ind", "country", "coordinates", "timestamp")

        return df_geo_cleaned

    except Exception as e:
        print(f"An error occurred during data cleaning: {str(e)}")
        return None


I can then call this function passing the "df_geo" DataFrame as an argument to clean it:

In [None]:
# Define file locations
geo_file_location = "dbfs:/mnt/mount_name/topics/topics/1209b9ad90a5.geo/partition=0/*.json"

# Read JSON files into DataFrames
df_geo = spark.read.json(geo_file_location)

# Clean the geolocation DataFrame
cleaned_df_geo = clean_geo_dataframe(df_geo)

# Show the cleaned DataFrame
cleaned_df_geo.show()


## Task 3: Clean the DataFrame that contains information about users
1. Create a new column user_name by concatenating first_name and last_name
2. Drop the first_name and last_name columns
3. Convert the date_joined column to a timestamp data type
4. Reorder the DataFrame columns
   - "ind", "user_name", "age", "date_joined"

In [None]:
from pyspark.sql.functions import concat, col, to_timestamp

def clean_user_dataframe(df_user):
    """
    Clean the DataFrame that contains information about Pinterest user data.

    Parameters:
    df_user (DataFrame): DataFrame containing Pinterest user data.

    Returns:
    df_user_cleaned (DataFrame): Cleaned DataFrame.
    """
    try:
        # 1. Create a new column user_name by concatenating first_name and last_name
        df_user_cleaned = df_user.withColumn("user_name", concat(col("first_name"), col("last_name")))

        # 2. Drop the first_name and last_name columns
        df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

        # 3. Convert the date_joined column to a timestamp data type
        df_user_cleaned = df_user_cleaned.withColumn("date_joined", to_timestamp("date_joined"))

        # 4. Reorder the DataFrame columns
        df_user_cleaned = df_user_cleaned.select("ind", "user_name", "age", "date_joined")

        return df_user_cleaned

    except Exception as e:
        print(f"An error occurred during data cleaning: {str(e)}")
        return None
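
One detail of the user_name step is worth checking: `concat` joins the two name columns with no separator and yields null if either input is null, whereas `concat_ws` inserts a separator and skips null inputs. The plain-Python functions below are a hypothetical model of those two behaviours for illustration only, not Spark code, and the sample names are made up.

```python
from typing import Optional


def concat_model(*parts: Optional[str]) -> Optional[str]:
    """Model of Spark's concat: any null input makes the result null."""
    if any(part is None for part in parts):
        return None
    return "".join(parts)


def concat_ws_model(sep: str, *parts: Optional[str]) -> str:
    """Model of Spark's concat_ws: null inputs are skipped, the rest joined by sep."""
    return sep.join(part for part in parts if part is not None)


print(concat_model("Ada", "Lovelace"))          # AdaLovelace
print(concat_model("Ada", None))                # None
print(concat_ws_model(" ", "Ada", "Lovelace"))  # Ada Lovelace
print(concat_ws_model(" ", "Ada", None))        # Ada
```

If a space between the names is wanted, `concat_ws(" ", col("first_name"), col("last_name"))` from `pyspark.sql.functions` is one option.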

I can then call this function passing the "df_user" DataFrame as an argument to clean it:

In [None]:
# Define file locations
user_file_location = "dbfs:/mnt/mount_name/topics/topics/1209b9ad90a5.user/partition=0/*.json"

# Read JSON files into DataFrames
df_user = spark.read.json(user_file_location)

# Clean the user DataFrame
cleaned_df_user = clean_user_dataframe(df_user)

# Show the cleaned DataFrame
cleaned_df_user.show()