In [0]:
import urllib
import urllib.parse

import pandas as pd
from pyspark.sql.functions import *

In [0]:
# Define the path to the Delta table holding the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row in a single Spark job.
# Two independent collect() calls would scan the table twice and, without an
# explicit ordering, are not guaranteed to return matching rows.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(
        "No AWS credentials found in Delta table at {0}".format(delta_table_path)
    )

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# Percent-encode the secret key (safe="" also encodes "/") so it can be
# embedded in a URL.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")
AWS_S3_BUCKET = "user-129076a9eaf9-bucket"

In [0]:
# Credential/bucket values substituted into every source URL, in placeholder order.
_S3_URL_PARTS = (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

PIN_SOURCE_URL = "s3n://{0}:{1}@{2}/pin_result.json".format(*_S3_URL_PARTS)
GEO_SOURCE_URL = "s3n://{0}:{1}@{2}/geo_result.json".format(*_S3_URL_PARTS)
USER_SOURCE_URL = "s3n://{0}:{1}@{2}/user_result.json".format(*_S3_URL_PARTS)


def _read_json(source_url):
    """Load the JSON file at ``source_url`` into a Spark DataFrame."""
    json_reader = spark.read.format("json").option("inferSchema", "True")
    return json_reader.load(source_url)


df_pin = _read_json(PIN_SOURCE_URL)
df_geo = _read_json(GEO_SOURCE_URL)
df_user = _read_json(USER_SOURCE_URL)

In [0]:
# CLEANING df_pin dataframe
#
# NOTE: this cell rebinds df_pin and renames "index" to "ind", so it must be
# run against a freshly loaded df_pin (re-running it on its own output fails
# because the "index" column no longer exists).

# Placeholder strings the source data uses to mean "no value", keyed by column.
# Values that are already null need no handling here.
PIN_PLACEHOLDERS = {
    "poster_name": "User Info Error",
    "description": "No description available Story format",
    "title": "No Title Data Available",
    "follower_count": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
}
for column_name, placeholder in PIN_PLACEHOLDERS.items():
    df_pin = df_pin.withColumn(
        column_name,
        when(col(column_name) == placeholder, None).otherwise(col(column_name)),
    )

# Cast the row index to int and rename it to "ind"
df_pin = df_pin.withColumn("index", col("index").cast("int"))
df_pin = df_pin.withColumnRenamed("index", "ind")

# Convert follower_count ("12k", "1.5k", "3M", "250") to an integer.
# The numeric part is parsed as an exact decimal and multiplied by the suffix
# scale; plain text substitution of the suffix would turn "1.5k" into "1.5000",
# which does not cast to 1500.
follower_number = regexp_replace(col("follower_count"), "[kM]$", "").cast("decimal(20,3)")
follower_scale = (
    when(col("follower_count").endswith("k"), 1000)
    .when(col("follower_count").endswith("M"), 1000000)
    .otherwise(1)
)
df_pin = df_pin.withColumn("follower_count", (follower_number * follower_scale).cast("int"))

# Strip the "Local save in " prefix so save_location holds only the path
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Reorder columns (columns not listed here, e.g. "downloaded", are dropped)
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

# Drop every row that still has a null in any remaining column
df_pin = df_pin.dropna()

In [0]:
# CLEAN dataframe df_geo

# Column order of the cleaned geo table
GEO_COLUMNS = ("ind", "country", "coordinates", "timestamp")

df_geo = (
    df_geo
    # Merge longitude and latitude into one "longitude, latitude" string column
    .withColumn("coordinates", concat(col("longitude"), lit(", "), col("latitude")))
    .drop("latitude", "longitude")
    # Parse the timestamp column into a timestamp type
    .withColumn("timestamp", to_timestamp("timestamp"))
    # Cast the row index to int
    .withColumn("ind", col("ind").cast("int"))
    # Keep only the wanted columns, in order
    .select(*GEO_COLUMNS)
    # Discard rows with a null in any column
    .dropna()
)

In [0]:
# CLEAN dataframe df_user

# Combine first and last name into a single user_name column and drop the originals
df_user = df_user.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
df_user = df_user.drop("first_name", "last_name")

# Set datatype as timestamp
df_user = df_user.withColumn("date_joined", to_timestamp("date_joined"))

# Set ind and age columns datatype as int
df_user = df_user.withColumn("ind", col("ind").cast("int"))
df_user = df_user.withColumn("age", col("age").cast("int"))

# Reorder columns
df_user = df_user.select("ind", "user_name", "age", "date_joined")

# Drop every row that has a null in any remaining column
df_user = df_user.dropna()

In [0]:
# Persist each cleaned dataframe as a managed Delta table, replacing any
# existing table of the same name.
_TABLE_TARGETS = (
    (df_pin, "129076a9eaf9_pin_table"),
    (df_geo, "129076a9eaf9_geo_table"),
    (df_user, "129076a9eaf9_user_table"),
)
for cleaned_df, table_name in _TABLE_TARGETS:
    delta_writer = cleaned_df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(table_name)