#AWS Credentials


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType
import urllib

# `import urllib` alone does not guarantee that the `parse` submodule is
# loaded, so import it explicitly before calling urllib.parse.quote below.
import urllib.parse

# Reader settings for the credentials CSV uploaded to the workspace FileStore.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch both keys for the Databricks user in a single Spark job instead of
# running the same filter twice.
credentials_row = (
    aws_keys_df.where(col("User name") == "databricks-user")
    .select("Access key ID", "Secret access key")
    .first()
)
if credentials_row is None:
    # Fail with an actionable message rather than an IndexError on an empty list.
    raise ValueError(
        "No row with 'User name' == 'databricks-user' found in "
        "/FileStore/tables/authentication_credentials.csv"
    )

# AWS access key and secret key read from the credentials dataframe.
ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]
# Percent-encode the secret key (no characters are treated as safe).
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

#Creating Dataframes


In [0]:
# Streaming dataframe over the Kinesis "pin" stream, read from the earliest
# available position using the AWS keys loaded above.
df_pin = (
    spark.readStream.format("kinesis")
    .options(
        streamName="streaming-12f4a3e5b9c5-pin",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming dataframe over the Kinesis "geo" stream, read from the earliest
# available position using the AWS keys loaded above.
df_geo = (
    spark.readStream.format("kinesis")
    .options(
        streamName="streaming-12f4a3e5b9c5-geo",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming dataframe over the Kinesis "user" stream, read from the earliest
# available position using the AWS keys loaded above.
df_user = (
    spark.readStream.format("kinesis")
    .options(
        streamName="streaming-12f4a3e5b9c5-user",
        initialPosition="earliest",
        region="us-east-1",
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

#Schema Definitions

In [0]:
# Schema used to parse the JSON payload of pin records.
# follower_count stays a string here; it is converted during cleaning.
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    )
])
# Schema used to parse the JSON payload of geolocation records.
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("country", StringType()),
    )
])
# Schema used to parse the JSON payload of user records.
# age is declared as a string, exactly as in the incoming payload schema.
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    )
])

In [0]:
# Decode the Kinesis `data` column to text, parse it as JSON with pin_schema,
# then promote the parsed struct's fields to top-level columns.
deserialize_df_pin = (
    df_pin.selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), pin_schema))
    .selectExpr("data.*")
)

# Decode the Kinesis `data` column to text, parse it as JSON with geo_schema,
# then promote the parsed struct's fields to top-level columns.
deserialize_df_geo = (
    df_geo.selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), geo_schema))
    .selectExpr("data.*")
)


# Decode the Kinesis `data` column to text, parse it as JSON with user_schema,
# then promote the parsed struct's fields to top-level columns.
deserialize_df_user = (
    df_user.selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), user_schema))
    .selectExpr("data.*")
)

In [0]:
# Drop rows that are duplicated across every column.
# Note: df_pin / df_geo / df_user are rebound here from the raw Kinesis
# streams to the parsed, de-duplicated dataframes.
df_pin = deserialize_df_pin.drop_duplicates(deserialize_df_pin.columns)
df_geo = deserialize_df_geo.drop_duplicates(deserialize_df_geo.columns)
df_user = deserialize_df_user.drop_duplicates(deserialize_df_user.columns)

In [0]:
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract

#Cleaning df_pin Dataframe

In [0]:
# Clean the pin dataframe.
#
# The placeholder replacements are chained on one dataframe so that every one
# of them is kept. Previously each step restarted from df_pin, so only the
# last replacement (poster_name) actually took effect. The former
# "null -> None" steps were no-ops and have been dropped.

# Replace entries that carry no relevant data with null
df_pin_clean = (
    df_pin
    .withColumn(
        "description",
        when(col("description").contains("No description"), None).otherwise(col("description")),
    )
    .withColumn(
        "follower_count",
        when(col("follower_count").contains("User Info Error"), None).otherwise(col("follower_count")),
    )
    .withColumn(
        "image_src",
        when(col("image_src").contains("Image src error"), None).otherwise(col("image_src")),
    )
    .withColumn(
        "poster_name",
        when(col("poster_name").contains("User Info Error"), None).otherwise(col("poster_name")),
    )
)

