## Set up PySpark requirements

In [0]:
# import URL processing
import urllib
# urllib.parse is a submodule and must be imported explicitly to use urllib.parse.quote
import urllib.parse

# import pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType

## Read the AWS Access Key and Secret Access Key into a dataframe

In [0]:
# Reader settings for the credentials file: a comma-delimited CSV whose
# first row holds the column names.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Load the credentials CSV into a Spark dataframe
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

## Extract the AWS Access Key and Secret Access Key from the Spark dataframe

In [0]:
 # Get the AWS access key and secret key from the spark dataframe
# Fetch both credential columns for the Databricks user in a single query
# instead of filtering and collecting the dataframe once per column.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
# first() returns None when no row matches; fail with a clear message rather
# than an opaque IndexError from indexing an empty collect() result.
if credentials_row is None:
    raise LookupError("No credentials row found for user 'databricks-user'")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Create functions to:
- Load streaming data from Kinesis 
- Deserialize the stream
- Write the cleaned data to delta table

In [0]:
def load_stream_data(stream_name: str, region: str = 'us-east-1', initial_position: str = 'earliest'):
    '''Open a Kinesis stream with spark.readStream and return it as a streaming dataframe.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region: AWS region passed to the reader's 'region' option.
            Defaults to 'us-east-1'.
        initial_position: Value passed to the reader's 'initialPosition'
            option. Defaults to 'earliest'.

    Returns:
        The dataframe returned by the reader's load() call.

    Authentication uses the notebook-level ACCESS_KEY and SECRET_KEY, so the
    cell that defines them must have run first.
    '''
    reader = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
    )
    return reader.load()

def deserialize_stream(stream, schema):
    '''Turn the raw stream dataframe into a dataframe of parsed columns.

    The stream's "data" column is cast to a string, parsed as JSON using
    `schema`, and the fields of the parsed struct are returned as top-level
    columns.
    '''
    payload_as_text = stream.selectExpr("CAST(data as STRING)")
    parsed = payload_as_text.withColumn("data", from_json(col("data"), schema))
    return parsed.select(col("data.*"))

def replace_with_nulls(dataframe, column, value_to_replace):
    '''Return `dataframe` with entries of `column` set to null where they match
    the LIKE pattern `value_to_replace`; all other entries are kept unchanged.'''
    current_value = col(column)
    is_placeholder = current_value.like(value_to_replace)
    return dataframe.withColumn(column, when(is_placeholder, None).otherwise(current_value))

def replace_bad_values(df, column, error, replacement):
    """ Return `df` with every match of the regex `error` in `column` replaced
    by `replacement` (via regexp_replace). `column` is the column name as a string. """
    cleaned_column = regexp_replace(column, error, replacement)
    return df.withColumn(column, cleaned_column)

def cast_datatype(df, column, new_datatype):
    """ Return `df` with `column` cast to `new_datatype`. """
    converted_column = col(column).cast(new_datatype)
    return df.withColumn(column, converted_column)

def write_stream_df_to_table(dataframe, name: str):
    '''Start an append-mode streaming write of `dataframe` to a delta table.

    Args:
        dataframe: Streaming dataframe to write.
        name: Short dataset name; used to build both the table name
            ("0e35b2767ae1_<name>_table") and its checkpoint directory.

    Returns:
        Whatever the stream writer's table() call returns, so the caller can
        keep a handle on the started write instead of it being discarded.
    '''
    table_name = f"0e35b2767ae1_{name}_table"
    # Each table gets its own checkpoint directory, derived from the table name
    checkpoint_dir = f"/tmp/kinesis/{table_name}_checkpoints/"
    query = (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .table(table_name)
    )
    return query
       

## Schema Configs

In [0]:
# Schemas used to parse the JSON payload of each stream.
# Every schema is built from (field name, Spark type) pairs, in column order.
pin_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in [
        ("index", IntegerType),
        ("unique_id", StringType),
        ("title", StringType),
        ("description", StringType),
        ("poster_name", StringType),
        ("follower_count", StringType),
        ("tag_list", StringType),
        ("is_image_or_video", StringType),
        ("image_src", StringType),
        ("downloaded", IntegerType),
        ("save_location", StringType),
        ("category", StringType),
    ]
])
geo_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in [
        ("ind", IntegerType),
        ("timestamp", TimestampType),
        ("latitude", FloatType),
        ("longitude", FloatType),
        ("country", StringType),
    ]
])
user_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in [
        ("ind", IntegerType),
        ("first_name", StringType),
        ("last_name", StringType),
        ("age", StringType),
        ("date_joined", TimestampType),
    ]
])

