In [0]:
# BATCH PROCESSING: SPARK ON DATABRICKS - PART 1

In [0]:
# Run the shared data_cleaning_tools notebook so that the cleaning functions it defines are available in this notebook

In [0]:
%run "/Users/rjhelan@outlook.com/data_cleaning_tools"

In [0]:
# Import necessary libraries
from pyspark.sql import DataFrame
from pyspark.sql.functions import *

In [0]:
# Read configuration for the JSON data stored in the mounted S3 bucket.
# Source format handed to spark.read.format(...)
file_type = "json"
# Value passed as the reader's "inferSchema" option.
# NOTE(review): Spark's JSON source infers the schema by default and does not list
# "inferSchema" among its documented options (it is a CSV option) - confirm it is needed.
infer_schema = "true"

# The mount point and user ID are defined once so that pointing the notebook at
# another bucket or user is a single edit instead of three.
s3_mount_point = "/mnt/s3_bucket"
topics_user_id = "0a3db223d459"

# The trailing "*.json" glob makes Spark read every file with a .json extension in the folder.
# Note: the JSON objects in your S3 bucket must follow the layout
# `topics/<UserID>.<topic>/partition=0/` (for example `topics/<UserID>.pin/partition=0/`).
topic_path_template = "{mount}/topics/{user}.{topic}/partition=0/*.json"
pin_file_location = topic_path_template.format(mount=s3_mount_point, user=topics_user_id, topic="pin")
geo_file_location = topic_path_template.format(mount=s3_mount_point, user=topics_user_id, topic="geo")
user_file_location = topic_path_template.format(mount=s3_mount_point, user=topics_user_id, topic="user")

def create_spark_dataframe(file_location: str) -> "DataFrame":
    """Load the files at ``file_location`` into a Spark DataFrame.

    The reader is configured from two notebook-level settings: ``file_type``
    selects the source format and ``infer_schema`` is passed as the reader's
    ``inferSchema`` option. Both are looked up when the function is called.

    Args:
        file_location: Path or glob pattern to load, e.g. one of the
            ``*_file_location`` settings defined in this notebook.

    Returns:
        The DataFrame returned by the reader's ``load(file_location)`` call.
    """
    # `spark` is not defined in this notebook; it is expected to be supplied by the runtime.
    reader = spark.read.format(file_type).option("inferSchema", infer_schema)
    return reader.load(file_location)

# Build one raw DataFrame per dataset (pin, geo, user) from its configured location.
df_pin = create_spark_dataframe(file_location=pin_file_location)
df_geo = create_spark_dataframe(file_location=geo_file_location)
df_user = create_spark_dataframe(file_location=user_file_location)

In [0]:
# Task 1: Clean the DataFrame that contains information about Pinterest posts.
# NOTE(review): clean_df_pin, clean_df_geo and clean_df_user are not defined in this
# notebook; presumably they come from the data_cleaning_tools notebook pulled in via %run.
# Each result is bound to a new *_cleaned name, so the raw df_* variables are not rebound.
df_pin_cleaned = clean_df_pin(df_pin)

# Task 2: Clean the DataFrame that contains information about geolocation.
df_geo_cleaned = clean_df_geo(df_geo)

# Task 3: Clean the DataFrame that contains information about users.
df_user_cleaned = clean_df_user(df_user)

In [0]:
# Save the cleaned DataFrames as Parquet files for the queries in the 'data_query' notebook.
# Each dataset is listed once as a (DataFrame, destination path) pair so that the write
# settings live in a single call instead of three copy-pasted ones.
parquet_outputs = (
    (df_pin_cleaned, "/mnt/0a3db223d459_storage/df_pin"),
    (df_geo_cleaned, "/mnt/0a3db223d459_storage/df_geo"),
    (df_user_cleaned, "/mnt/0a3db223d459_storage/df_user"),
)
for cleaned_df, parquet_path in parquet_outputs:
    # 'overwrite' replaces any data already stored at the destination, so the
    # notebook can be re-run without failing on existing output.
    cleaned_df.write.mode('overwrite').parquet(parquet_path)