Read data from Kinesis streams in Databricks

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Load the AWS credentials CSV into a Spark DataFrame.
# Reader settings live in named variables so they are easy to adjust.
file_type = "csv"              # format of the credentials file
first_row_is_header = "true"   # first row holds the column names
delimiter = ","                # fields are separated by commas

aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [None]:
# Pull the AWS access key and secret key for "databricks-user" out of the
# credentials DataFrame. The matching row is fetched once (a single Spark job)
# instead of filtering and collecting separately for each key.
import urllib.parse  # a bare `import urllib` does not guarantee urllib.parse is loaded

_credentials_row = (
    aws_keys_df.where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if _credentials_row is None:
    raise ValueError(
        "No row with 'User name' == 'databricks-user' found in the AWS credentials file"
    )

ACCESS_KEY = _credentials_row['Access key ID']
SECRET_KEY = _credentials_row['Secret access key']
# Percent-encode the secret key; safe="" means "/" is encoded as well.
# NOTE(review): not used in this section — presumably meant for embedding in a URI.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
import time
    
def create_streaming_dataframe(record_type, stream_prefix='streaming-0e4c2ab6fb3b',
                               region='us-east-1', initial_position='earliest'):
    '''
    Open a structured-streaming DataFrame over a Kinesis data stream.

    Reads the stream named ``f"{stream_prefix}-{record_type}"`` using the
    module-level ACCESS_KEY / SECRET_KEY credentials and keeps only the
    record payload, cast to a string.

    Parameters
    ----------
    record_type: str
        Suffix of the stream name, e.g. "pin", "geo" or "user".
    stream_prefix: str, optional
        Part of the stream name before "-{record_type}"
        (default 'streaming-0e4c2ab6fb3b').
    region: str, optional
        AWS region of the stream (default 'us-east-1').
    initial_position: str, optional
        Value for the connector's 'initialPosition' option (default 'earliest').

    Returns
    -------
    df:
        Streaming dataframe with a single string column "data" holding the
        payload of each record
    '''
    # Read data from the Kinesis stream using structured streaming
    df = (
        spark.readStream
        .format('kinesis')
        .option('streamName', f'{stream_prefix}-{record_type}')
        .option('initialPosition', initial_position)
        # NOTE(review): "format" and "inferSchema" do not appear to be options of
        # the Kinesis source and are probably ignored (the JSON payload is parsed
        # later with from_json). Confirm against the connector docs before removing.
        .option("format", "json")
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .option("inferSchema", "true")
        .load()
    )

    # Cast the "data" column (the record payload) to a string so it can be parsed
    df = df.selectExpr("CAST(data as STRING)")

    return df

# query = streaming_df_pin.writeStream.format("console").start()

def create_delta_table(df, record_type, checkpoint_root="/tmp/kinesis/_checkpoints",
                       table_prefix="0e4c2ab6fb3b"):
    '''
    Start a streaming query that appends ``df`` to a Delta table.

    The table is named ``f"{table_prefix}_{record_type}_table"`` and the
    query checkpoints to ``f"{checkpoint_root}/{record_type}/"``.

    Parameters
    ----------
    df:
        Streaming dataframe that has been cleaned and transformed
    record_type: str
        Record type ("pin", "geo", "user", ...) used in the table name and
        in the checkpoint sub-directory.
    checkpoint_root: str, optional
        Directory under which each record type gets its own checkpoint
        folder (default "/tmp/kinesis/_checkpoints").
    table_prefix: str, optional
        Prefix of the Delta table name (default "0e4c2ab6fb3b").

    Returns
    -------
    delta_table:
        The streaming query object returned by the writer's ``.table()`` call
    '''
    # Each streaming query needs its own checkpoint directory. Spark stores the
    # query id and source progress there, so the pin/geo/user queries must not
    # share one. Sub-folders of checkpoint_root keep a single recursive delete
    # of the root working as a "reset everything".
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{record_type}/"

    delta_table = (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(f"{table_prefix}_{record_type}_table")
    )

    return delta_table

Transform the Kinesis streams

In [None]:
# Clean the raw Pinterest post data
def transform_pin_data(df):
    '''
    Clean and normalise a DataFrame of Pinterest posts.

    * Empty strings in any column become null.
    * "User Info Error" becomes 0 in follower_count and null in poster_name.
    * Any value containing a known "no data" placeholder becomes null.
    * follower_count "k"/"M" suffixes are expanded to zeros and the column is
      cast to int.
    * The "Local save in " prefix is stripped from save_location; values
      without that prefix are kept unchanged.
    * "index" is renamed to "ind" and the columns are reordered.

    Parameters
    ----------
    df:
        Dataframe containing information about Pinterest posts

    Returns
    -------
    df_cleaned:
        Cleaned dataframe with the columns ind, unique_id, title, description,
        follower_count, poster_name, tag_list, is_image_or_video, image_src,
        save_location and category
    '''
    # Substrings marking a value that carries no real data. The first entry
    # looks like "No Tags Available" split character by character.
    # NOTE(review): matching is by substring, so longer values that merely
    # include a placeholder (e.g. a title containing "Untitled") are nulled too.
    placeholders = [
        "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
        "Image src error",
        "No description",
        "Untitled",
        "No Title Data Available",
    ]

    def _contains_placeholder(column_name):
        """Boolean Column: does `column_name` contain any of the placeholders?"""
        condition = col(column_name).contains(placeholders[0])
        for text in placeholders[1:]:
            condition = condition | col(column_name).contains(text)
        return condition

    # Replace empty entries with nulls in every column
    df = df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

    df = df.withColumn("follower_count", when(col("follower_count").contains("User Info Error"), 0).otherwise(col("follower_count")))
    df = df.withColumn("poster_name", when(col("poster_name").contains("User Info Error"), None).otherwise(col("poster_name")))

    # Null out placeholder values in one pass over all columns
    df = df.select([when(_contains_placeholder(c), None).otherwise(col(c)).alias(c) for c in df.columns])

    # Expand "k"/"M" suffixes so follower_count holds plain digits
    follower_count = col("follower_count")
    df = df.withColumn(
        "follower_count",
        when(follower_count.endswith("k"), regexp_replace(follower_count, "k", "000"))
        .when(follower_count.endswith("M"), regexp_replace(follower_count, "M", "000000"))
        .otherwise(follower_count),
    )

    # Change the datatype of the "follower_count" column to integer
    df = df.withColumn("follower_count", col("follower_count").cast("int"))

    # Keep only the path in save_location. The .otherwise() matters: without it,
    # every value that does not start with the prefix is replaced with null.
    save_location = col("save_location")
    prefix = "Local save in "
    df = df.withColumn(
        "save_location",
        when(save_location.startswith(prefix), regexp_replace(save_location, prefix, ""))
        .otherwise(save_location),
    )

    # Rename the index column to ind
    df = df.withColumnRenamed("index", "ind")

    # Reorder the DataFrame columns (the "downloaded" column is dropped here)
    df_cleaned = df.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

    return df_cleaned

In [None]:
def transform_geo_data(df):
    '''
    Tidy a DataFrame of geolocation records.

    latitude and longitude are folded into a single array column
    ``coordinates`` (and dropped), and ``timestamp`` is passed through
    ``to_timestamp``.

    Parameters
    ----------
    df:
        Dataframe containing information about Geolocation; must have the
        columns ind, country, latitude, longitude and timestamp

    Returns
    -------
    df_cleaned:
        Dataframe with the columns ind, country, coordinates, timestamp
    '''
    with_coordinates = df.withColumn("coordinates", array("latitude", "longitude"))
    projected = with_coordinates.select("ind", "country", "coordinates", "timestamp")
    return projected.withColumn("timestamp", to_timestamp("timestamp"))

In [None]:
def transform_user_data(df):
    '''
    Clean a DataFrame of user records.

    first_name and last_name are combined into a single space-separated
    ``user_name`` column (and dropped), and ``date_joined`` is converted
    with ``to_timestamp``.

    Parameters
    ----------
    df:
        Dataframe containing information about users; must have the columns
        ind, first_name, last_name, age and date_joined

    Returns
    -------
    df_cleaned:
        Cleaned dataframe with the columns ind, user_name, age, date_joined
    '''
    # concat_ws(" ", ...) puts a space between the names (plain concat ran them
    # together, e.g. "JohnSmith") and skips a null name instead of making the
    # whole user_name null.
    df_cleaned = (
        df.withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
        .withColumn("date_joined", to_timestamp("date_joined"))
        .select("ind", "user_name", "age", "date_joined")
    )

    return df_cleaned

Write the streaming data to Delta tables

In [None]:
# Schema of the JSON payload carried by each record of the Pinterest (pin) stream
schema = StructType([
        StructField("index", IntegerType()),
        StructField("unique_id", StringType()),
        StructField("title", StringType()),
        StructField("description", StringType()),
        StructField("poster_name", StringType()),
        StructField("follower_count", StringType()),
        StructField("tag_list", StringType()),
        StructField("is_image_or_video", StringType()),
        StructField("image_src", StringType()),
        StructField("downloaded", IntegerType()),
        StructField("save_location", StringType()),
        StructField("category", StringType())
    ])

# Open the pin stream, parse each JSON "data" string with the schema and
# expand the parsed struct into top-level columns
df_pin = create_streaming_dataframe("pin")
df_pin = df_pin.withColumn("cast_data", from_json(col("data"), schema)) \
    .select("cast_data.*")

# Clean the posts and stream them into the pin Delta table
transformed_df_pin = transform_pin_data(df_pin)
create_delta_table(transformed_df_pin, "pin")

The code block above first creates a streaming DataFrame from the Pinterest (pin) Kinesis stream. It then adds a new column, "cast_data", by parsing the JSON string in the "data" column with `from_json()` using the schema defined above. Next it selects all the fields of "cast_data" (`cast_data.*`), so that each field becomes its own column. This DataFrame is cleaned with the transform_pin_data() function defined earlier. The result, "transformed_df_pin", is written to a Delta table with the create_delta_table() function defined earlier.

In [None]:
# JSON layout of each geolocation stream record
schema = StructType(
    [
        StructField("ind", IntegerType()),
        StructField("timestamp", TimestampType()),
        StructField("latitude", FloatType()),
        StructField("longitude", FloatType()),
        StructField("country", StringType()),
    ]
)

# Open the geo stream, parse the JSON payload and flatten it into columns
df_geo = create_streaming_dataframe("geo")
df_geo = df_geo.select(from_json(col("data"), schema).alias("cast_data")).select("cast_data.*")

# Clean the records and stream them into the geo Delta table
transformed_df_geo = transform_geo_data(df_geo)
create_delta_table(transformed_df_geo, "geo")

The code block above first creates a streaming DataFrame from the Geolocation (geo) Kinesis stream. It then adds a new column, "cast_data", by parsing the JSON string in the "data" column with `from_json()` using the schema defined above. Next it selects all the fields of "cast_data" (`cast_data.*`), so that each field becomes its own column. This DataFrame is cleaned with the transform_geo_data() function defined earlier. The result, "transformed_df_geo", is written to a Delta table with the create_delta_table() function defined earlier.

In [None]:
# JSON layout of each user stream record
schema = StructType(
    [
        StructField("ind", IntegerType()),
        StructField("first_name", StringType()),
        StructField("last_name", StringType()),
        StructField("age", StringType()),
        StructField("date_joined", StringType()),
    ]
)

# Open the user stream, parse the JSON payload and flatten it into columns
df_user = create_streaming_dataframe("user")
df_user = df_user.select(from_json(col("data"), schema).alias("cast_data")).select("cast_data.*")

# Clean the records and stream them into the user Delta table
transformed_df_user = transform_user_data(df_user)
create_delta_table(transformed_df_user, "user")

The code block above first creates a streaming DataFrame from the User Kinesis stream. It then adds a new column, "cast_data", by parsing the JSON string in the "data" column with `from_json()` using the schema defined above. Next it selects all the fields of "cast_data" (`cast_data.*`), so that each field becomes its own column. This DataFrame is cleaned with the transform_user_data() function defined earlier. The result, "transformed_df_user", is written to a Delta table with the create_delta_table() function defined earlier.

In [None]:
# Stop every streaming query still active in this Spark session before deleting
# checkpoints: the queries started above keep running and would otherwise
# continue against a checkpoint directory that has been removed underneath them.
for active_query in spark.streams.active:
    active_query.stop()

# Recursively delete the checkpoint directory so the streams can start afresh
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

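The command above deletes the checkpoint folder. Structured Streaming uses checkpoints to recover a query's previous state (for example, how far into the stream it has read) after a failure. They must therefore be removed before the streaming data can be written to the Delta tables again from the start. Any streaming queries that are still running should be stopped before their checkpoints are deleted.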