In [0]:
# List the files in the Databricks FileStore tables folder to find the name of the uploaded credentials CSV
FILESTORE_TABLES_DIR = "/FileStore/tables"
dbutils.fs.ls(FILESTORE_TABLES_DIR)

## Imports


In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib
     

## AWS Credentials


In [0]:
# Load the AWS credentials CSV from the FileStore into a Spark DataFrame.
# NOTE(review): keeping AWS keys in a DBFS CSV is risky; consider a Databricks secret scope instead.
file_type = "csv"             # reader source format
first_row_is_header = "true"  # the first row holds the column names
delimiter = ","               # fields are comma-separated

aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# `import urllib` alone does not guarantee the `urllib.parse` submodule is loaded; import it explicitly.
import urllib.parse

# Fetch the databricks-user row once and read both keys from it (one Spark job instead of two).
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    raise ValueError("No row with 'User name' == 'databricks-user' found in the AWS credentials file")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is encoded as well.
# NOTE(review): not used in this notebook section (streams use SECRET_KEY) — presumably kept for URL-based access; confirm.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
def get_kinesis_stream(kinesis_stream_name, region='us-east-1', initial_position='earliest'):
    '''Create a streaming DataFrame that reads records from an AWS Kinesis stream.

    Authentication uses the module-level ``ACCESS_KEY`` / ``SECRET_KEY`` globals,
    and the reader is built from the module-level ``spark`` session.

    Args:
        kinesis_stream_name: Name of the Kinesis stream to read.
        region: AWS region hosting the stream (defaults to 'us-east-1').
        initial_position: Value for the reader's 'initialPosition' option,
            i.e. where in the stream to start reading (defaults to 'earliest').

    Returns:
        The streaming DataFrame returned by ``spark.readStream...load()``.
    '''
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

def deserialize_stream_df(stream_df, schema):
    '''Parse the JSON held in the ``data`` column of a stream into typed columns.

    The ``data`` column is cast to a string, parsed with ``from_json`` against
    ``schema``, and the resulting struct is expanded so every schema field
    becomes its own top-level column.

    Args:
        stream_df: Streaming DataFrame with a ``data`` column, such as the one
            returned by ``get_kinesis_stream``.
        schema: StructType describing the JSON records stored in ``data``.

    Returns:
        DataFrame with one column per field of ``schema``.
    '''
    as_text = stream_df.selectExpr("CAST(data as STRING)")
    parsed = as_text.withColumn("data", from_json(col("data"), schema))
    return parsed.select(col("data.*"))

def write_df_to_delta_table(df, delta_table, table_prefix="0a70d64d47bd"):
    '''Start a streaming append of ``df`` into a Delta table.

    The target table is ``f"{table_prefix}_{delta_table}"``. The stream
    checkpoints to ``/tmp/kinesis/{table_prefix}_{delta_table}_checkpoints/``.

    Args:
        df: Streaming DataFrame to persist.
        delta_table: Table name suffix, e.g. ``"pin_table"``.
        table_prefix: Prefix for both the table and checkpoint names
            (defaults to ``"0a70d64d47bd"``).

    Returns:
        Whatever ``DataStreamWriter.table(...)`` returns. This is presumably the
        ``StreamingQuery`` handle (confirm on the cluster's runtime), which
        callers can use to monitor or stop the stream.
    '''
    target_name = f"{table_prefix}_{delta_table}"
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        # Each table gets its own checkpoint directory so the streams don't share state
        .option("checkpointLocation", f"/tmp/kinesis/{target_name}_checkpoints/")
        .table(target_name)
    )

## Define schemas for all the dataframes

In [0]:

# Schemas for the JSON records carried by each Kinesis stream, given as (field name, type) pairs.
# Every field uses StructField's defaults (nullable, no metadata).

# pin records: follower_count stays a string here because it holds values like "10k" that are cleaned later
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    )
])

# geo records
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    )
])

# user records
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    )
])

## Get Kinesis Streams and Deserialize the streams

In [0]:
# Open one Kinesis stream per entity (pin, geo, user)
df_pin_stream = get_kinesis_stream('streaming-0a70d64d47bd-pin')
df_geo_stream = get_kinesis_stream('streaming-0a70d64d47bd-geo')
df_user_stream = get_kinesis_stream('streaming-0a70d64d47bd-user')

# Deserialize each stream with its own schema; each entity must read from its OWN stream
df_pin = deserialize_stream_df(df_pin_stream, pin_schema)
df_geo = deserialize_stream_df(df_geo_stream, geo_schema)
df_user = deserialize_stream_df(df_user_stream, user_schema)

## Clean df_pin DataFrame

In [0]:
def add_none(dataframe, column, value):
    '''Return ``dataframe`` with entries of ``column`` matching ``value`` set to null.

    Matching uses SQL ``LIKE``, so ``value`` may contain ``%`` / ``_`` wildcards.
    Entries that do not match are left unchanged.

    Args:
        dataframe: DataFrame to transform.
        column: Name of the column to clean.
        value: LIKE pattern identifying placeholder entries.

    Returns:
        A new DataFrame with ``column`` replaced by the cleaned values.
    '''
    current = col(column)
    cleaned = when(current.like(value), None).otherwise(current)
    return dataframe.withColumn(column, cleaned)


# Placeholder values meaning "no data" in each pin column; add_none turns matching entries into nulls.
# Patterns are matched with SQL LIKE, so "%" is a wildcard (see "description").
columns_values_to_replace_none = {
    "description": "No description available%",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "title": "No Title Data Available"
}

# Final pin column layout ("index" is renamed to "ind" below; "downloaded" is not kept)
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]

for column, value in columns_values_to_replace_none.items():
    df_pin = add_none(df_pin, column, value)

df_pin = (
    df_pin
    # Expand "k"/"M" suffixes into zeros (e.g. "10k" -> "10000"), then cast to int.
    # NOTE(review): a value like "1.5k" would become "1.5000" and cast to 1 — confirm the data has no decimal counts.
    .withColumn(
        "follower_count",
        regexp_replace(regexp_replace("follower_count", "k", "000"), "M", "000000").cast('int'),
    )
    # Keep only the path portion of save_location
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    .withColumnRenamed("index", "ind")
    .select(new_pin_column_order)
)

## Clean df_geo DataFrame

In [0]:
# Clean the geo stream:
# - combine latitude/longitude into a single "coordinates" array column
# - ensure "timestamp" is a timestamp
# - fix the column order
# (`array`, `col` and `to_timestamp` come from the notebook's `from pyspark.sql.functions import *`.)
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp"
]

df_geo = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    # geo_schema already declares timestamp as TimestampType, so this is effectively a no-op safeguard
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select(new_geo_column_order)
)

## Clean df_user DataFrame


In [0]:
# Clean the user stream:
# - merge first_name and last_name into a single space-separated user_name
# - ensure date_joined is a timestamp
# - keep the final column order
new_user_column_order = [
    "ind",
    "user_name",
    "age",
    "date_joined"
]

df_user = (
    df_user
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    # user_schema already declares date_joined as TimestampType, so this is effectively a no-op safeguard
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select(new_user_column_order)
)


## Write DataFrames to Delta tables


In [0]:
# Persist each cleaned stream to its own Delta table (each table must receive its own DataFrame)
write_df_to_delta_table(df_pin, "pin_table")
write_df_to_delta_table(df_geo, "geo_table")
write_df_to_delta_table(df_user, "user_table")