# Expand the M and k suffixes in follower_count into their corresponding zeros
# NOTE(review): a value such as "1.5M" would become "1.5000000" here; confirm
# the source never emits decimal follower counts.
df_pin_clean = df_pin_clean.withColumn("follower_count", regexp_replace(col("follower_count"), "M", "000000"))
df_pin_clean = df_pin_clean.withColumn("follower_count", regexp_replace(col("follower_count"), "k", "000"))

# Change the follower_count data type to int
df_pin_clean = df_pin_clean.withColumn("follower_count", col("follower_count").cast("int"))

# Ensure that each column containing numeric data has a numeric data type
df_pin_clean = df_pin_clean.withColumn("downloaded", col("downloaded").cast("int"))
df_pin_clean = df_pin_clean.withColumn("index", col("index").cast("int"))

# Strip the "Local save in" prefix from the save_location column
df_pin_clean = df_pin_clean.withColumn("save_location", regexp_replace(col("save_location"), "Local save in", ""))

# Rename the index column to ind
df_pin_clean = df_pin_clean.withColumnRenamed("index", "ind")

# Reorder the dataframe columns (the downloaded column is not kept)
df_pin_clean = df_pin_clean.select(
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
)

#Cleaning df_geo Dataframe

In [0]:
# Clean the geo dataframe in one pass:
#   1. pack latitude and longitude into a single `coordinates` array column,
#   2. drop the two source columns,
#   3. cast `timestamp` to the timestamp data type,
#   4. put the columns in their final order.
df_geo_clean = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)


#Cleaning df_user Dataframe

In [0]:
# Clean the user dataframe in one pass:
#   1. build `user_name` by concatenating first_name and last_name
#      (no separator is inserted),
#   2. drop the two source name columns,
#   3. cast `date_joined` to the timestamp data type,
#   4. put the columns in their final order.
df_user_clean = (
    df_user
    .withColumn("user_name", concat(col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", col("date_joined").cast("timestamp"))
    .select("ind", "user_name", "age", "date_joined")
)

#Streaming Data to Delta Tables


In [0]:
# Every streaming query needs its own checkpoint directory. A directory shared
# between queries mixes their offsets and state, and clearing it for one query
# would remove the progress of another that is still running. The pin query
# therefore keeps its checkpoints under a table-specific path.
pin_checkpoint_path = "/tmp/kinesis/12f4a3e5b9c5_pin_table/_checkpoints/"

# Delete the pin checkpoint folder (recursively) so the query starts fresh
dbutils.fs.rm(pin_checkpoint_path, True)

# Append the df_pin_clean stream to its Delta table
(
    df_pin_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", pin_checkpoint_path)
    .table("12f4a3e5b9c5_pin_table")
)


In [0]:
# Every streaming query needs its own checkpoint directory. A directory shared
# between queries mixes their offsets and state, and clearing it for one query
# would remove the progress of another that is still running. The geo query
# therefore keeps its checkpoints under a table-specific path.
geo_checkpoint_path = "/tmp/kinesis/12f4a3e5b9c5_geo_table/_checkpoints/"

# Delete the geo checkpoint folder (recursively) so the query starts fresh
dbutils.fs.rm(geo_checkpoint_path, True)

# Append the df_geo_clean stream to its Delta table
(
    df_geo_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", geo_checkpoint_path)
    .table("12f4a3e5b9c5_geo_table")
)


In [0]:
# Every streaming query needs its own checkpoint directory. A directory shared
# between queries mixes their offsets and state, and clearing it for one query
# would remove the progress of another that is still running. The user query
# therefore keeps its checkpoints under a table-specific path.
user_checkpoint_path = "/tmp/kinesis/12f4a3e5b9c5_user_table/_checkpoints/"

# Delete the user checkpoint folder (recursively) so the query starts fresh
dbutils.fs.rm(user_checkpoint_path, True)

# Append the df_user_clean stream to its Delta table
(
    df_user_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", user_checkpoint_path)
    .table("12f4a3e5b9c5_user_table")
)