## Run the pipeline:
- Load streaming data from Kinesis
- Deserialize the stream
- Write the cleaned data to delta table

In [0]:
# Open one streaming dataframe per Kinesis stream (pin, geo and user data)
pin_stream, geo_stream, user_stream = (
    load_stream_data(kinesis_stream_name)
    for kinesis_stream_name in (
        'streaming-0e35b2767ae1-pin',
        'streaming-0e35b2767ae1-geo',
        'streaming-0e35b2767ae1-user',
    )
)

In [0]:
# Parse each raw stream with its matching schema
df_pin, df_geo, df_user = (
    deserialize_stream(raw_stream, stream_schema)
    for raw_stream, stream_schema in (
        (pin_stream, pin_schema),
        (geo_stream, geo_schema),
        (user_stream, user_schema),
    )
)

In [0]:
# LIKE patterns that mark a value as missing, grouped by column.
# Each column maps to a LIST of patterns: a dict literal that repeats a key
# keeps only the last value for that key, so listing "description" or
# "tag_list" several times as separate keys silently drops the earlier patterns.
bad_values_dict = {
    "description": [
        "Untitled",
        "No description available%",
        "No description available Story format",
    ],
    "follower_count": ["User Info Error"],
    "image_src": ["Image src error."],
    "poster_name": ["User Info Error"],
    "tag_list": ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "0"],
    "title": ["No Title Data Available"],
}

# Replace every placeholder value with null
for column_name, patterns in bad_values_dict.items():
    for pattern in patterns:
        df_pin = replace_with_nulls(df_pin, column_name, pattern)

# Expand the "k" / "M" suffixes into zeros so follower_count can be cast to an integer
df_pin = replace_bad_values(df_pin, "follower_count", "k", "000")
df_pin = replace_bad_values(df_pin, "follower_count", "M", "000000")
# Strip the "Local save in " prefix from save_location
df_pin = replace_bad_values(df_pin, "save_location", "Local save in ", "")
df_pin = cast_datatype(df_pin, "follower_count", "int")
# Rename "index" to "ind", the index column name used by the geo and user dataframes
df_pin = df_pin.withColumnRenamed("index", "ind")
# Keep only the listed columns, in this order
df_pin = df_pin.select(
    col("ind"),
    col("unique_id"),
    col("title"),
    col("description"),
    col("follower_count"),
    col("poster_name"),
    col("tag_list"),
    col("is_image_or_video"),
    col("image_src"),
    col("save_location"),
    col("category"),
)

In [0]:
# Merge latitude/longitude into a single "coordinates" array column, cast the
# timestamp column, then keep the final columns in their output order.
df_geo = cast_datatype(
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude"),
    "timestamp",
    "timestamp",
).select("ind", "country", "coordinates", "timestamp")

In [0]:
# Join first and last name into "user_name", cast date_joined to a timestamp,
# then keep the final columns in their output order.
df_user = cast_datatype(
    df_user
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name"),
    "date_joined",
    "timestamp",
).select("ind", "user_name", "age", "date_joined")

In [0]:
# Preview the three cleaned dataframes, in pin / geo / user order
for cleaned_frame in (df_pin, df_geo, df_user):
    display(cleaned_frame)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000
10794,Thomas Turner,34,2016-12-22T00:02:02.000+0000
5494,Anne Allen,27,2015-12-16T15:20:05.000+0000
5069,Amanda Ball,25,2016-01-13T17:36:30.000+0000


In [0]:
# Stream the cleaned pin data to its delta table using the shared helper
# (same checkpoint location and table name as the inlined writeStream chain).
write_stream_df_to_table(df_pin, "pin")

In [0]:
# Stream the cleaned geo data to its delta table using the shared helper
# (same checkpoint location and table name as the inlined writeStream chain).
write_stream_df_to_table(df_geo, "geo")

In [0]:
# Stream the cleaned user data to its delta table using the shared helper
# (same checkpoint location and table name as the inlined writeStream chain).
write_stream_df_to_table(df_user, "user")